from __future__ import absolute_import
import json
import math
import os
import shlex
import subprocess
import sys
import lit.Test
import lit.TestRunner
import lit.util
from .base import TestFormat
kIsWindows = sys.platform in ['win32', 'cygwin']
class GoogleTest(TestFormat):
def __init__(self, test_sub_dirs, test_suffix, run_under = []):
self.test_sub_dirs = str(test_sub_dirs).split(';')
exe_suffix = str(test_suffix)
if kIsWindows:
exe_suffix += '.exe'
self.test_suffixes = {exe_suffix, test_suffix + '.py'}
self.run_under = run_under
def get_num_tests(self, path, litConfig, localConfig):
list_test_cmd = self.prepareCmd(
[path, '--gtest_list_tests', '--gtest_filter=-*DISABLED_*'])
try:
out = subprocess.check_output(list_test_cmd,
env=localConfig.environment)
except subprocess.CalledProcessError as exc:
litConfig.warning(
"unable to discover google-tests in %r: %s. Process output: %s"
% (path, sys.exc_info()[1], exc.output))
return None
return sum(
map(lambda line: lit.util.to_string(line).startswith(' '),
out.splitlines(False)))
def getTestsInDirectory(self, testSuite, path_in_suite, litConfig,
localConfig):
init_shard_size = 512 core_count = lit.util.usable_core_count()
source_path = testSuite.getSourcePath(path_in_suite)
for subdir in self.test_sub_dirs:
dir_path = os.path.join(source_path, subdir)
if not os.path.isdir(dir_path):
continue
for fn in lit.util.listdir_files(dir_path,
suffixes=self.test_suffixes):
execpath = os.path.join(source_path, subdir, fn)
num_tests = self.get_num_tests(execpath, litConfig,
localConfig)
if num_tests is not None:
shard_size = init_shard_size
nshard = int(math.ceil(num_tests / shard_size))
while nshard < core_count and shard_size > 1:
shard_size = shard_size // 2
nshard = int(math.ceil(num_tests / shard_size))
for idx in range(nshard):
testPath = path_in_suite + (subdir, fn, str(idx),
str(nshard))
json_file = '-'.join([
execpath, testSuite.config.name,
str(os.getpid()),
str(idx),
str(nshard)
]) + '.json'
yield lit.Test.Test(testSuite,
testPath,
localConfig,
file_path=execpath,
gtest_json_file=json_file)
else:
testPath = path_in_suite + (
subdir, fn, 'failed_to_discover_tests_from_gtest')
yield lit.Test.Test(testSuite,
testPath,
localConfig,
file_path=execpath)
def execute(self, test, litConfig):
if test.gtest_json_file is None:
return lit.Test.FAIL, ''
testPath,testName = os.path.split(test.getSourcePath())
while not os.path.exists(testPath):
testPath, namePrefix = os.path.split(testPath)
testName = namePrefix + '/' + testName
testName,total_shards = os.path.split(testName)
testName,shard_idx = os.path.split(testName)
from lit.cl_arguments import TestOrder
use_shuffle = TestOrder(litConfig.order) == TestOrder.RANDOM
shard_env = {
'GTEST_OUTPUT': 'json:' + test.gtest_json_file,
'GTEST_SHUFFLE': '1' if use_shuffle else '0',
'GTEST_TOTAL_SHARDS': total_shards,
'GTEST_SHARD_INDEX': shard_idx
}
test.config.environment.update(shard_env)
cmd = [testPath]
cmd = self.prepareCmd(cmd)
if litConfig.useValgrind:
cmd = litConfig.valgrindArgs + cmd
if litConfig.noExecute:
return lit.Test.PASS, ''
def get_shard_header(shard_env):
shard_envs = ' '.join([k + '=' + v for k, v in shard_env.items()])
return f"Script(shard):\n--\n%s %s\n--\n" % (shard_envs, ' '.join(cmd))
shard_header = get_shard_header(shard_env)
try:
out, _, exitCode = lit.util.executeCommand(
cmd, env=test.config.environment,
timeout=litConfig.maxIndividualTestTime, redirect_stderr=True)
except lit.util.ExecuteCommandTimeoutException as e:
stream_msg = f"\n{e.out}\n--\nexit: {e.exitCode}\n--\n"
return (lit.Test.TIMEOUT, f'{shard_header}{stream_msg}Reached '
f'timeout of {litConfig.maxIndividualTestTime} seconds')
if not os.path.exists(test.gtest_json_file):
errmsg = f"shard JSON output does not exist: %s" % (
test.gtest_json_file)
stream_msg = f"\n{out}\n--\nexit: {exitCode}\n--\n"
return lit.Test.FAIL, shard_header + stream_msg + errmsg
if exitCode == 0:
return lit.Test.PASS, ''
def get_test_stdout(test_name):
res = []
header = f'[ RUN ] ' + test_name
footer = f'[ FAILED ] ' + test_name
in_range = False
for l in out.splitlines():
if l.startswith(header):
in_range = True
elif l.startswith(footer):
return f'' if len(res) == 0 else '\n'.join(res)
elif in_range:
res.append(l)
assert False, f'gtest did not report the result for ' + test_name
found_failed_test = False
with open(test.gtest_json_file, encoding='utf-8') as f:
jf = json.load(f)
if use_shuffle:
shard_env['GTEST_RANDOM_SEED'] = str(jf['random_seed'])
output = get_shard_header(shard_env) + '\n'
for testcase in jf['testsuites']:
for testinfo in testcase['testsuite']:
result = testinfo['result']
if result == 'SUPPRESSED' or result == 'SKIPPED':
continue
testname = testcase['name'] + '.' + testinfo['name']
header = f"Script:\n--\n%s --gtest_filter=%s\n--\n" % (
' '.join(cmd), testname)
if 'failures' in testinfo:
found_failed_test = True
output += header
test_out = get_test_stdout(testname)
if test_out:
output += test_out + '\n\n'
for fail in testinfo['failures']:
output += fail['failure'] + '\n'
output += '\n'
elif result != 'COMPLETED':
output += header
output += 'unresolved test result\n'
if not found_failed_test:
output += f"\n{out}\n--\nexit: {exitCode}\n--\n"
return lit.Test.FAIL, output
def prepareCmd(self, cmd):
if cmd[0].endswith('.py'):
cmd = [sys.executable] + cmd
if self.run_under:
if isinstance(self.run_under, list):
cmd = self.run_under + cmd
else:
cmd = shlex.split(self.run_under) + cmd
return cmd
@staticmethod
def post_process_shard_results(selected_tests, discovered_tests):
def remove_gtest(tests):
return [t for t in tests if t.gtest_json_file is None]
discovered_tests = remove_gtest(discovered_tests)
gtests = [t for t in selected_tests if t.gtest_json_file]
selected_tests = remove_gtest(selected_tests)
for test in gtests:
if not os.path.exists(test.gtest_json_file):
selected_tests.append(test)
discovered_tests.append(test)
continue
start_time = test.result.start or 0.0
has_failure_in_shard = False
with open(test.gtest_json_file, encoding='utf-8') as f:
testsuites = json.load(f)['testsuites']
for testcase in testsuites:
for testinfo in testcase['testsuite']:
if testinfo['result'] == 'SUPPRESSED':
continue
testPath = test.path_in_suite[:-2] + (testcase['name'],
testinfo['name'])
subtest = lit.Test.Test(test.suite, testPath,
test.config, test.file_path)
testname = testcase['name'] + '.' + testinfo['name']
header = f"Script:\n--\n%s --gtest_filter=%s\n--\n" % (
test.file_path, testname)
output = ''
if testinfo['result'] == 'SKIPPED':
returnCode = lit.Test.SKIPPED
elif 'failures' in testinfo:
has_failure_in_shard = True
returnCode = lit.Test.FAIL
output = header
for fail in testinfo['failures']:
output += fail['failure'] + '\n'
elif testinfo['result'] == 'COMPLETED':
returnCode = lit.Test.PASS
else:
returnCode = lit.Test.UNRESOLVED
output = header + 'unresolved test result\n'
elapsed_time = float(testinfo['time'][:-1])
res = lit.Test.Result(returnCode, output, elapsed_time)
res.pid = test.result.pid or 0
res.start = start_time
start_time = start_time + elapsed_time
subtest.setResult(res)
selected_tests.append(subtest)
discovered_tests.append(subtest)
os.remove(test.gtest_json_file)
if not has_failure_in_shard and test.isFailure():
selected_tests.append(test)
discovered_tests.append(test)
return selected_tests, discovered_tests