Source code for angr.analyses.vsa_ddg

import logging
from collections import defaultdict

import networkx
from . import Analysis, VFG

from ..code_location import CodeLocation
from ..errors import AngrDDGError
from ..sim_variable import SimRegisterVariable, SimMemoryVariable

l = logging.getLogger(name=__name__)


class DefUseChain:
    """
    Stands for a def-use chain. It is generated by the DDG itself.
    """

    def __init__(self, def_loc, use_loc, variable):
        """
        Constructor.

        :param def_loc:  The code location where the variable is defined.
        :param use_loc:  The code location where the variable is used.
        :param variable: The variable being defined and used.
        """
        self.def_loc = def_loc
        self.use_loc = use_loc
        self.variable = variable
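
# Illustrative sketch only: a DefUseChain ties one definition site to one use site of a
# single variable. The addresses, statement indices, and register offset below are
# hypothetical; the DDG normally creates these objects itself.
#
#   def_loc = CodeLocation(0x400000, 3)
#   use_loc = CodeLocation(0x400010, 7)
#   chain = DefUseChain(def_loc, use_loc, SimRegisterVariable(16, 64))
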
class VSA_DDG(Analysis):
    """
    A data dependency graph based on VSA states. That means we don't (and shouldn't) expect any symbolic
    expressions.
    """

    def __init__(
        self,
        vfg=None,
        start_addr=None,
        interfunction_level=0,
        context_sensitivity_level=2,
        keep_data=False,
    ):
        """
        Constructor.

        :param vfg:                         An already constructed VFG. If not specified, a new VFG will be created
                                            with the other specified parameters. `vfg` and `start_addr` cannot both
                                            be unspecified.
        :param start_addr:                  The address where to start the analysis (typically, a function's entry
                                            point).
        :param interfunction_level:         See the VFG analysis.
        :param context_sensitivity_level:   See the VFG analysis.
        :param keep_data:                   Whether we keep sets of addresses as edge labels, or just the cardinality
                                            of the sets, which can be used as a "weight".
        """

        # Sanity check
        if vfg is None and start_addr is None:
            raise AngrDDGError("Arguments vfg and start_addr cannot both be unspecified.")

        if vfg is not None:
            self._vfg = vfg
        else:
            self._vfg = self.project.analyses[VFG].prep()(
                function_start=start_addr,
                interfunction_level=interfunction_level,
                context_sensitivity_level=context_sensitivity_level,
            )

        self.graph = networkx.DiGraph()
        self.keep_data = keep_data

        self._simproc_map = {}
        self._imarks = {}

        self._explore()

    #
    # Properties
    #

    def __contains__(self, code_location):
        """
        Whether `code_location` is in the graph.

        :param code_location:   A CodeLocation instance.
        :returns:               True/False.
        """

        return code_location in self.graph

    #
    # Public methods
    #

    def get_predecessors(self, code_location):
        """
        Returns all predecessors of `code_location`.

        :param code_location:   A CodeLocation instance.
        :returns:               A list of all predecessors.
        """

        return list(self.graph.predecessors(code_location))

    #
    # Private methods
    #

    def _explore(self):
        """
        Starting from the start_node, explore the entire VFG, and perform the following:
        - Generate def-use chains for all registers and memory addresses using a worklist
        """

        # TODO: The worklist algorithm can definitely use some optimizations. It is future work.

        # The worklist holds individual VFGNodes that come from the VFG
        # Initialize the worklist with all nodes in the VFG
        worklist = list(self._vfg.graph.nodes())

        # Set up a set mirroring the worklist for fast inclusion tests
        worklist_set = set(worklist)

        # A dict storing def sets
        # variable -> locations
        live_defs_per_node = {}

        while worklist:
            # Pop out a node
            node = worklist[0]
            worklist_set.remove(node)
            worklist = worklist[1:]

            # Grab all final states. There are usually more than one (one state for each successor), and we gotta
            # process all of them
            final_states = node.final_states

            if node in live_defs_per_node:
                live_defs = live_defs_per_node[node]
            else:
                live_defs = {}
                live_defs_per_node[node] = live_defs

            successing_nodes = list(self._vfg.graph.successors(node))
            for state in final_states:
                if state.history.jumpkind == "Ijk_FakeRet" and len(final_states) > 1:
                    # Skip fakerets if there are other control flow transitions available
                    continue

                # TODO: Match the jumpkind
                # TODO: Support cases where IP is undecidable
                corresponding_successors = [n for n in successing_nodes if n.addr == state.solver.eval(state.ip)]
                if not corresponding_successors:
                    continue
                successing_node = corresponding_successors[0]

                new_defs = self._track(state, live_defs)

                if successing_node in live_defs_per_node:
                    defs_for_next_node = live_defs_per_node[successing_node]
                else:
                    defs_for_next_node = {}
                    live_defs_per_node[successing_node] = defs_for_next_node

                changed = False
                for var, code_loc_set in new_defs.items():
                    if var not in defs_for_next_node:
                        defs_for_next_node[var] = code_loc_set
                        changed = True
                    else:
                        for code_loc in code_loc_set:
                            if code_loc not in defs_for_next_node[var]:
                                defs_for_next_node[var].add(code_loc)
                                changed = True

                if changed:
                    # Put all reachable successors back to our worklist again
                    if successing_node not in worklist_set:
                        worklist.append(successing_node)
                        worklist_set.add(successing_node)
                    all_successors_dict = networkx.dfs_successors(self._vfg.graph, source=successing_node)
                    for successors in all_successors_dict.values():
                        for s in successors:
                            if s not in worklist_set:
                                worklist.append(s)
                                worklist_set.add(s)

    def _track(self, state, live_defs):
        """
        Given all live definitions prior to this program point, track the changes, and return the updated live
        definitions. We scan through the action list of the new state to track the changes.

        :param state:       The input state at that program point.
        :param live_defs:   A dict of all live definitions prior to reaching this program point.
        :returns:           A dict of new live definitions.
        """

        # Make a copy of live_defs
        live_defs = live_defs.copy()

        action_list = list(state.history.recent_actions)

        # Since all temporary variables are local, we simply track them in a local dict
        temps = {}

        # All dependence edges are added to the graph either at the end of this method, or when they are going to be
        # overwritten by a new edge. This is because we sometimes have to modify a previous edge (e.g. add new labels
        # to the edge).
        temps_to_edges = defaultdict(list)
        regs_to_edges = defaultdict(list)

        def _annotate_edges_in_dict(dict_, key, **new_labels):
            """
            :param dict_:       The dict, can be either `temps_to_edges` or `regs_to_edges`
            :param key:         The key used in finding elements in the dict
            :param new_labels:  New labels to be added to those edges
            """

            for edge_tuple in dict_[key]:
                # unpack it
                _, _, labels = edge_tuple
                for k, v in new_labels.items():
                    if k in labels:
                        labels[k] = labels[k] + (v,)
                    else:
                        # Construct a tuple
                        labels[k] = (v,)

        def _dump_edge_from_dict(dict_, key, del_key=True):
            """
            Pick edges from the dict based on the key specified, add them to our graph, and remove the key from the
            dict.

            :param dict_:   The dict, can be either `temps_to_edges` or `regs_to_edges`.
            :param key:     The key used in finding elements in the dict.
            :param del_key: Whether the key should be removed from the dict afterwards.
            """

            for edge_tuple in dict_[key]:
                # unpack it
                prev_code_loc, current_code_loc, labels = edge_tuple

                # Add the new edge
                self._add_edge(prev_code_loc, current_code_loc, **labels)

            # Clear it
            if del_key:
                del dict_[key]

        for a in action_list:
            if a.bbl_addr is None:
                current_code_loc = CodeLocation(None, None, sim_procedure=a.sim_procedure)
            else:
                current_code_loc = CodeLocation(a.bbl_addr, a.stmt_idx, ins_addr=a.ins_addr)

            if a.type == "mem":
                if a.actual_addrs is None:
                    # For now, mem reads don't necessarily have actual_addrs set properly
                    addr_list = {
                        aw.to_valueset(state) for aw in state.memory._concretize_address_descriptor(a.addr.ast)
                    }
                else:
                    addr_list = set(a.actual_addrs)

                for addr in addr_list:
                    variable = SimMemoryVariable(addr, a.data.ast.size())  # TODO: Properly unpack the SAO

                    if a.action == "read":
                        # Create an edge between def site and use site

                        prevdefs = self._def_lookup(live_defs, variable)

                        for prev_code_loc, labels in prevdefs.items():
                            self._read_edge = True
                            self._add_edge(prev_code_loc, current_code_loc, **labels)

                    else:  # if a.action == "write":
                        # Kill the existing live def
                        self._kill(live_defs, variable, current_code_loc)

                    # For each of its register dependencies and data dependencies, annotate the corresponding edge
                    for reg_off in a.addr.reg_deps:
                        _annotate_edges_in_dict(regs_to_edges, reg_off, subtype="mem_addr")

                    for tmp in a.addr.tmp_deps:
                        _annotate_edges_in_dict(temps_to_edges, tmp, subtype="mem_addr")

                    for reg_off in a.data.reg_deps:
                        _annotate_edges_in_dict(regs_to_edges, reg_off, subtype="mem_data")

                    for tmp in a.data.tmp_deps:
                        _annotate_edges_in_dict(temps_to_edges, tmp, subtype="mem_data")

            elif a.type == "reg":
                # For now, we assume a.offset is not symbolic
                # TODO: Support symbolic register offsets

                # variable = SimRegisterVariable(a.offset, a.data.ast.size())
                variable = SimRegisterVariable(a.offset, self.project.arch.bits)

                if a.action == "read":
                    # What do we want to do?
                    prevdefs = self._def_lookup(live_defs, variable)

                    if a.offset in regs_to_edges:
                        _dump_edge_from_dict(regs_to_edges, a.offset)

                    for prev_code_loc, labels in prevdefs.items():
                        edge_tuple = (prev_code_loc, current_code_loc, labels)

                        regs_to_edges[a.offset].append(edge_tuple)

                else:  # write
                    self._kill(live_defs, variable, current_code_loc)

            elif a.type == "tmp":
                # tmp is definitely not symbolic
                if a.action == "read":
                    prev_code_loc = temps[a.tmp]
                    edge_tuple = (prev_code_loc, current_code_loc, {"type": "tmp", "data": a.tmp})

                    if a.tmp in temps_to_edges:
                        _dump_edge_from_dict(temps_to_edges, a.tmp)
                    temps_to_edges[a.tmp].append(edge_tuple)

                else:  # write
                    temps[a.tmp] = current_code_loc

            elif a.type == "exit":
                # exits should only depend on tmps
                for tmp in a.tmp_deps:
                    prev_code_loc = temps[tmp]
                    edge_tuple = (prev_code_loc, current_code_loc, {"type": "exit", "data": tmp})

                    if tmp in temps_to_edges:
                        _dump_edge_from_dict(temps_to_edges, tmp)
                    temps_to_edges[tmp].append(edge_tuple)

        # In the end, dump all other edges in those two dicts
        for reg_offset in regs_to_edges:
            _dump_edge_from_dict(regs_to_edges, reg_offset, del_key=False)
        for tmp in temps_to_edges:
            _dump_edge_from_dict(temps_to_edges, tmp, del_key=False)

        return live_defs

    def _def_lookup(self, live_defs, variable):
        """
        This is a backward lookup in the previous defs.

        :param live_defs:   A dict mapping variables to the set of code locations where they were defined.
        :param variable:    The variable to look up. Note that, as we are using VSA, it is possible that the variable
                            is affected by several definitions.
        :returns:           A dict {code_loc: labels}, where the labels carry either the variable itself (if the
                            keep_data flag is set) or a count that can be used as a "weight".
        """

        prevdefs = {}

        if variable in live_defs:
            code_loc_set = live_defs[variable]
            for code_loc in code_loc_set:
                # Label edges with cardinality or actual sets of addresses
                if isinstance(variable, SimMemoryVariable):
                    type_ = "mem"
                elif isinstance(variable, SimRegisterVariable):
                    type_ = "reg"
                else:
                    raise AngrDDGError("Unknown variable type %s" % type(variable))

                if self.keep_data is True:
                    data = variable

                    prevdefs[code_loc] = {"type": type_, "data": data}

                else:
                    if code_loc in prevdefs:
                        count = prevdefs[code_loc]["count"] + 1
                    else:
                        count = 0
                    prevdefs[code_loc] = {"type": type_, "count": count}

        return prevdefs

    def _kill(self, live_defs, variable, code_loc):
        """
        Kill previous definitions of `variable`, and make `code_loc` its only live definition.
        """

        # Case 1: the address perfectly matches, we kill
        # Case 2: a is a subset of the original address
        # Case 3: a is a superset of the original address

        live_defs[variable] = {code_loc}

        l.debug("XX CodeLoc %s kills variable %s", code_loc, variable)

    def _add_edge(self, s_a, s_b, **edge_labels):
        """
        Add an edge in the graph from `s_a` to `s_b`, where `s_a` and `s_b` are CodeLocation instances.
        """

        # Is that edge already in the graph?
        # If at least one is new, then we are not redoing the same path again
        if (s_a, s_b) not in self.graph.edges():
            self.graph.add_edge(s_a, s_b, **edge_labels)
            self._new = True
            l.info("New edge: %s --> %s", s_a, s_b)

    def get_all_nodes(self, simrun_addr, stmt_idx):
        """
        Get all DDG nodes matching the given basic block address and statement index.
        """

        nodes = []
        for n in self.graph.nodes():
            if n.simrun_addr == simrun_addr and n.stmt_idx == stmt_idx:
                nodes.append(n)

        return nodes

from angr.analyses import AnalysesHub

AnalysesHub.register_default("VSA_DDG", VSA_DDG)
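
# A minimal usage sketch: once the analysis is registered above, it can be invoked through a
# project's analyses hub. The binary path and the chosen options below are hypothetical.
if __name__ == "__main__":
    import angr

    proj = angr.Project("/path/to/binary", auto_load_libs=False)  # hypothetical path
    ddg = proj.analyses.VSA_DDG(start_addr=proj.entry, keep_data=True)

    # Each edge connects a definition site to a use site (CodeLocation instances), and its
    # labels describe the dependency type ("mem", "reg", "tmp", or "exit").
    for def_loc, use_loc, labels in ddg.graph.edges(data=True):
        print(def_loc, "->", use_loc, labels)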