flow.record.base

Module Contents

Classes

FieldType

Record

GroupedRecord

GroupedRecord acts like a normal Record, but can contain multiple records.

RecordField

RecordFieldSet

Built-in mutable sequence.

RecordDescriptor

Record Descriptor class for defining a Record type and its fields.

DynamicFieldtypeModule

Functions

set_ignored_fields_for_comparison

Updates IGNORE_FIELDS_FOR_COMPARISON from outside the flow.record package scope.

ignore_fields_for_comparison

Context manager to temporarily ignore fields for comparison.

is_valid_field_name

parse_def

DynamicDescriptor

open_stream

find_adapter_for_stream

open_path_or_stream

open_path

Opens path using mode and returns a file object.

RecordAdapter

RecordReader

RecordWriter

stream

fieldtype

Return the FieldType class for the given field type class path.

merge_record_descriptors

Create a newly merged RecordDescriptor from a list of RecordDescriptors.

extend_record

Extend record with fields and values from other_records.

normalize_fieldname

Returns a normalized version of field_name.

iter_timestamped_records

Yields timestamped annotated records for each datetime fieldtype in record.

Attributes

flow.record.base.HAS_LZ4 = True
flow.record.base.HAS_BZ2 = True
flow.record.base.HAS_ZSTD = True
flow.record.base.HAS_AVRO = True
flow.record.base.log
flow.record.base.RECORD_VERSION = 1
flow.record.base.RESERVED_FIELDS
flow.record.base.GZIP_MAGIC = b'\x1f\x8b'
flow.record.base.BZ2_MAGIC = b'BZh'
flow.record.base.LZ4_MAGIC = b'\x04"M\x18'
flow.record.base.ZSTD_MAGIC = b'(\xb5/\xfd'
flow.record.base.AVRO_MAGIC = b'Obj'
flow.record.base.RECORDSTREAM_MAGIC = b'RECORDSTREAM\n'
flow.record.base.RECORDSTREAM_MAGIC_DEPTH = 19
flow.record.base.RE_VALID_FIELD_NAME
flow.record.base.RE_VALID_RECORD_TYPE_NAME
flow.record.base.RECORD_CLASS_TEMPLATE = Multiline-String
"""
class {name}(Record):
    _desc = None
    _field_types = {field_types}

    __slots__ = {slots_tuple}

    def __init__(__self, {args}):
{init_code}

    @classmethod
    def _unpack(__cls, {args}):
{unpack_code}
"""
flow.record.base.IGNORE_FIELDS_FOR_COMPARISON
flow.record.base.set_ignored_fields_for_comparison(ignored_fields: collections.abc.Iterator[str]) -> None

Updates IGNORE_FIELDS_FOR_COMPARISON from outside the flow.record package scope.

flow.record.base.ignore_fields_for_comparison(ignored_fields: collections.abc.Iterator[str]) -> collections.abc.Iterator[None]

Context manager to temporarily ignore fields for comparison.
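
The temporary-override pattern described here can be pictured with a small stdlib-only sketch. It does not use flow.record: the names IGNORED, ignore_fields and records_equal are hypothetical stand-ins, and plain dicts play the role of records.

```python
from collections.abc import Iterable, Iterator
from contextlib import contextmanager

# Hypothetical stand-in for the module-level IGNORE_FIELDS_FOR_COMPARISON.
IGNORED = set()


@contextmanager
def ignore_fields(fields: Iterable[str]) -> Iterator[None]:
    """Ignore `fields` inside the with-block, then restore the previous set."""
    global IGNORED
    previous = IGNORED
    try:
        IGNORED = set(fields)
        yield
    finally:
        # Runs even if the block raises, so the override never leaks.
        IGNORED = previous


def records_equal(a: dict, b: dict) -> bool:
    """Compare two dict 'records', skipping the ignored field names."""
    keys = (a.keys() | b.keys()) - IGNORED
    return all(a.get(k) == b.get(k) for k in keys)


first = {"name": "a", "_generated": 1}
second = {"name": "a", "_generated": 2}

outside_before = records_equal(first, second)
with ignore_fields(["_generated"]):
    inside = records_equal(first, second)
outside_after = records_equal(first, second)
```

In this sketch the two records only compare equal inside the with-block; the previous set of ignored fields is restored on exit.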

class flow.record.base.FieldType
classmethod default() -> None

Return the default value for the field in the Record template.

class flow.record.base.Record
__slots__ = ()
__eq__(other: object) -> bool
__setattr__(k: str, v: Any) -> None

Enforce setting the fields to their respective types.

__hash__() -> int
__repr__() -> str
class flow.record.base.GroupedRecord(name: str, records: list[Record | GroupedRecord])

Bases: Record

GroupedRecord acts like a normal Record, but can contain multiple records.

See it as a flat Record view on top of multiple Records. If two Records have the same fieldname, the first one will prevail.

name
records = []
descriptors = []
flat_fields = []
get_record_by_type(type_name: str) -> Record | None

Get record in a GroupedRecord by type_name.

Parameters:

type_name (str) – The record type name (for example wq/meta).

Returns:

None or the record

__repr__() -> str
__setattr__(attr: str, val: Any) -> None

Enforce setting the fields to their respective types.

__getattr__(attr: str) -> Any
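
The "flat view, first one prevails" lookup rule of GroupedRecord resembles what collections.ChainMap does for dictionaries. The sketch below is only an analogy using the standard library; the dicts and field names are made-up stand-ins for records, not flow.record objects.

```python
from collections import ChainMap

# Two plain dicts stand in for the grouped records (hypothetical data).
meta = {"hostname": "host1", "source": "meta"}
task = {"source": "task", "task_id": 7}

# ChainMap searches its maps left to right, so the first record that has
# a field supplies the value, mirroring the "first one will prevail" rule.
flat = ChainMap(meta, task)

hostname = flat["hostname"]  # only present in the first record
source = flat["source"]      # present in both: the first record wins
task_id = flat["task_id"]    # only present in the second record
```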
flow.record.base.is_valid_field_name(name: str, check_reserved: bool = True) -> bool
flow.record.base.parse_def(definition: str) -> tuple[str, list[tuple[str, str]]]
class flow.record.base.RecordField(name: str, typename: str)
name = None
typename = None
type = None
__repr__()
class flow.record.base.RecordFieldSet

Bases: list

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

class flow.record.base.RecordDescriptor(name: str, fields: collections.abc.Sequence[tuple[str, str]] | None = None)

Record Descriptor class for defining a Record type and its fields.

name: str = None
recordType: type = None
static get_required_fields() -> collections.abc.Mapping[str, RecordField]

Get required fields mapping, e.g.:

{
    "_source": RecordField("_source", "string"),
    "_classification": RecordField("_classification", "string"),
    "_generated": RecordField("_generated", "datetime"),
    "_version": RecordField("_version", "varint"),
}
Returns:

Mapping of required fields

property fields: collections.abc.Mapping[str, RecordField]

Get fields mapping (without required fields), e.g.:

{
    "foo": RecordField("foo", "string"),
    "bar": RecordField("bar", "varint"),
}
Returns:

Mapping of Record fields

get_all_fields() -> collections.abc.Mapping[str, RecordField]

Get all fields including required meta fields, e.g.:

{
    "ts": RecordField("ts", "datetime"),
    "foo": RecordField("foo", "string"),
    "bar": RecordField("bar", "varint"),
    "_source": RecordField("_source", "string"),
    "_classification": RecordField("_classification", "string"),
    "_generated": RecordField("_generated", "datetime"),
    "_version": RecordField("_version", "varint"),
}
Returns:

Mapping of all Record fields

getfields(typename: str) -> RecordFieldSet

Get fields of a given type.

Parameters:

typename – The typename of the fields to return, e.g. “string” or “datetime”.

Returns:

RecordFieldSet of fields with the given typename

__call__(*args, **kwargs) -> Record

Create a new Record initialized with args and kwargs.

init_from_dict(rdict: dict[str, Any], raise_unknown: bool = False) -> Record

Create a new Record initialized with key, value pairs from rdict.

If raise_unknown=True then fields on rdict that are unknown to this RecordDescriptor will raise a TypeError exception due to initializing with unknown keyword arguments. (default: False)

Returns:

Record with data from rdict

init_from_record(record: Record, raise_unknown: bool = False) -> Record

Create a new Record initialized with data from another record.

If raise_unknown=True then fields on record that are unknown to this RecordDescriptor will raise a TypeError exception due to initializing with unknown keyword arguments. (default: False)

Returns:

Record with data from record
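
The raise_unknown behaviour described for init_from_dict and init_from_record can be illustrated without the library. In this sketch KNOWN_FIELDS, make_record and init_from_dict_sketch are hypothetical names, and a dict stands in for the Record. It assumes unknown keys are silently dropped when raise_unknown is False, which is how the description above reads.

```python
from typing import Any

KNOWN_FIELDS = ("foo", "bar")  # hypothetical descriptor fields


def make_record(**kwargs: Any) -> dict:
    """Stand-in for calling a descriptor: rejects unknown keyword arguments."""
    unknown = set(kwargs) - set(KNOWN_FIELDS)
    if unknown:
        raise TypeError(f"unexpected keyword arguments: {sorted(unknown)}")
    # Fields that were not supplied default to None in this sketch.
    return {name: kwargs.get(name) for name in KNOWN_FIELDS}


def init_from_dict_sketch(rdict: dict, raise_unknown: bool = False) -> dict:
    """Drop unknown keys unless raise_unknown is set, then build the record."""
    if not raise_unknown:
        rdict = {k: v for k, v in rdict.items() if k in KNOWN_FIELDS}
    return make_record(**rdict)


# Default: the unknown "extra" key is ignored.
lenient = init_from_dict_sketch({"foo": "a", "extra": 1})

# raise_unknown=True: the unknown key reaches the constructor and fails.
try:
    init_from_dict_sketch({"foo": "a", "extra": 1}, raise_unknown=True)
    strict_error = None
except TypeError as exc:
    strict_error = exc
```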

extend(fields: collections.abc.Sequence[tuple[str, str]]) -> RecordDescriptor

Returns a new RecordDescriptor with the extended fields.

Returns:

RecordDescriptor with extended fields

get_field_tuples() -> tuple[tuple[str, str]]

Returns a tuple containing the (typename, name) tuples, e.g.:

(('boolean', 'foo'), ('string', 'bar'))
Returns:

Tuple of (typename, name) tuples

static calc_descriptor_hash(name: str, fields: collections.abc.Sequence[tuple[str, str]]) -> int

Calculate and return the (cached) descriptor hash as a 32 bit integer.

The descriptor hash is the first 4 bytes of the sha256sum of the descriptor name and field names and types.
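
A minimal sketch of such a hash, using only hashlib and struct. The exact byte layout is an assumption here: fields are taken to be (typename, name) tuples, each field contributes its name followed by its typename, and the four bytes are read as a big-endian unsigned integer. The helper name descriptor_hash_sketch is hypothetical.

```python
import hashlib
import struct
from collections.abc import Sequence


def descriptor_hash_sketch(name: str, fields: Sequence) -> int:
    """First 4 bytes of SHA-256 over the name plus field names and types.

    Assumed layout: descriptor name, then name + typename for each
    (typename, name) field tuple, encoded as UTF-8.
    """
    data = (name + "".join(f"{fname}{ftype}" for ftype, fname in fields)).encode()
    # ">L" reads the 4-byte prefix as a big-endian unsigned 32-bit integer.
    return struct.unpack(">L", hashlib.sha256(data).digest()[:4])[0]


fields = (("string", "foo"), ("varint", "bar"))
h = descriptor_hash_sketch("test/example", fields)
```

Because the input is fully determined by the name and the field tuples, the same descriptor definition always yields the same 32-bit value, which is what makes the result cacheable.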

property descriptor_hash: int

Returns the (cached) descriptor hash.

property identifier: tuple[str, int]

Returns a tuple containing the descriptor name and hash.

__hash__() -> int
__eq__(other: RecordDescriptor) -> bool
__repr__() -> str
definition(reserved: bool = True) -> str

Return the RecordDescriptor as a Python definition string.

If reserved is True it will also return the reserved fields.

Returns:

Descriptor definition string

base(**kwargs_sink) -> Callable[..., Record]
flow.record.base.DynamicDescriptor(name: str, fields: list[str]) -> RecordDescriptor
flow.record.base.open_stream(fp: BinaryIO, mode: str) -> BinaryIO
flow.record.base.find_adapter_for_stream(fp: BinaryIO) -> tuple[BinaryIO, str | None]
flow.record.base.open_path_or_stream(path: str | pathlib.Path | BinaryIO, mode: str, clobber: bool = True) -> IO
flow.record.base.open_path(path: str, mode: str, clobber: bool = True) -> IO

Opens path using mode and returns a file object.

Handles the special cases where path refers to stdin or stdout, and supports compression based on the file extension or the header of the stream.

Parameters:
  • path – Filename or path to filename to open

  • mode – “r” or “rb” to open the file for reading, “w” or “wb” for writing

  • clobber – If True, overwrite the file if it already exists; otherwise raise IOError.
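
Detecting compression from the file header can be sketched with the magic values listed in the module attributes. This is a stdlib-only illustration, not the library's code: sniff_format and as_reader are hypothetical helpers, and the sketch only identifies the format without opening a decompressor.

```python
import bz2
import gzip
import io
from typing import Optional

# Magic values as listed in the module attributes.
MAGICS = {
    "gzip": b"\x1f\x8b",
    "bz2": b"BZh",
    "lz4": b'\x04"M\x18',
    "zstd": b"(\xb5/\xfd",
    "avro": b"Obj",
}


def sniff_format(fp: io.BufferedReader) -> Optional[str]:
    """Return the format whose magic matches the start of fp, else None."""
    # peek() returns buffered bytes without advancing the stream position,
    # so the caller can still read the stream from the beginning.
    header = fp.peek(4)[:4]
    for name, magic in MAGICS.items():
        if header.startswith(magic):
            return name
    return None


def as_reader(data: bytes) -> io.BufferedReader:
    """Wrap raw bytes in a buffered reader for the demonstration."""
    return io.BufferedReader(io.BytesIO(data))


gz = sniff_format(as_reader(gzip.compress(b"hello")))
bz = sniff_format(as_reader(bz2.compress(b"hello")))
plain = sniff_format(as_reader(b"RECORDSTREAM\n"))
```

Here gz and bz identify the gzip and bz2 streams, while the uncompressed record stream header matches none of the compression magics.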

flow.record.base.RecordAdapter(url: str | None = None, out: bool = False, selector: str | None = None, clobber: bool = True, fileobj: BinaryIO | None = None, **kwargs) -> flow.record.adapter.AbstractWriter | flow.record.adapter.AbstractReader
flow.record.base.RecordReader(url: str | None = None, selector: str | None = None, fileobj: BinaryIO | None = None, **kwargs) -> flow.record.adapter.AbstractReader
flow.record.base.RecordWriter(url: str | None = None, clobber: bool = True, **kwargs) -> flow.record.adapter.AbstractWriter
flow.record.base.stream(src: flow.record.adapter.AbstractReader, dst: flow.record.adapter.AbstractWriter) -> None
flow.record.base.fieldtype(clspath: str) -> FieldType

Return the FieldType class for the given field type class path.

Parameters:

clspath – Class path of the field type, e.g. uint32, net.ipaddress, string[]

Returns:

The FieldType class.

flow.record.base.merge_record_descriptors(descriptors: tuple[RecordDescriptor], replace: bool = False, name: str | None = None) -> RecordDescriptor

Create a newly merged RecordDescriptor from a list of RecordDescriptors. This function uses a cache to avoid creating the same descriptor multiple times.

Duplicate fields are ignored in descriptors unless replace=True.

Parameters:
  • descriptors – Tuple of RecordDescriptors to merge.

  • replace – if True, it will replace existing field names. Last descriptor always wins.

  • name – name for the merged RecordDescriptor. If omitted, the name of the first descriptor is used.

Returns:

Merged RecordDescriptor
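
The duplicate-handling rules above can be sketched with plain tuples and a dict. This is an illustration of the described semantics only: merge_sketch is a hypothetical helper, a "descriptor" is modelled as (name, [(typename, fieldname), ...]), and the position a replaced field ends up in is an assumption of this sketch rather than documented behaviour.

```python
from collections.abc import Sequence
from typing import Optional


def merge_sketch(descriptors: Sequence, replace: bool = False, name: Optional[str] = None) -> tuple:
    """Merge (name, fields) pairs, where fields are (typename, fieldname)."""
    merged = {}  # fieldname -> typename, kept in insertion order
    for _, fields in descriptors:
        for typename, fieldname in fields:
            if fieldname in merged and not replace:
                continue  # duplicate: keep the earlier definition
            merged[fieldname] = typename  # new field, or the later one wins
    # Without an explicit name, fall back to the first descriptor's name.
    merged_name = name or descriptors[0][0]
    return merged_name, [(t, n) for n, t in merged.items()]


a = ("test/a", [("string", "foo"), ("varint", "count")])
b = ("test/b", [("uint32", "count"), ("datetime", "ts")])

kept = merge_sketch((a, b))
replaced = merge_sketch((a, b), replace=True, name="test/merged")
```

With the defaults, count keeps the varint type from the first descriptor; with replace=True it takes the uint32 type from the last one.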

flow.record.base.extend_record(record: Record, other_records: list[Record], replace: bool = False, name: str | None = None) -> Record

Extend record with fields and values from other_records.

Duplicate fields are ignored in other_records unless replace=True.

Parameters:
  • record – Initial Record to extend.

  • other_records – List of Records to use for extending/replacing.

  • replace – if True, it will replace existing fields and values in record from fields and values from other_records. Last record always wins.

  • name – name for the RecordDescriptor of the extended Record. If omitted, the name of the initial record is used.

Returns:

Extended Record

flow.record.base.normalize_fieldname(field_name: str) -> str

Returns a normalized version of field_name.

Some (field) names are not allowed in flow.record, while they can be allowed in other formats. This normalizes the name so it can still be used in flow.record. Reserved field_names are not normalized.

>>> normalize_fieldname("my-variable-name-with-dashes")
'my_variable_name_with_dashes'
>>> normalize_fieldname("_my_name_starting_with_underscore")
'x__my_name_starting_with_underscore'
>>> normalize_fieldname("1337")
'x_1337'
>>> normalize_fieldname("my name with spaces")
'my_name_with_spaces'
>>> normalize_fieldname("my name (with) parentheses")
'my_name__with__parentheses'
>>> normalize_fieldname("_generated")
'_generated'
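
A stdlib-only sketch that reproduces the examples above. It is inferred from those examples and is not taken from the library: normalize_sketch is a hypothetical name, the reserved set is copied from the required fields listed for get_required_fields(), and inputs outside the documented examples may be handled differently by the real function.

```python
import re

# Reserved names, as listed for get_required_fields().
RESERVED = {"_source", "_classification", "_generated", "_version"}


def normalize_sketch(field_name: str) -> str:
    """Normalize a field name following the documented examples."""
    if field_name in RESERVED:
        return field_name  # reserved names are left untouched
    # Replace dashes, spaces and parentheses with underscores.
    field_name = re.sub(r"[- ()]", "_", field_name)
    # Per the examples, a leading digit or underscore gets an "x_" prefix.
    if field_name and (field_name[0].isdigit() or field_name[0] == "_"):
        field_name = "x_" + field_name
    return field_name


dashes = normalize_sketch("my-variable-name-with-dashes")
digits = normalize_sketch("1337")
reserved = normalize_sketch("_generated")
```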
class flow.record.base.DynamicFieldtypeModule(path: str = '')
path = ''
__getattr__(path: str) -> DynamicFieldtypeModule
gettypename() -> str | None
__call__(*args, **kwargs) -> Any
flow.record.base.net
flow.record.base.dynamic_fieldtype
flow.record.base.TimestampRecord
flow.record.base.iter_timestamped_records(record: Record) -> collections.abc.Iterator[Record]

Yields timestamped annotated records for each datetime fieldtype in record. If record does not have any datetime fields the original record is returned.

Parameters:

record – Record to add timestamp fields for.

Yields:

Record annotated with ts and ts_description fields for each datetime fieldtype.
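
The fan-out described above can be pictured with dicts in place of records. This sketch is not the library's implementation: iter_timestamped_sketch is a hypothetical name, and it detects datetime fields by inspecting values, whereas the real function works from the record's declared field types.

```python
from collections.abc import Iterator
from datetime import datetime, timezone


def iter_timestamped_sketch(record: dict) -> Iterator:
    """Yield one annotated copy of record per datetime-valued field."""
    dt_fields = [k for k, v in record.items() if isinstance(v, datetime)]
    if not dt_fields:
        # No datetime fields: pass the original record through unchanged.
        yield record
        return
    for field in dt_fields:
        # ts carries the value, ts_description names the field it came from.
        yield {**record, "ts": record[field], "ts_description": field}


rec = {
    "path": "/tmp/x",
    "created": datetime(2024, 1, 1, tzinfo=timezone.utc),
    "modified": datetime(2024, 1, 2, tzinfo=timezone.utc),
}
out = list(iter_timestamped_sketch(rec))
untouched = list(iter_timestamped_sketch({"path": "/tmp/y"}))
```

A record with two datetime fields therefore produces two output records, which is convenient for building a timeline sorted on ts.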