Source code for angr.analyses.bindiff

import logging
import math
import types
from collections import deque, defaultdict

import networkx

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from angr.knowledge_plugins import Function

from . import Analysis, CFGEmulated

from ..errors import SimEngineError, SimMemoryError

# todo include an explanation of the algorithm
# todo include a method that detects any change other than constants
# todo use function names / string references where available

l = logging.getLogger(name=__name__)

# basic block changes
DIFF_TYPE = "type"
DIFF_VALUE = "value"


# exception for trying find basic block changes
[docs]class UnmatchedStatementsException(Exception): pass
# statement difference classes
[docs]class Difference:
[docs] def __init__(self, diff_type, value_a, value_b): self.type = diff_type self.value_a = value_a self.value_b = value_b
[docs]class ConstantChange:
[docs] def __init__(self, offset, value_a, value_b): self.offset = offset self.value_a = value_a self.value_b = value_b
# helper methods def _euclidean_dist(vector_a, vector_b): """ :param vector_a: A list of numbers. :param vector_b: A list of numbers. :returns: The euclidean distance between the two vectors. """ dist = 0 for x, y in zip(vector_a, vector_b): dist += (x - y) * (x - y) return math.sqrt(dist) def _get_closest_matches(input_attributes, target_attributes): """ :param input_attributes: First dictionary of objects to attribute tuples. :param target_attributes: Second dictionary of blocks to attribute tuples. :returns: A dictionary of objects in the input_attributes to the closest objects in the target_attributes. """ closest_matches = {} # for each object in the first set find the objects with the closest target attributes for a in input_attributes: best_dist = float("inf") best_matches = [] for b in target_attributes: dist = _euclidean_dist(input_attributes[a], target_attributes[b]) if dist < best_dist: best_matches = [b] best_dist = dist elif dist == best_dist: best_matches.append(b) closest_matches[a] = best_matches return closest_matches # from https://rosettacode.org/wiki/Levenshtein_distance def _levenshtein_distance(s1, s2): """ :param s1: A list or string :param s2: Another list or string :returns: The levenshtein distance between the two """ if len(s1) > len(s2): s1, s2 = s2, s1 distances = range(len(s1) + 1) for index2, num2 in enumerate(s2): new_distances = [index2 + 1] for index1, num1 in enumerate(s1): if num1 == num2: new_distances.append(distances[index1]) else: new_distances.append(1 + min((distances[index1], distances[index1 + 1], new_distances[-1]))) distances = new_distances return distances[-1] def _normalized_levenshtein_distance(s1, s2, acceptable_differences): """ This function calculates the levenshtein distance but allows for elements in the lists to be different by any number in the set acceptable_differences. :param s1: A list. :param s2: Another list. :param acceptable_differences: A set of numbers. If (s2[i]-s1[i]) is in the set then they are considered equal. :returns: """ if len(s1) > len(s2): s1, s2 = s2, s1 acceptable_differences = {-i for i in acceptable_differences} distances = range(len(s1) + 1) for index2, num2 in enumerate(s2): new_distances = [index2 + 1] for index1, num1 in enumerate(s1): if num2 - num1 in acceptable_differences: new_distances.append(distances[index1]) else: new_distances.append(1 + min((distances[index1], distances[index1 + 1], new_distances[-1]))) distances = new_distances return distances[-1] def _is_better_match(x, y, matched_a, matched_b, attributes_dict_a, attributes_dict_b): """ :param x: The first element of a possible match. :param y: The second element of a possible match. :param matched_a: The current matches for the first set. :param matched_b: The current matches for the second set. :param attributes_dict_a: The attributes for each element in the first set. :param attributes_dict_b: The attributes for each element in the second set. :returns: True/False """ attributes_x = attributes_dict_a[x] attributes_y = attributes_dict_b[y] if x in matched_a: attributes_match = attributes_dict_b[matched_a[x]] if _euclidean_dist(attributes_x, attributes_y) >= _euclidean_dist(attributes_x, attributes_match): return False if y in matched_b: attributes_match = attributes_dict_a[matched_b[y]] if _euclidean_dist(attributes_x, attributes_y) >= _euclidean_dist(attributes_y, attributes_match): return False return True
[docs]def differing_constants(block_a, block_b): """ Compares two basic blocks and finds all the constants that differ from the first block to the second. :param block_a: The first block to compare. :param block_b: The second block to compare. :returns: Returns a list of differing constants in the form of ConstantChange, which has the offset in the block and the respective constants. """ statements_a = [s for s in block_a.vex.statements if s.tag != "Ist_IMark"] + [block_a.vex.next] statements_b = [s for s in block_b.vex.statements if s.tag != "Ist_IMark"] + [block_b.vex.next] if len(statements_a) != len(statements_b): raise UnmatchedStatementsException("Blocks have different numbers of statements") start_1 = min(block_a.instruction_addrs) start_2 = min(block_b.instruction_addrs) changes = [] # check statements current_offset = None for statement, statement_2 in zip(statements_a, statements_b): # sanity check if statement.tag != statement_2.tag: raise UnmatchedStatementsException("Statement tag has changed") if statement.tag == "Ist_IMark": if statement.addr - start_1 != statement_2.addr - start_2: raise UnmatchedStatementsException("Instruction length has changed") current_offset = statement.addr - start_1 continue differences = compare_statement_dict(statement, statement_2) for d in differences: if d.type != DIFF_VALUE: raise UnmatchedStatementsException("Instruction has changed") else: changes.append(ConstantChange(current_offset, d.value_a, d.value_b)) return changes
[docs]def compare_statement_dict(statement_1, statement_2): # should return whether or not the statement's type/effects changed # need to return the specific number that changed too if type(statement_1) != type(statement_2): return [Difference(DIFF_TYPE, None, None)] # None if statement_1 is None and statement_2 is None: return [] # constants if isinstance(statement_1, (int, float, str, bytes)): if isinstance(statement_1, float) and math.isnan(statement_1) and math.isnan(statement_2): return [] elif statement_1 == statement_2: return [] else: return [Difference(None, statement_1, statement_2)] # tuples/lists if isinstance(statement_1, (tuple, list)): if len(statement_1) != len(statement_2): return Difference(DIFF_TYPE, None, None) differences = [] for s1, s2 in zip(statement_1, statement_2): differences += compare_statement_dict(s1, s2) return differences # Yan's weird types differences = [] for attr in statement_1.__slots__: # don't check arch, property, or methods if attr == "arch": continue if hasattr(statement_1.__class__, attr) and isinstance(getattr(statement_1.__class__, attr), property): continue if isinstance(getattr(statement_1, attr), types.MethodType): continue new_diffs = compare_statement_dict(getattr(statement_1, attr), getattr(statement_2, attr)) # set the difference types for diff in new_diffs: if diff.type is None: diff.type = attr differences += new_diffs return differences
[docs]class NormalizedBlock: # block may span multiple calls
[docs] def __init__(self, block, function): addresses = [block.addr] if block.addr in function.merged_blocks: for a in function.merged_blocks[block.addr]: addresses.append(a.addr) self.addr = block.addr self.addresses = addresses self.statements = [] self.all_constants = [] self.operations = [] self.call_targets = [] self.blocks = [] self.instruction_addrs = [] if block.addr in function.call_sites: self.call_targets = function.call_sites[block.addr] self.jumpkind = None for a in addresses: block = function.project.factory.block(a) self.instruction_addrs += block.instruction_addrs irsb = block.vex self.blocks.append(block) self.statements += irsb.statements self.all_constants += irsb.all_constants self.operations += irsb.operations self.jumpkind = irsb.jumpkind self.size = sum([b.size for b in self.blocks])
def __repr__(self): size = sum([b.size for b in self.blocks]) return "<Normalized Block for %#x, %d bytes>" % (self.addr, size)
[docs]class NormalizedFunction: # a more normalized function
[docs] def __init__(self, function: "Function"): # start by copying the graph self.graph: networkx.DiGraph = function.graph.copy() self.project = function._function_manager._kb._project self.call_sites = {} self.startpoint = function.startpoint self.merged_blocks = {} self.orig_function = function # find nodes which end in call and combine them done = False while not done: done = True for node in self.graph.nodes(): try: bl = self.project.factory.block(node.addr) except (SimMemoryError, SimEngineError): continue # merge if it ends with a single call, and the successor has only one predecessor and succ is after successors = list(self.graph.successors(node)) if ( bl.vex.jumpkind == "Ijk_Call" and len(successors) == 1 and len(list(self.graph.predecessors(successors[0]))) == 1 and successors[0].addr > node.addr ): # add edges to the successors of its successor, and delete the original successors succ = list(self.graph.successors(node))[0] for s in self.graph.successors(succ): self.graph.add_edge(node, s) self.graph.remove_node(succ) done = False # add to merged blocks if node not in self.merged_blocks: self.merged_blocks[node] = [] self.merged_blocks[node].append(succ) if succ in self.merged_blocks: self.merged_blocks[node] += self.merged_blocks[succ] del self.merged_blocks[succ] # stop iterating and start over break # set up call sites for n in self.graph.nodes(): call_targets = [] if n.addr in self.orig_function.get_call_sites(): call_targets.append(self.orig_function.get_call_target(n.addr)) if n.addr in self.merged_blocks: for block in self.merged_blocks[n]: if block.addr in self.orig_function.get_call_sites(): call_targets.append(self.orig_function.get_call_target(block.addr)) if len(call_targets) > 0: self.call_sites[n] = call_targets
[docs]class FunctionDiff: """ This class computes the a diff between two functions. """
[docs] def __init__(self, function_a: "Function", function_b: "Function", bindiff=None): """ :param function_a: The first angr Function object to diff. :param function_b: The second angr Function object. :param bindiff: An optional Bindiff object. Used for some extra normalization during basic block comparison. """ self._function_a = NormalizedFunction(function_a) self._function_b = NormalizedFunction(function_b) self._project_a = self._function_a.project self._project_b = self._function_b.project self._bindiff = bindiff self._attributes_a = {} self._attributes_b = {} self._block_matches = set() self._unmatched_blocks_from_a = set() self._unmatched_blocks_from_b = set() self._compute_diff()
@property def probably_identical(self): """ :returns: Whether or not these two functions are identical. """ if len(self._unmatched_blocks_from_a | self._unmatched_blocks_from_b) > 0: return False for a, b in self._block_matches: if not self.blocks_probably_identical(a, b): return False return True @property def identical_blocks(self): """ :returns: A list of block matches which appear to be identical """ identical_blocks = [] for block_a, block_b in self._block_matches: if self.blocks_probably_identical(block_a, block_b): identical_blocks.append((block_a, block_b)) return identical_blocks @property def differing_blocks(self): """ :returns: A list of block matches which appear to differ """ differing_blocks = [] for block_a, block_b in self._block_matches: if not self.blocks_probably_identical(block_a, block_b): differing_blocks.append((block_a, block_b)) return differing_blocks @property def blocks_with_differing_constants(self): """ :return: A list of block matches which appear to differ """ differing_blocks = [] diffs = {} for block_a, block_b in self._block_matches: if self.blocks_probably_identical(block_a, block_b) and not self.blocks_probably_identical( block_a, block_b, check_constants=True ): differing_blocks.append((block_a, block_b)) for block_a, block_b in differing_blocks: ba = NormalizedBlock(block_a, self._function_a) bb = NormalizedBlock(block_b, self._function_b) diffs[(block_a, block_b)] = FunctionDiff._block_diff_constants(ba, bb) return diffs @property def block_matches(self): return self._block_matches @property def unmatched_blocks(self): return self._unmatched_blocks_from_a, self._unmatched_blocks_from_b
[docs] @staticmethod def get_normalized_block(addr, function): """ :param addr: Where to start the normalized block. :param function: A function containing the block address. :returns: A normalized basic block. """ return NormalizedBlock(addr, function)
[docs] def block_similarity(self, block_a, block_b): """ :param block_a: The first block address. :param block_b: The second block address. :returns: The similarity of the basic blocks, normalized for the base address of the block and function call addresses. """ # handle sim procedure blocks if self._project_a.is_hooked(block_a) and self._project_b.is_hooked(block_b): if self._project_a._sim_procedures[block_a] == self._project_b._sim_procedures[block_b]: return 1.0 else: return 0.0 try: block_a = NormalizedBlock(block_a, self._function_a) except (SimMemoryError, SimEngineError): block_a = None try: block_b = NormalizedBlock(block_b, self._function_b) except (SimMemoryError, SimEngineError): block_b = None # if both were None then they are assumed to be the same, if only one was the same they are assumed to differ if block_a is None and block_b is None: return 1.0 elif block_a is None or block_b is None: return 0.0 # get all elements for computing similarity tags_a = [s.tag for s in block_a.statements] tags_b = [s.tag for s in block_b.statements] consts_a = [c.value for c in block_a.all_constants] consts_b = [c.value for c in block_b.all_constants] all_registers_a = [s.offset for s in block_a.statements if hasattr(s, "offset")] all_registers_b = [s.offset for s in block_b.statements if hasattr(s, "offset")] jumpkind_a = block_a.jumpkind jumpkind_b = block_b.jumpkind # compute total distance total_dist = 0 total_dist += _levenshtein_distance(tags_a, tags_b) total_dist += _levenshtein_distance(block_a.operations, block_b.operations) total_dist += _levenshtein_distance(all_registers_a, all_registers_b) acceptable_differences = self._get_acceptable_constant_differences(block_a, block_b) total_dist += _normalized_levenshtein_distance(consts_a, consts_b, acceptable_differences) total_dist += 0 if jumpkind_a == jumpkind_b else 1 # compute similarity num_values = max(len(tags_a), len(tags_b)) num_values += max(len(consts_a), len(consts_b)) num_values += max(len(block_a.operations), len(block_b.operations)) num_values += 1 # jumpkind similarity = 1 - (float(total_dist) / num_values) return similarity
[docs] def blocks_probably_identical(self, block_a, block_b, check_constants=False): """ :param block_a: The first block address. :param block_b: The second block address. :param check_constants: Whether or not to require matching constants in blocks. :returns: Whether or not the blocks appear to be identical. """ # handle sim procedure blocks if self._project_a.is_hooked(block_a) and self._project_b.is_hooked(block_b): return self._project_a._sim_procedures[block_a] == self._project_b._sim_procedures[block_b] try: block_a = NormalizedBlock(block_a, self._function_a) except (SimMemoryError, SimEngineError): block_a = None try: block_b = NormalizedBlock(block_b, self._function_b) except (SimMemoryError, SimEngineError): block_b = None # if both were None then they are assumed to be the same, if only one was None they are assumed to differ if block_a is None and block_b is None: return True elif block_a is None or block_b is None: return False # if they represent a different number of blocks they are not the same if len(block_a.blocks) != len(block_b.blocks): return False # check differing constants try: diff_constants = FunctionDiff._block_diff_constants(block_a, block_b) except UnmatchedStatementsException: return False if not check_constants: return True # get values of differences that probably indicate no change acceptable_differences = self._get_acceptable_constant_differences(block_a, block_b) # todo match globals for c in diff_constants: if (c.value_a, c.value_b) in self._block_matches: # constants point to matched basic blocks continue if self._bindiff is not None and (c.value_a and c.value_b) in self._bindiff.function_matches: # constants point to matched functions continue # if both are in the binary we'll assume it's okay, although we should really match globals # TODO use global matches if self._project_a.loader.main_object.contains_addr( c.value_a ) and self._project_b.loader.main_object.contains_addr(c.value_b): continue # if the difference is equal to the difference in block addr's or successor addr's we'll say it's also okay if c.value_b - c.value_a in acceptable_differences: continue # otherwise they probably are different return False # the blocks appear to be identical return True
@staticmethod def _block_diff_constants(block_a, block_b): diff_constants = [] for irsb_a, irsb_b in zip(block_a.blocks, block_b.blocks): diff_constants += differing_constants(irsb_a, irsb_b) return diff_constants @staticmethod def _compute_block_attributes(function: NormalizedFunction): """ :param function: A normalized function object. :returns: A dictionary of basic block addresses to tuples of attributes. """ # The attributes we use are the distance form function start, distance from function exit and whether # or not it has a subfunction call distances_from_start = FunctionDiff._distances_from_function_start(function) distances_from_exit = FunctionDiff._distances_from_function_exit(function) call_sites = function.call_sites attributes = {} for block in function.graph.nodes(): if block in call_sites: number_of_subfunction_calls = len(call_sites[block]) else: number_of_subfunction_calls = 0 # there really shouldn't be blocks that can't be reached from the start, but there are for now dist_start = distances_from_start[block] if block in distances_from_start else 10000 dist_exit = distances_from_exit[block] if block in distances_from_exit else 10000 attributes[block] = (dist_start, dist_exit, number_of_subfunction_calls) return attributes @staticmethod def _distances_from_function_start(function: NormalizedFunction): """ :param function: A normalized Function object. :returns: A dictionary of basic block addresses and their distance to the start of the function. """ return networkx.single_source_shortest_path_length(function.graph, function.startpoint) @staticmethod def _distances_from_function_exit(function: NormalizedFunction): """ :param function: A normalized Function object. :returns: A dictionary of basic block addresses and their distance to the exit of the function. """ reverse_graph: networkx.DiGraph = function.graph.reverse() # we aren't guaranteed to have an exit from the function so explicitly add the node reverse_graph.add_node("start") found_exits = False for n in function.graph.nodes(): if len(list(function.graph.successors(n))) == 0: reverse_graph.add_edge("start", n) found_exits = True # if there were no exits (a function with a while 1) let's consider the block with the highest address to # be the exit. This isn't the most scientific way, but since this case is pretty rare it should be okay if not found_exits: last = max(function.graph.nodes(), key=lambda x: x.addr) reverse_graph.add_edge("start", last) dists = networkx.single_source_shortest_path_length(reverse_graph, "start") # remove temp node del dists["start"] # correct for the added node for n in dists: dists[n] -= 1 return dists def _compute_diff(self): """ Computes the diff of the functions and saves the result. """ # get the attributes for all blocks l.debug( "Computing diff of functions: %s, %s", ("%#x" % self._function_a.startpoint.addr) if self._function_a.startpoint is not None else "None", ("%#x" % self._function_b.startpoint.addr) if self._function_b.startpoint is not None else "None", ) self.attributes_a = self._compute_block_attributes(self._function_a) self.attributes_b = self._compute_block_attributes(self._function_b) # get the initial matches initial_matches = self._get_block_matches( self.attributes_a, self.attributes_b, tiebreak_with_block_similarity=False ) # Use a queue so we process matches in the order that they are found to_process = deque(initial_matches) # Keep track of which matches we've already added to the queue processed_matches = {(x, y) for (x, y) in initial_matches} # Keep a dict of current matches, which will be updated if better matches are found matched_a = {} matched_b = {} for x, y in processed_matches: matched_a[x] = y matched_b[y] = x # while queue is not empty while to_process: (block_a, block_b) = to_process.pop() l.debug("FunctionDiff: Processing (%#x, %#x)", block_a.addr, block_b.addr) # we could find new matches in the successors or predecessors of functions block_a_succ = list(self._function_a.graph.successors(block_a)) block_b_succ = list(self._function_b.graph.successors(block_b)) block_a_pred = list(self._function_a.graph.predecessors(block_a)) block_b_pred = list(self._function_b.graph.predecessors(block_b)) # propagate the difference in blocks as delta delta = tuple((i - j) for i, j in zip(self.attributes_b[block_b], self.attributes_a[block_a])) # get possible new matches new_matches = [] # if the blocks are identical then the successors should most likely be matched in the same order if self.blocks_probably_identical(block_a, block_b) and len(block_a_succ) == len(block_b_succ): ordered_succ_a = self._get_ordered_successors(self._project_a, block_a, block_a_succ) ordered_succ_b = self._get_ordered_successors(self._project_b, block_b, block_b_succ) new_matches.extend(zip(ordered_succ_a, ordered_succ_b)) new_matches += self._get_block_matches( self.attributes_a, self.attributes_b, block_a_succ, block_b_succ, delta, tiebreak_with_block_similarity=True, ) new_matches += self._get_block_matches( self.attributes_a, self.attributes_b, block_a_pred, block_b_pred, delta, tiebreak_with_block_similarity=True, ) # for each of the possible new matches add it if it improves the matching for x, y in new_matches: if (x, y) not in processed_matches: processed_matches.add((x, y)) l.debug("FunctionDiff: checking if (%#x, %#x) is better", x.addr, y.addr) # if it's a better match than what we already have use it if _is_better_match(x, y, matched_a, matched_b, self.attributes_a, self.attributes_b): l.debug("FunctionDiff: adding possible match (%#x, %#x)", x.addr, y.addr) if x in matched_a: old_match = matched_a[x] del matched_b[old_match] if y in matched_b: old_match = matched_b[y] del matched_a[old_match] matched_a[x] = y matched_b[y] = x to_process.appendleft((x, y)) # reformat matches into a set of pairs self._block_matches = {(x, y) for (x, y) in matched_a.items()} # get the unmatched blocks self._unmatched_blocks_from_a = {x for x in self._function_a.graph.nodes() if x not in matched_a} self._unmatched_blocks_from_b = {x for x in self._function_b.graph.nodes() if x not in matched_b} @staticmethod def _get_ordered_successors(project, block, succ): try: # add them in order of the vex addr = block.addr succ = set(succ) ordered_succ = [] bl = project.factory.block(addr) for x in bl.vex.all_constants: if x in succ: ordered_succ.append(x) # add the rest (sorting might be better than no order) for s in sorted(succ - set(ordered_succ), key=lambda x: x.addr): ordered_succ.append(s) return ordered_succ except (SimMemoryError, SimEngineError): return sorted(succ, key=lambda x: x.addr) def _get_block_matches( self, attributes_a, attributes_b, filter_set_a=None, filter_set_b=None, delta=(0, 0, 0), tiebreak_with_block_similarity=False, ): """ :param attributes_a: A dict of blocks to their attributes :param attributes_b: A dict of blocks to their attributes The following parameters are optional. :param filter_set_a: A set to limit attributes_a to the blocks in this set. :param filter_set_b: A set to limit attributes_b to the blocks in this set. :param delta: An offset to add to each vector in attributes_a. :returns: A list of tuples of matching objects. """ # get the attributes that are in the sets if filter_set_a is None: filtered_attributes_a = {k: v for k, v in attributes_a.items()} else: filtered_attributes_a = {k: v for k, v in attributes_a.items() if k in filter_set_a} if filter_set_b is None: filtered_attributes_b = {k: v for k, v in attributes_b.items()} else: filtered_attributes_b = {k: v for k, v in attributes_b.items() if k in filter_set_b} # add delta for k in filtered_attributes_a: filtered_attributes_a[k] = tuple((i + j) for i, j in zip(filtered_attributes_a[k], delta)) for k in filtered_attributes_b: filtered_attributes_b[k] = tuple((i + j) for i, j in zip(filtered_attributes_b[k], delta)) # get closest closest_a = _get_closest_matches(filtered_attributes_a, filtered_attributes_b) closest_b = _get_closest_matches(filtered_attributes_b, filtered_attributes_a) if tiebreak_with_block_similarity: # use block similarity to break ties in the first set for a in closest_a: if len(closest_a[a]) > 1: best_similarity = 0 best = [] for x in closest_a[a]: similarity = self.block_similarity(a, x) if similarity > best_similarity: best_similarity = similarity best = [x] elif similarity == best_similarity: best.append(x) closest_a[a] = best # use block similarity to break ties in the second set for b in closest_b: if len(closest_b[b]) > 1: best_similarity = 0 best = [] for x in closest_b[b]: similarity = self.block_similarity(x, b) if similarity > best_similarity: best_similarity = similarity best = [x] elif similarity == best_similarity: best.append(x) closest_b[b] = best # a match (x,y) is good if x is the closest to y and y is the closest to x matches = [] for a in closest_a: if len(closest_a[a]) == 1: match = closest_a[a][0] if len(closest_b[match]) == 1 and closest_b[match][0] == a: matches.append((a, match)) return matches def _get_acceptable_constant_differences(self, block_a, block_b): # keep a set of the acceptable differences in constants between the two blocks acceptable_differences = set() acceptable_differences.add(0) block_a_base = block_a.instruction_addrs[0] block_b_base = block_b.instruction_addrs[0] acceptable_differences.add(block_b_base - block_a_base) # get matching successors for target_a, target_b in zip(block_a.call_targets, block_b.call_targets): # these can be none if we couldn't resolve the call target if target_a is None or target_b is None: continue acceptable_differences.add(target_b - target_a) acceptable_differences.add((target_b - block_b_base) - (target_a - block_a_base)) # get the difference between the data segments # this is hackish if ( ".bss" in self._project_a.loader.main_object.sections_map and ".bss" in self._project_b.loader.main_object.sections_map ): bss_a = self._project_a.loader.main_object.sections_map[".bss"].min_addr bss_b = self._project_b.loader.main_object.sections_map[".bss"].min_addr acceptable_differences.add(bss_b - bss_a) acceptable_differences.add((bss_b - block_b_base) - (bss_a - block_a_base)) return acceptable_differences
[docs]class BinDiff(Analysis): """ This class computes the a diff between two binaries represented by angr Projects """
[docs] def __init__(self, other_project, enable_advanced_backward_slicing=False, cfg_a=None, cfg_b=None): """ :param other_project: The second project to diff """ l.debug("Computing cfg's") back_traversal = not enable_advanced_backward_slicing if cfg_a is None: # self.cfg_a = self.project.analyses.CFG(resolve_indirect_jumps=True) # self.cfg_b = other_project.analyses.CFG(resolve_indirect_jumps=True) self.cfg_a = self.project.analyses[CFGEmulated].prep()( context_sensitivity_level=1, keep_state=True, enable_symbolic_back_traversal=back_traversal, enable_advanced_backward_slicing=enable_advanced_backward_slicing, ) self.cfg_b = other_project.analyses[CFGEmulated].prep()( context_sensitivity_level=1, keep_state=True, enable_symbolic_back_traversal=back_traversal, enable_advanced_backward_slicing=enable_advanced_backward_slicing, ) else: self.cfg_a = cfg_a self.cfg_b = cfg_b l.debug("Done computing cfg's") self._p2 = other_project self._attributes_a = {} self._attributes_b = {} self._function_diffs = {} self.function_matches = set() self._unmatched_functions_from_a = set() self._unmatched_functions_from_b = set() self._compute_diff()
[docs] def functions_probably_identical(self, func_a_addr, func_b_addr, check_consts=False): """ Compare two functions and return True if they appear identical. :param func_a_addr: The address of the first function (in the first binary). :param func_b_addr: The address of the second function (in the second binary). :returns: Whether or not the functions appear to be identical. """ if self.cfg_a.project.is_hooked(func_a_addr) and self.cfg_b.project.is_hooked(func_b_addr): return self.cfg_a.project._sim_procedures[func_a_addr] == self.cfg_b.project._sim_procedures[func_b_addr] func_diff = self.get_function_diff(func_a_addr, func_b_addr) if check_consts: return func_diff.probably_identical_with_consts return func_diff.probably_identical
@property def identical_functions(self): """ :returns: A list of function matches that appear to be identical """ identical_funcs = [] for func_a, func_b in self.function_matches: if self.functions_probably_identical(func_a, func_b): identical_funcs.append((func_a, func_b)) return identical_funcs @property def differing_functions(self): """ :returns: A list of function matches that appear to differ """ different_funcs = [] for func_a, func_b in self.function_matches: if not self.functions_probably_identical(func_a, func_b): different_funcs.append((func_a, func_b)) return different_funcs
[docs] def differing_functions_with_consts(self): """ :return: A list of function matches that appear to differ including just by constants """ different_funcs = [] for func_a, func_b in self.function_matches: if not self.functions_probably_identical(func_a, func_b, check_consts=True): different_funcs.append((func_a, func_b)) return different_funcs
@property def differing_blocks(self): """ :returns: A list of block matches that appear to differ """ differing_blocks = [] for func_a, func_b in self.function_matches: differing_blocks.extend(self.get_function_diff(func_a, func_b).differing_blocks) return differing_blocks @property def identical_blocks(self): """ :return A list of all block matches that appear to be identical """ identical_blocks = [] for func_a, func_b in self.function_matches: identical_blocks.extend(self.get_function_diff(func_a, func_b).identical_blocks) return identical_blocks @property def blocks_with_differing_constants(self): """ :return: A dict of block matches with differing constants to the tuple of constants """ diffs = {} for func_a, func_b in self.function_matches: diffs.update(self.get_function_diff(func_a, func_b).blocks_with_differing_constants) return diffs @property def unmatched_functions(self): return self._unmatched_functions_from_a, self._unmatched_functions_from_b # gets the diff of two functions in the binaries
[docs] def get_function_diff(self, function_addr_a, function_addr_b): """ :param function_addr_a: The address of the first function (in the first binary) :param function_addr_b: The address of the second function (in the second binary) :returns: the FunctionDiff of the two functions """ pair = (function_addr_a, function_addr_b) if pair not in self._function_diffs: function_a = self.cfg_a.kb.functions.function(function_addr_a) function_b = self.cfg_b.kb.functions.function(function_addr_b) self._function_diffs[pair] = FunctionDiff(function_a, function_b, self) return self._function_diffs[pair]
@staticmethod def _compute_function_attributes(cfg): """ :param cfg: An angr CFG object :returns: a dictionary of function addresses to tuples of attributes """ # the attributes we use are the number of basic blocks, number of edges, and number of subfunction calls attributes = {} all_funcs = set(cfg.kb.callgraph.nodes()) for function_addr in cfg.kb.functions: # skip syscalls and functions which are None in the cfg if cfg.kb.functions.function(function_addr) is None or cfg.kb.functions.function(function_addr).is_syscall: continue if cfg.kb.functions.function(function_addr) is not None: normalized_funtion = NormalizedFunction(cfg.kb.functions.function(function_addr)) number_of_basic_blocks = len(normalized_funtion.graph.nodes()) number_of_edges = len(normalized_funtion.graph.edges()) else: number_of_basic_blocks = 0 number_of_edges = 0 if function_addr in all_funcs: number_of_subfunction_calls = len(list(cfg.kb.callgraph.successors(function_addr))) else: number_of_subfunction_calls = 0 attributes[function_addr] = (number_of_basic_blocks, number_of_edges, number_of_subfunction_calls) return attributes def _get_call_site_matches(self, func_a, func_b): possible_matches = set() # Make sure those functions are not SimProcedures f_a = self.cfg_a.kb.functions.function(func_a) f_b = self.cfg_b.kb.functions.function(func_b) if f_a.startpoint is None or f_b.startpoint is None: return possible_matches fd = self.get_function_diff(func_a, func_b) basic_block_matches = fd.block_matches function_a = fd._function_a function_b = fd._function_b for a, b in basic_block_matches: if a in function_a.call_sites and b in function_b.call_sites: # add them in order for target_a, target_b in zip(function_a.call_sites[a], function_b.call_sites[b]): possible_matches.add((target_a, target_b)) # add them in reverse, since if a new call was added the ordering from each side # will remain constant until the change for target_a, target_b in zip(reversed(function_a.call_sites[a]), reversed(function_b.call_sites[b])): possible_matches.add((target_a, target_b)) return possible_matches def _get_plt_matches(self): plt_matches = [] for name, addr in self.project.loader.main_object.plt.items(): if name in self._p2.loader.main_object.plt: plt_matches.append((addr, self._p2.loader.main_object.plt[name])) # in the case of sim procedures the actual sim procedure might be in the interfunction graph, not the plt entry func_to_addr_a = {} func_to_addr_b = {} for k, hook in self.project._sim_procedures.items(): if "resolves" in hook.kwargs: func_to_addr_a[hook.kwargs["resolves"]] = k for k, hook in self._p2._sim_procedures.items(): if "resolves" in hook.kwargs: func_to_addr_b[hook.kwargs["resolves"]] = k for name, addr in func_to_addr_a.items(): if name in func_to_addr_b: plt_matches.append((addr, func_to_addr_b[name])) # remove ones that aren't in the interfunction graph, because these seem to not be consistent all_funcs_a = set(self.cfg_a.kb.callgraph.nodes()) all_funcs_b = set(self.cfg_b.kb.callgraph.nodes()) plt_matches = [x for x in plt_matches if x[0] in all_funcs_a and x[1] in all_funcs_b] return plt_matches def _get_name_matches(self): names_to_addrs_a = defaultdict(list) for f in self.cfg_a.functions.values(): if not f.name.startswith("sub_"): names_to_addrs_a[f.name].append(f.addr) names_to_addrs_b = defaultdict(list) for f in self.cfg_b.functions.values(): if not f.name.startswith("sub_"): names_to_addrs_b[f.name].append(f.addr) name_matches = [] for name, addrs in names_to_addrs_a.items(): if name in names_to_addrs_b: for addr_a, addr_b in zip(addrs, names_to_addrs_b[name]): # if binary a and binary b have different numbers of functions with the same name, we will see them # in unmatched functions in the end. name_matches.append((addr_a, addr_b)) return name_matches def _compute_diff(self): # get the attributes for all functions self.attributes_a = self._compute_function_attributes(self.cfg_a) self.attributes_b = self._compute_function_attributes(self.cfg_b) # get the initial matches initial_matches = self._get_plt_matches() initial_matches += self._get_name_matches() initial_matches += self._get_function_matches(self.attributes_a, self.attributes_b) for a, b in initial_matches: l.debug("Initially matched (%#x, %#x)", a, b) # Use a queue so we process matches in the order that they are found to_process = deque(initial_matches) # Keep track of which matches we've already added to the queue processed_matches = {(x, y) for (x, y) in initial_matches} # Keep a dict of current matches, which will be updated if better matches are found matched_a = {} matched_b = {} for x, y in processed_matches: matched_a[x] = y matched_b[y] = x callgraph_a_nodes = set(self.cfg_a.kb.callgraph.nodes()) callgraph_b_nodes = set(self.cfg_b.kb.callgraph.nodes()) # while queue is not empty while to_process: (func_a, func_b) = to_process.pop() l.debug("Processing (%#x, %#x)", func_a, func_b) # we could find new matches in the successors or predecessors of functions if not self.project.loader.main_object.contains_addr(func_a): continue if not self._p2.loader.main_object.contains_addr(func_b): continue func_a_succ = self.cfg_a.kb.callgraph.successors(func_a) if func_a in callgraph_a_nodes else [] func_b_succ = self.cfg_b.kb.callgraph.successors(func_b) if func_b in callgraph_b_nodes else [] func_a_pred = self.cfg_a.kb.callgraph.predecessors(func_a) if func_a in callgraph_a_nodes else [] func_b_pred = self.cfg_b.kb.callgraph.predecessors(func_b) if func_b in callgraph_b_nodes else [] # get possible new matches new_matches = set( self._get_function_matches(self.attributes_a, self.attributes_b, func_a_succ, func_b_succ) ) new_matches |= set( self._get_function_matches(self.attributes_a, self.attributes_b, func_a_pred, func_b_pred) ) # could also find matches as function calls of matched basic blocks new_matches.update(self._get_call_site_matches(func_a, func_b)) # for each of the possible new matches add it if it improves the matching for x, y in new_matches: # skip none functions and syscalls func_a = self.cfg_a.kb.functions.function(x) if func_a is None or func_a.is_simprocedure or func_a.is_syscall: continue func_b = self.cfg_b.kb.functions.function(y) if func_b is None or func_b.is_simprocedure or func_b.is_syscall: continue if (x, y) not in processed_matches: processed_matches.add((x, y)) # if it's a better match than what we already have use it l.debug("Checking function match %s, %s", hex(x), hex(y)) if _is_better_match(x, y, matched_a, matched_b, self.attributes_a, self.attributes_b): l.debug("Adding potential match %s, %s", hex(x), hex(y)) if x in matched_a: old_match = matched_a[x] del matched_b[old_match] l.debug("Removing previous match (%#x, %#x)", x, old_match) if y in matched_b: old_match = matched_b[y] del matched_a[old_match] l.debug("Removing previous match (%#x, %#x)", old_match, y) matched_a[x] = y matched_b[y] = x to_process.appendleft((x, y)) # reformat matches into a set of pairs self.function_matches = set() for x, y in matched_a.items(): # only keep if the pair is in the binary ranges if self.project.loader.main_object.contains_addr(x) and self._p2.loader.main_object.contains_addr(y): self.function_matches.add((x, y)) # get the unmatched functions self._unmatched_functions_from_a = {x for x in self.attributes_a.keys() if x not in matched_a} self._unmatched_functions_from_b = {x for x in self.attributes_b.keys() if x not in matched_b} # remove unneeded function diffs for x, y in dict(self._function_diffs): if (x, y) not in self.function_matches: del self._function_diffs[(x, y)] @staticmethod def _get_function_matches(attributes_a, attributes_b, filter_set_a=None, filter_set_b=None): """ :param attributes_a: A dict of functions to their attributes :param attributes_b: A dict of functions to their attributes The following parameters are optional. :param filter_set_a: A set to limit attributes_a to the functions in this set. :param filter_set_b: A set to limit attributes_b to the functions in this set. :returns: A list of tuples of matching objects. """ # get the attributes that are in the sets if filter_set_a is None: filtered_attributes_a = {k: v for k, v in attributes_a.items()} else: filtered_attributes_a = {k: v for k, v in attributes_a.items() if k in filter_set_a} if filter_set_b is None: filtered_attributes_b = {k: v for k, v in attributes_b.items()} else: filtered_attributes_b = {k: v for k, v in attributes_b.items() if k in filter_set_b} # get closest closest_a = _get_closest_matches(filtered_attributes_a, filtered_attributes_b) closest_b = _get_closest_matches(filtered_attributes_b, filtered_attributes_a) # a match (x,y) is good if x is the closest to y and y is the closest to x matches = [] for a in closest_a: if len(closest_a[a]) == 1: match = closest_a[a][0] if len(closest_b[match]) == 1 and closest_b[match][0] == a: matches.append((a, match)) return matches
from angr.analyses import AnalysesHub AnalysesHub.register_default("BinDiff", BinDiff)