Source code for angr.state_plugins.heap.heap_ptmalloc

from ..plugin import SimStatePlugin
from .heap_freelist import SimHeapFreelist, Chunk
from .utils import concretize

from ...errors import SimHeapError, SimMergeError, SimSolverError

import logging

l = logging.getLogger("angr.state_plugins.heap.heap_ptmalloc")
sml = logging.getLogger("angr.state_plugins.symbolic_memory")


# These are included as sometimes the heap will touch uninitialized locations, which normally causes a warning
[docs]def silence_logger(): level = sml.getEffectiveLevel() sml.setLevel("ERROR") return level
[docs]def unsilence_logger(level): sml.setLevel(level)
[docs]class PTChunk(Chunk): """ A chunk, inspired by the implementation of chunks in ptmalloc. Provides a representation of a chunk via a view into the memory plugin. For the chunk definitions and docs that this was loosely based off of, see glibc malloc/malloc.c, line 1033, as of commit 5a580643111ef6081be7b4c7bd1997a5447c903f. Alternatively, take the following link.;a=blob;f=malloc/malloc.c;h=67cdfd0ad2f003964cd0f7dfe3bcd85ca98528a7;hb=5a580643111ef6081be7b4c7bd1997a5447c903f#l1033 :ivar base: the location of the base of the chunk in memory :ivar state: the program state that the chunk is resident in :ivar heap: the heap plugin that the chunk is managed by """
[docs] def __init__(self, base, sim_state, heap=None): super().__init__(base, sim_state) # This is necessary since the heap can't always be referenced through the state, e.g. during heap initialization self.heap = self.state.heap if heap is None else heap # Size in bytes of the type used to store a piece of metadata self._chunk_size_t_size = self.heap._chunk_size_t_size self._chunk_min_size = self.heap._chunk_min_size self._chunk_align_mask = self.heap._chunk_align_mask
[docs] def get_size(self): return self.state.memory.load(self.base + self._chunk_size_t_size, self._chunk_size_t_size) & ~CHUNK_FLAGS_MASK
[docs] def get_data_size(self): chunk_size = self.get_size() if self.is_free(): return chunk_size - 4 * self._chunk_size_t_size else: return chunk_size - 2 * self._chunk_size_t_size
def _set_leading_size(self, size): level = silence_logger() chunk_flags = ( self.state.memory.load(self.base + self._chunk_size_t_size, self._chunk_size_t_size) & CHUNK_FLAGS_MASK ) unsilence_logger(level) + self._chunk_size_t_size, size | chunk_flags, size=self.state.arch.bytes) def _set_trailing_size(self, size): if self.is_free(): next_chunk = self.next_chunk() if next_chunk is not None:, size, self.state.arch.bytes)
[docs] def set_size(self, size, is_free=None): # pylint:disable=arguments-differ """ Use this to set the size on a chunk. When the chunk is new (such as when a free chunk is shrunk to form an allocated chunk and a remainder free chunk) it is recommended that the is_free hint be used since setting the size depends on the chunk's freeness, and vice versa. :param size: size of the chunk :param is_free: boolean indicating the chunk's freeness """ self._set_leading_size(size) next_chunk = self.next_chunk() if is_free is not None: if next_chunk is not None: next_chunk.set_prev_freeness(is_free) else: self.heap._set_final_freeness(is_free) if is_free is not None and is_free or self.is_free(): if next_chunk is not None:, size, size=self.state.arch.bytes)
[docs] def set_prev_freeness(self, is_free): """ Sets (or unsets) the flag controlling whether the previous chunk is free. :param is_free: if True, sets the previous chunk to be free; if False, sets it to be allocated """ level = silence_logger() size_field = self.state.memory.load(self.base + self._chunk_size_t_size, self._chunk_size_t_size) unsilence_logger(level) if is_free: self.base + self._chunk_size_t_size, size_field & ~CHUNK_P_MASK, size=self.state.arch.bytes ) else: self.base + self._chunk_size_t_size, size_field | CHUNK_P_MASK, size=self.state.arch.bytes )
[docs] def is_prev_free(self): """ Returns a concrete state of the flag indicating whether the previous chunk is free or not. Issues a warning if that flag is symbolic and has multiple solutions, and then assumes that the previous chunk is free. :returns: True if the previous chunk is free; False otherwise """ flag = self.state.memory.load(self.base + self._chunk_size_t_size, self._chunk_size_t_size) & CHUNK_P_MASK def sym_flag_handler(flag): l.warning("A chunk's P flag is symbolic; assuming it is not set") return self.state.solver.min_int(flag) flag = concretize(flag, self.state.solver, sym_flag_handler) return not flag
[docs] def prev_size(self): """ Returns the size of the previous chunk, masking off what would be the flag bits if it were in the actual size field. Performs NO CHECKING to determine whether the previous chunk size is valid (for example, when the previous chunk is not free, its size cannot be determined). """ return self.state.memory.load(self.base, self._chunk_size_t_size) & ~CHUNK_FLAGS_MASK
[docs] def is_free(self): next_chunk = self.next_chunk() if next_chunk is not None: return next_chunk.is_prev_free() else: flag = ( self.state.memory.load( self.heap.heap_base + self.heap.heap_size - self._chunk_size_t_size, self._chunk_size_t_size ) & CHUNK_P_MASK ) def sym_flag_handler(flag): l.warning("The final P flag is symbolic; assuming it is not set") return self.state.solver.min_int(flag) flag = concretize(flag, self.state.solver, sym_flag_handler) return not flag
[docs] def data_ptr(self): return self.base + (2 * self._chunk_size_t_size)
[docs] def next_chunk(self): """ Returns the chunk immediately following (and adjacent to) this one, if it exists. :returns: The following chunk, or None if applicable """ def sym_base_handler(base): l.warning("A computed chunk base is symbolic; maximizing it") return self.state.solver.max_int(base) base = concretize(self.base + self.get_size(), self.state.solver, sym_base_handler) if base >= self.heap.heap_base + self.heap.heap_size - 2 * self._chunk_size_t_size: return None else: return PTChunk(base, self.state)
[docs] def prev_chunk(self): """ Returns the chunk immediately prior (and adjacent) to this one, if that chunk is free. If the prior chunk is not free, then its base cannot be located and this method raises an error. :returns: If possible, the previous chunk; otherwise, raises an error """ if self.is_prev_free(): return PTChunk(self.base - self.prev_size(), self.state) else: raise SimHeapError("Attempted to access the previous chunk, but it was not free")
[docs] def fwd_chunk(self): """ Returns the chunk following this chunk in the list of free chunks. If this chunk is not free, then it resides in no such list and this method raises an error. :returns: If possible, the forward chunk; otherwise, raises an error """ if self.is_free(): base = self.state.memory.load( self.base + 2 * self._chunk_size_t_size, self._chunk_size_t_size, endness=self.state.arch.memory_endness ) return PTChunk(base, self.state) else: raise SimHeapError("Attempted to access the forward chunk of an allocated chunk")
[docs] def set_fwd_chunk(self, fwd): self.base + 2 * self._chunk_size_t_size, fwd.base, endness=self.state.arch.memory_endness, size=self.state.arch.bytes, )
[docs] def bck_chunk(self): """ Returns the chunk backward from this chunk in the list of free chunks. If this chunk is not free, then it resides in no such list and this method raises an error. :returns: If possible, the backward chunk; otherwise, raises an error """ if self.is_free(): base = self.state.memory.load( self.base + 3 * self._chunk_size_t_size, self._chunk_size_t_size, endness=self.state.arch.memory_endness ) return PTChunk(base, self.state) else: raise SimHeapError("Attempted to access the backward chunk of an allocated chunk")
[docs] def set_bck_chunk(self, bck): self.base + 3 * self._chunk_size_t_size, bck.base, endness=self.state.arch.memory_endness, size=self.state.arch.bytes, )
[docs]class PTChunkIterator:
[docs] def __init__(self, chunk, cond=lambda chunk: True): self.chunk = chunk self.cond = cond
def __iter__(self): return self def __next__(self): if self.chunk is None: raise StopIteration if self.cond(self.chunk): ret = self.chunk self.chunk = self.chunk.next_chunk() else: while self.chunk is not None and not self.cond(self.chunk): self.chunk = self.chunk.next_chunk() if self.chunk is None: raise StopIteration ret = self.chunk self.chunk = self.chunk.next_chunk() return ret
[docs]class SimHeapPTMalloc(SimHeapFreelist): """ A freelist-style heap implementation inspired by ptmalloc. The chunks used by this heap contain heap metadata in addition to user data. While the real-world ptmalloc is implemented using multiple lists of free chunks (corresponding to their different sizes), this more basic model uses a single list of chunks and searches for free chunks using a first-fit algorithm. **NOTE:** The plugin must be registered using ``register_plugin`` with name ``heap`` in order to function properly. :ivar heap_base: the address of the base of the heap in memory :ivar heap_size: the total size of the main memory region managed by the heap in memory :ivar mmap_base: the address of the region from which large mmap allocations will be made :ivar free_head_chunk: the head of the linked list of free chunks in the heap """
[docs] def __init__(self, heap_base=None, heap_size=None): super().__init__(heap_base, heap_size) # All of these depend on the state and so are initialized in init_state self._free_head_chunk_exists = True # Only used during plugin copy due to the dependency on the memory plugin self._free_head_chunk_init_base = None # Same as above self._chunk_size_t_size = None # Size (bytes) of the type used to store a piece of metadata self._chunk_min_size = None # Based on needed fields for any chunk self._chunk_align_mask = 0 self.free_head_chunk = None self._initialized = False
@SimStatePlugin.memo def copy(self, memo): # pylint: disable=unused-argument o = super().copy(memo) o._free_head_chunk_exists = self.free_head_chunk is not None o._free_head_chunk_init_base = self.free_head_chunk.base if self.free_head_chunk is not None else None o._initialized = self._initialized return o
[docs] def chunks(self): return PTChunkIterator(PTChunk(self.heap_base, self.state))
[docs] def allocated_chunks(self): return PTChunkIterator(PTChunk(self.heap_base, self.state), lambda chunk: not chunk.is_free())
[docs] def free_chunks(self): return PTChunkIterator(PTChunk(self.heap_base, self.state), lambda chunk: chunk.is_free())
[docs] def chunk_from_mem(self, ptr): """ Given a pointer to a user payload, return the base of the chunk associated with that payload (i.e. the chunk pointer). Returns None if ptr is null. :param ptr: a pointer to the base of a user payload in the heap :returns: a pointer to the base of the associated heap chunk, or None if ptr is null """ if self.state.solver.symbolic(ptr): try: ptr = self.state.solver.eval_one(ptr) except SimSolverError: l.warning("A pointer to a chunk is symbolic; maximizing it") ptr = self.state.solver.max_int(ptr) else: ptr = self.state.solver.eval(ptr) return PTChunk(ptr - (2 * self._chunk_size_t_size), self.state) if ptr != 0 else None
def _find_bck(self, chunk): """ Simply finds the free chunk that would be the backwards chunk relative to the chunk at ptr. Hence, the free head and all other metadata are unaltered by this function. """ cur = self.free_head_chunk if cur is None: return None fwd = cur.fwd_chunk() if cur == fwd: return cur # At this point there should be at least two free chunks in the heap if cur < chunk: while cur < fwd < chunk: cur = fwd fwd = cur.fwd_chunk() return cur else: while fwd != self.free_head_chunk: cur = fwd fwd = cur.fwd_chunk() return cur def _set_final_freeness(self, flag): """ Sets the freedom of the final chunk. Since no proper chunk follows the final chunk, the heap itself manages this. Nonetheless, for now it is implemented as if an additional chunk followed the final chunk. """ if flag: self.heap_base + self.heap_size - self._chunk_size_t_size, ~CHUNK_P_MASK, size=self.state.arch.bytes ) else: self.heap_base + self.heap_size - self._chunk_size_t_size, CHUNK_P_MASK, size=self.state.arch.bytes ) def _make_chunk_size(self, req_size): """ Takes an allocation size as requested by the user and modifies it to be a suitable chunk size. """ size = req_size size += 2 * self._chunk_size_t_size # Two size fields size = self._chunk_min_size if size < self._chunk_min_size else size if size & self._chunk_align_mask: # If the chunk would not be aligned size = (size & ~self._chunk_align_mask) + self._chunk_align_mask + 1 # Fix it return size
[docs] def malloc(self, sim_size): size = self._conc_alloc_size(sim_size) req_size = size size = self._make_chunk_size(size) chunk = None # This will be the resulting allocation free_chunk = self.free_head_chunk if free_chunk is None: l.warning("No free chunks available; heap space exhausted") return 0 # No free chunks available # This handler will be necessary as we'll be checking the size fields of many free chunks def sym_free_size_handler(size): l.warning("A free chunk's size field is symbolic; maximizing it") return self.state.solver.max_int(size) while chunk is None: free_size = free_chunk.get_size() free_size = concretize(free_size, self.state.solver, sym_free_size_handler) if free_size < size: # Chunk is too small to be used; move to the next or fail fwd = free_chunk.fwd_chunk() if fwd <= free_chunk: l.debug("No free chunks of sufficient size available") return 0 else: free_chunk = fwd elif free_size > size and free_size - size >= self._chunk_min_size: # Chunk may be too large but we'll use it anyway chunk = free_chunk bck = free_chunk.bck_chunk() # Store these now as we'll have to remove this chunk from the list fwd = free_chunk.fwd_chunk() rem_chunk = PTChunk(chunk.base + size, chunk.state) # The "remainder" chunk is the unused portion after rem_chunk.set_size(free_size - size, True) # the allocation is made. Since it follows the used rem_chunk.set_prev_freeness(False) # portion, we can set the used chunk as not free. if free_chunk == self.free_head_chunk: self.free_head_chunk = rem_chunk # If the used chunk had been the head, now the remainder is chunk.set_size(size) if free_chunk == bck and free_chunk == fwd: # If the free chunk had been the only free chunk, then the rem_chunk.set_bck_chunk(rem_chunk) # remainder chunk is now the only free chunk rem_chunk.set_fwd_chunk(rem_chunk) else: rem_chunk.set_bck_chunk(bck) # Otherwise there was at least one other chunk, and the rem_chunk.set_fwd_chunk(fwd) # remainder chunk may safely replace the original in the bck.set_fwd_chunk(rem_chunk) # list fwd.set_bck_chunk(rem_chunk) else: # Chunk is a perfect fit, or the remainder would be too small to split off as a free chunk chunk = free_chunk fwd = free_chunk.fwd_chunk() # Once again we store these in advance bck = free_chunk.bck_chunk() if bck == fwd and free_chunk == fwd: # Last chunk being used up self.free_head_chunk = None else: if free_chunk == self.free_head_chunk: self.free_head_chunk = fwd bck.set_fwd_chunk(fwd) # We can safely remove the chunk from the list fwd.set_bck_chunk(bck) next_chunk = chunk.next_chunk() # Now we set the new chunk to be allocated, using a different if next_chunk is not None: # approach depending on whether the chunk was the last in the next_chunk.set_prev_freeness(False) # heap or not else: self._set_final_freeness(False) addr = chunk.data_ptr() l.debug("Requested: %4d; Allocated: %4d; Returned: %#08x; Chunk: %#08x", req_size, size, addr, chunk.base) return addr
[docs] def free(self, ptr): # In the following, "next" and "previous" (and their abbreviations) refer to the adjacent chunks in memory, # while forward and backward (and their abbreviations) refer to the adjacent chunks in the list of free chunks req = ptr chunk = self.chunk_from_mem(ptr) if chunk is None or chunk.is_free(): return size = chunk.get_size() p_in_use = not chunk.is_prev_free() n_ptr = chunk.next_chunk() n_in_use = n_ptr is None or not n_ptr.is_free() if p_in_use and n_in_use: # exist # When both adjacent chunks are in use, no merging will be # necessary between the freed chunk and another free chunk if n_ptr is not None: n_ptr.set_prev_freeness(True) # Set the chunk to be free else: self._set_final_freeness(True) chunk.set_size(size) # Reset the chunk's size to account for the trailing size field bck = self._find_bck(chunk) # Scan the free list to determine where to insert the newly freed chunk if bck is None: # There was no other chunk in the free list self.free_head_chunk = chunk bck = chunk fwd = chunk else: # Insert the chunk after the bck chunk that was found fwd = bck.fwd_chunk() bck.set_fwd_chunk(chunk) fwd.set_bck_chunk(chunk) chunk.set_bck_chunk(bck) chunk.set_fwd_chunk(fwd) if chunk < self.free_head_chunk: self.free_head_chunk = chunk elif not p_in_use and not n_in_use: # If both the adjacent chunks are free, merging between all three will be needed p_ptr = chunk.prev_chunk() # The previous chunk will be the base of the overall new chunk p_ptr.set_size(p_ptr.get_size() + size + n_ptr.get_size()) n_fwd = n_ptr.fwd_chunk() # The chunk forward from the chunk that's next after the freed chunk, which is p_ptr.set_fwd_chunk(n_fwd) # needed since we're removing a free chunk (chunk.next_chunk()) from the linked n_fwd.set_bck_chunk(p_ptr) # list else: # There are two remaining cases, but we handle them generically below by deciding on a base for a new chunk, # determining its size, and updating all metadata around it (even though sometimes it isn't necessary). if not p_in_use: base = chunk.prev_chunk() new_size = size + base.get_size() bck = base.bck_chunk() fwd = base.fwd_chunk() else: n_size = n_ptr.get_size() base = chunk new_size = size + n_size bck = n_ptr.bck_chunk() fwd = n_ptr.fwd_chunk() # In case the freed chunk preceded the free head if base < self.free_head_chunk: self.free_head_chunk = base # In case the following chunk was the last free chunk, we can't use its links due to the merge if bck == fwd and bck == n_ptr: bck = base fwd = base base.set_size(new_size) base.set_bck_chunk(bck) base.set_fwd_chunk(fwd) bck.set_fwd_chunk(base) fwd.set_bck_chunk(base) new_next = base.next_chunk() if new_next is not None: new_next.set_prev_freeness(True) else: self._set_final_freeness(True) # FIXME: must set size twice so that once free, the trailing size field is updated; more elegant way? base.set_size(new_size) l.debug( "Free request: %#08x; Freed chunk: %#08x", self.state.solver.eval(req), self.state.solver.eval(chunk.base) )
[docs] def calloc(self, sim_nmemb, sim_size): size = self._conc_alloc_size(sim_nmemb * sim_size) addr = self.malloc(size) if addr == 0: return 0 if size != 0: z = self.state.solver.BVV(0, size * 8), z) return addr
[docs] def realloc(self, ptr, size): chunk = self.chunk_from_mem(ptr) if chunk is None: # ptr is null return self.malloc(size) size = self._conc_alloc_size(size) if size == 0: # assumes that REALLOC_ZERO_BYTES_FREES is set for ptmalloc return 0 old_size = chunk.get_size() def sym_size_handler(sym_size): l.warning("An allocated chunk's size field is symbolic; maximizing it") return self.state.solver.max_int(sym_size) old_size = concretize(old_size, self.state.solver, sym_size_handler) new_size = self._make_chunk_size(size) if new_size > old_size: # If more space is needed, will have to reallocate # TODO: this could be made more complex to make better usage of remaining heap space when it runs out by # TODO: checking for smaller adjacent free chunks that could be amalgamated (rather than malloc, copy, free) new_data_ptr = self.malloc(size) # Make the new allocation if new_data_ptr == 0: # Check for failure return 0 # Copy the old data over old_data_ptr = chunk.data_ptr() level = silence_logger() old_data = self.state.memory.load(old_data_ptr, size=old_size - 2 * self._chunk_size_t_size) unsilence_logger(level), old_data) # Free the old chunk return new_data_ptr elif new_size < old_size and old_size - new_size >= self._chunk_min_size: # Less space is needed, so just shrink the chunk and create a new free chunk from the freed space chunk.set_size(new_size, False) new_next_chunk = chunk.next_chunk() new_next_chunk.set_size(old_size - new_size, False) new_next_chunk.set_prev_freeness(False) return chunk.data_ptr() else: # No changes needed; we're already the right size return chunk.data_ptr()
def _malloc(self, sim_size): return self.malloc(sim_size) def _free(self, ptr): return def _calloc(self, sim_nmemb, sim_size): return self.calloc(sim_nmemb, sim_size) def _realloc(self, ptr, size): return self.realloc(ptr, size) def _combine(self, others): if any(o.heap_base != self.heap_base for o in others): raise SimMergeError("Cannot merge heaps with different bases") # When heaps become more dynamic, this next one can probably change if any(o.heap_size != self.heap_size for o in others): raise SimMergeError("Cannot merge heaps with different sizes") if any(o.free_head_chunk != self.free_head_chunk for o in others): raise SimMergeError("Cannot merge heaps with different freelist head chunks") if any(o.mmap_base != self.mmap_base for o in others): raise SimMergeError("Cannot merge heaps with different mmap bases") # These are definitely sanity checks if any(o._chunk_size_t_size != self._chunk_size_t_size for o in others): raise SimMergeError("Cannot merge heaps with different chunk size_t sizes") if any(o._chunk_min_size != self._chunk_min_size for o in others): raise SimMergeError("Cannot merge heaps with different minimum chunk sizes") if any(o._chunk_align_mask != self._chunk_align_mask for o in others): raise SimMergeError("Cannot merge heaps with different chunk alignments") return False
[docs] def merge(self, others, merge_conditions, common_ancestor=None): # pylint:disable=unused-argument return self._combine(others)
[docs] def widen(self, others): return self._combine(others)
[docs] def init_state(self): super().init_state() self._chunk_size_t_size = self.state.arch.bytes self._chunk_min_size = 4 * self._chunk_size_t_size self._chunk_align_mask = 2 * self._chunk_size_t_size - 1 # TODO: where are bin metadata stored in reality? if self._free_head_chunk_exists and self._free_head_chunk_init_base is None: free_base = self.heap_base if self.heap_base & self._chunk_align_mask: free_base = (self.heap_base & ~self._chunk_align_mask) + self._chunk_align_mask + 1 self.free_head_chunk = PTChunk(free_base, self.state, self) elif not self._free_head_chunk_exists: self.free_head_chunk = None else: self.free_head_chunk = PTChunk(self._free_head_chunk_init_base, self.state) # We reserve enough space at the top of the heap to simulate the presence of another chunk, for the purpose of # storing the usage information of the real final chunk if not self._initialized: self.free_head_chunk.base + self._chunk_size_t_size, ((self.heap_size - 2 * self._chunk_size_t_size) & ~self._chunk_align_mask) | CHUNK_P_MASK, size=self.state.arch.bytes, ) self._set_final_freeness(True) self.free_head_chunk.set_fwd_chunk(self.free_head_chunk) self.free_head_chunk.set_bck_chunk(self.free_head_chunk) self._initialized = True