from typing import Tuple, Optional, Dict, List
from collections import defaultdict
import logging
import networkx
[docs]def shallow_reverse(g) -> networkx.DiGraph:
"""
Make a shallow copy of a directional graph and reverse the edges. This is a workaround to solve the issue that one
cannot easily make a shallow reversed copy of a graph in NetworkX 2, since networkx.reverse(copy=False) now returns
a GraphView, and GraphViews are always read-only.
:param networkx.DiGraph g: The graph to reverse.
:return: A new networkx.DiGraph that has all nodes and all edges of the original graph, with
edges reversed.
"""
new_g = networkx.DiGraph()
new_g.add_nodes_from(g.nodes())
for src, dst, data in g.edges(data=True):
new_g.add_edge(dst, src, **data)
return new_g
[docs]def inverted_idoms(graph: networkx.DiGraph) -> Tuple[networkx.DiGraph, Optional[Dict]]:
"""
Invert the given graph and generate the immediate dominator tree on the inverted graph. This is useful for
computing post-dominators.
:param graph: The graph to invert and generate immediate dominator tree for.
:return: A tuple of the inverted graph and the immediate dominator tree.
"""
end_nodes = {n for n in graph.nodes() if graph.out_degree(n) == 0}
inverted_graph: networkx.DiGraph = shallow_reverse(graph)
if end_nodes:
if len(end_nodes) > 1:
# make sure there is only one end node
dummy_node = "DUMMY_NODE"
for end_node in end_nodes:
inverted_graph.add_edge(dummy_node, end_node)
endnode = dummy_node
else:
endnode = next(iter(end_nodes)) # pick the end node
idoms = networkx.immediate_dominators(inverted_graph, endnode)
else:
idoms = None
return inverted_graph, idoms
[docs]def to_acyclic_graph(
graph: networkx.DiGraph, ordered_nodes: Optional[List] = None, loop_heads: Optional[List] = None
) -> networkx.DiGraph:
"""
Convert a given DiGraph into an acyclic graph.
:param graph: The graph to convert.
:param ordered_nodes: A list of nodes sorted in a topological order.
:param loop_heads: A list of known loop head nodes.
:return: The converted acyclic graph.
"""
if ordered_nodes is None:
# take the quasi-topological order of the graph
from angr.analyses.cfg.cfg_utils import CFGUtils # pylint:disable=import-outside-toplevel
ordered_nodes = CFGUtils.quasi_topological_sort_nodes(graph, loop_heads=loop_heads)
acyclic_graph = networkx.DiGraph()
# add each node and its edge into the graph
visited = set()
for node in ordered_nodes:
visited.add(node)
acyclic_graph.add_node(node)
for successor in graph.successors(node):
if successor not in visited:
acyclic_graph.add_edge(node, successor)
return acyclic_graph
[docs]def dfs_back_edges(graph, start_node):
"""
Do a DFS traversal of the graph, and return with the back edges.
Note: This is just a naive recursive implementation, feel free to replace it.
I couldn't find anything in networkx to do this functionality. Although the
name suggest it, but `dfs_labeled_edges` is doing something different.
:param graph: The graph to traverse.
:param start_node: The node where to start the traversal
:returns: An iterator of 'backward' edges
"""
visited = set()
finished = set()
def _dfs_back_edges_core(node):
visited.add(node)
for child in iter(graph[node]):
if child not in finished:
if child in visited:
yield node, child
else:
yield from _dfs_back_edges_core(child)
finished.add(node)
yield from _dfs_back_edges_core(start_node)
[docs]def subgraph_between_nodes(graph, source, frontier, include_frontier=False):
"""
For a directed graph, return a subgraph that includes all nodes going from a source node to a target node.
:param networkx.DiGraph graph: The directed graph.
:param source: The source node.
:param list frontier: A collection of target nodes.
:param bool include_frontier: Should nodes in frontier be included in the subgraph.
:return: A subgraph.
:rtype: networkx.DiGraph
"""
graph = networkx.DiGraph(graph) # make a copy
for pred in list(graph.predecessors(source)):
# make sure we cannot go from any other node to the source node
graph.remove_edge(pred, source)
g0 = networkx.DiGraph()
if source not in graph or any(node not in graph for node in frontier):
raise KeyError("Source node or frontier nodes are not in the source graph.")
# BFS on graph and add new nodes to g0
queue = [source]
traversed = set()
frontier = set(frontier)
while queue:
node = queue.pop(0)
traversed.add(node)
for _, succ, data in graph.out_edges(node, data=True):
g0.add_edge(node, succ, **data)
if succ in traversed or succ in frontier:
continue
for frontier_node in frontier:
if networkx.has_path(graph, succ, frontier_node):
queue.append(succ)
break
# recursively remove all nodes that have less than two neighbors
to_remove = [
n
for n in g0.nodes()
if n not in frontier and n is not source and (g0.out_degree[n] == 0 or g0.in_degree[n] == 0)
]
while to_remove:
g0.remove_nodes_from(to_remove)
to_remove = [
n
for n in g0.nodes()
if n not in frontier and n is not source and (g0.out_degree[n] == 0 or g0.in_degree[n] == 0)
]
if not include_frontier:
# remove the frontier nodes
g0.remove_nodes_from(frontier)
return g0
[docs]def dominates(idom, dominator_node, node):
n = node
while n:
if n == dominator_node:
return True
if n in idom and n != idom[n]:
n = idom[n]
else:
n = None
return False
#
# Dominance frontier
#
[docs]def compute_dominance_frontier(graph, domtree):
"""
Compute a dominance frontier based on the given post-dominator tree.
This implementation is based on figure 2 of paper An Efficient Method of Computing Static Single Assignment
Form by Ron Cytron, etc.
:param graph: The graph where we want to compute the dominance frontier.
:param domtree: The dominator tree
:returns: A dict of dominance frontier
"""
df = {}
# Perform a post-order search on the dominator tree
for x in networkx.dfs_postorder_nodes(domtree):
if x not in graph:
# Skip nodes that are not in the graph
continue
df[x] = set()
# local set
for y in graph.successors(x):
if x not in domtree.predecessors(y):
df[x].add(y)
# up set
if x is None:
continue
for z in domtree.successors(x):
if z is x:
continue
if z not in df:
continue
for y in df[z]:
if x not in list(domtree.predecessors(y)):
df[x].add(y)
return df
#
# Dominators and post-dominators
#
[docs]class TemporaryNode:
"""
A temporary node.
Used as the start node and end node in post-dominator tree generation. Also used in some test cases.
"""
__slots__ = ["_label"]
def __init__(self, label):
self._label = label
def __repr__(self):
return "TN[%s]" % self._label
def __eq__(self, other):
if isinstance(other, TemporaryNode) and other._label == self._label:
return True
return False
def __hash__(self):
return hash(("TemporaryNode", self._label))
[docs]class ContainerNode:
"""
A container node.
Only used in dominator tree generation. We did this so we can set the index property without modifying the
original object.
"""
__slots__ = ["_obj", "index"]
def __init__(self, obj):
self._obj = obj
self.index = None
@property
def obj(self):
return self._obj
def __eq__(self, other):
if isinstance(other, ContainerNode):
return self._obj is other._obj
return False
def __hash__(self):
return hash(("CN", self._obj))
def __repr__(self):
return "CN[%s]" % repr(self._obj)
[docs]class Dominators:
dom: networkx.DiGraph
def __init__(self, graph, entry_node, successors_func=None, reverse=False):
self._l = logging.getLogger("utils.graph.dominators")
self._graph_successors_func = successors_func
self._reverse = reverse # Set it to True to generate a post-dominator tree.
# Temporary variables
self._ancestor = None
self._semi = None
self._label = None
# Output
self.dom = None # type: ignore # this is guaranteed to be not null after the __init__ returns
self.prepared_graph = None
self._construct(graph, entry_node)
def _graph_successors(self, graph, node):
"""
Return the successors of a node in the graph.
This method can be overriden in case there are special requirements with the graph and the successors. For
example, when we are dealing with a control flow graph, we may not want to get the FakeRet successors.
:param graph: The graph.
:param node: The node of which we want to get the successors.
:return: An iterator of successors.
:rtype: iter
"""
if self._graph_successors_func is not None:
return self._graph_successors_func(graph, node)
return graph.successors(node)
def _construct(self, graph, entry_node):
"""
Find post-dominators for each node in the graph.
This implementation is based on paper A Fast Algorithm for Finding Dominators in a Flow Graph by Thomas
Lengauer and Robert E. Tarjan from Stanford University, ACM Transactions on Programming Languages and Systems,
Vol. 1, No. 1, July 1979
"""
# Step 1
_prepared_graph, vertices, parent = self._prepare_graph(graph, entry_node)
# vertices is a list of ContainerNode instances
# parent is a dict storing the mapping from ContainerNode to ContainerNode
# Each node in prepared_graph is a ContainerNode instance
bucket = defaultdict(set)
dom = [None] * (len(vertices))
self._ancestor = [None] * (len(vertices) + 1)
for i in range(len(vertices) - 1, 0, -1):
w = vertices[i]
# Step 2
if w not in parent:
# It's one of the start nodes
continue
predecessors = _prepared_graph.predecessors(w)
for v in predecessors:
u = self._pd_eval(v)
if self._semi[u.index].index < self._semi[w.index].index:
self._semi[w.index] = self._semi[u.index]
bucket[vertices[self._semi[w.index].index].index].add(w)
self._pd_link(parent[w], w)
# Step 3
for v in bucket[parent[w].index]:
u = self._pd_eval(v)
if self._semi[u.index].index < self._semi[v.index].index:
dom[v.index] = u
else:
dom[v.index] = parent[w]
bucket[parent[w].index].clear()
for i in range(1, len(vertices)):
w = vertices[i]
if w not in parent:
continue
if dom[w.index].index != vertices[self._semi[w.index].index].index:
dom[w.index] = dom[dom[w.index].index]
self.dom = networkx.DiGraph() # The post-dom tree described in a directional graph
for i in range(1, len(vertices)):
if dom[i] is not None and vertices[i] is not None:
self.dom.add_edge(dom[i].obj, vertices[i].obj)
# Output
self.prepared_graph = _prepared_graph
def _prepare_graph(self, graph, entry):
# We want to reverse the graph, and label each node according to its order in a DFS
new_graph = networkx.DiGraph()
n = entry
queue = [n]
start_node = TemporaryNode("start_node")
# Put the start_node into a Container as well
start_node = ContainerNode(start_node)
# Create the end_node, too
end_node = ContainerNode(TemporaryNode("end_node"))
container_nodes = {}
traversed_nodes = set()
while queue:
node = queue.pop()
successors = list(self._graph_successors(graph, node))
# Put it into a container
if node in container_nodes:
container_node = container_nodes[node]
else:
container_node = ContainerNode(node)
container_nodes[node] = container_node
traversed_nodes.add(container_node)
if len(successors) == 0:
# Note that this condition may never be satisfied if there is no real "end node" in the graph: the graph
# may end with a loop.
if self._reverse:
# Add an edge between the start node and this node
new_graph.add_edge(start_node, container_node)
else:
# Add an edge between our this node and end node
new_graph.add_edge(container_node, end_node)
for s in successors:
if s in container_nodes:
container_s = container_nodes[s]
else:
container_s = ContainerNode(s)
container_nodes[s] = container_s
if self._reverse:
new_graph.add_edge(container_s, container_node) # Reversed
else:
new_graph.add_edge(container_node, container_s) # Reversed
if container_s not in traversed_nodes:
queue.append(s)
if self._reverse:
# Add the end node
new_graph.add_edge(container_nodes[n], end_node)
else:
# Add the start node
new_graph.add_edge(start_node, container_nodes[n])
all_nodes_count = new_graph.number_of_nodes()
self._l.debug("There should be %d nodes in all", all_nodes_count)
counter = 0
vertices = [ContainerNode("placeholder")]
scanned_nodes = set()
parent = {}
while True:
# DFS from the current start node
stack = [start_node]
while len(stack) > 0:
node = stack.pop()
counter += 1
# Mark it as scanned
scanned_nodes.add(node)
# Put the container node into vertices list
vertices.append(node)
# Put each successors into the stack
successors = new_graph.successors(node)
# Set the index property of it
node.index = counter
for s in successors:
if s not in scanned_nodes:
stack.append(s)
parent[s] = node
scanned_nodes.add(s)
if counter >= all_nodes_count:
break
self._l.debug(
"%d nodes are left out during the DFS. They must formed a cycle themselves.", all_nodes_count - counter
)
# Find those nodes
leftovers = [s for s in traversed_nodes if s not in scanned_nodes]
new_graph.add_edge(start_node, leftovers[0])
# We have to start over...
counter = 0
parent = {}
scanned_nodes = set()
vertices = [ContainerNode("placeholder")]
self._semi = vertices[::]
self._label = vertices[::]
return new_graph, vertices, parent
def _pd_link(self, v, w):
self._ancestor[w.index] = v
def _pd_eval(self, v):
if self._ancestor[v.index] is None:
return v
else:
self._pd_compress(v)
return self._label[v.index]
def _pd_compress(self, v):
if self._ancestor[self._ancestor[v.index].index] is not None:
self._pd_compress(self._ancestor[v.index])
if (
self._semi[self._label[self._ancestor[v.index].index].index].index
< self._semi[self._label[v.index].index].index
):
self._label[v.index] = self._label[self._ancestor[v.index].index]
self._ancestor[v.index] = self._ancestor[self._ancestor[v.index].index]
[docs]class PostDominators(Dominators):
def __init__(self, graph, entry_node, successors_func=None):
super().__init__(graph, entry_node, successors_func=successors_func, reverse=True)
@property
def post_dom(self) -> networkx.DiGraph:
return self.dom