flow.record.adapter.elastic

Module Contents

Classes

Attributes

flow.record.adapter.elastic.HAS_ELASTIC = True
flow.record.adapter.elastic.__usage__ = Multiline-String
Show Value
"""
ElasticSearch adapter
---
Write usage: rdump -w elastic+[PROTOCOL]://[IP]:[PORT]?index=[INDEX]
Read usage: rdump elastic+[PROTOCOL]://[IP]:[PORT]?index=[INDEX]
[IP]:[PORT]: ip and port to elastic host
[PROTOCOL]: http or https. Defaults to https when "+[PROTOCOL]" is omitted

Optional arguments:
  [API_KEY]: base64 encoded api key to authenticate with (default: False)
  [QUEUE_SIZE]: maximum queue size for writing records; limits memory usage (default: 100000)
  [INDEX]: name of the index to use (default: records)
  [VERIFY_CERTS]: verify certs of Elasticsearch instance (default: True)
  [HASH_RECORD]: make record unique by hashing record [slow] (default: False)
  [REQUEST_TIMEOUT]: maximum duration in seconds for a request to Elastic (default: 30)
  [MAX_RETRIES]: maximum retries before a record is marked as failed (default: 3)
  [_META_*]: record metadata fields (default: None)
"""
flow.record.adapter.elastic.log
class flow.record.adapter.elastic.ElasticWriter(uri: str, index: str = 'records', verify_certs: str | bool = True, http_compress: str | bool = True, hash_record: str | bool = False, api_key: str | None = None, queue_size: int = 100000, request_timeout: int = 30, max_retries: int = 3, **kwargs)

Bases: flow.record.adapter.AbstractWriter

index = 'records'
uri
hash_record = False
queue: queue.Queue[flow.record.base.Record | StopIteration]
event
exception: Exception | None = None
es
json_packer
thread
metadata_fields
excepthook(exc: threading.ExceptHookArgs, *args, **kwargs) -> None
record_to_document(record: flow.record.base.Record, index: str) -> dict

Convert a record to an Elasticsearch-compatible document dictionary.

document_stream() -> collections.abc.Iterator[dict]

Generator of record documents on the Queue

streaming_bulk_thread() -> None

Thread that streams the documents to Elasticsearch via the bulk API.

Resources:
write(record: flow.record.base.Record) -> None

Write a record.

flush() -> None

Flush any buffered writes.

close() -> None

Close the Writer, no more writes will be possible.

class flow.record.adapter.elastic.ElasticReader(uri: str, index: str = 'records', verify_certs: str | bool = True, http_compress: str | bool = True, selector: None | flow.record.selector.Selector | flow.record.selector.CompiledSelector = None, api_key: str | None = None, request_timeout: int = 30, max_retries: int = 3, **kwargs)

Bases: flow.record.adapter.AbstractReader

index = 'records'
uri
selector = None
es
__iter__() -> collections.abc.Iterator[flow.record.base.Record]

Return a record iterator.

close() -> None

Close the Reader; can be overridden to properly free resources.