flow.record.adapter.elastic#

Module Contents#

Classes#

Attributes#

flow.record.adapter.elastic.__usage__ = Multiline-String#
Show Value
"""
ElasticSearch adapter
---
Write usage: rdump -w elastic+[PROTOCOL]://[IP]:[PORT]?index=[INDEX]
Read usage: rdump elastic+[PROTOCOL]://[IP]:[PORT]?index=[INDEX]
[IP]:[PORT]: ip and port to elastic host
[PROTOCOL]: http or https. Defaults to https when "+[PROTOCOL]" is omitted

Optional arguments:
  [INDEX]: name of the index to use (default: records)
  [VERIFY_CERTS]: verify certs of Elasticsearch instance (default: True)
  [HASH_RECORD]: make record unique by hashing record [slow] (default: False)
"""
flow.record.adapter.elastic.log#
class flow.record.adapter.elastic.ElasticWriter(uri: str, index: str = 'records', verify_certs: str | bool = True, http_compress: str | bool = True, hash_record: str | bool = False, **kwargs)#

Bases: flow.record.adapter.AbstractWriter

record_to_document(record: flow.record.base.Record, index: str) → dict#

Convert a record to an Elasticsearch-compatible document dictionary.

document_stream() → Iterator[dict]#

Generator of record documents on the Queue

streaming_bulk_thread() → None#

Thread that streams the documents to ES via the bulk API.

write(record: flow.record.base.Record) → None#

Write a record.

flush() → None#

Flush any buffered writes.

close() → None#

Close the Writer, no more writes will be possible.

class flow.record.adapter.elastic.ElasticReader(uri: str, index: str = 'records', verify_certs: str | bool = True, http_compress: str | bool = True, selector: None | flow.record.selector.Selector | flow.record.selector.CompiledSelector = None, **kwargs)#

Bases: flow.record.adapter.AbstractReader

__iter__() → Iterator[flow.record.base.Record]#

Return a record iterator.

close() → None#

Close the Reader. Can be overridden to properly free resources.