flow.record.base
Module Contents
Classes
- GroupedRecord – GroupedRecord acts like a normal Record, but can contain multiple records.
- RecordFieldSet – Built-in mutable sequence.
- RecordDescriptor – Record Descriptor class for defining a Record type and its fields.
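Functions
- set_ignored_fields_for_comparison – Can be used to update IGNORE_FIELDS_FOR_COMPARISON from outside the flow.record package scope.
- ignore_fields_for_comparison – Context manager to temporarily ignore fields for comparison.
- open_path – Open path using mode and return a file object.
- fieldtype – Return the FieldType class for the given field type class path.
- merge_record_descriptors – Create a newly merged RecordDescriptor from a list of RecordDescriptors.
- extend_record – Extend record with fields and values from other_records.
- normalize_fieldname – Returns a normalized version of field_name.
- iter_timestamped_records – Yields timestamped annotated records for each datetime fieldtype in record.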
Attributes
- flow.record.base.HAS_LZ4 = True
- flow.record.base.HAS_BZ2 = True
- flow.record.base.HAS_ZSTD = True
- flow.record.base.HAS_AVRO = True
- flow.record.base.log
- flow.record.base.RECORD_VERSION = 1
- flow.record.base.RESERVED_FIELDS
- flow.record.base.GZIP_MAGIC = b'\x1f\x8b'
- flow.record.base.BZ2_MAGIC = b'BZh'
- flow.record.base.LZ4_MAGIC = b'\x04"M\x18'
- flow.record.base.ZSTD_MAGIC = b'(\xb5/\xfd'
- flow.record.base.AVRO_MAGIC = b'Obj'
- flow.record.base.RECORDSTREAM_MAGIC = b'RECORDSTREAM\n'
- flow.record.base.RECORDSTREAM_MAGIC_DEPTH = 19
- flow.record.base.RE_VALID_FIELD_NAME
- flow.record.base.RE_VALID_RECORD_TYPE_NAME
- flow.record.base.RECORD_CLASS_TEMPLATE (multiline string):

    class {name}(Record):
        _desc = None
        _field_types = {field_types}
        __slots__ = {slots_tuple}

        def __init__(__self, {args}):
    {init_code}

        @classmethod
        def _unpack(__cls, {args}):
    {unpack_code}

- flow.record.base.IGNORE_FIELDS_FOR_COMPARISON
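The placeholders in RECORD_CLASS_TEMPLATE ({name}, {slots_tuple}, {args}, {init_code}) point to record classes being generated by formatting Python source and executing it. The sketch below illustrates that general str.format-plus-exec technique with a hypothetical, much simpler template and a made-up build_class helper; it is not how flow.record builds its classes.

```python
# Hypothetical, simplified template: it only mimics the *shape* of
# RECORD_CLASS_TEMPLATE and is not the template flow.record uses.
TEMPLATE = """
class {name}:
    __slots__ = {slots_tuple}

    def __init__(self, {args}):
{init_code}
"""


def build_class(name: str, field_names: list[str]) -> type:
    """Render TEMPLATE for the given fields and exec it into a fresh namespace."""
    args = ", ".join(f"{field}=None" for field in field_names)
    # {init_code} sits at column 0 in this template, so each generated
    # body line carries its own indentation.
    init_code = "\n".join(f"        self.{field} = {field}" for field in field_names)
    source = TEMPLATE.format(
        name=name,
        slots_tuple=tuple(field_names),
        args=args,
        init_code=init_code or "        pass",  # keep the body valid with no fields
    )
    namespace: dict = {}
    exec(source, namespace)
    return namespace[name]


Event = build_class("Event", ["hostname", "port"])
event = Event(hostname="web01", port=443)
```

Generating source once per descriptor lets the resulting class use fixed __slots__ and a plain __init__, instead of looking fields up dynamically on every access.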
- flow.record.base.set_ignored_fields_for_comparison(ignored_fields: collections.abc.Iterator[str]) -> None
Can be used to update IGNORE_FIELDS_FOR_COMPARISON from outside the flow.record package scope.
- flow.record.base.ignore_fields_for_comparison(ignored_fields: collections.abc.Iterator[str]) -> collections.abc.Iterator[None]
Context manager to temporarily ignore fields for comparison.
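A minimal standalone sketch of the pattern behind set_ignored_fields_for_comparison and ignore_fields_for_comparison: a module-level set of ignored names, plus a context manager that swaps it and restores the previous value on exit. Records are modelled as plain dicts, and the names IGNORED_FIELDS, set_ignored_fields, ignore_fields and records_equal are hypothetical, not flow.record API.

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Any, Iterable, Iterator

# Standalone stand-in for IGNORE_FIELDS_FOR_COMPARISON (not the flow.record global).
IGNORED_FIELDS: set[str] = set()


def set_ignored_fields(fields: Iterable[str]) -> None:
    """Replace the module-wide set of ignored field names."""
    global IGNORED_FIELDS
    IGNORED_FIELDS = set(fields)


@contextmanager
def ignore_fields(fields: Iterable[str]) -> Iterator[None]:
    """Temporarily ignore `fields`, restoring the previous set afterwards."""
    previous = IGNORED_FIELDS
    set_ignored_fields(fields)
    try:
        yield
    finally:
        # Runs even if the with-block raises.
        set_ignored_fields(previous)


def records_equal(a: dict[str, Any], b: dict[str, Any]) -> bool:
    """Compare two dict-based records, skipping the currently ignored fields."""
    keys = (a.keys() | b.keys()) - IGNORED_FIELDS
    return all(a.get(key) == b.get(key) for key in keys)


first = {"hostname": "web01", "_generated": "2024-01-01T00:00:00"}
second = {"hostname": "web01", "_generated": "2024-01-01T00:00:05"}
```

Inside `with ignore_fields(["_generated"]):` the two records above compare equal; outside it they do not.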
- class flow.record.base.FieldType
- classmethod default() -> None
Return the default value for the field in the Record template.
- class flow.record.base.Record
- __slots__ = ()
- __eq__(other: object) -> bool
- __getattr__(name: str) -> Any
- __setattr__(k: str, v: Any) -> None
Enforce setting the fields to their respective types.
- __hash__() -> int
- __repr__() -> str
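Record.__setattr__ is documented as enforcing field types on assignment. One way to get that behaviour is sketched below, assuming a per-field converter table; TypedRecord and its _field_types mapping of plain Python callables are hypothetical stand-ins for flow.record's FieldType classes.

```python
from __future__ import annotations

from typing import Any, Callable


class TypedRecord:
    """Hypothetical record class whose __setattr__ coerces values per field."""

    __slots__ = ("hostname", "port")
    # Field name -> converter; stands in for the per-field FieldType classes.
    _field_types: dict[str, Callable[[Any], Any]] = {"hostname": str, "port": int}

    def __init__(self, hostname: Any = None, port: Any = None) -> None:
        self.hostname = hostname
        self.port = port

    def __setattr__(self, k: str, v: Any) -> None:
        converter = self._field_types.get(k)
        if converter is not None and v is not None:
            v = converter(v)  # e.g. "443" -> 443 for the int-typed port field
        # With __slots__, assigning an unknown name raises AttributeError here.
        super().__setattr__(k, v)
```

Because the check lives in __setattr__, it applies both during __init__ and on later assignments such as `record.port = "8080"`.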
- class flow.record.base.GroupedRecord(name: str, records: list[Record | GroupedRecord])
Bases: Record
GroupedRecord acts like a normal Record, but can contain multiple records.
See it as a flat Record view on top of multiple Records. If two Records have the same field name, the first one prevails.
- name
- records = []
- descriptors = []
- flat_fields = []
- get_record_by_type(type_name: str) -> Record | None
Get a record in a GroupedRecord by type_name.
- Parameters:
type_name (str) – The record type name (for example wq/meta).
- Returns:
The matching record, or None if there is no record of that type.
- __repr__() -> str
- __setattr__(attr: str, val: Any) -> None
Enforce setting the fields to their respective types.
- __getattr__(attr: str) -> Any
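GroupedRecord's "first one prevails" rule matches how collections.ChainMap resolves keys: it searches its mappings in order and returns the first hit. The sketch below uses that to build a flat view over dict-based records; FlatView and the "_type" key consulted by its get_record_by_type are hypothetical, not part of flow.record.

```python
from __future__ import annotations

from collections import ChainMap
from typing import Any


class FlatView:
    """Hypothetical flat view over several dict-based records."""

    def __init__(self, name: str, records: list[dict[str, Any]]) -> None:
        self.name = name
        self.records = records
        # ChainMap searches its maps in order, so earlier records shadow later ones.
        self._fields = ChainMap(*records)

    def __getattr__(self, attr: str) -> Any:
        # Only called when normal attribute lookup fails.
        try:
            return self.__dict__["_fields"][attr]
        except KeyError:
            raise AttributeError(attr) from None

    def get_record_by_type(self, type_name: str) -> dict[str, Any] | None:
        """Return the first record whose "_type" key equals type_name, else None."""
        return next((r for r in self.records if r.get("_type") == type_name), None)


group = FlatView(
    "group",
    [
        {"_type": "wq/meta", "hostname": "web01"},
        {"_type": "example/event", "hostname": "other", "action": "login"},
    ],
)
```

Here `group.hostname` resolves to the first record's value, while `group.action` falls through to the second record.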
- flow.record.base.is_valid_field_name(name: str, check_reserved: bool = True) -> bool
- flow.record.base.parse_def(definition: str) -> tuple[str, list[tuple[str, str]]]
- class flow.record.base.RecordField(name: str, typename: str)
- name = None
- typename = None
- type = None
- __repr__()
- class flow.record.base.RecordFieldSet
Bases: list
Built-in mutable sequence.
If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.
- class flow.record.base.RecordDescriptor(name: str, fields: collections.abc.Sequence[tuple[str, str]] | None = None)
Record Descriptor class for defining a Record type and its fields.
- name: str = None
- recordType: type = None
- static get_required_fields() -> collections.abc.Mapping[str, RecordField]
Get the required fields mapping, e.g.:

    {
        "_source": RecordField("_source", "string"),
        "_classification": RecordField("_classification", "string"),
        "_generated": RecordField("_generated", "datetime"),
        "_version": RecordField("_version", "varint"),
    }

- Returns:
Mapping of required fields
- property fields: collections.abc.Mapping[str, RecordField]
Get the fields mapping (without required fields), e.g.:

    {
        "foo": RecordField("foo", "string"),
        "bar": RecordField("bar", "varint"),
    }

- Returns:
Mapping of Record fields
- get_all_fields() -> collections.abc.Mapping[str, RecordField]
Get all fields, including the required meta fields, e.g.:

    {
        "ts": RecordField("ts", "datetime"),
        "foo": RecordField("foo", "string"),
        "bar": RecordField("bar", "varint"),
        "_source": RecordField("_source", "string"),
        "_classification": RecordField("_classification", "string"),
        "_generated": RecordField("_generated", "datetime"),
        "_version": RecordField("_version", "varint"),
    }

- Returns:
Mapping of all Record fields
- getfields(typename: str) -> RecordFieldSet
Get fields of a given type.
- Parameters:
typename – The typename of the fields to return, e.g. “string” or “datetime”.
- Returns:
RecordFieldSet of fields with the given typename
- init_from_dict(rdict: dict[str, Any], raise_unknown: bool = False) -> Record
Create a new Record initialized with key, value pairs from rdict.
If raise_unknown=True, fields on rdict that are unknown to this RecordDescriptor will raise a TypeError exception due to initializing with unknown keyword arguments. (default: False)
- Returns:
Record with data from rdict
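The raise_unknown switch comes down to either rejecting or silently dropping keys that the descriptor does not know. A hypothetical version that returns a plain dict instead of a Record, assuming that fields absent from rdict default to None:

```python
from __future__ import annotations

from typing import Any


def init_from_dict(
    field_names: list[str], rdict: dict[str, Any], raise_unknown: bool = False
) -> dict[str, Any]:
    """Hypothetical sketch of the raise_unknown rule, returning a plain dict."""
    if raise_unknown:
        unknown = sorted(set(rdict) - set(field_names))
        if unknown:
            # Mirrors the TypeError a constructor raises for unexpected keyword arguments.
            raise TypeError(f"unexpected keyword argument(s): {unknown}")
    # Unknown keys are dropped; fields missing from rdict default to None (assumption).
    return {name: rdict.get(name) for name in field_names}
```

Dropping unknown keys by default suits loosely structured input such as parsed JSON, while raise_unknown=True surfaces typos in field names early.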
- init_from_record(record: Record, raise_unknown: bool = False) -> Record
Create a new Record initialized with data from another record.
If raise_unknown=True, fields on record that are unknown to this RecordDescriptor will raise a TypeError exception due to initializing with unknown keyword arguments. (default: False)
- Returns:
Record with data from record
- extend(fields: collections.abc.Sequence[tuple[str, str]]) -> RecordDescriptor
Returns a new RecordDescriptor with the extended fields.
- Returns:
RecordDescriptor with extended fields
- get_field_tuples() -> tuple[tuple[str, str]]
Returns a tuple containing the (typename, name) tuples, e.g.:

    (('boolean', 'foo'), ('string', 'bar'))

- Returns:
Tuple of (typename, name) tuples
- static calc_descriptor_hash(name: str, fields: collections.abc.Sequence[tuple[str, str]]) -> int
Calculate and return the (cached) descriptor hash as a 32-bit integer.
The descriptor hash is the first 4 bytes of the sha256sum of the descriptor name and field names and types.
- property descriptor_hash: int
Returns the (cached) descriptor hash.
- property identifier: tuple[str, int]
Returns a tuple containing the descriptor name and hash.
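Following the description of calc_descriptor_hash (the first 4 bytes of a SHA-256 over the descriptor name and its field names and types), here is a cached sketch together with the matching identifier tuple. The order in which the pieces are hashed and the big-endian interpretation of the 4 bytes are assumptions, so its values should not be expected to match flow.record's.

```python
import hashlib
import struct
from functools import lru_cache


@lru_cache(maxsize=256)
def calc_descriptor_hash(name: str, fields: tuple[tuple[str, str], ...]) -> int:
    """Sketch: first 4 bytes of a SHA-256 over the name, field names and types.

    The hashing order and byte order are assumptions, not flow.record's code.
    """
    digest = hashlib.sha256(name.encode("utf-8"))
    for typename, field_name in fields:
        digest.update(field_name.encode("utf-8"))
        digest.update(typename.encode("utf-8"))
    # Interpret the first 4 bytes as an unsigned big-endian 32-bit integer.
    return struct.unpack(">I", digest.digest()[:4])[0]


fields = (("string", "hostname"), ("varint", "port"))
descriptor_hash = calc_descriptor_hash("example/event", fields)
identifier = ("example/event", descriptor_hash)
```

lru_cache requires hashable arguments, which is why the fields are passed as a tuple of tuples rather than a list.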
- __hash__() -> int
- __eq__(other: RecordDescriptor) -> bool
- __repr__() -> str
- definition(reserved: bool = True) -> str
Return the RecordDescriptor as a Python definition string.
If reserved is True, it also returns the reserved fields.
- Returns:
Descriptor definition string
- flow.record.base.DynamicDescriptor(name: str, fields: list[str]) -> RecordDescriptor
- flow.record.base.open_stream(fp: BinaryIO, mode: str) -> BinaryIO
- flow.record.base.find_adapter_for_stream(fp: BinaryIO) -> tuple[BinaryIO, str | None]
- flow.record.base.open_path_or_stream(path: str | pathlib.Path | BinaryIO, mode: str, clobber: bool = True) -> IO
- flow.record.base.open_path(path: str, mode: str, clobber: bool = True) -> IO
Open path using mode and return a file object.
It handles special cases where path is meant to be stdin or stdout, and also supports compression based on the file extension or the file header of the stream.
- Parameters:
path – Filename or path to the file to open
mode – Can be “r” or “rb” to open the file for reading, or “w” or “wb” for writing
clobber – If clobber=True, overwrite the file if it already exists; otherwise raise IOError.
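open_path can detect compression from the file header, and the module lists the relevant magic prefixes among its attributes. The sketch below shows header sniffing with the standard library only; detect_compression and open_maybe_compressed are hypothetical names. It peeks at the first bytes through io.BufferedReader instead of seeking, so nothing is consumed, and it only decompresses gzip and bz2 (lz4 and zstd would need third-party packages).

```python
from __future__ import annotations

import bz2
import gzip
import io
from typing import BinaryIO

# Magic prefixes copied from GZIP_MAGIC, BZ2_MAGIC, LZ4_MAGIC and ZSTD_MAGIC.
MAGIC_PREFIXES = {
    "gzip": b"\x1f\x8b",
    "bz2": b"BZh",
    "lz4": b'\x04"M\x18',
    "zstd": b"(\xb5/\xfd",
}


def detect_compression(head: bytes) -> str | None:
    """Return the compression name whose magic prefix matches `head`, if any."""
    for name, magic in MAGIC_PREFIXES.items():
        if head.startswith(magic):
            return name
    return None


def open_maybe_compressed(fp: BinaryIO) -> BinaryIO:
    # Peek without consuming, so the header is still there for the decompressor;
    # this also works for non-seekable streams.
    buffered = fp if isinstance(fp, io.BufferedReader) else io.BufferedReader(fp)
    kind = detect_compression(buffered.peek(4)[:4])
    if kind == "gzip":
        return gzip.GzipFile(fileobj=buffered, mode="rb")
    if kind == "bz2":
        return bz2.BZ2File(buffered, mode="rb")
    if kind is not None:
        raise ValueError(f"{kind} needs a third-party module in this sketch")
    return buffered
```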
- flow.record.base.RecordAdapter(url: str | None = None, out: bool = False, selector: str | None = None, clobber: bool = True, fileobj: BinaryIO | None = None, **kwargs) -> flow.record.adapter.AbstractWriter | flow.record.adapter.AbstractReader
- flow.record.base.RecordReader(url: str | None = None, selector: str | None = None, fileobj: BinaryIO | None = None, **kwargs) -> flow.record.adapter.AbstractReader
- flow.record.base.RecordWriter(url: str | None = None, clobber: bool = True, **kwargs) -> flow.record.adapter.AbstractWriter
- flow.record.base.stream(src: flow.record.adapter.AbstractReader, dst: flow.record.adapter.AbstractWriter) -> None
- flow.record.base.fieldtype(clspath: str) -> FieldType
Return the FieldType class for the given field type class path.
- Parameters:
clspath – Class path of the field type, e.g. uint32, net.ipaddress, string[]
- Returns:
The FieldType class.
- flow.record.base.merge_record_descriptors(descriptors: tuple[RecordDescriptor], replace: bool = False, name: str | None = None) -> RecordDescriptor
Create a newly merged RecordDescriptor from a list of RecordDescriptors. This function uses a cache to avoid creating the same descriptor multiple times.
Duplicate fields in descriptors are ignored unless replace=True.
- Parameters:
descriptors – Tuple of RecordDescriptors to merge.
replace – If True, existing field names are replaced; the last descriptor always wins.
name – Rename the RecordDescriptor to name. Otherwise, the name of the first descriptor is used.
- Returns:
Merged RecordDescriptor
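The duplicate-field rules above can be sketched on plain (typename, field_name) lists. Descriptors here are hypothetical (name, fields) tuples rather than RecordDescriptor objects, merge_descriptors is a made-up name, and unlike the documented function this version does no caching.

```python
from __future__ import annotations

# Hypothetical stand-in for a RecordDescriptor: (name, [(typename, field_name), ...]).
Descriptor = tuple[str, list[tuple[str, str]]]


def merge_descriptors(
    descriptors: list[Descriptor], replace: bool = False, name: str | None = None
) -> Descriptor:
    merged: dict[str, str] = {}  # field name -> typename, in first-seen order
    for _, fields in descriptors:
        for typename, field_name in fields:
            if replace or field_name not in merged:
                merged[field_name] = typename  # with replace=True, the last descriptor wins
    merged_name = name if name is not None else descriptors[0][0]
    return merged_name, [(typename, field_name) for field_name, typename in merged.items()]


a = ("example/a", [("string", "hostname"), ("varint", "port")])
b = ("example/b", [("uint32", "port"), ("string", "user")])
```

With these inputs, a plain merge keeps port as varint from the first descriptor, while replace=True switches it to uint32 but keeps its original position.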
- flow.record.base.extend_record(record: Record, other_records: list[Record], replace: bool = False, name: str | None = None) -> Record
Extend record with fields and values from other_records.
Duplicate fields in other_records are ignored unless replace=True.
- Parameters:
record – Initial Record to extend.
other_records – List of Records to use for extending/replacing.
replace – If True, existing fields and values in record are replaced by fields and values from other_records; the last record always wins.
name – Rename the RecordDescriptor to name. Otherwise, the name of the initial record is used.
- Returns:
Extended Record
- flow.record.base.normalize_fieldname(field_name: str) -> str
Returns a normalized version of field_name.
Some (field) names are not allowed in flow.record, while they can be allowed in other formats. This normalizes the name so it can still be used in flow.record. Reserved field names are not normalized.

    >>> normalize_fieldname("my-variable-name-with-dashes")
    'my_variable_name_with_dashes'
    >>> normalize_fieldname("_my_name_starting_with_underscore")
    'x__my_name_starting_with_underscore'
    >>> normalize_fieldname("1337")
    'x_1337'
    >>> normalize_fieldname("my name with spaces")
    'my_name_with_spaces'
    >>> normalize_fieldname("my name (with) parentheses")
    'my_name__with__parentheses'
    >>> normalize_fieldname("_generated")
    '_generated'
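A small sketch that reproduces every doctest result listed for normalize_fieldname: characters outside [A-Za-z0-9_] become underscores, names that do not start with a letter get an x_ prefix, and reserved names pass through unchanged. The reserved set is taken from the get_required_fields() example; the actual rules in flow.record may differ for inputs the doctests do not cover.

```python
import re

# Reserved names as listed in the get_required_fields() example (assumed complete here).
RESERVED = {"_source", "_classification", "_generated", "_version"}


def normalize_fieldname(field_name: str) -> str:
    """Sketch that reproduces the documented doctest examples."""
    if field_name in RESERVED:
        return field_name
    # Replace every character outside [A-Za-z0-9_] with an underscore.
    name = re.sub(r"[^A-Za-z0-9_]", "_", field_name)
    # Names must start with a letter, so prefix anything else with "x_".
    if not name[:1].isalpha():
        name = "x_" + name
    return name
```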
- class flow.record.base.DynamicFieldtypeModule(path: str = '')
- path = ''
- __getattr__(path: str) -> DynamicFieldtypeModule
- gettypename() -> str | None
- __call__(*args, **kwargs) -> Any
- flow.record.base.net
- flow.record.base.dynamic_fieldtype
- flow.record.base.TimestampRecord
- flow.record.base.iter_timestamped_records(record: Record) -> collections.abc.Iterator[Record]
Yields timestamped annotated records for each datetime fieldtype in record. If record does not have any datetime fields, the original record is yielded.
- Parameters:
record – Record to add timestamp fields for.
- Yields:
Record annotated with ts and ts_description fields for each datetime fieldtype.
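iter_timestamped_records can be sketched over dict-based records with an explicit field-type mapping. The assumptions here are that ts_description holds the name of the datetime field and that the ts/ts_description annotation takes precedence over same-named keys already in the record; iter_timestamped is a hypothetical name.

```python
from __future__ import annotations

import datetime
from typing import Any, Iterator


def iter_timestamped(record: dict[str, Any], field_types: dict[str, str]) -> Iterator[dict[str, Any]]:
    """Yield one copy of `record` per datetime-typed field, annotated with ts/ts_description."""
    dt_fields = [name for name, typename in field_types.items() if typename == "datetime"]
    if not dt_fields:
        yield record  # nothing to annotate: pass the original through
        return
    for name in dt_fields:
        annotated = {"ts": record.get(name), "ts_description": name}
        for key, value in record.items():
            annotated.setdefault(key, value)  # the ts/ts_description set above take precedence
        yield annotated


created = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
modified = datetime.datetime(2024, 2, 1, tzinfo=datetime.timezone.utc)
rec = {"hostname": "web01", "created": created, "modified": modified}
types = {"hostname": "string", "created": "datetime", "modified": "datetime"}
```

One input record with two datetime fields therefore produces two output records, one per timestamp, which is convenient for building timelines.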