from typing import Tuple, Optional, Dict, List, Set
from collections import defaultdict
import logging
import networkx
def shallow_reverse(g) -> networkx.DiGraph:
    """
    Build a new, mutable directed graph that contains the nodes of ``g`` and every edge of ``g`` flipped.

    This is a workaround for NetworkX 2, where networkx.reverse(copy=False) returns a GraphView, and GraphViews are
    always read-only, so a shallow reversed copy has to be assembled by hand.

    :param networkx.DiGraph g:  The graph to reverse.
    :return:                    A new networkx.DiGraph that has all nodes and all edges of the original graph, with
                                edges reversed.
    """
    reversed_graph = networkx.DiGraph()
    reversed_graph.add_nodes_from(g.nodes())
    for tail, head, attrs in g.edges(data=True):
        # Swap the endpoints; the edge attributes are re-applied as keyword arguments.
        reversed_graph.add_edge(head, tail, **attrs)
    return reversed_graph
def inverted_idoms(graph: networkx.DiGraph) -> Tuple[networkx.DiGraph, Optional[Dict]]:
    """
    Reverse the given graph and compute immediate dominators on the reversed graph. This is useful for computing
    post-dominators.

    When the graph has several nodes without outgoing edges, a synthetic "DUMMY_NODE" is added to the reversed graph
    with an edge to each of them, and it is used as the root. When the graph has no such node at all, no dominators
    are computed.

    :param graph:   The graph to invert and generate immediate dominator tree for.
    :return:        A tuple of the inverted graph and the immediate dominators (None if the graph has no end node).
    """
    sinks = {n for n in graph.nodes() if graph.out_degree(n) == 0}
    inverted_graph: networkx.DiGraph = shallow_reverse(graph)

    if not sinks:
        # nothing to root the dominator computation at
        return inverted_graph, None

    if len(sinks) == 1:
        root = next(iter(sinks))  # the only end node
    else:
        # funnel all end nodes into one synthetic root so that there is a single start for the computation
        root = "DUMMY_NODE"
        for sink in sinks:
            inverted_graph.add_edge(root, sink)

    return inverted_graph, networkx.immediate_dominators(inverted_graph, root)
def to_acyclic_graph(
    graph: networkx.DiGraph, ordered_nodes: Optional[List] = None, loop_heads: Optional[List] = None
) -> networkx.DiGraph:
    """
    Derive an acyclic graph from a DiGraph by dropping every edge that points to a node already visited in the
    given node order.

    :param graph:           The graph to convert.
    :param ordered_nodes:   A list of nodes sorted in a topological order. When omitted, the quasi-topological order
                            of the graph is used.
    :param loop_heads:      A list of known loop head nodes. Only used when ordered_nodes is omitted.
    :return:                The converted acyclic graph.
    """
    if ordered_nodes is None:
        # fall back to the quasi-topological order of the graph
        ordered_nodes = GraphUtils.quasi_topological_sort_nodes(graph, loop_heads=loop_heads)

    result = networkx.DiGraph()
    seen = set()
    for current in ordered_nodes:
        # the node counts as seen before its own edges are examined, so self-loops are dropped as well
        seen.add(current)
        result.add_node(current)
        for nxt in graph.successors(current):
            if nxt in seen:
                continue
            result.add_edge(current, nxt)
    return result
def dfs_back_edges(graph, start_node):
    """
    Do a DFS traversal of the graph, and return with the back edges.

    An edge (node, child) is reported when ``child`` has been entered by the DFS but not yet finished, i.e. it is
    still on the current DFS path (self-loops included).

    The traversal uses an explicit stack instead of recursion, so deep graphs do not hit the interpreter's
    recursion limit. Edges are produced in the same order as a recursive DFS that follows ``graph[node]`` order.

    :param graph:       The graph to traverse. Only ``graph[node]`` is used, and iterating it must yield the
                        successors of ``node``.
    :param start_node:  The node where to start the traversal
    :returns:           An iterator of 'backward' edges
    """
    visited = {start_node}
    finished = set()
    # Each stack entry is (node, iterator over its not-yet-examined children).
    stack = [(start_node, iter(graph[start_node]))]

    while stack:
        node, children = stack[-1]
        for child in children:
            if child in finished:
                # forward or cross edge; not a back edge
                continue
            if child in visited:
                # child is still on the DFS path
                yield node, child
            else:
                # descend into the child; the partially consumed iterator of `node` stays on the stack
                visited.add(child)
                stack.append((child, iter(graph[child])))
                break
        else:
            # all children examined
            finished.add(node)
            stack.pop()
def subgraph_between_nodes(graph, source, frontier, include_frontier=False):
    """
    For a directed graph, return a subgraph that includes all nodes going from a source node to a target node.

    The input graph is not modified; the function works on a copy in which all edges into ``source`` are removed.

    :param networkx.DiGraph graph:  The directed graph.
    :param source:                  The source node.
    :param list frontier:           A collection of target nodes.
    :param bool include_frontier:   Should nodes in frontier be included in the subgraph.
    :return:                        A subgraph.
    :rtype:                         networkx.DiGraph
    :raises KeyError:               If the source node or any frontier node is not in the graph.
    """
    from collections import deque  # local import keeps this block self-contained

    # Materialize the frontier first so that a one-shot iterable is not exhausted by the validation below.
    frontier = set(frontier)

    # Validate before touching the graph; querying the predecessors of a missing node would raise a networkx error
    # instead of the documented KeyError.
    if source not in graph or any(node not in graph for node in frontier):
        raise KeyError("Source node or frontier nodes are not in the source graph.")

    graph = networkx.DiGraph(graph)  # make a copy
    for pred in list(graph.predecessors(source)):
        # make sure we cannot go from any other node to the source node
        graph.remove_edge(pred, source)

    # Nodes from which at least one frontier node is reachable. Computed once, instead of running a path search
    # for every (successor, frontier node) pair during the traversal.
    reaches_frontier = set()
    for frontier_node in frontier:
        reaches_frontier.update(networkx.ancestors(graph, frontier_node))

    g0 = networkx.DiGraph()

    # BFS on graph and add new nodes to g0
    queue = deque([source])
    traversed = set()
    while queue:
        node = queue.popleft()
        traversed.add(node)

        for _, succ, data in graph.out_edges(node, data=True):
            if g0.has_edge(node, succ):
                continue
            g0.add_edge(node, succ, **data)
            if succ in traversed or succ in frontier:
                continue
            # only keep exploring through nodes that can still reach the frontier
            if succ in reaches_frontier:
                queue.append(succ)

    # repeatedly remove all nodes (other than the source and the frontier nodes) that have no incoming or no
    # outgoing edge, until no such node is left
    while True:
        to_remove = [
            n
            for n in g0.nodes()
            if n not in frontier and n != source and (g0.out_degree[n] == 0 or g0.in_degree[n] == 0)
        ]
        if not to_remove:
            break
        g0.remove_nodes_from(to_remove)

    if not include_frontier:
        # remove the frontier nodes
        g0.remove_nodes_from(frontier)

    return g0
def dominates(idom, dominator_node, node):
    """
    Test whether ``dominator_node`` dominates ``node`` according to an immediate-dominator mapping.

    The chain ``node -> idom[node] -> idom[idom[node]] -> ...`` is followed until ``dominator_node`` is found, a
    node is missing from ``idom``, or a node that is its own immediate dominator (the root) is reached. A node
    dominates itself.

    Falsy nodes (for example the integer ``0`` or an empty string) are valid nodes; only ``None`` is treated as
    "no node".

    :param idom:            A mapping from each node to its immediate dominator.
    :param dominator_node:  The candidate dominator.
    :param node:            The node to test.
    :return:                True if dominator_node dominates node, False otherwise.
    :rtype:                 bool
    """
    current = node
    while current is not None:
        if current == dominator_node:
            return True
        if current not in idom:
            return False
        parent = idom[current]
        if parent == current:
            # reached the root of the dominator tree without meeting dominator_node
            return False
        current = parent
    return False
#
# Dominance frontier
#
def compute_dominance_frontier(graph, domtree):
    """
    Compute the dominance frontier of every node, given a graph and its dominator tree.

    This implementation is based on figure 2 of paper An Efficient Method of Computing Static Single Assignment
    Form by Ron Cytron, etc. Nodes are processed in post-order of the dominator tree, so the frontier of every
    child in the tree is available before its parent is handled.

    :param graph:   The graph where we want to compute the dominance frontier.
    :param domtree: The dominator tree
    :returns:       A dict of dominance frontier
    """

    def _is_tree_parent(parent, child):
        # True if domtree has an edge parent -> child
        return parent in domtree.predecessors(child)

    frontiers = {}

    # Perform a post-order search on the dominator tree
    for node in networkx.dfs_postorder_nodes(domtree):
        if node not in graph:
            # Skip nodes that are not in the graph
            continue

        frontier = set()
        frontiers[node] = frontier

        # local set: graph successors that are not children of this node in the dominator tree
        for succ in graph.successors(node):
            if not _is_tree_parent(node, succ):
                frontier.add(succ)

        # up set: inherited from the frontiers of this node's children in the dominator tree
        if node is None:
            continue

        for child in domtree.successors(node):
            if child is node or child not in frontiers:
                continue
            for candidate in frontiers[child]:
                if not _is_tree_parent(node, candidate):
                    frontier.add(candidate)

    return frontiers
#
# Dominators and post-dominators
#
class TemporaryNode:
    """
    A temporary node.

    Used as the start node and end node in post-dominator tree generation. Also used in some test cases.

    Two temporary nodes are equal (and hash identically) when their labels are equal.
    """

    __slots__ = ["_label"]

    def __init__(self, label):
        """
        :param label:   The label identifying this node. It must be hashable for the node to be hashable.
        """
        self._label = label

    def __repr__(self):
        # Wrap the label in a 1-tuple so that tuple labels are formatted as a single value instead of being
        # unpacked as %-format arguments.
        return "TN[%s]" % (self._label,)

    def __eq__(self, other):
        if isinstance(other, TemporaryNode) and other._label == self._label:
            return True
        return False

    def __hash__(self):
        # The string tag keeps the hash apart from the hash of the bare label.
        return hash(("TemporaryNode", self._label))
class ContainerNode:
    """
    A wrapper around an arbitrary object.

    Only used in dominator tree generation. Wrapping lets an index be attached to a node without modifying the
    original object.

    Two containers compare equal only when they wrap the very same object (identity, not equality), while the hash
    is derived from the wrapped object's hash.
    """

    __slots__ = ["_obj", "index"]

    def __init__(self, obj):
        """
        :param obj: The object to wrap.
        """
        self._obj = obj
        # No index is assigned at construction time.
        self.index = None

    @property
    def obj(self):
        """
        The wrapped object.
        """
        return self._obj

    def __eq__(self, other):
        if not isinstance(other, ContainerNode):
            return False
        return self._obj is other._obj

    def __hash__(self):
        key = ("CN", self._obj)
        return hash(key)

    def __repr__(self):
        return f"CN[{self._obj!r}]"
class Dominators:
    """
    Describes dominators in a graph.

    The dominator tree is computed in the constructor and stored in ``dom`` as a directed graph. The internally
    prepared graph (nodes wrapped in ContainerNode instances, plus temporary start/end nodes) is stored in
    ``prepared_graph``.
    """

    # The resulting tree; assigned a networkx.DiGraph at the end of _construct().
    dom: networkx.DiGraph

    def __init__(self, graph, entry_node, successors_func=None, reverse=False):
        """
        :param graph:           The graph to analyze.
        :param entry_node:      The node where the traversal of the graph starts.
        :param successors_func: An optional callable taking (graph, node) and returning the successors of node. When
                                it is None, graph.successors(node) is used.
        :param reverse:         Set it to True to generate a post-dominator tree.
        """
        self._l = logging.getLogger("utils.graph.dominators")
        self._graph_successors_func = successors_func

        self._reverse = reverse  # Set it to True to generate a post-dominator tree.

        # Temporary variables
        # _ancestor is allocated in _construct(); _semi and _label are initialized in _prepare_graph(). All three
        # are lists indexed by ContainerNode.index.
        self._ancestor = None
        self._semi = None
        self._label = None

        # Output
        self.dom = None  # type: ignore  # this is guaranteed to be not null after the __init__ returns
        self.prepared_graph = None

        self._construct(graph, entry_node)

    def _graph_successors(self, graph, node):
        """
        Return the successors of a node in the graph.
        This method can be overridden in case there are special requirements with the graph and the successors. For
        example, when we are dealing with a control flow graph, we may not want to get the FakeRet successors.

        If a successors function was passed to the constructor, it is called instead of graph.successors().

        :param graph: The graph.
        :param node:  The node of which we want to get the successors.
        :return:      An iterator of successors.
        :rtype:       iter
        """

        if self._graph_successors_func is not None:
            return self._graph_successors_func(graph, node)

        return graph.successors(node)

    def _construct(self, graph, entry_node):
        """
        Compute the dominator tree (or the post-dominator tree when the instance was created with reverse=True) and
        store it in self.dom. The prepared graph is stored in self.prepared_graph.

        This implementation is based on paper A Fast Algorithm for Finding Dominators in a Flow Graph by Thomas
        Lengauer and Robert E. Tarjan from Stanford University, ACM Transactions on Programming Languages and Systems,
        Vol. 1, No. 1, July 1979

        :param graph:       The graph to analyze.
        :param entry_node:  The node where the traversal of the graph starts.
        """

        # Step 1

        _prepared_graph, vertices, parent = self._prepare_graph(graph, entry_node)
        # vertices is a list of ContainerNode instances
        # parent is a dict storing the mapping from ContainerNode to ContainerNode
        # Each node in prepared_graph is a ContainerNode instance

        # bucket is keyed by node index; dom is indexed by node index. Index 0 belongs to the placeholder entry at
        # vertices[0], which is why the loops below stop at / start from index 1.
        bucket = defaultdict(set)
        dom = [None] * (len(vertices))
        self._ancestor = [None] * (len(vertices) + 1)

        # Walk the vertices from the highest DFS index down to index 1
        for i in range(len(vertices) - 1, 0, -1):
            w = vertices[i]

            # Step 2
            if w not in parent:
                # It's one of the start nodes
                continue

            # Lower the semi entry of w to the smallest-indexed semi entry found through its predecessors
            predecessors = _prepared_graph.predecessors(w)
            for v in predecessors:
                u = self._pd_eval(v)
                if self._semi[u.index].index < self._semi[w.index].index:
                    self._semi[w.index] = self._semi[u.index]

            bucket[vertices[self._semi[w.index].index].index].add(w)

            self._pd_link(parent[w], w)

            # Step 3
            # Assign a tentative dominator to every node waiting in the bucket of w's DFS parent
            for v in bucket[parent[w].index]:
                u = self._pd_eval(v)
                if self._semi[u.index].index < self._semi[v.index].index:
                    dom[v.index] = u
                else:
                    dom[v.index] = parent[w]

            bucket[parent[w].index].clear()

        # Final pass in increasing index order: when the tentative dominator of w is not the vertex referenced by
        # its semi entry, replace it with the dominator of that tentative dominator
        for i in range(1, len(vertices)):
            w = vertices[i]
            if w not in parent:
                continue
            if dom[w.index].index != vertices[self._semi[w.index].index].index:
                dom[w.index] = dom[dom[w.index].index]

        # The resulting tree, described as a directional graph over the original (unwrapped) objects. Each edge goes
        # from dom[i] to vertices[i].
        self.dom = networkx.DiGraph()
        for i in range(1, len(vertices)):
            if dom[i] is not None and vertices[i] is not None:
                self.dom.add_edge(dom[i].obj, vertices[i].obj)

        # Output
        self.prepared_graph = _prepared_graph

    def _prepare_graph(self, graph, entry):
        """
        Build the graph the algorithm runs on and number its nodes in DFS order.

        Every node reachable from entry (following _graph_successors) is wrapped in a ContainerNode. Edges keep
        their direction, or are flipped when self._reverse is set. Temporary start and end nodes are attached, and a
        DFS from the start node assigns each container an index starting from 1.

        As a side effect, self._semi and self._label are set to copies of the returned vertices list.

        :param graph:   The graph to analyze.
        :param entry:   The node where the traversal of the graph starts.
        :return:        A tuple of (the prepared graph, the list of containers where the position of a container is
                        its index and position 0 holds a placeholder, a dict mapping each container to its parent in
                        the DFS).
        """
        # We want to reverse the graph, and label each node according to its order in a DFS
        new_graph = networkx.DiGraph()

        n = entry

        # Work list for the traversal; pop() takes from the end of the list
        queue = [n]
        start_node = TemporaryNode("start_node")
        # Put the start_node into a Container as well
        start_node = ContainerNode(start_node)
        # Create the end_node, too
        end_node = ContainerNode(TemporaryNode("end_node"))

        # Maps each original node to its container so that every node is wrapped exactly once
        container_nodes = {}

        traversed_nodes = set()
        while queue:
            node = queue.pop()

            successors = list(self._graph_successors(graph, node))

            # Put it into a container
            if node in container_nodes:
                container_node = container_nodes[node]
            else:
                container_node = ContainerNode(node)
                container_nodes[node] = container_node

            traversed_nodes.add(container_node)

            if len(successors) == 0:
                # Note that this condition may never be satisfied if there is no real "end node" in the graph: the graph
                # may end with a loop.
                if self._reverse:
                    # Add an edge between the start node and this node
                    new_graph.add_edge(start_node, container_node)
                else:
                    # Add an edge between this node and the end node
                    new_graph.add_edge(container_node, end_node)

            for s in successors:
                if s in container_nodes:
                    container_s = container_nodes[s]
                else:
                    container_s = ContainerNode(s)
                    container_nodes[s] = container_s
                if self._reverse:
                    new_graph.add_edge(container_s, container_node)  # Reversed
                else:
                    new_graph.add_edge(container_node, container_s)  # Original direction
                if container_s not in traversed_nodes:
                    queue.append(s)

        if self._reverse:
            # Add the end node
            new_graph.add_edge(container_nodes[n], end_node)
        else:
            # Add the start node
            new_graph.add_edge(start_node, container_nodes[n])

        # NOTE(review): in reverse mode, start_node is only added to new_graph above when some traversed node has no
        # successors. Confirm that new_graph.successors(start_node) in the DFS below is safe when that never happens.
        all_nodes_count = new_graph.number_of_nodes()
        self._l.debug("There should be %d nodes in all", all_nodes_count)
        counter = 0
        # Index 0 is occupied by a placeholder so that a container's index equals its position in vertices
        vertices = [ContainerNode("placeholder")]
        scanned_nodes = set()
        parent = {}
        while True:
            # DFS from the current start node
            stack = [start_node]
            while len(stack) > 0:
                node = stack.pop()
                counter += 1

                # Mark it as scanned
                scanned_nodes.add(node)

                # Put the container node into vertices list
                vertices.append(node)

                # Put each successors into the stack
                successors = new_graph.successors(node)

                # Set the index property of it
                node.index = counter

                for s in successors:
                    if s not in scanned_nodes:
                        stack.append(s)
                        parent[s] = node
                        # Marked as scanned when pushed, so a node is pushed at most once
                        scanned_nodes.add(s)

            if counter >= all_nodes_count:
                break

            self._l.debug(
                "%d nodes are left out during the DFS. They must formed a cycle themselves.", all_nodes_count - counter
            )
            # Find those nodes
            # NOTE(review): all_nodes_count is not recomputed after the edge below is added, and leftovers[0] assumes
            # at least one traversed node was missed by the DFS - confirm both hold for all inputs.
            leftovers = [s for s in traversed_nodes if s not in scanned_nodes]
            new_graph.add_edge(start_node, leftovers[0])
            # We have to start over...
            counter = 0
            parent = {}
            scanned_nodes = set()
            vertices = [ContainerNode("placeholder")]

        # Both lists start out as copies of vertices, i.e. entry i initially refers to the node with index i
        self._semi = vertices[::]
        self._label = vertices[::]

        return new_graph, vertices, parent

    def _pd_link(self, v, w):
        """
        Record v as the ancestor of w.
        """
        self._ancestor[w.index] = v

    def _pd_eval(self, v):
        """
        Return v itself if no ancestor has been recorded for v. Otherwise compress the ancestor chain of v and
        return the label entry of v.
        """
        if self._ancestor[v.index] is None:
            return v
        else:
            self._pd_compress(v)
            return self._label[v.index]

    def _pd_compress(self, v):
        """
        Compress the ancestor chain of v. This is only called for a node whose ancestor has been recorded.

        If the ancestor of v has an ancestor itself, the ancestor is compressed first (recursively). The label of v
        is then replaced by the label of its ancestor if that label has a semi entry with a smaller index, and v is
        re-attached to the ancestor of its ancestor.
        """
        if self._ancestor[self._ancestor[v.index].index] is not None:
            self._pd_compress(self._ancestor[v.index])
            if (
                self._semi[self._label[self._ancestor[v.index].index].index].index
                < self._semi[self._label[v.index].index].index
            ):
                self._label[v.index] = self._label[self._ancestor[v.index].index]
            self._ancestor[v.index] = self._ancestor[self._ancestor[v.index].index]
class PostDominators(Dominators):
    """
    Describe post-dominators in a graph.

    This is Dominators constructed with ``reverse=True``; the resulting tree is also available as ``post_dom``.
    """

    def __init__(self, graph, entry_node, successors_func=None):
        """
        :param graph:           The graph to analyze.
        :param entry_node:      The node where the traversal of the graph starts.
        :param successors_func: An optional callable taking (graph, node) and returning the successors of node.
        """
        super().__init__(graph, entry_node, successors_func=successors_func, reverse=True)

    @property
    def post_dom(self) -> networkx.DiGraph:
        """
        The post-dominator tree. This is the same object as ``dom``.
        """
        return self.dom
class SCCPlaceholder:
    """
    A stand-in node that represents one strongly-connected-component of a graph.

    Placeholders are equal, and hash identically, when they carry the same component ID.
    """

    __slots__ = ("scc_id",)

    def __init__(self, scc_id):
        """
        :param scc_id:  The ID of the strongly connected component this placeholder stands for.
        """
        self.scc_id = scc_id

    def __eq__(self, other):
        if not isinstance(other, SCCPlaceholder):
            return False
        return other.scc_id == self.scc_id

    def __hash__(self):
        key = "scc_placeholder_%d" % self.scc_id
        return hash(key)
class GraphUtils:
    """
    A helper class with some static methods and algorithms implemented, that in fact, might take more than just normal
    CFGs.
    """

    @staticmethod
    def find_merge_points(function_addr, function_endpoints, graph):  # pylint:disable=unused-argument
        """
        Given a local transition graph of a function, find all merge points inside, and then perform a
        quasi-topological sort of those merge points.

        A merge point is a node where two or more paths come together, i.e. a node with more than one incoming
        edge. The function_addr and function_endpoints arguments are currently not used.

        :param int function_addr:       Address of the function.
        :param list function_endpoints: Endpoints of the function. They typically come from Function.endpoints.
        :param networkx.DiGraph graph:  A local transition graph of a function. Normally it comes from Function.graph.
        :return:                        A list of ordered addresses of merge points.
        :rtype:                         list
        """

        merge_points = {node for node in graph.nodes() if graph.in_degree(node) > 1}

        ordered_merge_points = GraphUtils.quasi_topological_sort_nodes(graph, merge_points)

        return [n.addr for n in ordered_merge_points]

    @staticmethod
    def find_widening_points(function_addr, function_endpoints, graph):  # pylint: disable=unused-argument
        """
        Given a local transition graph of a function, find all widening points inside.

        Correctly choosing widening points is very important in order to not lose too much information during static
        analysis. A node with a self-loop is a widening point. For every larger strongly connected component, one
        node that has a predecessor outside of the component is picked as a widening point. The function_addr and
        function_endpoints arguments are currently not used.

        :param int function_addr:       Address of the function.
        :param list function_endpoints: Endpoints of the function, typically coming from Function.endpoints.
        :param networkx.DiGraph graph:  A local transition graph of a function, normally Function.graph.
        :return:                        A list of addresses of widening points.
        :rtype:                         list
        """

        widening_addrs = set()

        for scc in networkx.strongly_connected_components(graph):
            if len(scc) == 1:
                node = next(iter(scc))
                if graph.has_edge(node, node):
                    # self loop
                    widening_addrs.add(node.addr)
                continue

            for n in scc:
                # the first node that can be entered from outside of the component is taken
                if any(p not in scc for p in graph.predecessors(n)):
                    widening_addrs.add(n.addr)
                    break

        return list(widening_addrs)

    @staticmethod
    def reverse_post_order_sort_nodes(graph, nodes=None):
        """
        Sort a given set of nodes in reverse post ordering.

        :param networkx.DiGraph graph:  A local transition graph of a function.
        :param iterable nodes:          A collection of nodes to sort. None to return all nodes of the graph.
        :return:                        A list of sorted nodes.
        :rtype:                         list
        """

        post_order = list(networkx.dfs_postorder_nodes(graph))

        if nodes is None:
            # return a real list, as documented, rather than a one-shot reverse iterator
            post_order.reverse()
            return post_order

        addrs_to_index = {n.addr: i for i, n in enumerate(post_order)}
        return sorted(nodes, key=lambda n: addrs_to_index[n.addr], reverse=True)

    @staticmethod
    def quasi_topological_sort_nodes(
        graph: networkx.DiGraph, nodes: Optional[List] = None, loop_heads: Optional[List] = None
    ) -> List:
        """
        Sort a given set of nodes from a graph based on the following rules:

        - if A -> B and not B -> A, then we have A < B
        - if A -> B and B -> A, then the ordering is undefined

        Following the above rules gives us a quasi-topological sorting of nodes in the graph. It also works for cyclic
        graphs.

        :param graph:       A local transition graph of the function.
        :param nodes:       A list of nodes to sort. None if you want to sort all nodes inside the graph.
        :param loop_heads:  A list of nodes that should be treated loop heads.
        :return:            A list of ordered nodes.
        """

        # fast path for single node graphs
        if graph.number_of_nodes() == 1:
            if nodes is None:
                return list(graph.nodes)
            return [n for n in graph.nodes() if n in nodes]

        # find all strongly connected components with more than one node in the graph
        sccs = [scc for scc in networkx.strongly_connected_components(graph) if len(scc) > 1]
        # Components are disjoint, so every node maps to at most one component. Building the mapping once avoids
        # scanning all components for both endpoints of every edge.
        scc_index_of = {member: idx for idx, scc in enumerate(sccs) for member in scc}

        # build a condensed copy of the graph, in which each of those components is collapsed into a placeholder
        graph_copy = networkx.DiGraph()

        for src, dst in graph.edges():
            if src in scc_index_of:
                src = SCCPlaceholder(scc_index_of[src])
            if dst in scc_index_of:
                dst = SCCPlaceholder(scc_index_of[dst])

            if src == dst:
                # an edge inside a collapsed component, or a self-loop: keep the node, drop the edge
                if src not in graph_copy:
                    graph_copy.add_node(src)
                continue

            graph_copy.add_edge(src, dst)

        # add loners: nodes without any edge never show up in the loop above
        for node, degree in graph.out_degree():
            if degree == 0 and graph.in_degree(node) == 0:
                graph_copy.add_node(node)

        # topological sort on acyclic graph `graph_copy`
        ordered_nodes = []
        for n in networkx.topological_sort(graph_copy):
            if isinstance(n, SCCPlaceholder):
                # expand the placeholder into the nodes of its component
                GraphUtils._append_scc(graph, ordered_nodes, sccs[n.scc_id], loop_heads=loop_heads)
            else:
                ordered_nodes.append(n)

        if nodes is None:
            return ordered_nodes

        nodes = set(nodes)
        return [n for n in ordered_nodes if n in nodes]

    @staticmethod
    def _components_index_node(components, node):
        """
        Find the component that a node belongs to.

        :param components:  A sequence of node collections.
        :param node:        The node to look for.
        :return:            The index of the first component containing the node, or None if there is none.
        """
        for i, comp in enumerate(components):
            if node in comp:
                return i
        return None

    @staticmethod
    def _append_scc(graph: networkx.DiGraph, ordered_nodes: List, scc: Set, loop_heads: Optional[List] = None) -> None:
        """
        Append all nodes from a strongly connected component to a list of ordered nodes and ensure the topological
        order.

        A loop head is chosen for the component, all edges going into it from within the component are dropped, and
        the remaining subgraph is sorted with quasi_topological_sort_nodes().

        :param graph:           The graph where all nodes belong to.
        :param ordered_nodes:   Ordered nodes. The list is extended in place.
        :param scc:             A set of nodes that forms a strongly connected component in the graph.
        :param loop_heads:      A list of known loop head nodes. A node of the component that appears in this list
                                is preferred as the loop head.
        """

        loop_head = None

        if loop_heads is not None:
            # find the first node that appears in loop_heads
            loop_head = next((n for n in scc if n in loop_heads), None)

        if loop_head is None:
            # find the first node in the strongly connected component that is the successor to any node in
            # ordered_nodes, starting from the most recently ordered node
            for parent_node in reversed(ordered_nodes):
                successors = graph[parent_node]
                loop_head = next((n for n in scc if n in successors), None)
                if loop_head is not None:
                    break

        if loop_head is None:
            # no better candidate: take an arbitrary node of the component
            loop_head = next(iter(scc))

        subgraph: networkx.DiGraph = graph.subgraph(scc).copy()
        # cut the edges going into the loop head; these are the edges removed to break up the component
        for src, _ in list(subgraph.in_edges(loop_head)):
            subgraph.remove_edge(src, loop_head)

        ordered_nodes.extend(GraphUtils.quasi_topological_sort_nodes(subgraph))