flow.record.adapter.elastic
Module Contents
Classes
Attributes
- flow.record.adapter.elastic.HAS_ELASTIC = True
- flow.record.adapter.elastic.__usage__ = Multiline-String
"""
ElasticSearch adapter
---
Write usage: rdump -w elastic+[PROTOCOL]://[IP]:[PORT]?index=[INDEX]
Read usage: rdump elastic+[PROTOCOL]://[IP]:[PORT]?index=[INDEX]
[IP]:[PORT]: ip and port to elastic host
[PROTOCOL]: http or https. Defaults to https when "+[PROTOCOL]" is omitted

Optional arguments:
  [API_KEY]: base64 encoded api key to authenticate with (default: False)
  [QUEUE_SIZE]: maximum queue size for writing records; limits memory usage (default: 100000)
  [INDEX]: name of the index to use (default: records)
  [VERIFY_CERTS]: verify certs of Elasticsearch instance (default: True)
  [HASH_RECORD]: make record unique by hashing record [slow] (default: False)
  [REQUEST_TIMEOUT]: maximum duration in seconds for a request to Elastic (default: 30)
  [MAX_RETRIES]: maximum retries before a record is marked as failed (default: 3)
  [_META_*]: record metadata fields (default: None)
"""
- flow.record.adapter.elastic.log
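The URI layout described in `__usage__` can be illustrated with a small sketch. `build_elastic_uri` is a hypothetical helper and not part of flow.record; it only assembles the string that would be passed to `rdump`. The lowercase query keys (`index`, `hash_record`) are assumed to mirror the constructor parameter names listed for the classes on this page.

```python
from urllib.parse import urlencode


def build_elastic_uri(host, port, protocol=None, **options):
    """Assemble an Elastic adapter URI (hypothetical helper, not part of flow.record)."""
    # "+[PROTOCOL]" is optional; per the usage text, https is assumed when it is omitted.
    scheme = f"elastic+{protocol}" if protocol else "elastic"
    uri = f"{scheme}://{host}:{port}"
    if options:
        # Optional arguments travel in the query string (assumed lowercase key names).
        uri += "?" + urlencode(options)
    return uri


write_uri = build_elastic_uri("127.0.0.1", 9200, "http", index="records", hash_record=True)
print(f"rdump -w {write_uri}")
```

The same URI without `-w` would be used for reading, as the usage text shows.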
- class flow.record.adapter.elastic.ElasticWriter(uri: str, index: str = 'records', verify_certs: str | bool = True, http_compress: str | bool = True, hash_record: str | bool = False, api_key: str | None = None, queue_size: int = 100000, request_timeout: int = 30, max_retries: int = 3, **kwargs)
Bases: flow.record.adapter.AbstractWriter
- index = 'records'
- uri
- hash_record = False
- queue: queue.Queue[flow.record.base.Record | StopIteration]
- event
- exception: Exception | None = None
- es
- json_packer
- thread
- metadata_fields
- excepthook(exc: threading.ExceptHookArgs, *args, **kwargs) → None
- record_to_document(record: flow.record.base.Record, index: str) → dict
Convert a record to an Elasticsearch-compatible document dictionary.
- document_stream() → collections.abc.Iterator[dict]
Generator that yields record documents from the queue.
- streaming_bulk_thread() → None
Thread that streams the documents to Elasticsearch via the bulk API.
- write(record: flow.record.base.Record) → None
Write a record.
- flush() → None
Flush any buffered writes.
- close() → None
Close the writer; no more writes will be possible.
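The attributes and methods above suggest a producer/consumer design: `write()` puts records on a bounded queue, and a background thread drains it into the bulk API. The following is a minimal sketch of that pattern under stated assumptions, not the actual implementation. `SketchWriter` and its `sink` callable are hypothetical stand-ins (the sink replaces the Elasticsearch bulk call), records are plain dicts, and SHA-256 is chosen here only for illustration of `hash_record`.

```python
import hashlib
import json
import queue
import threading


class SketchWriter:
    """Hypothetical stand-in for the writer pattern; `sink` replaces the bulk API call."""

    def __init__(self, sink, index="records", hash_record=False, queue_size=100000):
        self.sink = sink
        self.index = index
        self.hash_record = hash_record
        # A bounded queue makes write() block when the consumer falls behind,
        # which caps memory usage.
        self.queue = queue.Queue(maxsize=queue_size)
        self.exception = None
        self.thread = threading.Thread(target=self.streaming_thread, daemon=True)
        self.thread.start()

    def record_to_document(self, record, index):
        # Serialize deterministically so identical records produce identical text.
        source = json.dumps(record, sort_keys=True)
        document = {"_index": index, "_source": source}
        if self.hash_record:
            # Assumption: a content hash as the document id makes rewrites idempotent.
            document["_id"] = hashlib.sha256(source.encode()).hexdigest()
        return document

    def document_stream(self):
        # Yield documents until the StopIteration sentinel arrives on the queue.
        while True:
            item = self.queue.get()
            if item is StopIteration:
                return
            yield self.record_to_document(item, self.index)

    def streaming_thread(self):
        try:
            for document in self.document_stream():
                self.sink(document)
        except Exception as exc:
            # Remember the failure so the writing thread can re-raise it.
            self.exception = exc

    def write(self, record):
        if self.exception:
            raise self.exception
        self.queue.put(record)

    def close(self):
        # Signal the end of the stream, then wait for the consumer to finish.
        self.queue.put(StopIteration)
        self.thread.join()
        if self.exception:
            raise self.exception


sent = []
writer = SketchWriter(sink=sent.append, hash_record=True)
writer.write({"hostname": "alpha", "port": 22})
writer.write({"hostname": "alpha", "port": 22})
writer.close()
print(len(sent), sent[0]["_id"] == sent[1]["_id"])
```

With hashing enabled, the two identical records receive the same `_id`, so a real index would store them once. This matches the "make record unique" wording of `HASH_RECORD`, at the cost of hashing every record.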
- class flow.record.adapter.elastic.ElasticReader(uri: str, index: str = 'records', verify_certs: str | bool = True, http_compress: str | bool = True, selector: None | flow.record.selector.Selector | flow.record.selector.CompiledSelector = None, api_key: str | None = None, request_timeout: int = 30, max_retries: int = 3, **kwargs)
Bases: flow.record.adapter.AbstractReader
- index = 'records'
- uri
- selector = None
- es
- __iter__() → collections.abc.Iterator[flow.record.base.Record]
Return a record iterator.
- close() → None
Close the reader; can be overridden to properly free resources.
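How a reader might combine lazy iteration with an optional selector can be sketched as follows. This is an illustration under assumptions, not the reader's actual code: `iter_records` is a hypothetical function, the hits are hand-written dicts shaped like Elasticsearch search hits (each with a `_source` field), and a plain callable stands in for a `Selector` or `CompiledSelector`.

```python
def iter_records(hits, selector=None):
    """Lazily yield the `_source` of each hit, skipping those the selector rejects."""
    for hit in hits:
        record = hit["_source"]
        # With no selector, every record passes through unchanged.
        if selector is None or selector(record):
            yield record


# Hand-written stand-ins for search hits; a real reader would fetch these from the index.
hits = [
    {"_index": "records", "_source": {"hostname": "alpha", "port": 22}},
    {"_index": "records", "_source": {"hostname": "beta", "port": 443}},
]

selected = list(iter_records(hits, selector=lambda record: record["port"] == 443))
print(selected)
```

Because the function is a generator, hits are processed one at a time, so large result sets never need to be held in memory at once.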