import operator
from .plugin import SimStatePlugin
[docs]class SimStateCGC(SimStatePlugin):
"""
This state plugin keeps track of CGC state.
"""
# CGC error codes
EBADF = 1
EFAULT = 2
EINVAL = 3
ENOMEM = 4
ENOSYS = 5
EPIPE = 6
# other CGC constants
FD_SETSIZE = 1024
max_allocation = 0x10000000
# __slots__ = [ 'heap_location', 'max_str_symbolic_bytes' ]
[docs] def __init__(self):
SimStatePlugin.__init__(self)
self.allocation_base = 0xB8000000
self.time = 0
self.input_size = 0
self.input_strings = []
self.output_strings = []
self.sinkholes = set()
self.flag_bytes = None
self.max_receive_size = 0
@SimStatePlugin.memo
def copy(self, memo): # pylint: disable=unused-argument
c = super().copy(memo)
c.allocation_base = self.allocation_base
c.time = self.time
c.input_strings = list(self.input_strings)
c.output_strings = list(self.output_strings)
c.input_size = self.input_size
c.sinkholes = set(self.sinkholes)
c.flag_bytes = self.flag_bytes
c.max_receive_size = self.max_receive_size
return c
[docs] def peek_output(self):
if len(self.output_strings) == 0:
return None
return self.output_strings[0]
[docs] def discard_output(self, num_bytes):
if len(self.output_strings) == 0:
return
self.output_strings[0] = self.output_strings[0][num_bytes:]
if self.output_strings[0] == b"":
self.output_strings.pop(0)
[docs] def addr_invalid(self, a):
return not self.state.solver.solution(a != 0, True)
def _combine(self, others):
merging_occured = False
new_allocation_base = max(o.allocation_base for o in others)
if self.state.solver.symbolic(new_allocation_base):
raise ValueError("wat")
concrete_allocation_base = self.state.solver.eval(self.allocation_base)
concrete_new_allocation_base = self.state.solver.eval(new_allocation_base)
if concrete_allocation_base != concrete_new_allocation_base:
self.allocation_base = new_allocation_base
merging_occured = True
return merging_occured
[docs] def merge(self, others, merge_conditions, common_ancestor=None): # pylint: disable=unused-argument
return self._combine(others)
[docs] def widen(self, others):
return self._combine(others)
### HEAP MANAGEMENT
[docs] def get_max_sinkhole(self, length):
"""
Find a sinkhole which is large enough to support `length` bytes.
This uses first-fit. The first sinkhole (ordered in descending order by their address)
which can hold `length` bytes is chosen. If there are more than `length` bytes in the
sinkhole, a new sinkhole is created representing the remaining bytes while the old
sinkhole is removed.
"""
ordered_sinks = sorted(list(self.sinkholes), key=operator.itemgetter(0), reverse=True)
max_pair = None
for addr, sz in ordered_sinks:
if sz >= length:
max_pair = (addr, sz)
break
if max_pair is None:
return None
remaining = max_pair[1] - length
max_addr = max_pair[0] + remaining
max_length = remaining
self.sinkholes.remove(max_pair)
if remaining:
self.sinkholes.add((max_pair[0], max_length))
return max_addr
[docs] def add_sinkhole(self, address, length):
"""
Add a sinkhole.
Allow the possibility for the program to reuse the memory represented by the
address length pair.
"""
self.sinkholes.add((address, length))
from angr.sim_state import SimState
SimState.register_default("cgc", SimStateCGC)