Source code for angr.analyses.reaching_definitions.dep_graph

from typing import Optional, Dict, Set, Iterable, Union, List
from functools import reduce

import networkx

import claripy
from cle.loader import Loader

from ...code_location import CodeLocation
from ...knowledge_plugins.key_definitions.atoms import Atom, MemoryLocation
from ...knowledge_plugins.key_definitions.definition import Definition
from ...knowledge_plugins.key_definitions.undefined import UNDEFINED
from ...knowledge_plugins.cfg import CFGModel
from .external_codeloc import ExternalCodeLocation


def _is_definition(node):
    return isinstance(node, Definition)


[docs]class DepGraph: """ The representation of a dependency graph: a directed graph, where nodes are definitions, and edges represent uses. Mostly a wrapper around a <networkx.DiGraph>. """ def __init__(self, graph: Optional[networkx.DiGraph] = None): """ :param graph: A graph where nodes are definitions, and edges represent uses. """ # Used for memoization of the `transitive_closure` method. self._transitive_closures: Dict = {} if graph and not all(map(_is_definition, graph.nodes)): raise TypeError("In a DepGraph, nodes need to be <%s>s." % Definition.__name__) self._graph: networkx.DiGraph = graph if graph is not None else networkx.DiGraph() @property def graph(self) -> networkx.DiGraph: return self._graph
[docs] def add_node(self, node: Definition) -> None: """ :param node: The definition to add to the definition-use graph. """ self._graph.add_node(node)
[docs] def add_edge(self, source: Definition, destination: Definition, **labels) -> None: """ The edge to add to the definition-use graph. Will create nodes that are not yet present. :param source: The "source" definition, used by the "destination". :param destination: The "destination" definition, using the variable defined by "source". :param labels: Optional keyword arguments to represent edge labels. """ self._graph.add_edge(source, destination, **labels)
[docs] def nodes(self) -> networkx.classes.reportviews.NodeView: return self._graph.nodes()
[docs] def predecessors(self, node: Definition) -> networkx.classes.reportviews.NodeView: """ :param node: The definition to get the predecessors of. """ return self._graph.predecessors(node)
[docs] def transitive_closure(self, definition: Definition) -> networkx.DiGraph: """ Compute the "transitive closure" of a given definition. Obtained by transitively aggregating the ancestors of this definition in the graph. Note: Each definition is memoized to avoid any kind of recomputation across the lifetime of this object. :param definition: The Definition to get transitive closure for. :return: A graph of the transitive closure of the given definition. """ def _transitive_closure( def_: Definition, graph: networkx.DiGraph, result: networkx.DiGraph, visited: Optional[Set[Definition]] = None, ): """ Returns a joint graph that comprises the transitive closure of all defs that `def_` depends on and the current graph `result`. `result` is updated. """ if def_ in self._transitive_closures: closure = self._transitive_closures[def_] # merge closure into result result.add_edges_from(closure.edges()) return result if def_ not in graph: return result predecessors = list(graph.predecessors(def_)) result.add_node(def_) for pred in predecessors: edge_data = graph.get_edge_data(pred, def_) if edge_data is None: result.add_edge(pred, def_) else: result.add_edge(pred, def_, **edge_data) visited = visited or set() visited.add(def_) predecessors_to_visit = set(predecessors) - set(visited) closure = reduce( lambda acc, def0: _transitive_closure(def0, graph, acc, visited), predecessors_to_visit, result ) self._transitive_closures[def_] = closure return closure return _transitive_closure(definition, self._graph, networkx.DiGraph())
[docs] def contains_atom(self, atom: Atom) -> bool: return any(map(lambda definition: definition.atom == atom, self.nodes()))
[docs] def add_dependencies_for_concrete_pointers_of( self, values: Iterable[Union[claripy.ast.Base, int]], definition: Definition, cfg: CFGModel, loader: Loader ): """ When a given definition holds concrete pointers, make sure the <MemoryLocation>s they point to are present in the dependency graph; Adds them if necessary. :param values: :param definition: The definition which has data that can contain concrete pointers. :param cfg: The CFG, containing information about memory data. :param loader: """ assert definition in self.nodes(), "The given Definition must be present in the given graph." known_predecessor_addresses: List[Union[int, claripy.ast.Base]] = list( # Needs https://github.com/python/mypy/issues/6847 map( lambda definition: definition.atom.addr, # type: ignore filter(lambda p: isinstance(p.atom, MemoryLocation), self.predecessors(definition)), ) ) # concretize addresses where possible concrete_known_pred_addresses = [] for address in known_predecessor_addresses: if isinstance(address, claripy.ast.Base): if address.concrete: concrete_known_pred_addresses.append(address._model_concrete.value) else: concrete_known_pred_addresses.append(address) unknown_concrete_addresses: Set[int] = set() for v in values: if isinstance(v, claripy.ast.Base) and v.concrete: v = v._model_concrete.value if isinstance(v, int): if v not in concrete_known_pred_addresses: unknown_concrete_addresses.add(v) for address in unknown_concrete_addresses: data_at_address = cfg.memory_data.get(address, None) if cfg is not None else None if data_at_address is None or data_at_address.sort not in ["string", "unknown"]: continue section = loader.main_object.find_section_containing(address) read_only = False if section is None else not section.is_writable code_location = CodeLocation(0, 0, info={"readonly": True}) if read_only else ExternalCodeLocation() def _string_and_length_from(data_at_address): if data_at_address.content is None: return UNDEFINED, data_at_address.size else: return data_at_address.content.decode("utf-8"), data_at_address.size + 1 _, string_length = _string_and_length_from(data_at_address) memory_location_definition = Definition( MemoryLocation(address, string_length), code_location, ) self.graph.add_edge(memory_location_definition, definition)