# Source code for angr.state_plugins.symbolizer

import logging
import claripy
import struct

from .plugin import SimStatePlugin
from ..storage.memory_mixins import PagedMemoryMixin


# Module-level logger used by the symbolizer plugin.
l = logging.getLogger(name=__name__)
# NOTE(review): this forces DEBUG verbosity for this logger at import time,
# independent of the application's logging configuration.
l.setLevel("DEBUG")


def _mem_write_cb(s):
    """
    Breakpoint action: hand a memory-write event on state ``s`` to its symbolizer plugin.
    """
    plugin = s.symbolizer
    plugin._mem_write_callback()


def _mem_read_cb(s):
    """
    Breakpoint action: hand a memory-read event on state ``s`` to its symbolizer plugin.

    NOTE(review): the SimSymbolizer class in this module does not define ``_mem_read_callback`` and the
    matching breakpoint registration in ``init_state`` is commented out.
    """
    plugin = s.symbolizer
    plugin._mem_read_callback()


def _reg_write_cb(s):
    """
    Breakpoint action: hand a register-write event on state ``s`` to its symbolizer plugin.
    """
    plugin = s.symbolizer
    plugin._reg_write_callback()


def _reg_read_cb(s):
    """
    Breakpoint action: hand a register-read event on state ``s`` to its symbolizer plugin.

    NOTE(review): the SimSymbolizer class in this module does not define ``_reg_read_callback`` and the
    matching breakpoint registration in ``init_state`` is commented out.
    """
    plugin = s.symbolizer
    plugin._reg_read_callback()


def _page_map_cb(s):
    """
    Breakpoint action: hand a page-mapping event on state ``s`` to its symbolizer plugin.
    """
    plugin = s.symbolizer
    plugin._page_map_callback()


# Page size used to convert between addresses and page ids in this module.
# SimSymbolizer.init_state asserts that the state's memory uses the same page_size.
PAGE_SIZE = 0x1000


class SimSymbolizer(SimStatePlugin):  # pylint:disable=abstract-method
    """
    The symbolizer state plugin ensures that pointers that are stored in memory are symbolic. This allows for the
    tracking of and reasoning over these pointers (for example, to reason about memory disclosure).
    """

    def __init__(self):
        SimStatePlugin.__init__(self)

        # When set, every newly mapped page is added to the symbolization targets.
        self._symbolize_all = False
        # Page ids; concrete words pointing into these pages get replaced by symbols.
        self.symbolization_target_pages = set()
        # Page ids that are never symbolized, even if they are also targets.
        self.ignore_target_pages = set()
        # Number of pointer values replaced so far (incremented by _preconstrain).
        self.symbolized_count = 0
        # Maps a page base address to the symbol that stands for it.
        self.page_symbols = {}
        # Half-open address range [_min_addr, _max_addr) spanning all target pages; used as a cheap pre-filter.
        self._min_addr = 0
        self._max_addr = 0

        # struct format pieces for the architecture word in each endianness; filled in by init_state.
        self._LE_FMT = None
        self._BE_FMT = None
        self._zero = None

    def _page_map_callback(self):
        """
        Breakpoint handler for page mapping: when symbolizing all pages, add the newly mapped page to the targets.
        """
        if self._symbolize_all:
            page_id, _ = self.state.memory._divide_addr(self.state.inspect.mapped_address)
            self.symbolization_target_pages.add(page_id)
            # Keep the [_min_addr, _max_addr) pre-filter in sync; otherwise _resymbolize_int would reject
            # pointers into a page that lies outside the previously known range.
            self._update_ranges()

    def _mem_write_callback(self):
        """
        Breakpoint handler for memory writes: if the value being written is concrete, replace any words in it
        that point into target pages with their symbolic equivalents.
        """
        if not isinstance(self.state.inspect.mem_write_expr, int) and self.state.inspect.mem_write_expr.symbolic:
            return
        mem_write_length = self.state.inspect.mem_write_length
        if mem_write_length is not None and not isinstance(mem_write_length, int) and mem_write_length.symbolic:
            return

        # length = self.state.solver.eval_one(self.state.inspect.mem_write_length)
        # if length != self.state.arch.bytes:
        #     return

        write_expr = self.state.inspect.mem_write_expr
        byte_expr = self.state.solver.eval_one(write_expr, cast_to=bytes).rjust(
            write_expr.length // self.state.arch.byte_width
        )
        replacement_expr = self._resymbolize_data(byte_expr)
        if replacement_expr is not None:
            assert replacement_expr.length == write_expr.length
            self.state.inspect.mem_write_expr = replacement_expr

    def _reg_write_callback(self):
        """
        Breakpoint handler for register writes: replace a concrete, word-sized value written to any register
        other than the instruction pointer with a symbolic pointer if it points into a target page.
        """
        if not isinstance(self.state.inspect.reg_write_expr, int) and self.state.inspect.reg_write_expr.symbolic:
            return
        if not isinstance(self.state.inspect.reg_write_length, int) and self.state.inspect.reg_write_length.symbolic:
            return

        if self.state.inspect.reg_write_offset == self.state.arch.ip_offset:
            return

        length = self.state.solver.eval_one(self.state.inspect.reg_write_length)
        if length != self.state.arch.bytes:
            return

        expr = self.state.solver.eval_one(self.state.inspect.reg_write_expr)
        if self._should_symbolize(expr):
            self.state.inspect.reg_write_expr = self._preconstrain(expr)

    def init_state(self):
        """
        Set up the plugin on its state: check the memory model, cache the word formats, exclude the loader's
        kernel and extern object pages, and install the breakpoints.

        :raises TypeError: if the state's memory is not a paged memory model.
        """
        super().init_state()

        if not isinstance(self.state.memory, PagedMemoryMixin):
            raise TypeError("Symbolizer only supports a paged memory model.")
        assert self.state.memory.page_size == PAGE_SIZE

        self._LE_FMT = self.state.arch.struct_fmt(endness="Iend_LE")
        self._BE_FMT = self.state.arch.struct_fmt(endness="Iend_BE")

        # ignore CLE pages
        for i in range(0, self.state.project.loader.kernel_object.map_size, PAGE_SIZE):
            self.ignore_target_pages.add((self.state.project.loader.kernel_object.mapped_base + i) // PAGE_SIZE)
        for i in range(0, self.state.project.loader.extern_object.map_size, PAGE_SIZE):
            self.ignore_target_pages.add((self.state.project.loader.extern_object.mapped_base + i) // PAGE_SIZE)

        self.state.inspect.make_breakpoint("memory_page_map", when=self.state.inspect.BP_BEFORE, action=_page_map_cb)
        self.state.inspect.make_breakpoint("mem_write", when=self.state.inspect.BP_BEFORE, action=_mem_write_cb)
        # self.state.inspect.make_breakpoint('mem_read', when=self.state.inspect.BP_BEFORE, action=_mem_read_cb)
        # self.state.inspect.make_breakpoint('reg_write', when=self.state.inspect.BP_BEFORE, action=_reg_write_cb)
        # self.state.inspect.make_breakpoint('reg_read', when=self.state.inspect.BP_BEFORE, action=_reg_read_cb)

        # NOTE(review): the size passed here is arch.bytes; if a word-sized zero is intended this should
        # presumably be arch.bits. The value is not used elsewhere in this class, so it is left unchanged.
        self._zero = claripy.BVV(0, self.state.arch.bytes)

    @staticmethod
    def _page_id(x):
        """Return the id of the page containing address ``x``."""
        return x // PAGE_SIZE

    @staticmethod
    def _page_addr(p):
        """Return the base address of the page with id ``p``."""
        return p * PAGE_SIZE

    @staticmethod
    def _page_offset(p):
        """Return the offset of address ``p`` within its page."""
        return p % PAGE_SIZE

    def _update_ranges(self):
        """Recompute ``_min_addr``/``_max_addr`` so that they span every page in the target set."""
        if len(self.symbolization_target_pages) == 0:
            self._max_addr = self._min_addr = 0
        else:
            self._min_addr = self._page_addr(min(self.symbolization_target_pages))
            self._max_addr = self._page_addr(max(self.symbolization_target_pages) + 1)

    def set_symbolization_for_all_pages(self):
        """
        Sets the symbolizer to symbolize pointers to all pages as they are written to memory.
        """
        self._symbolize_all = True
        self.symbolization_target_pages.update(set(self.state.memory._pages.keys()))
        self._update_ranges()

    def set_symbolized_target_range(self, base, length):
        """
        All pointers to the target range will be symbolized as they are written to memory.

        Due to optimizations, the _pages_ containing this range will be set as symbolization targets,
        not just the range itself.

        :param base:   Start address of the target range.
        :param length: Length of the target range.
        """
        base_page = self._page_id(base)
        pages = (length + self._page_offset(base) + PAGE_SIZE - 1) // PAGE_SIZE
        assert pages > 0
        self.symbolization_target_pages.update(range(base_page, base_page + pages))
        self._update_ranges()

    def _preconstrain(self, value, name_prefix="address_"):
        """
        Return a symbolic expression equal to the concrete pointer ``value``.

        One symbol is created per page base (and constrained to equal it); the result is that symbol plus
        the offset of ``value`` within the page.
        """
        page_base = value & ~(PAGE_SIZE - 1)
        try:
            symbol = self.page_symbols[page_base]
        except KeyError:
            symbol = claripy.BVS(name_prefix + hex(page_base), self.state.arch.bits)
            self.page_symbols[page_base] = symbol
            self.state.add_constraints(symbol == page_base)
        self.symbolized_count += 1
        return symbol + (value - page_base)

    def _should_symbolize(self, addr):
        """Return True if ``addr`` lies in a target page that is not on the ignore list."""
        return (
            self._page_id(addr) in self.symbolization_target_pages
            and self._page_id(addr) not in self.ignore_target_pages
        )

    def _resymbolize_int(self, be, le=0, base=0, offset=0, skip=()):
        """
        Try to interpret one word as a pointer into a target page.

        :param be:     The word decoded as big-endian.
        :param le:     The same word decoded as little-endian.
        :param base:   Address of the data the word came from (used for ``skip`` and logging).
        :param offset: Offset of the word within that data.
        :param skip:   Locations (``base + offset``) that must not be symbolized.
        :return:       The symbolic replacement, or None if the word should be left alone.
        """
        if base + offset in skip:
            return None
        elif self._min_addr <= be < self._max_addr and self._should_symbolize(be):
            s = self._preconstrain(be)
            l.debug("Replacing %#x (at %#x, endness BE) with %s!", be, base + offset, s)
            return s
        elif self._min_addr <= le < self._max_addr and self._should_symbolize(le):
            # The value was stored little-endian, so the replacement is byte-reversed to match.
            s = self._preconstrain(le).reversed
            l.debug("Replacing %#x (at %#x, endness LE) with %s!", le, base + offset, s)
            return s
        else:
            return None

    def _resymbolize_data(self, data, prefix=b"", base=0, skip=()):
        """
        Scan ``data`` word by word and replace every word that points into a target page with a symbol.

        :param data:   Concrete bytes to scan; a trailing partial word is passed through untouched.
        :param prefix: Bytes to place in front of the result.
        :param base:   Address ``data`` was read from (forwarded to ``_resymbolize_int``).
        :param skip:   Locations that must not be symbolized.
        :return:       The concatenation of untouched bytes and symbolic replacements, or None if no word
                       was replaced.
        """
        ws = self.state.arch.bytes
        suffix = data[len(data) - (len(data) % ws) :]
        data = data[: len(data) - (len(data) % ws)]

        num_words = len(data) // ws
        unpacked_le = struct.unpack(self._LE_FMT[0] + str(num_words) + self._LE_FMT[1], data)
        unpacked_be = struct.unpack(self._BE_FMT[0] + str(num_words) + self._BE_FMT[1], data)

        values_squashed = [prefix]
        last_idx = 0
        for i, (be, le) in enumerate(zip(unpacked_be, unpacked_le)):
            # assert len(claripy.Concat(*values_squashed)) == i*8
            s = self._resymbolize_int(be, le, base, i * ws, skip)
            if s is None:
                # Not a pointer of interest: leave the word in place and keep scanning. The untouched
                # run is emitted below once the next replacement (or the end of the data) is reached.
                continue
            if last_idx != i:
                values_squashed.append(data[last_idx * ws : i * ws])
            last_idx = i + 1
            values_squashed.append(s)

        # Only the prefix is present, i.e. nothing was replaced.
        if len(values_squashed) == 1:
            return None

        if last_idx != num_words:
            values_squashed.append(data[last_idx * ws :])
        values_squashed.append(suffix)

        new_data = claripy.Concat(*values_squashed)
        # assert len(new_data)/8 == len(data) + len(prefix)
        # assert self.state.solver.eval_one(new_data) == self.state.solver.eval_one(claripy.BVV(data))
        return new_data

    def _resymbolize_region(self, storage: PagedMemoryMixin, addr, length):
        """
        Re-symbolize pointers stored in ``storage`` within ``[addr, addr + length)``.

        :param storage: The storage to scan and write the replacements back to.
        :param addr:    Concrete start address of the region.
        :param length:  Concrete length of the region.
        """
        assert type(addr) is int
        assert type(length) is int

        self.state.scratch.push_priv(True)
        try:
            i = 0
            while i < length:
                a = addr + i
                # Modulo by a negative word size rounds ``a`` up to the next word boundary.
                aligned_a = a - a % (-self.state.arch.bytes)
                i = aligned_a - addr
                if i == length:
                    break
                data = storage.concrete_load(aligned_a, length - i)
                if not data or len(data) == 0:
                    # try the next byte
                    i += 1
                    continue
                replacement_content = self._resymbolize_data(
                    bytearray(data),
                    base=aligned_a,
                    prefix=b"",
                    skip=() if storage is self.state.memory else (self.state.arch.ip_offset,),
                )
                if replacement_content is not None:
                    storage.store(aligned_a, replacement_content)
                    # len() of the expression is in bits; convert to bytes to advance past what was stored.
                    i += len(replacement_content) // self.state.arch.byte_width
                else:
                    i += 1
        finally:
            # Always restore the privilege level, even if scanning or storing raised.
            self.state.scratch.pop_priv()

    def resymbolize(self):
        """
        Re-symbolizes all pointers in memory. This can be called to symbolize any pointers to target regions
        that were written (and not mangled beyond recognition) before symbolization was set.
        """
        for i, p_id in enumerate(self.state.memory._pages):
            if i % 100 == 0:
                l.info("%s/%s memory pages symbolized", i, len(self.state.memory._pages))
            addr_start = p_id * self.state.memory.page_size
            length = self.state.memory.page_size
            self._resymbolize_region(self.state.memory, addr_start, length)

    @SimStatePlugin.memo
    def copy(self, memo):  # pylint: disable=unused-argument
        """
        Return a copy of this plugin whose page sets and page-symbol mapping are independent containers.
        """
        sc = SimSymbolizer()
        sc._symbolize_all = self._symbolize_all
        sc.symbolization_target_pages = set(self.symbolization_target_pages)
        sc.ignore_target_pages = set(self.ignore_target_pages)
        sc.symbolized_count = self.symbolized_count
        sc._LE_FMT = self._LE_FMT
        sc._BE_FMT = self._BE_FMT
        sc._zero = self._zero
        sc._min_addr = self._min_addr
        sc._max_addr = self._max_addr
        # Copy from self: the new instance's own mapping is still empty at this point.
        sc.page_symbols = dict(self.page_symbols)
        return sc
# Imported at the end of the module rather than with the other imports —
# presumably to avoid a circular import with sim_state; confirm before moving.
from ..sim_state import SimState

# Register SimSymbolizer as the default plugin under the name "symbolizer".
SimState.register_default("symbolizer", SimSymbolizer)