flow.record.base
Module Contents
Classes
- GroupedRecord – acts like a normal Record, but can contain multiple records.
- RecordFieldSet – subclass of the built-in list (mutable sequence) that holds RecordField objects.
- RecordDescriptor – Record Descriptor class for defining a Record type and its fields.
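Functions
- set_ignored_fields_for_comparison – Update IGNORE_FIELDS_FOR_COMPARISON from outside the flow.record package scope.
- ignore_fields_for_comparison – Context manager to temporarily ignore fields for comparison.
- open_path – Open path using mode and return a file object.
- fieldtype – Return the FieldType class for the given field type class path.
- merge_record_descriptors – Create a newly merged RecordDescriptor from a list of RecordDescriptors.
- extend_record – Extend record with fields and values from other_records.
- normalize_fieldname – Return a normalized version of field_name.
- iter_timestamped_records – Yield timestamped annotated records for each datetime field in a record.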
Attributes
- flow.record.base.HAS_LZ4 = True
- flow.record.base.HAS_BZ2 = True
- flow.record.base.HAS_ZSTD = True
- flow.record.base.HAS_AVRO = True
- flow.record.base.log
- flow.record.base.RECORD_VERSION = 1
- flow.record.base.RESERVED_FIELDS
- flow.record.base.GZIP_MAGIC = b'\x1f\x8b'
- flow.record.base.BZ2_MAGIC = b'BZh'
- flow.record.base.LZ4_MAGIC = b'\x04"M\x18'
- flow.record.base.ZSTD_MAGIC = b'(\xb5/\xfd'
- flow.record.base.AVRO_MAGIC = b'Obj'
- flow.record.base.RECORDSTREAM_MAGIC = b'RECORDSTREAM\n'
- flow.record.base.RECORDSTREAM_MAGIC_DEPTH
- flow.record.base.RE_VALID_FIELD_NAME
- flow.record.base.RE_VALID_RECORD_TYPE_NAME
- flow.record.base.RECORD_CLASS_TEMPLATE = Multiline-String
"""
class {name}(Record):
    _desc = None
    _field_types = {field_types}
    __slots__ = {slots_tuple}

    def __init__(__self, {args}):
{init_code}

    @classmethod
    def _unpack(__cls, {args}):
{unpack_code}
"""
- flow.record.base.IGNORE_FIELDS_FOR_COMPARISON
- flow.record.base.set_ignored_fields_for_comparison(ignored_fields: Iterable[str]) -> None
Can be used to update IGNORE_FIELDS_FOR_COMPARISON from outside the flow.record package scope.
- flow.record.base.ignore_fields_for_comparison(ignored_fields: Iterable[str])
Context manager to temporarily ignore fields for comparison.
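The following is a minimal, stand-alone sketch of the idea behind ignore_fields_for_comparison: a module-level set of field names is consulted by the equality check, swapped in for the duration of a with block and restored afterwards. It uses plain dicts instead of Record objects, and the names IGNORED_FIELDS, ignore_fields and records_equal are hypothetical. Whether flow.record replaces or extends the existing set is an assumption; this sketch replaces it.

```python
from contextlib import contextmanager
from typing import Iterable, Iterator

# Hypothetical stand-in for IGNORE_FIELDS_FOR_COMPARISON.
IGNORED_FIELDS: set[str] = set()


@contextmanager
def ignore_fields(fields: Iterable[str]) -> Iterator[None]:
    """Temporarily replace the ignored-field set, restoring it on exit."""
    global IGNORED_FIELDS
    previous = IGNORED_FIELDS
    IGNORED_FIELDS = set(fields)
    try:
        yield
    finally:
        # Restore the previous set even if the with-block raised.
        IGNORED_FIELDS = previous


def records_equal(a: dict, b: dict) -> bool:
    """Compare two records (plain dicts here), skipping ignored fields."""
    keys = (a.keys() | b.keys()) - IGNORED_FIELDS
    return all(a.get(key) == b.get(key) for key in keys)


r1 = {"name": "x", "_generated": 1}
r2 = {"name": "x", "_generated": 2}
print(records_equal(r1, r2))  # False
with ignore_fields(["_generated"]):
    print(records_equal(r1, r2))  # True
```

Because records_equal looks up the module-level set at call time, the override only affects comparisons made inside the with block.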
- class flow.record.base.FieldType
- classmethod default()
Return the default value for the field in the Record template.
- class flow.record.base.Record
- __slots__ = ()
- __eq__(other)
- __setattr__(k, v)
Enforce setting the fields to their respective types.
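Record.__setattr__ enforces field types on assignment. A minimal sketch of that pattern, assuming a simple name-to-converter mapping; TypedRecord and its _field_types mapping are hypothetical, whereas flow.record itself works with FieldType classes.

```python
class TypedRecord:
    """Hypothetical record whose __setattr__ coerces values to declared types."""

    # Field name -> converter (illustrative only).
    _field_types = {"name": str, "count": int}
    __slots__ = ("name", "count")

    def __setattr__(self, key, value):
        converter = self._field_types.get(key)
        if converter is not None and value is not None:
            value = converter(value)  # e.g. "42" -> 42 for an int field
        # object.__setattr__ still rejects names not listed in __slots__.
        object.__setattr__(self, key, value)


r = TypedRecord()
r.count = "42"
print(r.count, type(r.count).__name__)  # 42 int
```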
- __hash__() -> int
- __repr__()
- class flow.record.base.GroupedRecord(name, records)
Bases: Record
GroupedRecord acts like a normal Record, but can contain multiple records.
See it as a flat Record view on top of multiple Records. If two Records have the same field name, the first one prevails.
- name
- records = []
- descriptors = []
- flat_fields = []
- get_record_by_type(type_name)
Get a record in a GroupedRecord by type_name.
- Parameters:
type_name (str) – The record type name (for example wq/meta).
- Returns:
The matching record, or None if no record has that type.
- __repr__()
- __setattr__(attr, val)
Enforce setting the fields to their respective types.
- __getattr__(attr)
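A rough sketch of the "flat view" behaviour described for GroupedRecord: attribute lookups fall through to the wrapped records in order, so the first record that has a field wins. FlatView is hypothetical, the records are types.SimpleNamespace objects, and storing the type name in a _type attribute is an assumption made only for get_record_by_type.

```python
from types import SimpleNamespace


class FlatView:
    """Hypothetical flat view over several records; the first match wins."""

    def __init__(self, name, records):
        self.name = name
        self.records = list(records)

    def get_record_by_type(self, type_name):
        # Assumes each record exposes its type name as `_type` (illustrative only).
        for record in self.records:
            if getattr(record, "_type", None) == type_name:
                return record
        return None

    def __getattr__(self, attr):
        # Only called when normal lookup fails; read records via __dict__
        # to avoid infinite recursion if `records` has not been set yet.
        for record in self.__dict__.get("records", []):
            if hasattr(record, attr):
                return getattr(record, attr)
        raise AttributeError(attr)


meta = SimpleNamespace(_type="wq/meta", host="alpha", port=22)
user = SimpleNamespace(_type="wq/user", host="beta", user="root")
group = FlatView("grouped", [meta, user])
print(group.host, group.port, group.user)  # alpha 22 root
```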
- flow.record.base.is_valid_field_name(name, check_reserved=True)
- flow.record.base.parse_def(definition)
- class flow.record.base.RecordField(name: str, typename: str)
- name = None
- typename = None
- type = None
- __repr__()
- class flow.record.base.RecordFieldSet
Bases: list
Built-in mutable sequence.
If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.
- class flow.record.base.RecordDescriptor(name: str, fields: Sequence[tuple[str, str]] | None = None)
Record Descriptor class for defining a Record type and its fields.
- name: str = None
- recordType: type = None
- static get_required_fields() -> Mapping[str, RecordField]
Get the required fields mapping, e.g.:
{
    "_source": RecordField("_source", "string"),
    "_classification": RecordField("_classification", "string"),
    "_generated": RecordField("_generated", "datetime"),
    "_version": RecordField("_version", "varint"),
}
- Returns:
Mapping of required fields
- property fields: Mapping[str, RecordField]
Get the fields mapping (without required fields), e.g.:
{
    "foo": RecordField("foo", "string"),
    "bar": RecordField("bar", "varint"),
}
- Returns:
Mapping of Record fields
- get_all_fields() -> Mapping[str, RecordField]
Get all fields including the required meta fields, e.g.:
{
    "ts": RecordField("ts", "datetime"),
    "foo": RecordField("foo", "string"),
    "bar": RecordField("bar", "varint"),
    "_source": RecordField("_source", "string"),
    "_classification": RecordField("_classification", "string"),
    "_generated": RecordField("_generated", "datetime"),
    "_version": RecordField("_version", "varint"),
}
- Returns:
Mapping of all Record fields
- getfields(typename: str) -> RecordFieldSet
Get fields of a given type.
- Parameters:
typename – The typename of the fields to return, e.g. "string" or "datetime".
- Returns:
RecordFieldSet of fields with the given typename
- init_from_dict(rdict: dict[str, Any], raise_unknown=False) -> Record
Create a new Record initialized with key, value pairs from rdict.
If raise_unknown=True, fields in rdict that are unknown to this RecordDescriptor raise a TypeError, because they are passed on as unknown keyword arguments. (default: False)
- Returns:
Record with data from rdict
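The raise_unknown flag can be illustrated with a dataclass standing in for a Record type. This sketch assumes that, with raise_unknown=False, unknown keys are simply dropped before construction; ExampleRecord and this init_from_dict function are hypothetical, not the RecordDescriptor method itself.

```python
from dataclasses import dataclass, fields
from typing import Any, Optional


@dataclass
class ExampleRecord:
    """Hypothetical record type with two optional fields."""

    foo: Optional[str] = None
    bar: Optional[int] = None


def init_from_dict(cls, rdict: dict[str, Any], raise_unknown: bool = False):
    """Sketch of the documented raise_unknown behaviour for a dataclass."""
    if not raise_unknown:
        known = {f.name for f in fields(cls)}
        # Silently drop keys the record type does not define.
        rdict = {key: value for key, value in rdict.items() if key in known}
    # With raise_unknown=True, unknown keys reach __init__ and raise TypeError.
    return cls(**rdict)


print(init_from_dict(ExampleRecord, {"foo": "a", "baz": 1}))
# ExampleRecord(foo='a', bar=None)
```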
- init_from_record(record: Record, raise_unknown=False) -> Record
Create a new Record initialized with data from another record.
If raise_unknown=True, fields on record that are unknown to this RecordDescriptor raise a TypeError, because they are passed on as unknown keyword arguments. (default: False)
- Returns:
Record with data from record
- extend(fields: Sequence[tuple[str, str]]) -> RecordDescriptor
Returns a new RecordDescriptor with the extended fields.
- Returns:
RecordDescriptor with extended fields
- get_field_tuples() -> tuple[tuple[str, str]]
Returns a tuple containing the (typename, name) tuples, e.g.:
(('boolean', 'foo'), ('string', 'bar'))
- Returns:
Tuple of (typename, name) tuples
- static calc_descriptor_hash(name, fields: Sequence[tuple[str, str]]) -> int
Calculate and return the (cached) descriptor hash as a 32-bit integer.
The descriptor hash is the first 4 bytes of the SHA-256 digest of the descriptor name and the field names and types.
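A sketch of the hashing scheme described above, using hashlib and struct. The byte layout (name first, then the field name before the type name for each field, UTF-8 encoded, read big-endian) is an assumption, so the values it produces are not guaranteed to match flow.record's descriptor hashes; functools.lru_cache stands in for the caching.

```python
import hashlib
import struct
from functools import lru_cache


@lru_cache(maxsize=None)  # mirrors the "(cached)" note; arguments must be hashable
def calc_descriptor_hash(name: str, fields: tuple[tuple[str, str], ...]) -> int:
    """Sketch: first 4 bytes of a SHA-256 digest, read as an unsigned 32-bit int."""
    digest = hashlib.sha256(name.encode("utf-8"))
    for typename, fieldname in fields:
        # Assumed order and encoding; flow.record's exact byte layout may differ.
        digest.update(fieldname.encode("utf-8"))
        digest.update(typename.encode("utf-8"))
    return struct.unpack(">I", digest.digest()[:4])[0]


h = calc_descriptor_hash("test/record", (("string", "foo"), ("varint", "bar")))
print(0 <= h < 2**32)  # True
```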
- property descriptor_hash: int
Returns the (cached) descriptor hash.
- property identifier: tuple[str, int]
Returns a tuple containing the descriptor name and hash.
- __hash__() -> int
- __eq__(other: RecordDescriptor) -> bool
- __repr__() -> str
- definition(reserved: bool = True) -> str
Return the RecordDescriptor as a Python definition string.
If reserved is True, the reserved fields are included as well.
- Returns:
Descriptor definition string
- base(**kwargs_sink)
- flow.record.base.DynamicDescriptor(name, fields)
- flow.record.base.open_stream(fp: BinaryIO, mode: str) -> BinaryIO
- flow.record.base.find_adapter_for_stream(fp: BinaryIO) -> tuple[BinaryIO, str | None]
- flow.record.base.open_path_or_stream(path: str | pathlib.Path | BinaryIO, mode: str, clobber: bool = True) -> IO
- flow.record.base.open_path(path: str, mode: str, clobber: bool = True) -> IO
Open path using mode and return a file object. It handles the special cases where path refers to stdin or stdout, and it supports compression, detected from the file extension or from the file header of the stream.
- Parameters:
path – Filename or path to the file to open
mode – "r" or "rb" to open the file for reading, "w" or "wb" for writing
clobber – If True, overwrite the file if it already exists; otherwise raise IOError.
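open_path detects compression from the file header, and the *_MAGIC attributes list the signatures involved. The following standard-library-only sketch handles only gzip and bz2 (LZ4 and Zstandard need third-party packages and are left out); open_compressed_stream is a hypothetical helper, not flow.record's own open_stream or find_adapter_for_stream.

```python
import bz2
import gzip
import io
from typing import BinaryIO

# Magic values as listed in the Attributes section.
GZIP_MAGIC = b"\x1f\x8b"
BZ2_MAGIC = b"BZh"


def open_compressed_stream(fp: BinaryIO) -> BinaryIO:
    """Sketch: sniff the file header and wrap the stream in a decompressor."""
    # peek() inspects the header without consuming it, so this also works for
    # non-seekable streams. It may return fewer bytes near end of stream.
    reader = fp if hasattr(fp, "peek") else io.BufferedReader(fp)
    header = reader.peek(4)[:4]
    if header.startswith(GZIP_MAGIC):
        return gzip.GzipFile(fileobj=reader, mode="rb")
    if header.startswith(BZ2_MAGIC):
        return bz2.BZ2File(reader)
    return reader  # not compressed, or a format this sketch does not handle
```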
- flow.record.base.RecordAdapter(url: str | None = None, out: bool = False, selector: str | None = None, clobber: bool = True, fileobj: BinaryIO | None = None, **kwargs) -> flow.record.adapter.AbstractWriter | flow.record.adapter.AbstractReader
- flow.record.base.RecordReader(url: str | None = None, selector: str | None = None, fileobj: BinaryIO | None = None, **kwargs) -> flow.record.adapter.AbstractReader
- flow.record.base.RecordWriter(url: str | None = None, clobber: bool = True, **kwargs) -> flow.record.adapter.AbstractWriter
- flow.record.base.stream(src, dst)
- flow.record.base.fieldtype(clspath: str) -> FieldType
Return the FieldType class for the given field type class path.
- Parameters:
clspath – Class path of the field type, e.g. uint32, net.ipaddress or string[]
- Returns:
The FieldType class.
- flow.record.base.merge_record_descriptors(descriptors: tuple[RecordDescriptor], replace: bool = False, name: str | None = None) -> RecordDescriptor
Create a newly merged RecordDescriptor from a list of RecordDescriptors. This function uses a cache to avoid creating the same descriptor multiple times.
Duplicate fields are ignored in descriptors unless replace=True.
- Parameters:
descriptors – Tuple of RecordDescriptors to merge.
replace – If True, existing field names are replaced; the last descriptor always wins.
name – Rename the RecordDescriptor to name. Otherwise, the name of the first descriptor is used.
- Returns:
Merged RecordDescriptor
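The duplicate-handling rule for merge_record_descriptors can be sketched on plain (typename, name) tuples, the same shape get_field_tuples() returns. merge_fields is hypothetical: it ignores the descriptor cache and renaming, and it assumes a replaced field keeps its original position.

```python
def merge_fields(
    field_lists: list[list[tuple[str, str]]], replace: bool = False
) -> list[tuple[str, str]]:
    """Merge (typename, name) field tuples; duplicates are skipped unless replace."""
    merged: dict[str, str] = {}  # name -> typename, keeps first-seen order
    for fields in field_lists:
        for typename, name in fields:
            if name in merged and not replace:
                continue  # first definition wins
            merged[name] = typename  # with replace=True the last one wins
    return [(typename, name) for name, typename in merged.items()]


a = [("string", "host"), ("varint", "port")]
b = [("uint32", "port"), ("string", "user")]
print(merge_fields([a, b]))
# [('string', 'host'), ('varint', 'port'), ('string', 'user')]
print(merge_fields([a, b], replace=True))
# [('string', 'host'), ('uint32', 'port'), ('string', 'user')]
```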
- flow.record.base.extend_record(record: Record, other_records: list[Record], replace: bool = False, name: str | None = None) -> Record
Extend record with fields and values from other_records.
Duplicate fields are ignored in other_records unless replace=True.
- Parameters:
record – Initial Record to extend.
other_records – List of Records to use for extending/replacing.
replace – If True, existing fields and values in record are replaced by fields and values from other_records; the last record always wins.
name – Rename the RecordDescriptor to name. Otherwise, the name of the initial record is used.
- Returns:
Extended Record
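extend_record applies the same duplicate rule to values. A sketch on plain dicts, where extend_values is a hypothetical helper and descriptor renaming is left out:

```python
from typing import Any


def extend_values(
    record: dict[str, Any], other_records: list[dict[str, Any]], replace: bool = False
) -> dict[str, Any]:
    """Copy record, then add fields/values from other_records in order."""
    result = dict(record)  # never mutate the input record
    for other in other_records:
        for key, value in other.items():
            if replace or key not in result:
                result[key] = value  # with replace=True the last record wins
    return result


base = {"host": "alpha", "port": 22}
extra = [{"port": 2222, "user": "root"}, {"user": "admin"}]
print(extend_values(base, extra))
# {'host': 'alpha', 'port': 22, 'user': 'root'}
print(extend_values(base, extra, replace=True))
# {'host': 'alpha', 'port': 2222, 'user': 'admin'}
```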
- flow.record.base.normalize_fieldname(field_name: str) -> str
Returns a normalized version of field_name.
Some (field) names are not allowed in flow.record, while they can be allowed in other formats. This normalizes the name so it can still be used in flow.record. Reserved field names are not normalized.
>>> normalize_fieldname("my-variable-name-with-dashes")
'my_variable_name_with_dashes'
>>> normalize_fieldname("_my_name_starting_with_underscore")
'x__my_name_starting_with_underscore'
>>> normalize_fieldname("1337")
'x_1337'
>>> normalize_fieldname("my name with spaces")
'my_name_with_spaces'
>>> normalize_fieldname("my name (with) parentheses")
'my_name__with__parentheses'
>>> normalize_fieldname("_generated")
'_generated'
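One way to reproduce the normalization shown in the doctests, as a sketch: replace disallowed characters with underscores and prefix names that do not start with a letter with x_. The RESERVED set is copied from the required fields listed under get_required_fields() and is an assumption about what counts as reserved; the real function may apply further rules.

```python
import re

# Hypothetical reserved set, taken from the required fields of a RecordDescriptor.
RESERVED = {"_source", "_classification", "_generated", "_version"}


def normalize_fieldname(field_name: str) -> str:
    """Sketch that reproduces the documented doctest examples."""
    if field_name in RESERVED:
        return field_name  # reserved names are left untouched
    # Replace every character that is not a letter, digit or underscore.
    name = re.sub(r"[^a-zA-Z0-9_]", "_", field_name)
    # Names must start with a letter; otherwise prefix "x_".
    if not re.match(r"[a-zA-Z]", name):
        name = "x_" + name
    return name
```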
- class flow.record.base.DynamicFieldtypeModule(path='')
- path
- __getattr__(path)
- gettypename()
- __call__(*args, **kwargs)
- flow.record.base.net
- flow.record.base.dynamic_fieldtype
- flow.record.base.TimestampRecord
- flow.record.base.iter_timestamped_records(record: Record) -> Iterator[Record]
Yields timestamped annotated records for each datetime field in record. If record does not have any datetime fields, the original record is yielded.
- Parameters:
record – Record to add timestamp fields for.
- Yields:
Record annotated with ts and ts_description fields for each datetime field.
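A sketch of the documented iter_timestamped_records behaviour on plain dicts, with the field types passed in separately. iter_timestamped is hypothetical, and it assumes every datetime field produces an annotated copy; how the real function handles, for example, empty datetime values is not covered here.

```python
from typing import Any, Iterator


def iter_timestamped(
    record: dict[str, Any], field_types: dict[str, str]
) -> Iterator[dict[str, Any]]:
    """Yield one copy of record per datetime field, annotated with ts/ts_description."""
    datetime_fields = [name for name, typename in field_types.items() if typename == "datetime"]
    if not datetime_fields:
        yield record  # nothing to annotate: yield the original record unchanged
        return
    for name in datetime_fields:
        # ts holds the timestamp value, ts_description names the field it came from.
        yield {**record, "ts": record[name], "ts_description": name}
```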