Source code for angr.analyses.cfg.cfb

import logging
from typing import Any, Callable, Optional, Set

import cle
from cle.backends.externs import KernelObject, ExternObject
from cle.backends.tls.elf_tls import ELFTLSObject

from sortedcontainers import SortedDict

from ...knowledge_plugins.cfg.memory_data import MemoryDataSort, MemoryData
from ..analysis import Analysis

_l = logging.getLogger(name=__name__)


[docs]class CFBlanketView: """ A view into the control-flow blanket. """
[docs] def __init__(self, cfb): self._cfb = cfb
def __getitem__(self, item): if isinstance(item, slice): addr = item.start start_addr = self._cfb.floor_addr(addr) addr_ = start_addr while True: obj = self._cfb[addr_] yield obj addr_ += obj # Find gaps # TODO: finish it raise NotImplementedError()
# # Memory region #
[docs]class MemoryRegion:
[docs] def __init__(self, addr, size, type_, object_, cle_region): self.addr = addr self.size = size self.type = type_ self.object = object_ self.cle_region = cle_region
def __repr__(self): return f"<MemoryRegion {self.addr:#x}-{self.addr+self.size:#x}, type {self.type}>"
# # An address can be mapped to one of the following types of object # - Block # - MemoryData # - Unknown #
[docs]class Unknown:
[docs] def __init__(self, addr, size, bytes_=None, object_=None, segment=None, section=None): self.addr = addr self.size = size # Optional self.bytes = bytes_ self.object = object_ self.segment = segment self.section = section if size == 0: raise Exception("You cannot create an unknown region of size 0.")
def __repr__(self): s = f"<Unknown {self.addr:#x}-{self.addr + self.size:#x}>" return s
[docs]class CFBlanket(Analysis): """ A Control-Flow Blanket is a representation for storing all instructions, data entries, and bytes of a full program. Region types: - section - segment - extern - tls - kernel """
[docs] def __init__( self, exclude_region_types: Optional[Set[str]] = None, on_object_added: Optional[Callable[[int, Any], None]] = None, ): """ :param on_object_added: Callable with parameters (addr, obj) called after an object is added to the blanket. """ self._blanket = SortedDict() self._on_object_added_callback = on_object_added self._regions = [] self._exclude_region_types = set() if not exclude_region_types else exclude_region_types self._init_regions() # initialize for func in self.kb.functions.values(): self.add_function(func) self._mark_memory_data() self._mark_unknowns()
def _init_regions(self): for obj in self.project.loader.all_objects: if isinstance(obj, cle.MetaELF): if obj.sections: if "section" not in self._exclude_region_types: # Enumerate sections in an ELF file for section in obj.sections: if section.occupies_memory: mr = MemoryRegion(section.vaddr, section.memsize, "section", obj, section) self._regions.append(mr) elif obj.segments: if "segment" not in self._exclude_region_types: for segment in obj.segments: if segment.memsize > 0: mr = MemoryRegion(segment.vaddr, segment.memsize, "segment", obj, segment) self._regions.append(mr) else: raise NotImplementedError( "Currently ELFs without sections or segments are not supported. Please " "implement or complain on GitHub." ) elif isinstance(obj, cle.PE): if obj.sections: if "section" not in self._exclude_region_types: for section in obj.sections: mr = MemoryRegion(section.vaddr, section.memsize, "section", obj, section) self._regions.append(mr) else: raise NotImplementedError( "Currently PEs without sections are not supported. Please report to " "GitHub and provide an example binary." ) elif isinstance(obj, KernelObject): if "kernel" not in self._exclude_region_types: size = obj.max_addr - obj.min_addr mr = MemoryRegion(obj.min_addr, size, "kernel", obj, None) self._regions.append(mr) elif isinstance(obj, ExternObject): if "extern" not in self._exclude_region_types: size = obj.max_addr - obj.min_addr mr = MemoryRegion(obj.min_addr, size, "extern", obj, None) self._regions.append(mr) elif isinstance(obj, ELFTLSObject): if "tls" not in self._exclude_region_types: size = obj.max_addr - obj.min_addr mr = MemoryRegion(obj.min_addr, size, "tls", obj, None) self._regions.append(mr) else: if hasattr(obj, "size"): size = obj.size else: size = obj.max_addr - obj.min_addr type_ = "TODO" mr = MemoryRegion(obj.min_addr, size, type_, obj, obj) self._regions.append(mr) # Sort them just in case self._regions = list(sorted(self._regions, key=lambda x: x.addr)) @property def regions(self): """ Return all memory regions. """ return self._regions
[docs] def floor_addr(self, addr): try: return next(self._blanket.irange(maximum=addr, reverse=True)) except StopIteration: raise KeyError(addr)
[docs] def floor_item(self, addr): key = self.floor_addr(addr) return key, self._blanket[key]
[docs] def floor_items(self, addr=None, reverse=False): if addr is None: start_addr = None else: try: start_addr = next(self._blanket.irange(maximum=addr, reverse=True)) except StopIteration: start_addr = addr for key in self._blanket.irange(minimum=start_addr, reverse=reverse): yield key, self._blanket[key]
[docs] def ceiling_addr(self, addr): try: return next(self._blanket.irange(minimum=addr)) except StopIteration: raise KeyError(addr)
[docs] def ceiling_item(self, addr): key = self.ceiling_addr(addr) return key, self._blanket[key]
[docs] def ceiling_items(self, addr=None, reverse=False, include_first=True): if addr is None: start_addr = None else: try: start_addr = next(self._blanket.irange(minimum=addr)) except StopIteration: start_addr = addr for key in self._blanket.irange(maximum=start_addr if include_first else start_addr - 1, reverse=reverse): yield key, self._blanket[key]
def __getitem__(self, addr): return self._blanket[addr]
[docs] def add_obj(self, addr, obj): """ Adds an object `obj` to the blanket at the specified address `addr` """ self._blanket[addr] = obj if self._on_object_added_callback: self._on_object_added_callback(addr, obj)
[docs] def add_function(self, func): """ Add a function `func` and all blocks of this function to the blanket. """ for block in func.blocks: self.add_obj(block.addr, block)
[docs] def dbg_repr(self): """ The debugging representation of this CFBlanket. :return: The debugging representation of this CFBlanket. :rtype: str """ output = [] for obj in self.project.loader.all_objects: for section in obj.sections: if section.memsize == 0: continue min_addr, max_addr = section.min_addr, section.max_addr output.append("### Object %s" % repr(section)) output.append(f"### Range {min_addr:#x}-{max_addr:#x}") pos = min_addr while pos < max_addr: try: addr, thing = self.floor_item(pos) output.append(f"{addr:#x}: {repr(thing)}") if thing.size == 0: pos += 1 else: pos += thing.size except KeyError: pos += 1 output.append("") return "\n".join(output)
def _mark_memory_data(self): """ Mark all memory data. :return: None """ if "CFGFast" not in self.kb.cfgs: return cfg_model = self.kb.cfgs["CFGFast"] for addr, memory_data in cfg_model.memory_data.items(): memory_data: MemoryData if memory_data.sort == MemoryDataSort.CodeReference: # skip Code Reference continue self.add_obj(addr, memory_data) def _mark_unknowns(self): """ Mark all unmapped regions. :return: None """ for obj in self.project.loader.all_objects: if isinstance(obj, cle.ELF): # sections? if obj.sections and "section" not in self._exclude_region_types: for section in obj.sections: if not section.memsize or not section.vaddr: continue min_addr, max_addr = section.min_addr, section.max_addr self._mark_unknowns_core(min_addr, max_addr + 1, obj=obj, section=section) elif obj.segments and "segment" not in self._exclude_region_types: for segment in obj.segments: if not segment.memsize: continue min_addr, max_addr = segment.min_addr, segment.max_addr self._mark_unknowns_core(min_addr, max_addr + 1, obj=obj, segment=segment) else: # is it empty? _l.warning("Empty ELF object %s.", repr(obj)) elif isinstance(obj, cle.PE): if obj.sections: for section in obj.sections: if not section.memsize: continue min_addr, max_addr = section.min_addr, section.max_addr self._mark_unknowns_core(min_addr, max_addr + 1, obj=obj, section=section) else: # is it empty? _l.warning("Empty PE object %s.", repr(obj)) elif isinstance(obj, ELFTLSObject): if "tls" in self._exclude_region_types: # Skip them for now pass else: min_addr, max_addr = obj.min_addr, obj.max_addr self._mark_unknowns_core(min_addr, max_addr + 1, obj=obj) elif isinstance(obj, KernelObject): if "kernel" in self._exclude_region_types: # skip pass else: min_addr, max_addr = obj.min_addr, obj.max_addr self._mark_unknowns_core(min_addr, max_addr + 1, obj=obj) else: min_addr, max_addr = obj.min_addr, obj.max_addr self._mark_unknowns_core(min_addr, max_addr + 1, obj=obj) def _mark_unknowns_core(self, min_addr, max_addr, obj=None, segment=None, section=None): # The region should be [min_addr, max_addr) try: addr = self.floor_addr(min_addr) if addr < min_addr: raise KeyError except KeyError: # there is no other lower address try: next_addr = self.ceiling_addr(min_addr) if next_addr >= max_addr: raise KeyError except KeyError: next_addr = max_addr size = next_addr - min_addr if obj is None or isinstance(obj, cle.ExternObject): bytes_ = None else: try: _l.debug( "Loading bytes from object %s, section %s, segment %s, address %#x.", obj, section, segment, min_addr, ) bytes_ = self.project.loader.memory.load(min_addr, size) except KeyError: # The address does not exist bytes_ = None self.add_obj( min_addr, Unknown(min_addr, size, bytes_=bytes_, object_=obj, segment=segment, section=section) ) addr = min_addr while addr < max_addr: last_addr, last_item = self.floor_item(addr) if last_addr < min_addr: # impossible raise Exception("Impossible") if last_item.size == 0 or last_item.size is None: # Make sure everything has a non-zero size last_item_size = 1 else: last_item_size = last_item.size end_addr = last_addr + last_item_size if end_addr < max_addr: try: next_addr = self.ceiling_addr(end_addr) except KeyError: next_addr = max_addr if next_addr > end_addr: # there is a gap size = next_addr - end_addr if obj is None or isinstance(obj, cle.ExternObject): bytes_ = None else: try: _l.debug( "Loading bytes from object %s, section %s, segment %s, address %#x.", obj, section, segment, next_addr, ) bytes_ = self.project.loader.memory.load(next_addr, size) except KeyError: # The address does not exist bytes_ = None self.add_obj( end_addr, Unknown(end_addr, size, bytes_=bytes_, object_=obj, segment=segment, section=section) ) addr = next_addr else: addr = max_addr
from angr.analyses import AnalysesHub AnalysesHub.register_default("CFB", CFBlanket) AnalysesHub.register_default("CFBlanket", CFBlanket)