Source code for angr.storage.memory_mixins.regioned_memory.regioned_memory_mixin

import logging
from itertools import count
from typing import Dict, Optional, Generator, Union, TYPE_CHECKING, Tuple, Iterable

import claripy
from claripy.ast import Bool, Bits, BV
from claripy.vsa import StridedInterval, ValueSet, RegionAnnotation

from ....sim_options import (
    AVOID_MULTIVALUED_READS,
    CONSERVATIVE_READ_STRATEGY,
    KEEP_MEMORY_READS_DISCRETE,
    CONSERVATIVE_WRITE_STRATEGY,
)
from ....state_plugins.sim_action_object import _raw_ast
from ....errors import SimMemoryError, SimAbstractMemoryError
from .. import MemoryMixin
from .region_data import AddressWrapper, RegionMap
from .abstract_address_descriptor import AbstractAddressDescriptor

if TYPE_CHECKING:
    from ....sim_state import SimState


_l = logging.getLogger(name=__name__)

invalid_read_ctr = count()


[docs]class RegionedMemoryMixin(MemoryMixin): """ Regioned memory. This mixin manages multiple memory regions. Each address is represented as a tuple of (region ID, offset into the region), which is called a regioned address. Converting absolute addresses into regioned addresses: We map an absolute address to a region by looking up which region this address belongs to in the region map. Currently this is only enabled for stack. Heap support has not landed yet. When start analyzing a function, the user should call set_stack_address_mapping() to create a new region mapping. Likewise, when exiting from a function, the user should cancel the previous mapping by calling unset_stack_address_mapping(). """
[docs] def __init__( self, write_targets_limit: int = 2048, read_targets_limit: int = 4096, stack_region_map: Optional[RegionMap] = None, generic_region_map: Optional[RegionMap] = None, stack_size: int = 65536, cle_memory_backer: Optional = None, dict_memory_backer: Optional[Dict] = None, regioned_memory_cls: Optional[type] = None, **kwargs, ): super().__init__(**kwargs) if regioned_memory_cls is None: # delayed import from .. import RegionedMemory regioned_memory_cls = RegionedMemory self._regioned_memory_cls = regioned_memory_cls self._regions: Dict[str, regioned_memory_cls] = {} self._cle_memory_backer = cle_memory_backer self._dict_memory_backer = dict_memory_backer self._stack_size: int = stack_size self._stack_region_map: Optional[RegionMap] = ( stack_region_map if stack_region_map is not None else RegionMap(True) ) self._generic_region_map: Optional[RegionMap] = ( generic_region_map if generic_region_map is not None else RegionMap(False) ) self._write_targets_limit = write_targets_limit self._read_targets_limit = read_targets_limit
@MemoryMixin.memo def copy(self, memo): o: "RegionedMemoryMixin" = super().copy(memo) o._write_targets_limit = self._write_targets_limit o._read_targets_limit = self._read_targets_limit o._stack_size = self._stack_size o._endness = self.endness o._stack_region_map = self._stack_region_map o._generic_region_map = self._generic_region_map o._cle_memory_backer = self._cle_memory_backer o._dict_memory_backer = self._dict_memory_backer o._regioned_memory_cls = self._regioned_memory_cls o._regions = {} for region_id, region in self._regions.items(): o._regions[region_id] = region.copy(memo) return o
[docs] def load( self, addr, size: Optional[Union[BV, int]] = None, endness=None, condition: Optional[Bool] = None, **kwargs ): if isinstance(size, BV) and isinstance(size._model_vsa, ValueSet): _l.critical("load(): size %s is a ValueSet. Something is wrong.", size) if self.state.scratch.ins_addr is not None: var_name = "invalid_read_%d_%#x" % (next(invalid_read_ctr), self.state.scratch.ins_addr) else: var_name = "invalid_read_%d_None" % next(invalid_read_ctr) return self.state.solver.Unconstrained(var_name, self.state.arch.bits) val = None regioned_addrs_desc = self._normalize_address(addr, condition=condition) if (regioned_addrs_desc.cardinality > 1 and AVOID_MULTIVALUED_READS in self.state.options) or ( regioned_addrs_desc.cardinality >= self._read_targets_limit and CONSERVATIVE_READ_STRATEGY in self.state.options ): val = self.state.solver.Unconstrained("unconstrained_read", size * self.state.arch.byte_width) return val gen = self._concretize_address_descriptor(regioned_addrs_desc, addr, is_write=False) for aw in gen: new_val = self._region_load( aw.address, size, aw.region, endness=endness, related_function_addr=aw.function_address, **kwargs, ) if val is None: if KEEP_MEMORY_READS_DISCRETE in self.state.options: val = self.state.solver.DSIS(to_conv=new_val, max_card=100000) else: val = new_val else: val = val.union(new_val) if val is None: # address_wrappers is empty - we cannot concretize the address in static mode. # ensure val is not None val = self.state.solver.Unconstrained( "invalid_read_%d_%d" % (next(invalid_read_ctr), size), size * self.state.arch.byte_width ) return val
[docs] def store(self, addr, data, size: Optional[int] = None, endness=None, **kwargs): regioned_addrs_desc = self._normalize_address(addr) if ( regioned_addrs_desc.cardinality >= self._write_targets_limit and CONSERVATIVE_WRITE_STRATEGY in self.state.options ): return gen = self._concretize_address_descriptor(regioned_addrs_desc, addr, is_write=True) for aw in gen: self._region_store(aw.address, data, aw.region, endness, related_function_addr=aw.function_address)
[docs] def merge(self, others: Iterable["RegionedMemoryMixin"], merge_conditions, common_ancestor=None) -> bool: r = False for o in others: for region_id, region in o._regions.items(): if region_id in self._regions: r |= self._regions[region_id].merge([region], merge_conditions, common_ancestor=common_ancestor) else: self._regions[region_id] = region r = True return r
[docs] def find(self, addr: Union[int, Bits], data, max_search, **kwargs): # FIXME: Attempt find() on more than one region gen = self._normalize_address_type(addr, self.state.arch.bits) for region, si in gen: si = claripy.SI(to_conv=si) r, s, i = self._regions[region].find(si, data, max_search, **kwargs) # Post-process r so that it's still a ValueSet region_base_addr = self._region_base(region) r = self.state.solver.ValueSet(r.size(), region, region_base_addr, r._model_vsa) return r, s, i
[docs] def set_state(self, state): for region in self._regions.values(): region.set_state(state) super().set_state(state)
[docs] def replace_all(self, old: claripy.ast.BV, new: claripy.ast.BV): for region in self._regions.values(): region.replace_all(old, new)
# # Region management #
[docs] def set_stack_address_mapping( self, absolute_address: int, region_id: str, related_function_address: Optional[int] = None ): """ Create a new mapping between an absolute address (which is the base address of a specific stack frame) and a region ID. :param absolute_address: The absolute memory address. :param region_id: The region ID. :param related_function_address: Related function address. """ if self._stack_region_map is None: raise SimMemoryError("Stack region map is not initialized.") self._stack_region_map.map(absolute_address, region_id, related_function_address=related_function_address)
[docs] def unset_stack_address_mapping(self, absolute_address: int): """ Remove a stack mapping. :param absolute_address: An absolute memory address that is the base address of the stack frame to destroy. """ if self._stack_region_map is None: raise SimMemoryError("Stack region map is not initialized.") self._stack_region_map.unmap_by_address(absolute_address)
[docs] def stack_id(self, function_address: int) -> str: """ Return a memory region ID for a function. If the default region ID exists in the region mapping, an integer will appended to the region name. In this way we can handle recursive function calls, or a function that appears more than once in the call frame. This also means that `stack_id()` should only be called when creating a new stack frame for a function. You are not supposed to call this function every time you want to map a function address to a stack ID. :param function_address: Address of the function. :return: ID of the new memory region. """ region_id = "stack_%#x" % function_address # deduplication region_ids = self._stack_region_map.region_ids if region_id not in region_ids: return region_id else: for i in range(0, 2000): new_region_id = region_id + "_%d" % i if new_region_id not in region_ids: return new_region_id raise SimMemoryError("Cannot allocate region ID for function %#08x - recursion too deep" % function_address)
[docs] def set_stack_size(self, size: int): self._stack_size = size
def _create_region( self, key: str, state: "SimState", related_function_addr: int, endness, cle_memory_backer: Optional = None, dict_memory_backer: Optional[Dict] = None, ): """ Create a new MemoryRegion with the region key specified, and store it to self._regions. :param key: a string which is the region key :param state: the SimState instance :param is_stack: Whether this memory region is on stack. True/False :param related_function_addr: Which function first creates this memory region. Just for reference. :param endness: The endianness. :param backer_dict: The memory backer object. :return: None """ self._regions[key] = self._regioned_memory_cls( memory_id=key, related_function_addr=related_function_addr, endness=endness, cle_memory_backer=cle_memory_backer, dict_memory_backer=dict_memory_backer, ) self._regions[key].set_state(state) def _region_base(self, region: str) -> int: """ Get the base address of a memory region. :param region: ID of the memory region :return: Address of the memory region """ if region == "global": base_addr = 0 elif region.startswith("stack_"): base_addr = self._stack_region_map.absolutize(region, 0) else: base_addr = self._generic_region_map.absolutize(region, 0) return base_addr def _region_load(self, addr, size, key: str, related_function_addr=None, **kwargs): bbl_addr, stmt_id, ins_addr = ( self.state.scratch.bbl_addr, self.state.scratch.stmt_idx, self.state.scratch.ins_addr, ) if key not in self._regions: self._create_region( key, self.state, related_function_addr, self.endness, cle_memory_backer=self._cle_memory_backer.get(key, None) if self._cle_memory_backer is not None else None, dict_memory_backer=self._dict_memory_backer.get(key, None) if self._dict_memory_backer is not None else None, ) return self._regions[key].load(addr, size, bbl_addr, stmt_id, ins_addr, **kwargs) def _region_store(self, addr, data, key: str, endness, related_function_addr: Optional[int] = None, **kwargs): if key not in self._regions: self._create_region( key, self.state, related_function_addr, self.endness, cle_memory_backer=self._cle_memory_backer.get(key, None) if self._cle_memory_backer is not None else None, dict_memory_backer=self._dict_memory_backer.get(key, None) if self._dict_memory_backer is not None else None, ) self._regions[key].store( addr, data, self.state.scratch.bbl_addr, self.state.scratch.stmt_idx, self.state.scratch.ins_addr, endness=endness, **kwargs, ) # # Address concretization and conversion # def _concretize_address_descriptor( self, desc: AbstractAddressDescriptor, original_addr: claripy.ast.Bits, is_write: bool = False, target_region: Optional[str] = None, ) -> Generator[AddressWrapper, None, None]: raise NotImplementedError() def _normalize_address(self, addr: claripy.ast.Bits, condition=None) -> AbstractAddressDescriptor: """ Translate an address into a series of internal representation of addresses that can be used to address in individual regions. :param addr: :param is_write: :param convert_to_valueset: :param target_region: :return: """ if type(addr) is not int: for constraint in self.state.solver.constraints: if getattr(addr, "variables", set()) & constraint.variables: addr = self._apply_condition_to_symbolic_addr(addr, constraint) # Apply the condition if necessary if condition is not None: addr = self._apply_condition_to_symbolic_addr(addr, condition) addr_with_regions = self._normalize_address_type(addr, self.state.arch.bits) desc = AbstractAddressDescriptor() for region, addr_si in addr_with_regions: desc.add_regioned_address(region, addr_si) return desc def _normalize_address_core( self, region_id: str, relative_address: int, target_region: Optional[str] = None ) -> AddressWrapper: """ If this is a stack address, we convert it to a correct region and address :param region_id: a string indicating which region the address is relative to :param relative_address: an address that is relative to the region parameter :param target_region: the ideal target region that address is normalized to. None means picking the best fit. :return: an AddressWrapper object """ if self._stack_region_map.is_empty and self._generic_region_map.is_empty: # We don't have any mapped region right now return AddressWrapper(region_id, 0, relative_address, False, None) # We wanna convert this address to an absolute address first if region_id.startswith("stack_"): absolute_address = self._stack_region_map.absolutize(region_id, relative_address) else: absolute_address = self._generic_region_map.absolutize(region_id, relative_address) stack_base = self._stack_region_map.stack_base if stack_base - self._stack_size < relative_address <= stack_base and ( target_region is not None and target_region.startswith("stack_") ): # The absolute address seems to be in the stack region. # Map it to stack new_region_id, new_relative_address, related_function_addr = self._stack_region_map.relativize( absolute_address, target_region_id=target_region ) return AddressWrapper( new_region_id, self._region_base(new_region_id), new_relative_address, True, related_function_addr ) else: new_region_id, new_relative_address, related_function_addr = self._generic_region_map.relativize( absolute_address, target_region_id=target_region ) return AddressWrapper(new_region_id, self._region_base(new_region_id), new_relative_address, False, None) def _apply_condition_to_symbolic_addr(self, addr, condition): _, converted = self.state.solver.constraint_to_si(condition) for original_expr, constrained_expr in converted: addr = addr.replace(original_expr, constrained_expr) return addr @staticmethod def _normalize_address_type(addr: Union[int, Bits], bits) -> Generator[Tuple[str, StridedInterval], None, None]: """ Convert address of different types to a list of mapping between region IDs and offsets (strided intervals). :param addr: Address to convert :return: A list of mapping between region IDs and offsets. :rtype: dict """ if isinstance(addr, int): addr_e = claripy.BVV(addr, bits) else: addr_e = _raw_ast(addr) if isinstance(addr_e, (claripy.bv.BVV, claripy.vsa.StridedInterval, claripy.vsa.ValueSet)): raise SimMemoryError("_normalize_address_type() does not take claripy models.") if isinstance(addr_e, claripy.ast.Base): if not isinstance(addr_e._model_vsa, ValueSet): # Convert it to a ValueSet first by annotating it addr_e = addr_e.annotate(RegionAnnotation("global", 0, addr_e._model_vsa)) model_vsa = addr_e._model_vsa if isinstance(model_vsa, ValueSet): yield from model_vsa.items() else: raise SimAbstractMemoryError("Cannot parse address as a VSA ValueSet") else: raise SimAbstractMemoryError("Unsupported address type %s" % type(addr_e))