from typing import Tuple, Optional, Dict, List, Set
from collections import defaultdict
import logging
import networkx
import networkx.algorithms
[docs]def shallow_reverse(g) -> networkx.DiGraph:
"""
Make a shallow copy of a directional graph and reverse the edges. This is a workaround to solve the issue that one
cannot easily make a shallow reversed copy of a graph in NetworkX 2, since networkx.reverse(copy=False) now returns
a GraphView, and GraphViews are always read-only.
:param networkx.DiGraph g: The graph to reverse.
:return: A new networkx.DiGraph that has all nodes and all edges of the original graph, with
edges reversed.
"""
new_g = networkx.DiGraph()
new_g.add_nodes_from(g.nodes())
for src, dst, data in g.edges(data=True):
new_g.add_edge(dst, src, **data)
return new_g
[docs]def inverted_idoms(graph: networkx.DiGraph) -> Tuple[networkx.DiGraph, Optional[Dict]]:
"""
Invert the given graph and generate the immediate dominator tree on the inverted graph. This is useful for
computing post-dominators.
:param graph: The graph to invert and generate immediate dominator tree for.
:return: A tuple of the inverted graph and the immediate dominator tree.
"""
end_nodes = {n for n in graph.nodes() if graph.out_degree(n) == 0}
inverted_graph: networkx.DiGraph = shallow_reverse(graph)
if end_nodes:
if len(end_nodes) > 1:
# make sure there is only one end node
dummy_node = "DUMMY_NODE"
for end_node in end_nodes:
inverted_graph.add_edge(dummy_node, end_node)
endnode = dummy_node
else:
endnode = next(iter(end_nodes)) # pick the end node
idoms = networkx.immediate_dominators(inverted_graph, endnode)
else:
idoms = None
return inverted_graph, idoms
[docs]def to_acyclic_graph(
graph: networkx.DiGraph, ordered_nodes: Optional[List] = None, loop_heads: Optional[List] = None
) -> networkx.DiGraph:
"""
Convert a given DiGraph into an acyclic graph.
:param graph: The graph to convert.
:param ordered_nodes: A list of nodes sorted in a topological order.
:param loop_heads: A list of known loop head nodes.
:return: The converted acyclic graph.
"""
if ordered_nodes is None:
# take the quasi-topological order of the graph
ordered_nodes = GraphUtils.quasi_topological_sort_nodes(graph, loop_heads=loop_heads)
acyclic_graph = networkx.DiGraph()
# add each node and its edge into the graph
visited = set()
for node in ordered_nodes:
visited.add(node)
acyclic_graph.add_node(node)
for successor in graph.successors(node):
if successor not in visited:
acyclic_graph.add_edge(node, successor)
return acyclic_graph
[docs]def dfs_back_edges(graph, start_node):
"""
Perform an iterative DFS traversal of the graph, returning back edges.
:param graph: The graph to traverse.
:param start_node: The node where to start the traversal.
:returns: An iterator of 'backward' edges.
"""
if start_node not in graph:
return # Ensures that the start node is in the graph
visited = set() # Tracks visited nodes
finished = set() # Tracks nodes whose descendants are fully explored
stack = [(start_node, iter(graph[start_node]))]
while stack:
node, children = stack[-1]
visited.add(node)
try:
child = next(children)
if child in visited:
if child not in finished:
yield node, child # Found a back edge
elif child not in finished: # Check if the child has not been finished
stack.append((child, iter(graph[child])))
except StopIteration:
stack.pop() # Done with this node's children
finished.add(node) # Mark this node as finished
[docs]def subgraph_between_nodes(graph, source, frontier, include_frontier=False):
"""
For a directed graph, return a subgraph that includes all nodes going from a source node to a target node.
:param networkx.DiGraph graph: The directed graph.
:param source: The source node.
:param list frontier: A collection of target nodes.
:param bool include_frontier: Should nodes in frontier be included in the subgraph.
:return: A subgraph.
:rtype: networkx.DiGraph
"""
graph = networkx.DiGraph(graph) # make a copy
for pred in list(graph.predecessors(source)):
# make sure we cannot go from any other node to the source node
graph.remove_edge(pred, source)
g0 = networkx.DiGraph()
if source not in graph or any(node not in graph for node in frontier):
raise KeyError("Source node or frontier nodes are not in the source graph.")
# BFS on graph and add new nodes to g0
queue = [source]
traversed = set()
frontier = set(frontier)
while queue:
node = queue.pop(0)
traversed.add(node)
for _, succ, data in graph.out_edges(node, data=True):
if g0.has_edge(node, succ):
continue
g0.add_edge(node, succ, **data)
if succ in traversed or succ in frontier:
continue
for frontier_node in frontier:
if networkx.has_path(graph, succ, frontier_node):
queue.append(succ)
break
# recursively remove all nodes that have less than two neighbors
to_remove = [
n
for n in g0.nodes()
if n not in frontier and n is not source and (g0.out_degree[n] == 0 or g0.in_degree[n] == 0)
]
while to_remove:
g0.remove_nodes_from(to_remove)
to_remove = [
n
for n in g0.nodes()
if n not in frontier and n is not source and (g0.out_degree[n] == 0 or g0.in_degree[n] == 0)
]
if not include_frontier:
# remove the frontier nodes
g0.remove_nodes_from(frontier)
return g0
[docs]def dominates(idom, dominator_node, node):
n = node
while n:
if n == dominator_node:
return True
if n in idom and n != idom[n]:
n = idom[n]
else:
n = None
return False
#
# Dominance frontier
#
[docs]def compute_dominance_frontier(graph, domtree):
"""
Compute a dominance frontier based on the given post-dominator tree.
This implementation is based on figure 2 of paper An Efficient Method of Computing Static Single Assignment
Form by Ron Cytron, etc.
:param graph: The graph where we want to compute the dominance frontier.
:param domtree: The dominator tree
:returns: A dict of dominance frontier
"""
df = {}
# Perform a post-order search on the dominator tree
for x in networkx.dfs_postorder_nodes(domtree):
if x not in graph:
# Skip nodes that are not in the graph
continue
df[x] = set()
# local set
for y in graph.successors(x):
if x not in domtree.predecessors(y):
df[x].add(y)
# up set
if x is None:
continue
for z in domtree.successors(x):
if z is x:
continue
if z not in df:
continue
for y in df[z]:
if x not in list(domtree.predecessors(y)):
df[x].add(y)
return df
#
# Dominators and post-dominators
#
[docs]class TemporaryNode:
"""
A temporary node.
Used as the start node and end node in post-dominator tree generation. Also used in some test cases.
"""
__slots__ = ["_label"]
[docs] def __init__(self, label):
self._label = label
def __repr__(self):
return "TN[%s]" % self._label
def __eq__(self, other):
if isinstance(other, TemporaryNode) and other._label == self._label:
return True
return False
def __hash__(self):
return hash(("TemporaryNode", self._label))
[docs]class ContainerNode:
"""
A container node.
Only used in dominator tree generation. We did this so we can set the index property without modifying the
original object.
"""
__slots__ = ["_obj", "index"]
[docs] def __init__(self, obj):
self._obj = obj
self.index = None
@property
def obj(self):
return self._obj
def __eq__(self, other):
if isinstance(other, ContainerNode):
return self._obj is other._obj
return False
def __hash__(self):
return hash(("CN", self._obj))
def __repr__(self):
return "CN[%s]" % repr(self._obj)
[docs]class Dominators:
"""
Describes dominators in a graph.
"""
dom: networkx.DiGraph
[docs] def __init__(self, graph, entry_node, successors_func=None, reverse=False):
self._l = logging.getLogger("utils.graph.dominators")
self._graph_successors_func = successors_func
self._reverse = reverse # Set it to True to generate a post-dominator tree.
# Temporary variables
self._ancestor = None
self._semi = None
self._label = None
# Output
self.dom = None # type: ignore # this is guaranteed to be not null after the __init__ returns
self.prepared_graph = None
self._construct(graph, entry_node)
def _graph_successors(self, graph, node):
"""
Return the successors of a node in the graph.
This method can be overriden in case there are special requirements with the graph and the successors. For
example, when we are dealing with a control flow graph, we may not want to get the FakeRet successors.
:param graph: The graph.
:param node: The node of which we want to get the successors.
:return: An iterator of successors.
:rtype: iter
"""
if self._graph_successors_func is not None:
return self._graph_successors_func(graph, node)
return graph.successors(node)
def _construct(self, graph, entry_node):
"""
Find post-dominators for each node in the graph.
This implementation is based on paper A Fast Algorithm for Finding Dominators in a Flow Graph by Thomas
Lengauer and Robert E. Tarjan from Stanford University, ACM Transactions on Programming Languages and Systems,
Vol. 1, No. 1, July 1979
"""
# Step 1
_prepared_graph, vertices, parent = self._prepare_graph(graph, entry_node)
# vertices is a list of ContainerNode instances
# parent is a dict storing the mapping from ContainerNode to ContainerNode
# Each node in prepared_graph is a ContainerNode instance
bucket = defaultdict(set)
dom = [None] * (len(vertices))
self._ancestor = [None] * (len(vertices) + 1)
for i in range(len(vertices) - 1, 0, -1):
w = vertices[i]
# Step 2
if w not in parent:
# It's one of the start nodes
continue
predecessors = _prepared_graph.predecessors(w)
for v in predecessors:
u = self._pd_eval(v)
if self._semi[u.index].index < self._semi[w.index].index:
self._semi[w.index] = self._semi[u.index]
bucket[vertices[self._semi[w.index].index].index].add(w)
self._pd_link(parent[w], w)
# Step 3
for v in bucket[parent[w].index]:
u = self._pd_eval(v)
if self._semi[u.index].index < self._semi[v.index].index:
dom[v.index] = u
else:
dom[v.index] = parent[w]
bucket[parent[w].index].clear()
for i in range(1, len(vertices)):
w = vertices[i]
if w not in parent:
continue
if dom[w.index].index != vertices[self._semi[w.index].index].index:
dom[w.index] = dom[dom[w.index].index]
self.dom = networkx.DiGraph() # The post-dom tree described in a directional graph
for i in range(1, len(vertices)):
if dom[i] is not None and vertices[i] is not None:
self.dom.add_edge(dom[i].obj, vertices[i].obj)
# Output
self.prepared_graph = _prepared_graph
def _prepare_graph(self, graph, entry):
# We want to reverse the graph, and label each node according to its order in a DFS
new_graph = networkx.DiGraph()
n = entry
queue = [n]
start_node = TemporaryNode("start_node")
# Put the start_node into a Container as well
start_node = ContainerNode(start_node)
# Create the end_node, too
end_node = ContainerNode(TemporaryNode("end_node"))
container_nodes = {}
traversed_nodes = set()
while queue:
node = queue.pop()
successors = list(self._graph_successors(graph, node))
# Put it into a container
if node in container_nodes:
container_node = container_nodes[node]
else:
container_node = ContainerNode(node)
container_nodes[node] = container_node
traversed_nodes.add(container_node)
if len(successors) == 0:
# Note that this condition may never be satisfied if there is no real "end node" in the graph: the graph
# may end with a loop.
if self._reverse:
# Add an edge between the start node and this node
new_graph.add_edge(start_node, container_node)
else:
# Add an edge between our this node and end node
new_graph.add_edge(container_node, end_node)
for s in successors:
if s in container_nodes:
container_s = container_nodes[s]
else:
container_s = ContainerNode(s)
container_nodes[s] = container_s
if self._reverse:
new_graph.add_edge(container_s, container_node) # Reversed
else:
new_graph.add_edge(container_node, container_s) # Reversed
if container_s not in traversed_nodes:
queue.append(s)
if self._reverse:
# Add the end node
new_graph.add_edge(container_nodes[n], end_node)
else:
# Add the start node
new_graph.add_edge(start_node, container_nodes[n])
all_nodes_count = new_graph.number_of_nodes()
self._l.debug("There should be %d nodes in all", all_nodes_count)
counter = 0
vertices = [ContainerNode("placeholder")]
scanned_nodes = set()
parent = {}
while True:
# DFS from the current start node
stack = [start_node]
while len(stack) > 0:
node = stack.pop()
if node in scanned_nodes:
continue
counter += 1
# Mark it as scanned
scanned_nodes.add(node)
# Put the container node into vertices list
vertices.append(node)
# Put each successors into the stack
successors = new_graph.successors(node)
# Set the index property of it
node.index = counter
for s in successors:
if s not in scanned_nodes:
stack.append(s)
parent[s] = node
if counter >= all_nodes_count:
break
self._l.debug(
"%d nodes are left out during the DFS. They must formed a cycle themselves.", all_nodes_count - counter
)
# Find those nodes
leftovers = [s for s in traversed_nodes if s not in scanned_nodes]
new_graph.add_edge(start_node, leftovers[0])
# We have to start over...
counter = 0
parent = {}
scanned_nodes = set()
vertices = [ContainerNode("placeholder")]
self._semi = vertices[::]
self._label = vertices[::]
return new_graph, vertices, parent
def _pd_link(self, v, w):
self._ancestor[w.index] = v
def _pd_eval(self, v):
if self._ancestor[v.index] is None:
return v
else:
self._pd_compress(v)
return self._label[v.index]
def _pd_compress(self, v):
if self._ancestor[self._ancestor[v.index].index] is not None:
self._pd_compress(self._ancestor[v.index])
if (
self._semi[self._label[self._ancestor[v.index].index].index].index
< self._semi[self._label[v.index].index].index
):
self._label[v.index] = self._label[self._ancestor[v.index].index]
self._ancestor[v.index] = self._ancestor[self._ancestor[v.index].index]
[docs]class PostDominators(Dominators):
"""
Describe post-dominators in a graph.
"""
[docs] def __init__(self, graph, entry_node, successors_func=None):
super().__init__(graph, entry_node, successors_func=successors_func, reverse=True)
@property
def post_dom(self) -> networkx.DiGraph:
return self.dom
[docs]class SCCPlaceholder:
"""
Describes a placeholder for strongly-connected-components in a graph.
"""
__slots__ = ("scc_id",)
[docs] def __init__(self, scc_id):
self.scc_id = scc_id
def __eq__(self, other):
return isinstance(other, SCCPlaceholder) and other.scc_id == self.scc_id
def __hash__(self):
return hash("scc_placeholder_%d" % self.scc_id)
[docs]class GraphUtils:
"""
A helper class with some static methods and algorithms implemented, that in fact, might take more than just normal
CFGs.
"""
[docs] @staticmethod
def find_merge_points(function_addr, function_endpoints, graph): # pylint:disable=unused-argument
"""
Given a local transition graph of a function, find all merge points inside, and then perform a
quasi-topological sort of those merge points.
A merge point might be one of the following cases:
- two or more paths come together, and ends at the same address.
- end of the current function
:param int function_addr: Address of the function.
:param list function_endpoints: Endpoints of the function. They typically come from Function.endpoints.
:param networkx.DiGraph graph: A local transition graph of a function. Normally it comes from Function.graph.
:return: A list of ordered addresses of merge points.
:rtype: list
"""
merge_points = set()
for node in graph.nodes():
if graph.in_degree(node) > 1:
merge_points.add(node)
ordered_merge_points = GraphUtils.quasi_topological_sort_nodes(graph, merge_points)
addrs = [n.addr for n in ordered_merge_points]
return addrs
[docs] @staticmethod
def find_widening_points(function_addr, function_endpoints, graph): # pylint: disable=unused-argument
"""
Given a local transition graph of a function, find all widening points inside.
Correctly choosing widening points is very important in order to not lose too much information during static
analysis. We mainly consider merge points that has at least one loop back edges coming in as widening points.
:param int function_addr: Address of the function.
:param list function_endpoints: Endpoints of the function, typically coming from Function.endpoints.
:param networkx.DiGraph graph: A local transition graph of a function, normally Function.graph.
:return: A list of addresses of widening points.
:rtype: list
"""
sccs = networkx.strongly_connected_components(graph)
widening_addrs = set()
for scc in sccs:
if len(scc) == 1:
node = next(iter(scc))
if graph.has_edge(node, node):
# self loop
widening_addrs.add(node.addr)
else:
for n in scc:
predecessors = graph.predecessors(n)
if any(p not in scc for p in predecessors):
widening_addrs.add(n.addr)
break
return list(widening_addrs)
[docs] @staticmethod
def reverse_post_order_sort_nodes(graph, nodes=None):
"""
Sort a given set of nodes in reverse post ordering.
:param networkx.DiGraph graph: A local transition graph of a function.
:param iterable nodes: A collection of nodes to sort.
:return: A list of sorted nodes.
:rtype: list
"""
post_order = networkx.dfs_postorder_nodes(graph)
if nodes is None:
return reversed(list(post_order))
addrs_to_index = {n.addr: i for (i, n) in enumerate(post_order)}
return sorted(nodes, key=lambda n: addrs_to_index[n.addr], reverse=True)
[docs] @staticmethod
def quasi_topological_sort_nodes(
graph: networkx.DiGraph, nodes: Optional[List] = None, loop_heads: Optional[List] = None
) -> List:
"""
Sort a given set of nodes from a graph based on the following rules:
# - if A -> B and not B -> A, then we have A < B
# - if A -> B and B -> A, then the ordering is undefined
Following the above rules gives us a quasi-topological sorting of nodes in the graph. It also works for cyclic
graphs.
:param graph: A local transition graph of the function.
:param nodes: A list of nodes to sort. None if you want to sort all nodes inside the graph.
:param loop_heads: A list of nodes that should be treated loop heads.
:return: A list of ordered nodes.
"""
# fast path for single node graphs
if graph.number_of_nodes() == 1:
if nodes is None:
return list(graph.nodes)
return [n for n in graph.nodes() if n in nodes]
# make a copy to the graph since we are gonna modify it
graph_copy = networkx.DiGraph()
# find all strongly connected components in the graph
sccs = [scc for scc in networkx.strongly_connected_components(graph) if len(scc) > 1]
def _sort_edge(edge):
"""
A sorter to make a deterministic order of edges.
"""
_src, _dst = edge
src_addr, dst_addr = 0, 0
if hasattr(_src, "addr"):
src_addr = _src.addr
elif isinstance(_src, int):
src_addr = _src
if hasattr(_dst, "addr"):
dst_addr = _dst.addr
elif isinstance(_dst, int):
dst_addr = _dst
return src_addr + dst_addr
# collapse all strongly connected components
edges = sorted(list(graph.edges()), key=_sort_edge)
for src, dst in edges:
scc_index = GraphUtils._components_index_node(sccs, src)
if scc_index is not None:
src = SCCPlaceholder(scc_index)
scc_index = GraphUtils._components_index_node(sccs, dst)
if scc_index is not None:
dst = SCCPlaceholder(scc_index)
if isinstance(src, SCCPlaceholder) and isinstance(dst, SCCPlaceholder) and src == dst:
if src not in graph_copy:
graph_copy.add_node(src)
continue
if src == dst:
if src not in graph_copy:
graph_copy.add_node(src)
continue
graph_copy.add_edge(src, dst)
# add loners
out_degree_zero_nodes = [node for (node, degree) in graph.out_degree() if degree == 0]
for node in out_degree_zero_nodes:
if graph.in_degree(node) == 0:
graph_copy.add_node(node)
# topological sort on acyclic graph `graph_copy`
tmp_nodes = networkx.topological_sort(graph_copy)
ordered_nodes = []
for n in tmp_nodes:
if isinstance(n, SCCPlaceholder):
GraphUtils._append_scc(graph, ordered_nodes, sccs[n.scc_id], loop_head_candidates=loop_heads)
else:
ordered_nodes.append(n)
if nodes is None:
return ordered_nodes
nodes = set(nodes)
ordered_nodes = [n for n in ordered_nodes if n in nodes]
return ordered_nodes
@staticmethod
def _components_index_node(components, node):
for i, comp in enumerate(components):
if node in comp:
return i
return None
@staticmethod
def _append_scc(
graph: networkx.DiGraph, ordered_nodes: List, scc: Set, loop_head_candidates: Optional[List] = None
) -> None:
"""
Append all nodes from a strongly connected component to a list of ordered nodes and ensure the topological
order.
:param graph: The graph where all nodes belong to.
:param ordered_nodes: Ordered nodes.
:param scc: A set of nodes that forms a strongly connected component in the graph.
"""
loop_head = None
if loop_head_candidates is not None:
# find the first node that appears in loop_heads
loop_head_candidates = set(loop_head_candidates)
for n in scc:
if n in loop_head_candidates:
loop_head = n
break
if loop_head is None:
for parent_node in reversed(ordered_nodes):
# find all successors to this node
succs = set(graph.successors(parent_node))
scc_succs = scc.intersection(succs)
if len(scc_succs) == 1:
loop_head = next(iter(scc_succs))
break
if len(scc_succs) > 1:
# calculate the distance between each pair of nodes within scc_succs, pick the one with the
# shortest total distance
scc_node_distance = defaultdict(int)
for scc_succ in scc_succs:
for other_node in scc_succs:
if other_node is scc_succ:
continue
scc_node_distance[scc_succ] += networkx.algorithms.shortest_path_length(
graph, scc_succ, other_node
)
distance_to_node = {v: k for k, v in scc_node_distance.items()}
lowest_distance = min(distance_to_node)
loop_head = distance_to_node[lowest_distance]
break
if loop_head is None:
# randomly pick one
loop_head = next(iter(scc))
subgraph: networkx.DiGraph = graph.subgraph(scc).copy()
for src, _ in list(subgraph.in_edges(loop_head)):
subgraph.remove_edge(src, loop_head)
# panic mode: if the strongly connected component has too many edges (imagine an almost complete graph), it
# will take too long to converge if we only remove one node out of the component each time. we introduce a
# panic mode that will aggressively remove edges
if len(subgraph) > 3000 and len(subgraph.edges) > len(subgraph) * 1.4:
for n in scc:
if subgraph.in_degree[n] >= 1 and subgraph.out_degree[n] >= 1:
for src in list(subgraph.predecessors(n)):
if src is not n:
subgraph.remove_edge(src, n)
if len(subgraph.edges) <= len(subgraph) * 1.4:
break
ordered_nodes.extend(GraphUtils.quasi_topological_sort_nodes(subgraph))