Source code for angr.flirt.build_sig

# pylint:disable=consider-using-with
from typing import List, Dict
import json
import subprocess
import argparse
import tempfile
import os
import itertools
from collections import defaultdict

import angr


# upper bound on the number of strings that get_unique_strings() returns
UNIQUE_STRING_COUNT = 20
# strings longer than MAX_UNIQUE_STRING_LEN will be truncated
MAX_UNIQUE_STRING_LEN = 70


def get_basic_info(ar_path: str) -> Dict[str, str]:
    """
    Get basic information of the archive file.

    The archive is extracted with ``ar`` into a temporary directory, and one of the extracted ``.o`` files is loaded
    with angr to read the architecture name and the OS name.

    :param ar_path:     Path of the .a file. A relative path is resolved against the current working directory.
    :return:            A dict with the keys "arch" and "platform", both lower-cased.
    :raises ValueError: If no .o file could be extracted from the archive.
    """
    # Resolve the path first: ar runs with the temporary directory as its working directory, where a relative path
    # would no longer point at the archive.
    ar_path = os.path.abspath(ar_path)

    with tempfile.TemporaryDirectory() as tempdirname:
        # Extract inside the temporary directory without changing the working directory of this process, so there
        # is nothing to restore if anything below raises.
        subprocess.call(["ar", "x", ar_path], cwd=tempdirname)

        # Load arch and OS information from the first .o file (sorted so that the choice is deterministic)
        o_files = sorted(f for f in os.listdir(tempdirname) if f.endswith(".o"))
        if not o_files:
            raise ValueError(f"No .o files could be extracted from {ar_path}.")

        proj = angr.Project(os.path.join(tempdirname, o_files[0]), auto_load_libs=False)
        arch_name = proj.arch.name.lower()
        os_name = proj.simos.name.lower()

    return {
        "arch": arch_name,
        "platform": os_name,
    }
def get_unique_strings(ar_path: str) -> List[str]:
    """
    Collect strings from the object files of an archive that are unlikely to be symbol names.

    For Linux libraries, this method requires ar (from binutils), nm (from binutils), and strings.

    Candidates are the outputs of ``strings -n 8`` on every extracted .o file, minus anything that looks like a
    symbol reported by ``nm``. The longest candidates are preferred, and at most one string is kept per 5-character
    prefix.

    :param ar_path: Path of the .a file. A relative path is resolved against the current working directory.
    :return:        At most UNIQUE_STRING_COUNT strings, each truncated to MAX_UNIQUE_STRING_LEN characters.
    """
    # Resolve the path first: ar runs with the temporary directory as its working directory, where a relative path
    # would no longer point at the archive.
    ar_path = os.path.abspath(ar_path)

    # get symbols
    nm_output = subprocess.check_output(["nm", ar_path])
    symbol_types = "UuVvTtRrDdWwBbNn"
    symbols = set()
    for nm_line in nm_output.decode("utf-8").split("\n"):
        for symbol_type in symbol_types:
            marker = f" {symbol_type} "
            pos = nm_line.find(marker)
            if pos == -1:
                continue
            # parse it: everything after the type letter is the symbol name
            symbol = nm_line[pos + len(marker) :].strip(" ")
            if "." in symbol:
                # each dot-separated component counts as a symbol of its own
                symbols |= set(symbol.split("."))
            else:
                symbols.add(symbol)
            break

    # extract the archive file into a temporary directory
    all_strings = set()
    with tempfile.TemporaryDirectory() as tempdirname:
        subprocess.call(["ar", "x", ar_path], cwd=tempdirname)
        for filename in os.listdir(tempdirname):
            if not filename.endswith(".o"):
                continue
            output = subprocess.check_output(["strings", "-n", "8", os.path.join(tempdirname, filename)])
            for s in output.decode("utf-8").split("\n"):
                if not s:
                    # splitting on newlines leaves an empty string behind; it identifies nothing
                    continue
                if s in symbols:
                    continue
                if "." in s and any(subs in symbols for subs in s.split(".")):
                    continue
                # C++ specific
                if "::" in s:
                    continue
                # make sure it's not a substring of any symbol
                if "_" in s and any(s in symbol for symbol in symbols):
                    continue
                all_strings.add(s)

    # Longest strings first; ties are broken alphabetically so that the result does not depend on set ordering
    sorted_strings = sorted(all_strings, key=lambda s: (-len(s), s))

    picked = set()
    unique_strings = []
    for s in sorted_strings:
        prefix = s[:5]
        if prefix in picked:
            # a string with the same prefix has been taken already
            continue
        picked.add(prefix)
        unique_strings.append(s[:MAX_UNIQUE_STRING_LEN])
        if len(unique_strings) >= UNIQUE_STRING_COUNT:
            break
    return unique_strings
def run_pelf(pelf_path: str, ar_path: str, output_path: str):
    """
    Invoke pelf on an archive file.

    :param pelf_path:   Path of the pelf executable.
    :param ar_path:     Path of the .a file that is passed to pelf.
    :param output_path: Path that is passed to pelf as its last argument (the caller passes a .pat file path).
    :raises subprocess.CalledProcessError: If pelf exits with a non-zero return code.
    """
    command = [pelf_path, "-r43:0:0", ar_path, output_path]
    subprocess.check_call(command)
def run_sigmake(sigmake_path: str, sig_name: str, pat_path: str, sig_path: str):
    """
    Invoke sigmake and report whether it finished without collisions.

    :param sigmake_path: Path of the sigmake executable.
    :param sig_name:     Name of the signature, passed to sigmake through its -n option.
    :param pat_path:     Path of the pattern file that is passed to sigmake.
    :param sig_path:     Path of the signature file that is passed to sigmake.
    :return:             False if sigmake wrote "COLLISIONS:" to stderr, True otherwise.
    """
    # a name that contains spaces is wrapped in double quotes
    name_value = f'"{sig_name}"' if " " in sig_name else sig_name
    completed = subprocess.run(
        [sigmake_path, f"-n{name_value}", pat_path, sig_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    return b"COLLISIONS:" not in completed.stderr
def process_exc_file(exc_path: str):
    """
    Resolve the collisions listed in an .exc file in place.

    We are doing the stupidest thing possible: For each batch of conflicts, we pick the most likely result based on a
    set of predefined rules.

    The file is read as groups of lines separated by empty lines; lines starting with ";" are ignored. Within each
    group that has more than one function and at least one function whose name does not end with ".cold", the line
    of the shortest such name is prefixed with "+". The file is then rewritten with the groups only, each followed
    by an empty line.

    TODO: Add caller-callee-based de-duplication.

    :param exc_path: Path of the .exc file. It is overwritten.
    """
    with open(exc_path) as f:
        lines = f.read().split("\n")

    # parse groups: each group maps a function name to its line, in file order
    groups = []
    current = None
    for line in lines:
        if line.startswith(";"):
            continue
        if not line:
            # an empty line closes the current group; the next content line opens a new one
            current = None
            continue
        if current is None:
            # Open a new group lazily. This also covers a file that starts directly with a group, which must not
            # be merged with the group that follows it.
            current = {}
            groups.append(current)
        # parse the function name: everything before the first tab (the whole line if there is no tab)
        func_name = line.partition("\t")[0].strip(" ")
        current[func_name] = line

    # for each group, decide the one to keep
    for g in groups:
        if len(g) == 1:
            # don't pick anything. This is a weird case that I don't understand
            continue
        non_cold_names = [func_name for func_name in g if not func_name.endswith(".cold")]
        if not non_cold_names:
            # .cold functions. doesn't matter what we pick
            continue
        # pick the shortest name; min() keeps the first one among names of equal length
        the_chosen_one = min(non_cold_names, key=len)
        g[the_chosen_one] = "+" + g[the_chosen_one]

    # output
    with open(exc_path, "w") as f:
        for g in groups:
            for line in g.values():
                f.write(line + "\n")
            f.write("\n")
def _resolve_tool_path(cli_value, name: str) -> str:
    """
    Pick the path of an external tool: the command-line value if one was given, otherwise the value of the
    environment variable called `name`.

    :raises ValueError: If neither of them provides the path.
    """
    if cli_value:
        return cli_value
    if name in os.environ:
        return os.environ[name]
    raise ValueError(f"{name} must be specified.")


def main():
    """
    Command-line entry point: build a signature file and its meta data file for an archive.

    Steps: parse the arguments, collect basic information and unique strings of the archive, run pelf to create a
    pattern file in a temporary directory, run sigmake, and, if sigmake reports collisions, process the .exc file
    and run sigmake once more. Finally, the meta data is written as JSON next to the signature file.

    :raises ValueError:   If the path of pelf or sigmake is given neither on the command line nor in the
                          environment (variables "pelf_path" and "sigmake_path").
    :raises RuntimeError: If sigmake still reports collisions after the .exc file has been processed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("ar_path", help="Path of the .a file to build signatures for")
    parser.add_argument("sig_name", help="Name of the signature (a string inside the signature file)")
    parser.add_argument("sig_path", help="File name of the generated signature")
    parser.add_argument(
        "--compiler", help="Name of the compiler (e.g., gcc, clang). It will be stored in the meta data file."
    )
    parser.add_argument(
        "--compiler_version", help="Version of the compiler (e.g., 6). It will be stored in the meta data file."
    )
    # parser.add_argument("--platform", help="Name of the platform (e.g., windows/linux/macos). It will be stored in
    # the meta data file.")
    parser.add_argument(
        "--os", help="Name of the operating system (e.g., ubuntu/debian). It will be stored in the meta data file."
    )
    parser.add_argument(
        "--os_version",
        help="Version of the operating system (e.g., 20.04). It will be stored in the meta data file.",
    )
    parser.add_argument("--pelf_path", help="Path of pelf")
    parser.add_argument("--sigmake_path", help="Path of sigmake")
    args = parser.parse_args()

    pelf_path = _resolve_tool_path(args.pelf_path, "pelf_path")
    sigmake_path = _resolve_tool_path(args.sigmake_path, "sigmake_path")

    # Optional meta data values are stored lower-cased
    compiler = args.compiler
    if compiler:
        compiler = compiler.lower()

    compiler_version = args.compiler_version
    if compiler_version:
        compiler_version = compiler_version.lower()

    os_name = args.os
    if os_name:
        os_name = os_name.lower()

    os_version = args.os_version
    if os_version:
        os_version = os_version.lower()

    # Get basic information
    basic_info = get_basic_info(args.ar_path)

    # Get unique strings from the library
    unique_strings = get_unique_strings(args.ar_path)

    # Build necessary file paths: the .exc and .meta files sit next to the signature file and share its name,
    # with the extension (if there is one) replaced
    sig_path_basename = os.path.basename(args.sig_path)
    if "." in sig_path_basename:
        sig_dir = os.path.dirname(args.sig_path)
        filename = sig_path_basename[: sig_path_basename.rfind(".")]
        exc_path = os.path.join(sig_dir, filename + ".exc")
        meta_path = os.path.join(sig_dir, filename + ".meta")
    else:
        exc_path = args.sig_path + ".exc"
        meta_path = args.sig_path + ".meta"

    if os.path.isfile(exc_path):
        # Remove existing exc files (if there is one)
        os.remove(exc_path)

    # Make a temporary directory
    with tempfile.TemporaryDirectory() as tmpdirname:
        ar_path = args.ar_path
        basename = os.path.basename(ar_path)

        # sanitize basename since otherwise sigmake is not happy with it
        if basename.endswith(".a"):
            basename = basename[:-2]
        basename = basename.replace("+", "plus")

        # sanitize signame as well
        sig_name = args.sig_name
        sig_name = sig_name.replace("+", "plus")

        pat_path = os.path.join(tmpdirname, basename + ".pat")
        run_pelf(pelf_path, ar_path, pat_path)

        has_collision = not run_sigmake(sigmake_path, sig_name, pat_path, args.sig_path)
        if has_collision:
            process_exc_file(exc_path)
            # run sigmake again, with the same sanitized name as the first run
            has_collision = not run_sigmake(sigmake_path, sig_name, pat_path, args.sig_path)
            if has_collision:
                # an explicit raise instead of an assert, which would be skipped under "python -O"
                raise RuntimeError("sigmake still reports collisions after the .exc file was processed.")

        with open(meta_path, "w") as f:
            metadata = {
                "unique_strings": unique_strings,
            }
            metadata.update(basic_info)
            if compiler_version:
                metadata["compiler_version"] = compiler_version
            if compiler:
                metadata["compiler"] = compiler
            if os_name:
                metadata["os"] = os_name
            if os_version:
                metadata["os_version"] = os_version
            f.write(json.dumps(metadata, indent=2))
# Run the command-line interface only when this file is executed as a script
if __name__ == "__main__": main()