from __future__ import absolute_import
import errno
import io
import itertools
import getopt
import os, signal, subprocess, sys
import re
import stat
import pathlib
import platform
import shutil
import tempfile
import threading
import io
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
from lit.ShCommands import GlobItem, Command
import lit.ShUtil as ShUtil
import lit.Test as Test
import lit.util
from lit.util import to_bytes, to_string, to_unicode
from lit.BooleanExpression import BooleanExpression
class InternalShellError(Exception):
def __init__(self, command, message):
self.command = command
self.message = message
kIsWindows = platform.system() == 'Windows'
kUseCloseFDs = not kIsWindows
kAvoidDevNull = kIsWindows
kDevNull = "/dev/null"
kPdbgRegex = '%dbg\\(([^)\'"]*)\\)(.*)'
class ShellEnvironment(object):
def __init__(self, cwd, env):
self.cwd = cwd
self.env = dict(env)
self.dirStack = []
def change_dir(self, newdir):
if os.path.isabs(newdir):
self.cwd = newdir
else:
self.cwd = os.path.realpath(os.path.join(self.cwd, newdir))
class TimeoutHelper(object):
def __init__(self, timeout):
self.timeout = timeout
self._procs = []
self._timeoutReached = False
self._doneKillPass = False
self._lock = None
self._timer = None
def cancel(self):
if not self.active():
return
self._timer.cancel()
def active(self):
return self.timeout > 0
def addProcess(self, proc):
if not self.active():
return
needToRunKill = False
with self._lock:
self._procs.append(proc)
needToRunKill = self._doneKillPass
if needToRunKill:
assert self.timeoutReached()
self._kill()
def startTimer(self):
if not self.active():
return
self._lock = threading.Lock()
self._timer = threading.Timer(self.timeout, self._handleTimeoutReached)
self._timer.start()
def _handleTimeoutReached(self):
self._timeoutReached = True
self._kill()
def timeoutReached(self):
return self._timeoutReached
def _kill(self):
with self._lock:
for p in self._procs:
lit.util.killProcessAndChildren(p.pid)
self._procs = [] self._doneKillPass = True
class ShellCommandResult(object):
def __init__(self, command, stdout, stderr, exitCode, timeoutReached,
outputFiles = []):
self.command = command
self.stdout = stdout
self.stderr = stderr
self.exitCode = exitCode
self.timeoutReached = timeoutReached
self.outputFiles = list(outputFiles)
def executeShCmd(cmd, shenv, results, timeout=0):
timeoutHelper = TimeoutHelper(timeout)
if timeout > 0:
timeoutHelper.startTimer()
finalExitCode = _executeShCmd(cmd, shenv, results, timeoutHelper)
timeoutHelper.cancel()
timeoutInfo = None
if timeoutHelper.timeoutReached():
timeoutInfo = 'Reached timeout of {} seconds'.format(timeout)
return (finalExitCode, timeoutInfo)
def expand_glob(arg, cwd):
if isinstance(arg, GlobItem):
return sorted(arg.resolve(cwd))
return [arg]
def expand_glob_expressions(args, cwd):
result = [args[0]]
for arg in args[1:]:
result.extend(expand_glob(arg, cwd))
return result
def quote_windows_command(seq):
result = []
needquote = False
for arg in seq:
bs_buf = []
if result:
result.append(' ')
needquote = (" " in arg) or ("\t" in arg) or ("\"" in arg) or ("[" in arg) or (";" in arg) or not arg
if needquote:
result.append('"')
for c in arg:
if c == '\\':
bs_buf.append(c)
elif c == '"':
result.append('\\' * len(bs_buf)*2)
bs_buf = []
result.append('\\"')
else:
if bs_buf:
result.extend(bs_buf)
bs_buf = []
result.append(c)
if bs_buf:
result.extend(bs_buf)
if needquote:
result.extend(bs_buf)
result.append('"')
return ''.join(result)
def updateEnv(env, args):
arg_idx_next = len(args)
unset_next_env_var = False
for arg_idx, arg in enumerate(args[1:]):
if arg == '-u':
unset_next_env_var = True
continue
if unset_next_env_var:
unset_next_env_var = False
if arg in env.env:
del env.env[arg]
continue
key, eq, val = arg.partition('=')
if eq == '':
arg_idx_next = arg_idx + 1
break
env.env[key] = val
return args[arg_idx_next:]
def executeBuiltinCd(cmd, shenv):
if len(cmd.args) != 2:
raise InternalShellError(cmd, "'cd' supports only one argument")
shenv.change_dir(cmd.args[1])
return ShellCommandResult(cmd, "", "", 0, False)
def executeBuiltinPushd(cmd, shenv):
if len(cmd.args) != 2:
raise InternalShellError(cmd, "'pushd' supports only one argument")
shenv.dirStack.append(shenv.cwd)
shenv.change_dir(cmd.args[1])
return ShellCommandResult(cmd, "", "", 0, False)
def executeBuiltinPopd(cmd, shenv):
if len(cmd.args) != 1:
raise InternalShellError(cmd, "'popd' does not support arguments")
if not shenv.dirStack:
raise InternalShellError(cmd, "popd: directory stack empty")
shenv.cwd = shenv.dirStack.pop()
return ShellCommandResult(cmd, "", "", 0, False)
def executeBuiltinExport(cmd, shenv):
if len(cmd.args) != 2:
raise InternalShellError("'export' supports only one argument")
updateEnv(shenv, cmd.args)
return ShellCommandResult(cmd, "", "", 0, False)
def executeBuiltinEcho(cmd, shenv):
opened_files = []
stdin, stdout, stderr = processRedirects(cmd, subprocess.PIPE, shenv,
opened_files)
if stdin != subprocess.PIPE or stderr != subprocess.PIPE:
raise InternalShellError(
cmd, "stdin and stderr redirects not supported for echo")
is_redirected = True
encode = lambda x : x
if stdout == subprocess.PIPE:
is_redirected = False
stdout = StringIO()
elif kIsWindows:
encode = lit.util.to_bytes
stdout = open(stdout.name, stdout.mode + 'b')
opened_files.append((None, None, stdout, None))
args = cmd.args[1:]
interpret_escapes = False
write_newline = True
while len(args) >= 1 and args[0] in ('-e', '-n'):
flag = args[0]
args = args[1:]
if flag == '-e':
interpret_escapes = True
elif flag == '-n':
write_newline = False
def maybeUnescape(arg):
if not interpret_escapes:
return arg
arg = lit.util.to_bytes(arg)
codec = 'string_escape' if sys.version_info < (3,0) else 'unicode_escape'
return arg.decode(codec)
if args:
for arg in args[:-1]:
stdout.write(encode(maybeUnescape(arg)))
stdout.write(encode(' '))
stdout.write(encode(maybeUnescape(args[-1])))
if write_newline:
stdout.write(encode('\n'))
for (name, mode, f, path) in opened_files:
f.close()
output = "" if is_redirected else stdout.getvalue()
return ShellCommandResult(cmd, output, "", 0, False)
def executeBuiltinMkdir(cmd, cmd_shenv):
args = expand_glob_expressions(cmd.args, cmd_shenv.cwd)[1:]
try:
opts, args = getopt.gnu_getopt(args, 'p')
except getopt.GetoptError as err:
raise InternalShellError(cmd, "Unsupported: 'mkdir': %s" % str(err))
parent = False
for o, a in opts:
if o == "-p":
parent = True
else:
assert False, "unhandled option"
if len(args) == 0:
raise InternalShellError(cmd, "Error: 'mkdir' is missing an operand")
stderr = StringIO()
exitCode = 0
for dir in args:
cwd = cmd_shenv.cwd
dir = to_unicode(dir) if kIsWindows else to_bytes(dir)
cwd = to_unicode(cwd) if kIsWindows else to_bytes(cwd)
if not os.path.isabs(dir):
dir = os.path.realpath(os.path.join(cwd, dir))
if parent:
lit.util.mkdir_p(dir)
else:
try:
lit.util.mkdir(dir)
except OSError as err:
stderr.write("Error: 'mkdir' command failed, %s\n" % str(err))
exitCode = 1
return ShellCommandResult(cmd, "", stderr.getvalue(), exitCode, False)
def executeBuiltinRm(cmd, cmd_shenv):
args = expand_glob_expressions(cmd.args, cmd_shenv.cwd)[1:]
try:
opts, args = getopt.gnu_getopt(args, "frR", ["--recursive"])
except getopt.GetoptError as err:
raise InternalShellError(cmd, "Unsupported: 'rm': %s" % str(err))
force = False
recursive = False
for o, a in opts:
if o == "-f":
force = True
elif o in ("-r", "-R", "--recursive"):
recursive = True
else:
assert False, "unhandled option"
if len(args) == 0:
raise InternalShellError(cmd, "Error: 'rm' is missing an operand")
def on_rm_error(func, path, exc_info):
os.chmod(path, stat.S_IMODE( os.stat(path).st_mode) | stat.S_IWRITE)
os.remove(path)
stderr = StringIO()
exitCode = 0
for path in args:
cwd = cmd_shenv.cwd
path = to_unicode(path) if kIsWindows else to_bytes(path)
cwd = to_unicode(cwd) if kIsWindows else to_bytes(cwd)
if not os.path.isabs(path):
path = os.path.realpath(os.path.join(cwd, path))
if force and not os.path.exists(path):
continue
try:
if os.path.isdir(path):
if not recursive:
stderr.write("Error: %s is a directory\n" % path)
exitCode = 1
if platform.system() == 'Windows':
from ctypes.wintypes import BOOL, HWND, LPCWSTR, UINT, WORD
from ctypes import addressof, byref, c_void_p, create_unicode_buffer
from ctypes import Structure
from ctypes import windll, WinError, POINTER
class SHFILEOPSTRUCTW(Structure):
_fields_ = [
('hWnd', HWND),
('wFunc', UINT),
('pFrom', LPCWSTR),
('pTo', LPCWSTR),
('fFlags', WORD),
('fAnyOperationsAborted', BOOL),
('hNameMappings', c_void_p),
('lpszProgressTitle', LPCWSTR),
]
FO_MOVE, FO_COPY, FO_DELETE, FO_RENAME = range(1, 5)
FOF_SILENT = 4
FOF_NOCONFIRMATION = 16
FOF_NOCONFIRMMKDIR = 512
FOF_NOERRORUI = 1024
FOF_NO_UI = FOF_SILENT | FOF_NOCONFIRMATION | FOF_NOERRORUI | FOF_NOCONFIRMMKDIR
SHFileOperationW = windll.shell32.SHFileOperationW
SHFileOperationW.argtypes = [POINTER(SHFILEOPSTRUCTW)]
path = os.path.abspath(path)
pFrom = create_unicode_buffer(path, len(path) + 2)
pFrom[len(path)] = pFrom[len(path) + 1] = '\0'
operation = SHFILEOPSTRUCTW(wFunc=UINT(FO_DELETE),
pFrom=LPCWSTR(addressof(pFrom)),
fFlags=FOF_NO_UI)
result = SHFileOperationW(byref(operation))
if result:
raise WinError(result)
else:
shutil.rmtree(path, onerror = on_rm_error if force else None)
else:
if force and not os.access(path, os.W_OK):
os.chmod(path,
stat.S_IMODE(os.stat(path).st_mode) | stat.S_IWRITE)
os.remove(path)
except OSError as err:
stderr.write("Error: 'rm' command failed, %s" % str(err))
exitCode = 1
return ShellCommandResult(cmd, "", stderr.getvalue(), exitCode, False)
def executeBuiltinColon(cmd, cmd_shenv):
return ShellCommandResult(cmd, "", "", 0, False)
def processRedirects(cmd, stdin_source, cmd_shenv, opened_files):
redirects = [(0,), (1,), (2,)]
for (op, filename) in cmd.redirects:
if op == ('>',2):
redirects[2] = [filename, 'w', None]
elif op == ('>>',2):
redirects[2] = [filename, 'a', None]
elif op == ('>&',2) and filename in '012':
redirects[2] = redirects[int(filename)]
elif op == ('>&',) or op == ('&>',):
redirects[1] = redirects[2] = [filename, 'w', None]
elif op == ('>',):
redirects[1] = [filename, 'w', None]
elif op == ('>>',):
redirects[1] = [filename, 'a', None]
elif op == ('<',):
redirects[0] = [filename, 'r', None]
else:
raise InternalShellError(cmd, "Unsupported redirect: %r" % ((op, filename),))
std_fds = [None, None, None]
for (index, r) in enumerate(redirects):
if isinstance(r, tuple):
if r == (0,):
fd = stdin_source
elif r == (1,):
if index == 0:
raise InternalShellError(cmd, "Unsupported redirect for stdin")
elif index == 1:
fd = subprocess.PIPE
else:
fd = subprocess.STDOUT
elif r == (2,):
if index != 2:
raise InternalShellError(cmd, "Unsupported redirect on stdout")
fd = subprocess.PIPE
else:
raise InternalShellError(cmd, "Bad redirect")
std_fds[index] = fd
continue
(filename, mode, fd) = r
if fd is not None:
std_fds[index] = fd
continue
redir_filename = None
name = expand_glob(filename, cmd_shenv.cwd)
if len(name) != 1:
raise InternalShellError(cmd, "Unsupported: glob in "
"redirect expanded to multiple files")
name = name[0]
if kAvoidDevNull and name == kDevNull:
fd = tempfile.TemporaryFile(mode=mode)
elif kIsWindows and name == '/dev/tty':
fd = open("CON", mode)
else:
redir_filename = os.path.join(cmd_shenv.cwd, name)
redir_filename = to_unicode(redir_filename) \
if kIsWindows else to_bytes(redir_filename)
fd = open(redir_filename, mode)
if mode == 'a':
fd.seek(0, 2)
r[2] = fd
opened_files.append((filename, mode, fd) + (redir_filename,))
std_fds[index] = fd
return std_fds
def _executeShCmd(cmd, shenv, results, timeoutHelper):
if timeoutHelper.timeoutReached():
return None
if isinstance(cmd, ShUtil.Seq):
if cmd.op == ';':
res = _executeShCmd(cmd.lhs, shenv, results, timeoutHelper)
return _executeShCmd(cmd.rhs, shenv, results, timeoutHelper)
if cmd.op == '&':
raise InternalShellError(cmd,"unsupported shell operator: '&'")
if cmd.op == '||':
res = _executeShCmd(cmd.lhs, shenv, results, timeoutHelper)
if res != 0:
res = _executeShCmd(cmd.rhs, shenv, results, timeoutHelper)
return res
if cmd.op == '&&':
res = _executeShCmd(cmd.lhs, shenv, results, timeoutHelper)
if res is None:
return res
if res == 0:
res = _executeShCmd(cmd.rhs, shenv, results, timeoutHelper)
return res
raise ValueError('Unknown shell command: %r' % cmd.op)
assert isinstance(cmd, ShUtil.Pipeline)
procs = []
proc_not_counts = []
default_stdin = subprocess.PIPE
stderrTempFiles = []
opened_files = []
named_temp_files = []
builtin_commands = set(['cat', 'diff'])
builtin_commands_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "builtin_commands")
inproc_builtins = {'cd': executeBuiltinCd,
'export': executeBuiltinExport,
'echo': executeBuiltinEcho,
'mkdir': executeBuiltinMkdir,
'popd': executeBuiltinPopd,
'pushd': executeBuiltinPushd,
'rm': executeBuiltinRm,
':': executeBuiltinColon}
for i,j in enumerate(cmd.commands):
cmd_shenv = shenv
args = list(j.args)
not_args = []
not_count = 0
not_crash = False
while True:
if args[0] == 'env':
if cmd_shenv is shenv:
cmd_shenv = ShellEnvironment(shenv.cwd, shenv.env)
args = updateEnv(cmd_shenv, args)
if not args:
raise InternalShellError(j, "Error: 'env' requires a"
" subcommand")
elif args[0] == 'not':
not_args.append(args.pop(0))
not_count += 1
if args and args[0] == '--crash':
not_args.append(args.pop(0))
not_crash = True
if not args:
raise InternalShellError(j, "Error: 'not' requires a"
" subcommand")
elif args[0] == '!':
not_args.append(args.pop(0))
not_count += 1
if not args:
raise InternalShellError(j, "Error: '!' requires a"
" subcommand")
else:
break
inproc_builtin = inproc_builtins.get(args[0], None)
if inproc_builtin and (args[0] != 'echo' or len(cmd.commands) == 1):
if not cmd_shenv is shenv:
raise InternalShellError(j, "Error: 'env' cannot call '{}'"
.format(args[0]))
if not_crash:
raise InternalShellError(j, "Error: 'not --crash' cannot call"
" '{}'".format(args[0]))
if len(cmd.commands) != 1:
raise InternalShellError(j, "Unsupported: '{}' cannot be part"
" of a pipeline".format(args[0]))
result = inproc_builtin(Command(args, j.redirects), cmd_shenv)
if not_count % 2:
result.exitCode = int(not result.exitCode)
result.command.args = j.args;
results.append(result)
return result.exitCode
if args[0] in builtin_commands:
args.insert(0, sys.executable)
cmd_shenv.env['PYTHONPATH'] = \
os.path.dirname(os.path.abspath(__file__))
args[1] = os.path.join(builtin_commands_dir, args[1] + ".py")
if not_crash:
args = not_args + args
not_count = 0
else:
not_args = []
stdin, stdout, stderr = processRedirects(j, default_stdin, cmd_shenv,
opened_files)
if (stderr == subprocess.STDOUT and stdout != subprocess.PIPE):
stderr = subprocess.PIPE
stderrIsStdout = True
else:
stderrIsStdout = False
if stderr == subprocess.PIPE and j != cmd.commands[-1]:
stderr = tempfile.TemporaryFile(mode='w+b')
stderrTempFiles.append((i, stderr))
executable = None
if args[0].startswith('.'):
exe_in_cwd = os.path.join(cmd_shenv.cwd, args[0])
if os.path.isfile(exe_in_cwd):
executable = exe_in_cwd
if not executable:
executable = lit.util.which(args[0], cmd_shenv.env['PATH'])
if not executable:
raise InternalShellError(j, '%r: command not found' % args[0])
if kAvoidDevNull:
try:
str_type = basestring
except NameError:
str_type = str
for i,arg in enumerate(args):
if isinstance(arg, str_type) and kDevNull in arg:
f = tempfile.NamedTemporaryFile(delete=False)
f.close()
named_temp_files.append(f.name)
args[i] = arg.replace(kDevNull, f.name)
args = expand_glob_expressions(args, cmd_shenv.cwd)
if kIsWindows:
args = quote_windows_command(args)
try:
procs.append(subprocess.Popen(args, cwd=cmd_shenv.cwd,
executable = executable,
stdin = stdin,
stdout = stdout,
stderr = stderr,
env = cmd_shenv.env,
close_fds = kUseCloseFDs,
universal_newlines = True,
errors = 'replace'))
proc_not_counts.append(not_count)
timeoutHelper.addProcess(procs[-1])
except OSError as e:
raise InternalShellError(j, 'Could not create process ({}) due to {}'.format(executable, e))
if stdin == subprocess.PIPE:
procs[-1].stdin.close()
procs[-1].stdin = None
if stdout == subprocess.PIPE:
default_stdin = procs[-1].stdout
elif stderrIsStdout:
default_stdin = procs[-1].stderr
else:
default_stdin = subprocess.PIPE
for (name, mode, f, path) in opened_files:
f.close()
procData = [None] * len(procs)
procData[-1] = procs[-1].communicate()
for i in range(len(procs) - 1):
if procs[i].stdout is not None:
out = procs[i].stdout.read()
else:
out = ''
if procs[i].stderr is not None:
err = procs[i].stderr.read()
else:
err = ''
procData[i] = (out,err)
for i,f in stderrTempFiles:
f.seek(0, 0)
procData[i] = (procData[i][0], f.read())
f.close()
exitCode = None
for i,(out,err) in enumerate(procData):
res = procs[i].wait()
if res == -signal.SIGINT:
raise KeyboardInterrupt
if proc_not_counts[i] % 2:
res = not res
elif proc_not_counts[i] > 1:
res = 1 if res != 0 else 0
try:
if out is None:
out = ''
else:
out = to_string(out.decode('utf-8', errors='replace'))
except:
out = str(out)
try:
if err is None:
err = ''
else:
err = to_string(err.decode('utf-8', errors='replace'))
except:
err = str(err)
output_files = []
if res != 0:
for (name, mode, f, path) in sorted(opened_files):
if path is not None and mode in ('w', 'a'):
try:
with open(path, 'rb') as f:
data = f.read()
except:
data = None
if data is not None:
output_files.append((name, path, data))
results.append(ShellCommandResult(
cmd.commands[i], out, err, res, timeoutHelper.timeoutReached(),
output_files))
if cmd.pipe_err:
if not exitCode or res != 0:
exitCode = res
else:
exitCode = res
for f in named_temp_files:
try:
os.remove(f)
except OSError:
pass
if cmd.negate:
exitCode = not exitCode
return exitCode
def executeScriptInternal(test, litConfig, tmpBase, commands, cwd):
cmds = []
for i, ln in enumerate(commands):
match = re.match(kPdbgRegex, ln)
if match:
command = match.group(2)
ln = commands[i] = \
match.expand(": '\\1'; \\2" if command else ": '\\1'")
try:
cmds.append(ShUtil.ShParser(ln, litConfig.isWindows,
test.config.pipefail).parse())
except:
return lit.Test.Result(Test.FAIL, "shell parser error on: %r" % ln)
cmd = cmds[0]
for c in cmds[1:]:
cmd = ShUtil.Seq(cmd, '&&', c)
results = []
timeoutInfo = None
try:
shenv = ShellEnvironment(cwd, test.config.environment)
exitCode, timeoutInfo = executeShCmd(cmd, shenv, results, timeout=litConfig.maxIndividualTestTime)
except InternalShellError:
e = sys.exc_info()[1]
exitCode = 127
results.append(
ShellCommandResult(e.command, '', e.message, exitCode, False))
out = err = ''
for i,result in enumerate(results):
out += '$ %s\n' % (' '.join('"%s"' % s
for s in result.command.args),)
if litConfig.maxIndividualTestTime == 0 and \
result.exitCode == 0 and \
not result.stdout.strip() and not result.stderr.strip():
continue
for (name, path, data) in result.outputFiles:
if data.strip():
out += "# redirected output from %r:\n" % (name,)
data = to_string(data.decode('utf-8', errors='replace'))
if len(data) > 1024:
out += data[:1024] + "\n...\n"
out += "note: data was truncated\n"
else:
out += data
out += "\n"
if result.stdout.strip():
out += '# command output:\n%s\n' % (result.stdout,)
if result.stderr.strip():
out += '# command stderr:\n%s\n' % (result.stderr,)
if not result.stdout.strip() and not result.stderr.strip():
out += "note: command had no output on stdout or stderr\n"
if result.exitCode != 0:
if litConfig.isWindows and result.exitCode < 0:
codeStr = hex(int(result.exitCode & 0xFFFFFFFF)).rstrip("L")
else:
codeStr = str(result.exitCode)
out += "error: command failed with exit status: %s\n" % (
codeStr,)
if litConfig.maxIndividualTestTime > 0 and result.timeoutReached:
out += 'error: command reached timeout: %s\n' % (
str(result.timeoutReached),)
return out, err, exitCode, timeoutInfo
def executeScript(test, litConfig, tmpBase, commands, cwd):
bashPath = litConfig.getBashPath()
isWin32CMDEXE = (litConfig.isWindows and not bashPath)
script = tmpBase + '.script'
if isWin32CMDEXE:
script += '.bat'
mode = 'w'
open_kwargs = {}
if litConfig.isWindows and not isWin32CMDEXE:
mode += 'b' elif sys.version_info > (3,0):
open_kwargs['encoding'] = 'utf-8'
f = open(script, mode, **open_kwargs)
if isWin32CMDEXE:
for i, ln in enumerate(commands):
match = re.match(kPdbgRegex, ln)
if match:
command = match.group(2)
commands[i] = \
match.expand("echo '\\1' > nul && " if command
else "echo '\\1' > nul")
if litConfig.echo_all_commands:
f.write('@echo on\n')
else:
f.write('@echo off\n')
f.write('\n@if %ERRORLEVEL% NEQ 0 EXIT\n'.join(commands))
else:
for i, ln in enumerate(commands):
match = re.match(kPdbgRegex, ln)
if match:
command = match.group(2)
commands[i] = match.expand(": '\\1'; \\2" if command
else ": '\\1'")
if test.config.pipefail:
f.write(b'set -o pipefail;' if mode == 'wb' else 'set -o pipefail;')
if litConfig.echo_all_commands:
f.write(b'set -x;' if mode == 'wb' else 'set -x;')
if sys.version_info > (3,0) and mode == 'wb':
f.write(bytes('{ ' + '; } &&\n{ '.join(commands) + '; }', 'utf-8'))
else:
f.write('{ ' + '; } &&\n{ '.join(commands) + '; }')
f.write(b'\n' if mode == 'wb' else '\n')
f.close()
if isWin32CMDEXE:
command = ['cmd','/c', script]
else:
if bashPath:
command = [bashPath, script]
else:
command = ['/bin/sh', script]
if litConfig.useValgrind:
command = litConfig.valgrindArgs + command
try:
out, err, exitCode = lit.util.executeCommand(command, cwd=cwd,
env=test.config.environment,
timeout=litConfig.maxIndividualTestTime)
return (out, err, exitCode, None)
except lit.util.ExecuteCommandTimeoutException as e:
return (e.out, e.err, e.exitCode, e.msg)
def parseIntegratedTestScriptCommands(source_path, keywords):
keywords_re = re.compile(
to_bytes("(%s)(.*)\n" % ("|".join(re.escape(k) for k in keywords),)))
f = open(source_path, 'rb')
try:
data = f.read()
if not data.endswith(to_bytes('\n')):
data = data + to_bytes('\n')
line_number = 1
last_match_position = 0
for match in keywords_re.finditer(data):
match_position = match.start()
line_number += data.count(to_bytes('\n'), last_match_position,
match_position)
last_match_position = match_position
keyword,ln = match.groups()
yield (line_number, to_string(keyword.decode('utf-8')),
to_string(ln.decode('utf-8').rstrip('\r')))
finally:
f.close()
def getTempPaths(test):
execpath = test.getExecPath()
execdir,execbase = os.path.split(execpath)
tmpDir = os.path.join(execdir, 'Output')
tmpBase = os.path.join(tmpDir, execbase)
return tmpDir, tmpBase
def colonNormalizePath(path):
if kIsWindows:
return re.sub(r'^(.):', r'\1', path.replace('\\', '/'))
else:
assert path[0] == '/'
return path[1:]
def getDefaultSubstitutions(test, tmpDir, tmpBase, normalize_slashes=False):
sourcepath = test.getSourcePath()
sourcedir = os.path.dirname(sourcepath)
if normalize_slashes:
sourcepath = sourcepath.replace('\\', '/')
sourcedir = sourcedir.replace('\\', '/')
tmpDir = tmpDir.replace('\\', '/')
tmpBase = tmpBase.replace('\\', '/')
substitutions = []
substitutions.extend(test.config.substitutions)
tmpName = tmpBase + '.tmp'
baseName = os.path.basename(tmpBase)
substitutions.extend([('%s', sourcepath),
('%S', sourcedir),
('%p', sourcedir),
('%{pathsep}', os.pathsep),
('%t', tmpName),
('%basename_t', baseName),
('%T', tmpDir)])
substitutions.extend([
('%{fs-src-root}', pathlib.Path(sourcedir).anchor),
('%{fs-tmp-root}', pathlib.Path(tmpBase).anchor),
('%{fs-sep}', os.path.sep),
])
substitutions.extend([
('%/s', sourcepath.replace('\\', '/')),
('%/S', sourcedir.replace('\\', '/')),
('%/p', sourcedir.replace('\\', '/')),
('%/t', tmpBase.replace('\\', '/') + '.tmp'),
('%/T', tmpDir.replace('\\', '/')),
])
def regex_escape(s):
s = s.replace('@', r'\@')
s = s.replace('&', r'\&')
return s
substitutions.extend([
('%{/s:regex_replacement}',
regex_escape(sourcepath.replace('\\', '/'))),
('%{/S:regex_replacement}',
regex_escape(sourcedir.replace('\\', '/'))),
('%{/p:regex_replacement}',
regex_escape(sourcedir.replace('\\', '/'))),
('%{/t:regex_replacement}',
regex_escape(tmpBase.replace('\\', '/')) + '.tmp'),
('%{/T:regex_replacement}',
regex_escape(tmpDir.replace('\\', '/'))),
])
substitutions.extend([
('%:s', colonNormalizePath(sourcepath)),
('%:S', colonNormalizePath(sourcedir)),
('%:p', colonNormalizePath(sourcedir)),
('%:t', colonNormalizePath(tmpBase + '.tmp')),
('%:T', colonNormalizePath(tmpDir)),
])
return substitutions
def _memoize(f):
cache = {} def memoized(x):
if x not in cache:
cache[x] = f(x)
return cache[x]
return memoized
@_memoize
def _caching_re_compile(r):
return re.compile(r)
def applySubstitutions(script, substitutions, conditions={},
recursion_limit=None):
def escapePercents(ln):
return _caching_re_compile('%%').sub('#_MARKER_#', ln)
def unescapePercents(ln):
return _caching_re_compile('#_MARKER_#').sub('%', ln)
def substituteIfElse(ln):
if ln.find('%if ') == -1:
return ln
def tryParseIfCond(ln):
if not ln.startswith('%if '):
return None, ln
ln = ln[4:]
match = _caching_re_compile('%{').search(ln)
if not match:
raise ValueError("'%{' is missing for %if substitution")
cond = ln[:match.start()]
ln = ln[match.end():]
return cond, ln
def tryParseElse(ln):
match = _caching_re_compile('^\s*%else\s*(%{)?').search(ln)
if not match:
return False, ln
if not match.group(1):
raise ValueError("'%{' is missing for %else substitution")
return True, ln[match.end():]
def tryParseEnd(ln):
if ln.startswith('%}'):
return True, ln[2:]
return False, ln
def parseText(ln, isNested):
match = _caching_re_compile(
'(.*?)(?:%if|%})' if isNested else '(.*?)(?:%if)').search(ln)
if not match:
return ln, ''
text_end = match.end(1)
return ln[:text_end], ln[text_end:]
def parseRecursive(ln, isNested):
result = ''
while len(ln):
if isNested:
found_end, _ = tryParseEnd(ln)
if found_end:
break
cond, ln = tryParseIfCond(ln)
if cond:
branch_if, ln = parseRecursive(ln, isNested=True)
found_end, ln = tryParseEnd(ln)
if not found_end:
raise ValueError("'%}' is missing for %if substitution")
branch_else = ''
found_else, ln = tryParseElse(ln)
if found_else:
branch_else, ln = parseRecursive(ln, isNested=True)
found_end, ln = tryParseEnd(ln)
if not found_end:
raise ValueError("'%}' is missing for %else substitution")
if BooleanExpression.evaluate(cond, conditions):
result += branch_if
else:
result += branch_else
continue
text, ln = parseText(ln, isNested)
result += text
return result, ln
result, ln = parseRecursive(ln, isNested=False)
assert len(ln) == 0
return result
def processLine(ln):
ln = substituteIfElse(escapePercents(ln))
for a,b in substitutions:
if kIsWindows:
b = b.replace("\\","\\\\")
ln = _caching_re_compile(a).sub(str(b), escapePercents(ln))
return ln.strip()
def processLineToFixedPoint(ln):
assert isinstance(recursion_limit, int) and recursion_limit >= 0
origLine = ln
steps = 0
processed = processLine(ln)
while processed != ln and steps < recursion_limit:
ln = processed
processed = processLine(ln)
steps += 1
if processed != ln:
raise ValueError("Recursive substitution of '%s' did not complete "
"in the provided recursion limit (%s)" % \
(origLine, recursion_limit))
return processed
process = processLine if recursion_limit is None else processLineToFixedPoint
return [unescapePercents(process(ln)) for ln in script]
class ParserKind(object):
TAG = 0
COMMAND = 1
LIST = 2
BOOLEAN_EXPR = 3
INTEGER = 4
CUSTOM = 5
@staticmethod
def allowedKeywordSuffixes(value):
return { ParserKind.TAG: ['.'],
ParserKind.COMMAND: [':'],
ParserKind.LIST: [':'],
ParserKind.BOOLEAN_EXPR: [':'],
ParserKind.INTEGER: [':'],
ParserKind.CUSTOM: [':', '.']
} [value]
@staticmethod
def str(value):
return { ParserKind.TAG: 'TAG',
ParserKind.COMMAND: 'COMMAND',
ParserKind.LIST: 'LIST',
ParserKind.BOOLEAN_EXPR: 'BOOLEAN_EXPR',
ParserKind.INTEGER: 'INTEGER',
ParserKind.CUSTOM: 'CUSTOM'
} [value]
class IntegratedTestKeywordParser(object):
def __init__(self, keyword, kind, parser=None, initial_value=None):
allowedSuffixes = ParserKind.allowedKeywordSuffixes(kind)
if len(keyword) == 0 or keyword[-1] not in allowedSuffixes:
if len(allowedSuffixes) == 1:
raise ValueError("Keyword '%s' of kind '%s' must end in '%s'"
% (keyword, ParserKind.str(kind),
allowedSuffixes[0]))
else:
raise ValueError("Keyword '%s' of kind '%s' must end in "
" one of '%s'"
% (keyword, ParserKind.str(kind),
' '.join(allowedSuffixes)))
if parser is not None and kind != ParserKind.CUSTOM:
raise ValueError("custom parsers can only be specified with "
"ParserKind.CUSTOM")
self.keyword = keyword
self.kind = kind
self.parsed_lines = []
self.value = initial_value
self.parser = parser
if kind == ParserKind.COMMAND:
self.parser = lambda line_number, line, output: \
self._handleCommand(line_number, line, output,
self.keyword)
elif kind == ParserKind.LIST:
self.parser = self._handleList
elif kind == ParserKind.BOOLEAN_EXPR:
self.parser = self._handleBooleanExpr
elif kind == ParserKind.INTEGER:
self.parser = self._handleSingleInteger
elif kind == ParserKind.TAG:
self.parser = self._handleTag
elif kind == ParserKind.CUSTOM:
if parser is None:
raise ValueError("ParserKind.CUSTOM requires a custom parser")
self.parser = parser
else:
raise ValueError("Unknown kind '%s'" % kind)
def parseLine(self, line_number, line):
try:
self.parsed_lines += [(line_number, line)]
self.value = self.parser(line_number, line, self.value)
except ValueError as e:
raise ValueError(str(e) + ("\nin %s directive on test line %d" %
(self.keyword, line_number)))
def getValue(self):
return self.value
@staticmethod
def _handleTag(line_number, line, output):
return (not line.strip() or output)
@staticmethod
def _handleCommand(line_number, line, output, keyword):
line = line.rstrip()
line = re.sub(r'%\(line\)', str(line_number), line)
def replace_line_number(match):
if match.group(1) == '+':
return str(line_number + int(match.group(2)))
if match.group(1) == '-':
return str(line_number - int(match.group(2)))
line = re.sub(r'%\(line *([\+-]) *(\d+)\)', replace_line_number, line)
if output and output[-1][-1] == '\\':
output[-1] = output[-1][:-1] + line
else:
if output is None:
output = []
pdbg = "%dbg({keyword} at line {line_number})".format(
keyword=keyword,
line_number=line_number)
assert re.match(kPdbgRegex + "$", pdbg), \
"kPdbgRegex expected to match actual %dbg usage"
line = "{pdbg} {real_command}".format(
pdbg=pdbg,
real_command=line)
output.append(line)
return output
@staticmethod
def _handleList(line_number, line, output):
if output is None:
output = []
output.extend([s.strip() for s in line.split(',')])
return output
@staticmethod
def _handleSingleInteger(line_number, line, output):
if output is None:
output = []
try:
n = int(line)
except ValueError:
raise ValueError("INTEGER parser requires the input to be an integer (got {})".format(line))
output.append(n)
return output
@staticmethod
def _handleBooleanExpr(line_number, line, output):
parts = [s.strip() for s in line.split(',') if s.strip() != '']
if output and output[-1][-1] == '\\':
output[-1] = output[-1][:-1] + parts[0]
del parts[0]
if output is None:
output = []
output.extend(parts)
for s in output:
if s != '*' and not s.endswith('\\'):
BooleanExpression.evaluate(s, [])
return output
def _parseKeywords(sourcepath, additional_parsers=[],
require_script=True):
script = []
builtin_parsers = [
IntegratedTestKeywordParser('RUN:', ParserKind.COMMAND, initial_value=script),
IntegratedTestKeywordParser('XFAIL:', ParserKind.BOOLEAN_EXPR),
IntegratedTestKeywordParser('REQUIRES:', ParserKind.BOOLEAN_EXPR),
IntegratedTestKeywordParser('UNSUPPORTED:', ParserKind.BOOLEAN_EXPR),
IntegratedTestKeywordParser('ALLOW_RETRIES:', ParserKind.INTEGER),
IntegratedTestKeywordParser('END.', ParserKind.TAG)
]
keyword_parsers = {p.keyword: p for p in builtin_parsers}
for parser in additional_parsers:
if not isinstance(parser, IntegratedTestKeywordParser):
raise ValueError('Additional parser must be an instance of '
'IntegratedTestKeywordParser')
if parser.keyword in keyword_parsers:
raise ValueError("Parser for keyword '%s' already exists"
% parser.keyword)
keyword_parsers[parser.keyword] = parser
for line_number, command_type, ln in \
parseIntegratedTestScriptCommands(sourcepath,
keyword_parsers.keys()):
parser = keyword_parsers[command_type]
parser.parseLine(line_number, ln)
if command_type == 'END.' and parser.getValue() is True:
break
if require_script and not script:
raise ValueError("Test has no 'RUN:' line")
if script and script[-1][-1] == '\\':
raise ValueError("Test has unterminated 'RUN:' lines (with '\\')")
for key in keyword_parsers:
kp = keyword_parsers[key]
if kp.kind != ParserKind.BOOLEAN_EXPR:
continue
value = kp.getValue()
if value and value[-1][-1] == '\\':
raise ValueError("Test has unterminated '{key}' lines (with '\\')"
.format(key=key))
allowed_retries = keyword_parsers['ALLOW_RETRIES:'].getValue()
if allowed_retries and len(allowed_retries) > 1:
raise ValueError("Test has more than one ALLOW_RETRIES lines")
return {p.keyword: p.getValue() for p in keyword_parsers.values()}
def parseIntegratedTestScript(test, additional_parsers=[],
require_script=True):
try:
parsed = _parseKeywords(test.getSourcePath(), additional_parsers,
require_script)
except ValueError as e:
return lit.Test.Result(Test.UNRESOLVED, str(e))
script = parsed['RUN:'] or []
test.xfails += parsed['XFAIL:'] or []
test.requires += parsed['REQUIRES:'] or []
test.unsupported += parsed['UNSUPPORTED:'] or []
if parsed['ALLOW_RETRIES:']:
test.allowed_retries = parsed['ALLOW_RETRIES:'][0]
missing_required_features = test.getMissingRequiredFeatures()
if missing_required_features:
msg = ', '.join(missing_required_features)
return lit.Test.Result(Test.UNSUPPORTED,
"Test requires the following unavailable "
"features: %s" % msg)
unsupported_features = test.getUnsupportedFeatures()
if unsupported_features:
msg = ', '.join(unsupported_features)
return lit.Test.Result(
Test.UNSUPPORTED,
"Test does not support the following features "
"and/or targets: %s" % msg)
if not test.isWithinFeatureLimits():
msg = ', '.join(test.config.limit_to_features)
return lit.Test.Result(Test.UNSUPPORTED,
"Test does not require any of the features "
"specified in limit_to_features: %s" % msg)
return script
def _runShTest(test, litConfig, useExternalSh, script, tmpBase):
def runOnce(execdir):
if useExternalSh:
res = executeScript(test, litConfig, tmpBase, script, execdir)
else:
res = executeScriptInternal(test, litConfig, tmpBase, script, execdir)
if isinstance(res, lit.Test.Result):
return res
out,err,exitCode,timeoutInfo = res
if exitCode == 0:
status = Test.PASS
else:
if timeoutInfo is None:
status = Test.FAIL
else:
status = Test.TIMEOUT
return out,err,exitCode,timeoutInfo,status
lit.util.mkdir_p(os.path.dirname(tmpBase))
execdir = os.path.dirname(test.getExecPath())
attempts = test.allowed_retries + 1
for i in range(attempts):
res = runOnce(execdir)
if isinstance(res, lit.Test.Result):
return res
out,err,exitCode,timeoutInfo,status = res
if status != Test.FAIL:
break
if i > 0 and status == Test.PASS:
status = Test.FLAKYPASS
output = """Script:\n--\n%s\n--\nExit Code: %d\n""" % (
'\n'.join(script), exitCode)
if timeoutInfo is not None:
output += """Timeout: %s\n""" % (timeoutInfo,)
output += "\n"
if out:
output += """Command Output (stdout):\n--\n%s\n--\n""" % (out,)
if err:
output += """Command Output (stderr):\n--\n%s\n--\n""" % (err,)
return lit.Test.Result(status, output)
def executeShTest(test, litConfig, useExternalSh,
extra_substitutions=[],
preamble_commands=[]):
if test.config.unsupported:
return lit.Test.Result(Test.UNSUPPORTED, 'Test is unsupported')
script = list(preamble_commands)
parsed = parseIntegratedTestScript(test, require_script=not script)
if isinstance(parsed, lit.Test.Result):
return parsed
script += parsed
if litConfig.noExecute:
return lit.Test.Result(Test.PASS)
tmpDir, tmpBase = getTempPaths(test)
substitutions = list(extra_substitutions)
substitutions += getDefaultSubstitutions(test, tmpDir, tmpBase,
normalize_slashes=useExternalSh)
conditions = { feature: True for feature in test.config.available_features }
script = applySubstitutions(script, substitutions, conditions,
recursion_limit=test.config.recursiveExpansionLimit)
return _runShTest(test, litConfig, useExternalSh, script, tmpBase)