Source code for cle.memory

import bisect
import struct
from mmap import mmap
from typing import List, Tuple, Union

import archinfo

# Names exported by ``from cle.memory import *``.
__all__ = (
    "ClemoryBase",
    "Clemory",
    "ClemoryView",
    "ClemoryTranslator",
    "UninitializedClemory",
)


class ClemoryBase:
    """
    Common interface for the memory objects defined in this module.

    Storage access (item access, ``load``, ``store``, ``backers`` and ``find``) is left to subclasses and raises
    ``NotImplementedError`` here. On top of those primitives this class provides ``struct``-based packing helpers
    and a small stream interface (``read``, ``seek``, ``tell``, ``close``).
    """

    __slots__ = ("_arch", "_pointer")

    def __init__(self, arch):
        """
        :param arch: Architecture object. Its ``struct_fmt``, ``memory_endness`` and ``bits`` members are used by
                     the word-sized helpers.
        """
        self._arch = arch
        # Current position for the stream-like read/seek/tell interface.
        self._pointer = 0

    def __getitem__(self, k):
        raise NotImplementedError

    def __setitem__(self, k, v):
        raise NotImplementedError

    def __contains__(self, k):
        raise NotImplementedError

    def load(self, addr, n):
        """Read up to ``n`` bytes starting at ``addr``. Must be provided by subclasses."""
        raise NotImplementedError

    def store(self, addr, data):
        """Write ``data`` starting at ``addr``. Must be provided by subclasses."""
        raise NotImplementedError

    def backers(self, addr=0):
        """Iterate over ``(start, backer)`` pairs from ``addr`` onwards. Must be provided by subclasses."""
        raise NotImplementedError

    def find(self, data, search_min=None, search_max=None):
        """Iterate over the addresses at which ``data`` occurs. Must be provided by subclasses."""
        raise NotImplementedError

    def _first_backer(self, addr):
        """
        Return the first ``(start, backer)`` pair produced by ``backers(addr)``.

        :raises KeyError: If nothing is produced, or the first backer begins after ``addr``.
        """
        # ``backers`` is a generator in some implementations and a plain iterable in others;
        # ``iter`` makes ``next`` safe for both.
        try:
            start, backer = next(iter(self.backers(addr)))
        except StopIteration:
            raise KeyError(addr) from None
        if start > addr:
            raise KeyError(addr)
        return start, backer

    def unpack(self, addr, fmt):
        """
        Use the ``struct`` module to unpack the data at address `addr` with the format `fmt`.

        :return: The tuple produced by ``struct.unpack_from``.
        :raises KeyError: If `addr` is not backed, or the backer is too short for `fmt`.
        """
        start, backer = self._first_backer(addr)
        offset = addr - start
        try:
            return struct.unpack_from(fmt, backer, offset)
        except struct.error:
            # Enough bytes were available, so the failure has another cause: propagate it untouched.
            if len(backer) - offset >= struct.calcsize(fmt):
                raise
            raise KeyError(addr) from None

    def unpack_word(self, addr, size=None, signed=False, endness=None):
        """
        Use the ``struct`` module to unpack a single integer from the address `addr`.

        You may override any of the attributes of the word being extracted:

        :param int size:    The size in bytes to pack/unpack. Defaults to wordsize (e.g. 4 bytes on
                            a 32 bit architecture)
        :param bool signed: Whether the data should be extracted signed/unsigned. Default unsigned
        :param archinfo.Endness endness: The endian to use in packing/unpacking. Defaults to memory endness
        :raises ValueError: For an odd `size` above 8, or an unsupported `endness` when `size` is above 8.
        """
        if size is not None and size > 8:
            # Larger words are read as two halves and recombined, recursively.
            subsize = size >> 1
            if size != subsize << 1:
                raise ValueError("Cannot unpack non-power-of-two sizes")
            if endness is None:
                endness = self._arch.memory_endness
            if endness == archinfo.Endness.BE:
                lo_off, hi_off = subsize, 0
            elif endness == archinfo.Endness.LE:
                lo_off, hi_off = 0, subsize
            else:
                raise ValueError("Unsupported endness value %s." % endness)
            # Only the high half carries the sign.
            lo = self.unpack_word(addr + lo_off, size=subsize, signed=False, endness=endness)
            hi = self.unpack_word(addr + hi_off, size=subsize, signed=signed, endness=endness)
            return (hi << (subsize << 3)) | lo
        return self.unpack(addr, self._arch.struct_fmt(size=size, signed=signed, endness=endness))[0]

    def pack(self, addr, fmt, *data):
        """
        Use the ``struct`` module to pack `data` into memory at address `addr` with the format `fmt`.

        :raises KeyError: If `addr` is not backed, or the backer is too short for `fmt`.
        """
        start, backer = self._first_backer(addr)
        offset = addr - start
        try:
            return struct.pack_into(fmt, backer, offset, *data)
        except struct.error:
            # Enough room was available, so the failure has another cause: propagate it untouched.
            if len(backer) - offset >= struct.calcsize(fmt):
                raise
            raise KeyError(addr) from None

    def pack_word(self, addr, data, size=None, signed=False, endness=None):
        """
        Use the ``struct`` module to pack a single integer `data` into memory at the address `addr`.

        You may override any of the attributes of the word being packed:

        :param int size:    The size in bytes to pack/unpack. Defaults to wordsize (e.g. 4 bytes on
                            a 32 bit architecture)
        :param bool signed: Whether the data should be extracted signed/unsigned. Default unsigned
        :param archinfo.Endness endness: The endian to use in packing/unpacking. Defaults to memory endness
        """
        if not signed:
            # Truncate to the word width so negative or oversized values fit an unsigned format.
            width = size * 8 if size is not None else self._arch.bits
            data &= (1 << width) - 1
        return self.pack(addr, self._arch.struct_fmt(size=size, signed=signed, endness=endness), data)

    def read(self, nbytes):
        """
        The stream-like function that reads up to a number of bytes starting from the current
        position and updates the current position. Use with :func:`seek`.

        Up to `nbytes` bytes will be read, halting at the beginning of the first unmapped region
        encountered. Returns ``b""`` when ``load`` raises ``KeyError`` for the current position.
        """
        try:
            out = self.load(self._pointer, nbytes)
        except KeyError:
            return b""
        self._pointer += len(out)
        return out

    def seek(self, value):
        """
        The stream-like function that sets the "file's" current position. Use with :func:`read()`.

        :param value:        The position to seek to.
        """
        self._pointer = value

    def tell(self):
        """Return the current stream position."""
        return self._pointer

    def close(self):  # pylint: disable=no-self-use
        """Stream-interface no-op."""
class Clemory(ClemoryBase):
    """
    An object representing a memory space.

    Accesses can be made with [index] notation. Contents live in a sorted list of ``(start, backer)`` pairs,
    where a backer is a ``bytearray``, ``list``, ``mmap`` or a nested :class:`Clemory`.
    """

    __slots__ = ("_backers", "_root", "consecutive", "min_addr", "max_addr")

    def __init__(self, arch, root=False):
        """
        :param arch: Architecture object, forwarded to :class:`ClemoryBase`.
        :param root: Marks this clemory as a root; root clemories are rejected by :meth:`add_backer`.
        """
        super().__init__(arch)
        self._backers: List[Tuple[int, Union[bytearray, Clemory, List[int]]]] = []
        self._root = root
        self.consecutive = True
        self.min_addr = 0
        self.max_addr = 0

    def add_backer(self, start, data, overwrite=False):
        """
        Adds a backer to the memory.

        :param start:       The address where the backer should be loaded.
        :param data:        The backer itself. Can be either a bytestring or another :class:`Clemory`.
        :param overwrite:   If True and the range overlaps any existing backer, the existing backer will be split up
                            and the overlapping part will be replaced with the new backer.
        :raises ValueError: If `data` is empty, `start` is already backed (without `overwrite`), or `data` is a
                            root clemory.
        :raises TypeError:  If `data` has an unsupported type, or is a clemory while `overwrite` is set.
        """
        if not data:
            raise ValueError("Backer is empty!")
        if not isinstance(data, (bytes, bytearray, list, Clemory, mmap)):
            raise TypeError("Data must be a bytes, list, or Clemory object.")
        if overwrite:
            if isinstance(data, Clemory):
                raise TypeError("Cannot perform an overwrite-add with a Clemory")
            end = start + len(data)
            # After splitting at both edges, every backer overlapping [start, end) starts inside that range.
            self.split_backer(start)
            self.split_backer(end)
            # Drop all of them, not only the one beginning exactly at ``start``; otherwise backers further
            # into the range would be left overlapping the new data.
            self._backers = [(s, b) for s, b in self._backers if not start <= s < end]
            try:
                existing, _ = next(self.backers(end))
            except StopIteration:
                pass
            else:
                if existing < end:
                    self.remove_backer(existing)
        else:
            try:
                existing, _ = next(self.backers(start))
            except StopIteration:
                pass
            else:
                if existing <= start:
                    raise ValueError("Address %#x is already backed!" % start)
        if isinstance(data, Clemory) and data._root:
            raise ValueError("Cannot add a root clemory as a backer!")
        if isinstance(data, bytes):
            # Stored backers must be mutable so ``store``/``pack`` can write to them.
            data = bytearray(data)
        bisect.insort(self._backers, (start, data))
        self._update_min_max()

    def split_backer(self, addr):
        """
        Ensures that ``addr`` is the start of a backer, if it is backed.

        :raises ValueError: If the backer containing ``addr`` is itself a clemory.
        """
        try:
            start_addr, backer = next(self.backers(addr))
        except StopIteration:
            return
        if addr <= start_addr:
            return
        if isinstance(backer, ClemoryBase):
            raise ValueError("Cannot split a backer which is itself a clemory")
        if addr >= start_addr + len(backer):
            return
        cut = addr - start_addr
        self.remove_backer(start_addr)
        self.add_backer(start_addr, backer[:cut])
        self.add_backer(addr, backer[cut:])

    def __repr__(self) -> str:
        name = self.__class__.__name__
        # Bounds are None once every backer has been removed (see _update_min_max).
        if self.min_addr is None or self.max_addr is None:
            return f"<{name} [empty]>"
        return f"<{name} [{hex(self.min_addr)}:{hex(self.max_addr)}]>"

    def update_backer(self, start, data):
        """
        Replace the backer that starts exactly at `start` with `data`.

        :raises TypeError:  If `data` has an unsupported type.
        :raises ValueError: If no backer starts at `start`.
        """
        # Accept the same mutable buffer types that add_backer accepts.
        if not isinstance(data, (bytes, bytearray, list, Clemory, mmap)):
            raise TypeError("Data must be a bytes, list, or Clemory object.")
        if isinstance(data, bytes):
            data = bytearray(data)
        for i, (oldstart, _) in enumerate(self._backers):
            if oldstart == start:
                self._backers[i] = (start, data)
                break
        else:
            raise ValueError("Can't find backer to update")
        self._update_min_max()

    def remove_backer(self, start):
        """
        Remove the backer that starts exactly at `start`.

        :raises ValueError: If no backer starts at `start`.
        """
        for i, (oldstart, _) in enumerate(self._backers):
            if oldstart == start:
                self._backers.pop(i)
                break
        else:
            raise ValueError("Can't find backer to remove")
        self._update_min_max()

    def __iter__(self):
        """Yield every backed address, in backer order."""
        for start, backer in self._backers:
            if isinstance(backer, Clemory):
                # A child clemory yields its own (relative) addresses.
                for rel in backer:
                    yield start + rel
            else:
                # Raw buffers: enumerate offsets, never the stored byte values.
                for rel in range(len(backer)):
                    yield start + rel

    def __getitem__(self, k):
        for start, data in self._backers:
            off = k - start
            if isinstance(data, Clemory):
                if data.min_addr is not None and data.min_addr <= off < data.max_addr:
                    try:
                        return data[off]
                    except KeyError:
                        pass
            elif 0 <= off < len(data):
                return data[off]
        raise KeyError(k)

    def __setitem__(self, k, v):
        for start, data in self._backers:
            off = k - start
            if isinstance(data, Clemory):
                if data.min_addr is not None and data.min_addr <= off < data.max_addr:
                    try:
                        data[off] = v
                        return
                    except KeyError:
                        pass
            elif 0 <= off < len(data):
                data[off] = v
                return
        raise KeyError(k)

    def __contains__(self, k):
        # Bounds are None once every backer has been removed; nothing is contained then.
        if self.min_addr is None or self.max_addr is None:
            return False
        # Fast path
        if self.consecutive:
            return self.min_addr <= k < self.max_addr
        # Check if this is an empty Clemory instance
        if not self._backers:
            return False
        # Check if it is out of the memory range
        if k < self.min_addr or k >= self.max_addr:
            return False
        try:
            self.__getitem__(k)
        except KeyError:
            return False
        return True

    def __getstate__(self):
        return {
            "_arch": self._arch,
            "_backers": self._backers,
            "_pointer": self._pointer,
            "_root": self._root,
            "consecutive": self.consecutive,
            "min_addr": self.min_addr,
            "max_addr": self.max_addr,
        }

    def __setstate__(self, s):
        self._arch = s["_arch"]
        self._backers = s["_backers"]
        self._pointer = s["_pointer"]
        self._root = s["_root"]
        self.consecutive = s["consecutive"]
        self.min_addr = s["min_addr"]
        self.max_addr = s["max_addr"]

    def backers(self, addr=0):
        """
        Iterate through each backer for this clemory and all its children, yielding tuples of
        ``(start_addr, backer)`` where each backer is a bytearray.

        :param addr:    An optional starting address - all backers before and not including this
                        address will be skipped.
        """
        started = False
        for start, backer in self._backers:
            if not started:
                end = start + backer.max_addr if isinstance(backer, Clemory) else start + len(backer)
                if addr >= end:
                    continue
                started = True
            if isinstance(backer, Clemory):
                # Flatten children, translating their relative addresses into this address space.
                for s, b in backer.backers(addr - start):
                    yield s + start, b
            else:
                yield start, backer

    def load(self, addr, n):
        """
        Read up to `n` bytes at address `addr` in memory and return a bytes object.

        Reading will stop at the beginning of the first unallocated region found, or when
        `n` bytes have been read.

        :raises KeyError: If `addr` itself is not backed.
        """
        views = []
        for start, backer in self.backers(addr):
            if start > addr:
                # Gap between backers: stop at the first unallocated byte.
                break
            offset = addr - start
            if not views and offset + n <= len(backer):
                # The whole request is served by the first backer.
                return bytes(memoryview(backer)[offset : offset + n])
            size = len(backer) - offset
            views.append(memoryview(backer)[offset : offset + n])
            addr += size
            n -= size
            if n <= 0:
                break
        if not views:
            raise KeyError(addr)
        return b"".join(views)

    def store(self, addr, data):
        """
        Write bytes from `data` at address `addr`.

        Note: If the store runs off the end of a backer and into unbacked space, this function
        will update the backer but also raise ``KeyError``.
        """
        for start, backer in self.backers(addr):
            if start > addr:
                raise KeyError(addr)
            offset = addr - start
            size = len(backer) - offset
            backer[offset : offset + len(data)] = data if len(data) <= size else data[:size]
            addr += size
            data = data[size:]
            if not data:
                break
        if data:
            raise KeyError(addr)

    def find(self, data, search_min=None, search_max=None):
        """
        Find all occurrences of a bytestring in memory.

        :param bytes data:          The bytestring to search for
        :param int search_min:      Optional: The first address to include as valid
        :param int search_max:      Optional: The last address to include as valid
        :return Iterator[int]:      Iterates over addresses at which the bytestring occurs
        :raises TypeError:          If a list-backed region is encountered.

        NOTE(review): the upper bound is applied so that a match is reported only if its last byte lies below
        ``search_max - 1``; confirm whether that boundary is intended.
        """
        if search_min is None:
            search_min = self.min_addr
        if search_max is None:
            search_max = self.max_addr
        for start, backer in self._backers:
            if isinstance(backer, Clemory):
                if search_max < backer.min_addr + start or search_min > backer.max_addr + start:
                    continue
                yield from (addr + start for addr in backer.find(data, search_min - start, search_max - start))
            elif isinstance(backer, list):
                raise TypeError("find is not supported for list-backed clemories")
            else:
                # Skip backers lying wholly outside the window. The extent that matters is the backer's own
                # length, not the length of the pattern being searched for.
                if search_max < start or search_min > start + len(backer):
                    continue
                limit = search_max - start - 1
                pos = max(0, search_min - start)
                while True:
                    pos = backer.find(data, pos)
                    if pos == -1 or pos + len(data) > limit:
                        break
                    yield pos + start
                    pos += 1

    def _update_min_max(self):
        """
        Update the three properties of Clemory: consecutive, min_addr, and max_addr.

        With no backers, ``min_addr`` and ``max_addr`` both become ``None``.
        """
        is_consecutive = True
        next_start = None
        min_addr, max_addr = None, None
        for start, backer in self._backers:
            if min_addr is None:
                min_addr = start
            # Check the predicted start equals to the real one
            if next_start is not None and next_start != start:
                is_consecutive = False
            if isinstance(backer, (bytearray, list, mmap)):
                end = start + len(backer)
                if max_addr is None or end > max_addr:
                    max_addr = end
                # Update the predicted starting address
                next_start = end
            elif isinstance(backer, Clemory):
                if backer.max_addr is not None and backer.min_addr is not None:
                    end = start + backer.max_addr
                    if max_addr is None or end > max_addr:
                        max_addr = end
                    if backer.min_addr > 0:
                        is_consecutive = False
                    # Update the predicted starting address
                    next_start = end
                if not backer.consecutive:
                    is_consecutive = False
            else:
                raise TypeError("Unsupported backer type %s." % type(backer))
        self.consecutive = is_consecutive
        self.min_addr = min_addr
        self.max_addr = max_addr
class ClemoryView(ClemoryBase):
    """
    A window onto a sub-range of another clemory, exposed as its own address space.

    All addresses accepted and returned by this class are in the view's address space
    (``offset`` .. ``offset + (end - start)``); they are translated to the parent's space internally.
    """

    def __init__(self, backer, start, end, offset=0):
        """
        A Clemory which presents a subset of another Clemory as an address space

        :param backer:  The parent clemory to use
        :param start:   The address in the parent to start at
        :param end:     The address in the parent to end at (exclusive)
        :param offset:  Where the address space should start in this Clemory. Default 0.
        """
        super().__init__(backer._arch)
        self._backer = backer
        self._start = start
        self._end = end
        self._offset = offset
        self._endoffset = offset + (end - start)
        # Amount to add to a view address to obtain the parent address.
        self._rebase = self._start - self._offset

    def __getitem__(self, k):
        if not self._offset <= k < self._endoffset:
            raise KeyError(k)
        return self._backer[k + self._rebase]

    def __setitem__(self, k, v):
        if not self._offset <= k < self._endoffset:
            raise KeyError(k)
        # Write through to the parent.
        self._backer[k + self._rebase] = v

    def __contains__(self, k):
        # Membership tests report False for out-of-window addresses rather than raising.
        if not self._offset <= k < self._endoffset:
            return False
        return k + self._rebase in self._backer

    def backers(self, addr=0):
        """
        Yield ``(start, backer)`` pairs from the parent, translated into the view's address space and clamped
        to the view's window. Clamped backers are yielded as ``memoryview`` slices of the parent's buffer.
        """
        for oaddr, backer in self._backer.backers(addr=addr + self._rebase):
            taddr = oaddr - self._rebase
            tend = taddr + len(backer)  # exclusive end in view space
            if taddr >= self._endoffset or tend <= self._offset:
                # Entirely outside the window.
                continue
            if taddr >= self._offset and tend <= self._endoffset:
                # Entirely inside the window.
                yield taddr, backer
                continue
            # Partially inside: clamp it via a memoryview.
            clamp_start = max(0, self._offset - taddr)
            clamp_end = min(len(backer), self._endoffset - taddr)
            # The slice begins ``clamp_start`` bytes into the backer, so its address shifts accordingly.
            yield taddr + clamp_start, memoryview(backer)[clamp_start:clamp_end]

    def load(self, addr, n):
        """
        Read `n` bytes at `addr` from the parent.

        :raises KeyError: If any part of the requested range lies outside the view.
        """
        if n == 0:
            return b""
        if not self._offset <= addr < self._endoffset:
            raise KeyError(addr)
        if not self._offset <= addr + n - 1 < self._endoffset:
            raise KeyError(addr + n - 1)
        return self._backer.load(addr + self._rebase, n)

    def store(self, addr, data):
        """
        Write `data` at `addr` into the parent.

        :raises KeyError: If any part of the target range lies outside the view.
        """
        if not data:
            return
        if not self._offset <= addr < self._endoffset:
            raise KeyError(addr)
        if not self._offset <= addr + len(data) - 1 < self._endoffset:
            raise KeyError(addr + len(data) - 1)
        self._backer.store(addr + self._rebase, data)

    def find(self, data, search_min=None, search_max=None):
        """
        Search the parent for `data` within the view's window.

        :param search_min: Optional lower bound in view addresses; clamped to the start of the view.
        :param search_max: Optional upper bound in view addresses; clamped to the end of the view.
        :return: An iterator over matching addresses, expressed in the view's address space.
        """
        # Clamp in the view's own address space before translating to the parent's.
        if search_min is None or search_min < self._offset:
            search_min = self._offset
        if search_max is None or search_max > self._endoffset:
            search_max = self._endoffset
        rebase = self._rebase
        hits = self._backer.find(data, search_min=search_min + rebase, search_max=search_max + rebase)
        # The parent reports parent addresses; translate them back for the caller.
        return (addr - rebase for addr in hits)
class ClemoryTranslator(ClemoryBase):
    """
    Wraps a child clemory and passes every address through a translation function before the child is accessed.

    Intended to be used only as a stream object.
    """

    def __init__(self, backer: ClemoryBase, func):
        """
        :param backer:  The clemory that actually holds the data.
        :param func:    Callable mapping an address in this space to an address in `backer`.
        """
        super().__init__(backer._arch)
        self.backer = backer
        self.func = func

    def __getitem__(self, k):
        translated = self.func(k)
        return self.backer[translated]

    def __setitem__(self, k, v):
        translated = self.func(k)
        self.backer[translated] = v

    def __contains__(self, k):
        translated = self.func(k)
        return translated in self.backer

    def load(self, addr, n):
        """Load `n` bytes from the child at the translated address."""
        source = self.func(addr)
        return self.backer.load(source, n)

    def store(self, addr, data):
        """Store `data` into the child at the translated address."""
        target = self.func(addr)
        self.backer.store(target, data)

    def backers(self, addr=0):
        """Not available on a translated address space; always raises ``TypeError``."""
        raise TypeError("Cannot access backers through address translation")

    def find(self, data, search_min=None, search_max=None):
        """Not available on a translated address space; always raises ``TypeError``."""
        raise TypeError("Cannot perform finds through address translation")
class UninitializedClemory(Clemory):
    """
    A special kind of Clemory that acts as a placeholder for uninitialized and invalid memory.
    This is needed for the PAGEZERO segment for MachO binaries, which is 4GB worth of memory
    This does _not_ handle data being written to it, this is only for uninitialized memory that is technically occupied
    but should never be accessed
    """

    def __init__(self, arch, size):
        """
        :param arch: Architecture object, forwarded to :class:`Clemory`.
        :param size: Number of bytes this placeholder occupies; stored as ``max_addr``.
        """
        super().__init__(arch, root=False)
        self.max_addr = size

    def add_backer(self, start, data, overwrite=False):
        raise ValueError("Cannot add backers to an uninitialized clemory")

    def split_backer(self, addr):
        raise ValueError("This is an uninitialized clemory, it cannot be split")

    def update_backer(self, start, data):
        raise ValueError("This is an uninitialized clemory, it cannot be updated")

    def remove_backer(self, start):
        raise ValueError("This is an uninitialized clemory, backers cannot be removed")

    def backers(self, addr=0):
        """
        Technically this object has no real backer
        We could create a fake backer on demand, but that would be a waste of memory,
        and code like the function prolog discovery for MachO binaries would search 4GB worth of nullbytes for a prolog,
        which is a waste of time
        Instead we just return an empty byte array, which seems to pass the test cases

        :param addr: Ignored.
        :return: An iterator producing the single pair ``(0, b"")``.
        """
        # Hand back an iterator rather than a list so callers may use ``next()`` on the result,
        # as they do with the generator-based implementations.
        return iter(((0, b""),))

    def load(self, addr, n):
        """Return `n` zero bytes regardless of `addr`."""
        return b"\x00" * n

    def store(self, addr, data):
        """Writing is not supported; always raises ``ValueError``."""
        raise ValueError("Cannot store data in an uninitialized clemory")

    def find(self, data, search_min=None, search_max=None):
        """
        The memory has no value, so no matter what is searched for, it won't be found.

        :param data: Ignored.
        :param search_min: Ignored.
        :param search_max: Ignored.
        :return: An empty iterator.
        """
        return iter(())