Source code for angr.storage.file

import claripy
import logging
import itertools

from .memory_mixins import DefaultMemory
from ..state_plugins.plugin import SimStatePlugin
from ..state_plugins.sim_action_object import SimActionObject
from .. import sim_options

l = logging.getLogger(name=__name__)

file_counter = itertools.count()
dialogue_counter = itertools.count()


class Flags:  # pylint: disable=W0232
    O_RDONLY = 0
    O_WRONLY = 1
    O_RDWR = 2
    O_ACCMODE = 3  # bitmask for read/write mode

    O_APPEND = 0o2000
    O_ASYNC = 0o20000
    O_CLOEXEC = 0o2000000  # TODO mode for this flag
    O_CREAT = 0o100
    O_DIRECT = 0o40000
    O_DIRECTORY = 0o200000
    O_DSYNC = 0o10000
    O_EXCL = 0o200
    O_LARGEFILE = 0o100000
    O_NOATIME = 0o1000000
    O_NOCTTY = 0o400
    O_NOFOLLOW = 0o400000
    O_NONBLOCK = 0o4000
    O_NDELAY = 0o4000
    O_PATH = 0o10000000
    O_SYNC = 0o4010000
    O_TMPFILE = 0o20200000
    O_TRUNC = 0o1000
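These constants mirror the Linux open(2) flag values, so a descriptor's mode can be composed and tested with ordinary bitwise arithmetic. A minimal sketch (illustration only, not part of the module):

mode = Flags.O_RDWR | Flags.O_CREAT | Flags.O_APPEND
assert mode & Flags.O_ACCMODE == Flags.O_RDWR   # O_ACCMODE masks out everything but the read/write mode
assert mode & Flags.O_APPEND                    # SimFileDescriptor.write_data checks this bit to seek to EOF first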
def _deps_unpack(a):
    if isinstance(a, SimActionObject):
        return a.ast, a.reg_deps, a.tmp_deps
    else:
        return a, None, None
class SimFileBase(SimStatePlugin):
    """
    SimFiles are the storage mechanisms used by SimFileDescriptors.

    Different types of SimFiles can have drastically different interfaces, and as a result there's not much that
    can be specified on this base class. All the read and write methods take a ``pos`` argument, which may have
    different semantics per-class. ``0`` will always be a valid position to use, though, and the next position you
    should use is part of the return tuple.

    Some simfiles are "streams", meaning that the position that reads come from is determined not by the position
    you pass in (it will in fact be ignored), but by an internal variable. This is stored as ``.pos`` if you care to
    read it. Don't write to it. The same lack-of-semantics applies to this field as well.

    :ivar name:         The name of the file. Purely for cosmetic purposes.
    :ivar ident:        The identifier of the file, typically autogenerated from the name and a nonce. Purely for
                        cosmetic purposes, but does appear in symbolic values autogenerated in the file.
    :ivar seekable:     Bool indicating whether seek operations on this file should succeed. If this is True, then
                        ``pos`` must be a number of bytes from the start of the file.
    :ivar writable:     Bool indicating whether writing to this file is allowed.
    :ivar pos:          If the file is a stream, this will be the current position. Otherwise, None.
    :ivar concrete:     Whether or not this file contains mostly concrete data. Will be used by some SimProcedures
                        to choose how to handle variable-length operations like fgets.
    :ivar file_exists:  Set to False if the file does not exist, or to a claripy Bool if unknown; defaults to True.
    """

    seekable = False
    pos = None
    def __init__(self, name=None, writable=True, ident=None, concrete=False, file_exists=True, **kwargs):
        self.name = name
        self.ident = ident
        self.writable = writable
        self.concrete = concrete
        self.file_exists = file_exists

        if ident is None:
            self.ident = self.make_ident(self.name)
        if "memory_id" in kwargs:
            kwargs["memory_id"] = self.ident
        super().__init__(**kwargs)

    @staticmethod
    def make_ident(name):
        if name is None:
            return "file"

        if type(name) is str:
            name = name.encode()

        def generate():
            consecutive_bad = 0
            for ch in name:
                if 0x20 <= ch <= 0x7E:
                    consecutive_bad = 0
                    yield chr(ch)
                elif consecutive_bad < 3:
                    consecutive_bad += 1
                    yield "?"

        nice_name = "".join(generate())
        return "file_%d_%s" % (next(file_counter), nice_name)

    def concretize(self, **kwargs):
        """
        Return a concretization of the contents of the file. The type of the return value of this method will vary
        depending on which kind of SimFile you're using.
        """
        raise NotImplementedError

    def read(self, pos, size, **kwargs):
        """
        Read some data from the file.

        :param pos:     The offset in the file to read from.
        :param size:    The size to read. May be symbolic.
        :return:        A tuple of the data read (a bitvector of the length that is the maximum length of the read),
                        the actual size of the read, and the new file position pointer.
        """
        raise NotImplementedError

    def write(self, pos, data, size=None, **kwargs):
        """
        Write some data to the file.

        :param pos:     The offset in the file to write to. May be ignored if the file is a stream or device.
        :param data:    The data to write as a bitvector.
        :param size:    The optional size of the data to write. If not provided, defaults to the length of the data.
                        Must be constrained to less than or equal to the size of the data.
        :return:        The new file position pointer.
        """
        raise NotImplementedError

    @property
    def size(self):
        """
        The number of data bytes stored by the file at present. May be a symbolic value.
        """
        raise NotImplementedError

    @DefaultMemory.memo
    def copy(self, memo):
        o = super().copy(memo)
        o.ident = self.ident
        o.name = self.name
        o.writable = self.writable
        o.concrete = self.concrete
        o.file_exists = self.file_exists
        return o
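For example, ``make_ident`` keeps only printable characters and prefixes a process-wide counter, so the identifier stays legible even for odd filenames. A quick illustration (the counter value depends on how many files have already been created in the process):

from angr.storage.file import SimFileBase

print(SimFileBase.make_ident(b"flag.txt"))       # e.g. 'file_0_flag.txt'
print(SimFileBase.make_ident(b"\xff\xff\xffx"))  # unprintable bytes collapse to '?', e.g. 'file_1_???x'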
class SimFile(SimFileBase, DefaultMemory):  # TODO: pick a better base class omg
    """
    The normal SimFile is meant to model files on disk. It subclasses DefaultMemory, so loads and stores to/from it
    are very simple.

    :param name:        The name of the file.
    :param content:     Optional initial content for the file as a string or bitvector.
    :param size:        Optional size of the file. If content is not specified, it defaults to zero.
    :param has_end:     Whether the size boundary is treated as the end of the file or a frontier at which new
                        content will be generated. If unspecified, will pick its value based on
                        options.FILES_HAVE_EOF. Another caveat is that if the size is also unspecified this value
                        will default to False.
    :param seekable:    Optional bool indicating whether seek operations on this file should succeed, default True.
    :param writable:    Whether writing to this file is allowed.
    :param concrete:    Whether or not this file contains mostly concrete data. Will be used by some SimProcedures
                        to choose how to handle variable-length operations like fgets.

    :ivar has_end:      Whether this file has an EOF.
    """
    def __init__(
        self,
        name=None,
        content=None,
        size=None,
        has_end=None,
        seekable=True,
        writable=True,
        ident=None,
        concrete=None,
        **kwargs,
    ):
        kwargs["memory_id"] = kwargs.get("memory_id", "file")
        super().__init__(name=name, writable=writable, ident=ident, **kwargs)
        self._size = size
        self.has_end = has_end
        self.seekable = seekable

        # this is hacky because we need to work around not having a state yet
        content = _deps_unpack(content)[0]
        if type(content) is bytes:
            if concrete is None:
                concrete = True
            content = claripy.BVV(content)
        elif type(content) is str:
            if concrete is None:
                concrete = True
            content = claripy.BVV(content.encode())
        elif content is None:
            pass
        elif isinstance(content, claripy.Bits):
            if concrete is None and not content.symbolic:
                concrete = True
        else:
            raise TypeError("Can't handle SimFile content of type %s" % type(content))

        if concrete is None:
            concrete = False
        self.concrete = concrete

        if content is not None:
            self.__content = content
            if self._size is None:
                self._size = len(content) // 8
        else:
            if self._size is None:
                self._size = 0
                if has_end is None:
                    self.has_end = False
    @property
    def category(self):
        # override trying to determine from self.id to allow arbitrary idents
        return "file"

    def set_state(self, state):
        super().set_state(state)

        try:
            content = self.__content
        except AttributeError:
            pass
        else:
            self.store(0, content)
            del self.__content

        if self.has_end is None:
            self.has_end = sim_options.FILES_HAVE_EOF in state.options

        if type(self._size) is int:
            self._size = claripy.BVV(self._size, state.arch.bits)
        elif len(self._size) != state.arch.bits:
            raise TypeError("SimFile size must be a bitvector of size %d (arch.bits)" % state.arch.bits)

    @property
    def size(self):
        return self._size

    def concretize(self, **kwargs):
        """
        Return a concretization of the contents of the file, as a flat bytestring.
        """
        size = self.state.solver.min(self._size, **kwargs)
        data = self.load(0, size)

        kwargs["cast_to"] = kwargs.get("cast_to", bytes)
        kwargs["extra_constraints"] = tuple(kwargs.get("extra_constraints", ())) + (self._size == size,)
        return self.state.solver.eval(data, **kwargs)
    def read(self, pos, size, **kwargs):
        disable_actions = kwargs.pop("disable_actions", False)
        inspect = kwargs.pop("inspect", True)

        # Step 1: figure out a reasonable concrete size to use for the memory load
        # since we don't want to concretize anything
        if self.state.solver.symbolic(size):
            try:
                passed_max_size = self.state.solver.max(
                    size, extra_constraints=(size < self.state.libc.max_packet_size,)
                )
            except SimSolverError:
                passed_max_size = self.state.solver.min(size)
                l.warning("Symbolic read size is too large for threshold - concretizing to min (%d)", passed_max_size)
                self.state.add_constraints(size == passed_max_size)
        else:
            passed_max_size = self.state.solver.eval(size)
            if passed_max_size > 2**13:
                l.warning("Program performing extremely large reads")

        # Step 2.1: check for the possibility of EOFs
        # If it's not possible to EOF (because there's no EOF), this is very simple!
        if not self.has_end:
            # bump the storage size as we read
            self._size = self.state.solver.If(size + pos > self._size, size + pos, self._size)
            return self.load(pos, passed_max_size, disable_actions=disable_actions, inspect=inspect), size, size + pos

        # Step 2.2: check harder for the possibility of EOFs
        # This is the size if we're reading to the end of the file
        distance_to_eof = self._size - pos
        distance_to_eof = self.state.solver.If(self.state.solver.SLE(distance_to_eof, 0), 0, distance_to_eof)

        # try to frontload some constraint solving to see if it's impossible for this read to EOF
        if self.state.solver.satisfiable(extra_constraints=(size > distance_to_eof,)):
            # it's possible to EOF
            # final size = min(passed_size, max(distance_to_eof, 0))
            real_size = self.state.solver.If(size >= distance_to_eof, distance_to_eof, size)
            return (
                self.load(pos, passed_max_size, disable_actions=disable_actions, inspect=inspect),
                real_size,
                real_size + pos,
            )
        else:
            # it's not possible to EOF
            # we don't need to constrain or min/max the output size because there are already constraints asserting
            # that the total filesize is pretty big
            # note: this assumes that constraints cannot be removed
            return self.load(pos, passed_max_size, disable_actions=disable_actions, inspect=inspect), size, size + pos
    def write(self, pos, data, size=None, events=True, **kwargs):
        if events:
            self.state.history.add_event("fs_write", filename=self.name, data=data, size=size, pos=pos)

        data = _deps_unpack(data)[0]
        if size is None:
            size = len(data) // self.state.arch.byte_width if isinstance(data, claripy.Bits) else len(data)

        # \(_^^)/
        self.store(pos, data, size=size)
        new_end = _deps_unpack(pos + size)[0]  # decline to store SAO
        self._size = self.state.solver.If(new_end > self._size, new_end, self._size)
        return new_end

    @SimStatePlugin.memo
    def copy(self, memo):
        o = super().copy(memo)
        o.name = self.name
        o._size = self._size
        o.has_end = self.has_end
        o.seekable = self.seekable
        o.writable = self.writable
        o.concrete = self.concrete
        o.file_exists = self.file_exists
        return o

    def merge(self, others, merge_conditions, common_ancestor=None):  # pylint: disable=unused-argument
        if not all(type(o) is type(self) for o in others):
            raise SimMergeError("Cannot merge files of disparate type")

        if any(o.has_end != self.has_end for o in others):
            raise SimMergeError("Cannot merge files where some have ends and some don't")

        self._size = self.state.solver.ite_cases(zip(merge_conditions[1:], (o._size for o in others)), self._size)

        return super().merge(others, merge_conditions, common_ancestor=common_ancestor)

    def widen(self, _):
        raise SimMergeError("Widening the filesystem is unsupported")
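A short usage sketch of the flat-file model above. It assumes a throwaway blank state created directly with angr.SimState (any supported architecture works) rather than one taken from a loaded project:

import angr

state = angr.SimState(arch="AMD64")
simfile = angr.SimFile("myfile", content=b"hello world!\n")
simfile.set_state(state)

data, real_size, new_pos = simfile.read(0, 5)
print(state.solver.eval(data, cast_to=bytes))   # b'hello'
data, real_size, new_pos = simfile.read(new_pos, 8)   # continue from the returned position
print(state.solver.eval(simfile.size))          # 13
print(simfile.concretize())                     # b'hello world!\n'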
class SimFileStream(SimFile):
    """
    A specialized SimFile that uses a flat memory backing, but functions as a stream, tracking its position
    internally.

    The ``pos`` argument to the read and write methods will be ignored; instead, there is an attribute ``pos`` on
    the file itself, which will give you what you want.

    :param name:    The name of the file, for cosmetic purposes.
    :param pos:     The initial position of the file, default zero.
    :param kwargs:  Any other keyword arguments will go on to the SimFile constructor.

    :ivar pos:      The current position in the file.
    """
    def __init__(self, name=None, content=None, pos=0, **kwargs):
        super().__init__(name=name, content=content, **kwargs)
        self.pos = pos

    def set_state(self, state):
        super().set_state(state)
        if type(self.pos) is int:
            self.pos = state.solver.BVV(self.pos, state.arch.bits)
        elif len(self.pos) != state.arch.bits:
            raise TypeError("SimFileStream position must be a bitvector of size %d (arch.bits)" % state.arch.bits)

    def read(self, pos, size, **kwargs):
        no_stream = kwargs.pop("no_stream", False)
        if not no_stream:
            pos = self.pos
        data, size, pos = super().read(pos, size, **kwargs)
        if not no_stream:
            self.pos = pos
        return data, size, pos

    def write(self, _, data, size=None, **kwargs):
        self.pos = super().write(self.pos, data, size, **kwargs)
        return None

    @SimStatePlugin.memo
    def copy(self, memo):
        c = super().copy(memo)
        c.pos = self.pos
        return c

    def merge(self, others, merge_conditions, common_ancestor=None):  # pylint: disable=unused-argument
        self.pos = self.state.solver.ite_cases(zip(merge_conditions[1:], [o.pos for o in others]), self.pos)
        return super().merge(others, merge_conditions, common_ancestor=common_ancestor)
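In practice this is the class commonly handed to ``entry_state(stdin=...)`` when you want stdin modeled as one flat, position-tracked stream instead of packets. A sketch, assuming some target binary at a placeholder path:

import angr
import claripy

proj = angr.Project("/bin/true", auto_load_libs=False)   # placeholder binary path
stdin_data = claripy.BVS("stdin_data", 8 * 32)            # 32 symbolic bytes
stdin = angr.SimFileStream(name="stdin", content=stdin_data, has_end=True)

state = proj.factory.entry_state(stdin=stdin)
# after exploration, the bytes the program actually consumed can be recovered from a
# found state via found_state.posix.stdin.concretize()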
class SimPackets(SimFileBase):
    """
    The SimPackets is meant to model inputs whose content is delivered as a series of asynchronous chunks. The data
    is stored as a list of read or write results. For symbolic sizes, state.libc.max_packet_size will be respected.
    If the SHORT_READS option is enabled, reads will return a symbolic size constrained to be less than or equal to
    the requested size.

    A SimPackets cannot be used for both reading and writing - for socket objects that can be both read and written
    to, you should use a file descriptor to multiplex the read and write operations into two separate file storage
    mechanisms.

    :param name:        The name of the file, for cosmetic purposes.
    :param write_mode:  Whether this file is opened in read or write mode. If this is unspecified it will be
                        autodetected.
    :param content:     Some initial content to use for the file. Can be a list of bytestrings or a list of tuples
                        of content ASTs and size ASTs.

    :ivar write_mode:   See the eponymous parameter.
    :ivar content:      A list of packets, as tuples of content ASTs and size ASTs.
    """
    def __init__(self, name, write_mode=None, content=None, writable=True, ident=None, **kwargs):
        super().__init__(name, writable=writable, ident=ident, **kwargs)

        self.write_mode = write_mode
        self.content = content
        self.sanitized = 0

        if self.content is None:
            self.content = []
        else:
            self.content = [
                x if type(x) is tuple
                else (x, len(x) // 8) if isinstance(x, claripy.Bits)
                else (x.ast, len(x) // 8) if isinstance(x, SimActionObject)
                else (claripy.BVV(x), len(x)) if type(x) is bytes
                else None
                for x in self.content
            ]
            if any(x is None for x in self.content):
                raise TypeError("Bad type in initial SimPacket content")

    def set_state(self, state):
        super().set_state(state)

        # sanitize the lengths in self.content now that we know the wordsize
        # getattr because we want to support old pickles without this attribute (TODO remove this)
        for i in range(getattr(self, "sanitized", 0), len(self.content)):
            data, length = self.content[i]
            if type(length) is int:
                self.content[i] = (data, claripy.BVV(length, state.arch.bits))
            elif len(length) < state.arch.bits:
                self.content[i] = (data, length.zero_extend(state.arch.bits - len(length)))
            elif len(length) != state.arch.bits:
                raise TypeError("Bad bitvector size for length in SimPackets.content")
        self.sanitized = len(self.content)

    @property
    def size(self):
        return sum(x[1] for x in self.content)

    def concretize(self, **kwargs):
        """
        Returns a list of the packets read or written as bytestrings.
        """
        lengths = [self.state.solver.eval(x[1], **kwargs) for x in self.content]
        kwargs["cast_to"] = bytes
        sizes = [x[0].size() for x in self.content]
        return [
            b"" if i == 0 else self.state.solver.eval(x[0][: size - i * self.state.arch.byte_width], **kwargs)
            for i, size, x in zip(lengths, sizes, self.content)
        ]
    def read(self, pos, size, **kwargs):
        """
        Read a packet from the stream.

        :param int pos: The packet number to read from the sequence of the stream. May be None to append to the
                        stream.
        :param size:    The size to read. May be symbolic.
        :param short_reads: Whether to replace the size with a symbolic value constrained to less than or equal to
                        the original size. If unspecified, will be chosen based on the state option.
        :return:        A tuple of the data read (a bitvector of the length that is the maximum length of the read)
                        and the actual size of the read.
        """
        short_reads = kwargs.pop("short_reads", None)

        # sanity check on read/write modes
        if self.write_mode is None:
            self.write_mode = False
        elif self.write_mode is True:
            raise SimFileError("Cannot read and write to the same SimPackets")

        # sanity check on packet number and determine if data is already present
        if pos is None:
            pos = len(self.content)
        if pos < 0:
            raise SimFileError("SimPacket.read(%d): Negative packet number?" % pos)
        elif pos > len(self.content):
            raise SimFileError("SimPacket.read(%d): Packet number is past frontier of %d?" % (pos, len(self.content)))
        elif pos != len(self.content):
            _, realsize = self.content[pos]
            self.state.add_constraints(realsize <= size)  # assert that the packet fits within the read request
            if not self.state.solver.satisfiable():
                raise SimFileError(
                    "SimPackets could not fit the current packet into the read "
                    f"request of {size} bytes: {self.content[pos]}"
                )
            return self.content[pos] + (pos + 1,)

        # Type check
        if type(size) is int:
            size = self.state.solver.BVV(size, self.state.arch.bits)

        # The read is on the frontier. let's generate a new packet.
        orig_size = size
        max_size = None

        # if short reads are enabled, replace size with a symbol
        if short_reads is True or (short_reads is None and sim_options.SHORT_READS in self.state.options):
            size = self.state.solver.BVS(
                "packetsize_%d_%s" % (len(self.content), self.ident),
                self.state.arch.bits,
                key=("file", self.ident, "packetsize", len(self.content)),
            )
            self.state.add_constraints(size <= orig_size)

        # figure out the maximum size of the read
        if not self.state.solver.symbolic(size):
            max_size = self.state.solver.eval(size)
        elif self.state.solver.satisfiable(extra_constraints=(size <= self.state.libc.max_packet_size,)):
            l.info("Constraining symbolic packet size to be less than %d", self.state.libc.max_packet_size)
            if not self.state.solver.is_true(orig_size <= self.state.libc.max_packet_size):
                self.state.add_constraints(size <= self.state.libc.max_packet_size)
            if not self.state.solver.symbolic(orig_size):
                max_size = min(self.state.solver.eval(orig_size), self.state.libc.max_packet_size)
            else:
                max_size = self.state.solver.max(size)
        else:
            max_size = self.state.solver.min(size)
            l.warning(
                "Could not constrain symbolic packet size to <= %d; using minimum %d for size",
                self.state.libc.max_packet_size,
                max_size,
            )
            self.state.add_constraints(size == max_size)

        # generate the packet data and return it
        data = self.state.solver.BVS(
            "packet_%d_%s" % (len(self.content), self.ident),
            max_size * self.state.arch.byte_width,
            key=("file", self.ident, "packet", len(self.content)),
        )
        packet = (data, size)
        self.content.append(packet)
        return packet + (pos + 1,)
    def write(self, pos, data, size=None, events=True, **kwargs):
        """
        Write a packet to the stream.

        :param int pos: The packet number to write in the sequence of the stream. May be None to append to the
                        stream.
        :param data:    The data to write, as a string or bitvector.
        :param size:    The optional size to write. May be symbolic; must be constrained to at most the size of data.
        :return:        The next packet to use after this.
        """
        if events:
            self.state.history.add_event("fs_write", filename=self.name, data=data, size=size, pos=pos)

        # sanity check on read/write modes
        if self.write_mode is None:
            self.write_mode = True
        elif self.write_mode is False:
            raise SimFileError("Cannot read and write to the same SimPackets")

        data = _deps_unpack(data)[0]
        if type(data) is bytes:
            data = claripy.BVV(data)
        if size is None:
            size = len(data) // self.state.arch.byte_width if isinstance(data, claripy.Bits) else len(data)
        if type(size) is int:
            size = self.state.solver.BVV(size, self.state.arch.bits)

        # sanity check on packet number and determine if data is already present
        if pos is None:
            pos = len(self.content)
        if pos < 0:
            raise SimFileError("SimPacket.write(%d): Negative packet number?" % pos)
        elif pos > len(self.content):
            raise SimFileError("SimPacket.write(%d): Packet number is past frontier of %d?" % (pos, len(self.content)))
        elif pos != len(self.content):
            realdata, realsize = self.content[pos]
            maxlen = max(len(realdata), len(data))
            self.state.add_constraints(realdata[maxlen - 1 : 0] == data[maxlen - 1 : 0])
            self.state.add_constraints(size == realsize)
            if not self.state.solver.satisfiable():
                raise SimFileError("Packet write equality constraints made state unsatisfiable???")
            return pos + 1

        # write it out!
        self.content.append((_deps_unpack(data)[0], size))
        return pos + 1
    @SimStatePlugin.memo
    def copy(self, memo):  # pylint: disable=unused-argument
        o = type(self)(
            name=self.name, write_mode=self.write_mode, content=self.content, ident=self.ident, concrete=self.concrete
        )
        o.sanitized = getattr(self, "sanitized", 0)
        return o

    def merge(self, others, merge_conditions, common_ancestor=None):  # pylint: disable=unused-argument
        for o in others:
            if o.write_mode is None:
                continue
            elif self.write_mode is None:
                self.write_mode = o.write_mode
            elif self.write_mode is not o.write_mode:
                raise SimMergeError("Cannot merge SimPackets with disparate write_mode")

        for o in others:
            if len(o.content) != len(self.content):
                raise SimMergeError("Cannot merge SimPackets with disparate number of packets")

        for i, default in enumerate(self.content):
            max_data_length = max(len(default[0]), max(len(o.content[i][0]) for o in others))
            merged_data = self.state.solver.ite_cases(
                zip(
                    merge_conditions[1:],
                    (o.content[i][0].concat(claripy.BVV(0, max_data_length - len(o.content[i][0]))) for o in others),
                ),
                default[0],
            )
            merged_size = self.state.solver.ite_cases(
                zip(merge_conditions[1:], (o.content[i][1] for o in others)), default[1]
            )
            self.content[i] = (merged_data, merged_size)

        return True

    def widen(self, _):
        raise SimMergeError("Widening the filesystem is unsupported")
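A short sketch of the packet model above: one packet is pre-seeded, then a read at the frontier makes the class invent a fresh symbolic packet (again using a throwaway blank state):

import angr

state = angr.SimState(arch="AMD64")
packets = angr.SimPackets("rx", content=[b"hello"])
packets.set_state(state)

data, real_size, next_pos = packets.read(0, 5)           # replays the pre-seeded packet
print(state.solver.eval(data, cast_to=bytes))            # b'hello'

data, real_size, next_pos = packets.read(next_pos, 16)   # frontier read: a symbolic packet is generated
print(data.symbolic, len(packets.content))               # True 2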
class SimPacketsStream(SimPackets):
    """
    A specialized SimPackets that tracks its position internally.

    The ``pos`` argument to the read and write methods will be ignored; instead, there is an attribute ``pos`` on
    the file itself, which will give you what you want.

    :param name:    The name of the file, for cosmetic purposes.
    :param pos:     The initial position of the file, default zero.
    :param kwargs:  Any other keyword arguments will go on to the SimPackets constructor.

    :ivar pos:      The current position in the file.
    """
    def __init__(self, name, pos=0, **kwargs):
        super().__init__(name, **kwargs)
        self.pos = pos

    def read(self, pos, size, **kwargs):
        no_stream = kwargs.pop("no_stream", False)
        if not no_stream:
            pos = self.pos
        data, size, pos = super().read(pos, size, **kwargs)
        if not no_stream:
            self.pos = pos
        return data, size, pos

    def write(self, _, data, size=None, **kwargs):
        self.pos = super().write(self.pos, data, size, **kwargs)
        return None

    @SimStatePlugin.memo
    def copy(self, memo):
        c = super().copy(memo)
        c.pos = self.pos
        return c

    def merge(self, others, merge_conditions, common_ancestor=None):  # pylint: disable=unused-argument
        if any(o.pos != self.pos for o in others):
            raise SimMergeError("Can't merge SimPacketsStreams with disparate positions")

        return super().merge(others, merge_conditions, common_ancestor=common_ancestor)
class SimFileDescriptorBase(SimStatePlugin):
    """
    The base class for implementations of POSIX file descriptors.

    All file descriptors should respect the CONCRETIZE_SYMBOLIC_{READ,WRITE}_SIZES state options.
    """

    def read(self, pos, size, **kwargs):
        """
        Reads some data from the file, storing it into memory.

        :param pos:     The address to write the read data into memory.
        :param size:    The requested length of the read.
        :return:        The real length of the read.
        """
        data, realsize = self.read_data(size, **kwargs)
        if not self.state.solver.is_true(realsize == 0):
            do_concrete_update = kwargs.pop("do_concrete_update", False)
            if do_concrete_update:
                concrete_data = claripy.BVV(self.state.solver.eval(data), data.size())
                self.state.memory.store(pos, concrete_data, action=None, inspect=False)
            self.state.memory.store(pos, data, size=realsize)
        return realsize
    def write(self, pos, size, **kwargs):
        """
        Writes some data, loaded from the state, into the file.

        :param pos:     The address in memory to read the data-to-write from.
        :param size:    The requested size of the write.
        :return:        The real length of the write.
        """
        if type(pos) is str:
            raise TypeError("SimFileDescriptor.write takes an address and size. Did you mean write_data?")

        # Find a reasonable concrete size for the load since we don't want to concretize anything
        # This is copied from SimFile.read
        # TODO: refactor into a generic concretization strategy?
        if self.state.solver.symbolic(size):
            try:
                passed_max_size = self.state.solver.max(
                    size, extra_constraints=(size < self.state.libc.max_packet_size,)
                )
            except SimSolverError:
                passed_max_size = self.state.solver.min(size)
                l.warning("Symbolic write size is too large for threshold - concretizing to min (%d)", passed_max_size)
                self.state.add_constraints(size == passed_max_size)
        else:
            passed_max_size = self.state.solver.eval(size)
            if passed_max_size > 2**13:
                l.warning("Program performing extremely large write")

        data = self.state.memory.load(pos, passed_max_size)
        return self.write_data(data, size, **kwargs)
    def read_data(self, size, **kwargs):
        """
        Reads some data from the file, returning the data.

        :param size:    The requested length of the read.
        :return:        A tuple of the data read and the real length of the read.
        """
        raise NotImplementedError

    def write_data(self, data, size=None, **kwargs):
        """
        Write some data, provided as an argument, into the file.

        :param data:    A bitvector to write into the file.
        :param size:    The requested size of the write (may be symbolic).
        :return:        The real length of the write.
        """
        raise NotImplementedError

    def seek(self, offset, whence="start"):
        """
        Seek the file descriptor to a different position in the file.

        :param offset:  The offset to seek to, interpreted according to whence.
        :param whence:  What the offset is relative to; one of the strings "start", "current", or "end".
        :return:        A symbolic boolean describing whether the seek succeeded or not.
        """
        raise NotImplementedError

    def tell(self):
        """
        Return the current position, or None if the concept doesn't make sense for the given file.
        """
        raise NotImplementedError

    def eof(self):
        """
        Return the EOF status. May be a symbolic boolean.
        """
        raise NotImplementedError

    def size(self):
        """
        Return the size of the data stored in the file in bytes, or None if the concept doesn't make sense for the
        given file.
        """
        raise NotImplementedError
    @property
    def read_storage(self):
        """
        Return the SimFile backing reads from this fd.
        """
        raise NotImplementedError

    @property
    def write_storage(self):
        """
        Return the SimFile backing writes to this fd.
        """
        raise NotImplementedError

    @property
    def read_pos(self):
        """
        Return the current position of the read file pointer.

        If the underlying read file is a stream, this will return the position of the stream. Otherwise, it will
        return the position of the file descriptor in the file.
        """
        raise NotImplementedError

    @property
    def write_pos(self):
        """
        Return the current position of the write file pointer.

        If the underlying write file is a stream, this will return the position of the stream. Otherwise, it will
        return the position of the file descriptor in the file.
        """
        raise NotImplementedError
    def concretize(self, **kwargs):
        """
        Return a concretization of the data in the underlying file. Has different return types to represent
        different data structures on a per-class basis.

        Any arguments passed to this will be passed on to state.solver.eval.
        """
        raise NotImplementedError
    @property
    def file_exists(self):
        """
        This should be True in most cases. Only if we opened an fd of unknown existence, ALL_FILES_EXIST is False,
        and ANY_FILE_MIGHT_EXIST is True is this a symbolic boolean.
        """
        return True

    def _prep_read(self, size):
        return self._prep_generic(size, True)

    def _prep_write(self, size):
        return self._prep_generic(size, False)

    def _prep_generic(self, size, is_read):
        option = (
            sim_options.CONCRETIZE_SYMBOLIC_FILE_READ_SIZES if is_read else sim_options.CONCRETIZE_SYMBOLIC_WRITE_SIZES
        )
        string = "read" if is_read else "write"

        # check if we need to concretize the length
        if option in self.state.options and self.state.solver.symbolic(size):
            try:
                size = self.state.solver.max(size, extra_constraints=(size <= self.state.libc.max_packet_size,))
            except SimSolverError:
                size = self.state.solver.min(size)
            l.info("Concretizing symbolic %s size to %d", string, size)
        return size
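The size-concretization contract mentioned in the class docstring is driven by state options; a sketch of turning it on for a state (the option names are the same ones this module imports from ``sim_options``):

import angr

state = angr.SimState(arch="AMD64")
state.options.add(angr.options.CONCRETIZE_SYMBOLIC_FILE_READ_SIZES)
state.options.add(angr.options.CONCRETIZE_SYMBOLIC_WRITE_SIZES)
# with these set, _prep_read/_prep_write pin a symbolic size to its maximum
# (bounded by state.libc.max_packet_size), falling back to the minimum if that fails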
class SimFileDescriptor(SimFileDescriptorBase):
    """
    A simple file descriptor forwarding reads and writes to a SimFile. Contains information about the current opened
    state of the file, such as the flags or (if relevant) the current position.

    :ivar file:     The SimFile referred to by this descriptor.
    :ivar flags:    The mode that the file descriptor was opened with, a bitfield of flags.
    """
    def __init__(self, simfile, flags=0):
        super().__init__()
        self.file = simfile
        self._pos = 0
        self.flags = flags

    def read_data(self, size, **kwargs):
        size = self._prep_read(size)
        data, realsize, self._pos = self.file.read(self._pos, size)
        return data, realsize

    def write_data(self, data, size=None, **kwargs):
        if self.flags & Flags.O_APPEND and self.file.seekable:
            self._pos = self.file.size

        data = _deps_unpack(data)[0]
        if size is None:
            size = len(data) // self.state.arch.byte_width if isinstance(data, claripy.Bits) else len(data)

        size = self._prep_write(size)
        self._pos = self.file.write(self._pos, data, size)
        return size

    def seek(self, offset, whence="start"):
        if not self.file.seekable:
            return claripy.false

        if type(offset) is int:
            offset = self.state.solver.BVV(offset, self.state.arch.bits)

        if whence == "start":
            new_pos = offset
        elif whence == "current":
            new_pos = self._pos + offset
        elif whence == "end":
            new_pos = self.file.size + offset

        success_condition = self.state.solver.And(
            self.state.solver.SGE(new_pos, 0), self.state.solver.SLE(new_pos, self.file.size)
        )
        self._pos = _deps_unpack(self.state.solver.If(success_condition, new_pos, self._pos))[0]
        return success_condition

    def eof(self):
        if not self.file.seekable:
            return claripy.false
        if not getattr(self.file, "has_end", True):
            return claripy.false
        return self._pos == self.file.size

    def tell(self):
        if not self.file.seekable:
            return None
        return self._pos

    def size(self):
        return self.file.size

    def concretize(self, **kwargs):
        """
        Return a concretization of the underlying file. Returns whatever format is preferred by the file.
        """
        return self.file.concretize(**kwargs)
    @property
    def file_exists(self):
        return self.file.file_exists

    @property
    def read_storage(self):
        return self.file

    @property
    def write_storage(self):
        return self.file

    @property
    def read_pos(self):
        if self.file.pos is not None:
            return self.file.pos
        return self._pos

    @property
    def write_pos(self):
        if self.file.pos is not None:
            return self.file.pos
        return self._pos

    def set_state(self, state):
        self.file.set_state(state)
        super().set_state(state)

    @SimStatePlugin.memo
    def copy(self, memo):
        c = type(self)(self.file.copy(memo), self.flags)
        c._pos = self._pos
        return c
    def merge(self, others, merge_conditions, common_ancestor=None):  # pylint: disable=unused-argument
        # do NOT merge file content - descriptors do not have ownership, prevent duplicate merging
        if not all(type(o) is type(self) for o in others):
            l.error("Cannot merge SimFileDescriptors of disparate types")
            return False
        if not all(o.flags == self.flags for o in others):
            l.error("Cannot merge SimFileDescriptors of disparate flags")
            return False

        if type(self._pos) is int and all(type(o._pos) is int for o in others):
            # TODO: we can do slightly better for packet-based things by having some packets have a "guard condition"
            # which makes them zero length if they're not merged in
            if any(o._pos != self._pos for o in others):
                raise SimMergeError("Cannot merge SimFileDescriptors over SimPackets with disparate number of packets")
        elif self._pos is None and all(o._pos is None for o in others):
            pass
        elif self._pos is None or any(o._pos is None for o in others):
            raise SimMergeError("Cannot merge SimFileDescriptors with inconsistent None-position - please report this!")
        else:
            self._pos = self.state.solver.ite_cases(zip(merge_conditions[1:], (o._pos for o in others)), self._pos)

        return True

    def widen(self, _):
        raise SimMergeError("Widening the filesystem is unsupported")
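A small sketch of driving the descriptor directly, outside the posix plugin. It reads through the fd into the state's memory, which is roughly what the read(2) SimProcedure ends up doing (throwaway blank state, arbitrary scratch address):

import angr
from angr.storage.file import SimFileDescriptor, Flags

state = angr.SimState(arch="AMD64")
simfile = angr.SimFile("data.txt", content=b"ABCDEFGH")
fd = SimFileDescriptor(simfile, flags=Flags.O_RDONLY)
fd.set_state(state)

buf = 0x400000                                   # arbitrary scratch address
n = fd.read(buf, 4)                              # copies 4 bytes of the file into memory at buf
print(state.solver.eval(state.memory.load(buf, 4), cast_to=bytes))   # b'ABCD'
print(fd.tell())                                 # 4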
class SimFileDescriptorDuplex(SimFileDescriptorBase):
    """
    A file descriptor that refers to two file storage mechanisms, one to read from and one to write to. As a result,
    operations like seek, eof, etc. no longer make sense.

    :param read_file:   The SimFile to read from.
    :param write_file:  The SimFile to write to.
    """
    def __init__(self, read_file, write_file):
        super().__init__()
        self._read_file = read_file
        self._write_file = write_file
        self._read_pos = 0
        self._write_pos = 0

    def read_data(self, size, **kwargs):
        size = self._prep_read(size)
        data, realsize, self._read_pos = self._read_file.read(self._read_pos, size)
        return data, realsize

    def write_data(self, data, size=None, **kwargs):
        data = _deps_unpack(data)[0]
        if size is None:
            size = len(data) // self.state.arch.byte_width if isinstance(data, claripy.Bits) else len(data)

        size = self._prep_write(size)
        self._write_pos = self._write_file.write(self._write_pos, data, size)
        return size

    def set_state(self, state):
        self._read_file.set_state(state)
        self._write_file.set_state(state)
        super().set_state(state)

    def eof(self):
        # the thing that makes the most sense is for this to refer to the read eof status...
        if not self._read_file.seekable:
            return claripy.false
        if not getattr(self._read_file, "has_end", True):
            return claripy.false
        return self._read_pos == self._read_file.size

    def tell(self):
        return None

    def seek(self, offset, whence="start"):
        return claripy.false

    def size(self):
        return None

    def concretize(self, **kwargs):
        """
        Return a concretization of the underlying files, as a tuple of (read file, write file).
        """
        return (self._read_file.concretize(**kwargs), self._write_file.concretize(**kwargs))

    @property
    def read_storage(self):
        return self._read_file

    @property
    def write_storage(self):
        return self._write_file

    @property
    def read_pos(self):
        if self._read_file.pos is not None:
            return self._read_file.pos
        return self._read_pos

    @property
    def write_pos(self):
        if self._write_file.pos is not None:
            return self._write_file.pos
        return self._write_pos

    @SimStatePlugin.memo
    def copy(self, memo):
        c = type(self)(self._read_file.copy(memo), self._write_file.copy(memo))
        c._read_pos = self._read_pos
        c._write_pos = self._write_pos
        return c
    def merge(self, others, merge_conditions, common_ancestor=None):  # pylint: disable=unused-argument
        # do NOT merge storage mechanisms here - fs and posix handle that
        if not all(type(o) is type(self) for o in others):
            raise SimMergeError("Cannot merge SimFileDescriptors of disparate types")

        if type(self._read_pos) is int and all(type(o._read_pos) is int for o in others):
            if any(o._read_pos != self._read_pos for o in others):
                raise SimMergeError("Cannot merge SimFileDescriptors over SimPackets with disparate number of packets")
        elif self._read_pos is None and all(o._read_pos is None for o in others):
            pass
        elif self._read_pos is None or any(o._read_pos is None for o in others):
            raise SimMergeError("Cannot merge SimFileDescriptors with inconsistent None-position - please report this!")
        else:
            self._read_pos = self.state.solver.ite_cases(
                zip(merge_conditions[1:], (o._read_pos for o in others)), self._read_pos
            )

        if type(self._write_pos) is int and all(type(o._write_pos) is int for o in others):
            if any(o._write_pos != self._write_pos for o in others):
                raise SimMergeError("Cannot merge SimFileDescriptors over SimPackets with disparate number of packets")
        elif self._write_pos is None and all(o._write_pos is None for o in others):
            pass
        elif self._write_pos is None or any(o._write_pos is None for o in others):
            raise SimMergeError("Cannot merge SimFileDescriptors with inconsistent None-position - please report this!")
        else:
            self._write_pos = self.state.solver.ite_cases(
                zip(merge_conditions[1:], (o._write_pos for o in others)), self._write_pos
            )

        return True

    def widen(self, _):
        raise SimMergeError("Widening the filesystem is unsupported")
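The duplex arrangement is what socket-like descriptors use: reads come from one SimPackets and writes land on another. A sketch with a throwaway state (names "rx"/"tx" are just for illustration):

import angr
import claripy
from angr.storage.file import SimFileDescriptorDuplex

state = angr.SimState(arch="AMD64")
rx = angr.SimPackets("rx", write_mode=False, content=[b"ping"])
tx = angr.SimPackets("tx", write_mode=True)
fd = SimFileDescriptorDuplex(rx, tx)
fd.set_state(state)

data, real_size = fd.read_data(4)          # pulled from the rx side
fd.write_data(claripy.BVV(b"pong"))        # recorded on the tx side
print(state.solver.eval(data, cast_to=bytes))   # b'ping'
print(len(tx.content))                          # 1 packet recorded on the write side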
class SimPacketsSlots(SimFileBase):
    """
    SimPacketsSlots is the new SimDialogue, if you've ever seen that before.

    The idea is that in some cases, the only thing you really care about is getting the lengths of reads right, and
    some of them should be short reads, and some of them should be truncated. You provide to this class a list of
    read lengths, and it figures out the length of each read, and delivers some content.

    This class will NOT respect the position argument you pass it - this storage is not stateless.
    """

    seekable = False
    def __init__(self, name, read_sizes, ident=None, **kwargs):
        super().__init__(name, writable=False, ident=ident)

        self.read_sizes = read_sizes
        self.read_data = []

    def concretize(self, **kwargs):
        return [self.state.solver.eval(var, cast_to=bytes, **kwargs) for var in self.read_data]
    def read(self, pos, size, **kwargs):
        if not self.read_sizes:
            return self.state.solver.BVV(0, 0), 0, None

        try:
            req_size = self.state.solver.eval_one(size)
        except SimSolverError:
            raise SimFileError("SimPacketsSlots can't handle multivalued read sizes")

        avail_size = self.read_sizes[0]

        if avail_size > req_size:
            # chop the packet in half
            real_size = req_size
            self.read_sizes[0] -= req_size
        else:
            # short read or full size read
            real_size = avail_size
            self.read_sizes.pop(0)

        data = self.state.solver.BVS(
            "packet_%d_%s" % (len(self.read_data), self.ident),
            real_size * self.state.arch.byte_width,
            key=("file", self.ident, "packet", len(self.read_data)),
        )
        self.read_data.append(data)
        return data, real_size, None

    def write(self, pos, data, size=None, **kwargs):
        raise SimFileError("Trying to write to SimPacketsSlots? Illegal")

    @property
    def size(self):
        return sum(len(x) for x in self.read_data) // self.state.arch.byte_width

    @SimStatePlugin.memo
    def copy(self, memo):  # pylint: disable=unused-argument
        o = type(self)(self.name, self.read_sizes, ident=self.ident)
        o.read_data = list(self.read_data)
        return o
    def merge(self, others, merge_conditions, common_ancestor=None):  # pylint: disable=unused-argument
        if any(self.read_sizes != o.read_sizes for o in others):
            raise SimMergeError("Can't merge SimPacketsSlots with disparate reads")

        already_read_sizes = [len(x) for x in self.read_data]
        if any(already_read_sizes != [len(x) for x in o.read_data] for o in others):
            raise SimMergeError("Can't merge SimPacketsSlots with disparate reads")

        for i, default_var in enumerate(self.read_data):
            self.read_data[i] = self.state.solver.ite_cases(
                zip(merge_conditions[1:], [o.read_data[i] for o in others]), default_var
            )

        return True
    def widen(self, _):
        raise SimMergeError("Widening the filesystem is unsupported")
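A minimal sketch of forcing a specific schedule of read lengths with the class above (throwaway blank state; the slot sizes are arbitrary):

import angr
import claripy
from angr.storage.file import SimPacketsSlots

state = angr.SimState(arch="AMD64")
slots = SimPacketsSlots("rx", [10, 4])        # schedule: first slot holds 10 bytes, second holds 4
slots.set_state(state)

d1, n1, _ = slots.read(None, claripy.BVV(6, 64))    # asks for 6: slot 0 is chopped, 6 delivered, 4 left over
d2, n2, _ = slots.read(None, claripy.BVV(100, 64))  # asks for 100: short read of the remaining 4 bytes
print(n1, n2)                                        # 6 4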
from ..errors import SimMergeError, SimFileError, SimSolverError