Source code for angr.storage.memory_mixins.paged_memory.pages.ultra_page

# pylint:disable=arguments-differ
import logging
from typing import List, Set, Optional, Tuple, Union, Any, Iterable

from sortedcontainers import SortedDict

import claripy

from .....errors import SimMemoryError
from . import PageBase
from .cooperation import MemoryObjectMixin, SimMemoryObject


l = logging.getLogger(name=__name__)


class UltraPage(MemoryObjectMixin, PageBase):
    """
    Default page implementation. Concrete bytes live in a bytearray, a per-byte bitmap records which offsets are
    symbolic, and a SortedDict maps offsets to the SimMemoryObjects backing the symbolic bytes.
    """

    SUPPORTS_CONCRETE_LOAD = True

    def __init__(self, memory=None, init_zero=False, **kwargs):
        super().__init__(**kwargs)

        if memory is not None:
            self.concrete_data = bytearray(memory.page_size)
            if init_zero:
                self.symbolic_bitmap = bytearray(memory.page_size)
            else:
                self.symbolic_bitmap = bytearray(b"\1" * memory.page_size)
        else:
            self.concrete_data = None
            self.symbolic_bitmap = None

        self.symbolic_data = SortedDict()

    @classmethod
    def new_from_shared(cls, data, memory=None, **kwargs):
        o = cls(**kwargs)
        o.concrete_data = data
        o.symbolic_bitmap = bytearray(memory.page_size)
        o.refcount = 2  # pylint: disable=attribute-defined-outside-init
        return o

    def copy(self, memo):
        o = super().copy(memo)
        o.concrete_data = bytearray(self.concrete_data)
        o.symbolic_bitmap = bytearray(self.symbolic_bitmap)
        o.symbolic_data = SortedDict(self.symbolic_data)
        return o

    def load(
        self, addr, size=None, page_addr=None, endness=None, memory=None, cooperate=False, **kwargs
    ):  # pylint: disable=arguments-differ
        concrete_run = []
        symbolic_run = ...
        last_run = None
        result = []

        def cycle(end):
            if last_run is symbolic_run and symbolic_run is None:
                fill(end)
            elif last_run is concrete_run:
                new_ast = claripy.BVV(concrete_run, (end - result[-1][0]) * memory.state.arch.byte_width)
                new_obj = SimMemoryObject(new_ast, result[-1][0], endness)
                result[-1] = (result[-1][0], new_obj)

        def fill(end):
            global_end_addr = end
            global_start_addr = result[-1][0]
            size = global_end_addr - global_start_addr
            new_ast = self._default_value(
                global_start_addr,
                size,  # pylint: disable=assignment-from-no-return
                key=(self.category, global_start_addr),
                memory=memory,
                endness=endness,
                **kwargs,
            )
            new_item = SimMemoryObject(new_ast, global_start_addr, endness=endness)
            self.symbolic_data[global_start_addr - page_addr] = new_item
            result[-1] = (global_start_addr, new_item)

        for subaddr in range(addr, addr + size):
            realaddr = subaddr + page_addr
            if self.symbolic_bitmap[subaddr]:
                cur_val = self._get_object(subaddr, page_addr, memory=memory)
                if cur_val is last_run and last_run is symbolic_run:
                    pass
                else:
                    cycle(realaddr)
                    last_run = symbolic_run = cur_val
                    result.append((realaddr, cur_val))
            else:
                cur_val = self.concrete_data[subaddr]
                if last_run is concrete_run:
                    if endness == "Iend_LE":
                        last_run = concrete_run = concrete_run | (
                            cur_val << (memory.state.arch.byte_width * (realaddr - result[-1][0]))
                        )
                    else:
                        last_run = concrete_run = (concrete_run << memory.state.arch.byte_width) | cur_val
                    result[-1] = (result[-1][0], concrete_run)
                else:
                    cycle(realaddr)
                    last_run = concrete_run = cur_val
                    result.append((realaddr, cur_val))

        cycle(page_addr + addr + size)

        if not cooperate:
            result = self._force_load_cooperation(result, size, endness, page_addr=page_addr, memory=memory, **kwargs)
        return result

    def store(
        self,
        addr,
        data: Union[int, SimMemoryObject],
        size: int = None,
        endness=None,
        memory=None,
        page_addr=None,  # pylint: disable=arguments-differ
        cooperate=False,
        **kwargs,
    ):
        super().store(
            addr, data, size=size, endness=endness, memory=memory, cooperate=cooperate, page_addr=page_addr, **kwargs
        )

        if not cooperate:
            data = self._force_store_cooperation(
                addr, data, size, endness, page_addr=page_addr, memory=memory, **kwargs
            )

        if size >= memory.page_size - addr:
            size = memory.page_size - addr

        if type(data) is not int:
            if data.object.op == "BVV" and not data.object.annotations:
                # trim the unnecessary leading bytes if there are any
                full_bits = len(data.object)
                start = (page_addr + addr - data.base) & ((1 << memory.state.arch.bits) - 1)
                if start >= data.base + data.length:
                    raise SimMemoryError("Not enough bytes to store.")
                start_bits = full_bits - start * memory.state.arch.byte_width - 1
                # trim the overflowing bytes if there are any
                end_bits = start_bits + 1 - size * memory.state.arch.byte_width
                if start_bits != full_bits - 1 or end_bits != 0:
                    if endness == "Iend_LE":
                        start_bits, end_bits = len(data.object) - end_bits - 1, len(data.object) - start_bits - 1
                    obj = data.object[start_bits:end_bits]
                    data = obj.args[0]

        if type(data) is int or (data.object.op == "BVV" and not data.object.annotations):
            # mark range as not symbolic
            self.symbolic_bitmap[addr : addr + size] = b"\0" * size

            # store
            arange = range(addr, addr + size)
            if type(data) is int:
                ival = data
            else:
                # data.object.op == 'BVV'
                ival = data.object.args[0]
            if endness == "Iend_BE":
                arange = reversed(arange)
            assert memory.state.arch.byte_width == 8
            # TODO: Make UltraPage support architectures with greater byte_widths (but are still multiples of 8)
            for subaddr in arange:
                self.concrete_data[subaddr] = ival & 0xFF
                ival >>= 8
        else:
            # mark range as symbolic
            self.symbolic_bitmap[addr : addr + size] = b"\1" * size

            # set ending object
            try:
                endpiece = next(self.symbolic_data.irange(maximum=addr + size, reverse=True))
            except StopIteration:
                pass
            else:
                if endpiece != addr + size:
                    self.symbolic_data[addr + size] = self.symbolic_data[endpiece]

            # clear range
            for midpiece in self.symbolic_data.irange(maximum=addr + size - 1, minimum=addr, reverse=True):
                del self.symbolic_data[midpiece]

            # set.
            self.symbolic_data[addr] = data

    def merge(
        self,
        others: List["UltraPage"],
        merge_conditions,
        common_ancestor=None,
        page_addr: int = None,  # pylint: disable=arguments-differ
        memory=None,
        changed_offsets: Optional[Set[int]] = None,
    ):
        all_pages = [self] + others
        merged_to = None
        merged_objects = set()
        merged_offsets = set()

        if changed_offsets is None:
            changed_offsets = set()
            for other in others:
                changed_offsets |= self.changed_bytes(other, page_addr)

        for b in sorted(changed_offsets):
            if merged_to is not None and not b >= merged_to:
                l.info("merged_to = %d ... already merged byte 0x%x", merged_to, b)
                continue
            l.debug("... on byte 0x%x", b)

            memory_objects: List[Tuple[SimMemoryObject, Any]] = []
            concretes: List[Tuple[int, Any]] = []
            unconstrained_in: List[Tuple["UltraPage", Any]] = []
            our_mo: Optional[SimMemoryObject] = None

            # first get a list of all memory objects at that location, and
            # all memories that don't have those bytes
            for pg, fv in zip(all_pages, merge_conditions):
                if pg.symbolic_bitmap[b]:
                    mo = pg._get_object(b, page_addr)
                    if mo is not None:
                        l.info("... MO present in %s", fv)
                        memory_objects.append((mo, fv))
                        if pg is self:
                            our_mo = mo
                    else:
                        l.info("... not present in %s", fv)
                        unconstrained_in.append((pg, fv))
                else:
                    # concrete data
                    concretes.append((pg.concrete_data[b], fv))

            # fast path: no memory objects, no unconstrained positions, and only one concrete value
            if not memory_objects and not unconstrained_in and len({cv for cv, _ in concretes}) == 1:
                cv = concretes[0][0]
                self.store(b, cv, size=1, cooperate=True, page_addr=page_addr, memory=memory)
                continue

            # convert all concrete values into memory objects
            for cv, fv in concretes:
                mo = SimMemoryObject(claripy.BVV(cv, size=8), page_addr + b, "Iend_LE")
                memory_objects.append((mo, fv))

            mos = {mo for mo, _ in memory_objects}
            mo_bases = {mo.base for mo, _ in memory_objects}
            mo_lengths = {mo.length for mo, _ in memory_objects}

            if not unconstrained_in and not mos - merged_objects:
                continue

            # first, optimize the case where we are dealing with the same-sized memory objects
            if len(mo_bases) == 1 and len(mo_lengths) == 1 and not unconstrained_in:
                to_merge = [(mo.object, fv) for mo, fv in memory_objects]

                # Update `merged_to`
                merged_to = b + list(mo_lengths)[0]

                merged_val = self._merge_values(to_merge, memory_objects[0][0].length, memory=memory)
                if merged_val is None:
                    continue

                if our_mo is None:
                    # this object does not exist in the current page. do the store
                    new_object = SimMemoryObject(merged_val, page_addr + b, memory_objects[0][0].endness)
                    self.store(
                        b, new_object, size=list(mo_lengths)[0], cooperate=True, page_addr=page_addr, memory=memory
                    )
                    merged_objects.add(new_object)
                else:
                    # do the replacement
                    new_object = self._replace_memory_object(our_mo, merged_val, memory=memory)
                    merged_objects.add(new_object)

                merged_objects.update(mos)
                merged_offsets.add(b)

            else:
                # get the size that we can merge easily. This is the minimum of
                # the size of all memory objects and unallocated spaces.
                min_size = min(mo.length - (page_addr + b - mo.base) for mo, _ in memory_objects)
                for um, _ in unconstrained_in:
                    for i in range(0, min_size):
                        if um._contains(b + i, page_addr):
                            min_size = i
                            break

                merged_to = b + min_size
                l.info("... determined minimum size of %d", min_size)

                # Now, we have the minimum size. We'll extract/create expressions of that
                # size and merge them
                extracted = (
                    [(mo.bytes_at(page_addr + b, min_size), fv) for mo, fv in memory_objects] if min_size != 0 else []
                )
                created = [
                    (self._default_value(None, min_size, name=f"merge_uc_{uc.id}_{b:x}", memory=memory), fv)
                    for uc, fv in unconstrained_in
                ]
                to_merge = extracted + created

                merged_val = self._merge_values(to_merge, min_size, memory=memory)
                if merged_val is None:
                    continue

                self.store(
                    b,
                    merged_val,
                    size=len(merged_val) // memory.state.arch.byte_width,
                    inspect=False,
                    page_addr=page_addr,
                    memory=memory,
                )  # do not convert endianness again
                merged_offsets.add(b)

        return merged_offsets

    def concrete_load(self, addr, size, **kwargs):  # pylint: disable=arguments-differ
        if type(self.concrete_data) is bytearray:
            return (
                memoryview(self.concrete_data)[addr : addr + size],
                memoryview(self.symbolic_bitmap)[addr : addr + size],
            )
        else:
            return self.concrete_data[addr : addr + size], memoryview(self.symbolic_bitmap)[addr : addr + size]

    def changed_bytes(self, other, page_addr=None) -> Set[int]:
        changed_candidates = super().changed_bytes(other)
        if changed_candidates is None:
            changed_candidates = range(len(self.symbolic_bitmap))

        changes: Set[int] = set()
        for addr in changed_candidates:
            if self.symbolic_bitmap[addr] != other.symbolic_bitmap[addr]:
                changes.add(addr)
            elif self.symbolic_bitmap[addr] == 0:
                if self.concrete_data[addr] != other.concrete_data[addr]:
                    changes.add(addr)
            else:
                try:
                    aself = next(self.symbolic_data.irange(maximum=addr, reverse=True))
                except StopIteration:
                    aself = None
                try:
                    aother = next(other.symbolic_data.irange(maximum=addr, reverse=True))
                except StopIteration:
                    aother = None

                if aself is None and aother is None:
                    pass
                elif aself is None and aother is not None:
                    oobj = other.symbolic_data[aother]
                    if oobj.includes(addr + page_addr):
                        changes.add(addr)
                elif aother is None and aself is not None:
                    aobj = self.symbolic_data[aself]
                    if aobj.includes(addr + page_addr):
                        changes.add(addr)
                else:
                    real_addr = page_addr + addr
                    aobj = self.symbolic_data[aself]
                    oobj = other.symbolic_data[aother]

                    acont = aobj.includes(real_addr)
                    ocont = oobj.includes(real_addr)
                    if acont != ocont:
                        changes.add(addr)
                    elif acont is False:
                        pass
                    else:
                        abyte = aobj.bytes_at(real_addr, 1)
                        obyte = oobj.bytes_at(real_addr, 1)
                        if abyte is not obyte:
                            changes.add(addr)

        return changes

    def _contains(self, start: int, page_addr: int):
        if not self.symbolic_bitmap[start]:
            # concrete data
            return True
        else:
            # symbolic data or does not exist
            return self._get_object(start, page_addr) is not None

    def _get_object(self, start: int, page_addr: int, memory=None) -> Optional[SimMemoryObject]:
        try:
            place = next(self.symbolic_data.irange(maximum=start, reverse=True))
        except StopIteration:
            return None
        else:
            obj = self.symbolic_data[place]
            if obj.includes(start + page_addr):
                return obj
            elif memory is not None and obj.includes(start + page_addr + (1 << memory.state.arch.bits)):
                return obj
            else:
                return None

    def replace_all_with_offsets(self, offsets: Iterable[int], old: claripy.ast.BV, new: claripy.ast.BV, memory=None):
        memory_objects = set()
        for offset in sorted(list(offsets)):
            try:
                a = next(self.symbolic_data.irange(maximum=offset, reverse=True))
            except StopIteration:
                a = None

            if a is None:
                continue
            aobj = self.symbolic_data[a]
            memory_objects.add(aobj)

        replaced_objects_cache = {}
        for mo in memory_objects:
            replaced_object = None

            if mo.object in replaced_objects_cache:
                if mo.object is not replaced_objects_cache[mo.object]:
                    replaced_object = replaced_objects_cache[mo.object]
            else:
                replaced_object = mo.object.replace(old, new)
                replaced_objects_cache[mo.object] = replaced_object
                if mo.object is replaced_object:
                    # The replace does not really occur
                    replaced_object = None

            if replaced_object is not None:
                self._replace_memory_object(mo, replaced_object, memory=memory)

    def _replace_memory_object(self, old: SimMemoryObject, new_content: claripy.Bits, memory=None):
        """
        Replaces the memory object `old` with a new memory object containing `new_content`.

        :param old:         A SimMemoryObject (i.e., one from :func:`memory_objects_for_hash()` or
                            :func:`memory_objects_for_name()`).
        :param new_content: The content (claripy expression) for the new memory object.
        :returns:           the new memory object
        """
        if (
            old.object.size() if not old.is_bytes else len(old.object) * self.state.arch.byte_width
        ) != new_content.size():
            raise SimMemoryError("memory objects can only be replaced by the same length content")

        new = SimMemoryObject(new_content, old.base, old.endness, byte_width=old._byte_width)
        for k in list(self.symbolic_data):
            if self.symbolic_data[k] is old:
                self.symbolic_data[k] = new

        if isinstance(new.object, claripy.ast.BV):  # pylint:disable=isinstance-second-argument-not-valid-type
            for b in range(old.base, old.base + old.length):
                self._update_mappings(b, old.object, new.object, memory=memory)
        return new
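
For orientation, here is a minimal, standalone sketch (not part of angr; ToyPage, store_concrete, store_symbolic, and _floor_object are hypothetical names) of the bookkeeping scheme UltraPage implements above: a bytearray of concrete bytes, a per-byte bitmap marking symbolic offsets, and a SortedDict keyed by start offset, where irange(maximum=..., reverse=True) performs the same floor lookup used by _get_object. It deliberately omits SimMemoryObject and claripy details.

from sortedcontainers import SortedDict


class ToyPage:
    """Simplified stand-in for UltraPage's three data structures (illustrative only)."""

    def __init__(self, page_size=16):
        self.concrete_data = bytearray(page_size)    # concrete byte values
        self.symbolic_bitmap = bytearray(page_size)  # 1 = byte is symbolic
        self.symbolic_data = SortedDict()            # start offset -> (object, length)

    def store_concrete(self, addr, data):
        # mark the range as concrete and write the raw bytes
        self.symbolic_bitmap[addr : addr + len(data)] = b"\0" * len(data)
        self.concrete_data[addr : addr + len(data)] = data

    def store_symbolic(self, addr, obj, length):
        # mark the range as symbolic and record the backing object
        self.symbolic_bitmap[addr : addr + length] = b"\1" * length
        self.symbolic_data[addr] = (obj, length)

    def _floor_object(self, addr):
        # same lookup pattern as UltraPage._get_object: find the greatest
        # start offset <= addr, then check that its object still covers addr
        try:
            start = next(self.symbolic_data.irange(maximum=addr, reverse=True))
        except StopIteration:
            return None
        obj, length = self.symbolic_data[start]
        return obj if start <= addr < start + length else None


page = ToyPage()
page.store_concrete(0, b"\x41\x42")
page.store_symbolic(4, "sym_x", 4)
assert page._floor_object(5) == "sym_x"
assert page._floor_object(2) is None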