# Source code for angr.blade

import itertools

import networkx

import pyvex

from .errors import AngrBladeError, SimTranslationError
from .knowledge_plugins.cfg import CFGNode
from .utils.constants import DEFAULT_STATEMENT
from .slicer import SimSlicer


class Blade:
    """
    Blade is a light-weight program slicer that works with networkx DiGraph containing CFGNodes.
    It is meant to be used in angr for small or on-the-fly analyses.
    """

    def __init__(
        self,
        graph: networkx.DiGraph,
        dst_run: int,
        dst_stmt_idx: int,
        direction: str = "backward",
        project=None,
        cfg=None,
        ignore_sp: bool = False,
        ignore_bp: bool = False,
        ignored_regs=None,
        max_level: int = 3,
        base_state=None,
        stop_at_calls: bool = False,
        cross_insn_opt=False,
        max_predecessors: int = 10,
        include_imarks: bool = True,
    ):
        """
        Build the slice. The slicing itself runs inside the constructor; the result is exposed via `slice`.

        :param graph:               A graph representing the control flow graph. Note that it does not take
                                    angr.analyses.CFGEmulated or angr.analyses.CFGFast.
        :param dst_run:             An address specifying the target SimRun.
        :param dst_stmt_idx:        The target statement index. -1 means executing until the last statement.
        :param direction:           'backward' or 'forward' slicing. Forward slicing is not yet supported.
        :param angr.Project project: The project instance.
        :param angr.analyses.CFGBase cfg: the CFG instance. Its `model` attribute is used to look up CFG nodes.
                                    It will be made mandatory later.
        :param ignore_sp:           Whether the stack pointer should be ignored in dependency tracking. Any
                                    dependency from/to stack pointers will be ignored if this option is True.
        :param ignore_bp:           Whether the base pointer should be ignored or not.
        :param ignored_regs:        Optional iterable of registers to ignore. Each item is either a register offset
                                    (int) or a key of `project.arch.registers`, which is resolved to its offset.
                                    These registers are dropped from the tracked set after the target block has
                                    been sliced.
        :param max_level:           The maximum number of blocks that we trace back for.
        :param base_state:          Passed as `backup_state` to `project.factory.block()` when lifting blocks.
        :param stop_at_calls:       Limit slicing within a single function. Do not proceed when encounters a call
                                    edge.
        :param cross_insn_opt:      Passed as `cross_insn_opt` to `project.factory.block()` when lifting blocks.
        :param max_predecessors:    The maximum number of incoming edges that are followed for each node.
        :param include_imarks:      Should IMarks (instruction boundaries) be included in the slice.
        :raises AngrBladeError:     If `cfg` is missing, the target is not in the graph, or the direction is
                                    unsupported or unknown.
        :return: None
        """

        self._graph = graph
        self._dst_run = dst_run
        self._dst_stmt_idx = dst_stmt_idx
        self._ignore_sp = ignore_sp
        self._ignore_bp = ignore_bp
        self._max_level = max_level
        self._base_state = base_state
        self._stop_at_calls = stop_at_calls
        self._cross_insn_opt = cross_insn_opt
        self._max_predecessors = max_predecessors
        self._include_imarks = include_imarks

        self._slice = networkx.DiGraph()

        self.project = project
        # `cfg` is made optional only for compatibility concern. It will be made a positional parameter later.
        # Guard against None *before* touching `.model`, so a missing cfg yields AngrBladeError and not an
        # AttributeError.
        self._cfg = None if cfg is None else cfg.model
        if self._cfg is None:
            raise AngrBladeError('"cfg" must be specified.')

        if not self._in_graph(self._dst_run):
            raise AngrBladeError("The specified SimRun %s doesn't exist in graph." % self._dst_run)

        # Normalize ignored registers to register offsets
        self._ignored_regs = set()
        if ignored_regs:
            for r in ignored_regs:
                if isinstance(r, int):
                    self._ignored_regs.add(r)
                else:
                    self._ignored_regs.add(self.project.arch.registers[r][0])

        self._run_cache = {}  # block address -> lifted IRSB
        self._traced_runs = set()  # runs whose predecessors have already been followed

        if direction == "backward":
            self._backward_slice()
        elif direction == "forward":
            raise AngrBladeError("Forward slicing is not implemented yet")
        else:
            raise AngrBladeError("Unknown slicing direction %s" % direction)

    #
    # Properties
    #

    @property
    def slice(self):
        """
        The slice graph. Nodes are (block address, statement index) tuples.
        """
        return self._slice

    #
    # Public methods
    #

    def dbg_repr(self, arch=None):
        """
        Render every block that appears in the slice as text, marking the statements (and the default exit)
        that belong to the slice with a "+". Blocks are emitted in ascending address order.

        :param arch:    The architecture used for translating register offsets into names. Defaults to the
                        architecture of the project, if a project is available.
        :return:        The textual representation.
        :rtype:         str
        """

        if arch is None and self.project is not None:
            arch = self.project.arch

        # Group the sliced statement indices by block, so the node set is only scanned once.
        stmts_by_block = {}
        for addr, stmt_idx in self.slice.nodes():
            stmts_by_block.setdefault(addr, set()).add(stmt_idx)

        parts = []
        for block_addr in sorted(stmts_by_block):
            included_stmts = stmts_by_block[block_addr]
            block = self._get_irsb(block_addr)

            lines = [" IRSB %#x\n" % block_addr]
            for i, stmt in enumerate(block.statements):
                marker = "+" if i in included_stmts else " "
                lines.append("%02s %02d | %s\n" % (marker, i, self._stmt_to_str(stmt, arch)))

            lines.append(" + " if DEFAULT_STATEMENT in included_stmts else " ")
            if isinstance(block.next, pyvex.IRExpr.Const):
                lines.append("Next: %#x\n" % block.next.con.value)
            elif isinstance(block.next, pyvex.IRExpr.RdTmp):
                lines.append("Next: t%d\n" % block.next.tmp)
            else:
                lines.append("Next: %s\n" % str(block.next))

            parts.append("".join(lines))
            parts.append("\n")

        return "".join(parts)

    #
    # Private methods
    #

    @staticmethod
    def _stmt_to_str(stmt, arch):
        """
        Stringify a VEX statement, using register names instead of offsets for Put statements and for WrTmp
        statements whose data is a Get expression when `arch` is available.
        """
        if arch is not None:
            if isinstance(stmt, pyvex.IRStmt.Put):
                return stmt.__str__(reg_name=arch.translate_register_name(stmt.offset))
            if isinstance(stmt, pyvex.IRStmt.WrTmp) and isinstance(stmt.data, pyvex.IRExpr.Get):
                return stmt.__str__(reg_name=arch.translate_register_name(stmt.data.offset))
        return str(stmt)

    def _get_irsb(self, v):
        """
        Get the IRSB object from an address or a CFGNode. Lifted blocks are cached by address.

        :param v:   Can be one of the following: an address, or a CFGNode.
        :return:    The IRSB instance.
        :rtype:     pyvex.IRSB
        :raises AngrBladeError: If `v` has an unsupported type, or if no project is available for lifting.
        """

        if isinstance(v, CFGNode):
            v = v.addr

        if type(v) is int:
            # Generate an IRSB from self.project

            if v in self._run_cache:
                return self._run_cache[v]

            if self.project:
                irsb = self.project.factory.block(
                    v, cross_insn_opt=self._cross_insn_opt, backup_state=self._base_state
                ).vex
                self._run_cache[v] = irsb
                return irsb
            else:
                raise AngrBladeError("Project must be specified if you give me all addresses for SimRuns")

        else:
            raise AngrBladeError("Unsupported SimRun argument type %s" % type(v))

    def _get_cfgnode(self, thing):
        """
        Get the CFGNode corresponding to the specific address.

        :param thing: Can be anything that self._get_addr() accepts. Usually it's the address of the node
        :return: the CFGNode instance
        :rtype: CFGNode
        """

        return self._cfg.get_any_node(self._get_addr(thing))

    @staticmethod
    def _get_addr(v):
        """
        Get address of the basic block or CFG node specified by v.

        :param v:   Can be one of the following: a CFGNode, or an address.
        :return:    The address.
        :rtype:     int
        :raises AngrBladeError: If `v` is neither a CFGNode nor an int.
        """

        if isinstance(v, CFGNode):
            return v.addr
        elif type(v) is int:
            return v
        else:
            raise AngrBladeError("Unsupported SimRun argument type %s" % type(v))

    def _in_graph(self, v):
        """
        Test whether the CFG node that corresponds to `v` is a node of the graph.
        """
        return self._get_cfgnode(v) in self._graph

    def _inslice_callback(self, stmt_idx, stmt, infodict):  # pylint:disable=unused-argument
        """
        Record a statement in the slice graph.

        An edge is added from the statement to `infodict["prev"]` if that entry is present and truthy; otherwise
        the statement is added as a bare node. `infodict` is then updated in place: "prev" becomes this statement
        and "has_statement" is set to True.

        :param stmt_idx:    Index of the statement inside its block.
        :param stmt:        The statement itself (unused).
        :param infodict:    A dict carrying "irsb_addr" and, optionally, "prev".
        """
        tpl = (infodict["irsb_addr"], stmt_idx)
        prev = infodict.get("prev")
        if prev:
            self._slice.add_edge(tpl, prev)
        else:
            self._slice.add_node(tpl)

        infodict["prev"] = tpl
        infodict["has_statement"] = True

    def _drop_sp_bp(self, regs):
        """
        Remove the stack pointer and/or the base pointer offsets from `regs` (in place) when the corresponding
        ignore_sp / ignore_bp option is set.
        """
        arch = self.project.arch
        if self._ignore_sp and arch.sp_offset in regs:
            regs.remove(arch.sp_offset)
        if self._ignore_bp and arch.bp_offset in regs:
            regs.remove(arch.bp_offset)

    def _slice_predecessors(self, run, level, regs, stack_offsets, prev):
        """
        Continue the backward slice into the predecessors of `run` in the graph.

        At most `max_predecessors` incoming edges are followed. Call and return edges are skipped when
        `stop_at_calls` is set, and hooked predecessors are always skipped.
        """
        cfgnode = self._get_cfgnode(run)
        if cfgnode is None:
            return

        in_edges = self._graph.in_edges(cfgnode, data=True)
        if len(in_edges) > self._max_predecessors:
            # take the first N edges
            in_edges = itertools.islice(in_edges, self._max_predecessors)

        for pred, _, data in in_edges:
            if self._stop_at_calls and data.get("jumpkind") in {"Ijk_Call", "Ijk_Ret"}:
                # skip calls as instructed
                continue
            if self.project.is_hooked(pred.addr):
                # Skip SimProcedures
                continue
            self._backward_slice_recursive(level, pred, regs, stack_offsets, prev, data.get("stmt_idx", None))

    def _backward_slice(self):
        """
        Backward slicing.

        We support the following IRStmts:
        # WrTmp
        # Put

        We support the following IRExprs:
        # Get
        # RdTmp
        # Const

        :raises AngrBladeError: If the target statement or the `next` expression of the target block has an
                                unsupported type.
        :return:
        """

        temps = set()
        regs = set()

        # Retrieve the target: are we slicing from a register(IRStmt.Put), or a temp(IRStmt.WrTmp)?
        try:
            irsb = self._get_irsb(self._dst_run)
        except SimTranslationError:
            # the target block cannot be lifted; leave the slice empty
            return
        stmts = irsb.statements
        dst_addr = self._get_addr(self._dst_run)

        if self._dst_stmt_idx != -1:
            dst_stmt = stmts[self._dst_stmt_idx]

            if type(dst_stmt) is pyvex.IRStmt.Put:
                regs.add(dst_stmt.offset)
            elif type(dst_stmt) is pyvex.IRStmt.WrTmp:
                temps.add(dst_stmt.tmp)
            else:
                raise AngrBladeError("Incorrect type of the specified target statement. We only support Put and WrTmp.")

            prev = (dst_addr, self._dst_stmt_idx)
        else:
            next_expr = irsb.next

            if type(next_expr) is pyvex.IRExpr.RdTmp:
                temps.add(next_expr.tmp)
            elif type(next_expr) is pyvex.IRExpr.Const:
                # A const doesn't rely on anything else!
                pass
            else:
                raise AngrBladeError("Unsupported type for irsb.next: %s" % type(next_expr))

            # Then we gotta start from the very last statement!
            self._dst_stmt_idx = len(stmts) - 1

            prev = (dst_addr, DEFAULT_STATEMENT)

        if not temps and not regs:
            # no dependency
            return

        slicer = SimSlicer(
            self.project.arch,
            stmts,
            target_tmps=temps,
            target_regs=regs,
            target_stack_offsets=None,
            inslice_callback=self._inslice_callback,
            inslice_callback_infodict={
                "irsb_addr": irsb.addr,
                "prev": prev,
            },
            include_imarks=self._include_imarks,
        )

        regs = slicer.final_regs
        self._drop_sp_bp(regs)
        for offset in self._ignored_regs:
            if offset in regs:
                regs.remove(offset)

        stack_offsets = slicer.final_stack_offsets

        prev = slicer.inslice_callback_infodict["prev"]

        if regs or stack_offsets:
            self._slice_predecessors(self._dst_run, self._max_level - 1, regs, stack_offsets, prev)

    def _backward_slice_recursive(self, level, run, regs, stack_offsets, prev, exit_stmt_idx):
        """
        Slice one predecessor block and keep walking backwards while dependencies remain.

        :param level:           Remaining depth budget. Nothing is done when it is not positive.
        :param run:             The block to slice (a CFGNode or an address).
        :param regs:            Register offsets that are still tracked. The argument itself is not modified.
        :param stack_offsets:   Stack offsets that are still tracked.
        :param prev:            The slice node that statements of this block should link to.
        :param exit_stmt_idx:   Index of the exit statement through which the successor is reached. None or
                                DEFAULT_STATEMENT denotes the default exit.
        """
        if level <= 0:
            return

        temps = set()
        regs = regs.copy()

        irsb_addr = self._get_addr(run)
        irsb = self._get_irsb(run)
        stmts = irsb.statements

        if exit_stmt_idx is None or exit_stmt_idx == DEFAULT_STATEMENT:
            # Initialize the temps set with whatever in the `next` attribute of this irsb
            next_expr = irsb.next
            if type(next_expr) is pyvex.IRExpr.RdTmp:
                temps.add(next_expr.tmp)

            # add the default exit into our slice
            self._inslice_callback(DEFAULT_STATEMENT, None, {"irsb_addr": irsb_addr, "prev": prev})
            prev = irsb_addr, DEFAULT_STATEMENT

        # if there are conditional exits, we *always* add them into the slice (so if they should not be taken, we do not
        # lose the condition)
        for stmt_idx_, s_ in enumerate(stmts):
            if type(s_) is not pyvex.IRStmt.Exit:
                continue
            if s_.jumpkind != "Ijk_Boring":
                continue

            if type(s_.guard) is pyvex.IRExpr.RdTmp:
                temps.add(s_.guard.tmp)

            # Put it in our slice
            self._inslice_callback(stmt_idx_, s_, {"irsb_addr": irsb_addr, "prev": prev})
            prev = (irsb_addr, stmt_idx_)

        infodict = {"irsb_addr": irsb_addr, "prev": prev, "has_statement": False}

        slicer = SimSlicer(
            self.project.arch,
            stmts,
            target_tmps=temps,
            target_regs=regs,
            target_stack_offsets=stack_offsets,
            inslice_callback=self._inslice_callback,
            inslice_callback_infodict=infodict,
            include_imarks=self._include_imarks,
        )

        if not infodict["has_statement"]:
            # put this block into the slice
            self._inslice_callback(0, None, infodict)

        # Each run is sliced on every visit, but its predecessors are only followed once.
        if run in self._traced_runs:
            return
        self._traced_runs.add(run)

        regs = slicer.final_regs
        self._drop_sp_bp(regs)
        # NOTE(review): self._ignored_regs is only applied to the target block in _backward_slice(), not here.
        # Confirm whether that is intended before changing it.

        stack_offsets = slicer.final_stack_offsets

        prev = slicer.inslice_callback_infodict["prev"]

        if regs or stack_offsets:
            self._slice_predecessors(run, level - 1, regs, stack_offsets, prev)