import logging
from collections import defaultdict
import networkx
import pyvex
from . import Analysis
from ..code_location import CodeLocation
from ..annocfg import AnnotatedCFG
from ..errors import AngrBackwardSlicingError
from ..utils.constants import DEFAULT_STATEMENT
l = logging.getLogger(name=__name__)
[docs]class BackwardSlice(Analysis):
"""
Represents a backward slice of the program.
"""
# FIXME: BackwardSlice does not work with the engines refactoring. It will be brought back to life after the
# FIXME: DDG refactoring, which will happen shortly.
def __init__(
self,
cfg,
cdg,
ddg,
targets=None,
cfg_node=None,
stmt_id=None,
control_flow_slice=False,
same_function=False,
no_construct=False,
):
"""
Create a backward slice from a specific statement based on provided control flow graph (CFG), control
dependence graph (CDG), and data dependence graph (DDG).
The data dependence graph can be either CFG-based, or Value-set analysis based. A CFG-based DDG is much faster
to generate, but it only reflects those states while generating the CFG, and it is neither sound nor accurate.
The VSA based DDG (called VSA_DDG) is based on static analysis, which gives you a much better result.
:param cfg: The control flow graph.
:param cdg: The control dependence graph.
:param ddg: The data dependence graph.
:param targets: A list of "target" that specify targets of the backward slices. Each target can be a
tuple in form of (cfg_node, stmt_idx), or a CodeLocation instance.
:param cfg_node: Deprecated. The target CFGNode to reach. It should exist in the CFG.
:param stmt_id: Deprecated. The target statement to reach.
:param control_flow_slice: True/False, indicates whether we should slice only based on CFG. Sometimes when
acquiring DDG is difficult or impossible, you can just create a slice on your CFG.
Well, if you don't even have a CFG, then...
:param no_construct: Only used for testing and debugging to easily create a BackwardSlice object.
"""
self._cfg = cfg
self._cdg = cdg
self._ddg = ddg
self._same_function = same_function
# All targets
self._targets = []
if cfg_node is not None or stmt_id is not None:
l.warning('"cfg_node" and "stmt_id" are deprecated. Please use "targets" to pass in one or more targets.')
self._targets = [(cfg_node, stmt_id)]
if targets is not None:
for t in targets:
if isinstance(t, CodeLocation):
node = self._cfg.model.get_any_node(t.block_addr)
self._targets.append((node, t.stmt_idx))
elif type(t) is tuple:
self._targets.append(t)
else:
raise AngrBackwardSlicingError("Unsupported type of target %s" % t)
# Save a list of taints to begin with at the beginning of each SimRun
self.initial_taints_per_run = None
self.runs_in_slice = None
self.cfg_nodes_in_slice = None
# IDs of all chosen statement for each SimRun
self.chosen_statements = defaultdict(set)
# IDs for all chosen exit statements as well as their corresponding targets
self.chosen_exits = defaultdict(list)
if not no_construct:
self._construct(self._targets, control_flow_slice=control_flow_slice)
#
# Public methods
#
def __repr__(self):
s = "BackwardSlice (to %s)" % self._targets
return s
[docs] def dbg_repr(self, max_display=10):
"""
Debugging output of this slice.
:param max_display: The maximum number of SimRun slices to show.
:return: A string representation.
"""
s = repr(self) + "\n"
if len(self.chosen_statements) > max_display:
s += "%d SimRuns in program slice, displaying %d.\n" % (len(self.chosen_statements), max_display)
else:
s += "%d SimRuns in program slice.\n" % len(self.chosen_statements)
# Pretty-print the first `max_display` basic blocks
if max_display is None:
# Output all
run_addrs = sorted(self.chosen_statements.keys())
else:
# Only output the first "max_display" ones
run_addrs = sorted(self.chosen_statements.keys())[:max_display]
for run_addr in run_addrs:
s += self.dbg_repr_run(run_addr) + "\n"
return s
[docs] def dbg_repr_run(self, run_addr):
"""
Debugging output of a single SimRun slice.
:param run_addr: Address of the SimRun.
:return: A string representation.
"""
if self.project.is_hooked(run_addr):
ss = "%#x Hooked\n" % run_addr
else:
ss = "%#x\n" % run_addr
# statements
chosen_statements = self.chosen_statements[run_addr]
vex_block = self.project.factory.block(run_addr).vex
statements = vex_block.statements
for i in range(0, len(statements)):
if i in chosen_statements:
line = "+"
else:
line = "-"
line += "[% 3d] " % i
line += str(statements[i])
ss += line + "\n"
# exits
targets = self.chosen_exits[run_addr]
addr_strs = []
for exit_stmt_id, target_addr in targets:
if target_addr is None:
addr_strs.append("default")
else:
addr_strs.append("%#x" % target_addr)
ss += "Chosen exits: " + ", ".join(addr_strs)
return ss
[docs] def annotated_cfg(self, start_point=None):
"""
Returns an AnnotatedCFG based on slicing result.
"""
# TODO: Support context-sensitivity
targets = []
for simrun, stmt_idx in self._targets:
targets.append((simrun.addr, stmt_idx))
l.debug("Initializing AnnoCFG...")
anno_cfg = AnnotatedCFG(self.project, self._cfg)
for simrun, stmt_idx in self._targets:
if stmt_idx != -1:
anno_cfg.set_last_statement(simrun.addr, stmt_idx)
for n in self._cfg.graph.nodes():
run = n
if run.addr in self.chosen_statements:
if self.chosen_statements[run.addr] is True:
anno_cfg.add_block_to_whitelist(run.addr)
else:
anno_cfg.add_statements_to_whitelist(run.addr, self.chosen_statements[run.addr])
for src, dst in self._cfg.graph.edges():
run = src
if dst.addr in self.chosen_statements and src.addr in self.chosen_statements:
anno_cfg.add_exit_to_whitelist(run.addr, dst.addr)
return anno_cfg
[docs] def is_taint_impacting_stack_pointers(self, simrun_addr, stmt_idx, taint_type, simrun_whitelist=None):
"""
Query in taint graph to check if a specific taint will taint the stack pointer in the future or not.
The taint is specified with the tuple (simrun_addr, stmt_idx, taint_type).
:param simrun_addr: Address of the SimRun.
:param stmt_idx: Statement ID.
:param taint_type: Type of the taint, might be one of the following: 'reg', 'tmp', 'mem'.
:param simrun_whitelist: A list of SimRun addresses that are whitelisted.
:returns: True/False.
"""
if simrun_whitelist is None:
simrun_whitelist = set()
if type(simrun_whitelist) is not set:
simrun_whitelist = set(simrun_whitelist)
# Find the specific taint in our graph
taint = None
for n in self.taint_graph.nodes():
if n.type == taint_type and n.addr == simrun_addr and n.stmt_id == stmt_idx:
taint = n
break
if taint is None:
raise AngrBackwardSlicingError("The specific taint is not found")
bfs_tree = networkx.bfs_tree(self.taint_graph, taint)
# A node is tainting the stack pointer if one of the following criteria holds:
# - a descendant register is the sp/bp itself
for descendant in bfs_tree.nodes():
if descendant.type == "reg" and (
descendant.reg in (self.project.arch.sp_offset, self.project.arch.bp_offset)
):
return True
return False
#
# Private methods
#
def _construct(self, targets, control_flow_slice=False):
"""
Construct a dependency graph based on given parameters.
:param targets: A list of tuples like (CFGNode, statement ID)
:param control_flow_slice: Is the backward slicing only depends on CFG or not.
"""
if control_flow_slice:
simruns = [r for r, _ in targets]
self._construct_control_flow_slice(simruns)
else:
self._construct_default(targets)
def _construct_control_flow_slice(self, simruns):
"""
Build a slice of the program without considering the effect of data dependencies.
This is an incorrect hack, but it should work fine with small programs.
:param simruns: A list of SimRun targets. You probably wanna get it from the CFG somehow. It must exist in the
CFG.
"""
# TODO: Support context-sensitivity!
if self._cfg is None:
l.error("Please build CFG first.")
cfg = self._cfg.graph
for simrun in simruns:
if simrun not in cfg:
l.error("SimRun instance %s is not in the CFG.", simrun)
stack = []
for simrun in simruns:
stack.append(simrun)
self.runs_in_slice = networkx.DiGraph()
self.cfg_nodes_in_slice = networkx.DiGraph()
self.chosen_statements = {}
while stack:
# Pop one out
block = stack.pop()
if block.addr not in self.chosen_statements:
self.chosen_statements[block.addr] = True
# Get all predecessors of that block
predecessors = cfg.predecessors(block)
for pred in predecessors:
stack.append(pred)
self.cfg_nodes_in_slice.add_edge(pred, block)
self.runs_in_slice.add_edge(pred.addr, block.addr)
def _construct_default(self, targets):
"""
Create a backward slice from a specific statement in a specific block. This is done by traverse the CFG
backwards, and mark all tainted statements based on dependence graphs (CDG and DDG) provided initially. The
traversal terminated when we reach the entry point, or when there is no unresolved dependencies.
:param targets: A list of tuples like (cfg_node, stmt_idx), where cfg_node is a CFGNode instance where the
backward slice starts, and it must be included in CFG and CDG. stmt_idx is the ID of the target
statement where the backward slice starts.
"""
# TODO: Support context-sensitivity
l.debug("Constructing a default backward program slice")
self.taint_graph = networkx.DiGraph()
taints = set()
accessed_taints = set()
# Fill in the taint set
for cfg_node, stmt_idx in targets:
if cfg_node not in self._cfg.graph:
raise AngrBackwardSlicingError("Target CFGNode %s is not in the CFG." % cfg_node)
if stmt_idx == -1:
new_taints = self._handle_control_dependence(cfg_node)
taints |= new_taints
else:
cl = CodeLocation(cfg_node.addr, stmt_idx)
taints.add(cl)
while taints:
# Pop a tainted code location
tainted_cl = taints.pop()
l.debug("Checking taint %s...", tainted_cl)
# Mark it as picked
if tainted_cl.block_addr is not None and tainted_cl.stmt_idx is not None:
# Skip SimProcedures
self._pick_statement(tainted_cl.block_addr, tainted_cl.stmt_idx)
# Mark it as accessed
accessed_taints.add(tainted_cl)
# Pick all its data dependencies from data dependency graph
if self._ddg is not None and tainted_cl in self._ddg:
if isinstance(self._ddg, networkx.DiGraph):
predecessors = list(self._ddg.predecessors(tainted_cl))
else:
# angr.analyses.DDG
predecessors = list(self._ddg.get_predecessors(tainted_cl))
l.debug("Returned %d predecessors for %s from data dependence graph", len(predecessors), tainted_cl)
for p in predecessors:
if p not in accessed_taints:
taints.add(p)
self.taint_graph.add_edge(p, tainted_cl)
# Handle the control dependence
for n in self._cfg.model.get_all_nodes(tainted_cl.block_addr):
new_taints = self._handle_control_dependence(n)
l.debug("Returned %d taints for %s from control dependence graph", len(new_taints), n)
for taint in new_taints:
if taint not in accessed_taints:
taints.add(taint)
self.taint_graph.add_edge(taint, tainted_cl)
# In the end, map the taint graph onto CFG
self._map_to_cfg()
def _find_exits(self, src_block, target_block):
"""
Source block has more than one exit, and through some of those exits, the control flow can eventually go to
the target block. This method returns exits that lead to the target block.
:param src_block: The block that has multiple exits.
:param target_block: The target block to reach.
:returns: a dict of statement ID -> a list of target IPs (or None if the exit should not be taken), each
corresponds to an exit to take in order to reach the target.
For example, it returns the following dict:
{
'default': None, # It has a default exit, but shouldn't be taken
15: [ 0x400080 ], # Statement 15 is an exit statement, and should be taken when the target is
# 0x400080
28: None # Statement 28 is an exit statement, but shouldn't be taken
}
"""
# Enumerate all statements and find exit statements
# Since we don't have a state, we have to rely on the pyvex block instead of SimIRSB
# Just create the block from pyvex again - not a big deal
if self.project.is_hooked(src_block.addr):
# Just return all exits for now
return {-1: [target_block.addr]}
block = self.project.factory.block(src_block.addr)
vex_block = block.vex
exit_stmt_ids = {}
for stmt_idx, stmt in enumerate(vex_block.statements):
if isinstance(stmt, pyvex.IRStmt.Exit):
exit_stmt_ids[stmt_idx] = None
# And of course, it has a default exit
# Don't forget about it.
exit_stmt_ids[DEFAULT_STATEMENT] = None
# Find all paths from src_block to target_block
# FIXME: This is some crappy code written in a hurry. Replace the all_simple_paths() later.
all_simple_paths = list(networkx.all_simple_paths(self._cfg.graph, src_block, target_block, cutoff=3))
for simple_path in all_simple_paths:
if len(simple_path) <= 1:
# Oops, it looks that src_block and target_block are the same guy?
continue
if self._same_function:
# Examine this path and make sure it does not have call or return edge
for i in range(len(simple_path) - 1):
jumpkind = self._cfg.graph[simple_path[i]][simple_path[i + 1]]["jumpkind"]
if jumpkind in ("Ijk_Call", "Ijk_Ret"):
return {}
# Get the first two nodes
a, b = simple_path[0], simple_path[1]
# Get the exit statement ID from CFG
exit_stmt_id = self._cfg.model.get_exit_stmt_idx(a, b)
if exit_stmt_id is None:
continue
# Mark it!
if exit_stmt_ids[exit_stmt_id] is None:
exit_stmt_ids[exit_stmt_id] = [b.addr]
else:
exit_stmt_ids[exit_stmt_id].append(b.addr)
return exit_stmt_ids
def _handle_control_dependence(self, target_node):
"""
Based on control dependence graph, pick all exits (statements) that lead to the target.
:param target_node: A CFGNode instance.
:returns: A set of new tainted code locations.
"""
new_taints = set()
# Query the CDG and figure out all control flow transitions to reach this target
cdg_guardians = self._cdg.get_guardians(target_node)
if not cdg_guardians:
# this block is directly reachable from the entry point
pass
else:
# For each predecessor on CDG, find the correct exit to take, and continue slicing from those exits
for predecessor in cdg_guardians:
exits = self._find_exits(predecessor, target_node)
for stmt_idx, target_addresses in exits.items():
if stmt_idx is not None:
# If it's an exit statement, mark it as picked
self._pick_statement(predecessor.addr, self._normalize_stmt_idx(predecessor.addr, stmt_idx))
# If it's the default statement, we should also pick other conditional exit statements
if stmt_idx == DEFAULT_STATEMENT:
conditional_exits = self._conditional_exits(predecessor.addr)
for conditional_exit_stmt_id in conditional_exits:
cl = CodeLocation(
predecessor.addr,
self._normalize_stmt_idx(predecessor.addr, conditional_exit_stmt_id),
)
new_taints.add(cl)
self._pick_statement(
predecessor.addr,
self._normalize_stmt_idx(predecessor.addr, conditional_exit_stmt_id),
)
if target_addresses is not None:
if stmt_idx is not None:
# If it's an exit statement, we create a new tainted code location
cl = CodeLocation(predecessor.addr, self._normalize_stmt_idx(predecessor.addr, stmt_idx))
new_taints.add(cl)
# Mark those exits as picked
for target_address in target_addresses:
self._pick_exit(predecessor.addr, stmt_idx, target_address)
# On CFG, pick default exits of all nodes between predecessor and our target node
# Usually this is not required if basic blocks strictly end at control flow transitions. But this is
# not always the case for some architectures
all_simple_paths = list(
networkx.all_simple_paths(self._cfg.graph, predecessor, target_node, cutoff=3)
)
previous_node = None
for path in all_simple_paths:
for node in path:
self._pick_statement(node.addr, self._normalize_stmt_idx(node.addr, DEFAULT_STATEMENT))
if previous_node is not None:
self._pick_exit(previous_node.addr, DEFAULT_STATEMENT, node.addr)
return new_taints
def _map_to_cfg(self):
"""
Map our current slice to CFG.
Based on self._statements_per_run and self._exit_statements_per_run, this method will traverse the CFG and
check if there is any missing block on the path. If there is, the default exit of that missing block will be
included in the slice. This is because Slicecutor cannot skip individual basic blocks along a path.
"""
exit_statements_per_run = self.chosen_exits
new_exit_statements_per_run = defaultdict(list)
while len(exit_statements_per_run):
for block_address, exits in exit_statements_per_run.items():
for stmt_idx, exit_target in exits:
if exit_target not in self.chosen_exits:
# Oh we found one!
# The default exit should be taken no matter where it leads to
# Add it to the new set
tpl = (DEFAULT_STATEMENT, None)
if tpl not in new_exit_statements_per_run[exit_target]:
new_exit_statements_per_run[exit_target].append(tpl)
# Add the new ones to our global dict
for block_address, exits in new_exit_statements_per_run.items():
for ex in exits:
if ex not in self.chosen_exits[block_address]:
self.chosen_exits[block_address].append(ex)
# Switch them so we can process the new set
exit_statements_per_run = new_exit_statements_per_run
new_exit_statements_per_run = defaultdict(list)
def _pick_statement(self, block_address, stmt_idx):
"""
Include a statement in the final slice.
:param int block_address: Address of the basic block.
:param int stmt_idx: Statement ID.
"""
# TODO: Support context-sensitivity
# Sanity check
if not isinstance(block_address, int):
raise AngrBackwardSlicingError("Invalid block address %s." % block_address)
if not isinstance(stmt_idx, int):
raise AngrBackwardSlicingError("Invalid statement ID %s." % stmt_idx)
self.chosen_statements[block_address].add(stmt_idx)
def _pick_exit(self, block_address, stmt_idx, target_ips):
"""
Include an exit in the final slice.
:param block_address: Address of the basic block.
:param stmt_idx: ID of the exit statement.
:param target_ips: The target address of this exit statement.
"""
# TODO: Support context-sensitivity
tpl = (stmt_idx, target_ips)
if tpl not in self.chosen_exits[block_address]:
self.chosen_exits[block_address].append(tpl)
#
# Helper functions
#
def _conditional_exits(self, block_addr):
"""
Return a list of conditional statement exits with respect to a basic block.
:param block_addr: The address of the basic block.
:return: A list of statement IDs.
"""
vex_block = self.project.factory.block(block_addr).vex
lst = []
for i, stmt in enumerate(vex_block.statements):
if isinstance(stmt, pyvex.IRStmt.Exit):
lst.append(i)
return lst
def _normalize_stmt_idx(self, block_addr, stmt_idx):
"""
For each statement ID, convert 'default' to (last_stmt_idx+1)
:param block_addr: The block address.
:param stmt_idx: Statement ID.
:returns: New statement ID.
"""
if type(stmt_idx) is int:
return stmt_idx
if stmt_idx == DEFAULT_STATEMENT:
vex_block = self.project.factory.block(block_addr).vex
return len(vex_block.statements)
raise AngrBackwardSlicingError('Unsupported statement ID "%s"' % stmt_idx)
@staticmethod
def _last_branching_statement(statements):
"""
Search for the last branching exit, just like
# if (t12) { PUT(184) = 0xBADF00D:I64; exit-Boring }
and then taint the temp variable inside if predicate
"""
cmp_stmt_id = None
cmp_tmp_id = None
total_stmts = len(statements)
statements = reversed(statements)
for stmt_rev_idx, stmt in enumerate(statements):
if isinstance(stmt, pyvex.IRStmt.Exit):
stmt_idx = total_stmts - stmt_rev_idx - 1
cmp_stmt_id = stmt_idx
cmp_tmp_id = stmt.guard.tmp
return cmp_stmt_id, cmp_tmp_id
from angr.analyses import AnalysesHub
AnalysesHub.register_default("BackwardSlice", BackwardSlice)