import operator
import logging
import itertools
import contextlib
from typing import Optional
import claripy
from claripy.ast.bv import BV
from .plugin import SimStatePlugin
from .. import sim_options
from ..state_plugins.sim_action import SimActionObject
l = logging.getLogger(name=__name__)
[docs]class SimStateHistory(SimStatePlugin):
"""
This class keeps track of historically-relevant information for paths.
"""
STRONGREF_STATE = True
def __init__(self, parent=None, clone=None):
SimStatePlugin.__init__(self)
# attributes handling the progeny of this history object
self.parent = parent if clone is None else clone.parent
self.merged_from = [] if clone is None else list(clone.merged_from)
self.merge_conditions = [] if clone is None else list(clone.merge_conditions)
self.depth = (0 if parent is None else parent.depth + 1) if clone is None else clone.depth
self.previous_block_count = (
(0 if parent is None else parent.block_count) if clone is None else clone.previous_block_count
)
# a string description of this history
self.recent_description = None if clone is None else clone.recent_description
# the control flow transfer information from this history onwards (to the current state)
self.jump_target = None if clone is None else clone.jump_target
self.jump_source = None if clone is None else clone.jump_source
self.jump_avoidable = None if clone is None else clone.jump_avoidable
self.jump_guard: Optional[BV] = None if clone is None else clone.jump_guard
self.jumpkind = None if clone is None else clone.jumpkind
# the execution log for this history
self.recent_events = [] if clone is None else list(clone.recent_events)
self.recent_bbl_addrs = [] if clone is None else list(clone.recent_bbl_addrs)
self.recent_ins_addrs = [] if clone is None else list(clone.recent_ins_addrs)
self.recent_stack_actions = [] if clone is None else list(clone.recent_stack_actions)
self.last_stmt_idx = None if clone is None else clone.last_stmt_idx
# numbers of blocks, syscalls, and instructions that were executed in this step
self.recent_block_count = 0 if clone is None else clone.recent_block_count
self.recent_syscall_count = 0 if clone is None else clone.recent_syscall_count
self.recent_instruction_count = -1 if clone is None else clone.recent_instruction_count
# satness stuff
self._all_constraints = ()
self._satisfiable = None
self.successor_ip = None if clone is None else clone.successor_ip
self.strongref_state = None if clone is None else clone.strongref_state
[docs] def init_state(self):
self.successor_ip = self.state._ip
def __getstate__(self):
# flatten ancestry, otherwise we hit recursion errors trying to get the entire history...
# the important intuition here is that if we provide the entire linked list to pickle, pickle
# will traverse it recursively. If we provide it as a real list, it will not do any recursion.
# the nuance is in whether the list we provide has live parent links, in which case it matters
# what order pickle iterates the list, as it will suddenly be able to perform memoization.
ancestry = []
parent = self.parent
self.parent = None
while parent is not None:
ancestry.append(parent)
parent = parent.parent
ancestry[-1].parent = None
rev_ancestry = list(reversed(ancestry))
d = super().__getstate__()
d["strongref_state"] = None
d["rev_ancestry"] = rev_ancestry
d["successor_ip"] = self.successor_ip
# reconstruct chain
child = self
for parent in ancestry:
child.parent = parent
child = parent
d.pop("parent")
return d
def __setstate__(self, d):
child = self
ancestry = list(reversed(d.pop("rev_ancestry")))
for parent in ancestry:
if hasattr(child, "parent"):
break
child.parent = parent
child = parent
else:
child.parent = None
self.__dict__.update(d)
def __repr__(self):
addr = self.addr
if addr is None:
addr_str = "Unknown"
else:
addr_str = "%#x" % addr
return "<StateHistory @ %s>" % addr_str
[docs] def set_strongref_state(self, state):
if sim_options.EFFICIENT_STATE_MERGING in state.options:
self.strongref_state = state
@property
def addr(self):
if not self.recent_bbl_addrs:
return None
return self.recent_bbl_addrs[-1]
[docs] def merge(self, others, merge_conditions, common_ancestor=None):
if not others:
return False
self.merged_from.extend(h for h in others)
self.merge_conditions = merge_conditions
# we must fix this in order to get
# correct results when using constraints_since()
self.parent = common_ancestor if common_ancestor is not None else self.parent
# rebuild recent constraints
recent_constraints = [h.constraints_since(common_ancestor) for h in itertools.chain([self], others)]
if sim_options.SIMPLIFY_MERGED_CONSTRAINTS in self.state.options:
combined_constraint = self.state.solver.Or(
*[
self.state.solver.simplify(self.state.solver.And(*history_constraints))
for history_constraints in recent_constraints
]
)
else:
combined_constraint = self.state.solver.Or(
*[self.state.solver.And(*history_constraints) for history_constraints in recent_constraints]
)
self.recent_events = [
e.recent_events for e in itertools.chain([self], others) if not isinstance(e, SimActionConstraint)
]
self.recent_events.append(SimActionConstraint(self.state, combined_constraint))
# hard to say what we should do with these others list of things...
# self.recent_bbl_addrs = [e.recent_bbl_addrs for e in itertools.chain([self], others)]
# self.recent_ins_addrs = [e.recent_ins_addrs for e in itertools.chain([self], others)]
# self.recent_stack_actions = [e.recent_stack_actions for e in itertools.chain([self], others)]
return True
[docs] def widen(self, others): # pylint: disable=unused-argument
l.warning("history widening is not implemented!")
return # TODO
@SimStatePlugin.memo
def copy(self, memo): # pylint: disable=unused-argument
return SimStateHistory(clone=self)
[docs] def trim(self):
"""
Discard the ancestry of this state.
"""
new_hist = self.copy({})
new_hist.parent = None
self.state.register_plugin("history", new_hist)
[docs] def filter_actions(
self, start_block_addr=None, end_block_addr=None, block_stmt=None, insn_addr=None, read_from=None, write_to=None
):
"""
Filter self.actions based on some common parameters.
[start_block_addr, end_block_addr]
:param start_block_addr: Only return actions generated in blocks starting at this address.
:param end_block_addr: Only return actions generated in blocks ending at this address.
:param block_stmt: Only return actions generated in the nth statement of each block.
:param insn_addr: Only return actions generated in the assembly instruction at this address.
:param read_from: Only return actions that perform a read from the specified location.
:param write_to: Only return actions that perform a write to the specified location.
Notes:
If IR optimization is turned on, reads and writes may not occur in the instruction
they originally came from. Most commonly, If a register is read from twice in the same
block, the second read will not happen, instead reusing the temp the value is already
stored in.
Valid values for read_from and write_to are the string literals 'reg' or 'mem' (matching
any read or write to registers or memory, respectively), any string (representing a read
or write to the named register), and any integer (representing a read or write to the
memory at this address).
"""
if read_from is not None:
if write_to is not None:
raise ValueError("Can't handle read_from and write_to at the same time!")
if read_from in ("reg", "mem"):
read_type = read_from
read_offset = None
elif isinstance(read_from, str):
read_type = "reg"
read_offset = self.state.project.arch.registers[read_from][0]
else:
read_type = "mem"
read_offset = read_from
if write_to is not None:
if write_to in ("reg", "mem"):
write_type = write_to
write_offset = None
elif isinstance(write_to, str):
write_type = "reg"
write_offset = self.state.project.arch.registers[write_to][0]
else:
write_type = "mem"
write_offset = write_to
# def addr_of_stmt(bbl_addr, stmt_idx):
# if stmt_idx is None:
# return None
# stmts = self.state.project.factory.block(bbl_addr).vex.statements
# if stmt_idx >= len(stmts):
# return None
# for i in reversed(range(stmt_idx + 1)):
# if stmts[i].tag == 'Ist_IMark':
# return stmts[i].addr + stmts[i].delta
# return None
def action_reads(action):
if action.type != read_type:
return False
if action.action != "read":
return False
if read_offset is None:
return True
addr = action.addr
if isinstance(addr, SimActionObject):
addr = addr.ast
if isinstance(addr, claripy.ast.Base):
if addr.symbolic:
return False
addr = self.state.solver.eval(addr)
if addr != read_offset:
return False
return True
def action_writes(action):
if action.type != write_type:
return False
if action.action != "write":
return False
if write_offset is None:
return True
addr = action.addr
if isinstance(addr, SimActionObject):
addr = addr.ast
if isinstance(addr, claripy.ast.Base):
if addr.symbolic:
return False
addr = self.state.solver.eval(addr)
if addr != write_offset:
return False
return True
return [
x
for x in reversed(self.actions)
if (start_block_addr is None or x.bbl_addr >= start_block_addr)
and (end_block_addr is None or x.bbl_addr <= end_block_addr)
and (block_stmt is None or x.stmt_idx == block_stmt)
and (read_from is None or action_reads(x))
and (write_to is None or action_writes(x))
and (insn_addr is None or (x.sim_procedure is None and x.ins_addr == insn_addr))
# (insn_addr is None or (x.sim_procedure is None and addr_of_stmt(x.bbl_addr, x.stmt_idx) == insn_addr))
]
[docs] def demote(self):
"""
Demotes this history node, causing it to drop the strong state reference.
"""
self.strongref_state = None
[docs] def reachable(self):
if self._satisfiable is not None:
return self._satisfiable
if self.state is not None:
try:
self._satisfiable = self.state.solver.satisfiable()
return self._satisfiable
except ReferenceError:
pass
solver = claripy.Solver()
solver.add(self._all_constraints)
self._satisfiable = solver.satisfiable()
return self._satisfiable
#
# Log handling
#
[docs] def add_event(self, event_type, **kwargs):
new_event = SimEvent(self.state, event_type, **kwargs)
self.recent_events.append(new_event)
[docs] def add_action(self, action):
self.recent_events.append(action)
[docs] def extend_actions(self, new_actions):
self.recent_events.extend(new_actions)
[docs] @contextlib.contextmanager
def subscribe_actions(self):
start_idx = len(self.recent_actions)
res = []
yield res
res.extend(self.recent_actions[start_idx:])
#
# Convenient accessors
#
@property
def recent_constraints(self):
# this and the below MUST be lists, not generators, because we need to reverse them
return [ev.constraint for ev in self.recent_events if isinstance(ev, SimActionConstraint)]
@property
def recent_actions(self):
return [ev for ev in self.recent_events if isinstance(ev, SimAction)]
@property
def block_count(self):
return self.previous_block_count + self.recent_block_count
@property
def lineage(self):
return HistoryIter(self)
@property
def parents(self):
if self.parent:
yield from self.parent.lineage
@property
def events(self):
return LambdaIterIter(self, operator.attrgetter("recent_events"))
@property
def actions(self):
return LambdaIterIter(self, operator.attrgetter("recent_actions"))
@property
def jumpkinds(self):
return LambdaAttrIter(self, operator.attrgetter("jumpkind"))
@property
def jump_guards(self):
return LambdaAttrIter(self, operator.attrgetter("jump_guard"))
@property
def jump_targets(self):
return LambdaAttrIter(self, operator.attrgetter("jump_target"))
@property
def jump_sources(self):
return LambdaAttrIter(self, operator.attrgetter("jump_source"))
@property
def descriptions(self):
return LambdaAttrIter(self, operator.attrgetter("recent_description"))
@property
def bbl_addrs(self):
return LambdaIterIter(self, operator.attrgetter("recent_bbl_addrs"))
@property
def ins_addrs(self):
return LambdaIterIter(self, operator.attrgetter("recent_ins_addrs"))
@property
def stack_actions(self):
return LambdaIterIter(self, operator.attrgetter("recent_stack_actions"))
#
# Merging support
#
[docs] def closest_common_ancestor(self, other):
"""
Find the common ancestor between this history node and 'other'.
:param other: the PathHistory to find a common ancestor with.
:return: the common ancestor SimStateHistory, or None if there isn't one
"""
our_history_iter = reversed(HistoryIter(self))
their_history_iter = reversed(HistoryIter(other))
sofar = set()
while True:
our_done = False
their_done = False
try:
our_next = next(our_history_iter)
if our_next in sofar:
# we found it!
return our_next
sofar.add(our_next)
except StopIteration:
# we ran out of items during iteration
our_done = True
try:
their_next = next(their_history_iter)
if their_next in sofar:
# we found it!
return their_next
sofar.add(their_next)
except StopIteration:
# we ran out of items during iteration
their_done = True
# if we ran out of both lists, there's no common ancestor
if our_done and their_done:
return None
[docs] def constraints_since(self, other):
"""
Returns the constraints that have been accumulated since `other`.
:param other: a prior PathHistory object
:returns: a list of constraints
"""
constraints = []
cur = self
while cur is not other and cur is not None:
constraints.extend(cur.recent_constraints)
cur = cur.parent
return constraints
[docs] def make_child(self):
return SimStateHistory(parent=self)
[docs]class TreeIter:
def __init__(self, start, end=None):
self._start = start
self._end = end
def _iter_nodes(self):
n = self._start
while n is not self._end:
yield n
n = n.parent
def __iter__(self):
yield from self.hardcopy
def __reversed__(self):
raise NotImplementedError("Why are you using this class")
@property
def hardcopy(self):
# lmao
return list(reversed(tuple(reversed(self))))
def __len__(self):
# TODO: this is wrong
return self._start.depth
def __getitem__(self, k):
if isinstance(k, slice):
raise ValueError("Please use .hardcopy to use slices")
if k >= 0:
raise ValueError("Please use .hardcopy to use nonnegative indexes")
i = 0
for item in reversed(self):
i -= 1
if i == k:
return item
raise IndexError(k)
[docs] def count(self, v):
"""
Count occurrences of value v in the entire history. Note that the subclass must implement the __reversed__
method, otherwise an exception will be thrown.
:param object v: The value to look for
:return: The number of occurrences
:rtype: int
"""
ctr = 0
for item in reversed(self):
if item == v:
ctr += 1
return ctr
[docs]class HistoryIter(TreeIter):
def __reversed__(self):
yield from self._iter_nodes()
[docs]class LambdaAttrIter(TreeIter):
def __init__(self, start, f, **kwargs):
TreeIter.__init__(self, start, **kwargs)
self._f = f
def __reversed__(self):
for hist in self._iter_nodes():
a = self._f(hist)
if a is not None:
yield a
[docs]class LambdaIterIter(LambdaAttrIter):
def __init__(self, start, f, reverse=True, **kwargs):
LambdaAttrIter.__init__(self, start, f, **kwargs)
self._f = f
self._reverse = reverse
def __reversed__(self):
for hist in self._iter_nodes():
yield from reversed(self._f(hist)) if self._reverse else self._f(hist)
from angr.sim_state import SimState
SimState.register_default("history", SimStateHistory)
from .sim_action import SimAction, SimActionConstraint
from .sim_event import SimEvent