Source code for angr.engines.concrete

import logging
import threading

from angr.errors import AngrError
from .engine import SuccessorsMixin
from ..errors import SimConcreteRegisterError

l = logging.getLogger("angr.engines.concrete")
# l.setLevel(logging.DEBUG)

try:
    from angr_targets.concrete import ConcreteTarget
except ImportError:
    ConcreteTarget = None


[docs]class SimEngineConcrete(SuccessorsMixin): """ Concrete execution using a concrete target provided by the user. """
[docs] def __init__(self, project): if not ConcreteTarget: l.critical("Error, can't find angr_target project") raise AngrError l.info("Initializing SimEngineConcrete with ConcreteTarget provided.") super().__init__() self.project = project if isinstance(self.project.concrete_target, ConcreteTarget) and self.check_concrete_target_methods( self.project.concrete_target ): self.target = self.project.concrete_target else: l.warning("Error, you must provide an instance of a ConcreteTarget to initialize a SimEngineConcrete.") self.target = None raise NotImplementedError self.segment_registers_already_init = False
[docs] def process_successors( self, successors, extra_stop_points=None, memory_concretize=None, register_concretize=None, timeout=0, *args, **kwargs, ): new_state = self.state # setup the concrete process and resume the execution self.to_engine(new_state, extra_stop_points, memory_concretize, register_concretize, timeout) # sync angr with the current state of the concrete process using # the state plugin new_state.concrete.sync() successors.engine = "SimEngineConcrete" successors.sort = "SimEngineConcrete" successors.add_successor(new_state, new_state.ip, new_state.solver.true, new_state.unicorn.jumpkind) successors.description = "Concrete Successors" successors.processed = True
[docs] def to_engine(self, state, extra_stop_points, memory_concretize, register_concretize, timeout): """ Handle the concrete execution of the process This method takes care of: 1- Set the breakpoints on the addresses provided by the user 2- Concretize the symbolic variables and perform the write inside the concrete process 3- Continue the program execution. :param state: The state with which to execute :param extra_stop_points: list of a addresses where to stop the concrete execution and return to the simulated one :param memory_concretize: list of tuples (address, symbolic variable) that are going to be written in the concrete process memory. :param register_concretize: list of tuples (reg_name, symbolic variable) that are going to be written :param timeout: how long we should wait the concrete target to reach the breakpoint :return: None """ state.globals["symbion_timeout"] = False extra_stop_points = [] if extra_stop_points is None else extra_stop_points l.debug( "Entering in SimEngineConcrete: simulated address %#x concrete address %#x stop points %s", state.addr, self.target.read_register("pc"), map(hex, extra_stop_points), ) if memory_concretize: l.debug("SimEngineConcrete is concretizing memory variables before resuming the concrete process") for sym_var in memory_concretize: sym_var_address = state.solver.eval(sym_var[0]) sym_var_value = state.solver.eval(sym_var[1], cast_to=bytes) l.debug("Concretize memory at address %#x with value %s", sym_var_address, str(sym_var_value)) self.target.write_memory(sym_var_address, sym_var_value, raw=True) if register_concretize: l.debug("SimEngineConcrete is concretizing registers variables before resuming the concrete process") for reg in register_concretize: register_name = reg[0] register_value = state.solver.eval(reg[1]) l.debug("Concretize register %s with value %s", register_name, str(register_value)) self.target.write_register(register_name, register_value) # Set breakpoint on remote target for stop_point in extra_stop_points: l.debug("Setting breakpoints at %#x", stop_point) self.target.set_breakpoint(stop_point, hardware=True, temporary=True) if timeout > 0: l.debug("Found timeout as option, setting it up!") def timeout_handler(): self.target.stop() # stop the concrete target now! state.globals["symbion_timeout"] = True # this will end up in the timeout stash execution_timer = threading.Timer(timeout, timeout_handler) execution_timer.start() # start the timer! # resuming of the concrete process, if the target won't reach the # breakpoint specified by the user the timeout will abort angr execution. l.debug("SimEngineConcrete is resuming the concrete process") self.target.run() l.debug("SimEngineConcrete has successfully resumed the process") if state.globals["symbion_timeout"]: l.critical("Timeout has been reached during resuming of concrete process") l.critical( "This can be a bad thing ( the ConcreteTarget didn't hit your breakpoint ) or" "just it will take a while." ) # reset the alarm if timeout > 0: execution_timer.cancel() # removing all breakpoints set by the concrete target for stop_point in extra_stop_points: self.target.remove_breakpoint(stop_point) # handling the case in which the program stops at a point different than the breakpoints set # by the user. current_pc = self.target.read_register("pc") if current_pc not in extra_stop_points and not state.globals["symbion_timeout"]: l.critical("Stopped at unexpected location inside the concrete process: %#x", current_pc) raise AngrError
[docs] @staticmethod def check_concrete_target_methods(concrete_target): """ Check if the concrete target methods return the correct type of data :return: True if the concrete target is compliant """ entry_point = concrete_target.read_register("pc") if type(entry_point) is not int: l.error("read_register result type is %s, should be <type 'int'>", (type(entry_point))) return False mem_read = concrete_target.read_memory(entry_point, 0x4) if type(mem_read) is not bytes: l.error("read_memory result type is %s, should be <type 'bytes'>", (type(mem_read))) return False try: concrete_target.read_register("not_existent_reg") l.error("read_register should raise a SimConcreteRegisterError when accessing non existent registers") return False except SimConcreteRegisterError: l.debug("read_register raise a SimConcreteRegisterError, ok!") return True