Source code for angr.analyses.cfg.cfg_base

# pylint:disable=line-too-long,multiple-statements
from typing import Dict, Tuple, List, Optional, Union, Set, TYPE_CHECKING
import logging
from collections import defaultdict

import networkx
from sortedcontainers import SortedDict

import pyvex
from claripy.utils.orderedset import OrderedSet
from cle import ELF, PE, Blob, TLSObject, MachO, ExternObject, KernelObject, FunctionHintSource
from cle.backends import NamedRegion
import archinfo
from archinfo.arch_soot import SootAddressDescriptor
from archinfo.arch_arm import is_arm_arch, get_real_address_if_arm

from angr.knowledge_plugins.functions.function_manager import FunctionManager
from angr.knowledge_plugins.functions.function import Function
from angr.knowledge_plugins.cfg import IndirectJump, CFGNode, CFGENode, CFGModel  # pylint:disable=unused-import
from angr.misc.ux import deprecated
from angr.procedures.stubs.UnresolvableJumpTarget import UnresolvableJumpTarget
from angr.utils.constants import DEFAULT_STATEMENT
from angr.procedures.procedure_dict import SIM_PROCEDURES
from angr.errors import (
    AngrCFGError,
    SimTranslationError,
    SimMemoryError,
    SimIRSBError,
    SimEngineError,
    AngrUnsupportedSyscallError,
    SimError,
)
from angr.codenode import HookNode, BlockNode
from angr.engines.vex.lifter import VEX_IRSB_MAX_SIZE, VEX_IRSB_MAX_INST
from angr.analyses import Analysis
from angr.analyses.stack_pointer_tracker import StackPointerTracker
from .indirect_jump_resolvers.default_resolvers import default_indirect_jump_resolvers

if TYPE_CHECKING:
    from angr.sim_state import SimState


l = logging.getLogger(name=__name__)


class CFGBase(Analysis):
    """
    The base class for control flow graphs.
    """

    tag: Optional[str] = None
    _cle_pseudo_objects = (ExternObject, KernelObject, TLSObject)
    def __init__(
        self,
        sort,
        context_sensitivity_level,
        normalize=False,
        binary=None,
        objects=None,
        regions=None,
        exclude_sparse_regions=True,
        skip_specific_regions=True,
        force_segment=False,
        base_state=None,
        resolve_indirect_jumps=True,
        indirect_jump_resolvers=None,
        indirect_jump_target_limit=100000,
        detect_tail_calls=False,
        low_priority=False,
        skip_unmapped_addrs=True,
        sp_tracking_track_memory=True,
        model=None,
    ):
        """
        :param str sort: 'fast' or 'emulated'.
        :param int context_sensitivity_level: The level of context-sensitivity of this CFG (see documentation for
                                              further details). It ranges from 0 to infinity.
        :param bool normalize: Whether the CFG as well as all Function graphs should be normalized.
        :param cle.backends.Backend binary: The binary to recover the CFG on. By default, the main binary is used.
        :param objects: A list of objects to recover the CFG on. By default, it will recover the CFG of all loaded
                        objects.
        :param iterable regions: A list of tuples in the form of (start address, end address) describing memory
                                 regions that the CFG should cover.
        :param bool force_segment: Force CFGFast to rely on binary segments instead of sections.
        :param angr.SimState base_state: A state to use as a backer for all memory loads.
        :param bool resolve_indirect_jumps: Whether to try to resolve indirect jumps. This is necessary to resolve
                                            jump targets from jump tables, etc.
        :param list indirect_jump_resolvers: A custom list of indirect jump resolvers. If this list is None or empty,
                                             default indirect jump resolvers specific to this architecture and binary
                                             type will be loaded.
        :param int indirect_jump_target_limit: Maximum indirect jump targets to be recovered.
        :param skip_unmapped_addrs: Ignore all branches into unmapped regions. True by default. You may want to set
                                    it to False if you are analyzing manually patched binaries or malware samples.
        :param bool detect_tail_calls: Aggressive tail-call optimization detection. This option is only respected in
                                       make_functions().
        :param bool sp_tracking_track_memory: Whether or not to track memory writes when tracking the stack pointer.
                                              This increases the accuracy of stack pointer tracking, especially for
                                              architectures without a base pointer. Only used if detect_tail_calls is
                                              enabled.
        :param None or CFGModel model: The CFGModel instance to write to. A new CFGModel instance will be created and
                                       registered with the knowledge base if `model` is None.
        :return: None
        """
        self.sort = sort
        self._context_sensitivity_level = context_sensitivity_level

        # Sanity checks
        if context_sensitivity_level < 0:
            raise ValueError("Unsupported context sensitivity level %d" % context_sensitivity_level)

        self._binary = binary if binary is not None else self.project.loader.main_object
        self._force_segment = force_segment
        self._base_state = base_state
        self._detect_tail_calls = detect_tail_calls
        self._low_priority = low_priority
        self._skip_unmapped_addrs = skip_unmapped_addrs

        # Initialization
        self._edge_map = None
        self._loop_back_edges = None
        self._overlapped_loop_headers = None
        self._thumb_addrs = set()
        self._tail_calls = set()

        # Store all the functions analyzed before the set is cleared
        # Used for performance optimization
        self._updated_nonreturning_functions = None

        self._normalize = normalize

        # Flag: whether to track memory writes in stack pointer tracking
        self._sp_tracking_track_memory = sp_tracking_track_memory

        # IndirectJump objects that describe all indirect exits found in the binary,
        # stored as a map between addresses and IndirectJump objects
        self.indirect_jumps: Dict[int, IndirectJump] = {}
        self._indirect_jumps_to_resolve = set()

        # Indirect jump resolvers
        self._indirect_jump_target_limit = indirect_jump_target_limit
        self._resolve_indirect_jumps = resolve_indirect_jumps
        self.timeless_indirect_jump_resolvers = []
        self.indirect_jump_resolvers = []
        if not indirect_jump_resolvers:
            indirect_jump_resolvers = default_indirect_jump_resolvers(self._binary, self.project)
        if self._resolve_indirect_jumps and indirect_jump_resolvers:
            # split them into different groups for the sake of speed
            for ijr in indirect_jump_resolvers:
                if ijr.timeless:
                    self.timeless_indirect_jump_resolvers.append(ijr)
                else:
                    self.indirect_jump_resolvers.append(ijr)

        l.info(
            "Loaded %d indirect jump resolvers (%d timeless, %d generic).",
            len(self.timeless_indirect_jump_resolvers) + len(self.indirect_jump_resolvers),
            len(self.timeless_indirect_jump_resolvers),
            len(self.indirect_jump_resolvers),
        )

        # Get all executable memory regions
        self._exec_mem_regions = self._executable_memory_regions(None, self._force_segment)
        self._exec_mem_region_size = sum((end - start) for start, end in self._exec_mem_regions)

        # initialize the UnresolvableJumpTarget and UnresolvableCallTarget SimProcedures,
        # but we do not want to hook the same symbol multiple times
        ut_jump_addr = self.project.loader.extern_object.get_pseudo_addr("UnresolvableJumpTarget")
        if not self.project.is_hooked(ut_jump_addr):
            self.project.hook(ut_jump_addr, SIM_PROCEDURES["stubs"]["UnresolvableJumpTarget"]())
        self._unresolvable_jump_target_addr = ut_jump_addr
        ut_call_addr = self.project.loader.extern_object.get_pseudo_addr("UnresolvableCallTarget")
        if not self.project.is_hooked(ut_call_addr):
            self.project.hook(ut_call_addr, SIM_PROCEDURES["stubs"]["UnresolvableCallTarget"]())
        self._unresolvable_call_target_addr = ut_call_addr

        # partially and fully analyzed functions
        # this is implemented as a state machine: jobs (CFGJob instances) of each function are put into
        # _jobs_to_analyze_per_function, which essentially makes the keys of this dict all partially analyzed
        # functions so far. when a function has no more jobs to analyze in the future, it is moved into
        # _completed_functions.

        # a dict mapping function addresses to sets of jobs (including both future jobs and pending jobs)
        # a set is used to speed up the job removal procedure
        self._jobs_to_analyze_per_function = defaultdict(set)
        # addresses of functions that have been completely recovered (i.e., all of their blocks are identified) so far
        self._completed_functions = set()

        # TODO: A segment tree to speed up CFG node lookups
        self._node_lookup_index = None
        self._node_lookup_index_warned = False

        self._function_addresses_from_symbols = self._load_func_addrs_from_symbols()
        self._function_addresses_from_eh_frame = self._load_func_addrs_from_eh_frame()

        # Cache whether an object has executable sections or not
        self._object_to_executable_sections = {}
        # Cache whether an object has executable segments or not
        self._object_to_executable_segments = {}

        if model is not None:
            self._model = model
        else:
            self._model: CFGModel = self.kb.cfgs.new_model(self.tag)

        # necessary warnings
        regions_not_specified = regions is None and binary is None and not objects
        if regions_not_specified and self.project.loader._auto_load_libs and len(self.project.loader.all_objects) > 3:
            l.warning(
                '"auto_load_libs" is enabled. With libraries loaded in the project, the CFG will cover libraries, '
                "which may take significantly more time than expected. You may reload the binary with "
                '"auto_load_libs" disabled, or specify "regions" to limit the scope of CFG recovery.'
            )

        if regions is None:
            if self._skip_unmapped_addrs:
                regions = self._executable_memory_regions(objects=objects, force_segment=force_segment)
            else:
                if not objects:
                    objects = self.project.loader.all_objects
                regions = [(obj.min_addr, obj.max_addr) for obj in objects]

        for start, end in regions:
            if end < start:
                raise AngrCFGError("Invalid region bounds (end precedes start)")

        if exclude_sparse_regions:
            regions = [r for r in regions if not self._is_region_extremely_sparse(*r, base_state=base_state)]

        if skip_specific_regions:
            if base_state is not None:
                l.warning("You specified both base_state and skip_specific_regions. They may conflict with each other.")
            regions = [r for r in regions if not self._should_skip_region(r[0])]

        if not regions and self.project.arch.name != "Soot":
            raise AngrCFGError(
                "Regions are empty, or all regions are skipped. You may want to manually specify regions."
            )

        self._regions_size = sum((end - start) for start, end in regions)
        self._regions = SortedDict(regions)

        l.debug("CFG recovery covers %d regions:", len(self._regions))
        for start, end in self._regions.items():
            l.debug("... %#x - %#x", start, end)
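    # Editor's note: a minimal usage sketch (not part of the original source). CFGBase is
    # abstract; the parameters above are normally supplied through a concrete analysis such
    # as CFGFast. The binary path and region bounds below are hypothetical.
    #
    #     import angr
    #     proj = angr.Project("/bin/true", auto_load_libs=False)
    #     cfg = proj.analyses.CFGFast(regions=[(0x401000, 0x402000)], normalize=True)
    #
    # `regions` restricts recovery to the given (start, end) tuples, and `normalize=True`
    # makes _post_analysis() run the normalize() pass defined later in this class.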
    def __contains__(self, cfg_node):
        return cfg_node in self.graph

    #
    # Properties
    #

    @property
    def _nodes(self):
        return self._model._nodes

    @property
    def _nodes_by_addr(self):
        return self._model._nodes_by_addr

    @property
    def model(self) -> CFGModel:
        """
        Get the CFGModel instance.

        :return: The CFGModel instance that this analysis currently uses.
        """
        return self._model

    @property
    def normalized(self):
        return self._model.normalized

    @normalized.setter
    def normalized(self, v):
        self._model.normalized = v

    @property
    def context_sensitivity_level(self):
        return self._context_sensitivity_level

    @property
    def functions(self):
        """
        A reference to the FunctionManager in the current knowledge base.

        :return: FunctionManager with all functions
        :rtype: angr.knowledge_plugins.FunctionManager
        """
        return self.kb.functions

    #
    # Methods
    #

    def _initialize_cfg(self):
        """
        Re-create the DiGraph
        """
        self._jobs_to_analyze_per_function = defaultdict(set)
        self._completed_functions = set()

    def _post_analysis(self):
        if self._normalize:
            if not self.normalized:
                self.normalize()

            # Call normalize() on each function
            for f in self.kb.functions.values():
                if not self.project.is_hooked(f.addr):
                    f.normalize()

        # drop all propagation results that start with "cfg_intermediate"
        self.kb.propagations.discard_by_prefix("cfg_intermediate")
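    # Editor's note: a short sketch of the two public accessors above, assuming `cfg` is a
    # finished CFG analysis and `proj` its project (both hypothetical here):
    #
    #     node = cfg.model.get_any_node(proj.entry)   # graph data lives in the CFGModel
    #     main = cfg.functions.function(name="main")  # functions live in the knowledge base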
    def make_copy(self, copy_to):
        """
        Copy self attributes to the new object.

        :param CFGBase copy_to: The target to copy to.
        :return: None
        """
        for attr, value in self.__dict__.items():
            if attr.startswith("__") and attr.endswith("__"):
                continue
            setattr(copy_to, attr, value)
    # pylint: disable=no-self-use
    def copy(self):
        raise NotImplementedError()
    def output(self):
        raise NotImplementedError()
    def generate_index(self):
        """
        Generate an index of all nodes in the graph in order to speed up get_any_node() with anyaddr=True.

        :return: None
        """
        raise NotImplementedError("I'm too lazy to implement it right now")
    @deprecated(replacement="self.model.get_predecessors()")
    def get_predecessors(self, cfgnode, excluding_fakeret=True, jumpkind=None):
        return self._model.get_predecessors(cfgnode, excluding_fakeret=excluding_fakeret, jumpkind=jumpkind)

    @deprecated(replacement="self.model.get_successors()")
    def get_successors(self, node, excluding_fakeret=True, jumpkind=None):
        return self._model.get_successors(node, excluding_fakeret=excluding_fakeret, jumpkind=jumpkind)

    @deprecated(replacement="self.model.get_successors_and_jumpkind")
    def get_successors_and_jumpkind(self, node, excluding_fakeret=True):
        return self._model.get_successors_and_jumpkind(node, excluding_fakeret=excluding_fakeret)

    @deprecated(replacement="self.model.get_all_predecessors()")
    def get_all_predecessors(self, cfgnode, depth_limit=None):
        return self._model.get_all_predecessors(cfgnode, depth_limit)

    @deprecated(replacement="self.model.get_all_successors()")
    def get_all_successors(self, cfgnode, depth_limit=None):
        return self._model.get_all_successors(cfgnode, depth_limit)

    @deprecated(replacement="self.model.get_node()")
    def get_node(self, block_id):
        return self._model.get_node(block_id)

    @deprecated(replacement="self.model.get_any_node()")
    def get_any_node(self, addr, is_syscall=None, anyaddr=False, force_fastpath=False):
        return self._model.get_any_node(addr, is_syscall=is_syscall, anyaddr=anyaddr, force_fastpath=force_fastpath)

    @deprecated(replacement="self.model.get_all_nodes()")
    def get_all_nodes(self, addr, is_syscall=None, anyaddr=False):
        return self._model.get_all_nodes(addr, is_syscall=is_syscall, anyaddr=anyaddr)

    @deprecated(replacement="self.model.nodes()")
    def nodes(self):
        return self._model.nodes()

    @deprecated(replacement="nodes")
    def nodes_iter(self):
        """
        (Deprecated) An iterator of all nodes in the graph. Will be removed in the future.

        :return: The iterator.
        :rtype: iterator
        """
        return self.nodes()
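    # Editor's note: every wrapper above simply forwards to the CFGModel, so migrating
    # callers is mechanical, e.g. (sketch):
    #
    #     cfg.model.get_any_node(addr, anyaddr=True)  # instead of cfg.get_any_node(addr, anyaddr=True)
    #     list(cfg.model.nodes())                     # instead of cfg.nodes()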
    def get_loop_back_edges(self):
        return self._loop_back_edges
    @deprecated(replacement="self.model.get_branching_nodes()")
    def get_branching_nodes(self):
        return self._model.get_branching_nodes()

    @deprecated(replacement="self.model.get_exit_stmt_idx")
    def get_exit_stmt_idx(self, src_block, dst_block):
        return self._model.get_exit_stmt_idx(src_block, dst_block)

    @property
    def graph(self) -> networkx.DiGraph:
        raise NotImplementedError()
    def remove_edge(self, block_from, block_to):
        if self.graph is None:
            raise TypeError("self.graph does not exist.")

        if block_from not in self.graph:
            raise ValueError("%r is not in CFG." % block_from)

        if block_to not in self.graph:
            raise ValueError("%r is not in CFG." % block_to)

        if block_to not in self.graph[block_from]:
            raise ValueError(f"Edge {block_from!r}->{block_to!r} does not exist.")

        self.graph.remove_edge(block_from, block_to)
    def _merge_cfgnodes(self, cfgnode_0, cfgnode_1):
        """
        Merge two adjacent CFGNodes into one.

        :param CFGNode cfgnode_0: The first CFGNode.
        :param CFGNode cfgnode_1: The second CFGNode.
        :return: None
        """
        assert cfgnode_0.addr + cfgnode_0.size == cfgnode_1.addr
        new_node = cfgnode_0.merge(cfgnode_1)

        # Update the graph and the node dicts accordingly
        self._model.remove_node(cfgnode_1.block_id, cfgnode_1)
        self._model.remove_node(cfgnode_0.block_id, cfgnode_0)

        in_edges = list(self.graph.in_edges(cfgnode_0, data=True))
        out_edges = list(self.graph.out_edges(cfgnode_1, data=True))

        self.graph.remove_node(cfgnode_0)
        self.graph.remove_node(cfgnode_1)

        self.graph.add_node(new_node)
        for src, _, data in in_edges:
            self.graph.add_edge(src, new_node, **data)
        for _, dst, data in out_edges:
            self.graph.add_edge(new_node, dst, **data)

        # Put the new node into the node dicts
        self._model.add_node(new_node.block_id, new_node)

    def _to_snippet(self, cfg_node=None, addr=None, size=None, thumb=False, jumpkind=None, base_state=None):
        """
        Convert a CFGNode instance to a CodeNode object.

        :param angr.analyses.CFGNode cfg_node: The CFGNode instance.
        :param int addr: Address of the node. Only used when `cfg_node` is None.
        :param int size: Size of the node. Only used when `cfg_node` is None.
        :param bool thumb: Whether this is in THUMB mode or not. Only used for ARM code and when `cfg_node` is None.
        :param str or None jumpkind: Jumpkind of this node.
        :param SimState or None base_state: The state where the BlockNode should be created from.
        :return: A converted CodeNode instance.
        :rtype: CodeNode
        """
        if cfg_node is not None:
            addr = cfg_node.addr
            size = cfg_node.size
            thumb = cfg_node.thumb

        if addr is None:
            raise ValueError("_to_snippet(): Either cfg_node or addr must be provided.")

        if self.project.is_hooked(addr) and jumpkind != "Ijk_NoHook":
            hooker = self.project._sim_procedures[addr]
            size = hooker.kwargs.get("length", 0)
            return HookNode(addr, size, type(hooker))

        if cfg_node is not None:
            return BlockNode(addr, size, thumb=thumb, bytestr=cfg_node.byte_string)  # pylint: disable=no-member
        else:
            return self.project.factory.snippet(
                addr, size=size, jumpkind=jumpkind, thumb=thumb, backup_state=base_state
            )
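    # Editor's note: an illustration of the dispatch in _to_snippet() above, using a
    # hypothetical hook address. A hooked address yields a HookNode whose size comes from
    # the hook's optional `length` kwarg; anything else yields a BlockNode/snippet.
    #
    #     proj.hook(0x500000, angr.SIM_PROCEDURES["stubs"]["ReturnUnconstrained"](), length=0)
    #     # cfg._to_snippet(addr=0x500000)  ->  HookNode(addr=0x500000, size=0, ...)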
    def is_thumb_addr(self, addr):
        return addr in self._thumb_addrs
    def _arm_thumb_filter_jump_successors(self, irsb, successors, get_ins_addr, get_exit_stmt_idx, get_jumpkind):
        """
        Filter successors for THUMB-mode basic blocks, and remove those successors that won't be taken normally.

        :param irsb: The IRSB object.
        :param list successors: A list of successors.
        :param func get_ins_addr: A callable that returns the source instruction address for a successor.
        :param func get_exit_stmt_idx: A callable that returns the source statement ID for a successor.
        :param func get_jumpkind: A callable that returns the jumpkind of a successor.
        :return: A new list of successors after filtering.
        :rtype: list
        """
        if not successors:
            return []

        if len(successors) == 1 and get_exit_stmt_idx(successors[0]) == DEFAULT_STATEMENT:
            # only have a default exit. no need to filter
            return successors

        if irsb.instruction_addresses and all(
            get_ins_addr(suc) == irsb.instruction_addresses[-1] for suc in successors
        ):
            # check if all exits are produced by the last instruction
            # only take the following jumpkinds: Boring, FakeRet, Call, Syscall, Ret
            allowed_jumpkinds = {"Ijk_Boring", "Ijk_FakeRet", "Ijk_Call", "Ijk_Ret"}
            successors = [
                suc
                for suc in successors
                if get_jumpkind(suc) in allowed_jumpkinds or get_jumpkind(suc).startswith("Ijk_Sys")
            ]
            if len(successors) == 1:
                return successors

        can_produce_exits = set()  # addresses of instructions that can produce exits
        bb = self._lift(irsb.addr, size=irsb.size, thumb=True)

        # step A: filter exits using capstone (since it's faster than re-lifting the entire block to VEX)
        THUMB_BRANCH_INSTRUCTIONS = {
            "beq", "bne", "bcs", "bhs", "bcc", "blo", "bmi", "bpl", "bvs",
            "bvc", "bhi", "bls", "bge", "blt", "bgt", "ble", "cbz", "cbnz",
        }
        for cs_insn in bb.capstone.insns:
            if cs_insn.mnemonic.split(".")[0] in THUMB_BRANCH_INSTRUCTIONS:
                can_produce_exits.add(cs_insn.address)

        if all(
            get_ins_addr(suc) in can_produce_exits or get_exit_stmt_idx(suc) == DEFAULT_STATEMENT
            for suc in successors
        ):
            # nothing will be filtered.
            return successors

        # step B: consider VEX statements
        it_counter = 0
        conc_temps = {}

        for stmt in bb.vex.statements:
            if stmt.tag == "Ist_IMark":
                if it_counter > 0:
                    it_counter -= 1
                    can_produce_exits.add(stmt.addr + stmt.delta)
            elif stmt.tag == "Ist_WrTmp":
                val = stmt.data
                if val.tag == "Iex_Const":
                    conc_temps[stmt.tmp] = val.con.value
            elif stmt.tag == "Ist_Put":
                if stmt.offset == self.project.arch.registers["itstate"][0]:
                    val = stmt.data
                    if val.tag == "Iex_RdTmp":
                        if val.tmp in conc_temps:
                            # We found an IT instruction!
                            # Determine how many instructions are conditional
                            it_counter = 0
                            itstate = conc_temps[val.tmp]
                            while itstate != 0:
                                it_counter += 1
                                itstate >>= 8
                    elif val.tag == "Iex_Const":
                        it_counter = 0
                        itstate = val.con.value
                        while itstate != 0:
                            it_counter += 1
                            itstate >>= 8

        if it_counter != 0:
            l.debug("Basic block ends before the calculated IT block does (%#x)", irsb.addr)

        successors_filtered = [
            suc
            for suc in successors
            if get_ins_addr(suc) in can_produce_exits or get_exit_stmt_idx(suc) == DEFAULT_STATEMENT
        ]

        return successors_filtered

    #
    # Methods for determining scanning scope
    #

    def _inside_regions(self, address: int) -> bool:
        """
        Check if the address is inside any existing region.

        :param int address: Address to check.
        :return: True if the address is within one of the memory regions, False otherwise.
        """
        try:
            start_addr = next(self._regions.irange(maximum=address, reverse=True))
        except StopIteration:
            return False
        else:
            return address < self._regions[start_addr]

    def _get_min_addr(self) -> Optional[int]:
        """
        Get the minimum address out of all regions. We assume self._regions is sorted.

        :return: The minimum address, or None if there is no such address.
        """
        if not self._regions:
            if self.project.arch.name != "Soot":
                l.error("self._regions is empty or not properly set.")
            return None

        return next(self._regions.irange())

    def _next_address_in_regions(self, address: int) -> Optional[int]:
        """
        Return the next immediate address that is inside any of the regions.

        :param address: The address to start scanning from.
        :return: The next address that is inside one of the memory regions, or None if there is no such address.
        """
        if self._inside_regions(address):
            return address

        try:
            return next(self._regions.irange(minimum=address, reverse=False))
        except StopIteration:
            return None

    def _is_region_extremely_sparse(self, start: int, end: int, base_state: Optional["SimState"] = None) -> bool:
        """
        Check whether the given memory region is extremely sparse, i.e., all bytes are the same value.

        :param start: The beginning of the region.
        :param end: The end of the region (exclusive).
        :param base_state: The base state (optional).
        :return: True if the region is extremely sparse, False otherwise.
        """
        all_bytes = None

        if base_state is not None:
            all_bytes = base_state.memory.load(start, end - start)
            try:
                all_bytes = base_state.solver.eval(all_bytes, cast_to=bytes)
            except SimError:
                all_bytes = None

        size = end - start

        if all_bytes is None:
            # load from the binary
            all_bytes = self._fast_memory_load_bytes(start, size)

        if all_bytes is None:
            return True

        if len(all_bytes) < size:
            l.warning(
                "_is_region_extremely_sparse: The given region %#x-%#x is not a continuous memory region in the "
                "memory space. Only the first %d bytes (%#x-%#x) are processed.",
                start,
                end,
                len(all_bytes),
                start,
                start + len(all_bytes),
            )

        the_byte_value = None
        for b in all_bytes:
            if the_byte_value is None:
                the_byte_value = b
            else:
                if the_byte_value != b:
                    return False

        return True

    def _should_skip_region(self, region_start):
        """
        Some regions usually do not contain any executable code, but are still marked as executable. We should skip
        those regions by default.

        :param int region_start: Address of the beginning of the region.
        :return: True/False
        :rtype: bool
        """
        obj = self.project.loader.find_object_containing(region_start, membership_check=False)
        if obj is None:
            return False
        if isinstance(obj, PE):
            section = obj.find_section_containing(region_start)
            if section is None:
                return False
            if section.name in {".textbss"}:
                return True

        return False
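    # Editor's note: a self-contained sketch of the SortedDict lookup trick used by
    # _inside_regions() above (the region bounds are made up):
    #
    #     from sortedcontainers import SortedDict
    #     regions = SortedDict({0x1000: 0x2000, 0x3000: 0x3800})  # start -> end
    #     start = next(regions.irange(maximum=0x3400, reverse=True))  # greatest start <= 0x3400
    #     assert start == 0x3000 and 0x3400 < regions[start]  # 0x3400 is inside [0x3000, 0x3800)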
    def _executable_memory_regions(self, objects=None, force_segment=False):
        """
        Get all executable memory regions from the binaries.

        :param objects: A collection of binary objects to collect regions from. If None, regions from all project
                        binary objects are used.
        :param bool force_segment: Rely on binary segments instead of sections.
        :return: A sorted list of tuples (beginning_address, end_address).
        """
        if objects is None:
            binaries = self.project.loader.all_objects
        else:
            binaries = objects

        memory_regions = []

        for b in binaries:
            if isinstance(b, ELF):
                # If we have sections, we get the result from sections
                if not force_segment and b.sections:
                    # Get all executable sections
                    for section in b.sections:
                        if section.is_executable:
                            tpl = (section.min_addr, section.max_addr + 1)
                            memory_regions.append(tpl)
                else:
                    # Get all executable segments
                    for segment in b.segments:
                        if segment.is_executable:
                            tpl = (segment.min_addr, segment.max_addr + 1)
                            memory_regions.append(tpl)

            elif isinstance(b, PE):
                for section in b.sections:
                    if section.is_executable:
                        tpl = (section.min_addr, section.max_addr + 1)
                        memory_regions.append(tpl)

            elif isinstance(b, MachO):
                if b.segments:
                    # Get all executable segments
                    for seg in b.segments:
                        if seg.is_executable:
                            # Take all sections from this segment (MachO style)
                            for section in seg.sections:
                                tpl = (section.min_addr, section.max_addr + 1)
                                memory_regions.append(tpl)

            elif isinstance(b, Blob):
                # a blob is entirely executable
                tpl = (b.min_addr, b.max_addr + 1)
                memory_regions.append(tpl)

            elif isinstance(b, NamedRegion):
                # NamedRegions have no content! Ignore
                pass

            elif isinstance(b, self._cle_pseudo_objects):
                pass

            else:
                l.warning('Unsupported object format "%s". Treat it as an executable.', b.__class__.__name__)
                tpl = (b.min_addr, b.max_addr + 1)
                memory_regions.append(tpl)

        if not memory_regions:
            memory_regions = [(start, start + len(backer)) for start, backer in self.project.loader.memory.backers()]

        memory_regions = sorted(memory_regions, key=lambda x: x[0])

        return memory_regions

    def _addr_in_exec_memory_regions(self, addr):
        """
        Test if the address belongs to an executable memory region.

        :param int addr: The address to test.
        :return: True if the address belongs to an executable memory region, False otherwise.
        :rtype: bool
        """
        for start, end in self._exec_mem_regions:
            if start <= addr < end:
                return True
        return False

    def _addrs_belong_to_same_section(self, addr_a, addr_b):
        """
        Test if two addresses belong to the same section.

        :param int addr_a: The first address to test.
        :param int addr_b: The second address to test.
        :return: True if the two addresses belong to the same section or both of them do not belong to any section,
                 False otherwise.
        :rtype: bool
        """
        obj = self.project.loader.find_object_containing(addr_a, membership_check=False)

        if obj is None:
            # test if addr_b also does not belong to any object
            obj_b = self.project.loader.find_object_containing(addr_b, membership_check=False)
            if obj_b is None:
                return True
            return False

        src_section = obj.find_section_containing(addr_a)
        if src_section is None:
            # test if addr_b also does not belong to any section
            dst_section = obj.find_section_containing(addr_b)
            if dst_section is None:
                return self._addrs_belong_to_same_segment(addr_a, addr_b)
            return False

        return src_section.contains_addr(addr_b)

    def _addrs_belong_to_same_segment(self, addr_a, addr_b):
        """
        Test if two addresses belong to the same segment.

        :param int addr_a: The first address to test.
        :param int addr_b: The second address to test.
        :return: True if the two addresses belong to the same segment or both of them do not belong to any segment,
                 False otherwise.
        :rtype: bool
        """
        obj = self.project.loader.find_object_containing(addr_a, membership_check=False)

        if obj is None:
            # test if addr_b also does not belong to any object
            obj_b = self.project.loader.find_object_containing(addr_b, membership_check=False)
            if obj_b is None:
                return True
            return False

        src_segment = obj.find_segment_containing(addr_a)
        if src_segment is None:
            # test if addr_b also does not belong to any segment
            dst_segment = obj.find_segment_containing(addr_b)
            if dst_segment is None:
                return True
            return False

        return src_segment.contains_addr(addr_b)

    def _object_has_executable_sections(self, obj):
        """
        Check whether an object has at least one executable section.

        :param cle.Backend obj: The object to test.
        :return: True if the object has at least one executable section, False otherwise.
        :rtype: bool
        """
        if obj in self._object_to_executable_sections:
            return self._object_to_executable_sections[obj]
        r = any(sec.is_executable for sec in obj.sections)
        self._object_to_executable_sections[obj] = r
        return r

    def _object_has_executable_segments(self, obj):
        """
        Check whether an object has at least one executable segment.

        :param cle.Backend obj: The object to test.
        :return: True if the object has at least one executable segment, False otherwise.
        :rtype: bool
        """
        if obj in self._object_to_executable_segments:
            return self._object_to_executable_segments[obj]
        r = any(seg.is_executable for seg in obj.segments)
        self._object_to_executable_segments[obj] = r
        return r

    def _addr_hooked_or_syscall(self, addr):
        """
        Check whether the address belongs to a hook or a syscall.

        :param int addr: The address to check.
        :return: True if the address is hooked or belongs to a syscall. False otherwise.
        :rtype: bool
        """
        return self.project.is_hooked(addr) or self.project.simos.is_syscall_addr(addr)

    def _fast_memory_load_byte(self, addr):
        """
        Perform a fast memory load of one byte.

        :param int addr: Address to read from.
        :return: A char or None if the address does not exist.
        :rtype: int or None
        """
        try:
            return self.project.loader.memory[addr]
        except KeyError:
            return None

    def _fast_memory_load_bytes(self, addr, length):
        """
        Perform a fast memory load of some data.

        :param int addr: Address to read from.
        :param int length: Size of the data to load.
        :return: A string or None if the address does not exist.
        :rtype: bytes or None
        """
        try:
            return self.project.loader.memory.load(addr, length)
        except KeyError:
            return None

    def _fast_memory_load_pointer(self, addr, size=None):
        """
        Perform a fast memory load of a pointer.

        :param int addr: Address to read from.
        :param int size: Size of the pointer. Defaults to the machine-word size.
        :return: A pointer or None if the address does not exist.
        :rtype: int
        """
        try:
            return self.project.loader.memory.unpack_word(addr, size=size)
        except KeyError:
            return None

    def _load_func_addrs_from_symbols(self):
        """
        Get all possible function addresses that are specified by the symbols in the binary.

        :return: A set of addresses that are probably functions.
        :rtype: set
        """
        return {sym.rebased_addr for sym in self._binary.symbols if sym.is_function}

    def _load_func_addrs_from_eh_frame(self):
        """
        Get possible function addresses from .eh_frame.

        :return: A set of addresses that are probably functions.
        :rtype: set
        """
        addrs = set()
        if isinstance(self._binary, ELF) and self._binary.has_dwarf_info:
            for function_hint in self._binary.function_hints:
                if function_hint.source == FunctionHintSource.EH_FRAME:
                    addrs.add(function_hint.addr)
        return addrs

    #
    # Analyze function features
    #
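    # Editor's note: usage sketch for the fast-load helpers above (the address is
    # hypothetical). These read straight from the loader's memory backers rather than
    # going through a simulation state:
    #
    #     raw = cfg._fast_memory_load_bytes(proj.entry, 16)  # bytes or None
    #     ptr = cfg._fast_memory_load_pointer(0x601018)      # word-sized int or None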
    def _determine_function_returning(self, func, all_funcs_completed=False):
        """
        Determine if a function returns or not.

        A function does not return if
        a) it is a SimProcedure that has NO_RET set to True, or
        b) it is completely recovered (i.e., every block of this function has been recovered, and no future block will
           be added to it), and it does not have a ret or any equivalent instruction.

        A function returns if any of its blocks contains a ret instruction or any equivalent.

        :param Function func: The function to work on.
        :param bool all_funcs_completed: Whether we treat all functions as completed functions or not.
        :return: True if the function returns, False if the function does not return, or None if it is not yet
                 determinable with the information available at the moment.
        :rtype: bool or None
        """
        if not self._inside_regions(func.addr):
            # we don't have a full view of this function. assume it returns
            return True

        # If there is at least one return site, then this function is definitely returning
        if func.has_return:
            return True

        # Let's first see if it's a known SimProcedure that does not return
        if self.project.is_hooked(func.addr):
            procedure = self.project.hooked_by(func.addr)
        else:
            try:
                procedure = self.project.simos.syscall_from_addr(func.addr, allow_unsupported=False)
            except AngrUnsupportedSyscallError:
                procedure = None

        if procedure is not None and hasattr(procedure, "NO_RET"):
            return not procedure.NO_RET

        # did we finish analyzing this function?
        if not all_funcs_completed and func.addr not in self._completed_functions:
            return None

        if not func.block_addrs_set:
            # there is no block inside this function
            # it might happen if the function has been incorrectly identified as part of another function
            # the error will be corrected during post-processing. in fact, at this moment we cannot say anything
            # about whether this function returns or not, so we always assume it returns.
            return True

        bail_out = False

        # if this function has jump-out sites or ret-out sites, it returns as long as any of the target functions
        # returns
        for goout_site, type_ in [(site, "jumpout") for site in func.jumpout_sites] + [
            (site, "retout") for site in func.retout_sites
        ]:
            # determine where it jumps/returns to
            goout_site_successors = goout_site.successors()

            # Filter out UnresolvableJumpTarget successors, because those don't mean that we actually know where it
            # jumps to
            known_successors = [
                n
                for n in goout_site_successors
                if not (isinstance(n, HookNode) and n.sim_procedure == UnresolvableJumpTarget)
            ]
            if not known_successors:
                # not sure where it jumps to. bail out
                bail_out = True
                continue

            # for ret-out sites, determine what function it calls
            if type_ == "retout":
                # see whether the function being called returns or not
                func_successors = [succ for succ in goout_site_successors if isinstance(succ, Function)]
                if func_successors and all(
                    func_successor.returning in (None, False) for func_successor in func_successors
                ):
                    # the returning status of all possible function calls is undetermined, or they do not return
                    # ignore this site
                    continue

            if type_ == "retout":
                goout_target = next((succ for succ in goout_site_successors if not isinstance(succ, Function)), None)
            else:
                goout_target = next((succ for succ in goout_site_successors), None)
            if goout_target is None:
                # there is no jumpout site, which is weird, but what can we do...
                continue
            if not self.kb.functions.contains_addr(goout_target.addr):
                # wait, it does not jump to a function?
                bail_out = True
                continue

            target_func = self.kb.functions[goout_target.addr]
            if target_func.returning is True:
                return True
            elif target_func.returning is None:
                # the returning status of at least one of the target functions is not decided yet
                bail_out = True

        if bail_out:
            # We cannot determine at this point. bail out
            return None

        # well, this function does not return, then
        return False

    def _analyze_function_features(self, all_funcs_completed=False):
        """
        For each function in the function manager, try to determine if it returns or not. A function does not return
        if it calls another function that is known to not return, and this function does not have other exits.

        We might as well analyze other features of functions in the future.

        :param bool all_funcs_completed: Ignore the _completed_functions set and treat all functions as completed.
                                         This can be set to True after the entire CFG is built and _post_analysis()
                                         is called (at which point analysis on all functions must be completed).
        """
        changes = {"functions_return": [], "functions_do_not_return": []}

        if self._updated_nonreturning_functions is not None:
            all_func_addrs = self._updated_nonreturning_functions
            # Convert addresses to objects
            all_functions = [
                self.kb.functions.get_by_addr(f) for f in all_func_addrs if self.kb.functions.contains_addr(f)
            ]
        else:
            all_functions = list(self.kb.functions.values())

        analyzed_functions = set()
        # short-hand
        functions: FunctionManager = self.kb.functions

        while all_functions:
            func: Function = all_functions.pop(-1)
            analyzed_functions.add(func.addr)

            if func.returning is not None:
                # It has been determined before. Skip it
                continue

            returning = self._determine_function_returning(func, all_funcs_completed=all_funcs_completed)

            if returning:
                func.returning = True
                changes["functions_return"].append(func)
            elif returning is False:
                func.returning = False
                changes["functions_do_not_return"].append(func)

            if returning is not None:
                # Add all callers of this function to the all_functions list
                if func.addr in functions.callgraph:
                    callers = functions.callgraph.predecessors(func.addr)
                    for caller in callers:
                        if caller in analyzed_functions:
                            continue
                        if functions.contains_addr(caller):
                            all_functions.append(functions.get_by_addr(caller))

        return changes

    def _iteratively_analyze_function_features(self, all_funcs_completed=False):
        """
        Iteratively analyze function features until a fixed point is reached.

        :return: the "changes" dict
        :rtype: dict
        """
        changes = {"functions_do_not_return": set(), "functions_return": set()}

        while True:
            new_changes = self._analyze_function_features(all_funcs_completed=all_funcs_completed)

            changes["functions_do_not_return"] |= set(new_changes["functions_do_not_return"])
            changes["functions_return"] |= set(new_changes["functions_return"])

            if not new_changes["functions_do_not_return"] and not new_changes["functions_return"]:
                # a fixed point is reached
                break

        return changes
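    # Editor's note: a worked example of the fixed point above (hypothetical call chain).
    # If A calls B and B's only out-site targets a NO_RET SimProcedure such as exit(),
    # pass 1 marks B as non-returning; because B's callers are re-queued, pass 2 can then
    # mark A as non-returning, and so on until a pass produces no new changes:
    #
    #     changes = cfg._iteratively_analyze_function_features(all_funcs_completed=True)
    #     non_returning = {f.addr for f in changes["functions_do_not_return"]}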
    def normalize(self):
        """
        Normalize the CFG, making sure that there are no overlapping basic blocks.

        Note that this method will not alter the transition graphs of each function in self.kb.functions. You may
        call normalize() on each Function object to normalize their transition graphs.

        :return: None
        """
        graph = self.graph

        smallest_nodes = {}  # indexed by end address of the node
        end_addresses_to_nodes = defaultdict(set)

        for n in graph.nodes():
            if n.is_simprocedure:
                continue
            end_addr = n.addr + n.size
            key = (end_addr, n.callstack_key)
            # add the new item
            end_addresses_to_nodes[key].add(n)

        for key in list(end_addresses_to_nodes.keys()):
            if len(end_addresses_to_nodes[key]) == 1:
                smallest_nodes[key] = next(iter(end_addresses_to_nodes[key]))
                del end_addresses_to_nodes[key]

        while end_addresses_to_nodes:
            key_to_find = (None, None)
            for tpl, x in end_addresses_to_nodes.items():
                if len(x) > 1:
                    key_to_find = tpl
                    break

            end_addr, callstack_key = key_to_find
            all_nodes = end_addresses_to_nodes[key_to_find]

            all_nodes = sorted(all_nodes, key=lambda node: node.addr, reverse=True)
            smallest_node = all_nodes[0]  # take the one that has the highest address
            other_nodes = all_nodes[1:]

            self._normalize_core(
                graph, callstack_key, smallest_node, other_nodes, smallest_nodes, end_addresses_to_nodes
            )

            del end_addresses_to_nodes[key_to_find]
            # make sure the smallest node is stored in smallest_nodes
            smallest_nodes[key_to_find] = smallest_node

            # corner case
            # sometimes two overlapping blocks may not end at the same instruction. this might happen when one of the
            # blocks (the bigger one) hits the instruction count limit or bytes limit before reaching the end address
            # of the smaller block. in this case we manually pick up those blocks.
            if not end_addresses_to_nodes:
                # find if there are still overlapping blocks
                sorted_smallest_nodes = defaultdict(list)  # callstack_key is the key of this dict
                for k, node in smallest_nodes.items():
                    _, callstack_key = k
                    sorted_smallest_nodes[callstack_key].append(node)
                for k in sorted_smallest_nodes.keys():
                    sorted_smallest_nodes[k] = sorted(sorted_smallest_nodes[k], key=lambda node: node.addr)

                for callstack_key, lst in sorted_smallest_nodes.items():
                    lst_len = len(lst)
                    for i, node in enumerate(lst):
                        if i == lst_len - 1:
                            break
                        next_node = lst[i + 1]
                        if node is not next_node and node.addr <= next_node.addr < node.addr + node.size:
                            # umm, those nodes are overlapping, but they must have different end addresses
                            nodekey_a = node.addr + node.size, callstack_key
                            nodekey_b = next_node.addr + next_node.size, callstack_key

                            if nodekey_a == nodekey_b:
                                # error handling: this will only happen if we have completely overlapping nodes
                                # caused by different jumps (one of the jumps is probably incorrect), which usually
                                # indicates an error in CFG recovery. we print a warning and skip this node
                                l.warning(
                                    "Found completely overlapping nodes %s. It usually indicates an error in CFG "
                                    "recovery. Skip.",
                                    node,
                                )
                                continue

                            if nodekey_a in smallest_nodes and nodekey_b in smallest_nodes:
                                # misuse end_addresses_to_nodes
                                end_addresses_to_nodes[(node.addr + node.size, callstack_key)].add(node)
                                end_addresses_to_nodes[(node.addr + node.size, callstack_key)].add(next_node)

                            smallest_nodes.pop(nodekey_a, None)
                            smallest_nodes.pop(nodekey_b, None)

        self.normalized = True
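    # Editor's note: usage sketch. As the docstring above notes, CFG-level normalization
    # does not touch per-function transition graphs, so _post_analysis() (and callers that
    # normalize manually) do both:
    #
    #     cfg.normalize()
    #     for func in cfg.kb.functions.values():
    #         func.normalize()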
    def _normalize_core(
        self,
        graph: networkx.DiGraph,
        callstack_key,
        smallest_node,
        other_nodes,
        smallest_nodes,
        end_addresses_to_nodes,
    ):
        # Break other nodes
        for n in other_nodes:
            new_size = get_real_address_if_arm(self.project.arch, smallest_node.addr) - get_real_address_if_arm(
                self.project.arch, n.addr
            )
            if new_size == 0:
                # This node has the same size as the smallest one. Don't touch it.
                continue

            new_end_addr = n.addr + new_size

            # Does it already exist?
            new_node = None
            key = (new_end_addr, n.callstack_key)
            # the logic below is a little convoluted. we check if key exists in either end_addresses_to_nodes or
            # smallest_nodes, since we don't always add the new node back to the end_addresses_to_nodes dict - we
            # only do so when there is more than one node with that key.
            if key in end_addresses_to_nodes:
                new_node = next((i for i in end_addresses_to_nodes[key] if i.addr == n.addr), None)
            if new_node is None:
                if key in smallest_nodes and smallest_nodes[key].addr == n.addr:
                    new_node = smallest_nodes[key]

            if new_node is None:
                # Create a new one
                if self.tag == "CFGFast":
                    new_node = CFGNode(
                        n.addr,
                        new_size,
                        self.model,
                        function_address=n.function_address,
                        block_id=n.block_id,
                        instruction_addrs=[i for i in n.instruction_addrs if n.addr <= i <= n.addr + new_size],
                        thumb=n.thumb,
                    )
                elif self.tag == "CFGEmulated":
                    new_node = CFGENode(
                        n.addr,
                        new_size,
                        self.model,
                        callstack_key=callstack_key,
                        function_address=n.function_address,
                        block_id=n.block_id,
                        instruction_addrs=[i for i in n.instruction_addrs if n.addr <= i <= n.addr + new_size],
                        thumb=n.thumb,
                    )
                else:
                    raise ValueError("Unknown tag %s." % self.tag)

                # Copy instruction addresses
                new_node.instruction_addrs = [
                    ins_addr for ins_addr in n.instruction_addrs if ins_addr < n.addr + new_size
                ]
                # Put the new node into the end-addresses mapping
                if key in smallest_nodes:
                    end_addresses_to_nodes[key].add(smallest_nodes[key])
                    end_addresses_to_nodes[key].add(new_node)
                else:
                    smallest_nodes[key] = new_node

            # Modify the CFG
            original_predecessors = list(graph.in_edges([n], data=True))
            original_successors = list(graph.out_edges([n], data=True))

            if smallest_node not in graph:
                continue

            for _s, d, data in original_successors:
                ins_addr = data.get("ins_addr", None)  # ins_addr might be None for FakeRet edges
                if ins_addr is None and data.get("jumpkind", None) != "Ijk_FakeRet":
                    l.warning(
                        "Unexpected edge with ins_addr being None: %s -> %s, data = %s.",
                        _s,
                        d,
                        str(data),
                    )
                if ins_addr is not None and ins_addr < smallest_node.addr:
                    continue
                if d not in graph[smallest_node]:
                    if d is n:
                        graph.add_edge(smallest_node, new_node, **data)
                    else:
                        graph.add_edge(smallest_node, d, **data)

            for p, _, _ in original_predecessors:
                graph.remove_edge(p, n)
            if n in graph:
                graph.remove_node(n)

            # Update the node dicts
            self._model.remove_node(n.block_id, n)
            self._model.add_node(n.block_id, new_node)

            for p, _, data in original_predecessors:
                # Consider the following case: two basic blocks ending at the same position, where A is larger and
                # B is smaller. Suppose there is an edge going from the end of A to A itself, and apparently there
                # is another edge from B to A as well. After splitting A into A' and B, we DO NOT want to add A back
                # in, otherwise there will be an edge from A to A', while A should be entirely removed from the new
                # graph.
                if p not in other_nodes:
                    graph.add_edge(p, new_node, **data)

            # We should find the correct successor
            new_successors = [i for i in [smallest_node] + other_nodes if i.addr == smallest_node.addr]
            if new_successors:
                new_successor = new_successors[0]
                graph.add_edge(
                    new_node, new_successor, jumpkind="Ijk_Boring", ins_addr=new_node.instruction_addrs[-1]
                )
            else:
                # We gotta create a new one
                l.error("normalize(): Please report it to Fish.")

        # deal with duplicated entries in self.jump_tables and self.indirect_jumps
        if smallest_node.addr in self.model.jump_tables:
            for n in other_nodes:
                if n.addr in self.model.jump_tables:
                    del self.model.jump_tables[n.addr]
        if smallest_node.addr in self.indirect_jumps:
            for n in other_nodes:
                if n.addr in self.indirect_jumps:
                    del self.indirect_jumps[n.addr]

    #
    # Job management
    #

    def _register_analysis_job(self, func_addr, job):
        """
        Register an analysis job of a function with the job manager. This allows us to track whether we have finished
        analyzing/recovering a function or not.

        :param int func_addr: Address of the function that this job belongs to.
        :param job: The job to register. Note that it does not have to be a CFGJob instance; it can be a PendingJob
                    or another type of job instance, too.
        :return: None
        """
        self._jobs_to_analyze_per_function[func_addr].add(job)

    def _deregister_analysis_job(self, func_addr, job):
        """
        Deregister/remove an analysis job of a function from the job manager.

        :param int func_addr: Address of the function that this job belongs to.
        :param job: The job to deregister.
        :return: None
        """
        self._jobs_to_analyze_per_function[func_addr].discard(job)

    def _get_finished_functions(self):
        """
        Obtain all functions that we have finished analyzing. As _jobs_to_analyze_per_function is a defaultdict(), if
        a function address shows up in it with an empty job set, we consider all jobs of this function (both current
        jobs and pending jobs) to be exhausted, and thus the analysis of this function is done.

        :return: a list of addresses of functions whose analysis is finished.
        :rtype: list
        """
        finished_func_addrs = []
        for func_addr, all_jobs in self._jobs_to_analyze_per_function.items():
            if not all_jobs:
                # great! we have finished analyzing this function!
                finished_func_addrs.append(func_addr)

        return finished_func_addrs

    def _cleanup_analysis_jobs(self, finished_func_addrs=None):
        """
        Remove all functions whose analysis is finished from the job manager.

        :param list or None finished_func_addrs: A list of addresses of functions whose analysis is finished. A new
                                                 list of function addresses will be obtained by calling
                                                 _get_finished_functions() if this parameter is None.
        :return: None
        """
        if finished_func_addrs is None:
            finished_func_addrs = self._get_finished_functions()

        for func_addr in finished_func_addrs:
            if func_addr in self._jobs_to_analyze_per_function:
                del self._jobs_to_analyze_per_function[func_addr]

    def _make_completed_functions(self):
        """
        Fill in the self._completed_functions set and clean up the job manager.

        :return: None
        """
        finished = self._get_finished_functions()
        for func_addr in finished:
            self._completed_functions.add(func_addr)
        self._cleanup_analysis_jobs(finished_func_addrs=finished)
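    # Editor's note: bookkeeping sketch for the job-manager helpers above (the address and
    # the `job` object are hypothetical). A function counts as completed once its job set
    # drains to empty:
    #
    #     cfg._register_analysis_job(0x401000, job)    # queue work for the function
    #     cfg._deregister_analysis_job(0x401000, job)  # consume it
    #     cfg._make_completed_functions()              # drained functions become "completed"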
    #
    # Function identification and such
    #

    def mark_function_alignments(self):
        """
        Find all potential function alignments and mark them.

        Note that it is not always correct to simply remove them, because these functions may not be actual
        alignments but parts of actual functions, incorrectly marked as individual functions because of failures in
        resolving indirect jumps. An example is function 0x40541d in the test binary ``x86_64/dir_gcc_-O0`` (indirect
        jump at 0x4051b0). If the indirect jump cannot be correctly resolved, removing function 0x40541d will cause a
        missing-label failure in the reassembler.

        :return: None
        """
        # This function requires Capstone engine support
        if not self.project.arch.capstone_support:
            return

        for func_addr in self.kb.functions.keys():
            function = self.kb.functions[func_addr]
            if function.is_simprocedure or function.is_syscall:
                continue
            if len(function.block_addrs_set) == 1:
                block = next((b for b in function.blocks), None)
                if block is None:
                    continue
                if all(self._is_noop_insn(insn) for insn in block.capstone.insns):
                    # all nops. mark this function as a function alignment
                    l.debug("Function chunk %#x is probably used as a function alignment (all nops).", func_addr)
                    self.kb.functions[func_addr].alignment = True
                    continue
                node = function.get_node(block.addr)
                successors = list(function.graph.successors(node))
                if len(successors) == 1 and successors[0].addr == node.addr:
                    # self loop. mark this function as a function alignment
                    l.debug("Function chunk %#x is probably used as a function alignment (self-loop).", func_addr)
                    self.kb.functions[func_addr].alignment = True
                    continue
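    # Editor's note: usage sketch; alignment functions are flagged rather than removed, so
    # callers can filter them afterwards:
    #
    #     cfg.mark_function_alignments()
    #     alignment_addrs = [f.addr for f in cfg.kb.functions.values() if f.alignment]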
    def make_functions(self):
        """
        Revisit the entire control flow graph, create Function instances accordingly, and correctly put blocks into
        each function.

        Although Function objects are created during the CFG recovery, they are neither sound nor accurate. With a
        pre-constructed CFG, this method rebuilds all functions following these rules:

        - A block may only belong to one function.
        - Small functions lying between the startpoint and the endpoint of another function will be merged with the
          other function.
        - Tail call optimizations are detected.
        - PLT stubs are aligned by 16.

        :return: None
        """
        # TODO: Is it required that PLT stubs are always aligned by 16? If so, on what architectures and platforms
        # TODO: is it enforced?

        tmp_functions = self.kb.functions.copy()

        for function in tmp_functions.values():
            function.mark_nonreturning_calls_endpoints()

        # Clear the old functions dict
        self.kb.functions.clear()

        blockaddr_to_function = {}
        traversed_cfg_nodes = set()

        function_nodes = set()

        # Find nodes for the beginnings of all functions
        for _, dst, data in self.graph.edges(data=True):
            jumpkind = data.get("jumpkind", "")
            if jumpkind == "Ijk_Call" or jumpkind.startswith("Ijk_Sys"):
                function_nodes.add(dst)

        entry_node = self.model.get_any_node(self._binary.entry)
        if entry_node is not None:
            function_nodes.add(entry_node)

        # aggressively remove and merge functions
        # For any function, if there is a call to it, it won't be removed
        called_function_addrs = {n.addr for n in function_nodes}

        # Any function addresses that appear as symbols won't be removed
        predetermined_function_addrs = called_function_addrs | self._function_addresses_from_symbols

        removed_functions_a = self._process_irrational_functions(
            tmp_functions, predetermined_function_addrs, blockaddr_to_function
        )
        removed_functions_b, adjusted_cfgnodes = self._process_irrational_function_starts(
            tmp_functions, predetermined_function_addrs, blockaddr_to_function
        )
        self._process_jump_table_targeted_functions(
            tmp_functions,
            predetermined_function_addrs,
            blockaddr_to_function,
        )
        removed_functions = removed_functions_a | removed_functions_b

        # Remove all nodes that are adjusted
        function_nodes.difference_update(adjusted_cfgnodes)
        for n in self.graph.nodes():
            if n.addr in tmp_functions or n.addr in removed_functions:
                function_nodes.add(n)

        # traverse the graph starting from each node, not following call edges
        # it's important that we traverse all functions in order so that we have a greater chance to come across
        # rational functions before their irrational counterparts (e.g. due to failed jump table resolution)

        min_stage_2_progress = 50.0
        max_stage_2_progress = 90.0
        nodes_count = len(function_nodes)
        for i, fn in enumerate(sorted(function_nodes, key=lambda n: n.addr)):
            if self._low_priority:
                self._release_gil(i, 800, 0.000001)

            if self._show_progressbar or self._progress_callback:
                progress = min_stage_2_progress + (max_stage_2_progress - min_stage_2_progress) * (
                    i * 1.0 / nodes_count
                )
                self._update_progress(progress)

            self._graph_bfs_custom(
                self.graph,
                [fn],
                self._graph_traversal_handler,
                blockaddr_to_function,
                tmp_functions,
                traversed_cfg_nodes,
            )

        # Don't forget those small function chunks that are not called by anything.
        # There might be references to them from data, or simply references that we cannot find via static analysis.

        secondary_function_nodes = set()
        # add all function chunks ("functions" that are not called from anywhere)
        for func_addr in tmp_functions:
            node = self.model.get_any_node(func_addr)
            if node is None:
                continue
            if node.addr not in blockaddr_to_function:
                secondary_function_nodes.add(node)

        missing_cfg_nodes = set(self.graph.nodes()) - traversed_cfg_nodes
        missing_cfg_nodes = {node for node in missing_cfg_nodes if node.function_address is not None}
        if missing_cfg_nodes:
            l.debug("%d CFGNodes are missing in the first traversal.", len(missing_cfg_nodes))
            secondary_function_nodes |= missing_cfg_nodes

        min_stage_3_progress = 90.0
        max_stage_3_progress = 99.9

        nodes_count = len(secondary_function_nodes)
        for i, fn in enumerate(sorted(secondary_function_nodes, key=lambda n: n.addr)):
            if self._show_progressbar or self._progress_callback:
                progress = min_stage_3_progress + (max_stage_3_progress - min_stage_3_progress) * (
                    i * 1.0 / nodes_count
                )
                self._update_progress(progress)

            self._graph_bfs_custom(
                self.graph, [fn], self._graph_traversal_handler, blockaddr_to_function, tmp_functions
            )

        to_remove = set()

        # Remove all stubs after PLT entries
        if not is_arm_arch(self.project.arch):
            to_remove |= self._remove_dummy_plt_stubs(self.kb.functions)

        # remove empty functions
        for func in self.kb.functions.values():
            if func.startpoint is None:
                to_remove.add(func.addr)

        for addr in to_remove:
            del self.kb.functions[addr]

        # Update CFGNode.function_address
        for node in self._nodes.values():
            if node.addr in blockaddr_to_function:
                node.function_address = blockaddr_to_function[node.addr].addr

        # Update function.info
        for func in self.kb.functions.values():
            if func.addr in tmp_functions:
                func.info = tmp_functions[func.addr].info
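    # Editor's note: make_functions() is invoked internally during CFG recovery, but it can
    # also be re-run after the model has been edited (a sketch; behavior beyond what the
    # docstring above states is not guaranteed by this listing):
    #
    #     cfg.make_functions()
    #     assert all(f.startpoint is not None for f in cfg.kb.functions.values())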
def _remove_dummy_plt_stubs(self, functions): def _is_function_a_plt_stub(arch_, func): if len(func.block_addrs_set) != 1: # multiple blocks? no idea what this is... return False block = next(func.blocks) if self._is_noop_block(arch_, block): # alignments return False if ( arch_.name in {"X86", "AMD64"} and len(block.vex.instruction_addresses) == 2 and block.vex.jumpkind == "Ijk_Boring" ): # push ordinal; jump _resolve return False # TODO: We may want to add support for filtering dummy PLT stubs for other architectures, but I haven't # TODO: seen any need for those. return True to_remove = set() met_plts = False non_plt_funcs = 0 sorted_func_addrs = sorted(functions.keys()) arch = self.project.arch # we assume all PLT entries are all located at the same region. the moment we come across the end of it, we # stop looping. for fn_addr in sorted_func_addrs: fn = functions.get_by_addr(fn_addr) addr = fn.addr - (fn.addr % 16) if addr != fn.addr: if addr in functions and functions[addr].is_plt and not _is_function_a_plt_stub(arch, fn): to_remove.add(fn.addr) continue if fn.is_plt: met_plts = True non_plt_funcs = 0 if met_plts and not fn.is_plt: non_plt_funcs += 1 if non_plt_funcs >= 2: break return to_remove def _process_irrational_functions(self, functions, predetermined_function_addrs, blockaddr_to_function): """ When force_complete_scan is enabled, for unresolveable indirect jumps, angr will find jump targets and mark them as individual functions. For example, usually the following pattern is seen: sub_0x400010: push ebp mov esp, ebp ... cmp eax, 10 ja end mov eax, jumptable[eax] jmp eax sub_0x400080: # do something here jmp end end (0x400e00): pop ebp ret In the example above, `process_irrational_functions` will remove function 0x400080, and merge it with function 0x400010. :param angr.knowledge_plugins.FunctionManager functions: all functions that angr recovers, including those ones that are misidentified as functions. :param dict blockaddr_to_function: A mapping between block addresses and Function instances. 
:return: A set of addresses of all removed functions :rtype: set """ functions_to_remove = {} all_func_addrs = sorted(set(functions.keys())) for func_pos, (func_addr, function) in enumerate(functions.items()): if func_addr in functions_to_remove: continue # check all blocks and see if any block ends with an indirect jump and is not resolved has_unresolved_jumps = False # the functions to merge with must be locating between the unresolved basic block address and the endpoint # of the current function max_unresolved_jump_addr = 0 for block_addr in function.block_addrs_set: if ( block_addr in self.indirect_jumps and self.indirect_jumps[block_addr].jumpkind == "Ijk_Boring" and not self.indirect_jumps[block_addr].resolved_targets ): # it's not resolved # we should also make sure it's a jump, not a call has_unresolved_jumps = True max_unresolved_jump_addr = max(max_unresolved_jump_addr, block_addr) if not has_unresolved_jumps: continue if function.startpoint is None: continue startpoint_addr = function.startpoint.addr if not function.endpoints: # Function should have at least one endpoint continue endpoint_addr = max(a.addr for a in function.endpoints) the_endpoint = next(a for a in function.endpoints if a.addr == endpoint_addr) endpoint_addr += the_endpoint.size # sanity check: startpoint of the function should be greater than its endpoint if startpoint_addr >= endpoint_addr: continue if max_unresolved_jump_addr <= startpoint_addr or max_unresolved_jump_addr >= endpoint_addr: continue # scan forward from the endpoint to include any function-tail jumps # Here is an example: # loc_8049562: # mov eax, ebp # add esp, 3ch # ... # ret # loc_804956c: # mov ebp, 3 # jmp loc_8049562 # loc_8049573: # mov ebp, 4 # jmp loc_8049562 # last_addr = endpoint_addr while True: try: # do not follow hooked addresses (such as SimProcedures) if self.project.is_hooked(last_addr): break next_block = self._lift(last_addr) next_block_irsb = next_block.vex_nostmt if next_block_irsb.jumpkind not in ("Ijk_Boring", "Ijk_InvalICache"): break if not isinstance(next_block_irsb.next, pyvex.IRExpr.Const): break suc_addr = next_block_irsb.next.con.value if ( max(startpoint_addr, the_endpoint.addr - 0x40) <= suc_addr < the_endpoint.addr + the_endpoint.size ): # increment the endpoint_addr endpoint_addr = next_block.addr + next_block.size else: break last_addr = next_block.addr + next_block.size except (SimTranslationError, SimMemoryError, SimIRSBError, SimEngineError): break # find all functions that are between [ startpoint, endpoint ] should_merge = True functions_to_merge = set() i = func_pos + 1 while i < len(all_func_addrs): f_addr = all_func_addrs[i] i += 1 f = functions[f_addr] if f_addr == func_addr: continue if max_unresolved_jump_addr < f_addr < endpoint_addr and all( max_unresolved_jump_addr < b_addr < endpoint_addr for b_addr in f.block_addrs ): if f_addr in functions_to_remove: # this function has already been merged with other functions before... it cannot be merged with # this function anymore should_merge = False break if f_addr in predetermined_function_addrs: # this function is a legit one. it shouldn't be removed/merged should_merge = False break functions_to_merge.add(f_addr) if not should_merge: # we shouldn't merge... 
            for f_addr in functions_to_merge:
                functions_to_remove[f_addr] = func_addr

        # merge all functions
        for to_remove, merge_with in functions_to_remove.items():
            func_merge_with = self._addr_to_function(merge_with, blockaddr_to_function, functions)

            for block_addr in functions[to_remove].block_addrs:
                blockaddr_to_function[block_addr] = func_merge_with

            del functions[to_remove]

        return set(functions_to_remove.keys())
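    # Editor's note (illustrative): the spurious functions merged above originate from
    # unresolved indirect jumps. They can be enumerated on a finished CFG like this
    # (hypothetical session; `cfg` is a CFGFast result):
    #
    #     >>> unresolved = {a for a, ij in cfg.indirect_jumps.items()
    #     ...               if ij.jumpkind == "Ijk_Boring" and not ij.resolved_targets}
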
    def _process_irrational_function_starts(self, functions, predetermined_function_addrs, blockaddr_to_function):
        """
        Functions that are identified via function prologues may start after the actual beginning of the function.
        For example, the following function (with an incorrect start) might exist after a CFG recovery:

        sub_8049f70:
            push esi

        sub_8049f71:
            sub esp, 0A8h
            mov esi, [esp+0ACh+arg_0]
            mov [esp+0ACh+var_88], 0

        If the following conditions are met, we will remove the second function and merge it into the first function:
        - The second function is not called by other code.
        - The first function has only one jumpout site, which points to the second function.

        :param FunctionManager functions: All functions that angr recovers.
        :return: A tuple of (a set of addresses of all removed functions, a set of adjusted CFGNodes).
        :rtype: tuple
        """

        addrs = sorted(
            k for k in functions.keys() if not self.project.is_hooked(k) and not self.project.simos.is_syscall_addr(k)
        )
        functions_to_remove = set()
        adjusted_cfgnodes = set()

        for addr_0, addr_1 in zip(addrs[:-1], addrs[1:]):
            if addr_1 in predetermined_function_addrs:
                continue
            if addr_0 in functions_to_remove:
                continue

            func_0 = functions[addr_0]

            if 1 <= len(func_0.block_addrs_set) <= 4:
                if len(func_0.jumpout_sites) != 1:
                    continue
                block_node = func_0.jumpout_sites[0]
                if block_node is None:
                    continue
                if block_node.size == 0:
                    # skip empty blocks (that are usually caused by lifting failures)
                    continue
                block = func_0.get_block(block_node.addr, block_node.size)
                if block.vex.jumpkind not in ("Ijk_Boring", "Ijk_InvalICache"):
                    continue
                # skip alignment blocks
                if self._is_noop_block(self.project.arch, block):
                    continue

                # does the first block transition to the next function?
                transition_found = False
                out_edges = list(func_0.transition_graph.out_edges(block_node, data=True))
                for _, dst_node, data in out_edges:
                    if (
                        dst_node.addr == addr_1
                        and data.get("type", None) == "transition"
                        and data.get("outside", False) is True
                    ):
                        transition_found = True
                        break

                if not transition_found:
                    continue

                cfgnode_0 = self.model.get_any_node(block_node.addr)
                cfgnode_1 = self.model.get_any_node(addr_1)

                if cfgnode_0 is None or cfgnode_1 is None:
                    continue

                # who's jumping to or calling cfgnode_1?
                cfgnode_1_preds = self.model.get_predecessors_and_jumpkinds(cfgnode_1, excluding_fakeret=True)
                func_1 = functions[addr_1]
                abort = False
                for pred, jumpkind in cfgnode_1_preds:
                    if pred.addr in func_0.block_addrs_set and jumpkind == "Ijk_Boring":
                        # this is the transition from function 0
                        continue
                    if pred.addr in func_1.block_addrs_set:
                        # this is a transition from function 1 itself
                        continue
                    # found an unexpected edge. give up
                    abort = True
                    break
                if abort:
                    continue

                # merge block addr_0 and block addr_1
                l.debug("Merging function %#x into %#x.", addr_1, addr_0)
                cfgnode_1_merged = False
                # we only merge the two CFG nodes if the first one does not end with a branch instruction
                if (
                    len(func_0.block_addrs_set) == 1
                    and len(out_edges) == 1
                    and out_edges[0][0].addr == cfgnode_0.addr
                    and out_edges[0][0].size == cfgnode_0.size
                    and self.project.factory.block(cfgnode_0.addr, strict_block_end=True).size > cfgnode_0.size
                ):
                    cfgnode_1_merged = True
                    self._merge_cfgnodes(cfgnode_0, cfgnode_1)
                    adjusted_cfgnodes.add(cfgnode_0)
                    adjusted_cfgnodes.add(cfgnode_1)

                # merge it
                func_1 = functions[addr_1]
                for block_addr in func_1.block_addrs:
                    if block_addr == addr_1 and cfgnode_1_merged:
                        # skip addr_1 (since it has been merged into the preceding block)
                        continue
                    merge_with = self._addr_to_function(addr_0, blockaddr_to_function, functions)
                    blockaddr_to_function[block_addr] = merge_with

                functions_to_remove.add(addr_1)

        for to_remove in functions_to_remove:
            del functions[to_remove]

        return functions_to_remove, adjusted_cfgnodes

    def _process_jump_table_targeted_functions(
        self, functions, predetermined_function_addrs, blockaddr_to_function
    ) -> Set[int]:
        """
        Sometimes compilers will optimize "cold" code regions by moving them into separate functions and marking them
        as cold. This conflicts with how angr handles jumps to these regions (because they were not functions to
        start with). Here is an example (in function version_etc_arn() from gllib)::

            switch (n_authors) {
            case 0:
                abort();
            case 1:
                ...
            }

        GCC may decide to move the `abort();` block under case 0 into a separate function (usually named
        "version_etc_arn_cold") and mark it as "cold." When loading function hints from eh frame is enabled, this
        function will be identified, and the recovered switch-case structure will have a jump to a function. That is
        usually not a problem until we need to decompile this function, where (at least for now) the angr decompiler
        requires that all switch-case entry blocks belong to the same function.

        The temporary solution is to identify functions that (a) have no call predecessors, and (b) are used as jump
        targets of identified jump tables. We remove these functions so that they can be treated as part of the
        source function where the corresponding jump table belongs.
        """

        jumptable_entries: Set[int] = set()
        for jt in self.model.jump_tables.values():
            jumptable_entries |= set(jt.jumptable_entries)

        if not jumptable_entries:
            return set()

        functions_to_remove = set()

        for func_addr in functions.keys():
            if func_addr in predetermined_function_addrs:
                continue
            if func_addr in jumptable_entries:
                # is there any call edge pointing to it?
                func_node = self.get_any_node(func_addr)
                if func_node is not None:
                    in_edges = self.graph.in_edges(func_node, data=True)
                    has_transition_pred = None
                    has_non_transition_pred = None
                    for _, _, data in in_edges:
                        if data.get("jumpkind", None) == "Ijk_Boring":
                            has_transition_pred = True
                        else:
                            has_non_transition_pred = True
                    if has_transition_pred is True and not has_non_transition_pred:
                        # all predecessors are transition-only
                        # remove this function
                        functions_to_remove.add(func_addr)

        for to_remove in functions_to_remove:
            del functions[to_remove]
            if to_remove in blockaddr_to_function:
                del blockaddr_to_function[to_remove]

        return functions_to_remove
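    # Editor's note (illustrative): recovered jump tables live on the CFG model, and their
    # entries are exactly the addresses checked against function starts above (hypothetical
    # session; `cfg` is a CFGFast result):
    #
    #     >>> for jump_site, jt in cfg.model.jump_tables.items():
    #     ...     print(hex(jump_site), [hex(t) for t in jt.jumptable_entries])
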
    def _addr_to_function(self, addr, blockaddr_to_function, known_functions):
        """
        Convert an address to a Function object, and store the mapping in a dict. If the block is known to be part of
        a function, just return that function.

        :param int addr: Address to convert.
        :param dict blockaddr_to_function: A mapping from block addresses to Function instances.
        :param angr.knowledge_plugins.FunctionManager known_functions: Recovered functions.
        :return: A Function object.
        :rtype: angr.knowledge.Function
        """

        if addr in blockaddr_to_function:
            f = blockaddr_to_function[addr]
        else:
            is_syscall = self.project.simos.is_syscall_addr(addr)

            n = self.model.get_any_node(addr, is_syscall=is_syscall)
            if n is None:
                node = addr
            else:
                node = self._to_snippet(n)

            if isinstance(addr, SootAddressDescriptor):
                addr = addr.method

            self.kb.functions._add_node(addr, node, syscall=is_syscall)
            f = self.kb.functions.function(addr=addr)

            blockaddr_to_function[addr] = f

            function_is_returning = False
            if addr in known_functions:
                if known_functions.function(addr).returning:
                    f.returning = True
                    function_is_returning = True

            if not function_is_returning:
                # We will rerun function feature analysis on this function later. Add it to
                # self._updated_nonreturning_functions so it can be picked up by function feature analysis later.
                if self._updated_nonreturning_functions is not None:
                    self._updated_nonreturning_functions.add(addr)

        return f
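    # Editor's note (illustrative): blockaddr_to_function behaves as a memoization table;
    # querying the same address twice yields the same Function object (hypothetical sketch
    # that calls this private helper directly):
    #
    #     >>> blockaddr_to_function = {}
    #     >>> f1 = cfg._addr_to_function(0x401000, blockaddr_to_function, proj.kb.functions)
    #     >>> f2 = cfg._addr_to_function(0x401000, blockaddr_to_function, proj.kb.functions)
    #     >>> f1 is f2
    #     True
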
    def _is_tail_call_optimization(
        self, g: networkx.DiGraph, src_addr, dst_addr, src_function, all_edges, known_functions, blockaddr_to_function
    ):
        """
        Check whether an edge is a tail-call optimization. It is one if the source and the destination belong to the
        same function and the following criteria apply:

        - the source node has only one default exit
        - the destination is not one of the known functions
        - the destination does not belong to another function, or the destination belongs to the same function that
          the source belongs to
        - at the end of the block, the SP offset is 0
        - for all other edges that point to the destination node, their source nodes must only have one default
          exit, too

        :return: True if it is a tail-call optimization. False otherwise.
        :rtype: bool
        """

        def _has_more_than_one_exit(node_):
            # Do not consider FakeRets as counting as multiple exits here.
            out_edges = [e for e in g.out_edges(node_) if g.get_edge_data(*e)["jumpkind"] != "Ijk_FakeRet"]
            return len(out_edges) > 1

        if len(src_function.block_addrs_set) > 10:
            # only consider small functions; ignore everything else
            return False

        if len(all_edges) == 1 and dst_addr != src_addr:
            the_edge = next(iter(all_edges))
            _, dst, data = the_edge
            if data.get("stmt_idx", None) != DEFAULT_STATEMENT:
                return False

            # re-lift the source node to make sure it's not a fall-through target
            full_src_node = self.project.factory.block(src_addr)
            if full_src_node.size >= VEX_IRSB_MAX_SIZE or full_src_node.instructions >= VEX_IRSB_MAX_INST:
                # we are probably hitting the max-block limit in VEX
                return False
            if full_src_node.addr <= dst_addr < full_src_node.addr + full_src_node.size:
                return False

            dst_in_edges = g.in_edges(dst, data=True)
            if len(dst_in_edges) > 1:
                # there are other edges going to the destination node. check all edges to make sure all source nodes
                # only have one default exit
                if any(data.get("stmt_idx", None) != DEFAULT_STATEMENT for _, _, data in dst_in_edges):
                    # some nodes are jumping to the destination node via non-default edges. skip.
                    return False
                if any(_has_more_than_one_exit(src_) for src_, _, _ in dst_in_edges):
                    # at least one source node has more than just the default exit. skip.
                    return False

            candidate = False
            if dst_addr in known_functions:
                # dst_addr cannot be the same as src_function.addr. pass
                pass
            elif dst_addr in blockaddr_to_function:
                # it seems that we already know which function this address should belong to
                dst_func = blockaddr_to_function[dst_addr]
                if dst_func is src_function:
                    # they belong to the same function right now, but they'd better not
                    candidate = True
                    # treat it as a tail-call optimization
            else:
                # we don't know which function it belongs to
                # treat it as a tail-call optimization
                candidate = True

            if candidate:
                regs = {self.project.arch.sp_offset}
                if hasattr(self.project.arch, "bp_offset") and self.project.arch.bp_offset is not None:
                    regs.add(self.project.arch.bp_offset)
                sptracker = self.project.analyses[StackPointerTracker].prep()(
                    src_function, regs, track_memory=self._sp_tracking_track_memory
                )
                sp_delta = sptracker.offset_after_block(src_addr, self.project.arch.sp_offset)
                if sp_delta == 0:
                    return True

        return False

    def _graph_bfs_custom(self, g, starts, callback, blockaddr_to_function, known_functions, traversed_cfg_nodes=None):
        """
        A customized control flow graph BFS implementation with the following rules:

        - Call edges are not followed.
        - Syscall edges are not followed.

        :param networkx.DiGraph g: The graph.
        :param list starts: A collection of beginning nodes to start graph traversal.
        :param func callback: Callback function for each edge and node.
        :param dict blockaddr_to_function: A mapping from block addresses to Function instances.
        :param angr.knowledge_plugins.FunctionManager known_functions: Already recovered functions.
        :param set traversed_cfg_nodes: A set of CFGNodes that have been traversed before.
        :return: None
        """

        stack = OrderedSet(starts)
        traversed = set() if traversed_cfg_nodes is None else traversed_cfg_nodes

        while stack:
            n: CFGNode = stack.pop(last=False)

            if n in traversed:
                continue

            traversed.add(n)

            if n.has_return:
                callback(g, n, None, {"jumpkind": "Ijk_Ret"}, blockaddr_to_function, known_functions, None)
            # NOTE: A block that has_return CAN have successors that aren't the return.
            # This is particularly the case for ARM conditional instructions. Yes, conditional rets are a thing.

            if g.out_degree(n) == 0:
                # it's a single node
                callback(g, n, None, None, blockaddr_to_function, known_functions, None)

            else:
                all_out_edges = g.out_edges(n, data=True)
                for src, dst, data in all_out_edges:
                    callback(g, src, dst, data, blockaddr_to_function, known_functions, all_out_edges)

                    jumpkind = data.get("jumpkind", "")
                    if not (jumpkind == "Ijk_Call" or jumpkind.startswith("Ijk_Sys")):
                        # only follow non-call edges
                        if dst not in stack and dst not in traversed:
                            stack.add(dst)
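    # Editor's note (illustrative): a callback passed to _graph_bfs_custom receives either
    # an edge (src, dst, data) or a lone node (dst is None, data may be None). A minimal
    # tracing callback could look like this (hypothetical):
    #
    #     >>> def trace_cb(g, src, dst, data, blockaddr_to_function, known_functions, all_edges):
    #     ...     if dst is None:
    #     ...         print("node", hex(src.addr))
    #     ...     else:
    #     ...         print("edge", hex(src.addr), "->", hex(dst.addr), data.get("jumpkind"))
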
    def _graph_traversal_handler(self, g, src, dst, data, blockaddr_to_function, known_functions, all_edges):
        """
        Graph traversal handler. It takes in a node or an edge, and creates new functions or adds nodes to existing
        functions accordingly. It also creates edges on the transition map of functions.

        :param g: The control flow graph that is currently being traversed.
        :param CFGNode src: Beginning of the edge, or a single node when dst is None.
        :param CFGNode dst: Destination of the edge. For processing a single node, `dst` is None.
        :param dict data: Edge data in the CFG. 'jumpkind' should be there if it's not None.
        :param dict blockaddr_to_function: A mapping from block addresses to Function instances.
        :param angr.knowledge_plugins.FunctionManager known_functions: Already recovered functions.
        :param list or None all_edges: All edges going out from src.
        :return: None
        """

        src_addr = src.addr
        src_function = self._addr_to_function(src_addr, blockaddr_to_function, known_functions)

        if src_addr not in src_function.block_addrs_set:
            n = self.model.get_any_node(src_addr)
            if n is None:
                node = src_addr
            else:
                node = self._to_snippet(n)
            self.kb.functions._add_node(src_function.addr, node)

        if data is None:
            # it's a single node only
            return

        jumpkind = data["jumpkind"]

        if jumpkind == "Ijk_Ret":
            n = self.model.get_any_node(src_addr)
            if n is None:
                from_node = src_addr
            else:
                from_node = self._to_snippet(n)
            self.kb.functions._add_return_from(src_function.addr, from_node, None)

        if dst is None:
            return

        dst_addr = dst.addr

        # get instruction address and statement index
        ins_addr = data.get("ins_addr", None)
        stmt_idx = data.get("stmt_idx", None)

        if jumpkind == "Ijk_Call" or jumpkind.startswith("Ijk_Sys"):
            is_syscall = jumpkind.startswith("Ijk_Sys")

            # it must be calling a function
            dst_function = self._addr_to_function(dst_addr, blockaddr_to_function, known_functions)

            n = self.model.get_any_node(src_addr)
            if n is None:
                src_snippet = self._to_snippet(addr=src_addr, base_state=self._base_state)
            else:
                src_snippet = self._to_snippet(cfg_node=n)

            # HACK: FIXME: We need a better way of representing unresolved calls and whether they return.
            # For now, assume UnresolvedTarget returns if we're calling to it

            # If the function doesn't return, don't add a fakeret!
            if not all_edges or (
                dst_function.returning is False and not dst_function.name == "UnresolvableCallTarget"
            ):
                fakeret_node = None
            else:
                fakeret_node = self._one_fakeret_node(all_edges)

            if fakeret_node is None:
                fakeret_snippet = None
            else:
                fakeret_snippet = self._to_snippet(cfg_node=fakeret_node)

            if isinstance(dst_addr, SootAddressDescriptor):
                dst_addr = dst_addr.method

            self.kb.functions._add_call_to(
                src_function.addr,
                src_snippet,
                dst_addr,
                fakeret_snippet,
                syscall=is_syscall,
                ins_addr=ins_addr,
                stmt_idx=stmt_idx,
            )

            if dst_function.returning and fakeret_node is not None:
                returning_target = src.addr + src.size
                if returning_target not in blockaddr_to_function:
                    if returning_target not in known_functions:
                        blockaddr_to_function[returning_target] = src_function
                    else:
                        self._addr_to_function(returning_target, blockaddr_to_function, known_functions)

                to_outside = blockaddr_to_function[returning_target] is not src_function

                n = self.model.get_any_node(returning_target)
                if n is None:
                    try:
                        returning_snippet = self._to_snippet(addr=returning_target, base_state=self._base_state)
                    except SimEngineError:
                        # it may not exist
                        returning_snippet = None
                else:
                    returning_snippet = self._to_snippet(cfg_node=n)

                if returning_snippet is not None:
                    self.kb.functions._add_fakeret_to(
                        src_function.addr, src_snippet, returning_snippet, confirmed=True, to_outside=to_outside
                    )

        elif jumpkind in ("Ijk_Boring", "Ijk_InvalICache", "Ijk_Exception"):
            # convert src_addr and dst_addr to CodeNodes
            n = self.model.get_any_node(src_addr)
            if n is None:
                src_node = src_addr
            else:
                src_node = self._to_snippet(cfg_node=n)

            n = self.model.get_any_node(dst_addr)
            if n is None:
                dst_node = dst_addr
            else:
                dst_node = self._to_snippet(cfg_node=n)

            if self._skip_unmapped_addrs:
                # pre-check: if the source and the destination do not belong to the same section, the jump must be
                # going to another function
                belong_to_same_section = self._addrs_belong_to_same_section(src_addr, dst_addr)
                if not belong_to_same_section:
                    _ = self._addr_to_function(dst_addr, blockaddr_to_function, known_functions)

            if self._detect_tail_calls:
                if self._is_tail_call_optimization(
                    g,
                    src_addr,
                    dst_addr,
                    src_function,
                    all_edges,
                    known_functions,
                    blockaddr_to_function,
                ):
                    l.debug("Possible tail-call optimization detected at function %#x.", dst_addr)
                    # it's (probably) a tail-call optimization. we should make the destination node a new function
                    # instead.
                    blockaddr_to_function.pop(dst_addr, None)
                    _ = self._addr_to_function(dst_addr, blockaddr_to_function, known_functions)
                    self.kb.functions._add_outside_transition_to(
                        src_function.addr, src_node, dst_node, to_function_addr=dst_addr
                    )
                    self._tail_calls.add(dst_addr)

            # is it a jump to another function?
            if isinstance(dst_addr, SootAddressDescriptor):
                is_known_function_addr = dst_addr.method in known_functions and dst_addr.method.addr == dst_addr
            else:
                is_known_function_addr = dst_addr in known_functions

            if (is_known_function_addr and dst_addr != src_function.addr) or (
                dst_addr in blockaddr_to_function and blockaddr_to_function[dst_addr] is not src_function
            ):
                # yes it is
                dst_function_addr = (
                    blockaddr_to_function[dst_addr].addr if dst_addr in blockaddr_to_function else dst_addr
                )

                self.kb.functions._add_outside_transition_to(
                    src_function.addr,
                    src_node,
                    dst_node,
                    to_function_addr=dst_function_addr,
                    is_exception=jumpkind == "Ijk_Exception",
                )

                _ = self._addr_to_function(dst_addr, blockaddr_to_function, known_functions)
            else:
                # no, it's not. add the transition code
                if dst_addr not in blockaddr_to_function:
                    blockaddr_to_function[dst_addr] = src_function

                self.kb.functions._add_transition_to(
                    src_function.addr,
                    src_node,
                    dst_node,
                    ins_addr=ins_addr,
                    stmt_idx=stmt_idx,
                    is_exception=jumpkind == "Ijk_Exception",
                )

        elif jumpkind == "Ijk_FakeRet":
            # convert src_addr and dst_addr to CodeNodes
            n = self.model.get_any_node(src_addr)
            if n is None:
                src_node = src_addr
            else:
                src_node = self._to_snippet(n)

            n = self.model.get_any_node(dst_addr)
            if n is None:
                dst_node = dst_addr
            else:
                dst_node = self._to_snippet(n)

            if dst_addr not in blockaddr_to_function:
                if isinstance(dst_addr, SootAddressDescriptor):
                    if dst_addr.method not in known_functions:
                        blockaddr_to_function[dst_addr] = src_function
                        target_function = src_function
                    else:
                        target_function = self._addr_to_function(dst_addr, blockaddr_to_function, known_functions)
                else:
                    if dst_addr not in known_functions:
                        blockaddr_to_function[dst_addr] = src_function
                        target_function = src_function
                    else:
                        target_function = self._addr_to_function(dst_addr, blockaddr_to_function, known_functions)
            else:
                target_function = blockaddr_to_function[dst_addr]

            # Figure out if the function called (not the function returned to) returns.
            # We may have determined that this does not happen, since the time this path
            # was scheduled for exploration.
            called_function = None
            called_function_addr = None
            # try to find the call that this fakeret goes with
            for _, d, e in all_edges:
                if e["jumpkind"] == "Ijk_Call":
                    if d.addr in blockaddr_to_function:
                        called_function = blockaddr_to_function[d.addr]
                        called_function_addr = d.addr
                        break
            # We may have since figured out that the called function doesn't ret.
            # It's important to assume that all unresolved targets do return
            if called_function is not None and called_function.returning is False:
                return

            to_outside = target_function is not src_function

            confirmed = called_function is None or called_function.returning is True
            self.kb.functions._add_fakeret_to(
                src_function.addr,
                src_node,
                dst_node,
                confirmed=confirmed,
                to_outside=to_outside,
                to_function_addr=called_function_addr,
            )

        else:
            l.debug("Ignored jumpkind %s", jumpkind)

    #
    # Other functions
    #

    @staticmethod
    def _is_noop_block(arch: archinfo.Arch, block):
        """
        Check if the block is a no-op block by checking VEX statements.

        :param arch: An architecture descriptor.
        :param block: The VEX block instance.
        :return: True if the entire block is a single-byte or multi-byte nop instruction, False otherwise.
        :rtype: bool
        """

        if arch.name == "X86":
            if set(block.bytes) == {0x90}:
                return True
        elif arch.name == "AMD64":
            if set(block.bytes) == {0x90}:
                return True
        elif arch.name == "MIPS32":
            if arch.memory_endness == "Iend_BE":
                MIPS32_BE_NOOPS = {
                    b"\x00\x20\x08\x25",  # move $at, $at
                }
                insns = {block.bytes[i : i + 4] for i in range(0, block.size, 4)}
                if MIPS32_BE_NOOPS.issuperset(insns):
                    return True
        elif is_arm_arch(arch):
            if block.addr & 1 == 0:
                # ARM mode
                if arch.memory_endness == archinfo.Endness.LE:
                    ARM_NOOPS = {
                        b"\x00\x00\x00\x00",  # andeq r0, r0, r0
                        b"\x00\x00\xa0\xe1",  # mov r0, r0
                    }
                else:  # if arch.memory_endness == archinfo.Endness.BE:
                    ARM_NOOPS = {
                        b"\x00\x00\x00\x00",  # andeq r0, r0, r0
                        b"\xe1\xa0\x00\x00",  # mov r0, r0
                    }
                insns = {block.bytes[i : i + 4] for i in range(0, block.size, 4)}
                if ARM_NOOPS.issuperset(insns):
                    return True
            else:
                # THUMB mode, 2-byte instructions
                if arch.memory_endness == archinfo.Endness.LE:
                    THUMB_NOOPS = {
                        b"\xc0\x46",  # mov r8, r8
                        b"\xb0\x00",  # add sp, #0
                        b"\x00\xbf",  # nop
                    }
                else:
                    THUMB_NOOPS = {
                        b"\x46\xc0",  # mov r8, r8
                        b"\x00\xb0",  # add sp, #0
                        b"\xbf\x00",  # nop
                    }
                insns = {block.bytes[i : i + 2] for i in range(0, block.size, 2)}
                if THUMB_NOOPS.issuperset(insns):
                    return True

        # fallback 1
        # the block is a noop block if it only has IMark statements **and** it jumps to its immediate successor. VEX
        # will generate such blocks when opt_level==1 and cross_insn_opt is True
        fallthrough_addr = block.addr + block.size
        next_ = block.vex.next
        if isinstance(next_, pyvex.IRExpr.Const) and next_.con.value == fallthrough_addr:
            if all(type(stmt) is pyvex.IRStmt.IMark for stmt in block.vex.statements):
                return True

        # fallback 2
        # the block is a noop block if it only has IMark statements and IP-setting statements that set the IP to the
        # next location. VEX will generate such blocks when opt_level==1 and cross_insn_opt is False
        ip_offset = arch.ip_offset
        if all(
            (type(stmt) is pyvex.IRStmt.IMark or (type(stmt) is pyvex.IRStmt.Put and stmt.offset == ip_offset))
            for stmt in block.vex.statements
        ):
            if block.vex.statements:
                last_stmt = block.vex.statements[-1]
                if isinstance(last_stmt, pyvex.IRStmt.IMark):
                    if isinstance(next_, pyvex.IRExpr.Const) and next_.con.value == fallthrough_addr:
                        return True

        return False
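    # Editor's note (illustrative): the x86/AMD64 byte-set check above relies on Python 3
    # semantics, where iterating a bytes object yields ints; a block of pure 0x90 padding
    # therefore compares equal to {0x90}:
    #
    #     >>> set(b"\x90\x90\x90\x90") == {0x90}
    #     True
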
""" insn_name = insn.insn_name() if insn_name == "nop": # nops return True elif insn_name == "lea": # lea reg, [reg + 0] op0, op1 = insn.operands if op0.type == 1 and op1.type == 3: # reg and mem if op0.reg == op1.mem.base and op1.mem.index == 0 and op1.mem.disp == 0: return True elif insn_name == "mov": if len(insn.operands) > 2: # mov reg_a, imm1, shift imm2 # This is not a NOP return False # mov reg_a, reg_a op0, op1 = insn.operands if op0.type == 1 and op1.type == 1: # reg and reg if op0.reg == op1.reg: return True elif insn_name in {"ud0", "ud1", "ud2"}: return True # add more types of no-op instructions here :-) return False @classmethod def _get_nop_length(cls, insns): """ Calculate the total size of leading nop instructions. :param insns: A list of capstone insn objects. :return: Number of bytes of leading nop instructions. :rtype: int """ nop_length = 0 if insns and cls._is_noop_insn(insns[0]): # see where those nop instructions terminate for insn in insns: if cls._is_noop_insn(insn): nop_length += insn.size else: break return nop_length @staticmethod def _one_fakeret_node(all_edges): """ Pick the first Ijk_FakeRet edge from all_edges, and return the destination node. :param list all_edges: A list of networkx.Graph edges with data. :return: The first FakeRet node, or None if nothing is found. :rtype: CFGNode or None """ for _, dst, data in all_edges: if data.get("jumpkind", None) == "Ijk_FakeRet": return dst return None def _lift(self, addr, *args, opt_level=1, cross_insn_opt=False, **kwargs): """ Lift a basic block of code. Will use the base state as a source of bytes if possible. """ if "backup_state" not in kwargs: kwargs["backup_state"] = self._base_state return self.project.factory.block(addr, *args, opt_level=opt_level, cross_insn_opt=cross_insn_opt, **kwargs) # # Indirect jumps processing # def _resolve_indirect_jump_timelessly(self, addr, block, func_addr, jumpkind): """ Attempt to quickly resolve an indirect jump. :param int addr: Basic block address of this indirect jump. :param block: The basic block. The type is determined by the backend being used. It's pyvex.IRSB if pyvex is used as the backend. :param int func_addr: Address of the function that this indirect jump belongs to. :param str jumpkind: The jumpkind. :return: A tuple of a boolean indicating whether the resolution is successful or not, and a list of resolved targets (ints). :rtype: tuple """ # pre-check: if re-lifting the block with full optimization (cross-instruction-optimization enabled) gives us # a constant next expression, we don't need to resolve it relifted = self.project.factory.block(block.addr, size=block.size, opt_level=1, cross_insn_opt=True).vex if isinstance(relifted.next, pyvex.IRExpr.Const): # yes! return True, [relifted.next.con.value] if block.statements is None: # make sure there are statements block = self.project.factory.block(block.addr, size=block.size).vex for res in self.timeless_indirect_jump_resolvers: if res.filter(self, addr, func_addr, block, jumpkind): r, resolved_targets = res.resolve(self, addr, func_addr, block, jumpkind) if r: return True, resolved_targets return False, [] def _indirect_jump_resolved(self, jump, jump_addr, resolved_by, targets): """ Called when an indirect jump is successfully resolved. :param IndirectJump jump: The resolved indirect jump, or None if an IndirectJump instance is not available. :param int jump_addr: Address of the resolved indirect jump. :param IndirectJumpResolver resolved_by: The resolver used to resolve this indirect jump. 
    def _indirect_jump_resolved(self, jump, jump_addr, resolved_by, targets):
        """
        Called when an indirect jump is successfully resolved.

        :param IndirectJump jump: The resolved indirect jump, or None if an IndirectJump instance is not available.
        :param int jump_addr: Address of the resolved indirect jump.
        :param IndirectJumpResolver resolved_by: The resolver used to resolve this indirect jump.
        :param list targets: List of indirect jump targets.
        :return: None
        """

        addr = jump.addr if jump is not None else jump_addr
        l.debug(
            "The indirect jump at %#x is successfully resolved by %s. It has %d targets.",
            addr,
            resolved_by,
            len(targets),
        )
        self.kb.indirect_jumps.update_resolved_addrs(addr, targets)

    def _indirect_jump_unresolved(self, jump):
        """
        Called when we cannot resolve an indirect jump.

        :param IndirectJump jump: The unresolved indirect jump.
        :return: None
        """

        l.debug("Failed to resolve the indirect jump at %#x.", jump.addr)
        # tell KnowledgeBase that it's not resolved
        # TODO: self.kb._unresolved_indirect_jumps is not processed during normalization. Fix it.
        self.kb.unresolved_indirect_jumps.add(jump.addr)

    def _indirect_jump_encountered(
        self,
        addr: int,
        cfg_node: CFGNode,
        irsb: pyvex.IRSB,
        func_addr: int,
        stmt_idx: Union[int, str] = DEFAULT_STATEMENT,
    ) -> Tuple[bool, List[int], Optional[IndirectJump]]:
        """
        Called when we encounter an indirect jump. We will try to resolve this indirect jump using timeless (fast)
        indirect jump resolvers. If it cannot be resolved, we will see if this indirect jump has been resolved
        before.

        :param addr: Address of the block containing the indirect jump.
        :param cfg_node: The CFGNode instance of the block that contains the indirect jump.
        :param irsb: The IRSB instance of the block that contains the indirect jump. It must be lifted with
                     cross-instruction optimization disabled (cross_insn_opt=False when opt_level=1, or opt_level=0).
        :param func_addr: Address of the current function.
        :param stmt_idx: ID of the source statement.
        :return: A 3-tuple of (whether it is resolved or not, all resolved targets, an IndirectJump object if there
                 is one or None otherwise).
        """

        jumpkind = irsb.jumpkind
        l.debug("IRSB %#x has an indirect jump (%s) as its default exit.", addr, jumpkind)

        # try resolving it fast
        resolved, resolved_targets = self._resolve_indirect_jump_timelessly(addr, irsb, func_addr, jumpkind)
        if resolved:
            l.debug(
                "Indirect jump at block %#x is resolved by a timeless indirect jump resolver. %d targets found.",
                addr,
                len(resolved_targets),
            )
            return True, resolved_targets, None

        l.debug("Indirect jump at block %#x cannot be resolved by a timeless indirect jump resolver.", addr)

        # add it to our set. will process it later if the user allows.
        # create an IndirectJump instance
        if addr not in self.indirect_jumps:
            if self.project.arch.branch_delay_slot:
                if len(cfg_node.instruction_addrs) < 2:
                    # sanity check: decoding failed when decoding the second instruction (or even the first
                    # instruction)
                    return False, [], None
                ins_addr = cfg_node.instruction_addrs[-2]
            else:
                ins_addr = cfg_node.instruction_addrs[-1]
            ij = IndirectJump(addr, ins_addr, func_addr, jumpkind, stmt_idx, resolved_targets=[])
            self.indirect_jumps[addr] = ij
            resolved = False
        else:
            ij: IndirectJump = self.indirect_jumps[addr]
            resolved = len(ij.resolved_targets) > 0

        return resolved, ij.resolved_targets, ij
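    # Editor's note (illustrative): callers are expected to unpack the 3-tuple returned by
    # _indirect_jump_encountered and queue unresolved jumps for later processing
    # (hypothetical sketch):
    #
    #     >>> resolved, targets, ij = cfg._indirect_jump_encountered(addr, node, irsb, func_addr)
    #     >>> if not resolved and ij is not None:
    #     ...     cfg._indirect_jumps_to_resolve.add(ij)
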
    def _process_unresolved_indirect_jumps(self):
        """
        Resolve all unresolved indirect jumps found in previous scanning.

        Currently we support resolving the following types of indirect jumps:
        - Ijk_Call: indirect calls where the function address is passed in from a preceding basic block
        - Ijk_Boring: jump tables
        - For an up-to-date list, see analyses/cfg/indirect_jump_resolvers

        :return: A set of concrete indirect jump targets (ints).
        :rtype: set
        """

        l.info("%d indirect jumps to resolve.", len(self._indirect_jumps_to_resolve))

        all_targets = set()
        idx: int
        jump: IndirectJump
        for idx, jump in enumerate(self._indirect_jumps_to_resolve):
            if self._low_priority:
                self._release_gil(idx, 50, 0.000001)
            all_targets |= self._process_one_indirect_jump(jump)

        self._indirect_jumps_to_resolve.clear()

        return all_targets

    def _process_one_indirect_jump(self, jump: IndirectJump, func_graph_complete: bool = True) -> Set:
        """
        Resolve a given indirect jump.

        :param jump: The IndirectJump instance.
        :param func_graph_complete: True if the function graph is complete at this point (except for this indirect
                                    jump and all nodes that it dominates). Indirect jump resolvers may use the
                                    current function graph to perform sanity checks. CFGEmulated sets
                                    func_graph_complete to False, while CFGFast sets it to True (because in CFGFast,
                                    indirect jumps are always resolved after direct jump jobs are processed).
        :return: A set of resolved indirect jump targets (ints).
        """

        resolved = False
        resolved_by = None
        targets = None

        block = self._lift(jump.addr)

        for resolver in self.indirect_jump_resolvers:
            resolver.base_state = self._base_state

            if not resolver.filter(self, jump.addr, jump.func_addr, block, jump.jumpkind):
                continue

            resolved, targets = resolver.resolve(
                self, jump.addr, jump.func_addr, block, jump.jumpkind, func_graph_complete=func_graph_complete
            )
            if resolved:
                resolved_by = resolver
                break

        if resolved:
            self._indirect_jump_resolved(jump, jump.addr, resolved_by, targets)
        else:
            self._indirect_jump_unresolved(jump)

        return set() if targets is None else set(targets)
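    # Editor's note (illustrative): indirect jump resolution is normally driven from
    # CFGFast, with results recorded in the knowledge base (hypothetical session; the
    # binary path is a placeholder):
    #
    #     >>> import angr
    #     >>> proj = angr.Project("a.out", auto_load_libs=False)
    #     >>> cfg = proj.analyses.CFGFast(resolve_indirect_jumps=True)
    #     >>> proj.kb.indirect_jumps             # resolved targets per jump site
    #     >>> proj.kb.unresolved_indirect_jumps  # addresses that could not be resolved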