Source code for angr.analyses.cfg.cfg_emulated

import itertools
import logging
import sys
from collections import defaultdict
from functools import reduce
from typing import Dict, List

import angr
import claripy
import networkx
import pyvex
from archinfo import ArchARM

from angr.analyses import ForwardAnalysis
from angr.utils.graph import GraphUtils
from angr.analyses import AnalysesHub
from ... import BP, BP_BEFORE, BP_AFTER, SIM_PROCEDURES, procedures
from ... import options as o
from ...codenode import BlockNode
from ...engines.procedure import ProcedureEngine
from ...exploration_techniques.loop_seer import LoopSeer
from ...exploration_techniques.slicecutor import Slicecutor
from ...exploration_techniques.explorer import Explorer
from ...exploration_techniques.lengthlimiter import LengthLimiter
from ...errors import (
    AngrCFGError,
    AngrError,
    AngrSkipJobNotice,
    SimError,
    SimValueError,
    SimSolverModeError,
    SimFastPathError,
    SimIRSBError,
    AngrExitError,
    SimEmptyCallStackError,
)
from ...sim_state import SimState
from ...state_plugins.callstack import CallStack
from ...state_plugins.sim_action import SimActionData
from ...knowledge_plugins.cfg import CFGENode, IndirectJump
from ...utils.constants import DEFAULT_STATEMENT
from ..cdg import CDG
from ..ddg import DDG
from ..backward_slice import BackwardSlice
from ..loopfinder import LoopFinder, Loop
from .cfg_base import CFGBase
from .cfg_job_base import BlockID, CFGJobBase

l = logging.getLogger(name=__name__)


[docs]class CFGJob(CFGJobBase): """ The job class that CFGEmulated uses. """
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # # local variables used during analysis # if self.jumpkind is None: # load jumpkind from path.state.scratch self.jumpkind = "Ijk_Boring" if self.state.history.jumpkind is None else self.state.history.jumpkind self.call_stack_suffix = None self.current_function = None self.cfg_node = None self.sim_successors = None self.exception_info = None self.successor_status = None self.extra_info = None
@property def block_id(self): if self._block_id is None: # generate a new block ID self._block_id = CFGEmulated._generate_block_id( self.call_stack.stack_suffix(self._context_sensitivity_level), self.addr, self.is_syscall ) return self._block_id @property def is_syscall(self): return self.jumpkind is not None and self.jumpkind.startswith("Ijk_Sys") def __hash__(self): return hash(self.block_id)
[docs]class PendingJob: """ A PendingJob is whatever will be put into our pending_exit list. A pending exit is an entry that created by the returning of a call or syscall. It is "pending" since we cannot immediately figure out whether this entry will be executed or not. If the corresponding call/syscall intentially doesn't return, then the pending exit will be removed. If the corresponding call/syscall returns, then the pending exit will be removed as well (since a real entry is created from the returning and will be analyzed later). If the corresponding call/syscall might return, but for some reason (for example, an unsupported instruction is met during the analysis) our analysis does not return properly, then the pending exit will be picked up and put into remaining_jobs list. """
[docs] def __init__( self, caller_func_addr, returning_source, state, src_block_id, src_exit_stmt_idx, src_exit_ins_addr, call_stack ): """ :param returning_source: Address of the callee function. It might be None if address of the callee is not resolvable. :param state: The state after returning from the callee function. Of course there is no way to get a precise state without emulating the execution of the callee, but at least we can properly adjust the stack and registers to imitate the real returned state. :param call_stack: A callstack. """ self.caller_func_addr = caller_func_addr self.returning_source = returning_source self.state = state self.src_block_id = src_block_id self.src_exit_stmt_idx = src_exit_stmt_idx self.src_exit_ins_addr = src_exit_ins_addr self.call_stack = call_stack
def __repr__(self): return "<PendingJob to {}, from function {}>".format( self.state.ip, hex(self.returning_source) if self.returning_source is not None else "Unknown" ) def __hash__(self): return hash( ( self.caller_func_addr, self.returning_source, self.src_block_id, self.src_exit_stmt_idx, self.src_exit_ins_addr, ) ) def __eq__(self, other): if not isinstance(other, PendingJob): return False return ( self.caller_func_addr == other.caller_func_addr and self.returning_source == other.returning_source and self.src_block_id == other.src_block_id and self.src_exit_stmt_idx == other.src_exit_stmt_idx and self.src_exit_ins_addr == other.src_exit_ins_addr )
[docs]class CFGEmulated(ForwardAnalysis, CFGBase): # pylint: disable=abstract-method """ This class represents a control-flow graph. """ tag = "CFGEmulated"
[docs] def __init__( self, context_sensitivity_level=1, start=None, avoid_runs=None, enable_function_hints=False, call_depth=None, call_tracing_filter=None, initial_state=None, starts=None, keep_state=False, indirect_jump_target_limit=100000, resolve_indirect_jumps=True, enable_advanced_backward_slicing=False, enable_symbolic_back_traversal=False, indirect_jump_resolvers=None, additional_edges=None, no_construct=False, normalize=False, max_iterations=1, address_whitelist=None, base_graph=None, iropt_level=None, max_steps=None, state_add_options=None, state_remove_options=None, model=None, ): """ All parameters are optional. :param context_sensitivity_level: The level of context-sensitivity of this CFG (see documentation for further details). It ranges from 0 to infinity. Default 1. :param avoid_runs: A list of runs to avoid. :param enable_function_hints: Whether to use function hints (constants that might be used as exit targets) or not. :param call_depth: How deep in the call stack to trace. :param call_tracing_filter: Filter to apply on a given path and jumpkind to determine if it should be skipped when call_depth is reached. :param initial_state: An initial state to use to begin analysis. :param iterable starts: A collection of starting points to begin analysis. It can contain the following three different types of entries: an address specified as an integer, a 2-tuple that includes an integer address and a jumpkind, or a SimState instance. Unsupported entries in starts will lead to an AngrCFGError being raised. :param keep_state: Whether to keep the SimStates for each CFGNode. :param resolve_indirect_jumps: Whether to enable the indirect jump resolvers for resolving indirect jumps :param enable_advanced_backward_slicing: Whether to enable an intensive technique for resolving indirect jumps :param enable_symbolic_back_traversal: Whether to enable an intensive technique for resolving indirect jumps :param list indirect_jump_resolvers: A custom list of indirect jump resolvers. If this list is None or empty, default indirect jump resolvers specific to this architecture and binary types will be loaded. :param additional_edges: A dict mapping addresses of basic blocks to addresses of successors to manually include and analyze forward from. :param bool no_construct: Skip the construction procedure. Only used in unit-testing. :param bool normalize: If the CFG as well as all Function graphs should be normalized or not. :param int max_iterations: The maximum number of iterations that each basic block should be "executed". 1 by default. Larger numbers of iterations are usually required for complex analyses like loop analysis. :param iterable address_whitelist: A list of allowed addresses. Any basic blocks outside of this collection of addresses will be ignored. :param networkx.DiGraph base_graph: A basic control flow graph to follow. Each node inside this graph must have the following properties: `addr` and `size`. CFG recovery will strictly follow nodes and edges shown in the graph, and discard any contorl flow that does not follow an existing edge in the base graph. For example, you can pass in a Function local transition graph as the base graph, and CFGEmulated will traverse nodes and edges and extract useful information. :param int iropt_level: The optimization level of VEX IR (0, 1, 2). The default level will be used if `iropt_level` is None. :param int max_steps: The maximum number of basic blocks to recover forthe longest path from each start before pausing the recovery procedure. :param state_add_options: State options that will be added to the initial state. :param state_remove_options: State options that will be removed from the initial state. """ ForwardAnalysis.__init__(self, order_jobs=base_graph is not None) CFGBase.__init__( self, "emulated", context_sensitivity_level, normalize=normalize, resolve_indirect_jumps=resolve_indirect_jumps, indirect_jump_resolvers=indirect_jump_resolvers, indirect_jump_target_limit=indirect_jump_target_limit, model=model, ) if start is not None: l.warning("`start` is deprecated. Please consider using `starts` instead in your code.") self._starts = (start,) else: if isinstance(starts, (list, set)): self._starts = tuple(starts) elif isinstance(starts, tuple) or starts is None: self._starts = starts else: raise AngrCFGError("Unsupported type of the `starts` argument.") if enable_advanced_backward_slicing or enable_symbolic_back_traversal: l.warning("`advanced backward slicing` and `symbolic back traversal` are deprecated.") l.warning( "Please use `resolve_indirect_jumps` to resolve indirect jumps using different resolvers instead." ) self._iropt_level = iropt_level self._avoid_runs = avoid_runs self._enable_function_hints = enable_function_hints self._call_depth = call_depth self._call_tracing_filter = call_tracing_filter self._initial_state = initial_state self._keep_state = keep_state self._advanced_backward_slicing = enable_advanced_backward_slicing self._enable_symbolic_back_traversal = enable_symbolic_back_traversal self._additional_edges = additional_edges if additional_edges else {} self._max_steps = max_steps self._state_add_options = state_add_options if state_add_options is not None else set() self._state_remove_options = state_remove_options if state_remove_options is not None else set() self._state_add_options.update([o.SYMBOL_FILL_UNCONSTRAINED_MEMORY, o.SYMBOL_FILL_UNCONSTRAINED_REGISTERS]) # add the track_memory_option if the enable function hint flag is set if self._enable_function_hints and o.TRACK_MEMORY_ACTIONS not in self._state_add_options: self._state_add_options.add(o.TRACK_MEMORY_ACTIONS) # more initialization self._symbolic_function_initial_state = {} self._function_input_states = None self._unresolvable_runs = set() # Stores the index for each CFGNode in this CFG after a quasi-topological sort (currently a DFS) # TODO: remove it since it's no longer used self._quasi_topological_order = {} # A copy of all entry points in this CFG. Integers self._entry_points = [] self._max_iterations = max_iterations self._address_whitelist = set(address_whitelist) if address_whitelist is not None else None self._base_graph = base_graph self._node_addr_visiting_order = [] if self._base_graph: sorted_nodes = GraphUtils.quasi_topological_sort_nodes(self._base_graph) self._node_addr_visiting_order = [n.addr for n in sorted_nodes] self._sanitize_parameters() self._executable_address_ranges = [] self._executable_address_ranges = self._executable_memory_regions() # These save input states of functions. It will be discarded after the CFG is constructed self._function_input_states = {} self._loop_back_edges = [] self._overlapped_loop_headers = [] self._pending_function_hints = set() # A dict to log edges and the jumpkind between each basic block self._edge_map = defaultdict(list) self._model._iropt_level = self._iropt_level self._start_keys = [] # a list of block IDs of all starts # For each call, we are always getting two exits: an Ijk_Call that # stands for the real call exit, and an Ijk_Ret that is a simulated exit # for the retn address. There are certain cases that the control flow # never returns to the next instruction of a callsite due to # imprecision of the concrete execution. So we save those simulated # exits here to increase our code coverage. Of course the real retn from # that call always precedes those "fake" retns. self._pending_jobs = defaultdict(list) # Dict[BlockID, List[PendingJob]] # Counting how many times a basic block is traced into self._traced_addrs = defaultdict(lambda: defaultdict(int)) # A dict that collects essential parameters to properly reconstruct initial state for a block self._block_artifacts = {} self._analyzed_addrs = set() self._non_returning_functions = set() self._pending_edges = defaultdict(list) if not no_construct: self._initialize_cfg() self._analyze()
# # Public methods #
[docs] def copy(self) -> "CFGEmulated": """ Make a copy of the CFG. :return: A copy of the CFG instance. """ new_cfg = CFGEmulated.__new__(CFGEmulated) super().make_copy(new_cfg) new_cfg._indirect_jump_target_limit = self._indirect_jump_target_limit new_cfg.named_errors = defaultdict(list, self.named_errors) new_cfg.errors = list(self.errors) new_cfg._fail_fast = self._fail_fast new_cfg._max_steps = self._max_steps new_cfg.project = self.project # Intelligently (or stupidly... you tell me) fill it up new_cfg._edge_map = self._edge_map.copy() new_cfg._loop_back_edges = self._loop_back_edges[::] new_cfg._executable_address_ranges = self._executable_address_ranges[::] new_cfg._unresolvable_runs = self._unresolvable_runs.copy() new_cfg._overlapped_loop_headers = self._overlapped_loop_headers[::] new_cfg._thumb_addrs = self._thumb_addrs.copy() new_cfg._keep_state = self._keep_state return new_cfg
[docs] def resume(self, starts=None, max_steps=None): """ Resume a paused or terminated control flow graph recovery. :param iterable starts: A collection of new starts to resume from. If `starts` is None, we will resume CFG recovery from where it was paused before. :param int max_steps: The maximum number of blocks on the longest path starting from each start before pausing the recovery. :return: None """ self._should_abort = False self._starts = starts self._max_steps = max_steps if self._starts is None: self._starts = [] if self._starts: self._sanitize_starts() self._analyze()
[docs] def remove_cycles(self): """ Forces graph to become acyclic, removes all loop back edges and edges between overlapped loop headers and their successors. """ # loop detection # only detect loops after potential graph normalization if not self._loop_back_edges: l.debug("Detecting loops...") self._detect_loops() l.debug("Removing cycles...") l.debug("There are %d loop back edges.", len(self._loop_back_edges)) l.debug("And there are %d overlapping loop headers.", len(self._overlapped_loop_headers)) # First break all detected loops for b1, b2 in self._loop_back_edges: if self._graph.has_edge(b1, b2): l.debug("Removing loop back edge %s -> %s", b1, b2) self._graph.remove_edge(b1, b2) # Then remove all outedges from overlapped loop headers for b in self._overlapped_loop_headers: successors = self._graph.successors(b) for succ in successors: self._graph.remove_edge(b, succ) l.debug("Removing partial loop header edge %s -> %s", b, succ)
[docs] def downsize(self): """ Remove saved states from all CFGNodes to reduce memory usage. :return: None """ for cfg_node in self._nodes.values(): cfg_node.downsize()
[docs] def unroll_loops(self, max_loop_unrolling_times): """ Unroll loops for each function. The resulting CFG may still contain loops due to recursion, function calls, etc. :param int max_loop_unrolling_times: The maximum iterations of unrolling. :return: None """ if not isinstance(max_loop_unrolling_times, int) or max_loop_unrolling_times < 0: raise AngrCFGError( "Max loop unrolling times must be set to an integer greater than or equal to 0 if " + "loop unrolling is enabled." ) def _unroll(graph: networkx.DiGraph, loop: Loop): """ The loop callback method where loops are unrolled. :param graph: The control flow graph. :param loop: The loop instance. :return: None """ for back_edge in loop.continue_edges: loop_body_addrs = {n.addr for n in loop.body_nodes} src_blocknode: BlockNode = back_edge[0] dst_blocknode: BlockNode = back_edge[1] for src in self.get_all_nodes(src_blocknode.addr): for dst in graph.successors(src): if dst.addr != dst_blocknode.addr: continue # Duplicate the dst node new_dst = dst.copy() new_dst.looping_times = dst.looping_times + 1 if ( new_dst not in graph and # If the new_dst is already in the graph, we don't want to keep unrolling # the this loop anymore since it may *create* a new loop. Of course we # will lose some edges in this way, but in general it is acceptable. new_dst.looping_times <= max_loop_unrolling_times ): # Log all successors of the dst node dst_successors = graph.successors(dst) # Add new_dst to the graph edge_data = graph.get_edge_data(src, dst) graph.add_edge(src, new_dst, **edge_data) for ds in dst_successors: if ds.looping_times == 0 and ds.addr not in loop_body_addrs: edge_data = graph.get_edge_data(dst, ds) graph.add_edge(new_dst, ds, **edge_data) graph.remove_edge(src, dst) self._detect_loops(loop_callback=_unroll)
[docs] def force_unroll_loops(self, max_loop_unrolling_times): """ Unroll loops globally. The resulting CFG does not contain any loop, but this method is slow on large graphs. :param int max_loop_unrolling_times: The maximum iterations of unrolling. :return: None """ if not isinstance(max_loop_unrolling_times, int) or max_loop_unrolling_times < 0: raise AngrCFGError( "Max loop unrolling times must be set to an integer greater than or equal to 0 if " + "loop unrolling is enabled." ) # Traverse the CFG and try to find the beginning of loops loop_backedges = [] start = self._starts[0] if isinstance(start, tuple): start, _ = start # pylint: disable=unpacking-non-sequence start_node = self.get_any_node(start) if start_node is None: raise AngrCFGError("Cannot find start node when trying to unroll loops. The CFG might be empty.") graph_copy = networkx.DiGraph(self.graph) while True: cycles_iter = networkx.simple_cycles(graph_copy) try: cycle = next(cycles_iter) except StopIteration: break loop_backedge = (None, None) for n in networkx.dfs_preorder_nodes(graph_copy, source=start_node): if n in cycle: idx = cycle.index(n) if idx == 0: loop_backedge = (cycle[-1], cycle[idx]) else: loop_backedge = (cycle[idx - 1], cycle[idx]) break if loop_backedge not in loop_backedges: loop_backedges.append(loop_backedge) # Create a common end node for all nodes whose out_degree is 0 end_nodes = [n for n in graph_copy.nodes() if graph_copy.out_degree(n) == 0] new_end_node = "end_node" if not end_nodes: # We gotta randomly break a loop cycles = sorted(networkx.simple_cycles(graph_copy), key=len) first_cycle = cycles[0] if len(first_cycle) == 1: graph_copy.remove_edge(first_cycle[0], first_cycle[0]) else: graph_copy.remove_edge(first_cycle[0], first_cycle[1]) end_nodes = [n for n in graph_copy.nodes() if graph_copy.out_degree(n) == 0] for en in end_nodes: graph_copy.add_edge(en, new_end_node) # postdoms = self.immediate_postdominators(new_end_node, target_graph=graph_copy) # reverse_postdoms = defaultdict(list) # for k, v in postdoms.items(): # reverse_postdoms[v].append(k) # Find all loop bodies # for src, dst in loop_backedges: # nodes_in_loop = { src, dst } # while True: # new_nodes = set() # for n in nodes_in_loop: # if n in reverse_postdoms: # for node in reverse_postdoms[n]: # if node not in nodes_in_loop: # new_nodes.add(node) # if not new_nodes: # break # nodes_in_loop |= new_nodes # Unroll the loop body # TODO: Finish the implementation graph_copy.remove_node(new_end_node) src, dst = loop_backedge if graph_copy.has_edge(src, dst): # It might have been removed before # Duplicate the dst node new_dst = dst.copy() new_dst.looping_times = dst.looping_times + 1 if ( new_dst not in graph_copy and # If the new_dst is already in the graph, we don't want to keep unrolling # the this loop anymore since it may *create* a new loop. Of course we # will lose some edges in this way, but in general it is acceptable. new_dst.looping_times <= max_loop_unrolling_times ): # Log all successors of the dst node dst_successors = list(graph_copy.successors(dst)) # Add new_dst to the graph edge_data = graph_copy.get_edge_data(src, dst) graph_copy.add_edge(src, new_dst, **edge_data) for ds in dst_successors: if ds.looping_times == 0 and ds not in cycle: edge_data = graph_copy.get_edge_data(dst, ds) graph_copy.add_edge(new_dst, ds, **edge_data) # Remove the original edge graph_copy.remove_edge(src, dst) # Update loop backedges self._loop_back_edges = loop_backedges self.model.graph = graph_copy
[docs] def immediate_dominators(self, start, target_graph=None): """ Get all immediate dominators of sub graph from given node upwards. :param str start: id of the node to navigate forwards from. :param networkx.classes.digraph.DiGraph target_graph: graph to analyse, default is self.graph. :return: each node of graph as index values, with element as respective node's immediate dominator. :rtype: dict """ return self._immediate_dominators(start, target_graph=target_graph, reverse_graph=False)
[docs] def immediate_postdominators(self, end, target_graph=None): """ Get all immediate postdominators of sub graph from given node upwards. :param str start: id of the node to navigate forwards from. :param networkx.classes.digraph.DiGraph target_graph: graph to analyse, default is self.graph. :return: each node of graph as index values, with element as respective node's immediate dominator. :rtype: dict """ return self._immediate_dominators(end, target_graph=target_graph, reverse_graph=True)
[docs] def remove_fakerets(self): """ Get rid of fake returns (i.e., Ijk_FakeRet edges) from this CFG :return: None """ fakeret_edges = [ (src, dst) for src, dst, data in self.graph.edges(data=True) if data["jumpkind"] == "Ijk_FakeRet" ] self.graph.remove_edges_from(fakeret_edges)
[docs] def get_topological_order(self, cfg_node): """ Get the topological order of a CFG Node. :param cfg_node: A CFGNode instance. :return: An integer representing its order, or None if the CFGNode does not exist in the graph. """ if not self._quasi_topological_order: self._quasi_topological_sort() return self._quasi_topological_order.get(cfg_node, None)
[docs] def get_subgraph(self, starting_node, block_addresses): """ Get a sub-graph out of a bunch of basic block addresses. :param CFGNode starting_node: The beginning of the subgraph :param iterable block_addresses: A collection of block addresses that should be included in the subgraph if there is a path between `starting_node` and a CFGNode with the specified address, and all nodes on the path should also be included in the subgraph. :return: A new CFG that only contain the specific subgraph. :rtype: CFGEmulated """ graph = networkx.DiGraph() if starting_node not in self.graph: raise AngrCFGError( 'get_subgraph(): the specified "starting_node" %s does not exist in the current CFG.' % starting_node ) addr_set = set(block_addresses) graph.add_node(starting_node) queue = [starting_node] while queue: node = queue.pop() for _, dst, data in self.graph.out_edges([node], data=True): if dst not in graph and dst.addr in addr_set: graph.add_edge(node, dst, **data) queue.append(dst) cfg = self.copy() cfg._graph = graph cfg._starts = (starting_node.addr,) return cfg
[docs] def get_function_subgraph(self, start, max_call_depth=None): """ Get a sub-graph of a certain function. :param start: The function start. Currently it should be an integer. :param max_call_depth: Call depth limit. None indicates no limit. :return: A CFG instance which is a sub-graph of self.graph """ # FIXME: syscalls are not supported # FIXME: start should also take a CFGNode instance start_node = self.get_any_node(start) node_wrapper = (start_node, 0) stack = [node_wrapper] traversed_nodes = {start_node} subgraph_nodes = {start_node} while stack: nw = stack.pop() n, call_depth = nw[0], nw[1] # Get successors edges = self.graph.out_edges(n, data=True) for _, dst, data in edges: if dst not in traversed_nodes: # We see a new node! traversed_nodes.add(dst) if data["jumpkind"] == "Ijk_Call": if max_call_depth is None or (max_call_depth is not None and call_depth < max_call_depth): subgraph_nodes.add(dst) new_nw = (dst, call_depth + 1) stack.append(new_nw) elif data["jumpkind"] == "Ijk_Ret": if call_depth > 0: subgraph_nodes.add(dst) new_nw = (dst, call_depth - 1) stack.append(new_nw) else: subgraph_nodes.add(dst) new_nw = (dst, call_depth) stack.append(new_nw) # subgraph = networkx.subgraph(self.graph, subgraph_nodes) subgraph = self.graph.subgraph(subgraph_nodes).copy() # Make it a CFG instance subcfg = self.copy() subcfg._graph = subgraph subcfg._starts = (start,) return subcfg
@property def context_sensitivity_level(self): return self._context_sensitivity_level # # Serialization # def __setstate__(self, s): self.project: angr.Project = s["project"] self.indirect_jumps: Dict[int, IndirectJump] = s["indirect_jumps"] self._loop_back_edges = s["_loop_back_edges"] self._thumb_addrs = s["_thumb_addrs"] self._unresolvable_runs = s["_unresolvable_runs"] self._executable_address_ranges = s["_executable_address_ranges"] self._iropt_level = s["_iropt_level"] self._model = s["_model"] def __getstate__(self): s = { "project": self.project, "indirect_jumps": self.indirect_jumps, "_loop_back_edges": self._loop_back_edges, "_nodes_by_addr": self._nodes_by_addr, "_thumb_addrs": self._thumb_addrs, "_unresolvable_runs": self._unresolvable_runs, "_executable_address_ranges": self._executable_address_ranges, "_iropt_level": self._iropt_level, "_model": self._model, } return s # # Properties # @property def graph(self): return self._model.graph @property def unresolvables(self): """ Get those SimRuns that have non-resolvable exits. :return: A set of SimRuns :rtype: set """ return self._unresolvable_runs @property def deadends(self): """ Get all CFGNodes that has an out-degree of 0 :return: A list of CFGNode instances :rtype: list """ if self.graph is None: raise AngrCFGError("CFG hasn't been generated yet.") deadends = [i for i in self.graph if self.graph.out_degree(i) == 0] return deadends # # Private methods # # Initialization related methods def _sanitize_parameters(self): """ Perform a sanity check on parameters passed in to CFG.__init__(). An AngrCFGError is raised if any parameter fails the sanity check. :return: None """ # Check additional_edges if isinstance(self._additional_edges, (list, set, tuple)): new_dict = defaultdict(list) for s, d in self._additional_edges: new_dict[s].append(d) self._additional_edges = new_dict elif isinstance(self._additional_edges, dict): pass else: raise AngrCFGError("Additional edges can only be a list, set, tuple, or a dict.") # Check _advanced_backward_slicing if self._advanced_backward_slicing and self._enable_symbolic_back_traversal: raise AngrCFGError("Advanced backward slicing and symbolic back traversal cannot both be enabled.") if self._advanced_backward_slicing and not self._keep_state: raise AngrCFGError("Keep state must be enabled if advanced backward slicing is enabled.") # Sanitize avoid_runs self._avoid_runs = [] if self._avoid_runs is None else self._avoid_runs if not isinstance(self._avoid_runs, (list, set)): raise AngrCFGError('"avoid_runs" must either be None, or a list or a set.') self._sanitize_starts() def _sanitize_starts(self): # Sanitize starts # Convert self._starts to a list of SimState instances or tuples of (ip, jumpkind) if self._starts is None: self._starts = ((self.project.entry, None),) else: new_starts = [] for item in self._starts: if isinstance(item, tuple): if len(item) != 2: raise AngrCFGError('Unsupported item in "starts": %s' % str(item)) new_starts.append(item) elif isinstance(item, int): new_starts.append((item, None)) elif isinstance(item, SimState): new_starts.append(item) else: raise AngrCFGError('Unsupported item type in "starts": %s' % type(item)) self._starts = new_starts if not self._starts: raise AngrCFGError("At least one start must be provided") # CFG construction # The main loop and sub-methods def _job_key(self, job): """ Get the key for a specific CFG job. The key is a context-sensitive block ID. :param CFGJob job: The CFGJob instance. :return: The block ID of the specific CFG job. :rtype: BlockID """ return job.block_id def _job_sorting_key(self, job): """ Get the sorting key of a CFGJob instance. :param CFGJob job: the CFGJob object. :return: An integer that determines the order of this job in the queue. :rtype: int """ if self._base_graph is None: # we don't do sorting if there is no base_graph return 0 MAX_JOBS = 1000000 if job.addr not in self._node_addr_visiting_order: return MAX_JOBS return self._node_addr_visiting_order.index(job.addr) def _pre_analysis(self): """ Initialization work. Executed prior to the analysis. :return: None """ # Fill up self._starts for item in self._starts: callstack = None if isinstance(item, tuple): # (addr, jumpkind) ip = item[0] state = self._create_initial_state(item[0], item[1]) elif isinstance(item, SimState): # SimState state = item.copy() # pylint: disable=no-member ip = state.solver.eval_one(state.ip) self._reset_state_mode(state, "fastpath") else: raise AngrCFGError("Unsupported CFG start type: %s." % str(type(item))) self._symbolic_function_initial_state[ip] = state path_wrapper = CFGJob(ip, state, self._context_sensitivity_level, None, None, call_stack=callstack) key = path_wrapper.block_id if key not in self._start_keys: self._start_keys.append(key) self._insert_job(path_wrapper) self._register_analysis_job(path_wrapper.func_addr, path_wrapper) def _intra_analysis(self): """ During the analysis. We process function hints here. :return: None """ if self._pending_function_hints: self._process_hints(self._analyzed_addrs) def _job_queue_empty(self): """ A callback method called when the job queue is empty. :return: None """ self._iteratively_clean_pending_exits() while self._pending_jobs: # We don't have any exits remaining. Let's pop out a pending exit pending_job = self._get_one_pending_job() if pending_job is None: continue self._insert_job(pending_job) self._register_analysis_job(pending_job.func_addr, pending_job) break def _create_initial_state(self, ip, jumpkind): """ Obtain a SimState object for a specific address Fastpath means the CFG generation will work in an IDA-like way, in which it will not try to execute every single statement in the emulator, but will just do the decoding job. This is much faster than the old way. :param int ip: The instruction pointer :param str jumpkind: The jumpkind upon executing the block :return: The newly-generated state :rtype: SimState """ jumpkind = "Ijk_Boring" if jumpkind is None else jumpkind if self._initial_state is None: state = self.project.factory.blank_state( addr=ip, mode="fastpath", add_options=self._state_add_options, remove_options=self._state_remove_options, ) self._initial_state = state else: # FIXME: self._initial_state is deprecated. This branch will be removed soon state = self._initial_state.copy() state.history.jumpkind = jumpkind self._reset_state_mode(state, "fastpath") state._ip = state.solver.BVV(ip, self.project.arch.bits) if jumpkind is not None: state.history.jumpkind = jumpkind # THIS IS A HACK FOR MIPS if ip is not None and self.project.arch.name in ("MIPS32", "MIPS64"): # We assume this is a function start state.regs.t9 = ip # TODO there was at one point special logic for the ppc64 table of contents but it seems to have bitrotted return state def _get_one_pending_job(self): """ Retrieve a pending job. :return: A CFGJob instance or None """ pending_job_key = next(iter(self._pending_jobs.keys())) pending_job = self._pending_jobs[pending_job_key].pop() if len(self._pending_jobs[pending_job_key]) == 0: del self._pending_jobs[pending_job_key] pending_job_state = pending_job.state pending_job_call_stack = pending_job.call_stack pending_job_src_block_id = pending_job.src_block_id pending_job_src_exit_stmt_idx = pending_job.src_exit_stmt_idx self._deregister_analysis_job(pending_job.caller_func_addr, pending_job) # Let's check whether this address has been traced before. if pending_job_key in self._nodes: node = self._nodes[pending_job_key] if node in self.graph: pending_exit_addr = self._block_id_addr(pending_job_key) # That block has been traced before. Let's forget about it l.debug("Target 0x%08x has been traced before. Trying the next one...", pending_exit_addr) # However, we should still create the FakeRet edge self._graph_add_edge( pending_job_src_block_id, pending_job_key, jumpkind="Ijk_FakeRet", stmt_idx=pending_job_src_exit_stmt_idx, ins_addr=pending_job.src_exit_ins_addr, ) return None pending_job_state.history.jumpkind = "Ijk_FakeRet" job = CFGJob( pending_job_state.addr, pending_job_state, self._context_sensitivity_level, src_block_id=pending_job_src_block_id, src_exit_stmt_idx=pending_job_src_exit_stmt_idx, src_ins_addr=pending_job.src_exit_ins_addr, call_stack=pending_job_call_stack, ) l.debug("Tracing a missing return exit %s", self._block_id_repr(pending_job_key)) return job def _process_hints(self, analyzed_addrs): """ Process function hints in the binary. :return: None """ # Function hints! # Now let's see how many new functions we can get here... while self._pending_function_hints: f = self._pending_function_hints.pop() if f not in analyzed_addrs: new_state = self.project.factory.entry_state(mode="fastpath") new_state.ip = new_state.solver.BVV(f, self.project.arch.bits) # TOOD: Specially for MIPS if new_state.arch.name in ("MIPS32", "MIPS64"): # Properly set t9 new_state.registers.store("t9", f) new_path_wrapper = CFGJob(f, new_state, self._context_sensitivity_level) self._insert_job(new_path_wrapper) self._register_analysis_job(f, new_path_wrapper) l.debug("Picking a function 0x%x from pending function hints.", f) self.kb.functions.function(new_path_wrapper.func_addr, create=True) break def _post_analysis(self): """ Post-CFG-construction. :return: None """ self._make_completed_functions() new_changes = self._iteratively_analyze_function_features() functions_do_not_return = new_changes["functions_do_not_return"] self._update_function_callsites(functions_do_not_return) # Create all pending edges for _, edges in self._pending_edges.items(): for src_node, dst_node, data in edges: self._graph_add_edge(src_node, dst_node, **data) # Remove those edges that will never be taken! self._remove_non_return_edges() CFGBase._post_analysis(self) # Job handling def _pre_job_handling(self, job): # pylint:disable=arguments-differ """ Before processing a CFGJob. Right now each block is traced at most once. If it is traced more than once, we will mark it as "should_skip" before tracing it. An AngrForwardAnalysisSkipJob exception is raised in order to skip analyzing the job. :param CFGJob job: The CFG job object. :param dict _locals: A bunch of local variables that will be kept around when handling this job and its corresponding successors. :return: None """ # Extract initial info the CFGJob job.call_stack_suffix = job.get_call_stack_suffix() job.current_function = self.kb.functions.function( job.func_addr, create=True, syscall=job.jumpkind.startswith("Ijk_Sys") ) src_block_id = job.src_block_id src_exit_stmt_idx = job.src_exit_stmt_idx src_ins_addr = job.src_ins_addr addr = job.addr # Log this address if l.level == logging.DEBUG: self._analyzed_addrs.add(addr) if addr == job.func_addr: # Store the input state of this function self._function_input_states[job.func_addr] = job.state # deregister this job self._deregister_analysis_job(job.func_addr, job) # Generate a unique key for this job block_id = job.block_id # Should we skip tracing this block? should_skip = False if self._traced_addrs[job.call_stack_suffix][addr] >= self._max_iterations: should_skip = True elif ( self._is_call_jumpkind(job.jumpkind) and self._call_depth is not None and len(job.call_stack) > self._call_depth and (self._call_tracing_filter is None or self._call_tracing_filter(job.state, job.jumpkind)) ): should_skip = True # SimInspect breakpoints support job.state._inspect("cfg_handle_job", BP_BEFORE) # Get a SimSuccessors out of current job sim_successors, exception_info, _ = self._get_simsuccessors(addr, job, current_function_addr=job.func_addr) # determine the depth of this basic block if self._max_steps is None: # it's unnecessary to track depth when we are not limiting max_steps depth = None else: if src_block_id is None: # oh this is the very first basic block on this path depth = 0 else: src_cfgnode = self._nodes[src_block_id] depth = src_cfgnode.depth + 1 # the depth will not be updated later on even if this block has a greater depth on another path. # consequently, the `max_steps` limit is not veyr precise - I didn't see a need to make it precise # though. if block_id not in self._nodes: # Create the CFGNode object cfg_node = self._create_cfgnode( sim_successors, job.call_stack, job.func_addr, block_id=block_id, depth=depth ) self._model.add_node(block_id, cfg_node) else: # each block_id should only correspond to one CFGNode # use the existing CFGNode object # note that since we reuse existing CFGNodes, we may miss the following cases: # # mov eax, label_0 mov ebx, label_1 # jmp xxx jmp xxx # | | # | | # *----------------------* # | # call eax # # Since the basic block "call eax" will only be traced once, either label_0 or label_1 will be missed in # this case. Indirect jump resolution might be able to get it, but that's another story. Ideally, # "call eax" should be traced twice with *different* sim_successors keys, which requires block ID being flow # sensitive, but it is way too expensive. cfg_node = self._nodes[block_id] # Increment tracing count for this block self._traced_addrs[job.call_stack_suffix][addr] += 1 if self._keep_state: # TODO: if we are reusing an existing CFGNode, we will be overwriting the original input state here. we # TODO: should save them all, which, unfortunately, requires some redesigning :-( cfg_node.input_state = sim_successors.initial_state # See if this job cancels another FakeRet # This should be done regardless of whether this job should be skipped or not, otherwise edges will go missing # in the CFG or function transition graphs. if job.jumpkind == "Ijk_FakeRet" or (job.jumpkind == "Ijk_Ret" and block_id in self._pending_jobs): # The fake ret is confirmed (since we are returning from the function it calls). Create an edge for it # in the graph. the_jobs = [] if block_id in self._pending_jobs: the_jobs: "PendingJob" = self._pending_jobs.pop(block_id) for the_job in the_jobs: self._deregister_analysis_job(the_job.caller_func_addr, the_job) else: the_jobs = [job] for the_job in the_jobs: self._graph_add_edge( the_job.src_block_id, block_id, jumpkind="Ijk_FakeRet", stmt_idx=the_job.src_exit_stmt_idx, ins_addr=src_ins_addr, ) self._update_function_transition_graph( the_job.src_block_id, block_id, jumpkind="Ijk_FakeRet", ins_addr=src_ins_addr, stmt_idx=the_job.src_exit_stmt_idx, confirmed=True, ) if sim_successors is None or should_skip: # We cannot retrieve the block, or we should skip the analysis of this node # But we create the edge anyway. If the sim_successors does not exist, it will be an edge from the previous # node to a PathTerminator self._graph_add_edge( src_block_id, block_id, jumpkind=job.jumpkind, stmt_idx=src_exit_stmt_idx, ins_addr=src_ins_addr ) self._update_function_transition_graph( src_block_id, block_id, jumpkind=job.jumpkind, ins_addr=src_ins_addr, stmt_idx=src_exit_stmt_idx ) # We are good. Raise the exception and leave raise AngrSkipJobNotice() self._update_thumb_addrs(sim_successors, job.state) # We store the function hints first. Function hints will be checked at the end of the analysis to avoid # any duplication with existing jumping targets if self._enable_function_hints: if sim_successors.sort == "IRSB" and sim_successors.all_successors: function_hints = self._search_for_function_hints(sim_successors.all_successors[0]) for f in function_hints: self._pending_function_hints.add(f) self._graph_add_edge( src_block_id, block_id, jumpkind=job.jumpkind, stmt_idx=src_exit_stmt_idx, ins_addr=src_ins_addr ) self._update_function_transition_graph( src_block_id, block_id, jumpkind=job.jumpkind, ins_addr=src_ins_addr, stmt_idx=src_exit_stmt_idx ) if block_id in self._pending_edges: # there are some edges waiting to be created. do it here. for src_key, dst_key, data in self._pending_edges[block_id]: self._graph_add_edge(src_key, dst_key, **data) del self._pending_edges[block_id] block_info = { reg: sim_successors.initial_state.registers.load(reg) for reg in self.project.arch.persistent_regs } self._block_artifacts[addr] = block_info job.cfg_node = cfg_node job.sim_successors = sim_successors job.exception_info = exception_info # For debugging purposes! job.successor_status = {} def _get_successors(self, job): """ Get a collection of successors out of the current job. :param CFGJob job: The CFGJob instance. :return: A collection of successors. :rtype: list """ addr = job.addr sim_successors = job.sim_successors cfg_node = job.cfg_node input_state = job.state func_addr = job.func_addr # check step limit if self._max_steps is not None: depth = cfg_node.depth if depth >= self._max_steps: return [] successors = [] is_indirect_jump = sim_successors.sort == "IRSB" and self._is_indirect_jump(cfg_node, sim_successors) indirect_jump_resolved_by_resolvers = False if is_indirect_jump and self._resolve_indirect_jumps: # Try to resolve indirect jumps irsb = input_state.block(cross_insn_opt=False).vex resolved, resolved_targets, ij = self._indirect_jump_encountered( addr, cfg_node, irsb, func_addr, stmt_idx=DEFAULT_STATEMENT ) if resolved: successors = self._convert_indirect_jump_targets_to_states(job, resolved_targets) if ij: self._indirect_jump_resolved(ij, ij.addr, None, resolved_targets) else: # Try to resolve this indirect jump using heavier approaches resolved_targets = self._process_one_indirect_jump(ij, func_graph_complete=False) successors = self._convert_indirect_jump_targets_to_states(job, resolved_targets) if successors: indirect_jump_resolved_by_resolvers = True else: # It's unresolved. Add it to the wait list (but apparently we don't have any better way to resolve it # right now). self._indirect_jumps_to_resolve.add(ij) if not successors: # Get all successors of this block successors = ( (sim_successors.flat_successors + sim_successors.unsat_successors) if addr not in self._avoid_runs else [] ) # Post-process successors successors, job.extra_info = self._post_process_successors(input_state, sim_successors, successors) all_successors = successors + sim_successors.unconstrained_successors # make sure FakeRets are at the last all_successors = [suc for suc in all_successors if suc.history.jumpkind != "Ijk_FakeRet"] + [ suc for suc in all_successors if suc.history.jumpkind == "Ijk_FakeRet" ] if self._keep_state: cfg_node.final_states = all_successors[::] if is_indirect_jump and not indirect_jump_resolved_by_resolvers: # For indirect jumps, filter successors that do not make sense successors = self._filter_insane_successors(successors) successors = self._try_resolving_indirect_jumps( sim_successors, cfg_node, func_addr, successors, job.exception_info, self._block_artifacts ) # Remove all successors whose IP is symbolic successors = [s for s in successors if not s.ip.symbolic] # Add additional edges supplied by the user successors = self._add_additional_edges(input_state, sim_successors, cfg_node, successors) # if base graph is used, add successors implied from the graph if self._base_graph: basegraph_successor_addrs = set() for src_, dst_ in self._base_graph.edges(): if src_.addr == addr: basegraph_successor_addrs.add(dst_.addr) successor_addrs = {s.solver.eval(s.ip) for s in successors} extra_successor_addrs = basegraph_successor_addrs - successor_addrs if all_successors: # make sure we have a base state to use base_state = all_successors[0] # TODO: for calls, we want to use the fake_ret state for s_addr in extra_successor_addrs: # an extra target successor_state = base_state.copy() successor_state.ip = s_addr successors.append(successor_state) else: if extra_successor_addrs: l.error("CFGEmulated terminates at %#x although base graph provided more exits.", addr) if not successors: # There is no way out :-( # Log it first self._push_unresolvable_run(addr) if sim_successors.sort == "SimProcedure" and isinstance( sim_successors.artifacts["procedure"], SIM_PROCEDURES["stubs"]["PathTerminator"] ): # If there is no valid exit in this branch and it's not # intentional (e.g. caused by a SimProcedure that does not # do_return) , we should make it return to its call-site. However, # we don't want to use its state anymore as it might be corrupted. # Just create an edge in the graph. return_target = job.call_stack.current_return_target if return_target is not None: new_call_stack = job.call_stack_copy() return_target_key = self._generate_block_id( new_call_stack.stack_suffix(self.context_sensitivity_level), return_target, False ) # You can never return to a syscall if not cfg_node.instruction_addrs: ret_ins_addr = None else: if self.project.arch.branch_delay_slot: if len(cfg_node.instruction_addrs) > 1: ret_ins_addr = cfg_node.instruction_addrs[-2] else: l.error("At %s: expecting more than one instruction. Only got one.", cfg_node) ret_ins_addr = None else: ret_ins_addr = cfg_node.instruction_addrs[-1] # Things might be a bit difficult here. _graph_add_edge() requires both nodes to exist, but here # the return target node may not exist yet. If that's the case, we will put it into a "delayed edge # list", and add this edge later when the return target CFGNode is created. if return_target_key in self._nodes: self._graph_add_edge( job.block_id, return_target_key, jumpkind="Ijk_Ret", stmt_id=DEFAULT_STATEMENT, ins_addr=ret_ins_addr, ) else: self._pending_edges[return_target_key].append( ( job.block_id, return_target_key, { "jumpkind": "Ijk_Ret", "stmt_id": DEFAULT_STATEMENT, "ins_addr": ret_ins_addr, }, ) ) else: # There are no successors, but we still want to update the function graph artifacts = job.sim_successors.artifacts if "irsb" in artifacts and "insn_addrs" in artifacts and artifacts["insn_addrs"]: the_irsb = artifacts["irsb"] insn_addrs = artifacts["insn_addrs"] self._handle_job_without_successors(job, the_irsb, insn_addrs) # TODO: replace it with a DDG-based function IO analysis # handle all actions if successors: self._handle_actions( successors[0], sim_successors, job.current_function, job.current_stack_pointer, set(), ) return successors def _post_job_handling(self, job: CFGJob, _new_jobs, successors: List[SimState]): # type: ignore[override] """ :param CFGJob job: :param successors: :return: """ # Finally, post-process CFG Node and log the return target if job.extra_info: if job.extra_info["is_call_jump"] and job.extra_info["return_target"] is not None: job.cfg_node.return_target = job.extra_info["return_target"] # Debugging output if needed if l.level == logging.DEBUG: # Only in DEBUG mode do we process and output all those shit self._post_handle_job_debug(job, successors) # SimInspect breakpoints support job.state._inspect("cfg_handle_job", BP_AFTER) def _post_process_successors(self, input_state, sim_successors, successors): """ Filter the list of successors :param SimState input_state: Input state. :param SimSuccessors sim_successors: The SimSuccessors instance. :param list successors: A list of successors generated after processing the current block. :return: A list of successors. :rtype: list """ if sim_successors.sort == "IRSB" and input_state.thumb: successors = self._arm_thumb_filter_jump_successors( sim_successors.artifacts["irsb"], successors, lambda state: state.scratch.ins_addr, lambda state: state.scratch.exit_stmt_idx, lambda state: state.history.jumpkind, ) # If there is a call exit, we shouldn't put the default exit (which # is artificial) into the CFG. The exits will be Ijk_Call and # Ijk_FakeRet, and Ijk_Call always goes first extra_info = { "is_call_jump": False, "call_target": None, "return_target": None, "last_call_exit_target": None, "skip_fakeret": False, } # Post-process jumpkind before touching all_successors for ( suc ) in sim_successors.all_successors: # we process all successors here to include potential unsat successors suc_jumpkind = suc.history.jumpkind if self._is_call_jumpkind(suc_jumpkind): extra_info["is_call_jump"] = True break return successors, extra_info def _post_handle_job_debug(self, job: CFGJob, successors: List[SimState]) -> None: """ Post job handling: print debugging information regarding the current job. :param CFGJob job: The current CFGJob instance. :param list successors: All successors of the analysis job. :return: None """ sim_successors = job.sim_successors call_stack_suffix = job.call_stack_suffix extra_info = job.extra_info successor_status = job.successor_status func = self.project.loader.find_symbol(job.func_addr) obj = self.project.loader.find_object_containing(job.addr) function_name = func.name if func is not None else None module_name = obj.provides if obj is not None else None node = self.model.get_node(job.block_id) depth_str = "(D:%s)" % node.depth if node.depth is not None else "" l.debug( "%s [%#x%s | %s]", sim_successors.description, sim_successors.addr, depth_str, "->".join([hex(i) for i in call_stack_suffix if i is not None]), ) l.debug("(Function %s of binary %s)", function_name, module_name) l.debug("| Call jump: %s", extra_info["is_call_jump"] if extra_info is not None else "unknown") for suc in successors: jumpkind = suc.history.jumpkind if jumpkind == "Ijk_FakeRet": exit_type_str = "Simulated Ret" else: exit_type_str = "-" try: l.debug( "| target: %#x %s [%s] %s", suc.solver.eval_one(suc.ip), successor_status[suc], exit_type_str, jumpkind, ) except (SimValueError, SimSolverModeError): l.debug("| target cannot be concretized. %s [%s] %s", successor_status[suc], exit_type_str, jumpkind) l.debug("%d exits remaining, %d exits pending.", len(self._job_info_queue), len(self._pending_jobs)) l.debug("%d unique basic blocks are analyzed so far.", len(self._analyzed_addrs)) def _iteratively_clean_pending_exits(self): """ Iteratively update the completed functions set, analyze whether each function returns or not, and remove pending exits if the callee function does not return. We do this iteratively so that we come to a fixed point in the end. In most cases, the number of outer iteration equals to the maximum levels of wrapped functions whose "returningness" is determined by the very last function it calls. :return: None """ while True: # did we finish analyzing any function? # fill in self._completed_functions self._make_completed_functions() if self._pending_jobs: # There are no more remaining jobs, but only pending jobs left. Each pending job corresponds to # a previous job that does not return properly. # Now it's a good time to analyze each function (that we have so far) and determine if it is a) # returning, b) not returning, or c) unknown. For those functions that are definitely not returning, # remove the corresponding pending exits from `pending_jobs` array. Perform this procedure iteratively # until no new not-returning functions appear. Then we pick a pending exit based on the following # priorities: # - Job pended by a returning function # - Job pended by an unknown function new_changes = self._iteratively_analyze_function_features() functions_do_not_return = new_changes["functions_do_not_return"] self._update_function_callsites(functions_do_not_return) if not self._clean_pending_exits(): # no more pending exits are removed. we are good to go! break else: break def _clean_pending_exits(self): """ Remove those pending exits if: a) they are the return exits of non-returning SimProcedures b) they are the return exits of non-returning syscalls c) they are the return exits of non-returning functions :return: True if any pending exits are removed, False otherwise :rtype: bool """ pending_exits_to_remove = [] for block_id, jobs in self._pending_jobs.items(): for pe in jobs: if pe.returning_source is None: # The original call failed. This pending exit must be followed. continue func = self.kb.functions.function(pe.returning_source) if func is None: # Why does it happen? l.warning( "An expected function at %s is not found. Please report it to Fish.", hex(pe.returning_source) if pe.returning_source is not None else "None", ) continue if func.returning is False: # Oops, it's not returning # Remove this pending exit pending_exits_to_remove.append(block_id) # We want to mark that call as not returning in the current function current_function_addr = self._block_id_current_func_addr(block_id) if current_function_addr is not None: current_function = self.kb.functions.function(current_function_addr) if current_function is not None: call_site_addr = self._block_id_addr(pe.src_block_id) current_function._call_sites[call_site_addr] = (func.addr, None) else: l.warning( "An expected function at %#x is not found. Please report it to Fish.", current_function_addr, ) for block_id in pending_exits_to_remove: l.debug( "Removing all pending exits to %#x since the target function %#x does not return", self._block_id_addr(block_id), next(iter(self._pending_jobs[block_id])).returning_source, ) for to_remove in self._pending_jobs[block_id]: self._deregister_analysis_job(to_remove.caller_func_addr, to_remove) del self._pending_jobs[block_id] if pending_exits_to_remove: return True return False # Successor handling def _pre_handle_successor_state(self, extra_info, jumpkind, target_addr): """ :return: None """ # Fill up extra_info if extra_info["is_call_jump"] and self._is_call_jumpkind(jumpkind): extra_info["call_target"] = target_addr if extra_info["is_call_jump"] and jumpkind == "Ijk_FakeRet": extra_info["return_target"] = target_addr if jumpkind == "Ijk_Call": extra_info["last_call_exit_target"] = target_addr def _handle_successor(self, job, successor: SimState, successors): """ Returns a new CFGJob instance for further analysis, or None if there is no immediate state to perform the analysis on. :param CFGJob job: The current job. """ state: SimState = successor all_successor_states = successors addr = job.addr # The PathWrapper instance to return pw = None job.successor_status[state] = "" new_state = state.copy() suc_jumpkind = state.history.jumpkind suc_exit_stmt_idx = state.scratch.exit_stmt_idx suc_exit_ins_addr = state.scratch.exit_ins_addr if suc_jumpkind in { "Ijk_EmWarn", "Ijk_NoDecode", "Ijk_MapFail", "Ijk_NoRedir", "Ijk_SigTRAP", "Ijk_SigSEGV", "Ijk_ClientReq", }: # Ignore SimExits that are of these jumpkinds job.successor_status[state] = "Skipped" return [] call_target = job.extra_info["call_target"] if suc_jumpkind == "Ijk_FakeRet" and call_target is not None: # if the call points to a SimProcedure that doesn't return, we don't follow the fakeret anymore if self.project.is_hooked(call_target): sim_proc = self.project._sim_procedures[call_target] if sim_proc.NO_RET: return [] # Get target address try: target_addr = state.solver.eval_one(state.ip) except (SimValueError, SimSolverModeError): # It cannot be concretized currently. Maybe we can handle it later, maybe it just cannot be concretized target_addr = None if suc_jumpkind == "Ijk_Ret": target_addr = job.call_stack.current_return_target if target_addr is not None: new_state.ip = new_state.solver.BVV(target_addr, new_state.arch.bits) if target_addr is None: # Unlucky... return [] if state.thumb: # Make sure addresses are always odd. It is important to encode this information in the address for the # time being. target_addr |= 1 # see if the target successor is in our whitelist if self._address_whitelist is not None: if target_addr not in self._address_whitelist: l.debug("Successor %#x is not in the address whitelist. Skip.", target_addr) return [] # see if this edge is in the base graph if self._base_graph is not None: # TODO: make it more efficient. the current implementation is half-assed and extremely slow for src_, dst_ in self._base_graph.edges(): if src_.addr == addr and dst_.addr == target_addr: break else: # not found l.debug("Edge (%#x -> %#x) is not found in the base graph. Skip.", addr, target_addr) return [] # Fix target_addr for syscalls if suc_jumpkind.startswith("Ijk_Sys"): syscall_proc = self.project.simos.syscall(new_state) if syscall_proc is not None: target_addr = syscall_proc.addr self._pre_handle_successor_state(job.extra_info, suc_jumpkind, target_addr) if suc_jumpkind == "Ijk_FakeRet": if target_addr == job.extra_info["last_call_exit_target"]: l.debug("... skipping a fake return exit that has the same target with its call exit.") job.successor_status[state] = "Skipped" return [] if job.extra_info["skip_fakeret"]: l.debug("... skipping a fake return exit since the function it's calling doesn't return") job.successor_status[state] = "Skipped - non-returning function 0x%x" % job.extra_info["call_target"] return [] # TODO: Make it optional if suc_jumpkind == "Ijk_Ret" and self._call_depth is not None and len(job.call_stack) <= 1: # We cannot continue anymore since this is the end of the function where we started tracing l.debug("... reaching the end of the starting function, skip.") job.successor_status[state] = "Skipped - reaching the end of the starting function" return [] # Create the new call stack of target block new_call_stack = self._create_new_call_stack(addr, all_successor_states, job, target_addr, suc_jumpkind) # Create the callstack suffix new_call_stack_suffix = new_call_stack.stack_suffix(self._context_sensitivity_level) # Tuple that will be used to index this exit new_tpl = self._generate_block_id(new_call_stack_suffix, target_addr, suc_jumpkind.startswith("Ijk_Sys")) # We might have changed the mode for this basic block # before. Make sure it is still running in 'fastpath' mode self._reset_state_mode(new_state, "fastpath") pw = CFGJob( target_addr, new_state, self._context_sensitivity_level, src_block_id=job.block_id, src_exit_stmt_idx=suc_exit_stmt_idx, src_ins_addr=suc_exit_ins_addr, call_stack=new_call_stack, jumpkind=suc_jumpkind, ) # Special case: If the binary has symbols and the target address is a function, but for some reason (e.g., # a tail-call optimization) the CallStack's function address is still the old function address, we will have to # overwrite it here. if not self._is_call_jumpkind(pw.jumpkind): target_symbol = self.project.loader.find_symbol(target_addr) if target_symbol and target_symbol.is_function: # Force update the function address pw.func_addr = target_addr # Generate new exits if suc_jumpkind == "Ijk_Ret": # This is the real return exit job.successor_status[state] = "Appended" elif suc_jumpkind == "Ijk_FakeRet": # This is the default "fake" retn that generated at each # call. Save them first, but don't process them right # away # st = self.project._simos.prepare_call_state(new_state, initial_state=saved_state) st = new_state self._reset_state_mode(st, "fastpath") pw = None # clear the job pe = PendingJob( job.func_addr, job.extra_info["call_target"], st, job.block_id, suc_exit_stmt_idx, suc_exit_ins_addr, new_call_stack, ) self._pending_jobs[new_tpl].append(pe) self._register_analysis_job(pe.caller_func_addr, pe) job.successor_status[state] = "Pended" elif self._traced_addrs[new_call_stack_suffix][target_addr] >= 1 and suc_jumpkind == "Ijk_Ret": # This is a corner case for the f****** ARM instruction # like # BLEQ <address> # If we have analyzed the boring exit before returning from that called address, we will lose the link # between the last block of the function being called and the basic block it returns to. We cannot # reanalyze the basic block as we are not flow-sensitive, but we can still record the connection and make # for it afterwards. pass else: job.successor_status[state] = "Appended" if job.extra_info["is_call_jump"] and job.extra_info["call_target"] in self._non_returning_functions: job.extra_info["skip_fakeret"] = True if not pw: return [] # register the job self._register_analysis_job(pw.func_addr, pw) return [pw] def _handle_job_without_successors(self, job, irsb, insn_addrs): """ A block without successors should still be handled so it can be added to the function graph correctly. :param CFGJob job: The current job that do not have any successor. :param IRSB irsb: The related IRSB. :param insn_addrs: A list of instruction addresses of this IRSB. :return: None """ # it's not an empty block # handle all conditional exits ins_addr = job.addr for stmt_idx, stmt in enumerate(irsb.statements): if type(stmt) is pyvex.IRStmt.IMark: ins_addr = stmt.addr + stmt.delta elif type(stmt) is pyvex.IRStmt.Exit: successor_jumpkind = stmt.jk self._update_function_transition_graph( job.block_id, None, jumpkind=successor_jumpkind, ins_addr=ins_addr, stmt_idx=stmt_idx, ) # handle the default exit successor_jumpkind = irsb.jumpkind successor_last_ins_addr = insn_addrs[-1] self._update_function_transition_graph( job.block_id, None, jumpkind=successor_jumpkind, ins_addr=successor_last_ins_addr, stmt_idx=DEFAULT_STATEMENT, ) # SimAction handling def _handle_actions(self, state, current_run, func, sp_addr, accessed_registers): """ For a given state and current location of of execution, will update a function by adding the offets of appropriate actions to the stack variable or argument registers for the fnc. :param SimState state: upcoming state. :param SimSuccessors current_run: possible result states. :param knowledge.Function func: current function. :param int sp_addr: stack pointer address. :param set accessed_registers: set of before accessed registers. """ se = state.solver if func is not None and sp_addr is not None: # Fix the stack pointer (for example, skip the return address on the stack) new_sp_addr = sp_addr + self.project.arch.call_sp_fix actions = [a for a in state.history.recent_actions if a.bbl_addr == current_run.addr] for a in actions: if a.type == "mem" and a.action == "read": try: addr = se.eval_one(a.addr.ast, default=0) except (claripy.ClaripyError, SimSolverModeError): continue if (self.project.arch.call_pushes_ret and addr >= new_sp_addr) or ( not self.project.arch.call_pushes_ret and addr >= new_sp_addr ): # TODO: What if a variable locates higher than the stack is modified as well? We probably want # TODO: to make sure the accessing address falls in the range of stack offset = addr - new_sp_addr func._add_argument_stack_variable(offset) elif a.type == "reg": offset = a.offset if a.action == "read" and offset not in accessed_registers: func._add_argument_register(offset) elif a.action == "write": accessed_registers.add(offset) else: l.error( "handle_actions: Function not found, or stack pointer is None. It might indicates unbalanced stack." ) # Private utils - DiGraph construction and manipulation def _graph_get_node(self, node_key, terminator_for_nonexistent_node=False): """ :param node_key: :return: """ if node_key not in self._nodes: if not terminator_for_nonexistent_node: return None # Generate a PathTerminator node addr = self._block_id_addr(node_key) func_addr = self._block_id_current_func_addr(node_key) if func_addr is None: # We'll have to use the current block address instead # TODO: Is it really OK? func_addr = self._block_id_addr(node_key) is_thumb = isinstance(self.project.arch, ArchARM) and addr % 2 == 1 pt = CFGENode( self._block_id_addr(node_key), None, self.model, input_state=None, simprocedure_name="PathTerminator", function_address=func_addr, callstack_key=self._block_id_callstack_key(node_key), thumb=is_thumb, ) if self._keep_state: # We don't have an input state available for it (otherwise we won't have to create a # PathTerminator). This is just a trick to make get_any_irsb() happy. pt.input_state = self.project.factory.entry_state() pt.input_state.ip = pt.addr self._model.add_node(node_key, pt) if is_thumb: self._thumb_addrs.add(addr) self._thumb_addrs.add(addr - 1) l.debug("Block ID %s does not exist. Create a PathTerminator instead.", self._block_id_repr(node_key)) return self._nodes[node_key] def _graph_add_edge(self, src_node_key, dst_node_key, **kwargs): """ :param src_node_key: :param dst_node_key: :param jumpkind: :param exit_stmt_idx: :return: """ dst_node = self._graph_get_node(dst_node_key, terminator_for_nonexistent_node=True) if src_node_key is None: self.graph.add_node(dst_node) else: src_node = self._graph_get_node(src_node_key, terminator_for_nonexistent_node=True) self.graph.add_edge(src_node, dst_node, **kwargs) def _update_function_transition_graph( self, src_node_key, dst_node_key, jumpkind="Ijk_Boring", ins_addr=None, stmt_idx=None, confirmed=None ): """ Update transition graphs of functions in function manager based on information passed in. :param src_node_key: Node key of the source CFGNode. Might be None. :param dst_node: Node key of the destination CFGNode. Might be None. :param str jumpkind: Jump kind of this transition. :param int ret_addr: The theoretical return address for calls. :param int or None ins_addr: Address of the instruction where this transition is made. :param int or None stmt_idx: ID of the statement where this transition is made. :param bool or None confirmed: Whether this call transition has been confirmed or not. :return: None """ if dst_node_key is not None: dst_node = self._graph_get_node(dst_node_key, terminator_for_nonexistent_node=True) dst_node_addr = dst_node.addr dst_codenode = dst_node.to_codenode() dst_node_func_addr = dst_node.function_address else: dst_node = None dst_node_addr = None dst_codenode = None dst_node_func_addr = None if src_node_key is None: if dst_node is None: raise ValueError("Either src_node_key or dst_node_key must be specified.") self.kb.functions.function(dst_node.function_address, create=True)._register_nodes(True, dst_codenode) return src_node = self._graph_get_node(src_node_key, terminator_for_nonexistent_node=True) # Update the transition graph of current function if jumpkind == "Ijk_Call": ret_addr = src_node.return_target ret_node = ( self.kb.functions.function(src_node.function_address, create=True)._get_block(ret_addr).codenode if ret_addr else None ) self.kb.functions._add_call_to( function_addr=src_node.function_address, from_node=src_node.to_codenode(), to_addr=dst_node_addr, retn_node=ret_node, syscall=False, ins_addr=ins_addr, stmt_idx=stmt_idx, ) if jumpkind.startswith("Ijk_Sys"): self.kb.functions._add_call_to( function_addr=src_node.function_address, from_node=src_node.to_codenode(), to_addr=dst_node_addr, retn_node=src_node.to_codenode(), # For syscalls, they are returning to the address of themselves syscall=True, ins_addr=ins_addr, stmt_idx=stmt_idx, ) elif jumpkind == "Ijk_Ret": # Create a return site for current function self.kb.functions._add_return_from( function_addr=src_node.function_address, from_node=src_node.to_codenode(), to_node=dst_codenode, ) if dst_node is not None: # Create a returning edge in the caller function self.kb.functions._add_return_from_call( function_addr=dst_node_func_addr, src_function_addr=src_node.function_address, to_node=dst_codenode, ) elif jumpkind == "Ijk_FakeRet": self.kb.functions._add_fakeret_to( function_addr=src_node.function_address, from_node=src_node.to_codenode(), to_node=dst_codenode, confirmed=confirmed, ) elif jumpkind in ("Ijk_Boring", "Ijk_InvalICache"): src_obj = self.project.loader.find_object_containing(src_node.addr) dest_obj = self.project.loader.find_object_containing(dst_node.addr) if dst_node is not None else None if src_obj is dest_obj: # Jump/branch within the same object. Might be an outside jump. to_outside = src_node.function_address != dst_node_func_addr else: # Jump/branch between different objects. Must be an outside jump. to_outside = True if not to_outside: self.kb.functions._add_transition_to( function_addr=src_node.function_address, from_node=src_node.to_codenode(), to_node=dst_codenode, ins_addr=ins_addr, stmt_idx=stmt_idx, ) else: self.kb.functions._add_outside_transition_to( function_addr=src_node.function_address, from_node=src_node.to_codenode(), to_node=dst_codenode, to_function_addr=dst_node_func_addr, ins_addr=ins_addr, stmt_idx=stmt_idx, ) def _update_function_callsites(self, noreturns): """ Update the callsites of functions (remove return targets) that are calling functions that are just deemed not returning. :param iterable func_addrs: A collection of functions for newly-recovered non-returning functions. :return: None """ for callee_func in noreturns: # consult the callgraph to find callers of each function if callee_func.addr not in self.functions.callgraph: continue caller_addrs = self.functions.callgraph.predecessors(callee_func.addr) for caller_addr in caller_addrs: caller = self.functions[caller_addr] if callee_func not in caller.transition_graph: continue callsites = caller.transition_graph.predecessors(callee_func) for callsite in callsites: caller._add_call_site(callsite.addr, callee_func.addr, None) def _add_additional_edges(self, input_state, sim_successors, cfg_node, successors): """ :return: """ # If we have additional edges for this block, add them in addr = cfg_node.addr if addr in self._additional_edges: dests = self._additional_edges[addr] for dst in dests: if sim_successors.sort == "IRSB": base_state = sim_successors.all_successors[0].copy() else: if successors: # We try to use the first successor. base_state = successors[0].copy() else: # The SimProcedure doesn't have any successor (e.g. it's a PathTerminator) # We'll use its input state instead base_state = input_state base_state.ip = dst # TODO: Allow for sp adjustments successors.append(base_state) l.debug("Additional jump target %#x for block %s is appended.", dst, sim_successors.description) return successors def _filter_insane_successors(self, successors): """ Throw away all successors whose target doesn't make sense This method is called after we resolve an indirect jump using an unreliable method (like, not through one of the indirect jump resolvers, but through either pure concrete execution or backward slicing) to filter out the obviously incorrect successors. :param list successors: A collection of successors. :return: A filtered list of successors :rtype: list """ old_successors = successors[::] successors = [] for i, suc in enumerate(old_successors): if suc.solver.symbolic(suc.ip): # It's symbolic. Take it, and hopefully we can resolve it later successors.append(suc) else: ip_int = suc.solver.eval_one(suc.ip) if ( self._is_address_executable(ip_int) or self.project.is_hooked(ip_int) or self.project.simos.is_syscall_addr(ip_int) ): successors.append(suc) else: l.debug( "An obviously incorrect successor %d/%d (%#x) is ditched", i + 1, len(old_successors), ip_int ) return successors def _remove_non_return_edges(self): """ Remove those return_from_call edges that actually do not return due to calling some not-returning functions. :return: None """ for func in self.kb.functions.values(): graph = func.transition_graph all_return_edges = [(u, v) for (u, v, data) in graph.edges(data=True) if data["type"] == "return"] for return_from_call_edge in all_return_edges: callsite_block_addr, return_to_addr = return_from_call_edge call_func_addr = func.get_call_target(callsite_block_addr) if call_func_addr is None: continue call_func = self.kb.functions.function(call_func_addr) if call_func is None: # Weird... continue if call_func.returning is False: # Remove that edge! graph.remove_edge(call_func_addr, return_to_addr) # Remove the edge in CFG nodes = self.get_all_nodes(callsite_block_addr) for n in nodes: successors = self.get_successors_and_jumpkind(n, excluding_fakeret=False) for successor, jumpkind in successors: if jumpkind == "Ijk_FakeRet" and successor.addr == return_to_addr: self.remove_edge(n, successor) # Remove all dangling nodes # wcc = list(networkx.weakly_connected_components(graph)) # for nodes in wcc: # if func.startpoint not in nodes: # graph.remove_nodes_from(nodes) # Private methods - resolving indirect jumps @staticmethod def _convert_indirect_jump_targets_to_states(job, indirect_jump_targets): """ Convert each concrete indirect jump target into a SimState. If there are non-zero successors and the original jumpkind is a call, we also generate a fake-ret successor. :param job: The CFGJob instance. :param indirect_jump_targets: A collection of concrete jump targets resolved from a indirect jump. :return: A list of SimStates. :rtype: list """ first_successor = job.sim_successors.all_successors[0] successors = [] for t in indirect_jump_targets: # Insert new successors a = first_successor.copy() a.ip = t successors.append(a) # special case: if the indirect jump is in fact an indirect call, we should create a FakeRet successor if successors and first_successor.history.jumpkind == "Ijk_Call": a = first_successor.copy() a.ip = job.cfg_node.addr + job.cfg_node.size a.history.jumpkind = "Ijk_FakeRet" successors.append(a) return successors def _try_resolving_indirect_jumps(self, sim_successors, cfg_node, func_addr, successors, exception_info, artifacts): """ Resolve indirect jumps specified by sim_successors.addr. :param SimSuccessors sim_successors: The SimSuccessors instance. :param CFGNode cfg_node: The CFGNode instance. :param int func_addr: Current function address. :param list successors: A list of successors. :param tuple exception_info: The sys.exc_info() of the exception or None if none occurred. :param artifacts: A container of collected information. :return: Resolved successors :rtype: list """ # Try to resolve indirect jumps with advanced backward slicing (if enabled) if sim_successors.sort == "IRSB" and self._is_indirect_jump(cfg_node, sim_successors): l.debug("IRSB %#x has an indirect jump as its default exit", cfg_node.addr) # We need input states to perform backward slicing if self._advanced_backward_slicing and self._keep_state: # Optimization: make sure we only try to resolve an indirect jump if any of the following criteria holds # - It's a jump (Ijk_Boring), and its target is either fully symbolic, or its resolved target is within # the current binary # - It's a call (Ijk_Call), and its target is fully symbolic # TODO: This is very hackish, please refactor this part of code later should_resolve = True legit_successors = [ suc for suc in successors if suc.history.jumpkind in ("Ijk_Boring", "Ijk_InvalICache", "Ijk_Call") ] if legit_successors: legit_successor = legit_successors[0] if legit_successor.ip.symbolic: if not legit_successor.history.jumpkind == "Ijk_Call": should_resolve = False else: if legit_successor.history.jumpkind == "Ijk_Call": should_resolve = False else: concrete_target = legit_successor.solver.eval(legit_successor.ip) if ( self.project.loader.find_object_containing(concrete_target) is not self.project.loader.main_object ): should_resolve = False else: # No interesting successors... skip should_resolve = False # TODO: Handle those successors if not should_resolve: l.debug("This might not be an indirect jump that has multiple targets. Skipped.") self.kb.unresolved_indirect_jumps.add(cfg_node.addr) else: more_successors = self._backward_slice_indirect(cfg_node, sim_successors, func_addr) if more_successors: # Remove the symbolic successor # TODO: Now we are removing all symbolic successors. Is it possible # TODO: that there is more than one symbolic successor? all_successors = [suc for suc in successors if not suc.solver.symbolic(suc.ip)] # Insert new successors # We insert new successors in the beginning of all_successors list so that we don't break the # assumption that Ijk_FakeRet is always the last element in the list for suc_addr in more_successors: a = sim_successors.all_successors[0].copy() a.ip = suc_addr all_successors.insert(0, a) l.debug("The indirect jump is successfully resolved.") self.kb.indirect_jumps.update_resolved_addrs(cfg_node.addr, more_successors) else: l.debug("Failed to resolve the indirect jump.") self.kb.unresolved_indirect_jumps.add(cfg_node.addr) else: if not successors: l.debug("Cannot resolve the indirect jump without advanced backward slicing enabled: %s", cfg_node) # Try to find more successors if we failed to resolve the indirect jump before if exception_info is None and (cfg_node.is_simprocedure or self._is_indirect_jump(cfg_node, sim_successors)): has_call_jumps = any(suc_state.history.jumpkind == "Ijk_Call" for suc_state in successors) if has_call_jumps: concrete_successors = [ suc_state for suc_state in successors if suc_state.history.jumpkind != "Ijk_FakeRet" and not suc_state.solver.symbolic(suc_state.ip) ] else: concrete_successors = [ suc_state for suc_state in successors if not suc_state.solver.symbolic(suc_state.ip) ] symbolic_successors = [suc_state for suc_state in successors if suc_state.solver.symbolic(suc_state.ip)] resolved = not symbolic_successors if symbolic_successors: for suc in symbolic_successors: if o.SYMBOLIC in suc.options: targets = suc.solver.eval_upto(suc.ip, 32) if len(targets) < 32: all_successors = [] resolved = True for t in targets: new_ex = suc.copy() new_ex.ip = suc.solver.BVV(t, suc.ip.size()) all_successors.append(new_ex) else: break if not resolved and ( (symbolic_successors and not concrete_successors) or (not cfg_node.is_simprocedure and self._is_indirect_jump(cfg_node, sim_successors)) ): l.debug("%s has an indirect jump. See what we can do about it.", cfg_node) if sim_successors.sort == "SimProcedure" and sim_successors.artifacts["adds_exits"]: # Skip those SimProcedures that don't create new SimExits l.debug( "We got a SimProcedure %s in fastpath mode that creates new exits.", sim_successors.description ) if self._enable_symbolic_back_traversal: successors = self._symbolically_back_traverse(sim_successors, artifacts, cfg_node) # mark jump as resolved if we got successors if successors: succ_addrs = [s.addr for s in successors] self.kb.indirect_jumps.update_resolved_addrs(cfg_node.addr, succ_addrs) else: self.kb.unresolved_indirect_jumps.add(cfg_node.addr) l.debug("Got %d concrete exits in symbolic mode.", len(successors)) else: self.kb.unresolved_indirect_jumps.add(cfg_node.addr) # keep fake_rets successors = [s for s in successors if s.history.jumpkind == "Ijk_FakeRet"] elif sim_successors.sort == "IRSB" and any(ex.history.jumpkind != "Ijk_Ret" for ex in successors): # We cannot properly handle Return as that requires us start execution from the caller... l.debug("Try traversal backwards in symbolic mode on %s.", cfg_node) if self._enable_symbolic_back_traversal: successors = self._symbolically_back_traverse(sim_successors, artifacts, cfg_node) # Remove successors whose IP doesn't make sense successors = [ suc for suc in successors if self._is_address_executable(suc.solver.eval_one(suc.ip)) ] # mark jump as resolved if we got successors if successors: succ_addrs = [s.addr for s in successors] self.kb.indirect_jumps.update_resolved_addrs(cfg_node.addr, succ_addrs) else: self.kb.unresolved_indirect_jumps.add(cfg_node.addr) l.debug("Got %d concrete exits in symbolic mode", len(successors)) else: self.kb.unresolved_indirect_jumps.add(cfg_node.addr) successors = [] elif successors and all(ex.history.jumpkind == "Ijk_Ret" for ex in successors): l.debug("All exits are returns (Ijk_Ret). It will be handled by pending exits.") else: l.debug("Cannot resolve this indirect jump: %s", cfg_node) self.kb.unresolved_indirect_jumps.add(cfg_node.addr) return successors def _backward_slice_indirect(self, cfgnode, sim_successors, current_function_addr): """ Try to resolve an indirect jump by slicing backwards """ # TODO: make this a real indirect jump resolver under the new paradigm irsb = sim_successors.artifacts["irsb"] # shorthand l.debug("Resolving indirect jump at IRSB %s", irsb) # Let's slice backwards from the end of this exit next_tmp = irsb.next.tmp stmt_id = [i for i, s in enumerate(irsb.statements) if isinstance(s, pyvex.IRStmt.WrTmp) and s.tmp == next_tmp][ 0 ] cdg = self.project.analyses[CDG].prep(fail_fast=self._fail_fast)(cfg=self) ddg = self.project.analyses[DDG].prep(fail_fast=self._fail_fast)( cfg=self, start=current_function_addr, call_depth=0 ) bc = self.project.analyses[BackwardSlice].prep(fail_fast=self._fail_fast)( self, cdg, ddg, targets=[(cfgnode, stmt_id)], same_function=True ) taint_graph = bc.taint_graph # Find the correct taint next_nodes = [cl for cl in taint_graph.nodes() if cl.block_addr == sim_successors.addr] if not next_nodes: l.error("The target exit is not included in the slice. Something is wrong") return [] next_node = next_nodes[0] # Get the weakly-connected subgraph that contains `next_node` all_subgraphs = ( networkx.induced_subgraph(taint_graph, nodes) for nodes in networkx.weakly_connected_components(taint_graph) ) starts = set() for subgraph in all_subgraphs: if next_node in subgraph: # Make sure there is no symbolic read... # FIXME: This is an over-approximation. We should try to limit the starts more nodes = [n for n in subgraph.nodes() if subgraph.in_degree(n) == 0] for n in nodes: starts.add(n.block_addr) # Execute the slice successing_addresses = set() annotated_cfg = bc.annotated_cfg() for start in starts: l.debug("Start symbolic execution at 0x%x on program slice.", start) # Get the state from our CFG node = self.get_any_node(start) if node is None: # Well, we have to live with an empty state base_state = self.project.factory.blank_state(addr=start) else: base_state = node.input_state.copy() base_state.set_mode("symbolic") base_state.ip = start # Clear all initial taints (register values, memory values, etc.) initial_nodes = [n for n in bc.taint_graph.nodes() if bc.taint_graph.in_degree(n) == 0] for cl in initial_nodes: # Iterate in all actions of this node, and pick corresponding actions cfg_nodes = self.get_all_nodes(cl.block_addr) for n in cfg_nodes: if not n.final_states: continue actions = [ ac for ac in n.final_states[0].history.recent_actions # Normally it's enough to only use the first final state if ac.bbl_addr == cl.block_addr and ac.stmt_idx == cl.stmt_idx ] for ac in actions: if not hasattr(ac, "action"): continue if ac.action == "read": if ac.type == "mem": unconstrained_value = base_state.solver.Unconstrained( "unconstrained", ac.size.ast * 8 ) base_state.memory.store( ac.addr, unconstrained_value, endness=self.project.arch.memory_endness ) elif ac.type == "reg": unconstrained_value = base_state.solver.Unconstrained( "unconstrained", ac.size.ast * 8 ) base_state.registers.store( ac.offset, unconstrained_value, endness=self.project.arch.register_endness ) # Clear the constraints! base_state.release_plugin("solver") # For speed concerns, we are limiting the timeout for z3 solver to 5 seconds base_state.solver._solver.timeout = 5000 sc = self.project.factory.simulation_manager(base_state) sc.use_technique(Slicecutor(annotated_cfg)) sc.use_technique(LoopSeer(bound=1)) sc.run() if sc.cut or sc.deadended: all_deadended_states = sc.cut + sc.deadended for s in all_deadended_states: if s.addr == sim_successors.addr: # We want to get its successors succs = s.step() for succ in succs.flat_successors: successing_addresses.add(succ.addr) else: l.debug("Cannot determine the exit. You need some better ways to recover the exits :-(") l.debug("Resolution is done, and we have %d new successors.", len(successing_addresses)) return list(successing_addresses) def _symbolically_back_traverse(self, current_block, block_artifacts, cfg_node): """ Symbolically executes from ancestor nodes (2-5 times removed) finding paths of execution through the given CFGNode. :param SimSuccessors current_block: SimSuccessor with address to attempt to navigate to. :param dict block_artifacts: Container of IRSB data - specifically used for known persistant register values. :param CFGNode cfg_node: Current node interested around. :returns: Double-checked concrete successors. :rtype: List """ class RegisterProtector: """ A class that prevent specific registers from being overwritten. """ def __init__(self, reg_offset, info_collection): """ Class to overwrite registers. :param int reg_offest: Register offset to overwrite from. :param dict info_collection: New register offsets to use (in container). """ self._reg_offset = reg_offset self._info_collection = info_collection def write_persistent_register(self, state_): """ Writes over given registers from self._info_collection (taken from block_artifacts) :param SimSuccessors state_: state to update registers for """ if state_.inspect.address is None: l.error("state.inspect.address is None. It will be fixed by Yan later.") return if state_.registers.load(self._reg_offset).symbolic: current_run = state_.inspect.address if current_run in self._info_collection and not state_.solver.symbolic( self._info_collection[current_run][self._reg_offset] ): l.debug( "Overwriting %s with %s", state_.registers.load(self._reg_offset), self._info_collection[current_run][self._reg_offset], ) state_.registers.store(self._reg_offset, self._info_collection[current_run][self._reg_offset]) l.debug("Start back traversal from %s", current_block) # Create a partial CFG first temp_cfg = networkx.DiGraph(self.graph) # Reverse it temp_cfg.reverse(copy=False) path_length = 0 concrete_exits = [] if cfg_node not in temp_cfg.nodes(): # TODO: Figure out why this is happening return concrete_exits keep_running = True while not concrete_exits and path_length < 5 and keep_running: path_length += 1 queue = [cfg_node] avoid = set() for _ in range(path_length): new_queue = [] for n in queue: successors = list(temp_cfg.successors(n)) for suc in successors: jk = temp_cfg.get_edge_data(n, suc)["jumpkind"] if jk != "Ijk_Ret": # We don't want to trace into libraries predecessors = list(temp_cfg.predecessors(suc)) avoid |= {p.addr for p in predecessors if p is not n} new_queue.append(suc) queue = new_queue if path_length <= 1: continue for n in queue: # Start symbolic exploration from each block state = self.project.factory.blank_state( addr=n.addr, mode="symbolic", add_options={ o.DO_RET_EMULATION, o.CONSERVATIVE_READ_STRATEGY, } | o.resilience, ) # Avoid concretization of any symbolic read address that is over a certain limit # TODO: test case is needed for this option # Set initial values of persistent regs if n.addr in block_artifacts: for reg in state.arch.persistent_regs: state.registers.store(reg, block_artifacts[n.addr][reg]) for reg in state.arch.persistent_regs: reg_protector = RegisterProtector(reg, block_artifacts) state.inspect.add_breakpoint( "reg_write", BP( BP_AFTER, reg_write_offset=state.arch.registers[reg][0], action=reg_protector.write_persistent_register, ), ) simgr = self.project.factory.simulation_manager(state) simgr.use_technique(LoopSeer(bound=10)) simgr.use_technique(Explorer(find=current_block.addr, avoid=avoid)) simgr.use_technique(LengthLimiter(path_length)) simgr.run() if simgr.found: simgr = self.project.factory.simulation_manager(simgr.one_found, save_unsat=True) simgr.step() if simgr.active or simgr.unsat: keep_running = False concrete_exits.extend(simgr.active) concrete_exits.extend(simgr.unsat) if keep_running: l.debug("Step back for one more run...") # Make sure these successors are actually concrete # We just use the ip, persistent registers, and jumpkind to initialize the original unsat state # TODO: It works for jumptables, but not for calls. We should also handle changes in sp new_concrete_successors = [] for c in concrete_exits: unsat_state = current_block.unsat_successors[0].copy() unsat_state.history.jumpkind = c.history.jumpkind for reg in unsat_state.arch.persistent_regs + ["ip"]: unsat_state.registers.store(reg, c.registers.load(reg)) new_concrete_successors.append(unsat_state) return new_concrete_successors def _get_symbolic_function_initial_state(self, function_addr, fastpath_mode_state=None): """ Symbolically execute the first basic block of the specified function, then returns it. We prepares the state using the already existing state in fastpath mode (if avaiable). :param function_addr: The function address :return: A symbolic state if succeeded, None otherwise """ if function_addr is None: return None if function_addr in self._symbolic_function_initial_state: return self._symbolic_function_initial_state[function_addr] if fastpath_mode_state is not None: fastpath_state = fastpath_mode_state else: if function_addr in self._function_input_states: fastpath_state = self._function_input_states[function_addr] else: raise AngrCFGError("The impossible happened. Please report to Fish.") symbolic_initial_state = self.project.factory.entry_state(mode="symbolic") if fastpath_state is not None: symbolic_initial_state = self.project.simos.prepare_call_state( fastpath_state, initial_state=symbolic_initial_state ) # Find number of instructions of start block func = self.project.kb.functions.get(function_addr) start_block = func._get_block(function_addr) num_instr = start_block.instructions - 1 symbolic_initial_state.ip = function_addr path = self.project.factory.path(symbolic_initial_state) try: sim_successors = self.project.factory.successors(path.state, num_inst=num_instr) except (SimError, AngrError): return None # We execute all but the last instruction in this basic block, so we have a cleaner # state # Start execution! exits = sim_successors.flat_successors + sim_successors.unsat_successors if exits: final_st = None for ex in exits: if ex.satisfiable(): final_st = ex break else: final_st = None self._symbolic_function_initial_state[function_addr] = final_st return final_st # Private methods - function hints def _search_for_function_hints(self, successor_state): """ Scan for constants that might be used as exit targets later, and add them into pending_exits. :param SimState successor_state: A successing state. :return: A list of discovered code addresses. :rtype: list """ function_hints = [] for action in successor_state.history.recent_actions: if action.type == "reg" and action.offset == self.project.arch.ip_offset: # Skip all accesses to IP registers continue if action.type == "exit": # only consider read/write actions continue # Enumerate actions if isinstance(action, SimActionData): data = action.data if data is not None: # TODO: Check if there is a proper way to tell whether this const falls in the range of code # TODO: segments # Now let's live with this big hack... try: const = successor_state.solver.eval_one(data.ast) except Exception: # pylint:disable=broad-exception-caught continue if self._is_address_executable(const): if self._pending_function_hints is not None and const in self._pending_function_hints: continue # target = const # tpl = (None, None, target) # st = self.project._simos.prepare_call_state(self.project.initial_state(mode='fastpath'), # initial_state=saved_state) # st = self.project.initial_state(mode='fastpath') # exits[tpl] = (st, None, None) function_hints.append(const) l.debug( "Got %d possible exits, including: %s", len(function_hints), ", ".join(["0x%x" % f for f in function_hints]) ) return function_hints # Private methods - creation of stuff (SimSuccessors, CFGNode, call-stack, etc.) def _get_simsuccessors(self, addr, job, current_function_addr=None): """ Create the SimSuccessors instance for a block. :param int addr: Address of the block. :param CFGJob job: The CFG job instance with an input state inside. :param int current_function_addr: Address of the current function. :return: A SimSuccessors instance :rtype: SimSuccessors """ exception_info = None state = job.state saved_state = job.state # We don't have to make a copy here # respect the basic block size from base graph block_size = None if self._base_graph is not None: for n in self._base_graph.nodes(): if n.addr == addr: block_size = n.size break try: sim_successors = None if not self._keep_state: if self.project.is_hooked(addr): old_proc = self.project._sim_procedures[addr] is_continuation = old_proc.is_continuation elif self.project.simos.is_syscall_addr(addr): old_proc = self.project.simos.syscall_from_addr(addr) is_continuation = False # syscalls don't support continuation else: old_proc = None is_continuation = None if old_proc is not None and not is_continuation and not old_proc.ADDS_EXITS and not old_proc.NO_RET: # DON'T CREATE USELESS SIMPROCEDURES if we don't care about the accuracy of states # When generating CFG, a SimProcedure will not be created as it is but be created as a # ReturnUnconstrained stub if it satisfies the following conditions: # - It doesn't add any new exits. # - It returns as normal. # In this way, we can speed up the CFG generation by quite a lot as we avoid simulating # those functions like read() and puts(), which has no impact on the overall control flow at all. # # Special notes about SimProcedure continuation: Any SimProcedure instance that is a continuation # will add new exits, otherwise the original SimProcedure wouldn't have been executed anyway. Hence # it's reasonable for us to always simulate a SimProcedure with continuation. old_name = None if old_proc.is_syscall: new_stub = SIM_PROCEDURES["stubs"]["syscall"] ret_to = state.regs.ip_at_syscall else: # normal SimProcedures new_stub = SIM_PROCEDURES["stubs"]["ReturnUnconstrained"] ret_to = None old_name = old_proc.display_name # instantiate the stub new_stub_inst = new_stub(display_name=old_name) sim_successors = self.project.factory.procedure_engine.process( state, procedure=new_stub_inst, force_addr=addr, ret_to=ret_to, ) if sim_successors is None: jumpkind = state.history.jumpkind jumpkind = "Ijk_Boring" if jumpkind is None else jumpkind sim_successors = self.project.factory.successors( state, jumpkind=jumpkind, size=block_size, opt_level=self._iropt_level ) except (SimFastPathError, SimSolverModeError) as ex: if saved_state.mode == "fastpath": # Got a SimFastPathError or SimSolverModeError in FastPath mode. # We wanna switch to symbolic mode for current IRSB. l.debug("Switch to symbolic mode for address %#x", addr) # Make a copy of the current 'fastpath' state l.debug("Symbolic jumps at basic block %#x.", addr) new_state = None if addr != current_function_addr: new_state = self._get_symbolic_function_initial_state(current_function_addr) if new_state is None: new_state = state.copy() new_state.set_mode("symbolic") new_state.options.add(o.DO_RET_EMULATION) # Remove bad constraints # FIXME: This is so hackish... new_state.solver._solver.constraints = [ c for c in new_state.solver.constraints if c.op != "BoolV" or c.args[0] is not False ] new_state.solver._solver._result = None # Swap them saved_state, job.state = job.state, new_state sim_successors, exception_info, _ = self._get_simsuccessors(addr, job) else: exception_info = sys.exc_info() # Got a SimSolverModeError in symbolic mode. We are screwed. # Skip this IRSB l.debug("Caught a SimIRSBError %s. Don't panic, this is usually expected.", ex) inst = SIM_PROCEDURES["stubs"]["PathTerminator"]() sim_successors = ProcedureEngine().process(state, procedure=inst) except SimIRSBError: exception_info = sys.exc_info() # It's a tragedy that we came across some instructions that VEX # does not support. I'll create a terminating stub there l.debug("Caught a SimIRSBError during CFG recovery. Creating a PathTerminator.", exc_info=True) inst = SIM_PROCEDURES["stubs"]["PathTerminator"]() sim_successors = ProcedureEngine().process(state, procedure=inst) except claripy.ClaripyError: exception_info = sys.exc_info() l.debug("Caught a ClaripyError during CFG recovery. Don't panic, this is usually expected.", exc_info=True) # Generate a PathTerminator to terminate the current path inst = SIM_PROCEDURES["stubs"]["PathTerminator"]() sim_successors = ProcedureEngine().process(state, procedure=inst) except SimError: exception_info = sys.exc_info() l.debug("Caught a SimError during CFG recovery. Don't panic, this is usually expected.", exc_info=True) # Generate a PathTerminator to terminate the current path inst = SIM_PROCEDURES["stubs"]["PathTerminator"]() sim_successors = ProcedureEngine().process(state, procedure=inst) except AngrExitError: exception_info = sys.exc_info() l.debug("Caught a AngrExitError during CFG recovery. Don't panic, this is usually expected.", exc_info=True) # Generate a PathTerminator to terminate the current path inst = SIM_PROCEDURES["stubs"]["PathTerminator"]() sim_successors = ProcedureEngine().process(state, procedure=inst) except AngrError: exception_info = sys.exc_info() section = self.project.loader.main_object.find_section_containing(addr) if section is None: sec_name = "No section" else: sec_name = section.name # AngrError shouldn't really happen though l.debug("Caught an AngrError during CFG recovery at %#x (%s)", addr, sec_name, exc_info=True) # We might be on a wrong branch, and is likely to encounter the # "No bytes in memory xxx" exception # Just ignore it sim_successors = None return sim_successors, exception_info, saved_state def _create_new_call_stack(self, addr, all_jobs, job, exit_target, jumpkind): """ Creates a new call stack, and according to the jumpkind performs appropriate actions. :param int addr: Address to create at. :param Simsuccessors all_jobs: Jobs to get stack pointer from or return address. :param CFGJob job: CFGJob to copy current call stack from. :param int exit_target: Address of exit target. :param str jumpkind: The type of jump performed. :returns: New call stack for target block. :rtype: CallStack """ if self._is_call_jumpkind(jumpkind): new_call_stack = job.call_stack_copy() # Notice that in ARM, there are some freaking instructions # like # BLEQ <address> # It should give us three exits: Ijk_Call, Ijk_Boring, and # Ijk_Ret. The last exit is simulated. # Notice: We assume the last exit is the simulated one if len(all_jobs) > 1 and all_jobs[-1].history.jumpkind == "Ijk_FakeRet": se = all_jobs[-1].solver retn_target_addr = se.eval_one(all_jobs[-1].ip, default=0) sp = se.eval_one(all_jobs[-1].regs.sp, default=0) new_call_stack = new_call_stack.call(addr, exit_target, retn_target=retn_target_addr, stack_pointer=sp) elif jumpkind.startswith("Ijk_Sys") and len(all_jobs) == 1: # This is a syscall. It returns to the same address as itself (with a different jumpkind) retn_target_addr = exit_target se = all_jobs[0].solver sp = se.eval_one(all_jobs[0].regs.sp, default=0) new_call_stack = new_call_stack.call(addr, exit_target, retn_target=retn_target_addr, stack_pointer=sp) else: # We don't have a fake return exit available, which means # this call doesn't return. new_call_stack = CallStack() se = all_jobs[-1].solver sp = se.eval_one(all_jobs[-1].regs.sp, default=0) new_call_stack = new_call_stack.call(addr, exit_target, retn_target=None, stack_pointer=sp) elif jumpkind == "Ijk_Ret": # Normal return new_call_stack = job.call_stack_copy() try: new_call_stack = new_call_stack.ret(exit_target) except SimEmptyCallStackError: pass se = all_jobs[-1].solver sp = se.eval_one(all_jobs[-1].regs.sp, default=0) old_sp = job.current_stack_pointer # Calculate the delta of stack pointer if sp is not None and old_sp is not None: delta = sp - old_sp func_addr = job.func_addr if self.kb.functions.function(func_addr) is None: # Create the function if it doesn't exist # FIXME: But hell, why doesn't it exist in the first place? l.error( "Function 0x%x doesn't exist in function manager although it should be there." "Look into this issue later.", func_addr, ) self.kb.functions.function(func_addr, create=True) # Set sp_delta of the function self.kb.functions.function(func_addr, create=True).sp_delta = delta elif jumpkind == "Ijk_FakeRet": # The fake return... new_call_stack = job.call_stack else: # although the jumpkind is not Ijk_Call, it may still jump to a new function... let's see if self.project.is_hooked(exit_target): hooker = self.project.hooked_by(exit_target) if hooker is not procedures.stubs.UserHook.UserHook: # if it's not a UserHook, it must be a function # Update the function address of the most recent call stack frame new_call_stack = job.call_stack_copy() new_call_stack.current_function_address = exit_target else: # TODO: We need a way to mark if a user hook is a function or not # TODO: We can add a map in Project to store this information # For now we are just assuming they are not functions, which is mostly the case new_call_stack = job.call_stack else: # Normal control flow transition new_call_stack = job.call_stack return new_call_stack def _create_cfgnode(self, sim_successors, call_stack, func_addr, block_id=None, depth=None, exception_info=None): """ Create a context-sensitive CFGNode instance for a specific block. :param SimSuccessors sim_successors: The SimSuccessors object. :param CallStack call_stack_suffix: The call stack. :param int func_addr: Address of the current function. :param BlockID block_id: The block ID of this CFGNode. :param int or None depth: Depth of this CFGNode. :return: A CFGNode instance. :rtype: CFGNode """ sa = sim_successors.artifacts # shorthand # Determine if this is a SimProcedure, and further, if this is a syscall syscall = None is_syscall = False if sim_successors.sort == "SimProcedure": is_simprocedure = True if sa["is_syscall"] is True: is_syscall = True syscall = sim_successors.artifacts["procedure"] else: is_simprocedure = False if is_simprocedure: simproc_name = sa["name"].split(".")[-1] if simproc_name == "ReturnUnconstrained" and "resolves" in sa and sa["resolves"] is not None: simproc_name = sa["resolves"] no_ret = False if syscall is not None and sa["no_ret"]: no_ret = True cfg_node = CFGENode( sim_successors.addr, None, self.model, callstack_key=call_stack.stack_suffix(self.context_sensitivity_level), input_state=None, simprocedure_name=simproc_name, syscall_name=syscall, no_ret=no_ret, is_syscall=is_syscall, function_address=sim_successors.addr, block_id=block_id, depth=depth, creation_failure_info=exception_info, thumb=(isinstance(self.project.arch, ArchARM) and sim_successors.addr & 1), ) else: cfg_node = CFGENode( sim_successors.addr, sa["irsb_size"], self.model, callstack_key=call_stack.stack_suffix(self.context_sensitivity_level), input_state=None, is_syscall=is_syscall, function_address=func_addr, block_id=block_id, depth=depth, irsb=sim_successors.artifacts["irsb"], creation_failure_info=exception_info, thumb=(isinstance(self.project.arch, ArchARM) and sim_successors.addr & 1), ) return cfg_node # Private methods - loops and graph normalization def _detect_loops(self, loop_callback=None): """ Loop detection. :param func loop_callback: A callback function for each detected loop backedge. :return: None """ loop_finder = self.project.analyses[LoopFinder].prep(kb=self.kb, fail_fast=self._fail_fast)(normalize=False) if loop_callback is not None: graph_copy = networkx.DiGraph(self._graph) loop: angr.analyses.loopfinder.Loop for loop in loop_finder.loops: loop_callback(graph_copy, loop) self.model.graph = graph_copy # Update loop backedges and graph self._loop_back_edges = list(itertools.chain.from_iterable(loop.continue_edges for loop in loop_finder.loops)) # Private methods - function/procedure/subroutine analysis # Including calling convention, function arguments, etc. def _refine_function_arguments(self, func, callsites): """ :param func: :param callsites: :return: """ for i, c in enumerate(callsites): # Execute one block ahead of the callsite, and execute one basic block after the callsite # In this process, the following tasks are performed: # - Record registers/stack variables that are modified # - Record the change of the stack pointer # - Check if the return value is used immediately # We assume that the stack is balanced before and after the call (you can have caller clean-up of course). # Any abnormal behaviors will be logged. # Hopefully this approach will allow us to have a better understanding of parameters of those function # stubs and function proxies. if c.simprocedure_name is not None: # Skip all SimProcedures continue l.debug("Refining %s at 0x%x (%d/%d).", repr(func), c.addr, i, len(callsites)) # Get a basic block ahead of the callsite blocks_ahead = [c] # the block after blocks_after = [] successors = self.get_successors_and_jumpkind(c, excluding_fakeret=False) for s, jk in successors: if jk == "Ijk_FakeRet": blocks_after = [s] break regs_overwritten = set() stack_overwritten = set() regs_read = set() regs_written = set() try: # Execute the predecessor path = self.project.factory.path( self.project.factory.blank_state(mode="fastpath", addr=blocks_ahead[0].addr) ) path.step() all_successors = path.next_run.successors + path.next_run.unsat_successors if not all_successors: continue suc = all_successors[0] se = suc.solver # Examine the path log actions = suc.history.recent_actions sp = se.eval_one(suc.regs.sp, default=0) + self.project.arch.call_sp_fix for ac in actions: if ac.type == "reg" and ac.action == "write": regs_overwritten.add(ac.offset) elif ac.type == "mem" and ac.action == "write": addr = se.eval_one(ac.addr.ast, default=0) if (self.project.arch.call_pushes_ret and addr >= sp + self.project.arch.bytes) or ( not self.project.arch.call_pushes_ret and addr >= sp ): offset = addr - sp stack_overwritten.add(offset) func.prepared_registers.add(tuple(regs_overwritten)) func.prepared_stack_variables.add(tuple(stack_overwritten)) except (SimError, AngrError): pass try: if blocks_after: path = self.project.factory.path( self.project.factory.blank_state(mode="fastpath", addr=blocks_after[0].addr) ) path.step() all_successors = path.next_run.successors + path.next_run.unsat_successors if not all_successors: continue suc = all_successors[0] actions = suc.history.recent_actions for ac in actions: if ac.type == "reg" and ac.action == "read" and ac.offset not in regs_written: regs_read.add(ac.offset) elif ac.type == "reg" and ac.action == "write": regs_written.add(ac.offset) # Filter registers, remove unnecessary registers from the set # regs_overwritten = self.project.arch.argument_registers.intersection(regs_overwritten) regs_read = self.project.arch.argument_registers.intersection(regs_read) func.registers_read_afterwards.add(tuple(regs_read)) except (SimError, AngrError): pass # Private methods - dominators and post-dominators def _immediate_dominators(self, node, target_graph=None, reverse_graph=False): """ Get all immediate dominators of sub graph from given node upwards. :param str node: id of the node to navigate forwards from. :param networkx.classes.digraph.DiGraph target_graph: graph to analyse, default is self.graph. :param bool reverse_graph: Whether the target graph should be reversed before analysation. :return: each node of graph as index values, with element as respective node's immediate dominator. :rtype: dict """ if target_graph is None: target_graph = self.graph if node not in target_graph: raise AngrCFGError("Target node %s is not in graph." % node) graph = networkx.DiGraph(target_graph) if reverse_graph: # Reverse the graph without deepcopy for n in target_graph.nodes(): graph.add_node(n) for src, dst in target_graph.edges(): graph.add_edge(dst, src) idom = {node: node} order = list(networkx.dfs_postorder_nodes(graph, node)) dfn = {u: i for i, u in enumerate(order)} order.pop() order.reverse() def intersect(u_, v_): """ Finds the highest (in postorder valuing) point of intersection above two node arguments. :param str u_: nx node id. :param str v_: nx node id. :return: intersection of paths. :rtype: str """ while u_ != v_: while dfn[u_] < dfn[v_]: u_ = idom[u_] while dfn[u_] > dfn[v_]: v_ = idom[v_] return u_ changed = True while changed: changed = False for u in order: new_idom = reduce(intersect, (v for v in graph.pred[u] if v in idom)) if u not in idom or idom[u] != new_idom: idom[u] = new_idom changed = True return idom # # Static private utility methods # @staticmethod def _is_indirect_jump(_, sim_successors): """ Determine if this SimIRSB has an indirect jump as its exit """ if sim_successors.artifacts["irsb_direct_next"]: # It's a direct jump return False default_jumpkind = sim_successors.artifacts["irsb_default_jumpkind"] if default_jumpkind not in ("Ijk_Call", "Ijk_Boring", "Ijk_InvalICache"): # It's something else, like a ret of a syscall... we don't care about it return False return True @staticmethod def _generate_block_id(call_stack_suffix, block_addr, is_syscall): if not is_syscall: return BlockID.new(block_addr, call_stack_suffix, "normal") return BlockID.new(block_addr, call_stack_suffix, "syscall") @staticmethod def _block_id_repr(block_id): return repr(block_id) @staticmethod def _block_id_callstack_key(block_id): return block_id.callsite_tuples @staticmethod def _block_id_addr(block_id): return block_id.addr @staticmethod def _block_id_current_func_addr(block_id): """ If we don't have any information about the caller, we have no way to get the address of the current function. :param BlockID block_id: The block ID. :return: The function address if there is one, or None if it's not possible to get. :rtype: int or None """ if block_id.callsite_tuples: return block_id.callsite_tuples[-1] return None # # Other private utility methods # @staticmethod def _is_call_jumpkind(jumpkind): if jumpkind == "Ijk_Call" or jumpkind.startswith("Ijk_Sys_"): return True return False def _push_unresolvable_run(self, block_address): """ Adds this block to the list of unresolvable runs. :param dict block_address: container of IRSB data from run. """ self._unresolvable_runs.add(block_address) def _is_address_executable(self, address): """ Check if the specific address is in one of the executable ranges. :param int address: The address :return: True if it's in an executable range, False otherwise """ for r in self._executable_address_ranges: if r[0] <= address < r[1]: return True return False def _update_thumb_addrs(self, simsuccessors, state): """ :return: """ # For ARM THUMB mode if simsuccessors.sort == "IRSB" and state.thumb: insn_addrs = simsuccessors.artifacts["insn_addrs"] self._thumb_addrs.update(insn_addrs) self._thumb_addrs.update(map(lambda x: x + 1, insn_addrs)) # pylint:disable=bad-builtin def _get_callsites(self, function_address): """ Get where a specific function is called. :param function_address: Address of the target function :return: A list of CFGNodes whose exits include a call/jump to the given function """ all_predecessors = [] nodes = self.get_all_nodes(function_address) for n in nodes: predecessors = list(self.get_predecessors(n)) all_predecessors.extend(predecessors) return all_predecessors def _get_nx_paths(self, begin, end): """ Get the possible (networkx) simple paths between two nodes or addresses corresponding to nodes. Input: addresses or node instances Return: a list of lists of nodes representing paths. """ if isinstance(begin, int) and isinstance(end, int): n_begin = self.get_any_node(begin) n_end = self.get_any_node(end) elif isinstance(begin, CFGENode) and isinstance(end, CFGENode): n_begin = begin n_end = end else: raise AngrCFGError("from and to should be of the same type") self.remove_fakerets() return networkx.all_shortest_paths(self.graph, n_begin, n_end) def _quasi_topological_sort(self): """ Perform a quasi-topological sort on an already constructed CFG graph (a networkx DiGraph) :return: None """ # Clear the existing sorting result self._quasi_topological_order = {} ctr = self._graph.number_of_nodes() for ep in self._entry_points: # FIXME: This is not always correct. We'd better store CFGNodes in self._entry_points ep_node = self.get_any_node(ep) if not ep_node: continue for n in networkx.dfs_postorder_nodes(self._graph, source=ep_node): if n not in self._quasi_topological_order: self._quasi_topological_order[n] = ctr ctr -= 1 def _reset_state_mode(self, state, mode): """ Reset the state mode to the given mode, and apply the custom state options specified with this analysis. :param state: The state to work with. :param str mode: The state mode. :return: None """ state.set_mode(mode) state.options |= self._state_add_options state.options = state.options.difference(self._state_remove_options)
AnalysesHub.register_default("CFGEmulated", CFGEmulated)