Source code for angr.analyses.congruency_check

import logging

import claripy

from . import Analysis

l = logging.getLogger(name=__name__)
# l.setLevel(logging.DEBUG)


[docs]class CongruencyCheck(Analysis): """ This is an analysis to ensure that angr executes things identically with different execution backends (i.e., unicorn vs vex). """
[docs] def __init__(self, throw=False): """ Initializes a CongruencyCheck analysis. :param throw: whether to raise an exception if an incongruency is found. """ self._throw = throw self.simgr = None self.prev_pg = None
[docs] def set_state_options( self, left_add_options=None, left_remove_options=None, right_add_options=None, right_remove_options=None ): """ Checks that the specified state options result in the same states over the next `depth` states. """ s_right = self.project.factory.full_init_state( add_options=right_add_options, remove_options=right_remove_options, args=[], ) s_left = self.project.factory.full_init_state( add_options=left_add_options, remove_options=left_remove_options, args=[], ) return self.set_states(s_left, s_right)
[docs] def set_states(self, left_state, right_state): """ Checks that the specified paths stay the same over the next `depth` states. """ simgr = self.project.factory.simulation_manager(right_state) simgr.stash(to_stash="right") simgr.active.append(left_state) simgr.stash(to_stash="left") simgr.stash(to_stash="stashed_left") simgr.stash(to_stash="stashed_right") return self.set_simgr(simgr)
[docs] def set_simgr(self, simgr): self.simgr = simgr return self
@staticmethod def _sync_steps(simgr, max_steps=None): l.debug("Sync-stepping pathgroup...") l.debug( "... left width: %s, right width: %s", simgr.left[0].history.block_count if len(simgr.left) > 0 else None, simgr.right[0].history.block_count if len(simgr.right) > 0 else None, ) if len(simgr.errored) != 0 and (len(simgr.left) == 0 or len(simgr.right) == 0): l.debug("... looks like a path errored") return simgr if len(simgr.left) == 0 and len(simgr.right) != 0: l.debug("... left is deadended; stepping right %s times", max_steps) npg = simgr.run(stash="right", n=max_steps) elif len(simgr.right) == 0 and len(simgr.left) != 0: l.debug("... right is deadended; stepping left %s times", max_steps) npg = simgr.run(stash="left", n=max_steps) elif len(simgr.right) == 0 and len(simgr.left) == 0: l.debug("... both deadended.") return simgr elif simgr.left[0].history.block_count == simgr.right[0].history.block_count: l.debug("... synced") return simgr elif simgr.left[0].history.block_count < simgr.right[0].history.block_count: l.debug( "... right is ahead; stepping left %s times", simgr.right[0].history.block_count - simgr.left[0].history.block_count, ) npg = simgr.run( stash="left", until=lambda lpg: lpg.left[0].history.block_count >= simgr.right[0].history.block_count, n=max_steps, ) elif simgr.right[0].history.block_count < simgr.left[0].history.block_count: l.debug( "... left is ahead; stepping right %s times", simgr.left[0].history.block_count - simgr.right[0].history.block_count, ) npg = simgr.run( stash="right", until=lambda lpg: lpg.right[0].history.block_count >= simgr.left[0].history.block_count, n=max_steps, ) return CongruencyCheck._sync_steps(npg) def _validate_incongruency(self): """ Checks that a detected incongruency is not caused by translation backends having a different idea of what constitutes a basic block. """ ot = self._throw try: self._throw = False l.debug("Validating incongruency.") if ("UNICORN" in self.simgr.right[0].options) ^ ("UNICORN" in self.simgr.left[0].options): if "UNICORN" in self.simgr.right[0].options: unicorn_stash = "right" normal_stash = "left" else: unicorn_stash = "left" normal_stash = "right" unicorn_path = self.simgr.stashes[unicorn_stash][0] normal_path = self.simgr.stashes[normal_stash][0] if unicorn_path.arch.name in ("X86", "AMD64"): # unicorn "falls behind" on loop and rep instructions, since # it sees them as ending a basic block. Here, we will # step the unicorn until it's caught up npg = self.project.factory.simulation_manager(unicorn_path) npg.explore(find=lambda p: p.addr == normal_path.addr, n=200) if len(npg.found) == 0: l.debug("Validator failed to sync paths.") return True new_unicorn = npg.found[0] delta = new_unicorn.history.block_count - normal_path.history.block_count normal_path.history.recent_block_count += delta new_normal = normal_path elif unicorn_path.arch.name == "MIPS32": # unicorn gets ahead here, because VEX falls behind for unknown reasons # for example, this block: # # 0x1016f20: lui $gp, 0x17 # 0x1016f24: addiu $gp, $gp, -0x35c0 # 0x1016f28: addu $gp, $gp, $t9 # 0x1016f2c: addiu $sp, $sp, -0x28 # 0x1016f30: sw $ra, 0x24($sp) # 0x1016f34: sw $s0, 0x20($sp) # 0x1016f38: sw $gp, 0x10($sp) # 0x1016f3c: lw $v0, -0x6cf0($gp) # 0x1016f40: move $at, $at npg = self.project.factory.simulation_manager(normal_path) npg.explore(find=lambda p: p.addr == unicorn_path.addr, n=200) if len(npg.found) == 0: l.debug("Validator failed to sync paths.") return True new_normal = npg.found[0] delta = new_normal.history.block_count - unicorn_path.history.block_count unicorn_path.history.recent_block_count += delta new_unicorn = unicorn_path else: l.debug("Dunno!") return True if self.compare_paths(new_unicorn, new_normal): l.debug("Divergence accounted for by unicorn.") self.simgr.stashes[unicorn_stash][0] = new_unicorn self.simgr.stashes[normal_stash][0] = new_normal return False else: l.warning("Divergence unaccounted for by unicorn.") return True else: # no idea l.warning("Divergence unaccounted for.") return True finally: self._throw = ot def _report_incongruency(self, *args): l.warning(*args) if self._throw: raise AngrIncongruencyError(*args)
[docs] def run(self, depth=None): """ Checks that the paths in the specified path group stay the same over the next `depth` bytes. The path group should have a "left" and a "right" stash, each with a single path. """ # pg_history = [ ] if len(self.simgr.right) != 1 or len(self.simgr.left) != 1: self._report_incongruency("Single path in pg.left and pg.right required.") return False if "UNICORN" in self.simgr.one_right.options and depth is not None: self.simgr.one_right.unicorn.max_steps = depth if "UNICORN" in self.simgr.one_left.options and depth is not None: self.simgr.one_left.unicorn.max_steps = depth l.debug("Performing initial path comparison.") if not self.compare_paths(self.simgr.left[0], self.simgr.right[0]): self._report_incongruency("Initial path comparison check failed.") return False while len(self.simgr.left) > 0 and len(self.simgr.right) > 0: if depth is not None: self._update_progress(100.0 * float(self.simgr.one_left.history.block_count) / depth) if len(self.simgr.deadended) != 0: self._report_incongruency("Unexpected deadended paths before step.") return False if len(self.simgr.right) == 0 and len(self.simgr.left) == 0: l.debug("All done!") return True if len(self.simgr.right) != 1 or len(self.simgr.left) != 1: self._report_incongruency("Different numbers of paths in left and right stash..") return False # do a step l.debug("Stepping right path with weighted length %d/%d", self.simgr.right[0].history.block_count, depth) self.prev_pg = self.simgr.copy() # pylint:disable=unused-variable self.simgr.step(stash="right") CongruencyCheck._sync_steps(self.simgr) if len(self.simgr.errored) != 0: self._report_incongruency("Unexpected errored paths.") return False try: if not self.compare_path_group(self.simgr) and self._validate_incongruency(): self._report_incongruency("Path group comparison failed.") return False except AngrIncongruencyError: if self._validate_incongruency(): raise if depth is not None: self.simgr.drop(stash="left", filter_func=lambda p: p.history.block_count >= depth) self.simgr.drop(stash="right", filter_func=lambda p: p.history.block_count >= depth) self.simgr.right.sort(key=lambda p: p.addr) self.simgr.left.sort(key=lambda p: p.addr) self.simgr.stashed_right[:] = self.simgr.stashed_right[::-1] self.simgr.stashed_left[:] = self.simgr.stashed_left[::-1] self.simgr.move("stashed_right", "right") self.simgr.move("stashed_left", "left") if len(self.simgr.left) > 1: self.simgr.split(from_stash="left", limit=1, to_stash="stashed_left") self.simgr.split(from_stash="right", limit=1, to_stash="stashed_right")
[docs] def compare_path_group(self, pg): if len(pg.left) != len(pg.right): self._report_incongruency("Number of left and right paths differ.") return False if len(pg.deadended) % 2 != 0: self._report_incongruency("Odd number of deadended paths after step.") return False pg.drop(stash="deadended") if len(pg.left) == 0 and len(pg.right) == 0: return True # make sure the paths are the same for pl, pr in zip(sorted(pg.left, key=lambda p: p.addr), sorted(pg.right, key=lambda p: p.addr)): if not self.compare_paths(pl, pr): self._report_incongruency("Differing paths.") return False return True
[docs] def compare_states(self, sl, sr): """ Compares two states for similarity. """ joint_solver = claripy.Solver() # make sure the canonicalized constraints are the same n_map, n_counter, n_canon_constraint = claripy.And( *sr.solver.constraints ).canonicalize() # pylint:disable=no-member u_map, u_counter, u_canon_constraint = claripy.And( *sl.solver.constraints ).canonicalize() # pylint:disable=no-member if n_canon_constraint is not u_canon_constraint: # https://github.com/Z3Prover/z3/issues/2359 # don't try to simplify unless we really need to, as it can introduce serious nondeterminism n_canoner_constraint = sr.solver.simplify(n_canon_constraint) u_canoner_constraint = sl.solver.simplify(u_canon_constraint) else: n_canoner_constraint = u_canoner_constraint = n_canon_constraint joint_solver.add((n_canoner_constraint, u_canoner_constraint)) if n_canoner_constraint is not u_canoner_constraint: # extra check: are these two constraints equivalent? tmp_solver = claripy.Solver() a = tmp_solver.satisfiable(extra_constraints=(n_canoner_constraint == u_canoner_constraint,)) b = tmp_solver.satisfiable(extra_constraints=(n_canoner_constraint != u_canoner_constraint,)) if not (a is True and b is False): self._report_incongruency("Different constraints!") return False # get the differences in registers and memory mem_diff = sr.memory.changed_bytes(sl.memory) reg_diff = sr.registers.changed_bytes(sl.registers) # this is only for unicorn if "UNICORN" in sl.options or "UNICORN" in sr.options: if sl.arch.name == "X86": reg_diff -= set(range(40, 52)) # ignore cc psuedoregisters reg_diff -= set(range(320, 324)) # some other VEX weirdness reg_diff -= set(range(340, 344)) # ip_at_syscall elif sl.arch.name == "AMD64": reg_diff -= set(range(144, 168)) # ignore cc psuedoregisters # make sure the differences in registers and memory are actually just renamed # versions of the same ASTs for diffs, (um, nm) in ( (reg_diff, (sl.registers, sr.registers)), (mem_diff, (sl.memory, sr.memory)), ): for i in diffs: bn = nm.load(i, 1) bu = um.load(i, 1) bnc = bn.canonicalize(var_map=n_map, counter=n_counter)[-1] buc = bu.canonicalize(var_map=u_map, counter=u_counter)[-1] if bnc is not buc: self._report_incongruency("Different memory or registers (index %d, values %r and %r)!", i, bn, bu) return False # make sure the flags are the same if sl.arch.name in ("AMD64", "X86", "ARM", "ARMEL", "ARMHF", "AARCH64"): # pylint: disable=unused-variable sr.regs.cc_op, sr.regs.cc_dep1, sr.regs.cc_dep2, sr.regs.cc_ndep # n_bkp sl.regs.cc_op, sl.regs.cc_dep1, sl.regs.cc_dep2, sl.regs.cc_ndep # u_bkp if sl.arch.name in ("AMD64", "X86"): n_flags = sr.regs.eflags.canonicalize(var_map=n_map, counter=n_counter)[-1] u_flags = sl.regs.eflags.canonicalize(var_map=u_map, counter=u_counter)[-1] else: n_flags = sr.regs.flags.canonicalize(var_map=n_map, counter=n_counter)[-1] u_flags = sl.regs.flags.canonicalize(var_map=u_map, counter=u_counter)[-1] if n_flags is not u_flags and sl.solver.simplify(n_flags) is not sr.solver.simplify(u_flags): self._report_incongruency("Different flags!") return False return True
[docs] def compare_paths(self, pl, pr): l.debug("Comparing paths...") if not self.compare_states(pl, pr): self._report_incongruency("Failed state similarity check!") return False if pr.history.block_count != pl.history.block_count: self._report_incongruency("Different weights!") return False if pl.addr != pr.addr: self._report_incongruency("Different addresses!") return False return True
from ..errors import AngrIncongruencyError from angr.analyses import AnalysesHub AnalysesHub.register_default("CongruencyCheck", CongruencyCheck)