dissect.target.tools.dump.utils#

Module Contents#

Classes#

Compression

Generic enumeration.

Serialization

Generic enumeration.

JsonLinesWriter

SortedKeysJsonRecordPacker

Functions#

get_nested_attr

get_sink_dir_by_target

get_sink_dir_by_func

slugify_descriptor_name

get_sink_filename

Return a sink filename for provided record descriptor, serialization and compression.

get_relative_sink_path

Return a sink path relative to an output directory.

open_path

Open the path using the given mode and the specified compression, and return a file object.

get_sink_writer

cached_sink_writers

get_current_utc_time

parse_datetime_iso

Attributes#

dissect.target.tools.dump.utils.HAS_LZ4 = True#
dissect.target.tools.dump.utils.HAS_ZSTD = True#
dissect.target.tools.dump.utils.log#
class dissect.target.tools.dump.utils.Compression#

Bases: enum.Enum

Generic enumeration.

Derive from this class to define new enumerations.

BZIP2 = 'bzip2'#
GZIP = 'gzip'#
LZ4 = 'lz4'#
ZSTD = 'zstandard'#
NONE#
class dissect.target.tools.dump.utils.Serialization#

Bases: enum.Enum

Generic enumeration.

Derive from this class to define new enumerations.

JSONLINES = 'jsonlines'#
MSGPACK = 'msgpack'#
dissect.target.tools.dump.utils.COMPRESSION_TO_EXT#
dissect.target.tools.dump.utils.DEST_DIR_CACHE_SIZE = 10#
dissect.target.tools.dump.utils.DEST_FILENAME_CACHE_SIZE = 10#
dissect.target.tools.dump.utils.OPEN_WRITERS_LIMIT = 10#
dissect.target.tools.dump.utils.get_nested_attr(obj: Any, nested_attr: str) -> Any#
dissect.target.tools.dump.utils.get_sink_dir_by_target(target: dissect.target.Target, function: str) -> pathlib.Path#
dissect.target.tools.dump.utils.get_sink_dir_by_func(target: dissect.target.Target, function: str) -> pathlib.Path#
dissect.target.tools.dump.utils.slugify_descriptor_name(descriptor_name: str) -> str#
dissect.target.tools.dump.utils.get_sink_filename(record_descriptor: flow.record.RecordDescriptor, serialization: Serialization, compression: Compression | None = None) -> str#

Return a sink filename for provided record descriptor, serialization and compression.

dissect.target.tools.dump.utils.get_relative_sink_path(element, serialization, compression=None)#

Return a sink path relative to an output directory.

dissect.target.tools.dump.utils.open_path(path: pathlib.Path, mode: str, compression: Compression | None = None) -> BinaryIO#

Open the path using the given mode and the specified compression, and return a file object.

class dissect.target.tools.dump.utils.JsonLinesWriter(fp, **kwargs)#

Bases: flow.record.adapter.jsonfile.JsonfileWriter

flush()#

Flush any buffered writes.

close()#

Close the Writer, no more writes will be possible.

class dissect.target.tools.dump.utils.SortedKeysJsonRecordPacker(indent=None, pack_descriptors=True)#

Bases: flow.record.jsonpacker.JsonRecordPacker

pack(obj)#
dissect.target.tools.dump.utils.SERIALIZERS#
dissect.target.tools.dump.utils.get_sink_writer(full_sink_path: pathlib.Path, serialization: Serialization, compression: Compression | None = None, new_sink: bool = True) -> flow.record.adapter.jsonfile.JsonfileWriter | flow.record.RecordStreamWriter#
dissect.target.tools.dump.utils.cached_sink_writers(state) -> Iterator[Callable]#
dissect.target.tools.dump.utils.get_current_utc_time() -> datetime.datetime#
dissect.target.tools.dump.utils.parse_datetime_iso(datetime_str: str) -> datetime.datetime#