import collections
import functools
import json
import logging
import os
import os.path
import re
import shlex
import subprocess
import sys
ENVIRONMENT_KEY = 'INTERCEPT_BUILD'
Execution = collections.namedtuple('Execution', ['pid', 'cwd', 'cmd'])
CtuConfig = collections.namedtuple('CtuConfig', ['collect', 'analyze', 'dir',
'extdef_map_cmd'])
def duplicate_check(method):
def predicate(entry):
entry_hash = predicate.unique(entry)
if entry_hash not in predicate.state:
predicate.state.add(entry_hash)
return False
return True
predicate.unique = method
predicate.state = set()
return predicate
def run_build(command, *args, **kwargs):
environment = kwargs.get('env', os.environ)
logging.debug('run build %s, in environment: %s', command, environment)
exit_code = subprocess.call(command, *args, **kwargs)
logging.debug('build finished with exit code: %d', exit_code)
return exit_code
def run_command(command, cwd=None):
def decode_when_needed(result):
return result.decode('utf-8') if isinstance(result, bytes) else result
try:
directory = os.path.abspath(cwd) if cwd else os.getcwd()
logging.debug('exec command %s in %s', command, directory)
output = subprocess.check_output(command,
cwd=directory,
stderr=subprocess.STDOUT)
return decode_when_needed(output).splitlines()
except subprocess.CalledProcessError as ex:
ex.output = decode_when_needed(ex.output).splitlines()
raise ex
def reconfigure_logging(verbose_level):
if verbose_level == 0:
return
root = logging.getLogger()
level = logging.WARNING - min(logging.WARNING, (10 * verbose_level))
root.setLevel(level)
if verbose_level <= 3:
fmt_string = '%(name)s: %(levelname)s: %(message)s'
else:
fmt_string = '%(name)s: %(levelname)s: %(funcName)s: %(message)s'
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(fmt=fmt_string))
root.handlers = [handler]
def command_entry_point(function):
@functools.wraps(function)
def wrapper(*args, **kwargs):
try:
logging.basicConfig(format='%(name)s: %(message)s',
level=logging.WARNING,
stream=sys.stdout)
logging.getLogger().name = os.path.basename(sys.argv[0])
return function(*args, **kwargs)
except KeyboardInterrupt:
logging.warning('Keyboard interrupt')
return 130 except Exception:
logging.exception('Internal error.')
if logging.getLogger().isEnabledFor(logging.DEBUG):
logging.error("Please report this bug and attach the output "
"to the bug report")
else:
logging.error("Please run this command again and turn on "
"verbose mode (add '-vvvv' as argument).")
return 64 finally:
logging.shutdown()
return wrapper
def compiler_wrapper(function):
def is_cxx_compiler():
wrapper_command = os.path.basename(sys.argv[0])
return re.match(r'(.+)c\+\+(.*)', wrapper_command)
def run_compiler(executable):
command = executable + sys.argv[1:]
logging.debug('compilation: %s', command)
result = subprocess.call(command)
logging.debug('compilation exit code: %d', result)
return result
parameters = json.loads(os.environ[ENVIRONMENT_KEY])
reconfigure_logging(parameters['verbose'])
cxx = is_cxx_compiler()
compiler = parameters['cxx'] if cxx else parameters['cc']
result = run_compiler(compiler)
try:
call = Execution(
pid=os.getpid(),
cwd=os.getcwd(),
cmd=['c++' if cxx else 'cc'] + sys.argv[1:])
function(result, call)
except:
logging.exception('Compiler wrapper failed complete.')
finally:
return result
def wrapper_environment(args):
return {
ENVIRONMENT_KEY: json.dumps({
'verbose': args.verbose,
'cc': shlex.split(args.cc),
'cxx': shlex.split(args.cxx)
})
}