dissect.target.plugins.scrape.scrape

Module Contents

Classes

ScrapePlugin

Base class for plugins.

class dissect.target.plugins.scrape.scrape.ScrapePlugin(target: dissect.target.Target)

Bases: dissect.target.plugin.Plugin

Base class for plugins.

Plugins can optionally be namespaced by specifying the __namespace__ class attribute. Namespacing results in your plugin needing to be prefixed with this namespace when being called. For example, if your plugin has specified test as namespace and a function called example, you must call your plugin with test.example.

A Plugin class has the following private class attributes:

  • __namespace__

  • __record_descriptors__

With the following two being assigned in register():

  • __functions__

  • __exports__

Additionally, the methods and attributes of Plugin receive more private attributes by using decorators.

The export() decorator adds the following private attributes

  • __exported__

  • __output__: Set with the export() decorator.

  • __record__: Set with the export() decorator.

The internal() decorator and InternalPlugin set the __internal__ attribute. Finally. args() decorator sets the __args__ attribute.

The alias() decorator populates the __aliases__ private attribute of Plugin methods. Resulting clones of the Plugin are populated with the boolean __alias__ attribute set to True.

Parameters:

target – The Target object to load the plugin for.

__namespace__ = 'scrape'

Defines the plugin namespace.

check_compatible() None

Perform a compatibility check with the target.

This function should return None if the plugin is compatible with the current target (self.target). For example, check if a certain file exists. Otherwise it should raise an UnsupportedPluginError.

Raises:

UnsupportedPluginError – If the plugin could not be loaded.

create_streams(*, encrypted: bool = True, lvm: bool = True, all: bool = False) collections.abc.Iterator[tuple[dissect.target.container.Container | dissect.target.volume.Volume, dissect.util.stream.MappingStream]]

Yields streams for all disks and volumes of a target.

At the basis, each disk of a target is represented as a stream of itself. If the target has volumes, these are checked to be either encrypted or LVM volumes (if the respective flags are set). If so, that volume is replaced in the disk stream with a stream of the volume itself, or removed.

In the case of encrypted volumes, the encrypted volume is replaced with the decrypted volume. This is done transparently on the disk stream level, so the consumer does not need to worry about the decryption process.

In the case of LVM volumes (including any sort of RAID), the base volumes are removed from the disk stream, and a new stream will be yielded for each logical volume. This ensures that no unnecessary scraping is performed on base volumes of a logical volume, but only on the reconstructed logical volume.

Parameters:
  • encrypted – Whether to replace encrypted volumes for their decrypted counterpart.

  • lvm – Whether to skip base volumes of LVM volumes and yield the logical volume itself.

  • all – Whether to yield all disks and volumes, regardless of their type.

Yields:

A tuple containing the disk (or volume, in the case of an LVM volume) and a stream of that disk.

find(needles: dissect.target.helpers.scrape.Needle | list[dissect.target.helpers.scrape.Needle], lock_seek: bool = True, block_size: int = io.DEFAULT_BUFFER_SIZE, progress: Callable[[dissect.target.container.Container | dissect.target.volume.Volume, int, int], None] | None = None) collections.abc.Iterator[tuple[dissect.target.container.Container | dissect.target.volume.Volume, dissect.util.stream.MappingStream, dissect.target.helpers.scrape.Needle, int]]

Yields needles and their offsets found in all disks and volumes of a target.

Parameters:
  • needles – The needle or list of needles to search for.

  • lock_seek – Whether the file position is maintained by the scraper or the consumer. Setting this to False wil allow the consumer to seek the file pointer, i.e. to skip forward.

  • block_size – The block size to use for reading from the byte stream.

  • progress – A function to call with the current disk, offset and size of the stream.

scrape_chunks_from_disks(chunk_parser: Callable[[dissect.target.helpers.scrape.Needle, bytes], collections.abc.Iterator[dissect.target.helpers.record.TargetRecordDescriptor]], needle_chunk_size_map: dict[dissect.target.helpers.scrape.Needle, int] | None = None, needle: dissect.target.helpers.scrape.Needle | None = None, chunk_size: int | None = None, chunk_reader: Callable[[BinaryIO, dissect.target.helpers.scrape.Needle, int, int], bytes] | None = None, block_size: int = io.DEFAULT_BUFFER_SIZE) collections.abc.Iterator[dissect.target.helpers.record.TargetRecordDescriptor]

Yields records scraped from chunks found in target.disks.

Parameters:
  • chunk_parser – A function to parse a chunk and yield records.

  • needle_chunk_size_map – A dictionary with needle as keys and chunk sizes as values.

  • needle – A single needle to search for.

  • chunk_size – The size of the chunks to scrape, when providing a single needle.

  • chunk_reader – A function to read a chunk from a byte stream for provided needle, offset and chunk size.

  • block_size – The block size to use for reading from the byte stream.

scrape_needles_from_disks(needle: dissect.target.helpers.scrape.Needle | None = None, needles: list[dissect.target.helpers.scrape.Needle] | None = None, block_size: int = io.DEFAULT_BUFFER_SIZE) collections.abc.Iterator[tuple[BinaryIO, bytes, int]]

Yields (bytestream, needle, offset) tuples, scraped from target.disks.

Parameters:
  • needles – A list of byte needles to search for.

  • needle – A single byte needle to search for.

  • block_size – The block size to use for reading from the byte stream.