from __future__ import annotations

import logging
import itertools

import claripy

from .memory_mixins import DefaultMemory
from angr.state_plugins.plugin import SimStatePlugin
from angr.state_plugins.sim_action_object import SimActionObject
from angr import sim_options
from angr.errors import SimMergeError, SimFileError, SimSolverError
# Module-level logger, named after this module.
l = logging.getLogger(name=__name__)
# Monotonic counter; SimFileBase.make_ident draws from it to build unique file identifiers.
file_counter = itertools.count()
# Monotonic counter; not referenced in the portion of the file reviewed here.
dialogue_counter = itertools.count()
class Flags:  # pylint: disable=W0232
    """
    Namespace of integer open-mode flag constants.

    ``SimFileDescriptor.flags`` is a bitfield built from these values. The low two bits (masked by
    ``O_ACCMODE``) select the access mode; the remaining constants are individual flag bits written as octal
    literals.
    """

    O_RDONLY = 0
    O_WRONLY = 1
    O_RDWR = 2
    O_ACCMODE = 3  # bitmask for read/write mode
    O_APPEND = 0o2000
    O_ASYNC = 0o20000
    O_CLOEXEC = 0o2000000
    # TODO mode for this flag
    O_CREAT = 0o100
    O_DIRECT = 0o40000
    O_DIRECTORY = 0o200000
    O_DSYNC = 0o10000
    O_EXCL = 0o200
    O_LARGEFILE = 0o100000
    O_NOATIME = 0o1000000
    O_NOCTTY = 0o400
    O_NOFOLLOW = 0o400000
    O_NONBLOCK = 0o4000
    O_NDELAY = 0o4000
    O_PATH = 0o10000000
    O_SYNC = 0o4010000
    O_TMPFILE = 0o20200000
    O_TRUNC = 0o1000
def _deps_unpack(a):
    """
    Split a value into its AST and dependency information.

    :param a:   Either a SimActionObject or any other value.
    :return:    ``(a.ast, a.reg_deps, a.tmp_deps)`` when ``a`` is a SimActionObject, otherwise
                ``(a, None, None)``.
    """
    if isinstance(a, SimActionObject):
        return a.ast, a.reg_deps, a.tmp_deps
    return a, None, None
class SimFileBase(SimStatePlugin):
    """
    SimFiles are the storage mechanisms used by SimFileDescriptors.

    Different types of SimFiles can have drastically different interfaces, and as a result there's not much that can be
    specified on this base class. All the read and write methods take a ``pos`` argument, which may have different
    semantics per-class. ``0`` will always be a valid position to use, though, and the next position you should use
    is part of the return tuple.

    Some simfiles are "streams", meaning that the position that reads come from is determined not by the position you
    pass in (it will in fact be ignored), but by an internal variable. This is stored as ``.pos`` if you care to read
    it. Don't write to it. The same lack-of-semantics applies to this field as well.

    :ivar name:     The name of the file. Purely for cosmetic purposes
    :ivar ident:    The identifier of the file, typically autogenerated from the name and a nonce. Purely for cosmetic
                    purposes, but does appear in symbolic values autogenerated in the file.
    :ivar seekable: Bool indicating whether seek operations on this file should succeed. If this is True, then ``pos``
                    must be a number of bytes from the start of the file.
    :ivar writable: Bool indicating whether writing to this file is allowed.
    :ivar pos:      If the file is a stream, this will be the current position. Otherwise, None.
    :ivar concrete: Whether or not this file contains mostly concrete data. Will be used by some SimProcedures to
                    choose how to handle variable-length operations like fgets.
    :ivar file_exists:
                    Set to False if the file does not exist, set to a claripy Bool if unknown, default True.
    """

    seekable = False
    pos = None

    def __init__(self, name=None, writable=True, ident=None, concrete=False, file_exists=True, **kwargs):
        """
        :param name:        The name of the file.
        :param writable:    Whether writing to this file is allowed.
        :param ident:       The identifier of the file. If None, one is generated with ``make_ident(name)``.
        :param concrete:    Whether the file contains mostly concrete data.
        :param file_exists: Existence status of the file (see the class documentation).
        :param kwargs:      Forwarded to the parent constructor.
        """
        self.name = name
        self.ident = ident
        self.writable = writable
        self.concrete = concrete
        self.file_exists = file_exists

        if ident is None:
            self.ident = self.make_ident(self.name)

        # only when the caller supplied a memory_id is it replaced by the file's ident
        if "memory_id" in kwargs:
            kwargs["memory_id"] = self.ident
        super().__init__(**kwargs)

    @staticmethod
    def make_ident(name):
        """
        Build an identifier string from a file name.

        :param name:    The file name as str or bytes, or None.
        :return:        ``"file"`` if ``name`` is None. Otherwise ``"file_<n>_<nice_name>"`` where ``<n>`` is the
                        next value of the module-level ``file_counter`` and ``<nice_name>`` is the name with every
                        byte outside 0x20..0x7E replaced by ``?``. At most three ``?`` are emitted per run of such
                        bytes; the rest of the run is dropped.
        """
        if name is None:
            return "file"

        if isinstance(name, str):
            name = name.encode()

        def generate():
            consecutive_bad = 0
            for ch in name:
                if 0x20 <= ch <= 0x7E:
                    consecutive_bad = 0
                    yield chr(ch)
                elif consecutive_bad < 3:
                    consecutive_bad += 1
                    yield "?"

        nice_name = "".join(generate())
        return f"file_{next(file_counter)}_{nice_name}"

    def concretize(self, **kwargs):
        """
        Return a concretization of the contents of the file. The type of the return value of this method will vary
        depending on which kind of SimFile you're using.
        """
        raise NotImplementedError

    def read(self, pos, size, **kwargs):
        """
        Read some data from the file.

        :param pos:     The offset in the file to read from.
        :param size:    The size to read. May be symbolic.
        :return:        A tuple of the data read (a bitvector of the length that is the maximum length of the read),
                        the actual size of the read, and the new file position pointer.
        """
        raise NotImplementedError

    def write(self, pos, data, size=None, **kwargs):
        """
        Write some data to the file.

        :param pos:     The offset in the file to write to. May be ignored if the file is a stream or device.
        :param data:    The data to write as a bitvector
        :param size:    The optional size of the data to write. If not provided will default to the length of the data.
                        Must be constrained to less than or equal to the size of the data.
        :return:        The new file position pointer.
        """
        raise NotImplementedError

    @property
    def size(self):
        """
        The number of data bytes stored by the file at present. May be a symbolic value.
        """
        raise NotImplementedError

    @DefaultMemory.memo
    def copy(self, memo):
        """
        Copy the plugin via the parent class, then carry over the file attributes set by this class.
        """
        o = super().copy(memo)
        o.name = self.name
        o.ident = self.ident
        o.writable = self.writable
        o.concrete = self.concrete
        o.file_exists = self.file_exists
        return o
class SimFile(SimFileBase, DefaultMemory):  # TODO: pick a better base class omg
    """
    The normal SimFile is meant to model files on disk. It subclasses DefaultMemory so loads and stores to/from
    it are very simple.

    :param name:        The name of the file
    :param content:     Optional initial content for the file as a string or bitvector
    :param size:        Optional size of the file. If content is not specified, it defaults to zero
    :param has_end:     Whether the size boundary is treated as the end of the file or a frontier at which new content
                        will be generated. If unspecified, will pick its value based on options.FILES_HAVE_EOF. Another
                        caveat is that if the size is also unspecified this value will default to False.
    :param seekable:    Optional bool indicating whether seek operations on this file should succeed, default True.
    :param writable:    Whether writing to this file is allowed
    :param concrete:    Whether or not this file contains mostly concrete data. Will be used by some SimProcedures to
                        choose how to handle variable-length operations like fgets.

    :ivar has_end:      Whether this file has an EOF
    """

    def __init__(
        self,
        name=None,
        content=None,
        size=None,
        has_end=None,
        seekable=True,
        writable=True,
        ident=None,
        concrete=None,
        **kwargs,
    ):
        kwargs["memory_id"] = kwargs.get("memory_id", "file")
        super().__init__(name=name, writable=writable, ident=ident, **kwargs)
        self._size = size
        self.has_end = has_end
        self.seekable = seekable

        # this is hacky because we need to work around not having a state yet
        content = _deps_unpack(content)[0]
        if isinstance(content, bytes):
            if concrete is None:
                concrete = True
            content = claripy.BVV(content)
        elif isinstance(content, str):
            if concrete is None:
                concrete = True
            content = claripy.BVV(content.encode())
        elif content is None:
            pass
        elif isinstance(content, claripy.ast.Bits):
            if concrete is None and not content.symbolic:
                concrete = True
        else:
            raise TypeError(f"Can't handle SimFile content of type {type(content)}")

        if concrete is None:
            concrete = False
        self.concrete = concrete

        if content is not None:
            # stash the content until set_state(), where it can actually be stored
            self.__content = content

            if self._size is None:
                self._size = len(content) // 8
        else:
            if self._size is None:
                self._size = 0
                # neither content nor size given: default to a file without an end
                if has_end is None:
                    self.has_end = False

    @property
    def category(self):  # override trying to determine from self.id to allow arbitrary idents
        return "file"

    def set_state(self, state):
        """
        Attach the state, then finish the initialization that needed one: store the pending initial content at
        offset 0, resolve ``has_end`` from the state options if it is still None, and turn an int size into a
        bitvector of ``state.arch.bits`` bits.

        :raises TypeError:  If the size is a bitvector whose length is not ``state.arch.bits``.
        """
        super().set_state(state)
        try:
            content = self.__content
        except AttributeError:
            pass
        else:
            self.store(0, content)
            del self.__content

        if self.has_end is None:
            self.has_end = sim_options.FILES_HAVE_EOF in state.options

        if isinstance(self._size, int):
            self._size = claripy.BVV(self._size, state.arch.bits)
        elif len(self._size) != state.arch.bits:
            raise TypeError("SimFile size must be a bitvector of size %d (arch.bits)" % state.arch.bits)

    @property
    def size(self):
        return self._size

    def concretize(self, **kwargs):
        """
        Return a concretization of the contents of the file, as a flat bytestring.
        """
        size = self.state.solver.min(self._size, **kwargs)
        data = self.load(0, size)

        kwargs["cast_to"] = kwargs.get("cast_to", bytes)
        # evaluate the data under the assumption that the file really has the minimal size
        kwargs["extra_constraints"] = (*tuple(kwargs.get("extra_constraints", ())), self._size == size)
        return self.state.solver.eval(data, **kwargs)

    def read(self, pos, size, **kwargs):
        """
        Read up to ``size`` bytes starting at offset ``pos``.

        :param pos:             The offset in the file to read from.
        :param size:            The size to read. May be symbolic.
        :param disable_actions: Keyword option forwarded to the underlying load, default False.
        :param inspect:         Keyword option forwarded to the underlying load, default True.
        :return:                A tuple of the loaded data, the actual size of the read, and the position following
                                the read.
        """
        disable_actions = kwargs.pop("disable_actions", False)
        inspect = kwargs.pop("inspect", True)

        # Step 1: figure out a reasonable concrete size to use for the memory load
        # since we don't want to concretize anything
        if self.state.solver.symbolic(size):
            try:
                passed_max_size = self.state.solver.max(
                    size, extra_constraints=(size < self.state.libc.max_packet_size,)
                )
            except SimSolverError:
                passed_max_size = self.state.solver.min(size)
                l.warning("Symbolic read size is too large for threshold - concretizing to min (%d)", passed_max_size)
                self.state.add_constraints(size == passed_max_size)
        else:
            passed_max_size = self.state.solver.eval(size)
            if passed_max_size > 2**13:
                l.warning("Program performing extremely large reads")

        # Step 2.1: check for the possibility of EOFs
        # If it's not possible to EOF (because there's no EOF), this is very simple!
        if not self.has_end:
            # bump the storage size as we read
            self._size = claripy.If(size + pos > self._size, size + pos, self._size)
            return self.load(pos, passed_max_size, disable_actions=disable_actions, inspect=inspect), size, size + pos

        # Step 2.2: check harder for the possibility of EOFs
        # This is the size if we're reading to the end of the file
        distance_to_eof = self._size - pos
        distance_to_eof = claripy.If(claripy.SLE(distance_to_eof, 0), 0, distance_to_eof)

        # try to frontload some constraint solving to see if it's impossible for this read to EOF
        if self.state.solver.satisfiable(extra_constraints=(size > distance_to_eof,)):
            # it's possible to EOF
            # final size = min(passed_size, max(distance_to_eof, 0))
            real_size = claripy.If(size >= distance_to_eof, distance_to_eof, size)

            return (
                self.load(pos, passed_max_size, disable_actions=disable_actions, inspect=inspect),
                real_size,
                real_size + pos,
            )

        # it's not possible to EOF
        # we don't need to constrain or min/max the output size because there are already constraints asserting
        # that the total filesize is pretty big
        # note: this assumes that constraints cannot be removed
        return self.load(pos, passed_max_size, disable_actions=disable_actions, inspect=inspect), size, size + pos

    def write(self, pos, data, size=None, events=True, **kwargs):
        """
        Store ``data`` at offset ``pos`` and grow the file size if the write extends past the current end.

        :param pos:     The offset in the file to write to.
        :param data:    The data to write.
        :param size:    The optional size of the write. Defaults to the length of the data.
        :param events:  Whether to record an ``fs_write`` event in the state history, default True.
        :return:        The position following the write.
        """
        if events:
            self.state.history.add_event("fs_write", filename=self.name, data=data, size=size, pos=pos)

        data = _deps_unpack(data)[0]
        if size is None:
            size = len(data) // self.state.arch.byte_width if isinstance(data, claripy.ast.Bits) else len(data)
            # \(_^^)/
        self.store(pos, data, size=size)
        new_end = _deps_unpack(pos + size)[0]  # decline to store SAO
        self._size = claripy.If(new_end > self._size, new_end, self._size)
        return new_end

    @SimStatePlugin.memo
    def copy(self, memo):
        o = super().copy(memo)
        o.name = self.name
        o._size = self._size
        o.has_end = self.has_end
        o.seekable = self.seekable
        o.writable = self.writable
        o.concrete = self.concrete
        o.file_exists = self.file_exists
        return o

    def merge(self, others, merge_conditions, common_ancestor=None):  # pylint: disable=unused-argument
        """
        Merge the sizes of ``others`` into this file under ``merge_conditions``, then merge the backing storage
        via the parent class.

        :raises SimMergeError:  If the files differ in type or in ``has_end``.
        """
        if not all(type(o) is type(self) for o in others):
            raise SimMergeError("Cannot merge files of disparate type")

        if any(o.has_end != self.has_end for o in others):
            raise SimMergeError("Cannot merge files where some have ends and some don't")

        self._size = claripy.ite_cases(zip(merge_conditions[1:], (o._size for o in others)), self._size)

        return super().merge(others, merge_conditions, common_ancestor=common_ancestor)

    def widen(self, _):
        raise SimMergeError("Widening the filesystem is unsupported")
class SimFileStream(SimFile):
    """
    A specialized SimFile that uses a flat memory backing, but functions as a stream, tracking its position internally.

    The pos argument to the read and write methods is ignored (for reads, unless ``no_stream=True`` is passed);
    the attribute ``pos`` on the file itself is used and updated instead. ``write`` returns None.

    :param name:    The name of the file, for cosmetic purposes
    :param pos:     The initial position of the file, default zero
    :param kwargs:  Any other keyword arguments will go on to the SimFile constructor.

    :ivar pos:      The current position in the file.
    """

    def __init__(self, name=None, content=None, pos=0, **kwargs):
        super().__init__(name=name, content=content, **kwargs)
        self.pos = pos

    def set_state(self, state):
        """
        Attach the state and turn an int position into a bitvector of ``state.arch.bits`` bits.

        :raises TypeError:  If the position is a bitvector whose length is not ``state.arch.bits``.
        """
        super().set_state(state)
        if isinstance(self.pos, int):
            self.pos = claripy.BVV(self.pos, state.arch.bits)
        elif len(self.pos) != state.arch.bits:
            raise TypeError("SimFileStream position must be a bitvector of size %d (arch.bits)" % state.arch.bits)

    def read(self, pos, size, **kwargs):
        """
        Read from the stream position and advance it.

        :param pos:         Ignored unless ``no_stream`` is set.
        :param size:        The size to read. May be symbolic.
        :param no_stream:   Keyword option, default False. If True, read at ``pos`` and leave the stream position
                            untouched.
        :return:            The same tuple as ``SimFile.read``: data, actual size, and following position.
        """
        no_stream = kwargs.pop("no_stream", False)
        if not no_stream:
            pos = self.pos
        data, size, pos = super().read(pos, size, **kwargs)
        if not no_stream:
            self.pos = pos
        return data, size, pos

    def write(self, _, data, size=None, **kwargs):
        """
        Write at the stream position and advance it. The position argument is ignored. Returns None.
        """
        self.pos = super().write(self.pos, data, size, **kwargs)
        return None

    @SimStatePlugin.memo
    def copy(self, memo):
        c = super().copy(memo)
        c.pos = self.pos
        return c

    def merge(self, others, merge_conditions, common_ancestor=None):  # pylint: disable=unused-argument
        """
        Merge the stream positions under ``merge_conditions``, then defer to ``SimFile.merge``.
        """
        self.pos = claripy.ite_cases(zip(merge_conditions[1:], [o.pos for o in others]), self.pos)
        return super().merge(others, merge_conditions, common_ancestor=common_ancestor)
class SimPackets(SimFileBase):
    """
    The SimPackets is meant to model inputs whose content is delivered as a series of asynchronous chunks. The data is
    stored as a list of read or write results. For symbolic sizes, state.libc.max_packet_size will be respected. If
    the SHORT_READS option is enabled, reads will return a symbolic size constrained to be less than or equal to the
    requested size.

    A SimPackets cannot be used for both reading and writing - for socket objects that can be both read and written to
    you should use a file descriptor to multiplex the read and write operations into two separate file storage
    mechanisms.

    :param name:        The name of the file, for cosmetic purposes
    :param write_mode:  Whether this file is opened in read or write mode. If this is unspecified it will be
                        autodetected.
    :param content:     Some initial content to use for the file. Can be a list of bytestrings or a list of tuples of
                        content ASTs and size ASTs.

    :ivar write_mode:   See the eponymous parameter
    :ivar content:      A list of packets, as tuples of content ASTs and size ASTs.
    """

    def __init__(self, name, write_mode=None, content=None, writable=True, ident=None, **kwargs):
        super().__init__(name, writable=writable, ident=ident, **kwargs)

        self.write_mode = write_mode
        self.content = content
        # number of leading entries of self.content whose lengths set_state() has already normalized
        self.sanitized = 0

        if self.content is None:
            self.content = []
        else:
            self.content = [self._normalize_packet(x) for x in self.content]

    @staticmethod
    def _normalize_packet(x):
        """
        Coerce one entry of the initial content into a ``(data, size)`` tuple.

        Tuples are passed through unchanged; bitvectors and SimActionObjects get a size of ``len(x) // 8``;
        bytestrings are converted to a concrete bitvector with their byte length as size.

        :raises TypeError:  If the entry is of any other type.
        """
        if isinstance(x, tuple):
            return x
        if isinstance(x, claripy.ast.Bits):
            return (x, len(x) // 8)
        if isinstance(x, SimActionObject):
            return (x.ast, len(x) // 8)
        if isinstance(x, bytes):
            return (claripy.BVV(x), len(x))
        raise TypeError("Bad type in initial SimPacket content")

    def set_state(self, state):
        """
        Attach the state and normalize every not-yet-sanitized packet length to a bitvector of
        ``state.arch.bits`` bits: ints are converted, shorter bitvectors are zero-extended.

        :raises TypeError:  If a length is a bitvector longer than ``state.arch.bits``.
        """
        super().set_state(state)
        # sanitize the lengths in self.content now that we know the wordsize
        # getattr because we want to support old pickles without this attribute (TODO remove this)
        for i in range(getattr(self, "sanitized", 0), len(self.content)):
            data, length = self.content[i]
            if isinstance(length, int):
                self.content[i] = (data, claripy.BVV(length, state.arch.bits))
            elif len(length) < state.arch.bits:
                self.content[i] = (data, length.zero_extend(state.arch.bits - len(length)))
            elif len(length) != state.arch.bits:
                raise TypeError("Bad bitvector size for length in SimPackets.content")
        self.sanitized = len(self.content)

    @property
    def size(self):
        """
        The sum of the sizes of all packets stored so far.
        """
        return sum(x[1] for x in self.content)

    def concretize(self, **kwargs):
        """
        Returns a list of the packets read or written as bytestrings.
        """
        lengths = [self.state.solver.eval(x[1], **kwargs) for x in self.content]
        kwargs["cast_to"] = bytes
        sizes = [x[0].size() for x in self.content]
        # for each packet, evaluate only the leading `length` bytes of its data; empty packets become b""
        return [
            b"" if i == 0 else self.state.solver.eval(x[0][: size - i * self.state.arch.byte_width], **kwargs)
            for i, size, x in zip(lengths, sizes, self.content)
        ]

    def read(self, pos, size, **kwargs):
        """
        Read a packet from the stream.

        :param int pos:     The packet number to read from the sequence of the stream. May be None to append to the
                            stream.
        :param size:        The size to read. May be symbolic.
        :param short_reads: Whether to replace the size with a symbolic value constrained to less than or equal to the
                            original size. If unspecified, will be chosen based on the state option.
        :return:            A tuple of the data read (a bitvector of the length that is the maximum length of the
                            read), the actual size of the read, and the number of the next packet.
        :raises SimFileError:   If the file is in write mode, the packet number is out of range, or an existing
                                packet cannot fit into the requested size.
        """
        short_reads = kwargs.pop("short_reads", None)

        # sanity check on read/write modes
        if self.write_mode is None:
            self.write_mode = False
        elif self.write_mode is True:
            raise SimFileError("Cannot read and write to the same SimPackets")

        # sanity check on packet number and determine if data is already present
        if pos is None:
            pos = len(self.content)
        if pos < 0:
            raise SimFileError("SimPacket.read(%d): Negative packet number?" % pos)
        if pos > len(self.content):
            raise SimFileError("SimPacket.read(%d): Packet number is past frontier of %d?" % (pos, len(self.content)))
        if pos != len(self.content):
            _, realsize = self.content[pos]
            self.state.add_constraints(realsize <= size)  # assert that the packet fits within the read request
            if not self.state.solver.satisfiable():
                raise SimFileError(
                    "SimPackets could not fit the current packet into the read "
                    f"request of {size} bytes: {self.content[pos]}"
                )
            return self.content[pos] + (pos + 1,)

        # Type check
        if isinstance(size, int):
            size = claripy.BVV(size, self.state.arch.bits)

        # The read is on the frontier. let's generate a new packet.
        orig_size = size
        max_size = None

        # if short reads are enabled, replace size with a symbol
        if short_reads is True or (short_reads is None and sim_options.SHORT_READS in self.state.options):
            size = self.state.solver.BVS(
                "packetsize_%d_%s" % (len(self.content), self.ident),
                self.state.arch.bits,
                key=("file", self.ident, "packetsize", len(self.content)),
            )
            self.state.add_constraints(size <= orig_size)

        # figure out the maximum size of the read
        if not self.state.solver.symbolic(size):
            max_size = self.state.solver.eval(size)
        elif self.state.solver.satisfiable(extra_constraints=(size <= self.state.libc.max_packet_size,)):
            l.info("Constraining symbolic packet size to be less than %d", self.state.libc.max_packet_size)
            if not self.state.solver.is_true(orig_size <= self.state.libc.max_packet_size):
                self.state.add_constraints(size <= self.state.libc.max_packet_size)
            if not self.state.solver.symbolic(orig_size):
                max_size = min(self.state.solver.eval(orig_size), self.state.libc.max_packet_size)
            else:
                max_size = self.state.solver.max(size)
        else:
            max_size = self.state.solver.min(size)
            l.warning(
                "Could not constrain symbolic packet size to <= %d; using minimum %d for size",
                self.state.libc.max_packet_size,
                max_size,
            )
            self.state.add_constraints(size == max_size)

        # generate the packet data and return it
        data = self.state.solver.BVS(
            "packet_%d_%s" % (len(self.content), self.ident),
            max_size * self.state.arch.byte_width,
            key=("file", self.ident, "packet", len(self.content)),
        )
        packet = (data, size)
        self.content.append(packet)
        return (*packet, pos + 1)

    def write(self, pos, data, size=None, events=True, **kwargs):
        """
        Write a packet to the stream.

        :param int pos:     The packet number to write in the sequence of the stream. May be None to append to the
                            stream.
        :param data:        The data to write, as a string or bitvector.
        :param size:        The optional size to write. May be symbolic; must be constrained to at most the size of
                            data.
        :param events:      Whether to record an ``fs_write`` event in the state history, default True.
        :return:            The next packet to use after this
        :raises SimFileError:   If the file is in read mode, the packet number is out of range, or re-writing an
                                existing packet makes the state unsatisfiable.
        """
        if events:
            self.state.history.add_event("fs_write", filename=self.name, data=data, size=size, pos=pos)

        # sanity check on read/write modes
        if self.write_mode is None:
            self.write_mode = True
        elif self.write_mode is False:
            raise SimFileError("Cannot read and write to the same SimPackets")

        data = _deps_unpack(data)[0]
        if isinstance(data, bytes):
            data = claripy.BVV(data)
        if size is None:
            size = len(data) // self.state.arch.byte_width if isinstance(data, claripy.ast.Bits) else len(data)
        if isinstance(size, int):
            size = claripy.BVV(size, self.state.arch.bits)

        # sanity check on packet number and determine if data is already present
        if pos is None:
            pos = len(self.content)
        if pos < 0:
            raise SimFileError("SimPacket.write(%d): Negative packet number?" % pos)
        if pos > len(self.content):
            raise SimFileError("SimPacket.write(%d): Packet number is past frontier of %d?" % (pos, len(self.content)))
        if pos != len(self.content):
            # the packet already exists: constrain the new write to be equal to what is stored
            realdata, realsize = self.content[pos]
            # NOTE(review): this takes the larger of the two lengths before slicing both operands; confirm that
            # max (rather than min) is intended when the lengths differ
            maxlen = max(len(realdata), len(data))
            self.state.add_constraints(realdata[maxlen - 1 : 0] == data[maxlen - 1 : 0])
            self.state.add_constraints(size == realsize)
            if not self.state.solver.satisfiable():
                raise SimFileError("Packet write equality constraints made state unsatisfiable???")
            return pos + 1

        # write it out!
        self.content.append((_deps_unpack(data)[0], size))
        return pos + 1

    @SimStatePlugin.memo
    def copy(self, memo):  # pylint: disable=unused-argument
        """
        Build a fresh instance of the same type with the same name, mode, packets, ident and flags.
        """
        o = type(self)(
            name=self.name, write_mode=self.write_mode, content=self.content, ident=self.ident, concrete=self.concrete
        )
        # set after construction so that subclasses with narrower constructors are unaffected
        o.writable = self.writable
        o.file_exists = self.file_exists
        o.sanitized = getattr(self, "sanitized", 0)
        return o

    def merge(self, others, merge_conditions, common_ancestor=None):  # pylint: disable=unused-argument
        """
        Merge the packets of ``others`` into this file, packet by packet, under ``merge_conditions``.

        :return:                True
        :raises SimMergeError:  If the write modes conflict or the numbers of packets differ.
        """
        for o in others:
            if o.write_mode is None:
                continue
            if self.write_mode is None:
                self.write_mode = o.write_mode
            elif self.write_mode is not o.write_mode:
                raise SimMergeError("Cannot merge SimPackets with disparate write_mode")

        for o in others:
            if len(o.content) != len(self.content):
                raise SimMergeError("Cannot merge SimPackets with disparate number of packets")

        for i, default in enumerate(self.content):
            max_data_length = max(len(default[0]), max(len(o.content[i][0]) for o in others))
            merged_data = claripy.ite_cases(
                zip(
                    merge_conditions[1:],
                    (o.content[i][0].concat(claripy.BVV(0, max_data_length - len(o.content[i][0]))) for o in others),
                ),
                default[0],
            )
            merged_size = claripy.ite_cases(zip(merge_conditions[1:], (o.content[i][1] for o in others)), default[1])
            self.content[i] = (merged_data, merged_size)

        return True

    def widen(self, _):
        raise SimMergeError("Widening the filesystem is unsupported")
class SimPacketsStream(SimPackets):
    """
    A specialized SimPackets that tracks its position internally.

    The pos argument to the read and write methods is ignored (for reads, unless ``no_stream=True`` is passed);
    the attribute ``pos`` on the file itself is used and updated instead. ``write`` returns None.

    :param name:    The name of the file, for cosmetic purposes
    :param pos:     The initial position of the file, default zero
    :param kwargs:  Any other keyword arguments will go on to the SimPackets constructor.

    :ivar pos:      The current position in the file.
    """

    def __init__(self, name, pos=0, **kwargs):
        super().__init__(name, **kwargs)
        self.pos = pos

    def read(self, pos, size, **kwargs):
        """
        Read the packet at the stream position and advance it.

        :param pos:         Ignored unless ``no_stream`` is set.
        :param size:        The size to read. May be symbolic.
        :param no_stream:   Keyword option, default False. If True, read at ``pos`` and leave the stream position
                            untouched.
        :return:            The same tuple as ``SimPackets.read``.
        """
        no_stream = kwargs.pop("no_stream", False)
        if not no_stream:
            pos = self.pos
        data, size, pos = super().read(pos, size, **kwargs)
        if not no_stream:
            self.pos = pos
        return data, size, pos

    def write(self, _, data, size=None, **kwargs):
        """
        Write a packet at the stream position and advance it. The position argument is ignored. Returns None.
        """
        self.pos = super().write(self.pos, data, size, **kwargs)
        return None

    @SimStatePlugin.memo
    def copy(self, memo):
        c = super().copy(memo)
        c.pos = self.pos
        return c

    def merge(self, others, merge_conditions, common_ancestor=None):  # pylint: disable=unused-argument
        """
        Merge like ``SimPackets.merge``, but only if every stream is at the same position.

        :raises SimMergeError:  If any position differs from this stream's position.
        """
        if any(o.pos != self.pos for o in others):
            raise SimMergeError("Can't merge SimPacketsStreams with disparate positions")
        return super().merge(others, merge_conditions, common_ancestor=common_ancestor)
class SimFileDescriptorBase(SimStatePlugin):
    """
    The base class for implementations of POSIX file descriptors.

    All file descriptors should respect the CONCRETIZE_SYMBOLIC_{READ,WRITE}_SIZES state options.
    """

    def read(self, pos, size, **kwargs):
        """
        Reads some data from the file, storing it into memory.

        :param pos:     The address in memory to store the read data at
        :param size:    The requested length of the read
        :param do_concrete_update:
                        Keyword option, default False. If True, a concretized copy of the data is stored to memory
                        (without action or inspection) before the actual data is stored.
        :return:        The real length of the read
        """
        data, realsize = self.read_data(size, **kwargs)
        # skip the memory store entirely when the read is known to be empty
        if not self.state.solver.is_true(realsize == 0):
            do_concrete_update = kwargs.pop("do_concrete_update", False)
            if do_concrete_update:
                concrete_data = claripy.BVV(self.state.solver.eval(data), data.size())
                self.state.memory.store(pos, concrete_data, action=None, inspect=False)

            self.state.memory.store(pos, data, size=realsize)
        return realsize

    def write(self, pos, size, **kwargs):
        """
        Writes some data, loaded from the state, into the file.

        :param pos:     The address to read the data to write from in memory
        :param size:    The requested size of the write
        :return:        The real length of the write
        :raises TypeError:  If ``pos`` is a str, which suggests ``write_data`` was meant.
        """
        if isinstance(pos, str):
            raise TypeError("SimFileDescriptor.write takes an address and size. Did you mean write_data?")

        # Find a reasonable concrete size for the load since we don't want to concretize anything
        # This is copied from SimFile.read
        # TODO: refactor into a generic concretization strategy?
        if self.state.solver.symbolic(size):
            try:
                passed_max_size = self.state.solver.max(
                    size, extra_constraints=(size < self.state.libc.max_packet_size,)
                )
            except SimSolverError:
                passed_max_size = self.state.solver.min(size)
                l.warning("Symbolic write size is too large for threshold - concretizing to min (%d)", passed_max_size)
                self.state.add_constraints(size == passed_max_size)
        else:
            passed_max_size = self.state.solver.eval(size)
            if passed_max_size > 2**13:
                l.warning("Program performing extremely large write")

        data = self.state.memory.load(pos, passed_max_size)
        return self.write_data(data, size, **kwargs)

    def read_data(self, size, **kwargs):
        """
        Reads some data from the file, returning the data.

        :param size:    The requested length of the read
        :return:        A tuple of the data read and the real length of the read
        """
        raise NotImplementedError

    def write_data(self, data, size=None, **kwargs):
        """
        Write some data, provided as an argument into the file.

        :param data:    A bitvector to write into the file
        :param size:    The requested size of the write (may be symbolic)
        :return:        The real length of the write
        """
        raise NotImplementedError

    def seek(self, offset, whence="start"):
        """
        Seek the file descriptor to a different position in the file.

        :param offset:  The offset to seek to, interpreted according to whence
        :param whence:  What the offset is relative to; one of the strings "start", "current", or "end"
        :return:        A symbolic boolean describing whether the seek succeeded or not
        """
        raise NotImplementedError

    def tell(self):
        """
        Return the current position, or None if the concept doesn't make sense for the given file.
        """
        raise NotImplementedError

    def eof(self):
        """
        Return the EOF status. May be a symbolic boolean.
        """
        raise NotImplementedError

    def size(self):
        """
        Return the size of the data stored in the file in bytes, or None if the concept doesn't make sense for the
        given file.
        """
        raise NotImplementedError

    @property
    def read_storage(self):
        """
        Return the SimFile backing reads from this fd
        """
        raise NotImplementedError

    @property
    def write_storage(self):
        """
        Return the SimFile backing writes to this fd
        """
        raise NotImplementedError

    @property
    def read_pos(self):
        """
        Return the current position of the read file pointer.

        If the underlying read file is a stream, this will return the position of the stream. Otherwise, will return
        the position of the file descriptor in the file.
        """
        raise NotImplementedError

    @property
    def write_pos(self):
        """
        Return the current position of the write file pointer.

        If the underlying write file is a stream, this will return the position of the stream. Otherwise, will return
        the position of the file descriptor in the file.
        """
        raise NotImplementedError

    def concretize(self, **kwargs):
        """
        Return a concretization of the data in the underlying file. Has different return types to represent different
        data structures on a per-class basis.

        Any arguments passed to this will be passed onto state.solver.eval.
        """
        raise NotImplementedError

    @property
    def file_exists(self):
        """
        This should be True in most cases.
        Only if we opened an fd of unknown existence, ALL_FILES_EXIST is False and ANY_FILE_MIGHT_EXIST is True,
        this is a symbolic boolean.
        """
        return True

    def _prep_read(self, size):
        """Apply the read-size concretization option to ``size``; see ``_prep_generic``."""
        return self._prep_generic(size, True)

    def _prep_write(self, size):
        """Apply the write-size concretization option to ``size``; see ``_prep_generic``."""
        return self._prep_generic(size, False)

    def _prep_generic(self, size, is_read):
        """
        Concretize a symbolic size if the corresponding state option is enabled.

        :param size:    The requested size, possibly symbolic.
        :param is_read: True to consult the read option, False to consult the write option.
        :return:        ``size`` unchanged if the option is off or the size is not symbolic. Otherwise the maximum
                        value not exceeding ``state.libc.max_packet_size``, or the minimum value if the solver
                        raises SimSolverError for that query.
        """
        option = (
            sim_options.CONCRETIZE_SYMBOLIC_FILE_READ_SIZES if is_read else sim_options.CONCRETIZE_SYMBOLIC_WRITE_SIZES
        )
        string = "read" if is_read else "write"
        # check if we need to concretize the length
        if option in self.state.options and self.state.solver.symbolic(size):
            try:
                size = self.state.solver.max(size, extra_constraints=(size <= self.state.libc.max_packet_size,))
            except SimSolverError:
                size = self.state.solver.min(size)
            l.info("Concretizing symbolic %s size to %d", string, size)

        return size
[docs]
class SimFileDescriptor(SimFileDescriptorBase):
"""
A simple file descriptor forwarding reads and writes to a SimFile. Contains information about
the current opened state of the file, such as the flags or (if relevant) the current position.
:ivar file: The SimFile described to by this descriptor
:ivar flags: The mode that the file descriptor was opened with, a bitfield of flags
"""
[docs]
def __init__(self, simfile, flags=0):
super().__init__()
self.file = simfile
self._pos = 0
self.flags = flags
[docs]
def read_data(self, size, **kwargs):
size = self._prep_read(size)
data, realsize, self._pos = self.file.read(self._pos, size)
return data, realsize
[docs]
def write_data(self, data, size=None, **kwargs):
if self.flags & Flags.O_APPEND and self.file.seekable:
self._pos = self.file.size
data = _deps_unpack(data)[0]
if size is None:
size = len(data) // self.state.arch.byte_width if isinstance(data, claripy.ast.Bits) else len(data)
size = self._prep_write(size)
self._pos = self.file.write(self._pos, data, size)
return size
[docs]
def seek(self, offset, whence="start"):
if not self.file.seekable:
return claripy.false()
if type(offset) is int:
offset = claripy.BVV(offset, self.state.arch.bits)
if whence == "start":
new_pos = offset
elif whence == "current":
new_pos = self._pos + offset
elif whence == "end":
new_pos = self.file.size + offset
success_condition = claripy.And(claripy.SGE(new_pos, 0), claripy.SLE(new_pos, self.file.size))
self._pos = _deps_unpack(claripy.If(success_condition, new_pos, self._pos))[0]
return success_condition
[docs]
def eof(self):
if not self.file.seekable:
return claripy.false()
if not getattr(self.file, "has_end", True):
return claripy.false()
return self._pos == self.file.size
[docs]
def tell(self):
if not self.file.seekable:
return None
return self._pos
[docs]
def size(self):
return self.file.size
[docs]
def concretize(self, **kwargs):
"""
Return a concretization of the underlying file. Returns whatever format is preferred by the file.
"""
return self.file.concretize(**kwargs)
@property
def file_exists(self):
return self.file.file_exists
@property
def read_storage(self):
return self.file
@property
def write_storage(self):
return self.file
@property
def read_pos(self):
if self.file.pos is not None:
return self.file.pos
return self._pos
@property
def write_pos(self):
if self.file.pos is not None:
return self.file.pos
return self._pos
[docs]
def set_state(self, state):
self.file.set_state(state)
super().set_state(state)
@SimStatePlugin.memo
def copy(self, memo):
c = type(self)(self.file.copy(memo), self.flags)
c._pos = self._pos
return c
[docs]
def merge(self, others, merge_conditions, common_ancestor=None):  # pylint: disable=unused-argument
    """
    Merge the positions of several descriptors into this one.

    :param others:              The other descriptors to merge with.
    :param merge_conditions:    The merge conditions; entry 0 belongs to ``self``, the remaining entries are
                                paired with ``others`` in order.
    :return:                    False (after logging an error) if the descriptors differ in type or flags,
                                True otherwise.
    :raises SimMergeError:      If integer positions disagree, or if only some positions are None.
    """
    # do NOT merge file content - descriptors do not have ownership, prevent duplicate merging
    own_type = type(self)
    if any(type(other) is not own_type for other in others):
        l.error("Cannot merge SimFileDescriptors of disparate types")
        return False

    own_flags = self.flags
    if not all(other.flags == own_flags for other in others):
        l.error("Cannot merge SimFileDescriptors of disparate flags")
        return False

    mine = self._pos
    theirs = [other._pos for other in others]

    if type(mine) is int and all(type(pos) is int for pos in theirs):
        # TODO: we can do slightly better for packet-based things by having some packets have a "guard condition"
        # which makes them zero length if they're not merged in
        if any(pos != mine for pos in theirs):
            raise SimMergeError("Cannot merge SimFileDescriptors over SimPackets with disparate number of packets")
        return True

    if mine is None and all(pos is None for pos in theirs):
        # nothing to merge: nobody has a position
        return True

    if mine is None or any(pos is None for pos in theirs):
        raise SimMergeError("Cannot merge SimFileDescriptors with inconsistent None-position - please report this!")

    # symbolic positions: pick each other's position under its merge condition, defaulting to our own
    self._pos = claripy.ite_cases(zip(merge_conditions[1:], theirs), mine)
    return True
[docs]
def widen(self, _):
    """
    Widening is not supported for file descriptors; this always raises SimMergeError.
    """
    message = "Widening the filesystem is unsupported"
    raise SimMergeError(message)
[docs]
class SimFileDescriptorDuplex(SimFileDescriptorBase):
    """
    A file descriptor that refers to two file storage mechanisms, one to read from and one to write to. As a result,
    operations like seek, eof, etc no longer make sense.

    :param read_file:   The SimFile to read from
    :param write_file:  The SimFile to write to
    """

    def __init__(self, read_file, write_file):
        super().__init__()

        self._read_file, self._write_file = read_file, write_file
        # each direction keeps its own independent position
        self._read_pos = self._write_pos = 0

    def read_data(self, size, **kwargs):
        """
        Read from the read-side file at the current read position and advance that position.

        :param size:    The requested size; it is passed through ``self._prep_read`` first.
        :return:        A tuple of the data read and the real size, as returned by the file's ``read``.
        """
        wanted = self._prep_read(size)
        data, realsize, self._read_pos = self._read_file.read(self._read_pos, wanted)
        return data, realsize

    def write_data(self, data, size=None, **kwargs):
        """
        Write to the write-side file at the current write position and advance that position.

        :param data:    The data to write. If it is wrapped in a SimActionObject, the underlying AST is used.
        :param size:    The number of bytes to write. When omitted, it is derived from the length of ``data``
                        (bit-length divided by the architecture's byte width for claripy bitvectors, plain ``len``
                        otherwise).
        :return:        The size as returned by ``self._prep_write``.
        """
        payload, _, _ = _deps_unpack(data)

        if size is None:
            if isinstance(payload, claripy.ast.Bits):
                size = len(payload) // self.state.arch.byte_width
            else:
                size = len(payload)

        size = self._prep_write(size)
        self._write_pos = self._write_file.write(self._write_pos, payload, size)
        return size

    def set_state(self, state):
        """
        Attach ``state`` to the read file, the write file, and finally this plugin itself.
        """
        for backing in (self._read_file, self._write_file):
            backing.set_state(state)
        super().set_state(state)

    def eof(self):
        """
        Report end-of-file for the read side.

        :return:    A claripy false if the read file is not seekable or declares ``has_end`` as falsy (a missing
                    ``has_end`` attribute counts as true); otherwise the comparison of the read position with the
                    read file's size.
        """
        # the thing that makes the most sense is for this to refer to the read eof status...
        source = self._read_file
        if not source.seekable or not getattr(source, "has_end", True):
            return claripy.false()
        return self._read_pos == source.size

    def tell(self):
        """
        A duplex descriptor has no single position; always returns None.
        """
        return None

    def seek(self, offset, whence="start"):
        """
        Seeking is not possible on a duplex descriptor; always returns a claripy false.
        """
        return claripy.false()

    def size(self):
        """
        A duplex descriptor has no single size; always returns None.
        """
        return None

    def concretize(self, **kwargs):
        """
        Return a concretization of the underlying files, as a tuple of (read file, write file).
        """
        read_side = self._read_file.concretize(**kwargs)
        write_side = self._write_file.concretize(**kwargs)
        return (read_side, write_side)

    @property
    def read_storage(self):
        """
        The storage object that reads are served from.
        """
        return self._read_file

    @property
    def write_storage(self):
        """
        The storage object that writes are sent to.
        """
        return self._write_file

    @property
    def read_pos(self):
        """
        The position of the next read: the read file's own ``pos`` if it has one, else the descriptor's.
        """
        file_pos = self._read_file.pos
        return self._read_pos if file_pos is None else file_pos

    @property
    def write_pos(self):
        """
        The position of the next write: the write file's own ``pos`` if it has one, else the descriptor's.
        """
        file_pos = self._write_file.pos
        return self._write_pos if file_pos is None else file_pos

    @SimStatePlugin.memo
    def copy(self, memo):
        """
        Create a copy of this descriptor from copies of both backing files, carrying over both positions.
        """
        read_copy = self._read_file.copy(memo)
        write_copy = self._write_file.copy(memo)
        duplicate = type(self)(read_copy, write_copy)
        duplicate._read_pos = self._read_pos
        duplicate._write_pos = self._write_pos
        return duplicate

    @staticmethod
    def _merge_position(mine, theirs, merge_conditions):
        """
        Compute the merged value of one position (read or write).

        :param mine:                This descriptor's position.
        :param theirs:              The corresponding positions of the other descriptors, in order.
        :param merge_conditions:    The merge conditions; entry 0 belongs to ``mine``.
        :return:                    The position to store after the merge.
        :raises SimMergeError:      If integer positions disagree, or if only some positions are None.
        """
        if type(mine) is int and all(type(pos) is int for pos in theirs):
            if any(pos != mine for pos in theirs):
                raise SimMergeError("Cannot merge SimFileDescriptors over SimPackets with disparate number of packets")
            return mine

        if mine is None and all(pos is None for pos in theirs):
            return mine

        if mine is None or any(pos is None for pos in theirs):
            raise SimMergeError("Cannot merge SimFileDescriptors with inconsistent None-position - please report this!")

        return claripy.ite_cases(zip(merge_conditions[1:], theirs), mine)

    def merge(self, others, merge_conditions, common_ancestor=None):  # pylint: disable=unused-argument
        """
        Merge the read and write positions of several duplex descriptors into this one.

        :param others:              The other descriptors to merge with.
        :param merge_conditions:    The merge conditions; entry 0 belongs to ``self``, the remaining entries are
                                    paired with ``others`` in order.
        :return:                    True
        :raises SimMergeError:      If the descriptors differ in type, if integer positions disagree, or if only
                                    some positions are None.
        """
        # do NOT merge storage mechanisms here - fs and posix handle that
        own_type = type(self)
        if any(type(other) is not own_type for other in others):
            raise SimMergeError("Cannot merge SimFileDescriptors of disparate types")

        self._read_pos = self._merge_position(
            self._read_pos, [other._read_pos for other in others], merge_conditions
        )
        self._write_pos = self._merge_position(
            self._write_pos, [other._write_pos for other in others], merge_conditions
        )
        return True

    def widen(self, _):
        """
        Widening is not supported for file descriptors; this always raises SimMergeError.
        """
        raise SimMergeError("Widening the filesystem is unsupported")
[docs]
class SimPacketsSlots(SimFileBase):
    """
    SimPacketsSlots is the new SimDialogue, if you've ever seen that before.

    The idea is that in some cases, the only thing you really care about is getting the lengths of reads right, and
    some of them should be short reads, and some of them should be truncated. You provide to this class a list of read
    lengths, and it figures out the length of each read, and delivers some content.

    This class will NOT respect the position argument you pass it - this storage is not stateless.

    :ivar read_sizes:   The read lengths that have not been consumed yet. ``read`` consumes them from the front.
    :ivar read_data:    The symbolic values handed out by ``read`` so far, in order.
    """

    seekable = False

    def __init__(self, name, read_sizes, ident=None, **kwargs):
        super().__init__(name, writable=False, ident=ident)

        self.read_sizes = read_sizes
        self.read_data = []

    def concretize(self, **kwargs):
        """
        Return a list with the concrete bytes of every packet read so far, in order.

        Keyword arguments are forwarded to the solver's ``eval``.
        """
        return [self.state.solver.eval(var, cast_to=bytes, **kwargs) for var in self.read_data]

    def read(self, pos, size, **kwargs):
        """
        Deliver the next chunk of symbolic data, sized according to the pending read lengths.

        :param pos:     Ignored.
        :param size:    The requested size. It must evaluate to a single concrete value.
        :return:        A tuple of (data, real size, None). If no read lengths remain, the data is a zero-width
                        bitvector and the size is 0.
        :raises SimFileError:   If ``size`` does not have exactly one solution.
        """
        if not self.read_sizes:
            return claripy.BVV(0, 0), 0, None

        try:
            req_size = self.state.solver.eval_one(size)
        except SimSolverError as err:
            raise SimFileError("SimPacketsSlots can't handle multivalued read sizes") from err

        avail_size = self.read_sizes[0]

        if avail_size > req_size:
            # chop the packet in half
            real_size = req_size
            self.read_sizes[0] -= req_size
        else:
            # short read or full size read
            real_size = avail_size
            self.read_sizes.pop(0)

        data = self.state.solver.BVS(
            "packet_%d_%s" % (len(self.read_data), self.ident),
            real_size * self.state.arch.byte_width,
            key=("file", self.ident, "packet", len(self.read_data)),
        )
        self.read_data.append(data)
        return data, real_size, None

    def write(self, pos, data, size=None, **kwargs):
        """
        Writing is not supported; always raises SimFileError.
        """
        raise SimFileError("Trying to write to SimPacketsSlots? Illegal")

    @property
    def size(self):
        """
        The total number of bytes handed out by ``read`` so far.
        """
        return sum(len(x) for x in self.read_data) // self.state.arch.byte_width

    @SimStatePlugin.memo
    def copy(self, memo):  # pylint: disable=unused-argument
        """
        Create a copy with its own lists of pending read sizes and already-read data.
        """
        # read() mutates read_sizes in place, so the copy must not share the list object with the original;
        # otherwise a read in one state would consume packet sizes of the other.
        o = type(self)(self.name, list(self.read_sizes), ident=self.ident)
        o.read_data = list(self.read_data)
        return o

    def merge(self, others, merge_conditions, common_ancestor=None):  # pylint: disable=unused-argument
        """
        Merge the data read so far with that of other instances.

        :param others:              The other instances to merge with.
        :param merge_conditions:    The merge conditions; entry 0 belongs to ``self``, the remaining entries are
                                    paired with ``others`` in order.
        :return:                    True
        :raises SimMergeError:      If the pending read sizes, or the lengths of the packets already read, differ.
        """
        if any(self.read_sizes != o.read_sizes for o in others):
            raise SimMergeError("Can't merge SimPacketsSlots with disparate reads")
        already_read_sizes = [len(x) for x in self.read_data]
        if any(already_read_sizes != [len(x) for x in o.read_data] for o in others):
            raise SimMergeError("Can't merge SimPacketsSlots with disparate reads")

        # enumerate() supplies the slot index; iterating read_data directly would try to unpack each
        # bitvector into (i, default_var).
        for i, default_var in enumerate(self.read_data):
            self.read_data[i] = claripy.ite_cases(
                zip(merge_conditions[1:], [o.read_data[i] for o in others]), default_var
            )

        return True

    def widen(self, _):
        """
        Widening is not supported; always raises SimMergeError.
        """
        raise SimMergeError("Widening the filesystem is unsupported")
from angr.errors import SimMergeError, SimFileError, SimSolverError