import os
import shlex
import sys
import time
from subprocess import CalledProcessError, check_call
from typing import List, IO, Optional, Tuple
def which(command: str, paths: Optional[str] = None) -> Optional[str]:
    """
    Look up the given command in a search-path string.

    :param command: name (or path) of the command to locate.
    :param paths: directories to search, separated by ``os.pathsep``.
        When None, the ``PATH`` environment variable is used; when the
        resulting string is empty, ``os.defpath`` is used instead.
    :return: ``command`` itself if it already names an existing path,
        otherwise the first existing candidate found in the search
        directories, or None when nothing matches.
    """
    if paths is None:
        paths = os.environ.get('PATH', '')

    # A command that already names an existing path is returned as is.
    if os.path.exists(command):
        return command

    if not paths:
        paths = os.defpath

    # Suffixes to try. PATHEXT is only consulted when the path separator
    # is ';' (the Windows convention).
    if os.pathsep == ';':
        pathext = os.environ.get('PATHEXT', '').split(';')
        # If the command already carries one of the known extensions
        # (e.g. "clang.exe"), probe the verbatim name first; otherwise
        # only "clang.exe.EXE"-style names would ever be tried.
        lowered = command.lower()
        if any(ext and lowered.endswith(ext.lower()) for ext in pathext):
            pathext = [''] + pathext
    else:
        pathext = ['']

    for directory in paths.split(os.pathsep):
        for ext in pathext:
            candidate = os.path.join(directory, command + ext)
            if os.path.exists(candidate):
                return candidate

    return None
def has_no_extension(file_name: str) -> bool:
    """
    Tell whether ``os.path.splitext`` finds no extension in *file_name*.
    """
    return os.path.splitext(file_name)[1] == ""
def is_valid_single_input_file(file_name: str) -> bool:
    """
    Tell whether *file_name* has one of the accepted input extensions.

    Accepted extensions are ".i", ".ii", ".c", ".cpp", ".m" and the empty
    extension (a file name without any extension).
    """
    accepted_extensions = {"", ".c", ".cpp", ".i", ".ii", ".m"}
    _, extension = os.path.splitext(file_name)
    return extension in accepted_extensions
def time_to_str(time: float) -> str:
    """
    Format *time* with two decimal places followed by an "s" suffix.
    """
    # NOTE: the parameter shadows the `time` module inside this function.
    return format(time, ".2f") + "s"
def memory_to_str(memory: int) -> str:
    """
    Render *memory* as a human-readable string.

    A falsy value (e.g. 0) yields "N/A". Otherwise the value is formatted
    by ``humanize.naturalsize`` (GNU style) when the optional `humanize`
    package can be imported, and as "<memory>B" when it cannot.
    """
    if not memory:
        return "N/A"

    try:
        import humanize
        pretty = humanize.naturalsize(memory, gnu=True)
    except ImportError:
        # No formatter available: fall back to the raw number.
        pretty = f"{memory}B"
    return pretty
def check_and_measure_call(*popenargs, **kwargs) -> Tuple[float, int]:
    """
    Run a command, wait for it to finish, and measure time and peak memory.

    The arguments are forwarded unchanged to ``psutil.Popen`` when the
    optional `psutil` package is importable, and to
    ``subprocess.check_call`` otherwise.

    :return: a tuple ``(elapsed, peak_mem)``. ``elapsed`` is the run time
        in seconds. ``peak_mem`` is the largest sampled sum of the RSS of
        the process and all of its descendants; it is 0 when psutil is
        not available or no sample could be taken.
    :raises CalledProcessError: if the command exits with a non-zero
        return code.
    """
    # A monotonic clock keeps the duration immune to wall-clock changes.
    start_time = time.monotonic()

    # Only the import is guarded, so an ImportError raised later cannot
    # trigger a second run of the command through the fallback.
    try:
        import psutil as ps
    except ImportError:
        # Without psutil, memory cannot be sampled; just run the command.
        check_call(*popenargs, **kwargs)
        return time.monotonic() - start_time, 0

    def get_memory(process: ps.Process) -> int:
        # Sum the RSS of the process and all of its descendants.
        mem = 0
        descendants = list(process.children(recursive=True))
        descendants.append(process)

        for proc in descendants:
            try:
                mem += proc.memory_info().rss
            except (ps.NoSuchProcess, ps.AccessDenied):
                # The process vanished or cannot be inspected; skip it.
                continue

        return mem

    peak_mem = 0
    with ps.Popen(*popenargs, **kwargs) as process:
        # Poll every half second until the process is gone or a zombie.
        while (process.is_running() and
               process.status() != ps.STATUS_ZOMBIE):
            peak_mem = max(peak_mem, get_memory(process))
            time.sleep(.5)

        if process.is_running():
            process.kill()

    if process.returncode != 0:
        # Same command lookup as subprocess.check_call.
        cmd = kwargs.get("args")
        if cmd is None:
            cmd = popenargs[0]
        raise CalledProcessError(process.returncode, cmd)

    return time.monotonic() - start_time, peak_mem
def run_script(script_path: str, build_log_file: IO, cwd: str,
               out=sys.stdout, err=sys.stderr, verbose: int = 0):
    """
    Run the given script if it exists; do nothing otherwise.

    The script is first made executable with ``chmod +x`` and then run
    through the shell from the *cwd* directory. Both commands send their
    stdout and stderr to *build_log_file*.

    :param script_path: path of the script to run.
    :param build_log_file: open file receiving the commands' output; its
        ``name`` attribute is used in the error message.
    :param cwd: working directory for both commands.
    :param out: stream for the progress message.
    :param err: stream for the failure message.
    :param verbose: the progress message is written only when this is 1.

    If either command fails, an error is written to *err* and the
    interpreter exits via ``sys.exit(-1)``.
    """
    if not os.path.exists(script_path):
        return

    # Quote for the shell so paths containing single quotes or other
    # metacharacters cannot break the command line.
    quoted_path = shlex.quote(script_path)

    try:
        if verbose == 1:
            out.write(f" Executing: {script_path}\n")

        check_call(f"chmod +x {quoted_path}", cwd=cwd,
                   stderr=build_log_file,
                   stdout=build_log_file,
                   shell=True)

        check_call(quoted_path, cwd=cwd,
                   stderr=build_log_file,
                   stdout=build_log_file,
                   shell=True)

    except CalledProcessError:
        err.write(f"Error: Running {script_path} failed. "
                  f"See {build_log_file.name} for details.\n")
        sys.exit(-1)
def is_comment_csv_line(entries: List[str]) -> bool:
    """
    Tell whether a parsed CSV row is a comment.

    A row counts as a comment when it is non-empty and its first entry
    starts with "#".
    """
    if len(entries) == 0:
        return False
    first_entry = entries[0]
    return first_entry.startswith("#")