:py:mod:`flow.record.stream`
============================

.. py:module:: flow.record.stream


Module Contents
---------------

Classes
~~~~~~~

.. autoapisummary::

   flow.record.stream.RecordPrinter
   flow.record.stream.RecordStreamWriter
   flow.record.stream.RecordStreamReader
   flow.record.stream.PathTemplateWriter
   flow.record.stream.RecordArchiver
   flow.record.stream.RecordFieldRewriter


Functions
~~~~~~~~~

.. autoapisummary::
   :nosignatures:

   flow.record.stream.RecordOutput
   flow.record.stream.record_stream


Attributes
~~~~~~~~~~

.. autoapisummary::

   flow.record.stream.log
   flow.record.stream.aRepr


.. py:data:: log

.. py:data:: aRepr

.. py:function:: RecordOutput(fp: IO) -> RecordPrinter | RecordStreamWriter

   Return a RecordPrinter if `fp` is a tty, otherwise a RecordStreamWriter.


.. py:class:: RecordPrinter(fp: BinaryIO, flush: bool = True)

   Records are printed to `fp` in their textual representation (repr).

   .. py:attribute:: fp
      :value: None

   .. py:attribute:: auto_flush
      :value: True

   .. py:method:: write(obj: flow.record.base.Record) -> None

   .. py:method:: flush() -> None

   .. py:method:: close() -> None


.. py:class:: RecordStreamWriter(fp: BinaryIO)

   Records are written to `fp` in binary (serialized) form.

   .. py:attribute:: fp
      :value: None

   .. py:attribute:: packer
      :value: None

   .. py:attribute:: header_written
      :value: False

   .. py:method:: __del__() -> None

   .. py:method:: on_new_descriptor(descriptor: flow.record.base.RecordDescriptor) -> None

   .. py:method:: close() -> None

   .. py:method:: flush() -> None

   .. py:method:: write(obj: flow.record.base.Record | flow.record.base.RecordDescriptor) -> None

   .. py:method:: writeheader() -> None


.. py:class:: RecordStreamReader(fp: BinaryIO, selector: str | None = None)

   Bases: :py:obj:`flow.record.adapter.AbstractReader`

   .. py:attribute:: fp
      :value: None

   .. py:attribute:: recordtype
      :value: None

   .. py:attribute:: descs
      :value: None

   .. py:attribute:: packer
      :value: None

   .. py:attribute:: closed
      :value: False

   .. py:attribute:: selector
      :value: None

   .. py:method:: readheader() -> None

   .. py:method:: read() -> flow.record.base.Record | flow.record.base.RecordDescriptor

   .. py:method:: close() -> None

      Close the reader. Can be overridden to properly free resources.

   .. py:method:: __iter__() -> collections.abc.Iterator[flow.record.base.Record]

      Return a record iterator.


.. py:function:: record_stream(sources: list[str], selector: str | None = None) -> collections.abc.Iterator[flow.record.base.Record]

   Return a Record stream generator from the given Record sources.

   If there are multiple sources, exceptions are caught and logged, and the
   stream continues with the next source.


.. py:class:: PathTemplateWriter(path_template: str | None = None, name: str | None = None)

   Write records to a path on disk. The path can be a template string.

   This allows records to be archived on disk based on, for example, their
   timestamp.

   Default template string is::

      '{name}-{record._generated:%Y%m%dT%H}.records.gz'

   Available template fields:

   * `name` defaults to "records", but can be overridden in the initializer.
   * `record` is the record object.
   * `ts` is `record._generated`.

   If the destination path already exists, the existing file is renamed using
   the current datetime.

   .. py:attribute:: DEFAULT_TEMPLATE
      :value: '{name}-{record._generated:%Y%m%dT%H}.records.gz'

   .. py:attribute:: path_template
      :value: '{name}-{record._generated:%Y%m%dT%H}.records.gz'

   .. py:attribute:: name
      :value: 'records'

   .. py:attribute:: current_path
      :value: None

   .. py:attribute:: writer
      :value: None

   .. py:attribute:: stream
      :value: None

   .. py:method:: rotate_existing_file(path: pathlib.Path) -> None

   .. py:method:: record_stream_for_path(path: str) -> flow.record.adapter.AbstractWriter

   .. py:method:: write(record: flow.record.base.Record) -> None

   .. py:method:: close() -> None


.. py:class:: RecordArchiver(archive_path: str, path_template: str | None = None, name: str | None = None)

   Bases: :py:obj:`PathTemplateWriter`

   RecordWriter that writes/archives records to a path with YYYY/mm/dd.


.. py:class:: RecordFieldRewriter(fields: list[str] | None = None, exclude: list[str] | None = None, expression: str | None = None)

   Rewrite records using a new RecordDescriptor for chosen fields and/or
   excluded or new record fields.

   .. py:attribute:: fields
      :value: []

   .. py:attribute:: exclude
      :value: []

   .. py:attribute:: expression

   .. py:attribute:: record_descriptor_for_fields

   .. py:method:: rewrite(record: flow.record.base.Record) -> flow.record.base.Record
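

The default path template documented for ``PathTemplateWriter`` follows
Python's format string syntax, so its expansion can be previewed with plain
``str.format``. The sketch below is illustrative only: it uses a hypothetical
stand-in object with a ``_generated`` attribute instead of a real
``flow.record`` record, and it assumes (without confirmation from this
reference) that the writer expands the template in a comparable way.

.. code-block:: python

   from datetime import datetime, timezone
   from types import SimpleNamespace

   # Template string copied from PathTemplateWriter.DEFAULT_TEMPLATE.
   DEFAULT_TEMPLATE = "{name}-{record._generated:%Y%m%dT%H}.records.gz"

   # Hypothetical stand-in for a record: only the `_generated` timestamp
   # referenced by the template is modelled here.
   record = SimpleNamespace(
       _generated=datetime(2024, 3, 15, 14, 30, tzinfo=timezone.utc)
   )

   # Supply the three documented template fields: name, record and ts.
   path = DEFAULT_TEMPLATE.format(
       name="records",
       record=record,
       ts=record._generated,
   )

   print(path)  # records-20240315T14.records.gz

Because the default template is resolved to the hour, records generated within
the same hour map to the same file name in this sketch. A finer or coarser
``strftime`` pattern in the template changes that granularity.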