Module Contents



flow.record.adapter.elastic.__usage__ = Multiline-String
Show Value
ElasticSearch adapter
Write usage: rdump -w elastic+[PROTOCOL]://[IP]:[PORT]?index=[INDEX]
Read usage: rdump elastic+[PROTOCOL]://[IP]:[PORT]?index=[INDEX]
[IP]:[PORT]: ip and port to elastic host
[PROTOCOL]: http or https. Defaults to https when "+[PROTOCOL]" is omitted

Optional arguments:
  [INDEX]: name of the index to use (default: records)
  [VERIFY_CERTS]: verify certs of Elasticsearch instance (default: True)
  [HASH_RECORD]: make record unique by hashing record [slow] (default: False)
class flow.record.adapter.elastic.ElasticWriter(uri: str, index: str = 'records', verify_certs: str | bool = True, http_compress: str | bool = True, hash_record: str | bool = False, **kwargs)

Bases: flow.record.adapter.AbstractWriter

record_to_document(record: flow.record.base.Record, index: str) dict

Convert a record to a Elasticsearch compatible document dictionary

document_stream() Iterator[dict]

Generator of record documents on the Queue

streaming_bulk_thread() None

Thread that streams the documents to ES via the bulk api

write(record: flow.record.base.Record) None

Write a record.

flush() None

Flush any buffered writes.

close() None

Close the Writer, no more writes will be possible.

class flow.record.adapter.elastic.ElasticReader(uri: str, index: str = 'records', verify_certs: str | bool = True, http_compress: str | bool = True, selector: None | flow.record.selector.Selector | flow.record.selector.CompiledSelector = None, **kwargs)

Bases: flow.record.adapter.AbstractReader

__iter__() Iterator[flow.record.base.Record]

Return a record iterator.

close() None

Close the Reader, can be overriden to properly free resources.