flow.record¶
Subpackages¶
flow.record.adapter
flow.record.adapter.archive
flow.record.adapter.avro
flow.record.adapter.broker
flow.record.adapter.csvfile
flow.record.adapter.duckdb
flow.record.adapter.elastic
flow.record.adapter.jsonfile
flow.record.adapter.line
flow.record.adapter.mongo
flow.record.adapter.split
flow.record.adapter.splunk
flow.record.adapter.sqlite
flow.record.adapter.stream
flow.record.adapter.text
flow.record.adapter.xlsx
flow.record.fieldtypes
flow.record.tools
Submodules¶
Package Contents¶
Classes¶
GroupedRecord | GroupedRecord acts like a normal Record, but can contain multiple records.
RecordDescriptor | Record Descriptor class for defining a Record type and its fields.
Functions¶
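extend_record | Extend record with fields and values from other_records.
ignore_fields_for_comparison | Context manager to temporarily ignore fields for comparison.
iter_timestamped_records | Yields timestamped annotated records for each datetime fieldtype in record.
open_path | Open path using mode and return a file object.
set_ignored_fields_for_comparison | Can be used to update the IGNORE_FIELDS_FOR_COMPARISON from outside the flow.record package scope.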
Attributes¶
- flow.record.IGNORE_FIELDS_FOR_COMPARISON¶
- flow.record.RECORD_VERSION = 1¶
- flow.record.RECORDSTREAM_MAGIC = b'RECORDSTREAM\n'¶
- flow.record.DynamicDescriptor(name, fields)¶
- class flow.record.FieldType¶
- classmethod default()¶
Return the default value for the field in the Record template.
- class flow.record.GroupedRecord(name, records)¶
Bases: Record
GroupedRecord acts like a normal Record, but can contain multiple records.
See it as a flat Record view on top of multiple Records. If two Records have the same fieldname, the first one will prevail.
- name¶
- records = []¶
- descriptors = []¶
- flat_fields = []¶
- get_record_by_type(type_name)¶
Get record in a GroupedRecord by type_name.
- Parameters:
type_name (str) – The record type name (for example wq/meta).
- Returns:
None or the record
- __repr__()¶
- __setattr__(attr, val)¶
Enforce setting the fields to their respective types.
- __getattr__(attr)¶
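The "flat view" behaviour described for GroupedRecord can be illustrated with a minimal sketch. This is not the library's implementation: `GroupedView` is a hypothetical stand-in that stores plain dicts instead of Records, and it only assumes what the description states, namely that attribute lookup walks the records in order so the first record holding a field name prevails.

```python
class GroupedView:
    """Hypothetical stand-in for a grouped record, built on plain dicts."""

    def __init__(self, records):
        # records: list of (type_name, field_dict) pairs standing in for Records
        self.records = records

    def __getattr__(self, attr):
        # Only called when normal attribute lookup fails.
        # The first record that has the field wins.
        for _type_name, fields in self.records:
            if attr in fields:
                return fields[attr]
        raise AttributeError(attr)

    def get_record_by_type(self, type_name):
        # Return the first record with a matching type name, or None.
        for name, fields in self.records:
            if name == type_name:
                return fields
        return None


grouped = GroupedView(
    [
        ("wq/meta", {"hostname": "alpha"}),
        ("wq/data", {"hostname": "beta", "size": 3}),
    ]
)
```

Here `grouped.hostname` resolves against the first record, while `grouped.size` falls through to the second one.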
- class flow.record.Record¶
- __slots__ = ()¶
- __eq__(other)¶
- __setattr__(k, v)¶
Enforce setting the fields to their respective types.
- __hash__() int ¶
- __repr__()¶
- flow.record.RecordAdapter(url: str | None = None, out: bool = False, selector: str | None = None, clobber: bool = True, fileobj: BinaryIO | None = None, **kwargs) flow.record.adapter.AbstractWriter | flow.record.adapter.AbstractReader ¶
- class flow.record.RecordDescriptor(name: str, fields: Sequence[tuple[str, str]] | None = None)¶
Record Descriptor class for defining a Record type and its fields.
- name: str = None¶
- recordType: type = None¶
- static get_required_fields() Mapping[str, RecordField] ¶
Get required fields mapping, e.g.:
{
    "_source": RecordField("_source", "string"),
    "_classification": RecordField("_classification", "string"),
    "_generated": RecordField("_generated", "datetime"),
    "_version": RecordField("_version", "varint"),
}
- Returns:
Mapping of required fields
- property fields: Mapping[str, RecordField]¶
Get fields mapping (without required fields), e.g.:
{
    "foo": RecordField("foo", "string"),
    "bar": RecordField("bar", "varint"),
}
- Returns:
Mapping of Record fields
- get_all_fields() Mapping[str, RecordField] ¶
Get all fields including required meta fields, e.g.:
{
    "ts": RecordField("ts", "datetime"),
    "foo": RecordField("foo", "string"),
    "bar": RecordField("bar", "varint"),
    "_source": RecordField("_source", "string"),
    "_classification": RecordField("_classification", "string"),
    "_generated": RecordField("_generated", "datetime"),
    "_version": RecordField("_version", "varint"),
}
- Returns:
Mapping of all Record fields
- getfields(typename: str) RecordFieldSet ¶
Get fields of a given type.
- Parameters:
typename – The typename of the fields to return, e.g. "string" or "datetime".
- Returns:
RecordFieldSet of fields with the given typename
- init_from_dict(rdict: dict[str, Any], raise_unknown=False) Record ¶
Create a new Record initialized with key, value pairs from rdict.
If raise_unknown=True then fields on rdict that are unknown to this RecordDescriptor will raise a TypeError exception due to initializing with unknown keyword arguments. (default: False)
- Returns:
Record with data from rdict
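The effect of raise_unknown can be sketched with plain dicts. This is an illustration under assumptions, not the library's code: `init_from_dict_sketch` and its `known_fields` argument are hypothetical, unknown keys are assumed to be silently dropped when raise_unknown is False, and missing fields are assumed to default to None.

```python
def init_from_dict_sketch(known_fields, rdict, raise_unknown=False):
    """Build a dict-based stand-in record from rdict (hypothetical helper)."""
    unknown = [key for key in rdict if key not in known_fields]
    if unknown and raise_unknown:
        # Mirrors the TypeError raised for unexpected keyword arguments.
        raise TypeError(f"unexpected keyword arguments: {unknown}")
    # Assumption: unknown keys are dropped, missing fields become None.
    return {name: rdict.get(name) for name in known_fields}


fields = ("foo", "bar")
lenient = init_from_dict_sketch(fields, {"foo": "a", "extra": 1})
```

With the default raise_unknown=False the extra key is ignored; passing raise_unknown=True for the same input raises TypeError.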
- init_from_record(record: Record, raise_unknown=False) Record ¶
Create a new Record initialized with data from another record.
If raise_unknown=True then fields on record that are unknown to this RecordDescriptor will raise a TypeError exception due to initializing with unknown keyword arguments. (default: False)
- Returns:
Record with data from record
- extend(fields: Sequence[tuple[str, str]]) RecordDescriptor ¶
Returns a new RecordDescriptor with the extended fields
- Returns:
RecordDescriptor with extended fields
- get_field_tuples() tuple[tuple[str, str]] ¶
Returns a tuple containing the (typename, name) tuples, e.g.:
(('boolean', 'foo'), ('string', 'bar'))
- Returns:
Tuple of (typename, name) tuples
- static calc_descriptor_hash(name, fields: Sequence[tuple[str, str]]) int ¶
Calculate and return the (cached) descriptor hash as a 32 bit integer.
The descriptor hash is the first 4 bytes of the sha256sum of the descriptor name and field names and types.
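As a rough sketch of the hashing scheme described above, the following takes the first 4 bytes of a SHA-256 digest and interprets them as a 32-bit integer. The way the name and fields are concatenated, and the big-endian byte order, are assumptions made for illustration; `descriptor_hash_sketch` is a hypothetical helper, so its output should not be expected to match the library's hashes.

```python
import hashlib
import struct


def descriptor_hash_sketch(name, fields):
    """Hash a descriptor name plus (typename, fieldname) tuples to 32 bits."""
    # Assumption: the hashed string is the name followed by each field's
    # name and type. The real serialization may differ.
    data = name + "".join(f"{fname}{ftype}" for ftype, fname in fields)
    digest = hashlib.sha256(data.encode("utf-8")).digest()
    # First 4 bytes as an unsigned big-endian 32-bit integer (assumed order).
    return struct.unpack(">L", digest[:4])[0]


example_hash = descriptor_hash_sketch(
    "test/record", [("string", "foo"), ("varint", "bar")]
)
```

Because the hash depends only on the name and field definitions, two descriptors defined identically get the same value, which is what makes it usable as part of an identifier.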
- property descriptor_hash: int¶
Returns the (cached) descriptor hash
- property identifier: tuple[str, int]¶
Returns a tuple containing the descriptor name and hash
- __hash__() int ¶
- __eq__(other: RecordDescriptor) bool ¶
- __repr__() str ¶
- definition(reserved: bool = True) str ¶
Return the RecordDescriptor as Python definition string.
If reserved is True it will also return the reserved fields.
- Returns:
Descriptor definition string
- base(**kwargs_sink)¶
- exception flow.record.RecordDescriptorError¶
Bases: Exception
Raised when there is an error constructing a record descriptor
- class flow.record.RecordField(name: str, typename: str)¶
- name = None¶
- typename = None¶
- type = None¶
- __repr__()¶
- flow.record.RecordReader(url: str | None = None, selector: str | None = None, fileobj: BinaryIO | None = None, **kwargs) flow.record.adapter.AbstractReader ¶
- flow.record.RecordWriter(url: str | None = None, clobber: bool = True, **kwargs) flow.record.adapter.AbstractWriter ¶
- flow.record.dynamic_fieldtype¶
- flow.record.extend_record(record: Record, other_records: list[Record], replace: bool = False, name: str | None = None) Record ¶
Extend record with fields and values from other_records.
Duplicate fields are ignored in other_records unless replace=True.
- Parameters:
record – Initial Record to extend.
other_records – List of Records to use for extending/replacing.
replace – If True, replace existing fields and values in record with fields and values from other_records. The last record always wins.
name – Rename the RecordDescriptor name to name. Otherwise, use the name from the initial record.
- Returns:
Extended Record
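The merge rule for extend_record can be sketched with dicts in place of Records. `extend_sketch` is a hypothetical helper that only models the field precedence described above (duplicates ignored by default, last record wins with replace=True); it does not build a new RecordDescriptor or handle the name parameter.

```python
def extend_sketch(record, other_records, replace=False):
    """Merge dict-based stand-in records using the documented precedence."""
    result = dict(record)
    for other in other_records:
        for name, value in other.items():
            # Without replace, a field that already exists is left untouched.
            if replace or name not in result:
                result[name] = value
    return result


base = {"a": 1}
others = [{"a": 2, "b": 3}, {"b": 4, "c": 5}]
kept = extend_sketch(base, others)
replaced = extend_sketch(base, others, replace=True)
```

In this sketch `kept` retains the values that were seen first, while `replaced` ends up with the values from the last record that defines each field.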
- flow.record.ignore_fields_for_comparison(ignored_fields: Iterable[str])¶
Context manager to temporarily ignore fields for comparison.
- flow.record.iter_timestamped_records(record: Record) Iterator[Record] ¶
Yields timestamped annotated records for each datetime fieldtype in record. If record does not have any datetime fields the original record is returned.
- Parameters:
record – Record to add timestamp fields for.
- Yields:
Record annotated with ts and ts_description fields for each datetime fieldtype.
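The fan-out described here can be sketched with dicts standing in for Records. `iter_timestamped` is a hypothetical helper; storing the originating field name in ts_description is an assumption, since the text above only says that both fields are added.

```python
from datetime import datetime, timezone


def iter_timestamped(record):
    """Yield one annotated copy of record per datetime-valued field."""
    dt_fields = [name for name, value in record.items() if isinstance(value, datetime)]
    if not dt_fields:
        # No datetime fields: pass the original record through unchanged.
        yield record
        return
    for name in dt_fields:
        # Assumption: ts_description holds the name of the source field.
        yield {**record, "ts": record[name], "ts_description": name}


event = {
    "path": "/tmp/example",
    "created": datetime(2024, 1, 1, tzinfo=timezone.utc),
    "modified": datetime(2024, 1, 2, tzinfo=timezone.utc),
}
annotated = list(iter_timestamped(event))
```

A record with two datetime fields yields two annotated copies, which is convenient for building a timeline sorted on ts.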
- flow.record.open_path(path: str, mode: str, clobber: bool = True) IO ¶
Open path using mode and return a file object.
It handles special cases if path is meant to be stdin or stdout. It also supports compression based on the file extension or the file header of the stream.
- Parameters:
path – Filename or path to filename to open
mode – Could be “r”, “rb” to open file for reading, “w”, “wb” for writing
clobber – If True, overwrite the file if it already exists; otherwise raise IOError.
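Detecting compression from the file header can be sketched as follows. This only covers gzip and only the reading side; `open_maybe_gzip` is a hypothetical helper, and which formats the library actually recognises is not stated here.

```python
import gzip
import io

GZIP_MAGIC = b"\x1f\x8b"  # first two bytes of every gzip stream


def open_maybe_gzip(fp):
    """Wrap a binary stream in a gzip reader if its header looks like gzip."""
    buffered = io.BufferedReader(fp)
    # peek() inspects the header without consuming it.
    if buffered.peek(2)[:2] == GZIP_MAGIC:
        return gzip.GzipFile(fileobj=buffered, mode="rb")
    return buffered


payload = b"hello records"
from_gzip = open_maybe_gzip(io.BytesIO(gzip.compress(payload))).read()
from_plain = open_maybe_gzip(io.BytesIO(payload)).read()
```

Peeking rather than reading keeps the stream position intact, so the same bytes can be handed to whichever reader is chosen.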
- flow.record.open_path_or_stream(path: str | pathlib.Path | BinaryIO, mode: str, clobber: bool = True) IO ¶
- flow.record.open_stream(fp: BinaryIO, mode: str) BinaryIO ¶
- flow.record.set_ignored_fields_for_comparison(ignored_fields: Iterable[str]) None ¶
Can be used to update the IGNORE_FIELDS_FOR_COMPARISON from outside the flow.record package scope
- flow.record.stream(src, dst)¶