Source code for angr.analyses.reaching_definitions.function_handler

from typing import TYPE_CHECKING, List, Set, Optional, Union
from dataclasses import dataclass, field
import logging

from cle import Symbol
from cle.backends import ELF

from angr.storage.memory_mixins.paged_memory.pages.multi_values import MultiValues
from angr.sim_type import SimTypeBottom
from angr.knowledge_plugins.key_definitions.atoms import Atom, Register, MemoryLocation, SpOffset
from angr.calling_conventions import SimCC
from angr.sim_type import SimTypeFunction
from angr.knowledge_plugins.key_definitions.definition import Definition
from angr.knowledge_plugins.functions import Function
from angr.analyses.reaching_definitions.dep_graph import FunctionCallRelationships
from angr.code_location import CodeLocation


if TYPE_CHECKING:
    from angr.analyses.reaching_definitions.rd_state import ReachingDefinitionsState
    from angr.analyses.reaching_definitions.reaching_definitions import ReachingDefinitionsAnalysis

l = logging.getLogger(__name__)


[docs]@dataclass class FunctionEffect: """ A single effect that a function summary may apply to the state. This is largely an implementation detail; use `FunctionCallData.depends` instead. """ dest: Optional[Atom] sources: Set[Atom] value: Optional[MultiValues] = None sources_defns: Optional[Set[Definition]] = None apply_at_callsite: bool = False
[docs]@dataclass class FunctionCallData: """ A bundle of intermediate data used when computing the sum effect of a function during ReachingDefinitionsAnalysis. RDA engine contract: - Construct one of these before calling `FunctionHandler.handle_function`. Fill it with as many fields as you can realistically provide without duplicating effort. - Provide `callsite_codeloc` as either the call statement (AIL) or the default exit of the default statement of the calling block (VEX) - Provide `function_codeloc` as the callee address with `stmt_idx=0``. Function handler contract: - If redefine_locals is unset, do not adjust any artifacts of the function call abstration, such as the stack pointer, the caller saved registers, etc. - If caller_will_handle_single_ret is set, and there is a single entry in `ret_atoms`, do not apply to the state effects modifying this atom. Instead, set `ret_values` and `ret_values_deps` to the values and deps which are used constructing these values. """ callsite_codeloc: CodeLocation function_codeloc: CodeLocation address_multi: Optional[MultiValues] address: Optional[int] = None symbol: Optional[Symbol] = None function: Optional[Function] = None name: Optional[str] = None cc: Optional[SimCC] = None prototype: Optional[SimTypeFunction] = None args_atoms: Optional[List[Set[Atom]]] = None args_values: Optional[List[MultiValues]] = None ret_atoms: Optional[Set[Atom]] = None redefine_locals: bool = True visited_blocks: Optional[Set[int]] = None effects: List[FunctionEffect] = field(default_factory=lambda: []) ret_values: Optional[MultiValues] = None ret_values_deps: Optional[Set[Definition]] = None caller_will_handle_single_ret: bool = False guessed_cc: bool = False guessed_prototype: bool = False retaddr_popped: bool = False
[docs] def has_clobbered(self, dest: Atom) -> bool: """ Determines whether the given atom already has effects applied """ if isinstance(dest, Register): for effect in self.effects: if not isinstance(effect.dest, Register): continue reg = effect.dest if dest.reg_offset + dest.size <= reg.reg_offset or dest.reg_offset >= reg.reg_offset + reg.size: # no overlap continue return True return False if isinstance(dest, MemoryLocation) and isinstance(dest.addr, SpOffset): for effect in self.effects: if not isinstance(effect.dest, MemoryLocation) or not isinstance(effect.dest.addr, SpOffset): continue stkarg = effect.dest if ( dest.addr.offset + dest.size <= stkarg.addr.offset or stkarg.addr.offset + stkarg.size <= dest.addr.offset ): # no overlap continue return True return False # unsupported return False
[docs] def depends( self, dest: Optional[Atom], *sources: Atom, value: Optional[MultiValues] = None, apply_at_callsite: bool = False, ): """ Mark a single effect of the current function, including the atom being modified, the input atoms on which that output atom depends, the precise (or imprecise!) value to store, and whether the effect should be applied during the function or afterwards, at the callsite. The atom being modified may be None to mark uses of the source atoms which do not have any explicit sinks. """ if dest is not None and self.has_clobbered(dest): l.warning( "Function handler for %s seems to be implemented incorrectly - " "you're supposed to call depends() exactly once per dependant atom", self.address, ) else: self.effects.append(FunctionEffect(dest, set(sources), value=value, apply_at_callsite=apply_at_callsite))
# pylint: disable=unused-argument, no-self-use
[docs]class FunctionHandler: """ A mechanism for summarizing a function call's effect on a program for ReachingDefinitionsAnalysis. """
[docs] def hook(self, analysis: "ReachingDefinitionsAnalysis") -> "FunctionHandler": """ Attach this instance of the function handler to an instance of RDA. """ return self
[docs] def make_function_codeloc( self, target: Union[None, int, MultiValues], callsite: CodeLocation, callsite_func_addr: Optional[int] ): """ The RDA engine will call this function to transform a callsite CodeLocation into a callee CodeLocation. """ if isinstance(target, MultiValues): target_bv = target.one_value() if target_bv is not None and target_bv.op == "BVV": target_int = target_bv.args[0] else: target_int = None else: target_int = target if callsite.context is None: return CodeLocation(target_int, stmt_idx=None, context=None) elif type(callsite.context) is tuple and callsite_func_addr is not None: return CodeLocation(target_int, stmt_idx=None, context=(callsite.block_addr,) + callsite.context) else: raise TypeError( "Please implement FunctionHandler.make_function_codeloc for your special context sensitivity" )
[docs] def handle_function(self, state: "ReachingDefinitionsState", data: FunctionCallData): """ The main entry point for the function handler. Called with a RDA state and a FunctionCallData, it is expected to update the state and the data as per the contracts described on FunctionCallData. You can override this method to take full control over how data is processed, or override any of the following to use the higher-level interface (data.depends()): - `handle_impl_<function name>` - `handle_local_function` - `handle_external_function` - `handle_indirect_function` - `handle_generic_function` Each of them take the same signature as `handle_function`. """ # META assert state.analysis is not None assert state.analysis.project.loader.main_object is not None if data.address is None and data.address_multi is not None: for vs in data.address_multi.values(): for val in vs: if val is not None and val.op == "BVV": data.address = val.args[0] break if data.address is not None: break if data.symbol is None and data.address is not None: data.symbol = state.analysis.project.loader.find_symbol(data.address) if data.function is None and data.address is not None: data.function = state.analysis.project.kb.functions.get(data.address, None) if data.name is None and data.function is not None: data.name = data.function.name if data.name is None and data.symbol is not None: data.name = data.symbol.name if data.cc is None and data.function is not None: data.cc = data.function.calling_convention if data.prototype is None and data.function is not None: data.prototype = data.function.prototype if data.address is not None and (data.cc is None or data.prototype is None): hook = ( None if not state.analysis.project.is_hooked(data.address) else state.analysis.project.hooked_by(data.address) ) if ( hook is None and isinstance(state.analysis.project.loader.main_object, ELF) and data.address in state.analysis.project.loader.main_object.reverse_plt ): plt_name = state.analysis.project.loader.main_object.reverse_plt[data.address] if state.analysis.project.loader.find_symbol(plt_name) is not None: hook = state.analysis.project.symbol_hooked_by(plt_name) if data.cc is None and hook is not None: data.cc = hook.cc if data.prototype is None and hook is not None: data.prototype = hook.prototype.with_arch(state.arch) data.guessed_prototype = hook.guessed_prototype # fallback to the default calling convention and prototype if data.cc is None: data.cc = state.analysis.project.factory.cc() data.guessed_cc = True if data.prototype is None: data.prototype = state.analysis.project.factory.function_prototype() data.guessed_prototype = True args_atoms_from_values = set() if data.args_atoms is None and data.args_values is not None: data.args_atoms = [ set().union( *({defn.atom for defn in state.extract_defs(value)} for values in mv.values() for value in values) ) for mv in data.args_values ] for atoms_set in data.args_atoms: args_atoms_from_values |= atoms_set elif data.args_atoms is None and data.cc is not None and data.prototype is not None: data.args_atoms = self.c_args_as_atoms(state, data.cc, data.prototype) if data.ret_atoms is None and data.cc is not None and data.prototype is not None: if data.prototype.returnty is not None: data.ret_atoms = self.c_return_as_atoms(state, data.cc, data.prototype) # PROCESS state.move_codelocs(data.function_codeloc) if data.name is not None and hasattr(self, f"handle_impl_{data.name}"): handler = getattr(self, f"handle_impl_{data.name}") elif data.address is not None: if (data.symbol is None and state.analysis.project.loader.main_object.contains_addr(data.address)) or ( data.symbol is not None and data.symbol.owner is state.analysis.project.loader.main_object ): handler = self.handle_local_function else: handler = self.handle_external_function else: handler = self.handle_indirect_function handler(state, data) # a call expression does not overwrite or redefine any local registers if data.redefine_locals: if data.cc is not None: for reg in self.caller_saved_regs_as_atoms(state, data.cc): if not data.has_clobbered(reg): data.depends(reg) if state.arch.call_pushes_ret and not data.retaddr_popped: sp_atom = self.stack_pointer_as_atom(state) if not data.has_clobbered(sp_atom): # let the user override the stack pointer if they want new_sp = None sp_val = state.live_definitions.get_value_from_atom(sp_atom) if sp_val is not None: one_sp_val = sp_val.one_value() if one_sp_val is not None: # call_sp_fix is the sp movement after the call instruction executes, which means it is # usually a negative number if the stack grows towards a lower address. when we return, # we should subtract this negative number from the current stack pointer to keep the stack # balanced. new_sp = MultiValues(one_sp_val - state.arch.call_sp_fix) data.depends(sp_atom, value=new_sp) # OUTPUT args_defns = [ set().union(*(state.get_definitions(atom) for atom in atoms)) for atoms in (data.args_atoms or set()) ] all_args_defns = set().union(*args_defns) other_input_defns = set() ret_defns = set() other_output_defns = set() # translate all the dep atoms into dep defns for effect in data.effects: if effect.sources_defns is None and effect.sources: effect.sources_defns = set().union(*(set(state.get_definitions(atom)) for atom in effect.sources)) other_input_defns |= effect.sources_defns - all_args_defns # apply the effects, with the ones marked with apply_at_callsite=False applied first for effect in sorted(data.effects, key=lambda effect: effect.apply_at_callsite): codeloc = data.callsite_codeloc if effect.apply_at_callsite else data.function_codeloc state.move_codelocs(codeloc) # no-op if duplicated # mark uses for source in effect.sources_defns or set(): if source.atom not in args_atoms_from_values: state.add_use_by_def(source, expr=None) if effect.dest is None: continue value = effect.value if effect.value is not None else MultiValues(state.top(effect.dest.bits)) # special case: if there is exactly one ret atom, we expect that the caller will do something # with the value, e.g. if this is a call expression. if data.caller_will_handle_single_ret and data.ret_atoms == {effect.dest}: data.ret_values = value data.ret_values_deps = effect.sources_defns else: # mark definition mv, defs = state.kill_and_add_definition( effect.dest, value, uses=effect.sources_defns or set(), ) # categorize the output defn as either ret or other based on the atoms for defn in defs: if data.ret_atoms is not None and defn.atom not in data.ret_atoms: other_output_defns.add(defn) else: ret_defns.add(defn) # record this callsite state.analysis.function_calls[data.callsite_codeloc] = FunctionCallRelationships( callsite=data.callsite_codeloc, target=data.address, args_defns=args_defns, other_input_defns=other_input_defns, ret_defns=ret_defns, other_output_defns=other_output_defns, ) # move the current codeloc back to the callsite state.move_codelocs(data.callsite_codeloc)
[docs] def handle_generic_function(self, state: "ReachingDefinitionsState", data: FunctionCallData): assert data.cc is not None assert data.prototype is not None if data.prototype.returnty is not None: data.ret_values = MultiValues(state.top(data.prototype.returnty.size)) if data.guessed_prototype: # use all! # TODO should we use some number of stack variables as well? if data.ret_atoms is not None: for ret_atom in data.ret_atoms: data.depends( ret_atom, *(Register(*state.arch.registers[reg_name], arch=state.arch) for reg_name in data.cc.ARG_REGS), apply_at_callsite=True, ) else: sources = {atom for arg in data.args_atoms or [] for atom in arg} if not data.ret_atoms: data.depends(None, *sources, apply_at_callsite=True) # controversial return for atom in data.ret_atoms: data.depends(atom, *sources, apply_at_callsite=True)
handle_indirect_function = handle_generic_function handle_local_function = handle_generic_function handle_external_function = handle_generic_function
[docs] @staticmethod def c_args_as_atoms(state: "ReachingDefinitionsState", cc: SimCC, prototype: SimTypeFunction) -> List[Set[Atom]]: if not prototype.variadic: sp_value = state.get_one_value(Register(state.arch.sp_offset, state.arch.bytes)) sp = state.get_stack_offset(sp_value) if sp_value is not None else None atoms = [] for arg in cc.arg_locs(prototype): atoms_set = set() for footprint_arg in arg.get_footprint(): try: atom = Atom.from_argument( footprint_arg, state.arch, full_reg=True, sp=sp, ) except ValueError: continue atoms_set.add(atom) atoms.append(atoms_set) return atoms return [{Register(*state.arch.registers[arg_name], arch=state.arch)} for arg_name in cc.ARG_REGS]
[docs] @staticmethod def c_return_as_atoms(state: "ReachingDefinitionsState", cc: SimCC, prototype: SimTypeFunction) -> Set[Atom]: if prototype.returnty is not None and not isinstance(prototype.returnty, SimTypeBottom): retval = cc.return_val(prototype.returnty) if retval is not None: return { Atom.from_argument(footprint_arg, state.arch, full_reg=True) for footprint_arg in retval.get_footprint() } return set()
[docs] @staticmethod def caller_saved_regs_as_atoms(state: "ReachingDefinitionsState", cc: SimCC) -> Set[Register]: return ( {Register(*state.arch.registers[reg], arch=state.arch) for reg in cc.CALLER_SAVED_REGS} if cc.CALLER_SAVED_REGS is not None else set() )
[docs] @staticmethod def stack_pointer_as_atom(state) -> Register: return Register(state.arch.sp_offset, state.arch.bytes, state.arch)