Source code for angr.engines.unicorn

import copy
import functools
import logging

import archinfo

from ..errors import SimIRSBError, SimIRSBNoDecodeError, SimValueError
from .engine import SuccessorsMixin
from .vex.heavy.heavy import VEXEarlyExit
from .. import sim_options as o
from ..misc.ux import once
from ..state_plugins.inspect import BP_AFTER, BP_BEFORE
from ..state_plugins.unicorn_engine import STOP, _UC_NATIVE, unicorn as uc_module
from ..utils.constants import DEFAULT_STATEMENT

# pylint: disable=arguments-differ

l = logging.getLogger(name=__name__)


class SimEngineUnicorn(SuccessorsMixin):
    """
    Concrete execution in the Unicorn Engine, a fork of qemu.

    Responds to the following parameters in the step stack:

    - step:                 How many basic blocks we want to execute
    - extra_stop_points:    A collection of addresses at which execution should halt
    """

    def __init__(self, *args, **kwargs):
        """Initialise the per-engine caches; all arguments are forwarded to the parent class."""
        super().__init__(*args, **kwargs)
        # Cache of details of basic blocks containing statements that need to be re-executed
        self._block_details_cache = {}
        # Addresses of basic blocks which native interface will not execute
        self._stop_block_addrs_cache = set()
        # Stop reasons to track and not switch to native interface for those basic blocks
        self._stop_reasons_to_track = STOP.unsupported_reasons | {
            STOP.STOP_STOPPOINT,
            STOP.STOP_ERROR,
            STOP.STOP_NODECODE,
            STOP.STOP_SYSCALL,
            STOP.STOP_EXECNONE,
            STOP.STOP_ZEROPAGE,
            STOP.STOP_NOSTART,
            STOP.STOP_SEGFAULT,
            STOP.STOP_ZERO_DIV,
            STOP.STOP_HLT,
            STOP.STOP_SYSCALL_ARM,
            STOP.STOP_X86_CPUID,
        }

    def __getstate__(self):
        """Return the parent's pickled state bundled with this engine's caches."""
        parent_ret = super().__getstate__()
        return (parent_ret, self._block_details_cache, self._stop_block_addrs_cache, self._stop_reasons_to_track)

    def __setstate__(self, args):
        """Restore the tuple produced by ``__getstate__`` (parent state first, then the three caches)."""
        super().__setstate__(args[0])
        self._block_details_cache = args[1]
        self._stop_block_addrs_cache = args[2]
        self._stop_reasons_to_track = args[3]

    def __check(self, num_inst=None, **kwargs):  # pylint: disable=unused-argument
        """
        Decide whether the current state should be stepped with unicorn.

        :param num_inst: If not None, single-stepping was requested, which is not supported here.
        :return:         True if unicorn should be used for this step, False to fall back to the next engine.
        """
        state = self.state
        if o.UNICORN not in state.options:
            l.debug("Unicorn-engine is not enabled.")
            return False

        if uc_module is None or _UC_NATIVE is None:
            if once("unicorn_install_warning"):
                l.error(
                    "You are attempting to use unicorn engine support even though it or the angr native layer "
                    "isn't installed"
                )
            return False

        self.__countdown(state)

        # should the countdown still be updated if we're not stepping a whole block?
        # current decision: leave it updated, since we are moving forward
        if num_inst is not None:
            # we don't support single stepping with unicorn
            return False

        unicorn = state.unicorn  # shorthand

        # if we have a concrete target we want the program to synchronize the segment
        # registers before, otherwise undefined behavior could happen.
        if state.project.concrete_target and self.project.arch.name in ("x86", "x86_64"):
            if not state.concrete.segment_registers_initialized:
                l.debug("segment register must be synchronized with the concrete target before using unicorn engine")
                return False

        if state.regs.ip.symbolic:
            l.debug("symbolic IP!")
            return False

        if unicorn.countdown_symbolic_stop > 0:
            l.info("not enough blocks since symbolic stop (%d more)", unicorn.countdown_symbolic_stop)
            return False

        if unicorn.countdown_unsupported_stop > 0:
            l.info(
                "not enough blocks since unsupported VEX statement/expression stop (%d more)",
                unicorn.countdown_unsupported_stop,
            )
            return False

        if unicorn.countdown_nonunicorn_blocks > 0:
            l.info("not enough runs since last unicorn (%d)", unicorn.countdown_nonunicorn_blocks)
            return False

        # NOTE(review): unlike the other countdowns, this branch only logs and does not return False;
        # it merely skips the register check below. Confirm whether that is intentional.
        if unicorn.countdown_stop_point > 0:
            l.info("not enough blocks since stop point (%d more)", unicorn.countdown_stop_point)
        elif o.UNICORN_SYM_REGS_SUPPORT not in state.options and not unicorn._check_registers():
            l.info("failed register check")
            return False

        if state.addr in self._stop_block_addrs_cache:
            l.info("Block will likely not execute in native interface")
            return False

        return True

    @staticmethod
    def __countdown(state):
        """Decrement each of the four unicorn cool-down counters on ``state`` by one."""
        state.unicorn.countdown_nonunicorn_blocks -= 1
        state.unicorn.countdown_symbolic_stop -= 1
        state.unicorn.countdown_unsupported_stop -= 1
        state.unicorn.countdown_stop_point -= 1

    def _execute_block_instrs_in_vex(self, block_details):
        """
        Re-execute, through the VEX statement handlers, the statements listed in ``block_details``.

        :param block_details: Mapping read via the keys "block_addr", "block_size", "registers", "stmts"
                              and "has_symbolic_exit".
        """
        if block_details["block_addr"] not in self._block_details_cache:
            vex_block = self._get_vex_block_details(block_details["block_addr"], block_details["block_size"])
            self._block_details_cache[block_details["block_addr"]] = vex_block
        else:
            vex_block = self._block_details_cache[block_details["block_addr"]]

        # Save breakpoints for restoring later
        saved_mem_read_breakpoints = copy.copy(self.state.inspect._breakpoints["mem_read"])
        saved_mem_write_breakpoints = copy.copy(self.state.inspect._breakpoints["mem_write"])

        for reg_name, reg_value in block_details["registers"]:
            self.state.registers.store(reg_name, reg_value, inspect=False, disable_actions=True)

        self.state.scratch.set_tyenv(vex_block.tyenv)
        for stmt_entry in block_details["stmts"]:
            self._instr_mem_reads = list(stmt_entry["mem_dep"])  # pylint:disable=attribute-defined-outside-init
            if self._instr_mem_reads:
                # Insert breakpoint to set the correct memory read address
                self.state.inspect.b("mem_read", when=BP_BEFORE, action=self._set_correct_mem_read_addr)

            self.state.inspect.b("mem_write", when=BP_AFTER, action=self._save_mem_write_addrs)
            execute_default_exit = True
            # Execute handler from HeavyVEXMixin for the statement
            vex_stmt = vex_block.statements[stmt_entry["stmt_idx"]]
            self.stmt_idx = stmt_entry["stmt_idx"]  # pylint:disable=attribute-defined-outside-init
            try:
                super()._handle_vex_stmt(vex_stmt)  # pylint:disable=no-member
            except VEXEarlyExit:
                # Only one path is satisfiable in this branch.
                execute_default_exit = False

            # Restore breakpoints
            self.state.inspect._breakpoints["mem_read"] = copy.copy(saved_mem_read_breakpoints)
            self.state.inspect._breakpoints["mem_write"] = copy.copy(saved_mem_write_breakpoints)
            del self._instr_mem_reads

        if execute_default_exit and block_details["has_symbolic_exit"]:
            # Process block's default exit
            self.stmt_idx = DEFAULT_STATEMENT  # pylint:disable=attribute-defined-outside-init
            super()._handle_vex_defaultexit(vex_block.next, vex_block.jumpkind)  # pylint:disable=no-member

        # Restore breakpoints on every successor. Each list must come from its own saved copy:
        # mem_write was previously restored from the mem_read list, which dropped the saved
        # mem_write breakpoints on successor states.
        for succ_state in self.successors.successors:
            succ_state.inspect._breakpoints["mem_read"] = copy.copy(saved_mem_read_breakpoints)
            succ_state.inspect._breakpoints["mem_write"] = copy.copy(saved_mem_write_breakpoints)

        del self.stmt_idx

    def _execute_symbolic_instrs(self, syscall_data):
        """
        Re-execute every block reported by the unicorn plugin as containing symbolic VEX statements.

        :param syscall_data: Mapping whose "random" entry is consumed when the CGC random syscall is
                             re-executed with UNICORN_HANDLE_CGC_RANDOM_SYSCALL enabled.
        """
        recent_bbl_addrs = None
        stop_details = None

        self._instr_mem_write_addrs = set()  # pylint:disable=attribute-defined-outside-init
        for block_details in self.state.unicorn._get_details_of_blocks_with_symbolic_vex_stmts():
            self.state.scratch.guard = self.state.solver.true
            try:
                if self.state.os_name == "CGC" and block_details["block_addr"] in {
                    self.state.unicorn.cgc_random_addr,
                    self.state.unicorn.cgc_receive_addr,
                }:
                    # Re-execute CGC syscall
                    reg_vals = dict(block_details["registers"])
                    curr_regs = self.state.regs
                    # If any regs are not present in the block details for re-execute, they are probably symbolic and so
                    # were not saved in native interface. Use current register values in those cases: they should have
                    # correct values right now.
                    if block_details["block_addr"] == self.state.unicorn.cgc_receive_addr:
                        # rx_bytes argument is set to 0 since we care about updating symbolic values only
                        syscall_args = [
                            reg_vals.get("ebx", curr_regs.ebx),
                            reg_vals.get("ecx", curr_regs.ecx),
                            reg_vals.get("edx", curr_regs.edx),
                            0,
                        ]
                        syscall_simproc = self.state.project.simos.syscall_from_number(3, abi=None)
                        syscall_simproc.arch = self.state.arch
                        syscall_simproc.project = self.state.project
                        syscall_simproc.state = self.state
                        syscall_simproc.cc = self.state.project.simos.syscall_cc(self.state)
                        ret_val = getattr(syscall_simproc, syscall_simproc.run_func)(*syscall_args)
                        self.state.registers.store("eax", ret_val, inspect=False, disable_actions=True)
                    elif block_details["block_addr"] == self.state.unicorn.cgc_random_addr:
                        syscall_simproc = self.state.project.simos.syscall_from_number(7, abi=None)
                        # rnd_bytes argument is set to 0 since we care about updating symbolic values only
                        syscall_args = [reg_vals.get("ebx", curr_regs.ebx), reg_vals.get("ecx", curr_regs.ecx), 0]
                        if o.UNICORN_HANDLE_CGC_RANDOM_SYSCALL in self.state.options:
                            # Update concrete value before invoking syscall
                            concrete_data = b""
                            curr_size = 0
                            max_size = self.state.solver.eval(syscall_args[1])
                            while curr_size != max_size:
                                next_entry = syscall_data["random"].pop(0)
                                curr_size = curr_size + next_entry[1]
                                endianness = "little" if self.state.arch.memory_endness == "Iend_LE" else "big"
                                concrete_data = concrete_data + next_entry[0].to_bytes(next_entry[1], endianness)
                        else:
                            concrete_data = None

                        syscall_simproc.arch = self.state.arch
                        syscall_simproc.project = self.state.project
                        syscall_simproc.state = self.state
                        syscall_simproc.cc = self.state.project.simos.syscall_cc(self.state)
                        ret_val = getattr(syscall_simproc, syscall_simproc.run_func)(*syscall_args, concrete_data)
                        self.state.registers.store("eax", ret_val, inspect=False, disable_actions=True)
                else:
                    if block_details["has_symbolic_exit"]:
                        curr_succs_count = len(self.successors.successors)

                    if not recent_bbl_addrs:
                        recent_bbl_addrs = self.state.unicorn.get_recent_bbl_addrs()

                    if not stop_details:
                        stop_details = self.state.unicorn.get_stop_details()

                    self._execute_block_instrs_in_vex(block_details)
                    if block_details["has_symbolic_exit"]:
                        curr_succs = self.successors.successors
                        if len(curr_succs) == curr_succs_count + 1:
                            # There is only one newly added satisfiable successor state and so that is the state that
                            # follows path being traced
                            self.state = curr_succs[curr_succs_count]
                            self.successors.flat_successors.remove(self.state)
                            self.successors.all_successors.remove(self.state)
                            self.successors.successors.remove(self.state)
                        else:
                            # There are multiple satisfiable states. Use the state's record of basic blocks executed
                            # and block where native interface stopped to determine which state followed the path
                            # traced till now
                            next_block_on_path = None
                            if block_details["block_hist_ind"] + 1 < len(recent_bbl_addrs):
                                next_block_on_path = recent_bbl_addrs[block_details["block_hist_ind"] + 1]
                            else:
                                next_block_on_path = stop_details.block_addr

                            # Iterates over a slice (a copy), so removing from the live lists is safe.
                            # NOTE(review): unlike the single-successor branch above, the chosen state is not
                            # removed from all_successors here - confirm whether that is intentional.
                            for succ in curr_succs[curr_succs_count:]:
                                if succ.addr == next_block_on_path:
                                    self.state = succ
                                    self.successors.flat_successors.remove(succ)
                                    self.successors.successors.remove(succ)
                                    break
                            else:
                                raise Exception("Multiple valid successor states found but none followed the trace!")
            except SimValueError as e:
                l.error(e)

        del self._instr_mem_write_addrs

    def _get_vex_block_details(self, block_addr, block_size):
        """
        Lift the block at ``block_addr`` to VEX and return the resulting IRSB.

        :param block_addr: Address of the block to lift.
        :param block_size: Size passed to the lifter.
        :raises SimIRSBNoDecodeError: The lifted block is empty with jumpkind Ijk_NoDecode and is not hooked.
        :raises SimIRSBError:         The lifted block is empty for any other reason.
        """
        # Mostly based on the lifting code in HeavyVEXMixin
        # pylint:disable=no-member
        irsb = super().lift_vex(addr=block_addr, state=self.state, size=block_size)
        if irsb.size == 0:
            if irsb.jumpkind == "Ijk_NoDecode":
                if not self.state.project.is_hooked(irsb.addr):
                    raise SimIRSBNoDecodeError(
                        f"IR decoding error at 0x{irsb.addr:02x}. You can hook this instruction"
                        " with a python replacement using project.hook"
                        f"(0x{irsb.addr:02x}, your_function, length=length_of_instruction)."
                    )

                raise SimIRSBError("Block is hooked with custom code but original block was executed in unicorn")

            raise SimIRSBError(f"Empty IRSB found at 0x{irsb.addr:02x}.")

        return irsb

    def _set_correct_mem_read_addr(self, state):
        """
        Breakpoint action (registered as mem_read / BP_BEFORE) that rewrites the address of the pending read.

        Consumes one queued entry per byte from ``self._instr_mem_reads`` until the read length is covered,
        building the concrete value and a per-byte taint map (0 concrete, 1 symbolic, -1 symbolic but
        overwritten during re-execution).
        """
        assert len(self._instr_mem_reads) != 0
        mem_read_val = b""
        mem_read_size = 0
        mem_read_address = None
        mem_read_taint_map = []
        while mem_read_size != state.inspect.mem_read_length and self._instr_mem_reads:
            next_val = self._instr_mem_reads.pop(0)
            # Compare against None explicitly: a base address of 0 is falsy and would otherwise be
            # overwritten by the address of a later byte.
            if mem_read_address is None:
                mem_read_address = next_val["address"]

            if next_val["symbolic"]:
                if next_val["address"] in self._instr_mem_write_addrs:
                    # This address was modified during re-execution. Ignore taint reported by native interface
                    mem_read_taint_map.append(-1)
                else:
                    mem_read_taint_map.append(1)
            else:
                mem_read_taint_map.append(0)

            mem_read_size += 1
            mem_read_val += next_val["value"]

        assert state.inspect.mem_read_length == mem_read_size
        state.inspect.mem_read_address = state.solver.BVV(mem_read_address, state.inspect.mem_read_address.size())
        if mem_read_taint_map.count(-1) != mem_read_size:
            # Since the read might need bitmap adjustment, insert breakpoint to return the correct concrete value
            self.state.inspect.b(
                "mem_read",
                when=BP_AFTER,
                action=functools.partial(
                    self._set_correct_mem_read_val, value=mem_read_val, taint_map=mem_read_taint_map
                ),
            )

    def _set_correct_mem_read_val(self, state, value, taint_map):  # pylint: disable=no-self-use
        """
        Breakpoint action (registered as mem_read / BP_AFTER) that replaces the value produced by the read.

        :param value:     Concrete bytes collected for the read.
        :param taint_map: Per-byte list of 0 (concrete), 1 (symbolic) or -1 (ignore the reported taint).
        """
        # Remove the most recently added mem_read breakpoint (the last entry of the list).
        state.inspect._breakpoints["mem_read"].pop()
        if taint_map.count(0) == state.inspect.mem_read_length:
            # The value is completely concrete
            if state.arch.memory_endness == archinfo.Endness.LE:
                state.inspect.mem_read_expr = state.solver.BVV(value[::-1])
            else:
                state.inspect.mem_read_expr = state.solver.BVV(value)
        else:
            # The value may be partially concrete. Set the symbolic bitmap to read correct value and restore it
            mem_read_addr = state.solver.eval(state.inspect.mem_read_address)
            mem_read_len = state.inspect.mem_read_length
            saved_taints = []
            for offset in range(mem_read_len):
                page_num, page_off = state.memory._divide_addr(mem_read_addr + offset)
                page_obj = state.memory._get_page(page_num, writing=False)
                saved_taints.append(page_obj.symbolic_bitmap[page_off])

            restore_taints = False
            if saved_taints != taint_map:
                # Symbolic bitmap needs fixing before reading value from memory.
                restore_taints = True
                for offset, expected_taint in enumerate(taint_map):
                    if expected_taint != -1:
                        page_num, page_off = state.memory._divide_addr(mem_read_addr + offset)
                        page_obj = state.memory._get_page(page_num, writing=False)
                        page_obj.symbolic_bitmap[page_off] = expected_taint

            curr_value = state.memory.load(
                mem_read_addr, mem_read_len, endness=state.arch.memory_endness, inspect=False, disable_actions=True
            )
            if restore_taints:
                for offset, saved_taint in enumerate(saved_taints):
                    page_num, page_off = state.memory._divide_addr(mem_read_addr + offset)
                    page_obj = state.memory._get_page(page_num, writing=False)
                    page_obj.symbolic_bitmap[page_off] = saved_taint

            if taint_map.count(0) != 0:
                # Update concrete bytes using values reported by native interface
                curr_value_bytes = curr_value.chop(8)
                if state.arch.memory_endness == archinfo.Endness.LE:
                    curr_value_bytes.reverse()

                for offset, expected_taint in enumerate(taint_map):
                    if expected_taint == 0:
                        curr_value_bytes[offset] = state.solver.BVV(value[offset], 8)

                if state.arch.memory_endness == archinfo.Endness.LE:
                    curr_value_bytes = reversed(curr_value_bytes)

                curr_value = state.solver.Concat(*curr_value_bytes)

            state.inspect.mem_read_expr = curr_value

    def _save_mem_write_addrs(self, state):
        """Breakpoint action (registered as mem_write / BP_AFTER) recording every byte address just written."""
        mem_write_addr = state.solver.eval(state.inspect.mem_write_address)
        self._instr_mem_write_addrs.update(range(mem_write_addr, mem_write_addr + state.inspect.mem_write_length))

    def process_successors(self, successors, **kwargs):
        """
        Step the current state in unicorn, or defer to the next engine when unicorn cannot be used.

        Keyword arguments read here: ``extra_stop_points``, ``last_block_details``, ``step``,
        ``syscall_data`` and ``fd_bytes``; all keyword arguments are also forwarded to ``__check`` and,
        on fallback, to the parent's ``process_successors``.

        :param successors: The successors object to populate.
        :return:           None when unicorn handled the step, otherwise whatever the parent returns.
        """
        state = self.state
        if not self.__check(**kwargs):
            return super().process_successors(successors, **kwargs)

        extra_stop_points = kwargs.get("extra_stop_points")
        last_block_details = kwargs.get("last_block_details")
        step = kwargs.get("step")
        if extra_stop_points is None:
            extra_stop_points = set(self.project._sim_procedures)
        else:
            # convert extra_stop_points to a set
            extra_stop_points = set(extra_stop_points)
            extra_stop_points.update(self.project._sim_procedures)

        if successors.addr in extra_stop_points:
            # trying to start unicorn execution on a stop point, fallback to next engine
            return super().process_successors(successors, **kwargs)

        successors.sort = "Unicorn"

        # add all instruction breakpoints as extra_stop_points
        if state.supports_inspect:
            for bp in state.inspect._breakpoints["instruction"]:
                # if there is an instruction breakpoint on every instruction, it does not make sense
                # to use unicorn.
                if "instruction" not in bp.kwargs:
                    l.info("disabling unicorn because of breakpoint on every instruction")
                    return super().process_successors(successors, **kwargs)

                # add the breakpoint to extra_stop_points. We don't care if the breakpoint is BP_BEFORE or
                # BP_AFTER, this is only to stop unicorn when we get near a breakpoint. The breakpoint itself
                # will then be handled by another engine that can more accurately step instruction-by-instruction.
                extra_stop_points.add(bp.kwargs["instruction"])

        # initialize unicorn plugin
        try:
            syscall_data = kwargs.get("syscall_data")
            fd_bytes = kwargs.get("fd_bytes")
            state.unicorn.setup(syscall_data=syscall_data, fd_bytes=fd_bytes)
        except SimValueError:
            # it's trying to set a symbolic register somehow
            # fail out, force fallback to next engine
            return super().process_successors(successors, **kwargs)

        try:
            state.unicorn.set_stops(extra_stop_points)
            if last_block_details is not None:
                state.unicorn.set_last_block_details(last_block_details)
            state.unicorn.set_tracking(
                track_bbls=o.UNICORN_TRACK_BBL_ADDRS in state.options,
                track_stack=o.UNICORN_TRACK_STACK_POINTERS in state.options,
            )
            state.unicorn.hook()
            state.unicorn.start(step=step)
            self._execute_symbolic_instrs(syscall_data=syscall_data)
            state.unicorn.finish(self.state)
        finally:
            state.unicorn.destroy(self.state)

        # self.state may have been replaced while re-executing symbolic instructions; re-read it.
        state = self.state
        if state.unicorn.stop_reason in self._stop_reasons_to_track:
            if state.unicorn.steps == 0:
                self._stop_block_addrs_cache.add(state.addr)
            else:
                self._stop_block_addrs_cache.add(state.unicorn.stop_details.block_addr)

        if state.unicorn.steps == 0 or state.unicorn.stop_reason == STOP.STOP_NOSTART:
            # fail out, force fallback to next engine
            # TODO: idk what the consequences of this might be. If this failed step can actually change non-unicorn
            # state then this is bad news.
            return super().process_successors(successors, **kwargs)

        description = f"Unicorn ({STOP.name_stop(state.unicorn.stop_reason)} after {state.unicorn.steps} steps)"

        state.history.recent_block_count += state.unicorn.steps
        state.history.recent_description = description

        # this can be expensive, so check first
        if state.supports_inspect:
            for bp in state.inspect._breakpoints["irsb"]:
                if bp.check(state, BP_AFTER):
                    for bbl_addr in state.history.recent_bbl_addrs:
                        state._inspect("irsb", BP_AFTER, address=bbl_addr)
                    break

        if state.unicorn.stop_reason in (
            STOP.symbolic_stop_reasons | STOP.unsupported_reasons
        ) or state.unicorn.stop_reason in (STOP.STOP_UNKNOWN_MEMORY_WRITE_SIZE, STOP.STOP_VEX_LIFT_FAILED):
            l.info(state.unicorn.stop_message)

        if state.unicorn.jumpkind.startswith("Ijk_Sys"):
            state.ip = state.unicorn._syscall_pc
        successors.add_successor(state, state.ip, state.solver.true, state.unicorn.jumpkind)

        successors.description = description
        successors.processed = True
        return None