import re
import os
import os.path
import json
import logging
import multiprocessing
import tempfile
import functools
import subprocess
import contextlib
import datetime
import shutil
import glob
from collections import defaultdict
from libscanbuild import command_entry_point, compiler_wrapper, \
wrapper_environment, run_build, run_command, CtuConfig
from libscanbuild.arguments import parse_args_for_scan_build, \
parse_args_for_analyze_build
from libscanbuild.intercept import capture
from libscanbuild.report import document
from libscanbuild.compilation import split_command, classify_source, \
compiler_language
from libscanbuild.clang import get_version, get_arguments, get_triple_arch, \
ClangErrorException
from libscanbuild.shell import decode
# Names exported by `from <this module> import *`.
__all__ = ['scan_build', 'analyze_build', 'analyze_compiler_wrapper']
# Directory holding the script that was launched (sys.argv[0], with symbolic
# links resolved by os.path.realpath).
scanbuild_dir = os.path.dirname(os.path.realpath(__import__('sys').argv[0]))
# Compiler wrapper executables, located in '../libexec' relative to the
# launched script; setup_environment() exports them as CC and CXX.
COMPILER_WRAPPER_CC = os.path.join(scanbuild_dir, '..', 'libexec', 'analyze-cc')
COMPILER_WRAPPER_CXX = os.path.join(scanbuild_dir, '..', 'libexec', 'analyze-c++')
# Name of the merged external definition map written into each
# '<ctu-dir>/<triple-arch>' folder by merge_ctu_extdef_maps().
CTU_EXTDEF_MAP_FILENAME = 'externalDefMap.txt'
# Per-architecture folder collecting the partial maps produced during the
# CTU collect phase; removed again once the partial maps are merged.
CTU_TEMP_DEFMAP_FOLDER = 'tmpExternalDefMaps'
@command_entry_point
def scan_build():
    """Entry point of the 'scan-build' command.

    Runs the build (either intercepted first, or through the compiler
    wrappers), runs the analyzer, then generates the report.

    Returns the number of bugs when `status_bugs` was requested, otherwise
    the exit code of the build.
    """
    args = parse_args_for_scan_build()
    # The context manager yields the freshly created report directory, which
    # replaces the user supplied output location from here on.
    with report_directory(
            args.output, args.keep_empty, args.output_format) as args.output:
        if not args.intercept_first:
            # Build and analyze in one go through the compiler wrappers.
            build_status = run_build(args.build,
                                     env=setup_environment(args))
        else:
            # Record the build first, then analyze the captured commands.
            build_status = capture(args)
            if need_analyzer(args.build):
                govern_analyzer_runs(args)
        # Produce the report and count the bugs found.
        bug_count = document(args)
        if args.status_bugs:
            return bug_count
        return build_status
@command_entry_point
def analyze_build():
    """Entry point of the 'analyze-build' command.

    Runs the analyzer against an existing compilation database and generates
    the report.

    Returns the number of bugs when `status_bugs` was requested, otherwise 0.
    """
    args = parse_args_for_analyze_build()
    # The context manager yields the freshly created report directory, which
    # replaces the user supplied output location from here on.
    with report_directory(args.output, args.keep_empty,
                          args.output_format) as args.output:
        govern_analyzer_runs(args)
        bug_count = document(args)
        if args.status_bugs:
            return bug_count
        return 0
def need_analyzer(args):
    """Tell whether the analyzer should run for the given build command.

    The analyzer is skipped when the command is empty, or when its first
    element contains 'configure' or 'autogen'.

    :param args: the build command as a list of strings.
    :return: the (zero) length for an empty command, otherwise a bool.
    """
    count = len(args)
    if not count:
        return count
    return re.search(r'configure|autogen', args[0]) is None
def prefix_with(constant, pieces):
    """Return a list in which every element of `pieces` follows `constant`.

    Example: prefix_with(0, [1, 2, 3]) gives [0, 1, 0, 2, 0, 3].
    """
    prefixed = []
    for piece in pieces:
        prefixed.extend((constant, piece))
    return prefixed
def get_ctu_config_from_args(args):
    """Build the CTU configuration from the parsed command line arguments.

    The phases come from `args.ctu_phases`; the directory and the external
    definition map command come from `args.ctu_dir` and
    `args.extdef_map_cmd`. When `args` has no usable `ctu_phases`, a
    configuration with both phases disabled is returned.
    """
    has_phases = (hasattr(args, 'ctu_phases') and
                  hasattr(args.ctu_phases, 'dir'))
    if not has_phases:
        return CtuConfig(collect=False, analyze=False, dir='',
                         extdef_map_cmd='')

    phases = args.ctu_phases
    return CtuConfig(collect=phases.collect,
                     analyze=phases.analyze,
                     dir=args.ctu_dir,
                     extdef_map_cmd=args.extdef_map_cmd)
def get_ctu_config_from_json(ctu_conf_json):
    """Rebuild the CTU configuration from its JSON text form.

    The decoded value is read positionally: collect, analyze, dir,
    extdef_map_cmd.
    """
    fields = json.loads(ctu_conf_json)
    collect = fields[0]
    analyze = fields[1]
    ctu_dir = fields[2]
    extdef_map_cmd = fields[3]
    return CtuConfig(collect=collect, analyze=analyze, dir=ctu_dir,
                     extdef_map_cmd=extdef_map_cmd)
def create_global_ctu_extdef_map(extdef_map_lines):
    """Merge partial external definition maps into a single mapping.

    Every input line has the form '<mangled name> <ast file>', split on the
    first space. A mangled name seen with more than one distinct AST file is
    ambiguous and is left out of the result.

    :param extdef_map_lines: iterable of map lines.
    :return: list of (mangled name, ast file) tuples.
    """
    locations = defaultdict(set)
    for entry in extdef_map_lines:
        symbol, ast_path = entry.strip().split(' ', 1)
        locations[symbol].add(ast_path)

    # Keep only the names that resolve to exactly one AST file.
    return [(symbol, next(iter(paths)))
            for symbol, paths in locations.items()
            if len(paths) == 1]
def merge_ctu_extdef_maps(ctudir):
    """Merge the partial external definition maps of every architecture.

    For each sub-directory of `ctudir` (one per triple-arch) the partial map
    files in its temporary map folder are merged into one global map file,
    after which the temporary folder is removed.
    """

    def read_partial_maps(folder):
        """Yield the lines of every file in `folder`, in sorted file order."""
        for path in sorted(glob.glob(os.path.join(folder, '*'))):
            with open(path, 'r') as stream:
                for line in stream:
                    yield line

    def store_global_map(arch, pairs):
        """Write the merged (mangled name, ast file) pairs for `arch`."""
        target = os.path.join(ctudir, arch, CTU_EXTDEF_MAP_FILENAME)
        with open(target, 'w') as stream:
            stream.writelines('%s %s\n' % (name, ast) for name, ast in pairs)

    for candidate in glob.glob(os.path.join(ctudir, '*')):
        if not os.path.isdir(candidate):
            continue
        arch = os.path.basename(candidate)
        partial_dir = os.path.join(ctudir, arch, CTU_TEMP_DEFMAP_FOLDER)
        # The lines are consumed here, before the folder gets removed below.
        merged = create_global_ctu_extdef_map(read_partial_maps(partial_dir))
        store_global_map(arch, merged)
        shutil.rmtree(partial_dir, ignore_errors=True)
def run_analyzer_parallel(args):
    """Run the analyzer against every entry of the compilation database.

    The entries of `args.cdb` are extended with the analyzer settings and
    handed to `run` in a process pool. Entries whose source file is located
    under one of `args.excludes` are skipped. The analyzer output of every
    finished job is logged.
    """

    def exclude(filename, directory):
        """Return true when the source file is under an excluded path."""
        if not os.path.isabs(filename):
            filename = os.path.normpath(os.path.join(directory, filename))
        # Compare as a literal path prefix. Splicing the path into a regular
        # expression breaks on names holding regex metacharacters: 'c++' is
        # an invalid pattern and '.' would match any character.
        return any(filename.startswith(exclude_directory)
                   for exclude_directory in args.excludes)

    consts = {
        'clang': args.clang,
        'output_dir': args.output,
        'output_format': args.output_format,
        'output_failures': args.output_failures,
        'direct_args': analyzer_params(args),
        'force_debug': args.force_debug,
        'ctu': get_ctu_config_from_args(args)
    }

    logging.debug('run analyzer against compilation database')
    with open(args.cdb, 'r') as handle:
        generator = (dict(cmd, **consts)
                     for cmd in json.load(handle) if not exclude(
                         cmd['file'], cmd['directory']))
        # A single worker keeps the output readable on high verbosity.
        pool = multiprocessing.Pool(1 if args.verbose > 2 else None)
        try:
            for current in pool.imap_unordered(run, generator):
                if current is not None:
                    # display error message from the static analyzer
                    for line in current['error_output']:
                        logging.info(line.rstrip())
        except BaseException:
            # Do not leave worker processes behind when a job fails or the
            # run gets interrupted.
            pool.terminate()
            raise
        else:
            pool.close()
        finally:
            pool.join()
def govern_analyzer_runs(args):
    """Drive the analyzer run(s) according to the requested CTU phases.

    - collect and analyze together: run both phases back to back, with the
      CTU directory removed before and after.
    - otherwise: do a single run, and merge the external definition maps
      when that run was a collect phase.
    """
    ctu_config = get_ctu_config_from_args(args)
    ctu_dir = ctu_config.dir

    # A collect phase always starts from an empty CTU directory.
    if ctu_config.collect:
        shutil.rmtree(ctu_dir, ignore_errors=True)

    if not (ctu_config.collect and ctu_config.analyze):
        # Single run: plain analysis, collect only or analyze only.
        run_analyzer_parallel(args)
        if ctu_config.collect:
            merge_ctu_extdef_maps(ctu_dir)
        return

    # All-in-one run. get_ctu_config_from_args takes the directory and the
    # map command from args.ctu_dir and args.extdef_map_cmd, so they can be
    # left empty in the phase descriptions below.
    args.ctu_phases = CtuConfig(collect=True, analyze=False,
                                dir='', extdef_map_cmd='')
    run_analyzer_parallel(args)
    merge_ctu_extdef_maps(ctu_dir)

    args.ctu_phases = CtuConfig(collect=False, analyze=True,
                                dir='', extdef_map_cmd='')
    run_analyzer_parallel(args)
    shutil.rmtree(ctu_dir, ignore_errors=True)
def setup_environment(args):
    """Create the environment for a build that runs the compiler wrappers.

    Starts from a copy of the current process environment, adds the wrapper
    settings and then the analyzer settings read back by
    analyze_compiler_wrapper_impl.
    """

    def flag(value):
        """Encode a boolean as 'yes' or the empty string."""
        return 'yes' if value else ''

    environment = dict(os.environ)
    environment.update(wrapper_environment(args))

    analyzer_settings = {
        'CC': COMPILER_WRAPPER_CC,
        'CXX': COMPILER_WRAPPER_CXX,
        # An empty value makes the wrapper skip the analysis.
        'ANALYZE_BUILD_CLANG': args.clang if need_analyzer(args.build) else '',
        'ANALYZE_BUILD_REPORT_DIR': args.output,
        'ANALYZE_BUILD_REPORT_FORMAT': args.output_format,
        'ANALYZE_BUILD_REPORT_FAILURES': flag(args.output_failures),
        'ANALYZE_BUILD_PARAMETERS': ' '.join(analyzer_params(args)),
        'ANALYZE_BUILD_FORCE_DEBUG': flag(args.force_debug),
        'ANALYZE_BUILD_CTU': json.dumps(get_ctu_config_from_args(args))
    }
    environment.update(analyzer_settings)
    return environment
@command_entry_point
def analyze_compiler_wrapper():
    """Entry point of the 'analyze-cc' and 'analyze-c++' compiler wrappers."""
    implementation = analyze_compiler_wrapper_impl
    return compiler_wrapper(implementation)
def analyze_compiler_wrapper_impl(result, execution):
    """Run the analyzer for one wrapped compiler invocation.

    :param result: exit status of the real compiler; the analysis is
        skipped when it is non-zero.
    :param execution: the intercepted invocation; its `cmd` and `cwd`
        attributes are used.

    The analyzer settings are read from the ANALYZE_BUILD_* environment
    variables. Nothing is done when ANALYZE_BUILD_CLANG is empty or the
    command is not a compilation.
    """
    # don't run analyzer when compilation fails, or when it's not requested.
    if result or not os.getenv('ANALYZE_BUILD_CLANG'):
        return

    # check is it a compilation?
    compilation = split_command(execution.cmd)
    if compilation is None:
        return

    # An empty ANALYZE_BUILD_PARAMETERS must give no arguments at all:
    # ''.split(' ') is [''], which would become an empty clang argument.
    direct_args = [arg for arg in
                   os.getenv('ANALYZE_BUILD_PARAMETERS', '').split(' ')
                   if arg]
    # collect the needed parameters from environment
    parameters = {
        'clang': os.getenv('ANALYZE_BUILD_CLANG'),
        'output_dir': os.getenv('ANALYZE_BUILD_REPORT_DIR'),
        'output_format': os.getenv('ANALYZE_BUILD_REPORT_FORMAT'),
        'output_failures': os.getenv('ANALYZE_BUILD_REPORT_FAILURES'),
        'direct_args': direct_args,
        'force_debug': os.getenv('ANALYZE_BUILD_FORCE_DEBUG'),
        'directory': execution.cwd,
        'command': [execution.cmd[0], '-c'] + compilation.flags,
        'ctu': get_ctu_config_from_json(os.getenv('ANALYZE_BUILD_CTU'))
    }
    # call static analyzer against the compilation
    for source in compilation.files:
        # Every source file gets its own option dictionary. `run` and the
        # stages behind it consume what they are given: 'command' and
        # 'force_debug' are popped and 'direct_args' may be extended in
        # place. Reusing one dictionary made the second source file of a
        # command fail with a KeyError for the missing 'command'.
        current_opts = dict(parameters,
                            file=source,
                            direct_args=list(direct_args))
        logging.debug('analyzer parameters %s', current_opts)
        current = run(current_opts)
        # display error message from the static analyzer
        if current is not None:
            for line in current['error_output']:
                logging.info(line.rstrip())
@contextlib.contextmanager
def report_directory(hint, keep, output_format):
    """Create a uniquely named report directory and clean it up afterwards.

    :param hint: parent directory of the report directory; created when
        missing.
    :param keep: keep the report directory even when it stays empty.
    :param output_format: selects the hint message logged for the user.
    :return: context manager yielding the path of the report directory.

    On exit an empty directory is removed unless `keep` is set; a directory
    with content is always kept.
    """
    prefix = datetime.datetime.now().strftime(
        'scan-build-%Y-%m-%d-%H-%M-%S-%f-')
    root = os.path.abspath(hint)
    if not os.path.exists(root):
        os.makedirs(root)
    name = tempfile.mkdtemp(prefix=prefix, dir=root)

    logging.info('Report directory created: %s', name)

    try:
        yield name
    finally:
        args = (name,)
        if not os.listdir(name):
            if keep:
                msg = "Report directory '%s' contains no report, but kept."
            else:
                msg = "Removing directory '%s' because it contains no report."
        else:
            # A directory with reports is never removed.
            keep = True
            if output_format == 'sarif-html':
                msg = "Run 'scan-view %s' to examine bug reports or see " \
                    "merged sarif results at %s/results-merged.sarif."
                args = (name, name)
            elif output_format == 'sarif':
                msg = "View merged sarif results at %s/results-merged.sarif."
            else:
                msg = "Run 'scan-view %s' to examine bug reports."
        logging.warning(msg, *args)

        if not keep:
            os.rmdir(name)
def analyzer_params(args):
    """Translate the parsed arguments into analyzer command line options.

    Every resulting option is preceded by '-Xclang'.
    """
    options = []

    if args.store_model:
        options += ['-analyzer-store={0}'.format(args.store_model)]
    if args.constraints_model:
        options += ['-analyzer-constraints={0}'.format(
            args.constraints_model)]
    if args.internal_stats:
        options += ['-analyzer-stats']
    if args.analyze_headers:
        options += ['-analyzer-opt-analyze-headers']
    if args.stats:
        options += ['-analyzer-checker=debug.Stats']
    if args.maxloop:
        options += ['-analyzer-max-loop', str(args.maxloop)]
    if args.output_format:
        options += ['-analyzer-output={0}'.format(args.output_format)]
    if args.analyzer_config:
        options += ['-analyzer-config', args.analyzer_config]
    if args.verbose >= 4:
        options += ['-analyzer-display-progress']
    if args.plugins:
        options += prefix_with('-load', args.plugins)
    if args.enable_checker:
        options += ['-analyzer-checker', ','.join(args.enable_checker)]
    if args.disable_checker:
        options += ['-analyzer-disable-checker',
                    ','.join(args.disable_checker)]

    return prefix_with('-Xclang', options)
def require(required):
    """Decorator factory checking the keys of the first positional argument.

    The decorated function raises KeyError, naming the missing key and the
    function, when any key of `required` is absent from its first argument.
    """

    def decorate(function):
        @functools.wraps(function)
        def guarded(*positional, **keywords):
            for name in required:
                if name in positional[0]:
                    continue
                raise KeyError('{0} not passed to {1}'.format(
                    name, function.__name__))

            return function(*positional, **keywords)

        return guarded

    return decorate
@require(['command',  # entry from compilation database
          'directory',  # entry from compilation database
          'file',  # entry from compilation database
          'clang',  # clang executable name (and path)
          'direct_args',  # arguments from command line
          'force_debug',  # kill non debug macros
          'output_dir',  # where generated report files shall go
          'output_format',  # it's 'plist', 'html', 'sarif', ...
          'output_failures',  # generate crash reports or not
          'ctu'])  # ctu control options
def run(opts):
    """Entry point to run (or not) the analyzer against one compilation.

    The 'command' entry is removed from `opts` and replaced by the values
    classify_parameters extracts from it; the result is passed on to
    arch_check. Any exception raised on the way is logged and turned into a
    `None` result.
    """
    try:
        raw_command = opts.pop('command')
        if isinstance(raw_command, list):
            command = raw_command
        else:
            command = decode(raw_command)
        logging.debug("Run analyzer against '%s'", command)
        opts.update(classify_parameters(command))

        return arch_check(opts)
    except Exception:
        logging.error("Problem occurred during analysis.", exc_info=1)
        return None
@require(['clang', 'directory', 'flags', 'file', 'output_dir', 'language',
          'error_output', 'exit_code'])
def report_failure(opts):
    """Create a report for an analyzer run that failed.

    The preprocessed source file is written into the 'failures' folder of
    the output directory, next to an '.info.txt' file (source file, failure
    kind, the command used, system and clang version information) and a
    '.stderr.txt' file holding the analyzer output.
    """

    def extension():
        """Generate preprocessor file extension."""
        mapping = {'objective-c++': '.mii', 'objective-c': '.mi', 'c++': '.ii'}
        return mapping.get(opts['language'], '.i')

    def destination():
        """Create the failures directory when missing and return its path."""
        failures_dir = os.path.join(opts['output_dir'], 'failures')
        if not os.path.isdir(failures_dir):
            try:
                os.makedirs(failures_dir)
            except OSError:
                # Failures are reported from parallel workers, so another
                # one may have created the directory in the meantime. Only
                # a still missing directory is an error.
                if not os.path.isdir(failures_dir):
                    raise
        return failures_dir

    # Classify error type: when Clang terminated by a signal it's a 'Crash'.
    # (python subprocess Popen.returncode is negative when child terminated
    # by signal.) Everything else is 'Other Error'.
    error = 'crash' if opts['exit_code'] < 0 else 'other_error'
    # Create preprocessor output file name. (This is blindly following the
    # Perl implementation.)
    (descriptor, name) = tempfile.mkstemp(suffix=extension(),
                                          prefix='clang_' + error + '_',
                                          dir=destination())
    os.close(descriptor)
    # Execute Clang again, but run the syntax check only.
    cwd = opts['directory']
    cmd = [opts['clang'], '-fsyntax-only', '-E'] + opts['flags'] + \
        [opts['file'], '-o', name]
    try:
        cmd = get_arguments(cmd, cwd)
        run_command(cmd, cwd=cwd)
    except subprocess.CalledProcessError:
        # Best effort: the info files below are written even when the
        # preprocessing fails too.
        pass
    except ClangErrorException:
        pass
    # write general information about the crash
    with open(name + '.info.txt', 'w') as info_file:
        info_file.write(opts['file'] + os.linesep)
        info_file.write(error.title().replace('_', ' ') + os.linesep)
        info_file.write(' '.join(cmd) + os.linesep)
        info_file.write(' '.join(os.uname()) + os.linesep)
        info_file.write(get_version(opts['clang']))
    # write the captured output too
    with open(name + '.stderr.txt', 'w') as stderr_file:
        stderr_file.writelines(opts['error_output'])
@require(['clang', 'directory', 'flags', 'direct_args', 'file', 'output_dir',
          'output_format'])
def run_analyzer(opts, continuation=report_failure):
    """Run the static analyzer against one source file.

    :return: dictionary with the 'error_output' and the 'exit_code' of the
        run. When the run fails and 'output_failures' is set in `opts`, the
        result is merged into `opts` and `continuation` is called with it.
    """
    plist_formats = {'plist', 'plist-html', 'plist-multi-file'}
    sarif_formats = {'sarif', 'sarif-html'}

    def target():
        """Return the output location for the analyzer.

        A fresh file for the plist and sarif formats, the output directory
        for everything else.
        """
        output_format = opts['output_format']
        if output_format in plist_formats:
            prefix, suffix = 'report-', '.plist'
        elif output_format in sarif_formats:
            prefix, suffix = 'result-', '.sarif'
        else:
            return opts['output_dir']
        (handle, name) = tempfile.mkstemp(prefix=prefix,
                                          suffix=suffix,
                                          dir=opts['output_dir'])
        os.close(handle)
        return name

    def failed(result):
        """Report the failure when requested and hand back the result."""
        if opts.get('output_failures', False):
            opts.update(result)
            continuation(opts)
        return result

    try:
        cwd = opts['directory']
        arguments = [opts['clang'], '--analyze']
        arguments = arguments + opts['direct_args'] + opts['flags']
        arguments = arguments + [opts['file'], '-o', target()]
        cmd = get_arguments(arguments, cwd)
        output = run_command(cmd, cwd=cwd)
        return {'error_output': output, 'exit_code': 0}
    except subprocess.CalledProcessError as ex:
        return failed({'error_output': ex.output,
                       'exit_code': ex.returncode})
    except ClangErrorException as ex:
        return failed({'error_output': ex.error, 'exit_code': 0})
def extdef_map_list_src_to_ast(extdef_src_list):
    """Turn '<mangled name> <source file>' lines into AST file mappings.

    The drive part and one leading path separator are removed from the
    source path, then it is placed under 'ast' with an '.ast' suffix.

    :param extdef_src_list: iterable of '<mangled name> <source file>'.
    :return: list of '<mangled name> <ast file>' strings.
    """
    converted = []
    for entry in extdef_src_list:
        symbol, source_path = entry.split(" ", 1)
        # Normalize the path on Windows as well.
        _, relative = os.path.splitdrive(source_path)
        # Make the path relative to the AST folder.
        if relative[0] == os.sep:
            relative = relative[1:]
        ast_file = os.path.join("ast", relative + ".ast")
        converted.append("{0} {1}".format(symbol, ast_file))
    return converted
@require(['clang', 'directory', 'flags', 'direct_args', 'file', 'ctu'])
def ctu_collect_phase(opts):
    """Collect phase of CTU: prepare the data the analyze phase depends on.

    Generates the AST dump of the source file and the list of its external
    definitions, both stored below '<ctu-dir>/<triple-arch>'.
    """

    def ensure_directory(path):
        """Create `path` when missing, tolerating a concurrent creator."""
        if not os.path.isdir(path):
            try:
                os.makedirs(path)
            except OSError:
                # In case another process already created it.
                pass

    def generate_ast(triple_arch):
        """Generate the AST dump of the source file with clang."""
        source_path = os.path.realpath(opts['file'])[1:]
        ast_path = os.path.abspath(
            os.path.join(opts['ctu'].dir, triple_arch, 'ast',
                         source_path + '.ast'))
        ensure_directory(os.path.dirname(ast_path))

        ast_command = [opts['clang'], '-emit-ast'] + \
            opts['direct_args'] + opts['flags'] + \
            ['-w', opts['file'], '-o', ast_path]
        logging.debug("Generating AST using '%s'", ast_command)
        run_command(ast_command, cwd=opts['directory'])

    def map_extdefs(triple_arch):
        """Generate the external definition map of the source file."""
        extdefmap_command = [opts['ctu'].extdef_map_cmd, opts['file'], '--'] \
            + opts['direct_args'] + opts['flags']
        logging.debug("Generating external definition map using '%s'",
                      extdefmap_command)
        source_entries = run_command(extdefmap_command,
                                     cwd=opts['directory'])
        ast_entries = extdef_map_list_src_to_ast(source_entries)

        map_folder = os.path.join(opts['ctu'].dir, triple_arch,
                                  CTU_TEMP_DEFMAP_FOLDER)
        ensure_directory(map_folder)
        if not ast_entries:
            return
        # One uniquely named partial map per source file; the partial maps
        # get merged later by merge_ctu_extdef_maps.
        with tempfile.NamedTemporaryFile(mode='w',
                                         dir=map_folder,
                                         delete=False) as out_file:
            out_file.write("\n".join(ast_entries) + "\n")

    analyze_command = [opts['clang'], '--analyze'] + opts['direct_args'] \
        + opts['flags'] + [opts['file']]
    triple_arch = get_triple_arch(analyze_command, opts['directory'])
    generate_ast(triple_arch)
    map_extdefs(triple_arch)
@require(['ctu'])
def dispatch_ctu(opts, continuation=run_analyzer):
    """Execute at most one of the two CTU phases for this source file.

    - no phase requested: `continuation` is called unchanged.
    - collect phase: ctu_collect_phase is run and its result returned.
    - analyze phase: the CTU analyzer options are appended to
      `opts['direct_args']` before `continuation` is called.
    """
    config = opts['ctu']

    if not (config.collect or config.analyze):
        return continuation(opts)

    # Exactly one phase is expected at this level.
    assert config.collect != config.analyze
    if config.collect:
        return ctu_collect_phase(opts)

    # Only the analyze phase is left at this point.
    probe_command = [opts['clang'], '--analyze'] + opts['direct_args'] \
        + opts['flags'] + [opts['file']]
    triarch = get_triple_arch(probe_command, opts['directory'])
    ctu_options = ['ctu-dir=' + os.path.join(config.dir, triarch),
                   'experimental-enable-naive-ctu-analysis=true']
    opts['direct_args'].extend(
        prefix_with('-Xanalyzer',
                    prefix_with('-analyzer-config', ctu_options)))

    return continuation(opts)
@require(['flags', 'force_debug'])
def filter_debug_flags(opts, continuation=dispatch_ctu):
    """Append '-UNDEBUG' to the flags when debug mode is forced.

    The 'force_debug' entry is removed from `opts` either way.
    """
    force_debug = opts.pop('force_debug')
    if force_debug:
        # Undefine NDEBUG so that it is not defined during the analysis.
        opts['flags'] = opts['flags'] + ['-UNDEBUG']

    return continuation(opts)
@require(['language', 'compiler', 'file', 'flags'])
def language_check(opts, continuation=filter_debug_flags):
    """Find out the language of the source file and filter on it.

    The language comes from the explicit '-x' value when there was one,
    otherwise it is derived from the source file name. The analysis is
    skipped (None is returned) for unknown or unsupported languages; for
    accepted ones '-x <language>' is put in front of the flags.
    """
    supported = frozenset({
        'c', 'c++', 'objective-c', 'objective-c++', 'c-cpp-output',
        'c++-cpp-output', 'objective-c-cpp-output'
    })

    # language can be given as a parameter...
    language = opts.pop('language')
    compiler = opts.pop('compiler')
    # ... or find out from the source file extension
    if language is None and compiler is not None:
        language = classify_source(opts['file'], compiler == 'c')

    if language is None:
        logging.debug('skip analysis, language not known')
        return None
    if language not in supported:
        logging.debug('skip analysis, language not supported')
        return None

    logging.debug('analysis, language: %s', language)
    opts['language'] = language
    opts['flags'] = ['-x', language] + opts['flags']
    return continuation(opts)
@require(['arch_list', 'flags'])
def arch_check(opts, continuation=language_check):
    """Select the architecture to run the analyzer on.

    Without any '-arch' value the default architecture is used. Otherwise
    the last requested architecture that is not disabled gets selected; the
    analysis is skipped (None is returned) when none is left.
    """
    disabled = frozenset({'ppc', 'ppc64'})

    requested = opts.pop('arch_list')
    if not requested:
        logging.debug('analysis, on default arch')
        return continuation(opts)

    # Filter out the disabled architectures, the analysis runs only once.
    usable = [arch for arch in requested if arch not in disabled]
    if not usable:
        logging.debug('skip analysis, found not supported arch')
        return None

    selected = usable[-1]
    logging.debug('analysis, on arch: %s', selected)
    opts['flags'] = ['-arch', selected] + opts['flags']
    return continuation(opts)
# Compiler flags which classify_parameters() drops from the command before
# it is passed on to the analyzer. The value is the number of arguments
# following the flag which are dropped together with it.
IGNORED_FLAGS = {
'-c': 0, '-fsyntax-only': 0, '-o': 1, '-g': 0,
'-save-temps': 0,
# NOTE(review): the entries from '-install_name' to '-sectorder' look like
# linker options - confirm.
'-install_name': 1,
'-exported_symbols_list': 1,
'-current_version': 1,
'-compatibility_version': 1,
'-init': 1,
'-e': 1,
'-seg1addr': 1,
'-bundle_loader': 1,
'-multiply_defined': 1,
'-sectorder': 3,
'--param': 1,
'--serialize-diagnostics': 1
}
def classify_parameters(command):
    """Extract the analyzer relevant parts of a compilation command.

    :param command: the compilation command as a list; the first element
        (the compiler) is only used to detect the compiler language.
    :return: dictionary with the kept 'flags', the requested 'arch_list',
        the explicit 'language' (or None) and the 'compiler' language.
    """
    result = {
        'flags': [],  # the filtered compiler flags
        'arch_list': [],  # list of architecture flags
        'language': None,  # compilation language, None, if not specified
        'compiler': compiler_language(command)  # 'c' or 'c++'
    }

    # iterate on the compile options
    remaining = iter(command[1:])
    for current in remaining:
        # take arch flags into a separate basket
        if current == '-arch':
            result['arch_list'].append(next(remaining))
            continue
        # take language
        if current == '-x':
            result['language'] = next(remaining)
            continue
        # parameters which look like a source file are not flags
        if re.match(r'^[^-].+', current) and classify_source(current):
            continue
        # ignore some flags, together with their arguments
        if current in IGNORED_FLAGS:
            for _ in range(IGNORED_FLAGS[current]):
                next(remaining)
            continue
        # the warning enabling flags are dropped, '-Wno-*' ones are kept
        if re.match(r'^-W.+', current) and \
                not re.match(r'^-Wno-.+', current):
            continue
        # and consider everything else as compilation flag.
        result['flags'].append(current)

    return result