Source code for FingerPrint.syscalltracer

#!/usr/bin/env python
#
#
# LC
#
# This file is used to trace a process and to discover which
# libraries it depends on
#

import FingerPrint.blotter
import FingerPrint.utils
import FingerPrint.sergeant

import tempfile
import os, signal, re
import logging

# Module-level logger (named 'fingerprint') used by the classes in this file.
logger = logging.getLogger('fingerprint')



class SyscallTracer:
    """
    Spawn a process and trace its execution to record its dynamic
    dependency requirements.

    Usage:

    .. code:: python

        tracer = SyscallTracer()
        execcmd = shlex.split(execcmd)
        tracer.main(execcmd)
        # the output will be in the TracerControlBlock static variables
        TracerControlBlock.[files|dependencies|env|cmdline]
    """

    # os.waitpid() option used to wait for all cloned children (__WALL)
    _WALL = 0x40000000
    # stop signal that identifies a system call stop (SIGTRAP with bit 0x80)
    _SYSCALL_STOP = signal.SIGTRAP | 0x80

    def main(self, command):
        """
        Start the trace with the given command.

        The results are accumulated in the static variables of
        :class:`TracerControlBlock`.

        :type command: list
        :param command: command line to trace, already split with
                        shlex.split
        :rtype: bool
        :return: True when the wait loop runs out of processes to wait
                 for, False if something went wrong
        """
        from FingerPrint.ptrace import func as ptrace_func
        from FingerPrint.ptrace import signames

        self.program = command
        # one TracerControlBlock per traced pid; it remembers whether the
        # process is entering or returning from a system call
        processesStatus = {}
        options = (ptrace_func.PTRACE_O_TRACEFORK
                   | ptrace_func.PTRACE_O_TRACEVFORK
                   | ptrace_func.PTRACE_O_TRACECLONE
                   | ptrace_func.PTRACE_O_TRACEEXIT
                   | ptrace_func.PTRACE_O_TRACEEXEC
                   | ptrace_func.PTRACE_O_TRACESYSGOOD)

        child = os.fork()
        if child == 0:
            # traced process: this call never returns
            self._exec_traced_program(ptrace_func)

        # father or tracer process, we trace the execution here
        logger.debug("The fingerprint process %d going to trace %d" %
                     (os.getpid(), child))
        pid, status = os.waitpid(-1, 0)
        if pid != child or not os.WIFSTOPPED(status):
            # the child must be stopped before ptrace requests can be sent
            logger.error("The process tracer could not bootstrap.")
            return False
        ptrace_func.ptrace_setoptions(child, options)
        ptrace_func.ptrace_syscall(child)
        files = TracerControlBlock.files
        TracerControlBlock.set_trace_function()

        while True:
            # main loop tracer
            # 1. wait for syscall from the children
            # 2. analyze what happen, if mmap syscall scan /proc/PID/maps
            # 3. get ready to wait for the next syscall
            try:
                pid, status = os.waitpid(-1, self._WALL)
            except OSError:
                # NOTE(review): this is the normal termination path although
                # it is logged at error level; level left unchanged
                logger.error("Tracing terminated successfully")
                return True
            if not pid > 0:
                logger.error("Catastrofic failure")
                return False

            if os.WIFEXITED(status):
                # a process died, report it and go back to wait for syscall
                logger.debug("The process " + str(pid) + " exited")
                # the pid is unknown if it never reached a syscall stop
                processesStatus.pop(pid, None)
                continue
            if os.WIFSIGNALED(status):
                logger.debug("The process " + str(pid) +
                             " exited because of a signal")
                processesStatus.pop(pid, None)
                continue

            event = status >> 16
            deliverSignal = 0
            if os.WIFCONTINUED(status):
                logger.debug("The process " + str(pid) + " continued")
            elif os.WIFSTOPPED(status):
                signalValue = os.WSTOPSIG(status)
                if signalValue == self._SYSCALL_STOP:
                    self._handle_syscall_stop(pid, processesStatus, files,
                                              ptrace_func)
                elif signalValue == signal.SIGTRAP and event != 0:
                    self._handle_ptrace_event(pid, event, processesStatus,
                                              ptrace_func)
                else:
                    # when a signal is delivered to one of the child and we
                    # get notified we need to relay it properly to the child
                    # (in particular SIGCHLD must be rerouted to the parents
                    # if not mpirun will never end)
                    logger.debug("Signal %s(%d) delivered to %d " %
                                 (signames.signalName(signalValue),
                                  signalValue, pid))
                    deliverSignal = signalValue
            else:
                logger.debug("This should not happen!!")
            # wait for the next syscall notification
            ptrace_func.ptrace_syscall(pid, deliverSignal)

    def _exec_traced_program(self, ptrace_func):
        """Child side of the fork: ask to be traced and exec the program.

        This method never returns: if the exec fails the child exits
        immediately instead of running the code of the tracer.
        """
        try:
            ptrace_func.ptrace_traceme()
            os.execl(FingerPrint.utils.which(self.program[0]), *self.program)
        except Exception:
            logger.exception("Unable to execute the traced command %s",
                             self.program)
        finally:
            # only reached when the exec did not replace this process
            os._exit(127)

    def _handle_syscall_stop(self, pid, processesStatus, files, ptrace_func):
        """Process a system call stop of pid, recording open and mmap calls.

        orig_rax or orig_eax contains the syscall number, taken from linux
        src arch/x86/syscalls/syscall_[32|64].tbl
        """
        import ctypes
        from FingerPrint.ptrace import cpu_info

        regs = ptrace_func.ptrace_getregs(pid)
        tcb = processesStatus.get(pid)
        if tcb is None:
            # new pid
            tcb = TracerControlBlock(pid)
            processesStatus[pid] = tcb

        is64 = cpu_info.CPU_X86_64
        is32 = cpu_info.CPU_I386
        # the register names depend on the architecture, hence the
        # short-circuit evaluation below must be preserved
        if (is64 and regs.orig_rax == 2) or (is32 and regs.orig_eax == 5):
            # open (orig_rax == 2 on 64bit) or (orig_eax == 5 on 32bit)
            if tcb.enterCall:
                # entering open: the first argument is in rdi on 64bit
                # https://github.com/torvalds/linux/blob/master/arch/x86/kernel/entry_64.S#L585
                # and in ebx on 32bit
                # http://man7.org/linux/man-pages/man2/syscall.2.html
                tcb.firstArg = regs.rdi if is64 else regs.ebx
                tcb.enterCall = False
                return
            # we are exiting from a open
            tcb.enterCall = True
            raw_result = regs.rax if is64 else regs.eax
            # cast from c_ulong to c_long
            if ctypes.c_long(raw_result).value < 0:
                # the open failed, nothing to record
                return
            openPath = self.readCString(tcb.firstArg, pid)
            if not openPath:
                # the path could not be read back from the process memory
                return
            if not openPath.startswith('/'):
                # relative path we need to get the pwd
                openPath = "$" + tcb.getProcessCWD() + "$" + openPath
            libName = tcb.getFileOpener()
            processName = tcb.getProcessName()
            opened = files.setdefault(libName, {})
            opened.setdefault(processName, set()).add(openPath)
            # TODO use close to check for used files (easier to trace full
            # path)
        elif (is64 and regs.orig_rax == 9) or \
                (is32 and regs.orig_eax in (90, 192)):
            # mmap (orig_rax == 9 64bit or orig_eax == 90 or 192 on 32bit)
            if tcb.enterCall:
                tcb.enterCall = False
            else:
                # returning from mmap: the mappings may have changed
                tcb.enterCall = True
                tcb.updateSharedLibraries()

    def _handle_ptrace_event(self, pid, event, processesStatus, ptrace_func):
        """Log a ptrace event stop and refresh the process info on exec."""
        subChild = ptrace_func.ptrace_geteventmsg(pid)
        if event == ptrace_func.PTRACE_EVENT_FORK:
            logger.debug("The process %d forked a new process %d" %
                         (pid, subChild))
        elif event == ptrace_func.PTRACE_EVENT_VFORK:
            logger.debug("The process %d vforked a new process %d" %
                         (pid, subChild))
        elif event == ptrace_func.PTRACE_EVENT_CLONE:
            logger.debug("The process %d cloned a new process %d" %
                         (pid, subChild))
        elif event == ptrace_func.PTRACE_EVENT_EXEC:
            logger.debug("The process %d run exec" % (pid))
            tcb = processesStatus.get(pid)
            if tcb is None:
                # the constructor already reads the process information
                processesStatus[pid] = TracerControlBlock(pid)
            else:
                tcb.updateProcessInfo()
        # PTRACE_EVENT_EXIT and any other event need no action

    def readCString(self, address, pid):
        """
        Read a NUL terminated string from the memory of a process.

        :type address: int
        :param address: address of the first character of the string in
                        the address space of the process
        :type pid: int
        :param pid: PID of the process to read from
        :rtype: string
        :return: the characters found before the terminating NUL (or
                 before the end of the readable data)
        """
        data = []
        with open("/proc/" + str(pid) + "/mem", 'rb') as mem:
            mem.seek(address)
            while True:
                b = mem.read(1)
                # an empty read means no more data: stop instead of looping
                # forever
                if not b or b == b'\0':
                    break
                data.append(b)
        raw = b''.join(data)
        if isinstance(raw, str):
            # Python 2: bytes and str are the same type
            return raw
        return os.fsdecode(raw)

    def test(self):
        """Trace a small shell command and print the files it opened."""
        self.main(["bash", "-c",
                   "sleep 5 > /dev/null & find /tmp > /dev/null &"])
        print("dict:  %s" % (TracerControlBlock.files,))
class TracerControlBlock:
    """
    This class holds the data needed for tracing a process. Inspired by
    strace code (struct tcb).

    PS: I don't really like this solution of static variable but for the
    moment it does its job

    :type pid: int
    :param pid: the PID of the process that we are tracing
    """

    dependencies = {}
    """
    Dictionary of shared libraries used by the various processes. E.g.:
    ```{ 'binarypath': [list of file it depends to],
    '/bin/bash': ['/lib/x86_64-linux-gnu/libnss_files-2.15.so',
    '/lib/x86_64-linux-gnu/libnss_nis-2.15.so']}```
    """

    files = {}
    """
    Dictionary of dictionary of opened files by the various processes.
    E.g. files[libraryA][executableB] and files[libraryA][executableC]
    return respectively the set of files opened by the libraryA when run
    under executableB and when run under executableC.
    """

    env = {}
    """
    Dictionary that keeps track of process environment variables. Keys are
    the full path to the executable of the process and values are a list of
    strings containing all the variables
    """

    cmdline = {}
    """
    Dictionary that keeps track of the executed command line. Keys are the
    full path to the executable and values are a list of strings containing
    all the token of the command line
    """

    tracing = False
    """
    True once :meth:`set_trace_function` has loaded the stack tracer.
    Defaults to False so that :meth:`getFileOpener` also works when
    :meth:`set_trace_function` was never called.
    """

    trace = None
    """
    The stack tracing function, set by :meth:`set_trace_function`.
    """

    # names of the calls that are considered an "open" by _isOpen
    _OPEN_CALLS = frozenset(['fopen', '_IO_fopen', 'fopen64', 'open64',
                             'open', '__open', '__open64'])

    @classmethod
    def get_env_variable(cls, process_name, variable_name):
        """
        Look up variable_name in the recorded environment of process_name.

        :type process_name: string
        :param process_name: the full path to the executable representing
                             this process
        :type variable_name: string
        :param variable_name: the name of the variable
        :rtype: string
        :return: the whole ``NAME=value`` entry of the variable, or None if
                 the process or the variable is not known
        """
        prefix = variable_name + '='
        for entry in cls.env.get(process_name, ()):
            if entry.startswith(prefix):
                return entry
        return None

    def __init__(self, pid):
        self.pid = pid
        # True when the next syscall stop is the entry of a system call
        self.enterCall = True
        # first argument saved when a traced system call is entered
        self.firstArg = None
        self.updateProcessInfo()

    def _readProcTokens(self, name):
        """Return the NUL separated tokens of /proc/<pid>/<name>."""
        with open('/proc/' + str(self.pid) + '/' + name) as f:
            return f.read().split('\x00')

    def updateProcessInfo(self):
        """
        This method updates the process information into the global static
        variables :attr:`TracerControlBlock.cmdline`,
        :attr:`TracerControlBlock.env` of this class.

        It is called when this instance is created and can be called again
        when the process executes a new binary.
        """
        processName = self.getProcessName()
        # a new binary file starts with no known dependencies
        TracerControlBlock.dependencies.setdefault(processName, [])
        TracerControlBlock.cmdline[processName] = \
            self._readProcTokens('cmdline')
        TracerControlBlock.env[processName] = self._readProcTokens('environ')

    def updateSharedLibraries(self):
        """
        This method scans the procfs to find the shared libraries loaded by
        this process and it updates the static
        :attr:`TracerControlBlock.dependencies` variable accordingly.

        This function is called every time the process invoke the mmap
        system call.
        """
        binaryFile = self.getProcessName()
        with open('/proc/' + str(self.pid) + '/maps') as f:
            maps = f.read()
        # do not fail if this binary was not registered yet
        known = TracerControlBlock.dependencies.setdefault(binaryFile, [])
        for entry in maps.split('\n'):
            # the path is the sixth field; splitting at most five times
            # keeps paths that contain spaces in one piece
            tokens = entry.split(None, 5)
            if len(tokens) < 6 or 'x' not in tokens[1]:
                continue
            libPath = tokens[5].strip()
            # assumption: if we have a memory mapped area to a file and it
            # is executable then it is a shared library
            if not os.path.isfile(libPath):
                continue
            if libPath != binaryFile and libPath not in known:
                known.append(libPath)

    @classmethod
    def set_trace_function(cls):
        """
        This class method loads the function needed to set up the stack
        tracer, which requires the external shared library. Called only
        once.
        """
        try:
            from FingerPrint.stacktracer import trace
        except Exception:
            # no tracer compiled fall back to binary name
            logger.error(" - Stack tracing facility not loaded - ")
            cls.tracing = False
        else:
            # staticmethod prevents the function from being bound to the
            # instance when it is called as self.trace(...)
            cls.trace = staticmethod(trace)
            cls.tracing = True

    def getProcessName(self):
        """
        :rtype: string
        :return: the process name (this is used in all the static attribute
                 of this class as a key)
        """
        return os.readlink('/proc/' + str(self.pid) + '/exe')

    def getProcessCWD(self):
        """
        :rtype: string
        :return: return the current working directory of this process
        """
        return os.readlink('/proc/' + str(self.pid) + '/cwd')

    def getFileOpener(self):
        """
        If Fingerprint is compiled with the stack tracer module it will
        find the file object who contains the code which initiated this
        open system call; if not it will return the path to the current
        process. This function is called after each open system call.

        :rtype: string
        :return: the path of the library who triggered the current open
                 system call
        """
        if not self.tracing:
            return self.getProcessName()
        stack = self.trace(self.pid)
        prev_lib = ""
        current_lib = None
        for line in stack.split('\n'):
            splitline = line.split(':')
            if len(splitline) != 3:
                continue
            current_lib = splitline[0]
            if not prev_lib:
                # first line in the stack
                prev_lib = current_lib
                continue
            if current_lib == prev_lib:
                # we are still in the first library probably libc
                continue
            # that's it, we just got out of first lib in the stack, lets
            # see if we have a open or not
            if current_lib not in objectFiles:
                objectFiles[current_lib] = ObjectFile(current_lib)
            if self._isOpen(objectFiles[current_lib], splitline[1],
                            splitline[2]):
                return current_lib
            return prev_lib
        if current_lib is None:
            # the stack trace had no usable frame
            return self.getProcessName()
        # hmm probably we are in the loader
        return current_lib

    def _isOpen(self, objectFile, offset, ip):
        """return true if ip/offset are pointing to a open syscall for this
        object file
        """
        # offset and ip are in the form of 0x45a3f2
        if objectFile.isDynamic():
            vma = offset[2:]
        else:
            vma = ip[2:]
        instruction = objectFile.getPrevInstruction(vma)
        if len(instruction[2]) == 0:
            # we don't know how to decode this situation
            return False
        if '@' in instruction[2]:
            # it's point to the plt just remove the @plt
            callname = instruction[2].split('@')[0]
        else:
            # we have a function call but it's not pointing to the plt
            # table yet so let's try to follow the code flow to see where
            # it leads
            called_instruction = objectFile.getInstruction(instruction[1])
            # we need a jump, let's hope it goes to the plt
            if 'jmp' not in called_instruction[0] or \
                    '@' not in called_instruction[2]:
                return False
            callname = called_instruction[2].split('@')[0]
        # these are the system call that we trace for the moment
        return callname in self._OPEN_CALLS
# Module-level cache of ObjectFile instances keyed by the path of the object
# file; TracerControlBlock.getFileOpener adds an entry the first time it
# needs a given file.
objectFiles = {}
class ObjectFile:
    """
    This class wraps an elf object file and its assembler code, used by the
    stack tracing facility.

    This class depends on objdump to disassemble binaries. This class needs
    several optimizations (it uses a lot of memory and CPU time).

    :type filename: string
    :param filename: the path to the binary that will be disassembled
    """

    # a two character opcode byte followed by a space, as printed by objdump
    _OPCODE_BYTE = re.compile("[a-z0-9][a-z0-9] ")

    def __init__(self, filename):
        """
        Disassemble filename and keep the output in ``self.assembler``.

        :raises RuntimeError: if prelink or objdump fail
        """
        self.filename = filename
        if not FingerPrint.sergeant.prelink:
            self.assembler = self._disassemble(filename)
            return
        # we need to undo the prelink
        # we could use also the IP instead of the offset in the _isOpen
        # function instead of undoing the prelinking but then it would not
        # work with prelinked and it would fail with non prelinked libraries
        # (this way always works but is slower)
        (fd, undone) = tempfile.mkstemp()
        try:
            returncode = FingerPrint.utils.getOutputAsList(
                ["prelink", "-u", "-o", undone, self.filename])[1]
            if returncode != 0:
                raise RuntimeError("unable to undo the prelink on " +
                                   self.filename)
            self.assembler = self._disassemble(undone)
        finally:
            # always release the temporary file, also when a command failed
            os.close(fd)
            os.remove(undone)

    def _disassemble(self, path):
        """Run objdump on path and return its output as a list of lines."""
        (outputs, returncode) = FingerPrint.utils.getOutputAsList(
            ["objdump", "-x", "-D", path])
        if returncode != 0:
            raise RuntimeError("objdump failed for file " + self.filename)
        return outputs

    def _addressPattern(self, vma):
        """Return the compiled regex matching the line of address vma."""
        return re.compile(" *" + re.escape(vma) + ":")

    def isDynamic(self):
        """
        :rtype: bool
        :return: true if this is a dynamic object aka a shared library
        :raises RuntimeError: if the kind of object cannot be determined
        """
        # we just check the first lines no need to scan the whole thing
        for line in self.assembler[0:10]:
            if "EXEC_P" in line:
                return False
            if "DYNAMIC" in line:
                return True
        raise RuntimeError("Unable to determine VMA for file " +
                           self.filename)

    def getInstruction(self, vma):
        """
        It decodes the instruction at the given virtual memory address.

        :type vma: string
        :param vma: the virtual memory address in an hexadecimal format
        :rtype: tuple
        :return: a tuple of three strings where the first string is the
                 opcode at the given address vma, the second is the address
                 referred by the instruction and the third is the symbolic
                 name referred by the address
        :raises RuntimeError: if the address is not found
        """
        pattern = self._addressPattern(vma)
        for line in self.assembler:
            if pattern.match(line):
                return self._decodeLine(line)
        raise RuntimeError("Unable to determine VMA for file " +
                           self.filename)

    def getPrevInstruction(self, vma):
        """
        It decodes the line that comes just before the given virtual memory
        address in the disassembled code.

        :type vma: string
        :param vma: the virtual memory address in an hexadecimal format
        :rtype: tuple
        :return: a tuple of three strings where the first string is the
                 opcode of the previous line, the second is the address
                 referred by the instruction and the third is the symbolic
                 name referred by the address
        :raises RuntimeError: if the address is not found or nothing
                              comes before it
        """
        pattern = self._addressPattern(vma)
        prevLine = None
        for line in self.assembler:
            if pattern.match(line):
                if prevLine is None:
                    raise RuntimeError("No instruction precedes address " +
                                       vma + " in file " + self.filename)
                return self._decodeLine(prevLine)
            prevLine = line
        raise RuntimeError("Unable to determine VMA for file " +
                           self.filename)

    def _decodeLine(self, line):
        """given a line of the disassemble code it returns a tuple with
        (instruction, address, symbol); missing parts are empty strings
        """
        # take out the address part
        line = line[line.find(':') + 1:].lstrip()
        # remove all the hex opcodes
        while self._OPCODE_BYTE.match(line):
            line = line[3:]
        tokens = line.split()
        if not tokens:
            # label or blank line: there is nothing to decode
            return ("", "", "")
        istr = tokens[0]
        # second token, when present, is an address
        addr = tokens[1] if len(tokens) > 1 else ""
        # third token, when present, is a symbol wrapped in <>
        sym = tokens[2].rstrip('>').lstrip('<') if len(tokens) > 2 else ""
        return (istr, addr, sym)