from typing import Optional, Dict, Set, Iterable, Union, List
from functools import reduce
import networkx
import claripy
from cle.loader import Loader
from ...code_location import CodeLocation
from ...knowledge_plugins.key_definitions.atoms import Atom, MemoryLocation
from ...knowledge_plugins.key_definitions.definition import Definition
from ...knowledge_plugins.key_definitions.undefined import UNDEFINED
from ...knowledge_plugins.cfg import CFGModel
from .external_codeloc import ExternalCodeLocation
def _is_definition(node):
return isinstance(node, Definition)
[docs]class DepGraph:
"""
The representation of a dependency graph: a directed graph, where nodes are definitions, and edges represent uses.
Mostly a wrapper around a <networkx.DiGraph>.
"""
def __init__(self, graph: Optional[networkx.DiGraph] = None):
"""
:param graph: A graph where nodes are definitions, and edges represent uses.
"""
# Used for memoization of the `transitive_closure` method.
self._transitive_closures: Dict = {}
if graph and not all(map(_is_definition, graph.nodes)):
raise TypeError("In a DepGraph, nodes need to be <%s>s." % Definition.__name__)
self._graph: networkx.DiGraph = graph if graph is not None else networkx.DiGraph()
@property
def graph(self) -> networkx.DiGraph:
return self._graph
[docs] def add_node(self, node: Definition) -> None:
"""
:param node: The definition to add to the definition-use graph.
"""
self._graph.add_node(node)
[docs] def add_edge(self, source: Definition, destination: Definition, **labels) -> None:
"""
The edge to add to the definition-use graph. Will create nodes that are not yet present.
:param source: The "source" definition, used by the "destination".
:param destination: The "destination" definition, using the variable defined by "source".
:param labels: Optional keyword arguments to represent edge labels.
"""
self._graph.add_edge(source, destination, **labels)
[docs] def nodes(self) -> networkx.classes.reportviews.NodeView:
return self._graph.nodes()
[docs] def predecessors(self, node: Definition) -> networkx.classes.reportviews.NodeView:
"""
:param node: The definition to get the predecessors of.
"""
return self._graph.predecessors(node)
[docs] def transitive_closure(self, definition: Definition) -> networkx.DiGraph:
"""
Compute the "transitive closure" of a given definition.
Obtained by transitively aggregating the ancestors of this definition in the graph.
Note: Each definition is memoized to avoid any kind of recomputation across the lifetime of this object.
:param definition: The Definition to get transitive closure for.
:return: A graph of the transitive closure of the given definition.
"""
def _transitive_closure(
def_: Definition,
graph: networkx.DiGraph,
result: networkx.DiGraph,
visited: Optional[Set[Definition]] = None,
):
"""
Returns a joint graph that comprises the transitive closure of all defs that `def_` depends on and the
current graph `result`. `result` is updated.
"""
if def_ in self._transitive_closures:
closure = self._transitive_closures[def_]
# merge closure into result
result.add_edges_from(closure.edges())
return result
if def_ not in graph:
return result
predecessors = list(graph.predecessors(def_))
result.add_node(def_)
for pred in predecessors:
edge_data = graph.get_edge_data(pred, def_)
if edge_data is None:
result.add_edge(pred, def_)
else:
result.add_edge(pred, def_, **edge_data)
visited = visited or set()
visited.add(def_)
predecessors_to_visit = set(predecessors) - set(visited)
closure = reduce(
lambda acc, def0: _transitive_closure(def0, graph, acc, visited), predecessors_to_visit, result
)
self._transitive_closures[def_] = closure
return closure
return _transitive_closure(definition, self._graph, networkx.DiGraph())
[docs] def contains_atom(self, atom: Atom) -> bool:
return any(map(lambda definition: definition.atom == atom, self.nodes()))
[docs] def add_dependencies_for_concrete_pointers_of(
self, values: Iterable[Union[claripy.ast.Base, int]], definition: Definition, cfg: CFGModel, loader: Loader
):
"""
When a given definition holds concrete pointers, make sure the <MemoryLocation>s they point to are present in
the dependency graph; Adds them if necessary.
:param values:
:param definition: The definition which has data that can contain concrete pointers.
:param cfg: The CFG, containing information about memory data.
:param loader:
"""
assert definition in self.nodes(), "The given Definition must be present in the given graph."
known_predecessor_addresses: List[Union[int, claripy.ast.Base]] = list(
# Needs https://github.com/python/mypy/issues/6847
map(
lambda definition: definition.atom.addr, # type: ignore
filter(lambda p: isinstance(p.atom, MemoryLocation), self.predecessors(definition)),
)
)
# concretize addresses where possible
concrete_known_pred_addresses = []
for address in known_predecessor_addresses:
if isinstance(address, claripy.ast.Base):
if address.concrete:
concrete_known_pred_addresses.append(address._model_concrete.value)
else:
concrete_known_pred_addresses.append(address)
unknown_concrete_addresses: Set[int] = set()
for v in values:
if isinstance(v, claripy.ast.Base) and v.concrete:
v = v._model_concrete.value
if isinstance(v, int):
if v not in concrete_known_pred_addresses:
unknown_concrete_addresses.add(v)
for address in unknown_concrete_addresses:
data_at_address = cfg.memory_data.get(address, None) if cfg is not None else None
if data_at_address is None or data_at_address.sort not in ["string", "unknown"]:
continue
section = loader.main_object.find_section_containing(address)
read_only = False if section is None else not section.is_writable
code_location = CodeLocation(0, 0, info={"readonly": True}) if read_only else ExternalCodeLocation()
def _string_and_length_from(data_at_address):
if data_at_address.content is None:
return UNDEFINED, data_at_address.size
else:
return data_at_address.content.decode("utf-8"), data_at_address.size + 1
_, string_length = _string_and_length_from(data_at_address)
memory_location_definition = Definition(
MemoryLocation(address, string_length),
code_location,
)
self.graph.add_edge(memory_location_definition, definition)