import logging
from collections import defaultdict
import networkx
import pyvex
from . import Analysis
from ..code_location import CodeLocation
from ..errors import SimSolverModeError, SimUnsatError, AngrDDGError
from ..sim_variable import (
SimRegisterVariable,
SimMemoryVariable,
SimTemporaryVariable,
SimConstantVariable,
SimStackVariable,
)
l = logging.getLogger(name=__name__)
[docs]class AST:
"""
A mini implementation for AST
"""
def __init__(self, op, *operands):
self.op = op
self.operands = tuple(operands)
def __hash__(self):
return hash((self.op, self.operands))
def __eq__(self, other):
return type(other) is AST and other.op == self.op and other.operands == self.operands
def __repr__(self):
def _short_repr(a):
return a.short_repr
if len(self.operands) == 1:
return f"{self.op}{_short_repr(self.operands[0])}"
elif len(self.operands) == 2:
return f"{_short_repr(self.operands[0])} {self.op} {_short_repr(self.operands[1])}"
else:
return f"{self.op} ({self.operands})"
[docs]class ProgramVariable:
"""
Describes a variable in the program at a specific location.
:ivar SimVariable variable: The variable.
:ivar CodeLocation location: Location of the variable.
"""
def __init__(self, variable, location, initial=False, arch=None):
self.variable = variable
self.location = location
self.initial = initial
self._arch = arch # for pretty printing
def __hash__(self):
return hash((self.variable, self.location))
def __eq__(self, other):
if not isinstance(other, ProgramVariable):
return False
return self.variable == other.variable and self.location == other.location
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
if self._arch is not None:
s = f"{{{self.variable} @ {self.location}}}"
else:
s = f"{{{self.variable} @ {self.location}}}"
return s
@property
def short_repr(self):
if self._arch is not None:
s = f"{{{self.variable}@{self.location.short_repr}}}"
else:
s = f"{{{self.variable}@{self.location.short_repr}}}"
return s
[docs]class DDGJob:
def __init__(self, cfg_node, call_depth):
self.cfg_node = cfg_node
self.call_depth = call_depth
def __repr__(self):
return "<DDGJob %s, call_depth %d>" % (self.cfg_node, self.call_depth)
[docs]class LiveDefinitions:
"""
A collection of live definitions with some handy interfaces for definition killing and lookups.
"""
def __init__(self):
"""
Constructor.
"""
# byte-to-byte mappings
# TODO: make it copy-on-write in order to save memory.
# TODO: options are either collections.ChainMap or a modified version of simuvex.SimPagedMemory
self._memory_map = defaultdict(set)
self._register_map = defaultdict(set)
self._defs = defaultdict(set)
#
# Overridden methods
#
def __contains__(self, variable):
return variable in self._defs
#
# Public methods
#
[docs] def branch(self):
"""
Create a branch of the current live definition collection.
:return: A new LiveDefinition instance.
:rtype: angr.analyses.ddg.LiveDefinitions
"""
ld = LiveDefinitions()
ld._memory_map = self._memory_map.copy()
ld._register_map = self._register_map.copy()
ld._defs = self._defs.copy()
return ld
[docs] def copy(self):
"""
Make a hard copy of `self`.
:return: A new LiveDefinition instance.
:rtype: angr.analyses.ddg.LiveDefinitions
"""
ld = LiveDefinitions()
ld._memory_map = self._memory_map.copy()
ld._register_map = self._register_map.copy()
ld._defs = self._defs.copy()
return ld
[docs] def add_def(self, variable, location, size_threshold=32):
"""
Add a new definition of variable.
:param SimVariable variable: The variable being defined.
:param CodeLocation location: Location of the varaible being defined.
:param int size_threshold: The maximum bytes to consider for the variable.
:return: True if the definition was new, False otherwise
:rtype: bool
"""
new_defs_added = False
if isinstance(variable, SimRegisterVariable):
if variable.reg is None:
l.warning("add_def: Got a None for a SimRegisterVariable. Consider fixing.")
return new_defs_added
size = min(variable.size, size_threshold)
offset = variable.reg
while offset < variable.reg + size:
if location not in self._register_map[offset]:
new_defs_added = True
self._register_map[offset].add(location)
offset += 1
self._defs[variable].add(location)
elif isinstance(variable, SimMemoryVariable):
size = min(variable.size, size_threshold)
offset = variable.addr
while offset < variable.addr + size:
if location not in self._memory_map[offset]:
new_defs_added = True
self._memory_map[offset].add(location)
offset += 1
self._defs[variable].add(location)
else:
l.error('Unsupported variable type "%s".', type(variable))
return new_defs_added
[docs] def add_defs(self, variable, locations, size_threshold=32):
"""
Add a collection of new definitions of a variable.
:param SimVariable variable: The variable being defined.
:param iterable locations: A collection of locations where the variable was defined.
:param int size_threshold: The maximum bytes to consider for the variable.
:return: True if any of the definition was new, False otherwise
:rtype: bool
"""
new_defs_added = False
for loc in locations:
new_defs_added |= self.add_def(variable, loc, size_threshold=size_threshold)
return new_defs_added
[docs] def kill_def(self, variable, location, size_threshold=32):
"""
Add a new definition for variable and kill all previous definitions.
:param SimVariable variable: The variable to kill.
:param CodeLocation location: The location where this variable is defined.
:param int size_threshold: The maximum bytes to consider for the variable.
:return: None
"""
if isinstance(variable, SimRegisterVariable):
if variable.reg is None:
l.warning("kill_def: Got a None for a SimRegisterVariable. Consider fixing.")
return None
size = min(variable.size, size_threshold)
offset = variable.reg
while offset < variable.reg + size:
self._register_map[offset] = {location}
offset += 1
self._defs[variable] = {location}
elif isinstance(variable, SimMemoryVariable):
size = min(variable.size, size_threshold)
offset = variable.addr
while offset < variable.addr + size:
self._memory_map[offset] = {location}
offset += 1
self._defs[variable] = {location}
else:
l.error('Unsupported variable type "%s".', type(variable))
[docs] def lookup_defs(self, variable, size_threshold=32):
"""
Find all definitions of the variable.
:param SimVariable variable: The variable to lookup for.
:param int size_threshold: The maximum bytes to consider for the variable. For example, if the variable is 100
byte long, only the first `size_threshold` bytes are considered.
:return: A set of code locations where the variable is defined.
:rtype: set
"""
live_def_locs = set()
if isinstance(variable, SimRegisterVariable):
if variable.reg is None:
l.warning("lookup_defs: Got a None for a SimRegisterVariable. Consider fixing.")
return live_def_locs
size = min(variable.size, size_threshold)
offset = variable.reg
while offset < variable.reg + size:
if offset in self._register_map:
live_def_locs |= self._register_map[offset]
offset += 1
elif isinstance(variable, SimMemoryVariable):
size = min(variable.size, size_threshold)
offset = variable.addr
while offset < variable.addr + size:
if offset in self._memory_map:
live_def_locs |= self._memory_map[offset]
offset += 1
else:
# umm unsupported variable type
l.error('Unsupported variable type "%s".', type(variable))
return live_def_locs
[docs] def items(self):
"""
An iterator that returns all live definitions.
:return: The iterator.
:rtype: iter
"""
return self._defs.items()
[docs] def itervariables(self):
"""
An iterator that returns all live variables.
:return: The iterator.
:rtype: iter
"""
return self._defs.keys()
[docs]class DDGViewItem:
def __init__(self, ddg, variable, simplified=False):
self._ddg = ddg
self._variable = variable
self._simplified = simplified
@property
def depends_on(self):
graph = self._ddg.simplified_data_graph if self._simplified else self._ddg.data_graph
if self._variable in graph:
return [
self._to_viewitem(n)
for n, _, data in graph.in_edges(self._variable, data=True)
if data.get("type", None) != "kill"
]
return None
@property
def dependents(self):
graph = self._ddg.simplified_data_graph if self._simplified else self._ddg.data_graph
if self._variable in graph:
return [
self._to_viewitem(n)
for _, n, data in graph.in_edges(self._variable, data=True)
if data.get("type", None) != "kill"
]
return None
def __repr__(self):
s = "[%s, %d dependents, depends on %d]" % (
self._variable,
len(self.dependents),
len(self.depends_on),
)
return s
def __eq__(self, other):
return (
isinstance(other, DDGViewItem)
and self._variable == other._variable
and self._simplified == other._simplified
)
def _to_viewitem(self, prog_var):
"""
Convert a ProgramVariable instance to a DDGViewItem object.
:param ProgramVariable prog_var: The ProgramVariable object to convert.
:return: The converted DDGViewItem object.
:rtype: DDGViewItem
"""
return DDGViewItem(self._ddg, prog_var, simplified=self._simplified)
[docs]class DDGViewInstruction:
def __init__(self, cfg, ddg, insn_addr, simplified=False):
self._cfg = cfg
self._ddg = ddg
self._insn_addr = insn_addr
self._simplified = simplified
# shorthand
self._project = self._ddg.project
def __getitem__(self, key):
arch = self._project.arch
if key in arch.registers:
# it's a register name
reg_offset, size = arch.registers[key]
# obtain the CFGNode
cfg_node = self._cfg.model.get_any_node(self._insn_addr, anyaddr=True)
if cfg_node is None:
# not found
raise KeyError("CFGNode for instruction %#x is not found." % self._insn_addr)
# determine the statement ID
vex_block = self._project.factory.block(
cfg_node.addr, size=cfg_node.size, opt_level=self._cfg._iropt_level
).vex
stmt_idx = None
insn_addr = cfg_node.addr
for i, stmt in enumerate(vex_block.statements):
if isinstance(stmt, pyvex.IRStmt.IMark):
insn_addr = stmt.addr + stmt.delta
elif insn_addr == self._insn_addr:
if isinstance(stmt, pyvex.IRStmt.Put) and stmt.offset == reg_offset:
stmt_idx = i
break
elif insn_addr > self._insn_addr:
break
if stmt_idx is None:
raise KeyError("Cannot find the statement.")
# create a program variable
variable = SimRegisterVariable(reg_offset, size)
location = CodeLocation(cfg_node.addr, stmt_idx, ins_addr=self._insn_addr)
pv = ProgramVariable(variable, location, arch=self._project.arch)
return DDGViewItem(self._ddg, pv, simplified=self._simplified)
@property
def definitions(self):
"""
Get all definitions located at the current instruction address.
:return: A list of ProgramVariable instances.
:rtype: list
"""
defs = set()
if self._simplified:
graph = self._ddg.simplified_data_graph
else:
graph = self._ddg.data_graph
n: ProgramVariable
for n in graph.nodes():
if n.location.ins_addr == self._insn_addr:
defs.add(DDGViewItem(self._ddg, n, simplified=self._simplified))
return list(defs)
[docs]class DDGView:
"""
A view of the data dependence graph.
"""
def __init__(self, cfg, ddg, simplified=False):
self._cfg = cfg
self._ddg = ddg
self._simplified = simplified
# shorthand
self._project = self._ddg.project
def __getitem__(self, key):
if isinstance(key, int):
# instruction address
return DDGViewInstruction(self._cfg, self._ddg, key, simplified=self._simplified)
[docs]class DDG(Analysis):
"""
This is a fast data dependence graph directly generated from our CFG analysis result. The only reason for its
existence is the speed. There is zero guarantee for being sound or accurate. You are supposed to use it only when
you want to track the simplest data dependence, and you do not care about soundness or accuracy.
For a better data dependence graph, please consider performing a better static analysis first (like Value-set
Analysis), and then construct a dependence graph on top of the analysis result (for example, the VFG in angr).
The DDG is based on a CFG, which should ideally be a CFGEmulated generated with the following options:
- keep_state=True to keep all input states
- state_add_options=angr.options.refs to store memory, register, and temporary value accesses
You may want to consider a high value for context_sensitivity_level as well when generating the CFG.
Also note that since we are using states from CFG, any improvement in analysis performed on CFG (like a points-to
analysis) will directly benefit the DDG.
"""
def __init__(self, cfg, start=None, call_depth=None, block_addrs=None):
"""
:param cfg: Control flow graph. Please make sure each node has an associated `state` with it, e.g. by
passing the keep_state=True and state_add_options=angr.options.refs arguments to
CFGEmulated.
:param start: An address, Specifies where we start the generation of this data dependence graph.
:param call_depth: None or integers. A non-negative integer specifies how deep we would like to track in the
call tree. None disables call_depth limit.
:param iterable or None block_addrs: A collection of block addresses that the DDG analysis should be performed
on.
"""
# Sanity check
if not cfg._keep_state:
raise AngrDDGError('CFG must have "keep_state" set to True.')
self._cfg = cfg
self._start = self.project.entry if start is None else start
self._call_depth = call_depth
self._block_addrs = block_addrs
# analysis output
self._stmt_graph = networkx.DiGraph()
self._data_graph = networkx.DiGraph()
self._simplified_data_graph = None
self._ast_graph = networkx.DiGraph() # A mapping of ProgramVariable to ASTs
self._symbolic_mem_ops = set()
# Data dependency graph per function
self._function_data_dependencies = None
self.view = DDGView(self._cfg, self, simplified=False)
self.simple_view = DDGView(self._cfg, self, simplified=True)
# Local variables
self._live_defs = None
self._temp_variables = None
self._temp_register_symbols = None
self._temp_edges = None
self._temp_register_symbols = None
self._variables_per_statement = None
self._custom_data_per_statement = None
self._register_edges = None
# Begin construction!
self._construct()
#
# Properties
#
@property
def graph(self):
"""
:returns: A networkx DiGraph instance representing the dependence relations between statements.
:rtype: networkx.DiGraph
"""
return self._stmt_graph
@property
def data_graph(self):
"""
Get the data dependence graph.
:return: A networkx DiGraph instance representing data dependence.
:rtype: networkx.DiGraph
"""
return self._data_graph
@property
def simplified_data_graph(self):
"""
:return:
"""
if self._simplified_data_graph is None:
self._simplified_data_graph = self._simplify_data_graph(self.data_graph)
return self._simplified_data_graph
@property
def ast_graph(self):
return self._ast_graph
#
# Public methods
#
[docs] def pp(self):
"""
Pretty printing.
"""
# TODO: make it prettier
for src, dst, data in self.graph.edges(data=True):
print(f"{src} <-- {dst}, {data}")
[docs] def dbg_repr(self):
"""
Representation for debugging.
"""
# TODO:
return str(self.graph)
def __contains__(self, code_location):
"""
Returns whether `code_location` is in the graph.
:param code_location: A CodeLocation instance.
:returns: True/False
"""
return code_location in self.graph
[docs] def get_predecessors(self, code_location):
"""
Returns all predecessors of the code location.
:param code_location: A CodeLocation instance.
:returns: A list of all predecessors.
"""
return self.graph.predecessors(code_location)
[docs] def function_dependency_graph(self, func):
"""
Get a dependency graph for the function `func`.
:param func: The Function object in CFG.function_manager.
:returns: A networkx.DiGraph instance.
"""
if self._function_data_dependencies is None:
self._build_function_dependency_graphs()
if func in self._function_data_dependencies:
return self._function_data_dependencies[func]
# Not found
return None
[docs] def data_sub_graph(self, pv, simplified=True, killing_edges=False, excluding_types=None):
"""
Get a subgraph from the data graph or the simplified data graph that starts from node pv.
:param ProgramVariable pv: The starting point of the subgraph.
:param bool simplified: When True, the simplified data graph is used, otherwise the data graph is used.
:param bool killing_edges: Are killing edges included or not.
:param iterable excluding_types: Excluding edges whose types are among those excluded types.
:return: A subgraph.
:rtype: networkx.MultiDiGraph
"""
result = networkx.MultiDiGraph()
result.add_node(pv)
base_graph = self.simplified_data_graph if simplified else self.data_graph
if pv not in base_graph:
return result
# traverse all edges and add them to the result graph if needed
queue = [pv]
traversed = set()
while queue:
elem = queue[0]
queue = queue[1:]
if elem in traversed:
continue
traversed.add(elem)
out_edges = base_graph.out_edges(elem, data=True)
if not killing_edges:
# remove killing edges
out_edges = [(a, b, data) for a, b, data in out_edges if "type" not in data or data["type"] != "kill"]
if excluding_types:
out_edges = [
(a, b, data)
for a, b, data in out_edges
if "type" not in data or data["type"] not in excluding_types
]
for src, dst, data in out_edges:
result.add_edge(src, dst, **data)
if dst not in traversed:
queue.append(dst)
return result
#
# Private methods
#
def _construct(self):
"""
Construct the data dependence graph.
We track the following types of dependence:
- (Intra-IRSB) temporary variable dependencies
- Register dependencies
- Memory dependencies, although it's very limited. See below.
We track the following types of memory access:
- (Intra-functional) Stack read/write.
Trace changes of stack pointers inside a function, and the dereferences of stack pointers.
- (Inter-functional) Stack read/write.
- (Global) Static memory positions.
Keep a map of all accessible memory positions to their source statements per function. After that, we
traverse the CFG and link each pair of reads/writes together in the order of control-flow.
We do not track the following types of memory access
- Symbolic memory access
Well, they cannot be tracked under fastpath mode (which is the mode we are generating the CTF) anyways.
"""
worklist = []
worklist_set = set()
# Initialize the worklist
if self._start is None:
# initial nodes are those nodes in CFG that has no in-degrees
for n in self._cfg.graph.nodes():
if self._cfg.graph.in_degree(n) == 0:
# Put it into the worklist
job = DDGJob(n, 0)
self._worklist_append(job, worklist, worklist_set)
else:
for n in self._cfg.model.get_all_nodes(self._start):
job = DDGJob(n, 0)
self._worklist_append(job, worklist, worklist_set)
# A dict storing defs set
# DDGJob -> LiveDefinition
live_defs_per_node = {}
while worklist:
# Pop out a node
ddg_job = worklist[0]
l.debug("Processing %s.", ddg_job)
node, call_depth = ddg_job.cfg_node, ddg_job.call_depth
worklist = worklist[1:]
worklist_set.remove(node)
# Grab all final states. There are usually more than one (one state for each successor), and we gotta
# process all of them
final_states = node.final_states
if node in live_defs_per_node:
live_defs = live_defs_per_node[node]
else:
live_defs = LiveDefinitions()
live_defs_per_node[node] = live_defs
successing_nodes = list(self._cfg.graph.successors(node))
# try to assign every final state to a successor and vice versa
match_suc = defaultdict(bool)
match_state = defaultdict(set)
for suc in successing_nodes:
matched = False
for state in final_states:
try:
if state.solver.eval(state.ip) == suc.addr:
match_suc[suc.addr] = True
match_state[state].add(suc)
matched = True
except (SimUnsatError, SimSolverModeError, ZeroDivisionError):
# ignore
matched = matched
if not matched:
break
# whether all final states could be matched to a successor and vice versa
matches = len(match_suc) == len(successing_nodes) and len(match_state) == len(final_states)
for state in final_states:
if state.history.jumpkind == "Ijk_FakeRet" and len(final_states) > 1:
# Skip fakerets if there are other control flow transitions available
continue
new_call_depth = call_depth
if state.history.jumpkind == "Ijk_Call":
new_call_depth += 1
elif state.history.jumpkind == "Ijk_Ret":
new_call_depth -= 1
if self._call_depth is not None and call_depth > self._call_depth:
l.debug("Do not trace into %s due to the call depth limit", state.ip)
continue
new_defs = self._track(state, live_defs, node.irsb.statements if node.irsb is not None else None)
# corresponding_successors = [n for n in successing_nodes if
# not state.ip.symbolic and n.addr == state.solver.eval(state.ip)]
# if not corresponding_successors:
# continue
changed = False
# if every successor can be matched with one or more final states (by IP address),
# only take over the LiveDefinition of matching states
if matches:
add_state_to_sucs = match_state[state]
else:
add_state_to_sucs = successing_nodes
for successing_node in add_state_to_sucs:
if (state.history.jumpkind == "Ijk_Call" or state.history.jumpkind.startswith("Ijk_Sys")) and (
state.ip.symbolic or successing_node.addr != state.solver.eval(state.ip)
):
suc_new_defs = self._filter_defs_at_call_sites(new_defs)
else:
suc_new_defs = new_defs
if successing_node in live_defs_per_node:
defs_for_next_node = live_defs_per_node[successing_node]
else:
defs_for_next_node = LiveDefinitions()
live_defs_per_node[successing_node] = defs_for_next_node
for var, code_loc_set in suc_new_defs.items():
# l.debug("Adding %d new definitions for variable %s.", len(code_loc_set), var)
changed |= defs_for_next_node.add_defs(var, code_loc_set)
if changed:
if (self._call_depth is None) or (
self._call_depth is not None and 0 <= new_call_depth <= self._call_depth
):
# Put all reachable successors back to our work-list again
for successor in self._cfg.model.get_all_successors(node):
nw = DDGJob(successor, new_call_depth)
self._worklist_append(nw, worklist, worklist_set)
def _track(self, state, live_defs, statements):
"""
Given all live definitions prior to this program point, track the changes, and return a new list of live
definitions. We scan through the action list of the new state to track the changes.
:param state: The input state at that program point.
:param live_defs: All live definitions prior to reaching this program point.
:param list statements: A list of VEX statements.
:returns: A list of new live definitions.
:rtype: angr.analyses.ddg.LiveDefinitions
"""
# Make a copy of live_defs
self._live_defs = live_defs.copy()
action_list = list(state.history.recent_actions)
# Since all temporary variables are local, we simply track them in a dict
self._temp_variables = {}
self._temp_register_symbols = {}
# All dependence edges are added to the graph either at the end of this method, or when they are going to be
# overwritten by a new edge. This is because we sometimes have to modify a previous edge (e.g. add new labels
# to the edge)
self._temp_edges = defaultdict(list)
self._register_edges = defaultdict(list)
last_statement_id = None
self._variables_per_statement = (
None # program variables read out in the same statement. we keep a copy of those variables here so
)
# we can link it to the tmp_write action right afterwards
self._custom_data_per_statement = None
for a in action_list:
if last_statement_id is None or last_statement_id != a.stmt_idx:
# update statement ID
last_statement_id = a.stmt_idx
statement = (
statements[last_statement_id] if statements and last_statement_id < len(statements) else None
)
# initialize all per-statement data structures
self._variables_per_statement = []
self._custom_data_per_statement = None
if a.sim_procedure is None:
current_code_location = CodeLocation(a.bbl_addr, a.stmt_idx, ins_addr=a.ins_addr)
else:
current_code_location = CodeLocation(None, None, sim_procedure=a.sim_procedure)
if a.type == "exit":
self._handle_exit(a, current_code_location, state, statement)
elif a.type == "operation":
self._handle_operation(a, current_code_location, state, statement)
elif a.type == "constraint":
pass
else:
handler_name = f"_handle_{a.type}_{a.action}"
if hasattr(self, handler_name):
getattr(self, handler_name)(a, current_code_location, state, statement)
else:
l.debug("Skip an unsupported action %s.", a)
return self._live_defs
def _def_lookup(self, variable): # pylint:disable=no-self-use
"""
This is a backward lookup in the previous defs. Note that, as we are using VSA, it is possible that `variable`
is affected by several definitions.
:param angr.analyses.ddg.LiveDefinitions live_defs:
The collection of live definitions.
:param SimVariable: The variable to lookup for definitions.
:returns: A dict {stmt:labels} where label is the number of individual addresses of `addr_list` (or
the actual set of addresses depending on the keep_addrs flag) that are definted by stmt.
"""
prevdefs = {}
for code_loc in self._live_defs.lookup_defs(variable):
# Label edges with cardinality or actual sets of addresses
if isinstance(variable, SimMemoryVariable):
type_ = "mem"
elif isinstance(variable, SimRegisterVariable):
type_ = "reg"
else:
raise AngrDDGError("Unknown variable type %s" % type(variable))
prevdefs[code_loc] = {"type": type_, "data": variable}
return prevdefs
def _kill(self, variable, code_loc): # pylint:disable=no-self-use
"""
Kill previous defs. addr_list is a list of normalized addresses.
"""
# Case 1: address perfectly match, we kill
# Case 2: a is a subset of the original address
# Case 3: a is a superset of the original address
# the previous definition is killed. mark it in data graph.
if variable in self._live_defs:
for loc in self._live_defs.lookup_defs(variable):
pv = ProgramVariable(variable, loc, arch=self.project.arch)
self._data_graph_add_edge(pv, ProgramVariable(variable, code_loc, arch=self.project.arch), type="kill")
self._live_defs.kill_def(variable, code_loc)
def _get_register_size(self, reg_offset):
"""
Get the size of a register.
:param int reg_offset: Offset of the register.
:return: Size in bytes.
:rtype: int
"""
# TODO: support registers that are not aligned
if reg_offset in self.project.arch.register_names:
reg_name = self.project.arch.register_names[reg_offset]
reg_size = self.project.arch.registers[reg_name][1]
return reg_size
l.warning(
"_get_register_size(): unsupported register offset %d. Assum size 1. "
"More register name mappings should be implemented in archinfo.",
reg_offset,
)
return 1
#
# Action handling
#
@staticmethod
def _get_actual_addrs(action, state):
"""
For memory actions, get a list of addresses it operates on.
:param SimAction action: The action object to work with.
:return: A list of addresses that are accessed with that action.
:rtype: list
"""
if action.actual_addrs is None:
# For now, mem reads don't necessarily have actual_addrs set properly
try:
addr_list = {state.solver.eval(action.addr.ast)}
except (SimSolverModeError, SimUnsatError, ZeroDivisionError):
# FIXME: ZeroDivisionError should have been caught by claripy and simuvex.
# FIXME: see claripy issue #75. this is just a temporary workaround.
# it's symbolic... just continue
addr_list = {0x60000000} # TODO: this is a random address that I pick. Fix it.
else:
addr_list = set(action.actual_addrs)
return addr_list
def _create_memory_variable(self, action, addr, addrs):
"""
Create a SimStackVariable or SimMemoryVariable based on action objects and its address.
:param SimAction action: The action to work with.
:param int addr: The address of the memory variable in creation.
:param list addrs: A list of all addresses that the action was effective on.
:return:
"""
variable = None
if len(addrs) == 1 and len(action.addr.tmp_deps) == 1:
addr_tmp = list(action.addr.tmp_deps)[0]
if addr_tmp in self._temp_register_symbols:
# it must be a stack variable
sort, offset = self._temp_register_symbols[addr_tmp]
base_addr = addr - offset
if base_addr < 0:
base_addr += 1 << self.project.arch.bits
variable = SimStackVariable(offset, action.size.ast // 8, base=sort, base_addr=base_addr)
if variable is None:
variable = SimMemoryVariable(addr, action.size.ast // 8)
return variable
def _make_edges(self, action, prog_var):
"""
:param SimAction action:
:param ProgramVariable prog_var:
:return:
"""
# For each of its register dependency and data dependency, we annotate the corresponding edge
for reg_offset in action.addr.reg_deps:
self._stmt_graph_annotate_edges(self._register_edges[reg_offset], subtype="mem_addr")
reg_variable = SimRegisterVariable(reg_offset, self._get_register_size(reg_offset))
prev_defs = self._def_lookup(reg_variable)
for loc, _ in prev_defs.items():
v = ProgramVariable(reg_variable, loc, arch=self.project.arch)
self._data_graph_add_edge(v, prog_var, type="mem_addr")
for tmp in action.addr.tmp_deps:
self._stmt_graph_annotate_edges(self._temp_edges[tmp], subtype="mem_addr")
if tmp in self._temp_variables:
self._data_graph_add_edge(self._temp_variables[tmp], prog_var, type="mem_addr")
if not action.data.reg_deps and not action.data.tmp_deps:
# might be a constant assignment
v = action.data.ast
if not v.symbolic:
const_var = SimConstantVariable(v._model_concrete.value)
const_progvar = ProgramVariable(const_var, prog_var.location)
self._data_graph_add_edge(const_progvar, prog_var, type="mem_data")
else:
for reg_offset in action.data.reg_deps:
self._stmt_graph_annotate_edges(self._register_edges[reg_offset], subtype="mem_data")
reg_variable = SimRegisterVariable(reg_offset, self._get_register_size(reg_offset))
prev_defs = self._def_lookup(reg_variable)
for loc, _ in prev_defs.items():
v = ProgramVariable(reg_variable, loc, arch=self.project.arch)
self._data_graph_add_edge(v, prog_var, type="mem_data")
for tmp in action.data.tmp_deps:
self._stmt_graph_annotate_edges(self._temp_edges[tmp], subtype="mem_data")
if tmp in self._temp_variables:
self._data_graph_add_edge(self._temp_variables[tmp], prog_var, type="mem_data")
def _handle_mem_read(self, action, code_location, state, statement): # pylint:disable=unused-argument
addrs = self._get_actual_addrs(action, state)
for addr in addrs:
variable = self._create_memory_variable(action, addr, addrs)
variables = []
# get all definitions
defs = self._def_lookup(variable)
if defs:
# for each definition, create an edge on the graph
for definition_location, labels in defs.items():
self._stmt_graph_add_edge(definition_location, code_location, **labels)
pv = ProgramVariable(variable, definition_location, arch=self.project.arch)
variables.append(pv)
self._make_edges(action, pv)
else:
# if no definition is found, then this is the first time this variable is accessed
# mark it as "initial"
pv = ProgramVariable(variable, code_location, initial=True, arch=self.project.arch)
variables.append(pv)
self._make_edges(action, pv)
# make sure to put it into the killing set
self._kill(variable, code_location)
for var in variables:
# record accessed variables in var_per_stmt
self._variables_per_statement.append(var)
def _handle_mem_write(self, action, location, state, statement):
addrs = self._get_actual_addrs(action, state)
for addr in addrs:
variable = self._create_memory_variable(action, addr, addrs)
# kill all previous variables
self._kill(variable, location)
# create a new variable at current location
pv = ProgramVariable(variable, location, arch=self.project.arch)
# make edges
self._make_edges(action, pv)
if isinstance(statement, pyvex.IRStmt.Store) and self._variables_per_statement:
if isinstance(statement.data, pyvex.IRExpr.RdTmp):
# assignment
src_tmp_idx = statement.data.tmp
src_tmp_def = next(
s
for s in self._variables_per_statement
if isinstance(s.variable, SimTemporaryVariable) and s.variable.tmp_id == src_tmp_idx
)
self._ast_graph.add_edge(src_tmp_def, pv)
elif isinstance(statement.data, pyvex.IRExpr.Const):
# assignment
const = statement.data.con.value
self._ast_graph.add_edge(ProgramVariable(SimConstantVariable(const), location), pv)
def _handle_reg_read(self, action, location, state, statement): # pylint:disable=unused-argument
reg_offset = action.offset
variable = SimRegisterVariable(reg_offset, action.data.ast.size() // 8)
# What do we want to do?
definitions = self._def_lookup(variable)
# add edges to the statement dependence graph
for definition_location, labels in definitions.items():
self._stmt_graph_add_edge(definition_location, location, **labels)
# record the edge
self._register_edges[reg_offset].append((definition_location, location))
self._variables_per_statement.append(ProgramVariable(variable, definition_location, arch=self.project.arch))
if not definitions:
# the register was never defined before - it must be passed in as an argument
self._variables_per_statement.append(
ProgramVariable(variable, location, initial=True, arch=self.project.arch)
)
# make sure to put it into the killing set
self._kill(variable, location)
if reg_offset == self.project.arch.sp_offset:
self._custom_data_per_statement = ("sp", 0)
elif reg_offset == self.project.arch.bp_offset:
self._custom_data_per_statement = ("bp", 0)
def _handle_reg_write(self, action, location, state, statement): # pylint:disable=unused-argument
reg_offset = action.offset
variable = SimRegisterVariable(reg_offset, action.data.ast.size() // 8)
self._kill(variable, location)
if reg_offset in self._register_edges:
# clear the recoreded edge, since we don't need to alter that edge anymore
del self._register_edges[reg_offset]
# add a node on the data dependence graph
pv = ProgramVariable(variable, location, arch=self.project.arch)
self._data_graph_add_node(pv)
if not action.reg_deps and not action.tmp_deps:
# moving a constant into the register
# try to parse out the constant from statement
const_variable = SimConstantVariable()
if statement is not None:
if isinstance(statement.data, pyvex.IRExpr.Const):
const_variable = SimConstantVariable(value=statement.data.con.value)
const_pv = ProgramVariable(const_variable, location, arch=self.project.arch)
self._data_graph_add_edge(const_pv, pv)
for tmp in action.tmp_deps:
if tmp in self._temp_variables:
self._data_graph_add_edge(self._temp_variables[tmp], pv)
def _handle_tmp_read(self, action, location, state, statement): # pylint:disable=unused-argument
tmp = action.tmp
tmp_var = self._temp_variables[tmp]
def_loc = tmp_var.location
self._stmt_graph_add_edge(def_loc, location, type="tmp", data=action.tmp)
# record the edge
edge_tuple = (def_loc, location)
self._temp_edges[action.tmp].append(edge_tuple)
if tmp in self._temp_register_symbols:
self._custom_data_per_statement = self._temp_register_symbols[tmp]
self._variables_per_statement.append(tmp_var)
def _handle_tmp_write(self, action, location, state, statement): # pylint:disable=unused-argument
ast = None
tmp = action.tmp
pv = ProgramVariable(SimTemporaryVariable(tmp), location, arch=self.project.arch)
if ast is not None:
for operand in ast.operands:
self._ast_graph.add_edge(operand, ast)
self._ast_graph.add_edge(ast, pv)
self._temp_variables[tmp] = pv
# clear existing edges
if tmp in self._temp_edges:
del self._temp_edges[tmp]
for tmp_dep in action.tmp_deps:
if tmp_dep in self._temp_variables:
self._data_graph_add_edge(self._temp_variables[tmp_dep], pv)
if self._custom_data_per_statement is not None:
self._temp_register_symbols[tmp] = self._custom_data_per_statement
for data in self._variables_per_statement:
self._data_graph_add_edge(data, pv)
if isinstance(statement, pyvex.IRStmt.WrTmp) and self._variables_per_statement:
if isinstance(statement.data, pyvex.IRExpr.RdTmp):
# assignment: dst_tmp = src_tmp
for s in filter(
lambda x: isinstance(x.variable, SimTemporaryVariable) and x.variable.tmp_id != tmp,
self._variables_per_statement,
):
self._ast_graph.add_edge(s, pv)
elif isinstance(statement.data, pyvex.IRExpr.Get):
# assignment: dst_tmp = src_reg
for s in filter(lambda x: isinstance(x.variable, SimRegisterVariable), self._variables_per_statement):
self._ast_graph.add_edge(s, pv)
elif isinstance(statement.data, pyvex.IRExpr.Load):
# assignment: dst_tmp = [ src_mem ]
for s in filter(lambda x: isinstance(x.variable, SimMemoryVariable), self._variables_per_statement):
self._ast_graph.add_edge(s, pv)
if not action.tmp_deps and not self._variables_per_statement and not ast:
# read in a constant
# try to parse out the constant from statement
const_variable = SimConstantVariable()
if statement is not None:
if isinstance(statement, pyvex.IRStmt.Dirty):
l.warning("Dirty statements are not supported in DDG for now.")
elif isinstance(statement.data, pyvex.IRExpr.Const):
const_variable = SimConstantVariable(value=statement.data.con.value)
const_pv = ProgramVariable(const_variable, location, arch=self.project.arch)
self._data_graph_add_edge(const_pv, pv)
def _handle_exit(self, action, location, state, statement): # pylint:disable=unused-argument
# exits should only depend on tmps
for tmp in action.tmp_deps:
prev_code_loc = self._temp_variables[tmp].location
# add the edge to the graph
self._stmt_graph_add_edge(prev_code_loc, location, type="exit", data="tmp")
# log the edge
edge_tuple = (prev_code_loc, location)
self._temp_edges[tmp].append(edge_tuple)
def _handle_operation(self, action, location, state, statement): # pylint:disable=unused-argument
if action.op.endswith("Sub32") or action.op.endswith("Sub64"):
# subtract
expr_0, expr_1 = action.exprs
if expr_0.tmp_deps and (not expr_1.tmp_deps and not expr_1.reg_deps):
# tmp - const
const_value = expr_1.ast.args[0]
tmp = next(iter(expr_0.tmp_deps))
if tmp in self._temp_register_symbols:
sort, offset = self._temp_register_symbols[tmp]
offset -= const_value
if offset < 0:
offset += 1 << self.project.arch.bits
self._custom_data_per_statement = (sort, offset)
elif action.op.endswith("Add32") or action.op.endswith("Add64"):
# add
expr_0, expr_1 = action.exprs
if expr_0.tmp_deps and (not expr_1.tmp_deps and not expr_1.reg_deps):
# tmp + const
const_value = expr_1.ast.args[0]
tmp = next(iter(expr_0.tmp_deps))
if tmp in self._temp_register_symbols:
sort, offset = self._temp_register_symbols[tmp]
offset += const_value
if offset >= (1 << self.project.arch.bits):
offset -= 1 << self.project.arch.bits
self._custom_data_per_statement = (sort, offset)
def _process_operation(self, action, location, state, statement): # pylint:disable=unused-argument
if action.op.endswith("Sub32") or action.op.endswith("Sub64"):
# subtract
expr_0, expr_1 = action.exprs
if expr_0.tmp_deps and (not expr_1.tmp_deps and not expr_1.reg_deps):
# tmp - const
const_value = expr_1.ast.args[0]
tmp = next(iter(expr_0.tmp_deps))
const_def = ProgramVariable(SimConstantVariable(const_value), location)
tmp_def = self._temp_variables[tmp]
ast = AST("-", tmp_def, const_def)
return ast
elif action.op.endswith("Add32") or action.op.endswith("Add64"):
# add
expr_0, expr_1 = action.exprs
if expr_0.tmp_deps and (not expr_1.tmp_deps and not expr_1.reg_deps):
# tmp + const
const_value = expr_1.ast.args[0]
tmp = next(iter(expr_0.tmp_deps))
const_def = ProgramVariable(SimConstantVariable(const_value), location)
tmp_def = self._temp_variables[tmp]
ast = AST("+", tmp_def, const_def)
return ast
return None
#
# Graph operations
#
def _data_graph_add_node(self, node):
"""
Add a noe in the data dependence graph.
:param ProgramVariable node: The node to add.
:return: None
"""
self._data_graph.add_node(node)
self._simplified_data_graph = None
def _data_graph_add_edge(self, src, dst, **edge_labels):
"""
Add an edge in the data dependence graph.
:param ProgramVariable src: Source node.
:param ProgramVariable dst: Destination node.
:param edge_labels: All labels associated with the edge.
:return: None
"""
if src in self._data_graph and dst in self._data_graph[src]:
return
self._data_graph.add_edge(src, dst, **edge_labels)
self._simplified_data_graph = None
def _stmt_graph_add_edge(self, src, dst, **edge_labels):
"""
Add an edge in the statement dependence graph from a program location `src` to another program location `dst`.
:param CodeLocation src: Source node.
:param CodeLocation dst: Destination node.
:param edge_labels: All labels associated with the edge.
:returns: None
"""
# Is that edge already in the graph ?
# If at least one is new, then we are not redoing the same path again
if src in self._stmt_graph and dst in self._stmt_graph[src]:
return
self._stmt_graph.add_edge(src, dst, **edge_labels)
def _stmt_graph_annotate_edges(self, edges_to_annotate, **new_labels):
"""
Add new annotations to edges in the statement dependence graph.
:param list edges_to_annotate: A list of edges to annotate.
:param new_labels: New labels to be added to those edges.
:returns: None
"""
graph = self.graph
for src, dst in edges_to_annotate:
if src not in graph:
continue
if dst not in graph[src]:
continue
data = graph[src][dst]
for k, v in new_labels.items():
if k in data:
if v not in data[k]:
data[k] = data[k] + (v,)
else:
# Construct a tuple
data[k] = (v,)
def _simplify_data_graph(self, data_graph): # pylint:disable=no-self-use
"""
Simplify a data graph by removing all temp variable nodes on the graph.
:param networkx.DiGraph data_graph: The data dependence graph to simplify.
:return: The simplified graph.
:rtype: networkx.MultiDiGraph
"""
graph = networkx.MultiDiGraph(data_graph)
all_nodes = [n for n in graph.nodes() if isinstance(n.variable, SimTemporaryVariable)]
for tmp_node in all_nodes:
# remove each tmp node by linking their successors and predecessors directly
in_edges = graph.in_edges(tmp_node, data=True)
out_edges = graph.out_edges(tmp_node, data=True)
for pred, _, _ in in_edges:
graph.remove_edge(pred, tmp_node)
for _, suc, _ in out_edges:
graph.remove_edge(tmp_node, suc)
for pred, _, data_in in in_edges:
for _, suc, data_out in out_edges:
if pred is not tmp_node and suc is not tmp_node:
if suc not in graph[pred]:
data = data_in.copy()
data.update(data_out)
graph.add_edge(pred, suc, **data)
graph.remove_node(tmp_node)
return graph
def _worklist_append(self, node_wrapper, worklist, worklist_set):
"""
Append a CFGNode and its successors into the work-list, and respect the call-depth limit
:param node_wrapper: The NodeWrapper instance to insert.
:param worklist: The work-list, which is a list.
:param worklist_set: A set of all CFGNodes that are inside the work-list, just for the sake of fast look-up.
It will be updated as well.
:returns: A set of newly-inserted CFGNodes (not NodeWrapper instances).
"""
if node_wrapper.cfg_node in worklist_set:
# It's already in the work-list
return
worklist.append(node_wrapper)
worklist_set.add(node_wrapper.cfg_node)
stack = [node_wrapper]
traversed_nodes = {node_wrapper.cfg_node}
inserted = {node_wrapper.cfg_node}
while stack:
nw = stack.pop()
n, call_depth = nw.cfg_node, nw.call_depth
# Get successors
edges = self._cfg.graph.out_edges(n, data=True)
for _, dst, data in edges:
if (
dst not in traversed_nodes # which means we haven't touch this node in this appending procedure
and dst not in worklist_set
): # which means this node is not in the work-list
# We see a new node!
traversed_nodes.add(dst)
if data["jumpkind"] == "Ijk_Call":
if self._call_depth is None or call_depth < self._call_depth:
inserted.add(dst)
new_nw = DDGJob(dst, call_depth + 1)
worklist.append(new_nw)
worklist_set.add(dst)
stack.append(new_nw)
elif data["jumpkind"] == "Ijk_Ret":
if call_depth > 0:
inserted.add(dst)
new_nw = DDGJob(dst, call_depth - 1)
worklist.append(new_nw)
worklist_set.add(dst)
stack.append(new_nw)
else:
new_nw = DDGJob(dst, call_depth)
inserted.add(dst)
worklist_set.add(dst)
worklist.append(new_nw)
stack.append(new_nw)
return inserted
def _build_function_dependency_graphs(self):
"""
Build dependency graphs for each function, and save them in self._function_data_dependencies.
"""
# This is a map between functions and its corresponding dependencies
self._function_data_dependencies = defaultdict(networkx.DiGraph)
# Group all dependencies first
block_addr_to_func = {}
for _, func in self.kb.functions.items():
for block in func.blocks:
block_addr_to_func[block.addr] = func
for src, dst, data in self.graph.edges(data=True):
src_target_func = None
if src.block_addr in block_addr_to_func:
src_target_func = block_addr_to_func[src.block_addr]
self._function_data_dependencies[src_target_func].add_edge(src, dst, **data)
if dst.block_addr in block_addr_to_func:
dst_target_func = block_addr_to_func[dst.block_addr]
if dst_target_func is not src_target_func:
self._function_data_dependencies[dst_target_func].add_edge(src, dst, **data)
#
# Other private methods
#
def _filter_defs_at_call_sites(self, defs):
"""
If we are not tracing into the function that are called in a real execution, we should properly filter the defs
to account for the behavior of the skipped function at this call site.
This function is a WIP. See TODOs inside.
:param defs:
:return:
"""
# TODO: make definition killing architecture independent and calling convention independent
# TODO: use information from a calling convention analysis
filtered_defs = LiveDefinitions()
for variable, locs in defs.items():
if isinstance(variable, SimRegisterVariable):
if self.project.arch.name == "X86":
if variable.reg in (
self.project.arch.registers["eax"][0],
self.project.arch.registers["ecx"][0],
self.project.arch.registers["edx"][0],
):
continue
filtered_defs.add_defs(variable, locs)
return filtered_defs
[docs] def find_definitions(self, variable, location=None, simplified_graph=True):
"""
Find all definitions of the given variable.
:param SimVariable variable:
:param bool simplified_graph: True if you just want to search in the simplified graph instead of the normal
graph. Usually the simplified graph suffices for finding definitions of register
or memory variables.
:return: A collection of all variable definitions to the specific variable.
:rtype: list
"""
if simplified_graph:
graph = self.simplified_data_graph
else:
graph = self.data_graph
defs = []
n: ProgramVariable
for n in graph.nodes():
if n.variable == variable:
if location is None:
defs.append(n)
else:
# TODO: finish this part
if n.location.block_addr == location.block_addr:
defs.append(n)
return defs
[docs] def find_consumers(self, var_def, simplified_graph=True):
"""
Find all consumers to the specified variable definition.
:param ProgramVariable var_def: The variable definition.
:param bool simplified_graph: True if we want to search in the simplified graph, False otherwise.
:return: A collection of all consumers to the specified variable definition.
:rtype: list
"""
if simplified_graph:
graph = self.simplified_data_graph
else:
graph = self.data_graph
if var_def not in graph:
return []
consumers = []
srcs = [var_def]
traversed = set()
while srcs:
src = srcs.pop()
out_edges = graph.out_edges(src, data=True)
for _, dst, data in out_edges:
if "type" in data and data["type"] == "kill":
# skip killing edges
continue
if isinstance(dst.variable, SimTemporaryVariable):
if dst not in traversed:
srcs.append(dst)
traversed.add(dst)
else:
if dst not in consumers:
consumers.append(dst)
return consumers
[docs] def find_killers(self, var_def, simplified_graph=True):
"""
Find all killers to the specified variable definition.
:param ProgramVariable var_def: The variable definition.
:param bool simplified_graph: True if we want to search in the simplified graph, False otherwise.
:return: A collection of all killers to the specified variable definition.
:rtype: list
"""
if simplified_graph:
graph = self.simplified_data_graph
else:
graph = self.data_graph
if var_def not in graph:
return []
killers = []
out_edges = graph.out_edges(var_def, data=True)
for _, dst, data in out_edges:
if "type" in data and data["type"] == "kill":
killers.append(dst)
return killers
[docs] def find_sources(self, var_def, simplified_graph=True):
"""
Find all sources to the specified variable definition.
:param ProgramVariable var_def: The variable definition.
:param bool simplified_graph: True if we want to search in the simplified graph, False otherwise.
:return: A collection of all sources to the specified variable definition.
:rtype: list
"""
if simplified_graph:
graph = self.simplified_data_graph
else:
graph = self.data_graph
if var_def not in graph:
return []
sources = []
defs = [var_def]
traversed = set()
while defs:
definition = defs.pop()
in_edges = graph.in_edges(definition, data=True)
for src, _, data in in_edges:
if "type" in data and data["type"] == "kill":
continue
if isinstance(src.variable, SimTemporaryVariable):
if src not in traversed:
defs.append(src)
traversed.add(src)
else:
if src not in sources:
sources.append(src)
return sources
from angr.analyses import AnalysesHub
AnalysesHub.register_default("DDG", DDG)