Source code for angr.storage.memory_mixins.paged_memory.pages.mv_list_page

# pylint:disable=abstract-method,arguments-differ,assignment-from-no-return
import logging
from typing import Optional, List, Set, Tuple, Union, Callable, Any, FrozenSet

from angr.utils.dynamic_dictlist import DynamicDictList
from .....storage.memory_object import SimMemoryObject, SimLabeledMemoryObject
from . import PageBase
from .cooperation import MemoryObjectSetMixin


l = logging.getLogger(name=__name__)

_MOTYPE = Union[SimMemoryObject, SimLabeledMemoryObject]


[docs]class MVListPage( MemoryObjectSetMixin, PageBase, ): """ MVListPage allows storing multiple values at the same location, thus allowing weak updates. Each store() may take a value or multiple values, and a "weak" parameter to specify if this store is a weak update or not. Each load() returns an iterator of all values stored at that location. """
[docs] def __init__(self, memory=None, content=None, sinkhole=None, mo_cmp=None, **kwargs): super().__init__(**kwargs) self.content: DynamicDictList[Optional[Union[_MOTYPE, Set[_MOTYPE]]]] = ( DynamicDictList(max_size=memory.page_size, content=content) if content is not None else None ) self.stored_offset = set() self._mo_cmp: Optional[Callable] = mo_cmp if self.content is None: if memory is not None: self.content: DynamicDictList[Optional[Union[_MOTYPE, Set[_MOTYPE]]]] = DynamicDictList( max_size=memory.page_size ) self.sinkhole: Optional[_MOTYPE] = sinkhole
[docs] def copy(self, memo) -> "MVListPage": o = super().copy(memo) o.content = DynamicDictList(max_size=self.content.max_size, content=self.content) o.sinkhole = self.sinkhole o.stored_offset = self.stored_offset.copy() o._mo_cmp = self._mo_cmp return o
[docs] def load( self, addr, size=None, endness=None, page_addr=None, memory=None, cooperate=False, **kwargs ) -> List[Tuple[int, _MOTYPE]]: result = [] last_seen = ... # ;) # loop over the loading range. accumulate a result for each byte, but collapse results from adjacent bytes # using the same memory object for subaddr in range(addr, addr + size): items = self.content[subaddr] if items is None: items = {self.sinkhole} if self.sinkhole is not None else None if items != last_seen: if last_seen is None: self._fill(result, subaddr, page_addr, endness, memory, **kwargs) result.append((subaddr + page_addr, items)) last_seen = items if last_seen is None: self._fill(result, addr + size, page_addr, endness, memory, **kwargs) if not cooperate: result = self._force_load_cooperation(result, size, endness, memory=memory, **kwargs) return result
def _fill(self, result, addr, page_addr, endness, memory, **kwargs): """ Small utility function for behavior which is duplicated in load mutates result to generate a new memory object and replace the last entry in it, which is None. Then, it will insert the new memory object into self.content. """ global_end_addr = addr + page_addr global_start_addr = result[-1][0] size = global_end_addr - global_start_addr new_ast = self._default_value( global_start_addr, size, name=f"{memory.id}_{global_start_addr:x}", key=(self.category, global_start_addr), memory=memory, **kwargs, ) new_item = SimMemoryObject( new_ast, global_start_addr, endness=endness, byte_width=memory.state.arch.byte_width if memory is not None else 8, ) subaddr_start = global_start_addr - page_addr for subaddr in range(subaddr_start, addr): self.content[subaddr] = {new_item} self.stored_offset.add(subaddr) result[-1] = (global_start_addr, new_item)
[docs] def store(self, addr, data, size=None, endness=None, memory=None, cooperate=False, weak=False, **kwargs): super().store(addr, data, size=size, endness=endness, memory=memory, cooperate=cooperate, weak=weak, **kwargs) if not cooperate: data = self._force_store_cooperation(addr, data, size, endness, memory=memory, **kwargs) data: Set[_MOTYPE] if size == len(self.content) and addr == 0 and len(data) == 1: self.sinkhole = next(iter(data)) self.content = DynamicDictList(max_size=len(self.content)) self.stored_offset = set() else: if not weak: if len(data) == 1: # unpack data: _MOTYPE = next(iter(data)) for subaddr in range(addr, addr + size): self.content[subaddr] = data self.stored_offset.add(subaddr) else: for subaddr in range(addr, addr + size): if self.content[subaddr] is None: self.content[subaddr] = data elif type(self.content[subaddr]) is set: self.content[subaddr] |= data else: self.content[subaddr] = {self.content[subaddr]} | data self.stored_offset.add(subaddr)
[docs] def erase(self, addr, size=None, **kwargs) -> None: for off in range(size): self.content[addr + off] = None
[docs] def merge( self, others: List["MVListPage"], merge_conditions, common_ancestor=None, page_addr: int = None, memory=None, changed_offsets: Optional[Set[int]] = None, ): if changed_offsets is None: changed_offsets = set() for other in others: changed_offsets |= self.changed_bytes(other, page_addr) all_pages: List["MVListPage"] = [self] + others if merge_conditions is None: merge_conditions = [None] * len(all_pages) merged_to = None merged_objects = set() merged_offsets = set() for b in sorted(changed_offsets): if merged_to is not None and not b >= merged_to: l.info("merged_to = %d ... already merged byte 0x%x", merged_to, b) continue l.debug("... on byte 0x%x", b) memory_object_sets: Set[Tuple[FrozenSet[SimMemoryObject], Any]] = set() unconstrained_in = [] # first get a list of all memory objects at that location, and all memories that don't have those bytes self_has_memory_object_set = False for sm, fv in zip(all_pages, merge_conditions): if sm._contains(b, page_addr): l.info("... present in %s", fv) memory_objects = set() for mo in sm.content_gen(b): if mo.includes(page_addr + b): memory_objects.add(mo) memory_object_sets.add((frozenset(memory_objects), fv)) if sm is self: self_has_memory_object_set = True else: l.info("... not present in %s", fv) unconstrained_in.append((sm, fv)) if not memory_object_sets: continue if self_has_memory_object_set and len(memory_object_sets) == 1: continue mo_sets = {mo_set for mo_set, _ in memory_object_sets} mo_bases = set() mo_lengths = set() endnesses = set() for mo_set in mo_sets: for mo in mo_set: mo_bases.add(mo.base) mo_lengths.add(mo.length) endnesses.add(mo.endness) if not unconstrained_in and not (mo_sets - merged_objects): # pylint:disable=superfluous-parens continue # first, optimize the case where we are dealing with the same-sized memory objects if len(mo_bases) == 1 and len(mo_lengths) == 1 and not unconstrained_in and len(endnesses) == 1: if len(memory_object_sets) == 1: # nothing to merge! continue the_endness = next(iter(endnesses)) to_merge = [] for mo_set, fv in memory_object_sets: for mo in mo_set: to_merge.append((mo.object, fv)) # Update `merged_to` mo_base = list(mo_bases)[0] mo_length = next(iter(mo_lengths)) size = min(mo_length - (page_addr + b - mo_base), len(self.content) - b) merged_to = b + size merged_val = self._merge_values(to_merge, mo_length, memory=memory) if merged_val is None: # merge_values() determines that we should not attempt to merge this value continue # do the replacement # TODO: Implement in-place replacement instead of calling store() # new_object = self._replace_memory_object(our_mo, merged_val, page_addr, memory.page_size) new_mos = {SimMemoryObject(v, mo_base, endness=the_endness) for v in merged_val} self.store(b, new_mos, size=size, cooperate=True, weak=False) merged_offsets.add(b) else: # get the size that we can merge easily. This is the minimum of the size of all memory objects and # unallocated spaces. min_size = len(self.content) - b mask = (1 << memory.state.arch.bits) - 1 for mo_set in mo_sets: for mo in mo_set: min_size = min(min_size, mo.length - ((b + page_addr - mo.base) & mask)) for um, _ in unconstrained_in: for i in range(0, min_size): if um._contains(b + i, page_addr): min_size = i break merged_to = b + min_size l.info("... determined minimum size of %d", min_size) # Now, we have the minimum size. We'll extract/create expressions of that # size and merge them extracted = [] if min_size != 0: for mo_set, fv in memory_object_sets: for mo in mo_set: extracted.append((mo.bytes_at(page_addr + b, min_size), fv)) if not memory.skip_missing_values_during_merging: created = [ (self._default_value(None, min_size, name=f"merge_uc_{uc.id}_{b:x}", memory=memory), fv) for uc, fv in unconstrained_in ] to_merge = extracted + created else: to_merge = extracted merged_val = self._merge_values(to_merge, min_size, memory=memory) if merged_val is None: continue new_mos = {SimMemoryObject(v, page_addr + b, endness="Iend_BE") for v in merged_val} self.store(b, new_mos, size=min_size, cooperate=True, weak=False) merged_offsets.add(b) self.stored_offset |= merged_offsets return merged_offsets
[docs] def compare( self, other: "MVListPage", page_addr: int = None, memory=None, changed_offsets=None ) -> bool: # pylint: disable=unused-argument compared_to = None for b in sorted(changed_offsets): if compared_to is not None and not b >= compared_to: continue unconstrained_in = [] self_has_memory_object_set = False memory_object_sets: Set[FrozenSet[SimMemoryObject]] = set() for sm in [self, other]: if sm._contains(b, page_addr): memory_objects = set() for mo in sm.content_gen(b): if mo.includes(page_addr + b): memory_objects.add(mo) memory_object_sets.add(frozenset(memory_objects)) if sm is self: self_has_memory_object_set = True else: unconstrained_in.append(sm) if not memory_object_sets: continue if self_has_memory_object_set and len(memory_object_sets) == 1: continue # TODO: compare_values even more? return False return True
[docs] def changed_bytes(self, other: "MVListPage", page_addr: int = None): candidates: Set[int] = super().changed_bytes(other) if candidates is not None: # using the result from the history tracking mixin as an approximation return candidates # slower path if candidates is None: candidates: Set[int] = set() # resort to the slower solution if self.sinkhole is None: candidates |= self.stored_offset else: for i in range(len(self.content)): if self._contains(i, page_addr): candidates.add(i) if other.sinkhole is None: candidates |= other.stored_offset else: for i in range(len(other.content)): if other._contains(i, page_addr): candidates.add(i) byte_width = 8 # TODO: Introduce self.state if we want to use self.state.arch.byte_width differences: Set[int] = set() for c in candidates: s_contains = self._contains(c, page_addr) o_contains = other._contains(c, page_addr) if not s_contains and o_contains: differences.add(c) elif s_contains and not o_contains: differences.add(c) else: if self.content[c] is None: if self.sinkhole is not None: self.content[c] = SimMemoryObject( self.sinkhole.bytes_at(page_addr + c, 1), page_addr + c, byte_width=byte_width, endness="Iend_BE", ) if other.content[c] is None: if other.sinkhole is not None: other.content[c] = SimMemoryObject( other.sinkhole.bytes_at(page_addr + c, 1), page_addr + c, byte_width=byte_width, endness="Iend_BE", ) if s_contains and self.content[c] != other.content[c]: same = None if self._mo_cmp is not None: same = self._mo_cmp(self.content[c], other.content[c], page_addr + c, 1) if same is None: # Try to see if the bytes are equal self_bytes = {mo.bytes_at(page_addr + c, 1) for mo in self.content_gen(c)} other_bytes = {mo.bytes_at(page_addr + c, 1) for mo in other.content_gen(c)} same = self_bytes == other_bytes if same is False: differences.add(c) else: # this means the byte is in neither memory pass return differences
[docs] def content_gen(self, index): if self.content[index] is None: return elif type(self.content[index]) is set: yield from self.content[index] else: yield self.content[index]
def _contains(self, off: int, page_addr: int): if off >= len(self.content): return False if self.content[off] is not None: return True if self.sinkhole is None: return False return self.sinkhole.includes(page_addr + off) def _replace_mo( self, old_mo: SimMemoryObject, new_mo: SimMemoryObject, page_addr: int, page_size: int ) -> SimMemoryObject: if self.sinkhole is old_mo: self.sinkhole = new_mo else: start, end = self._resolve_range(old_mo, page_addr, page_size) for i in range(start, end): s = {new_mo} if self.content[i - page_addr] is old_mo: self.content[i - page_addr] = s return new_mo @staticmethod def _resolve_range(mo: SimMemoryObject, page_addr: int, page_size) -> Tuple[int, int]: start = max(mo.base, page_addr) end = min(mo.last_addr + 1, page_addr + page_size) if end <= start: l.warning("Nothing left of the memory object to store in SimPage.") return start, end def _get_objects(self, start: int, page_addr: int) -> Optional[List[SimMemoryObject]]: mos = self.content[start] if mos is None: return None lst = [] if type(mos) is set: for mo in mos: if mo.includes(start + page_addr): lst.append(mo) else: lst.append(mos) if lst: return lst return None