flow.record.stream

Module Contents

Classes

RecordPrinter

Records are printed to fp as their textual representation (repr).

RecordStreamWriter

Records are written as binary (serialized) to fp.

RecordStreamReader

PathTemplateWriter

Write records to a path on disk; the path can be a template string.

RecordArchiver

RecordWriter that archives records under a path organized by date (YYYY/mm/dd).

RecordFieldRewriter

Rewrite records using a new RecordDescriptor built from selected fields, excluded fields, and/or new record fields.

Functions

RecordOutput

Return a RecordPrinter if fp is a tty, otherwise a RecordStreamWriter.

record_stream

Return a Record stream generator from the given Record sources.

Attributes

flow.record.stream.log
flow.record.stream.aRepr

flow.record.stream.RecordOutput(fp)

Return a RecordPrinter if fp is a tty, otherwise a RecordStreamWriter.
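The dispatch described here can be sketched with the standard library alone. This is a minimal illustration and not the library's implementation: `choose_output` and the two returned labels are hypothetical stand-ins for RecordOutput and its two writer classes, and the sketch assumes the tty check is done through the file object's `isatty()` method.

```python
import io


def choose_output(fp):
    """Hypothetical stand-in for RecordOutput: pick a writer kind for fp."""
    # Assumption: "is a tty" is determined via the file object's isatty().
    is_tty = getattr(fp, "isatty", lambda: False)()
    # A terminal gets human-readable repr lines; anything else gets binary.
    return "printer" if is_tty else "stream_writer"


class FakeTerminal:
    """Illustrative object that claims to be a terminal."""

    def isatty(self):
        return True


# An in-memory buffer is never a tty, so it takes the binary branch.
buffer_kind = choose_output(io.BytesIO())
terminal_kind = choose_output(FakeTerminal())
```

The practical effect is that piping output into a file or another process produces a serialized stream, while running interactively shows readable text.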

class flow.record.stream.RecordPrinter(fp, flush=True)

Records are printed to fp as their textual representation (repr).

fp
write(obj)
flush()
close()

class flow.record.stream.RecordStreamWriter(fp)

Records are written as binary (serialized) to fp.

fp
packer
__del__()
on_new_descriptor(descriptor)
close()
flush()
write(obj)
writeheader()

class flow.record.stream.RecordStreamReader(fp, selector=None)

fp
recordtype
descs
packer
readheader()
read()
close()
__iter__()

flow.record.stream.record_stream(sources, selector=None)

Return a Record stream generator from the given Record sources.

Exceptions in a Record source will be caught so the stream is not interrupted.
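The error-tolerant behaviour can be illustrated with plain generators. This is a sketch under assumptions, not the library's code: `tolerant_stream` is a hypothetical name, the sources are ordinary Python iterables rather than record readers, and logging the failure is an assumed way of handling it.

```python
import logging

log = logging.getLogger(__name__)


def tolerant_stream(sources):
    """Hypothetical sketch: yield items from each source, skipping failures."""
    for source in sources:
        try:
            yield from source
        except Exception as exc:
            # Log and move on so one broken source does not end the stream.
            log.warning("source failed: %s", exc)


def good():
    yield 1
    yield 2


def broken():
    yield 3
    raise IOError("truncated input")


# Items produced before the failure are kept; later sources still run.
results = list(tolerant_stream([good(), broken(), good()]))
```

In this sketch the item yielded before the failure is still delivered, and the remaining sources are processed normally.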

class flow.record.stream.PathTemplateWriter(path_template=None, name=None)

Write records to a path on disk; the path can be a template string.

This allows for archiving records on disk based on timestamp for example.

Default template string is:

'{name}-{record._generated:%Y%m%dT%H}.records.gz'

Available template fields:

name: defaults to "records", but can be overridden in the initializer.
record: the record object.
ts: record._generated.
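To see how the default template expands, the sketch below assumes the template is filled in with Python's `str.format`, which the `{field:%Y...}` syntax suggests but the text does not state. The `record` object is a hypothetical stand-in that carries only a `_generated` timestamp.

```python
from datetime import datetime
from types import SimpleNamespace

DEFAULT_TEMPLATE = "{name}-{record._generated:%Y%m%dT%H}.records.gz"

# Stand-in for a record; only the _generated timestamp matters here.
record = SimpleNamespace(_generated=datetime(2024, 1, 2, 3, 4, 5))

# Supply all three documented fields; unused ones are simply ignored.
path = DEFAULT_TEMPLATE.format(
    name="records",
    record=record,
    ts=record._generated,
)
```

Because the template only goes down to the hour (`%H`), records generated within the same hour would map to the same file name under this assumption.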

If the destination path already exists, the existing file is renamed using the current datetime.
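The rename-on-collision behaviour might look roughly like the following. This is a hedged sketch: `rotate_existing` is a hypothetical helper, and the timestamp suffix format is an assumption chosen for illustration, since the text does not specify how the rotated name is built.

```python
import os
import tempfile
from datetime import datetime


def rotate_existing(path):
    """Hypothetical sketch: move an existing file aside with a timestamp suffix."""
    if not os.path.exists(path):
        return None
    # Assumption: the suffix format is illustrative, not the library's.
    stamp = datetime.now().strftime("%Y%m%dT%H%M%S")
    rotated = f"{path}.{stamp}"
    os.rename(path, rotated)
    return rotated


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "records.gz")
    with open(target, "wb") as fh:
        fh.write(b"old")
    rotated = rotate_existing(target)
    rotated_name = os.path.basename(rotated)
    # After rotation the original path is free for a new file.
    target_free = not os.path.exists(target)
    missing_result = rotate_existing(os.path.join(tmp, "absent.gz"))
```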

DEFAULT_TEMPLATE = '{name}-{record._generated:%Y%m%dT%H}.records.gz'
rotate_existing_file(path)
record_stream_for_path(path)
write(record)
close()

class flow.record.stream.RecordArchiver(archive_path, path_template=None, name=None)

Bases: PathTemplateWriter

RecordWriter that archives records under a path organized by date (YYYY/mm/dd).
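A date-based layout of this kind can be sketched by placing a `YYYY/mm/dd` directory component in front of a file-name template. The helper `archive_path_for`, the exact template, and the use of `str.format` are all assumptions made for illustration; the library's actual template may differ.

```python
import posixpath
from datetime import datetime


def archive_path_for(archive_path, ts, name="records"):
    """Hypothetical sketch: build a dated path below archive_path."""
    # Assumption: date directories come first, then an hourly file name.
    template = posixpath.join(
        archive_path,
        "{ts:%Y/%m/%d}",
        "{name}-{ts:%Y%m%dT%H}.records.gz",
    )
    return template.format(ts=ts, name=name)


archived = archive_path_for("/archive", datetime(2024, 1, 2, 3, 4, 5))
```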

class flow.record.stream.RecordFieldRewriter(fields=None, exclude=None, expression=None)

Rewrite records using a new RecordDescriptor built from selected fields, excluded fields, and/or new record fields.
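The idea of selecting and excluding fields can be illustrated on plain dictionaries. This sketch is deliberately simplified: `rewrite_fields` is a hypothetical function, dictionaries stand in for records and descriptors, and the `expression` parameter (for new fields) is not modelled at all.

```python
def rewrite_fields(record, fields=None, exclude=None):
    """Hypothetical sketch: keep `fields` (if given), then drop `exclude`."""
    # With no explicit selection, start from every field in the record.
    keep = list(fields) if fields else list(record)
    drop = set(exclude or ())
    return {key: record[key] for key in keep if key in record and key not in drop}


row = {"hostname": "web01", "ip": "10.0.0.5", "port": 443}

only_host_ip = rewrite_fields(row, fields=["hostname", "ip"])
without_port = rewrite_fields(row, exclude=["port"])
```

Both calls produce the same reduced record here, one by naming what to keep and the other by naming what to drop.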

record_descriptor_for_fields(descriptor, fields=None, exclude=None, new_fields=None)
rewrite(record)