import sys
import os
import os.path
import re
import itertools
import json
import glob
import logging
from libear import build_libear, TemporaryDirectory
from libscanbuild import command_entry_point, compiler_wrapper, \
wrapper_environment, run_command, run_build
from libscanbuild import duplicate_check
from libscanbuild.compilation import split_command
from libscanbuild.arguments import parse_args_for_intercept_build
from libscanbuild.shell import encode, decode
__all__ = ['capture', 'intercept_build', 'intercept_compiler_wrapper']
GS = chr(0x1d)
RS = chr(0x1e)
US = chr(0x1f)
COMPILER_WRAPPER_CC = 'intercept-cc'
COMPILER_WRAPPER_CXX = 'intercept-c++'
TRACE_FILE_EXTENSION = '.cmd' WRAPPER_ONLY_PLATFORMS = frozenset({'win32', 'cygwin'})
@command_entry_point
def intercept_build():
args = parse_args_for_intercept_build()
return capture(args)
def capture(args):
def post_processing(commands):
current = itertools.chain.from_iterable(
format_entry(command) for command in commands)
if 'append' in args and args.append and os.path.isfile(args.cdb):
with open(args.cdb) as handle:
previous = iter(json.load(handle))
else:
previous = iter([])
duplicate = duplicate_check(entry_hash)
return (entry
for entry in itertools.chain(previous, current)
if os.path.exists(entry['file']) and not duplicate(entry))
with TemporaryDirectory(prefix='intercept-') as tmp_dir:
environment = setup_environment(args, tmp_dir)
exit_code = run_build(args.build, env=environment)
exec_traces = itertools.chain.from_iterable(
parse_exec_trace(os.path.join(tmp_dir, filename))
for filename in sorted(glob.iglob(os.path.join(tmp_dir, '*.cmd'))))
entries = post_processing(exec_traces)
with open(args.cdb, 'w+') as handle:
json.dump(list(entries), handle, sort_keys=True, indent=4)
return exit_code
def setup_environment(args, destination):
c_compiler = args.cc if 'cc' in args else 'cc'
cxx_compiler = args.cxx if 'cxx' in args else 'c++'
libear_path = None if args.override_compiler or is_preload_disabled(
sys.platform) else build_libear(c_compiler, destination)
environment = dict(os.environ)
environment.update({'INTERCEPT_BUILD_TARGET_DIR': destination})
if not libear_path:
logging.debug('intercept gonna use compiler wrappers')
environment.update(wrapper_environment(args))
environment.update({
'CC': COMPILER_WRAPPER_CC,
'CXX': COMPILER_WRAPPER_CXX
})
elif sys.platform == 'darwin':
logging.debug('intercept gonna preload libear on OSX')
environment.update({
'DYLD_INSERT_LIBRARIES': libear_path,
'DYLD_FORCE_FLAT_NAMESPACE': '1'
})
else:
logging.debug('intercept gonna preload libear on UNIX')
environment.update({'LD_PRELOAD': libear_path})
return environment
@command_entry_point
def intercept_compiler_wrapper():
return compiler_wrapper(intercept_compiler_wrapper_impl)
def intercept_compiler_wrapper_impl(_, execution):
message_prefix = 'execution report might be incomplete: %s'
target_dir = os.getenv('INTERCEPT_BUILD_TARGET_DIR')
if not target_dir:
logging.warning(message_prefix, 'missing target directory')
return
try:
target_file_name = str(os.getpid()) + TRACE_FILE_EXTENSION
target_file = os.path.join(target_dir, target_file_name)
logging.debug('writing execution report to: %s', target_file)
write_exec_trace(target_file, execution)
except IOError:
logging.warning(message_prefix, 'io problem')
def write_exec_trace(filename, entry):
with open(filename, 'ab') as handler:
pid = str(entry.pid)
command = US.join(entry.cmd) + US
content = RS.join([pid, pid, 'wrapper', entry.cwd, command]) + GS
handler.write(content.encode('utf-8'))
def parse_exec_trace(filename):
logging.debug('parse exec trace file: %s', filename)
with open(filename, 'r') as handler:
content = handler.read()
for group in filter(bool, content.split(GS)):
records = group.split(RS)
yield {
'pid': records[0],
'ppid': records[1],
'function': records[2],
'directory': records[3],
'command': records[4].split(US)[:-1]
}
def format_entry(exec_trace):
def abspath(cwd, name):
fullname = name if os.path.isabs(name) else os.path.join(cwd, name)
return os.path.normpath(fullname)
logging.debug('format this command: %s', exec_trace['command'])
compilation = split_command(exec_trace['command'])
if compilation:
for source in compilation.files:
compiler = 'c++' if compilation.compiler == 'c++' else 'cc'
command = [compiler, '-c'] + compilation.flags + [source]
logging.debug('formated as: %s', command)
yield {
'directory': exec_trace['directory'],
'command': encode(command),
'file': abspath(exec_trace['directory'], source)
}
def is_preload_disabled(platform):
if platform in WRAPPER_ONLY_PLATFORMS:
return True
elif platform == 'darwin':
command = ['csrutil', 'status']
pattern = re.compile(r'System Integrity Protection status:\s+enabled')
try:
return any(pattern.match(line) for line in run_command(command))
except:
return False
else:
return False
def entry_hash(entry):
filename = entry['file'][::-1]
directory = entry['directory'][::-1]
command = ' '.join(decode(entry['command'])[1:])
return '<>'.join([filename, directory, command])