flow.record#

Subpackages#

Submodules#

Package Contents#

Classes#

FieldType

GroupedRecord

GroupedRecord acts like a normal Record, but can contain multiple records.

Record

RecordDescriptor

Record Descriptor class for defining a Record type and its fields.

RecordField

JsonRecordPacker

Functions#

DynamicDescriptor

RecordAdapter

RecordReader

RecordWriter

extend_record

Extend record with fields and values from other_records.

iter_timestamped_records

Yields timestamped annotated records for each datetime fieldtype in record.

open_path

Open path using mode and returns a file object.

open_path_or_stream

open_stream

stream

Attributes#

flow.record.RECORD_VERSION = 1#
flow.record.RECORDSTREAM_MAGIC = b'RECORDSTREAM\n'#
flow.record.DynamicDescriptor(name, fields)#
class flow.record.FieldType#
classmethod default()#

Return the default value for the field in the Record template.

class flow.record.GroupedRecord(name, records)#

Bases: Record

GroupedRecord acts like a normal Record, but can contain multiple records.

See it as a flat Record view on top of multiple Records. If two Records have the same fieldname, the first one will prevail.

get_record_by_type(type_name)#

Get record in a GroupedRecord by type_name.

Parameters:

type_name (str) – The record type name (for example wq/meta).

Returns:

None or the record

__repr__()#

Return repr(self).

__setattr__(attr, val)#

Enforce setting the fields to their respective types.

__getattr__(attr)#
class flow.record.Record#
__slots__ = ()#
__eq__(other)#

Return self==value.

__setattr__(k, v)#

Enforce setting the fields to their respective types.

__repr__()#

Return repr(self).

flow.record.RecordAdapter(url: str | None = None, out: bool = False, selector: str | None = None, clobber: bool = True, fileobj: BinaryIO | None = None, **kwargs) flow.record.adapter.AbstractWriter | flow.record.adapter.AbstractReader#
class flow.record.RecordDescriptor(name: str, fields: Sequence[tuple[str, str]] | None = None)#

Record Descriptor class for defining a Record type and its fields.

property fields: Mapping[str, RecordField]#

Get fields mapping (without required fields). eg:

{

“foo”: RecordField(“foo”, “string”), “bar”: RecordField(“bar”, “varint”),

}

Returns:

Mapping of Record fields

property descriptor_hash: int#

Returns the (cached) descriptor hash

property identifier: tuple[str, int]#

Returns a tuple containing the descriptor name and hash

name: str#
recordType: type#
static get_required_fields() Mapping[str, RecordField]#

Get required fields mapping. eg:

{

“_source”: RecordField(“_source”, “string”), “_classification”: RecordField(“_classification”, “datetime”), “_generated”: RecordField(“_generated”, “datetime”), “_version”: RecordField(“_version”, “vaeint”),

}

Returns:

Mapping of required fields

get_all_fields() Mapping[str, RecordField]#

Get all fields including required meta fields. eg:

{

“ts”: RecordField(“ts”, “datetime”), “foo”: RecordField(“foo”, “string”), “bar”: RecordField(“bar”, “varint”), “_source”: RecordField(“_source”, “string”), “_classification”: RecordField(“_classification”, “datetime”), “_generated”: RecordField(“_generated”, “datetime”), “_version”: RecordField(“_version”, “vaeint”),

}

Returns:

Mapping of all Record fields

getfields(typename: str) RecordFieldSet#

Get fields of a given type.

Parameters:

typename – The typename of the fields to return. eg: “string” or “datetime”

Returns:

RecordFieldSet of fields with the given typename

__call__(*args, **kwargs) Record#

Create a new Record initialized with args and kwargs.

init_from_dict(rdict: dict[str, Any], raise_unknown=False) Record#

Create a new Record initialized with key, value pairs from rdict.

If raise_unknown=True then fields on rdict that are unknown to this RecordDescriptor will raise a TypeError exception due to initializing with unknown keyword arguments. (default: False)

Returns:

Record with data from rdict

init_from_record(record: Record, raise_unknown=False) Record#

Create a new Record initialized with data from another record.

If raise_unknown=True then fields on record that are unknown to this RecordDescriptor will raise a TypeError exception due to initializing with unknown keyword arguments. (default: False)

Returns:

Record with data from record

extend(fields: Sequence[tuple[str, str]]) RecordDescriptor#

Returns a new RecordDescriptor with the extended fields

Returns:

RecordDescriptor with extended fields

get_field_tuples() tuple[tuple[str, str]]#

Returns a tuple containing the (typename, name) tuples, eg:

((‘boolean’, ‘foo’), (‘string’, ‘bar’))

Returns:

Tuple of (typename, name) tuples

static calc_descriptor_hash(name, fields: Sequence[tuple[str, str]]) int#

Calculate and return the (cached) descriptor hash as a 32 bit integer.

The descriptor hash is the first 4 bytes of the sha256sum of the descriptor name and field names and types.

__hash__() int#

Return hash(self).

__eq__(other: RecordDescriptor) bool#

Return self==value.

__repr__() str#

Return repr(self).

definition(reserved: bool = True) str#

Return the RecordDescriptor as Python definition string.

If reserved is True it will also return the reserved fields.

Returns:

Descriptor definition string

base(**kwargs_sink)#
exception flow.record.RecordDescriptorError#

Bases: Exception

Raised when there is an error constructing a record descriptor

class flow.record.RecordField(name: str, typename: str)#
name#
typename#
type#
__repr__()#

Return repr(self).

flow.record.RecordReader(url: str | None = None, selector: str | None = None, fileobj: BinaryIO | None = None, **kwargs) flow.record.adapter.AbstractReader#
flow.record.RecordWriter(url: str | None = None, clobber: bool = True, **kwargs) flow.record.adapter.AbstractWriter#
flow.record.dynamic_fieldtype#
flow.record.extend_record(record: Record, other_records: list[Record], replace: bool = False, name: str | None = None) Record#

Extend record with fields and values from other_records.

Duplicate fields are ignored in other_records unless replace=True.

Parameters:
  • record – Initial Record to extend.

  • other_records – List of Records to use for extending/replacing.

  • replace – if True, it will replace existing fields and values in record from fields and values from other_records. Last record always wins.

  • name – rename the RecordDescriptor name to name. Otherwise, use name from initial record.

Returns:

Extended Record

flow.record.iter_timestamped_records(record: Record) Iterator[Record]#

Yields timestamped annotated records for each datetime fieldtype in record. If record does not have any datetime fields the original record is returned.

Parameters:

record – Record to add timestamp fields for.

Yields:

Record annotated with ts and ts_description fields for each datetime fieldtype.

flow.record.open_path(path: str, mode: str, clobber: bool = True) IO#

Open path using mode and returns a file object.

It handles special cases if path is meant to be stdin or stdout. And also supports compression based on extension or file header of stream.

Parameters:
  • path – Filename or path to filename to open

  • mode – Could be “r”, “rb” to open file for reading, “w”, “wb” for writing

  • clobber – Overwrite file if it already exists if clobber=True, else raises IOError.

flow.record.open_path_or_stream(path: str | pathlib.Path | BinaryIO, mode: str, clobber: bool = True) IO#
flow.record.open_stream(fp: BinaryIO, mode: str) BinaryIO#
flow.record.stream(src, dst)#
class flow.record.JsonRecordPacker(indent=None, pack_descriptors=True)#
register(desc, notify=False)#
pack_obj(obj)#
unpack_obj(obj)#
pack(obj)#
unpack(d)#