import CmpRuns
import SATestUtils as utils
from ProjectMap import DownloadType, ProjectInfo
import glob
import logging
import math
import multiprocessing
import os
import plistlib
import shutil
import sys
import threading
import time
import zipfile
from queue import Queue
from plistlib import InvalidFileException
from subprocess import CalledProcessError, check_call
from typing import Dict, IO, List, NamedTuple, Optional, TYPE_CHECKING, Tuple
class StreamToLogger:
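    """
    A file-like stream object that redirects writes to a logger instance.
    """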
    def __init__(self, logger: logging.Logger,
                 log_level: int = logging.INFO):
        self.logger = logger
        self.log_level = log_level
    def write(self, message: str):
        self.logger.log(self.log_level, message.rstrip())
    def flush(self):
        pass
    def fileno(self) -> int:
        return 0
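# Thread-local storage for the logger-backed stdout/stderr streams,
# so that each worker thread logs under its own name.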
LOCAL = threading.local()
def init_logger(name: str):
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    LOCAL.stdout = StreamToLogger(logger, logging.INFO)
    LOCAL.stderr = StreamToLogger(logger, logging.ERROR)
init_logger("main")
def stderr(message: str):
    LOCAL.stderr.write(message)
def stdout(message: str):
    LOCAL.stdout.write(message)
logging.basicConfig(
    format='%(asctime)s:%(levelname)s:%(name)s: %(message)s')
if 'CC' in os.environ:
    cc_candidate: Optional[str] = os.environ['CC']
else:
    cc_candidate = utils.which("clang", os.environ['PATH'])
if not cc_candidate:
    stderr("Error: cannot find 'clang' in PATH")
    sys.exit(1)
CLANG = cc_candidate
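# Cap parallel build jobs at 75% of the available cores.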
MAX_JOBS = int(math.ceil(multiprocessing.cpu_count() * 0.75))
DOWNLOAD_SCRIPT = "download_project.sh"
CLEANUP_SCRIPT = "cleanup_run_static_analyzer.sh"
BUILD_SCRIPT = "run_static_analyzer.cmd"
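# Marker in a build script: commands after this line are run without the
# scan-build prefix; required settings are passed via environment variables.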
NO_PREFIX_CMD = "#NOPREFIX"
LOG_DIR_NAME = "Logs"
BUILD_LOG_NAME = "run_static_analyzer.log"
NUM_OF_FAILURES_IN_SUMMARY = 10
OUTPUT_DIR_NAME = "ScanBuildResults"
REF_PREFIX = "Ref"
CACHED_SOURCE_DIR_NAME = "CachedSource"
PATCHED_SOURCE_DIR_NAME = "PatchedSource"
PATCHFILE_NAME = "changes_for_analyzer.patch"
CHECKERS = ",".join([
    "alpha.unix.SimpleStream",
    "alpha.security.taint",
    "cplusplus.NewDeleteLeaks",
    "core",
    "cplusplus",
    "deadcode",
    "security",
    "unix",
    "osx",
    "nullability"
])
VERBOSE = 0
def run_cleanup_script(directory: str, build_log_file: IO):
    """
    Run the project's cleanup script, if any, from the patched source
    directory.
    """
    cwd = os.path.join(directory, PATCHED_SOURCE_DIR_NAME)
    script_path = os.path.join(directory, CLEANUP_SCRIPT)
    utils.run_script(script_path, build_log_file, cwd,
                     out=LOCAL.stdout, err=LOCAL.stderr,
                     verbose=VERBOSE)
class TestInfo(NamedTuple):
    """
    Information about a project and settings for its analysis.
    """
    project: ProjectInfo
    override_compiler: bool = False
    extra_analyzer_config: str = ""
    extra_checkers: str = ""
    is_reference_build: bool = False
    strictness: int = 0
if TYPE_CHECKING:
    TestQueue = Queue[TestInfo]  # this is only processed by mypy
else:
    TestQueue = Queue  # this will be used for actual construction
class RegressionTester:
    """
    A component aggregating testing for many projects.
    """
    def __init__(self, jobs: int, projects: List[ProjectInfo],
                 override_compiler: bool, extra_analyzer_config: str,
                 extra_checkers: str,
                 regenerate: bool, strictness: int):
        self.jobs = jobs
        self.projects = projects
        self.override_compiler = override_compiler
        self.extra_analyzer_config = extra_analyzer_config
        self.extra_checkers = extra_checkers
        self.regenerate = regenerate
        self.strictness = strictness
    def test_all(self) -> bool:
        projects_to_test: List[TestInfo] = []
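        # Wrap each project with the settings shared by this run.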
        for project in self.projects:
            projects_to_test.append(
                TestInfo(project,
                         self.override_compiler,
                         self.extra_analyzer_config,
                         self.extra_checkers,
                         self.regenerate, self.strictness))
        if self.jobs <= 1:
            return self._single_threaded_test_all(projects_to_test)
        else:
            return self._multi_threaded_test_all(projects_to_test)
    def _single_threaded_test_all(self,
                                  projects_to_test: List[TestInfo]) -> bool:
        """
        Run all projects sequentially in the current thread.

        :return: whether all tests have passed.
        """
        success = True
        for project_info in projects_to_test:
            tester = ProjectTester(project_info)
            success &= tester.test()
        return success
    def _multi_threaded_test_all(self,
                                 projects_to_test: List[TestInfo]) -> bool:
        """
        Run each project in a separate thread.

        This is OK despite the GIL, as testing is blocked on launching
        external processes.

        :return: whether all tests have passed.
        """
        tasks_queue = TestQueue()
        for project_info in projects_to_test:
            tasks_queue.put(project_info)
        results_differ = threading.Event()
        failure_flag = threading.Event()
        for _ in range(self.jobs):
            T = TestProjectThread(tasks_queue, results_differ, failure_flag)
            T.start()
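        # Poll the queue instead of joining the threads so the loop stays
        # responsive to interrupts such as Ctrl-C.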
        while tasks_queue.unfinished_tasks:
            time.sleep(0.1)
            if failure_flag.is_set():
                stderr("Test runner crashed\n")
                sys.exit(1)
        return not results_differ.is_set()
class ProjectTester:
    """
    A component aggregating testing for one project.
    """
    def __init__(self, test_info: TestInfo, silent: bool = False):
        self.project = test_info.project
        self.override_compiler = test_info.override_compiler
        self.extra_analyzer_config = test_info.extra_analyzer_config
        self.extra_checkers = test_info.extra_checkers
        self.is_reference_build = test_info.is_reference_build
        self.strictness = test_info.strictness
        self.silent = silent
    def test(self) -> bool:
        """
        Test a given project.

        :return tests_passed: whether tests have passed according
                              to the :param strictness: criteria.
        """
        if not self.project.enabled:
            self.out(
                f" \n\n--- Skipping disabled project {self.project.name}\n")
            return True
        self.out(f" \n\n--- Building project {self.project.name}\n")
        start_time = time.time()
        project_dir = self.get_project_dir()
        self.vout(f"  Build directory: {project_dir}.\n")
        output_dir = self.get_output_dir()
        self.build(project_dir, output_dir)
        check_build(output_dir)
        if self.is_reference_build:
            cleanup_reference_results(output_dir)
            passed = True
        else:
            passed = run_cmp_results(project_dir, self.strictness)
        self.out(f"Completed tests for project {self.project.name} "
                 f"(time: {time.time() - start_time:.2f}).\n")
        return passed
    def get_project_dir(self) -> str:
        return os.path.join(os.path.abspath(os.curdir), self.project.name)
    def get_output_dir(self) -> str:
        if self.is_reference_build:
            dirname = REF_PREFIX + OUTPUT_DIR_NAME
        else:
            dirname = OUTPUT_DIR_NAME
        return os.path.join(self.get_project_dir(), dirname)
    def build(self, directory: str, output_dir: str) -> Tuple[float, int]:
        build_log_path = get_build_log_path(output_dir)
        self.out(f"Log file: {build_log_path}\n")
        self.out(f"Output directory: {output_dir}\n")
        remove_log_file(output_dir)
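        # Remove results left over from previous runs.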
        if os.path.exists(output_dir):
            self.vout(f"  Removing old results: {output_dir}\n")
            shutil.rmtree(output_dir)
        assert(not os.path.exists(output_dir))
        os.makedirs(os.path.join(output_dir, LOG_DIR_NAME))
        with open(build_log_path, "w+") as build_log_file:
            if self.project.mode == 1:
                self._download_and_patch(directory, build_log_file)
                run_cleanup_script(directory, build_log_file)
                build_time, memory = self.scan_build(directory, output_dir,
                                                     build_log_file)
            else:
                build_time, memory = self.analyze_preprocessed(directory,
                                                               output_dir)
            if self.is_reference_build:
                run_cleanup_script(directory, build_log_file)
                normalize_reference_results(directory, output_dir,
                                            self.project.mode)
        self.out(f"Build complete (time: {utils.time_to_str(build_time)}, "
                 f"peak memory: {utils.memory_to_str(memory)}). "
                 f"See the log for more details: {build_log_path}\n")
        return build_time, memory
    def scan_build(self, directory: str, output_dir: str,
                   build_log_file: IO) -> Tuple[float, int]:
        """
        Build the project with scan-build by reading in the commands and
        prefixing them with the scan-build options.
        """
        build_script_path = os.path.join(directory, BUILD_SCRIPT)
        if not os.path.exists(build_script_path):
            stderr(f"Error: build script is not defined: "
                   f"{build_script_path}\n")
            sys.exit(1)
        all_checkers = CHECKERS
        if 'SA_ADDITIONAL_CHECKERS' in os.environ:
            all_checkers = (all_checkers + ',' +
                            os.environ['SA_ADDITIONAL_CHECKERS'])
        if self.extra_checkers != "":
            all_checkers += "," + self.extra_checkers
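        # scan-build is always run from within the patched source directory.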
        cwd = os.path.join(directory, PATCHED_SOURCE_DIR_NAME)
        options = f"--use-analyzer '{CLANG}' "
        options += f"-plist-html -o '{output_dir}' "
        options += f"-enable-checker {all_checkers} "
        options += "--keep-empty "
        options += f"-analyzer-config '{self.generate_config()}' "
        if self.override_compiler:
            options += "--override-compiler "
        extra_env: Dict[str, str] = {}
        execution_time = 0.0
        peak_memory = 0
        try:
            command_file = open(build_script_path, "r")
            command_prefix = "scan-build " + options + " "
            for command in command_file:
                command = command.strip()
                if len(command) == 0:
                    continue
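                # The NO_PREFIX_CMD marker disables the scan-build prefix for
                # the remaining commands; required settings are communicated
                # through environment variables instead.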
                if command == NO_PREFIX_CMD:
                    command_prefix = ""
                    extra_env['OUTPUT'] = output_dir
                    extra_env['CC'] = CLANG
                    extra_env['ANALYZER_CONFIG'] = self.generate_config()
                    continue
                if command.startswith("#"):
                    continue
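                # If using 'make' without an explicit -jX, imply one to
                # speed up the analysis.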
                if (command.startswith("make ") or command == "make") and \
                        "-j" not in command:
                    command += f" -j{MAX_JOBS}"
                command_to_run = command_prefix + command
                self.vout(f"  Executing: {command_to_run}\n")
                time, mem = utils.check_and_measure_call(
                    command_to_run, cwd=cwd,
                    stderr=build_log_file,
                    stdout=build_log_file,
                    env=dict(os.environ, **extra_env),
                    shell=True)
                execution_time += time
                peak_memory = max(peak_memory, mem)
        except CalledProcessError:
            stderr("Error: scan-build failed. Its output was: \n")
            build_log_file.seek(0)
            shutil.copyfileobj(build_log_file, LOCAL.stderr)
            sys.exit(1)
        return execution_time, peak_memory
    def analyze_preprocessed(self, directory: str,
                             output_dir: str) -> Tuple[float, int]:
        """
        Run analysis on a set of preprocessed files.
        """
        if os.path.exists(os.path.join(directory, BUILD_SCRIPT)):
            stderr(f"Error: The preprocessed files project "
                   f"should not contain {BUILD_SCRIPT}\n")
            raise Exception()
        prefix = CLANG + " --analyze "
        prefix += "--analyzer-output plist "
        prefix += " -Xclang -analyzer-checker=" + CHECKERS
        prefix += " -fcxx-exceptions -fblocks "
        prefix += " -Xclang -analyzer-config "
        prefix += f"-Xclang {self.generate_config()} "
        if self.project.mode == 2:
            prefix += "-std=c++11 "
        plist_path = os.path.join(directory, output_dir, "date")
        fail_path = os.path.join(plist_path, "failures")
        os.makedirs(fail_path)
        execution_time = 0.0
        peak_memory = 0
        for full_file_name in glob.glob(directory + "/*"):
            file_name = os.path.basename(full_file_name)
            failed = False
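            # Skip auxiliary files that have no extension.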
            if utils.has_no_extension(file_name):
                continue
            if not utils.is_valid_single_input_file(file_name):
                stderr(f"Error: Invalid single input file {full_file_name}.\n")
                raise Exception()
            plist_basename = os.path.join(plist_path, file_name)
            output_option = f"-o '{plist_basename}.plist' "
            command = f"{prefix}{output_option}'{file_name}'"
            log_path = os.path.join(fail_path, file_name + ".stderr.txt")
            with open(log_path, "w+") as log_file:
                try:
                    self.vout(f"  Executing: {command}\n")
                    time, mem = utils.check_and_measure_call(
                        command, cwd=directory, stderr=log_file,
                        stdout=log_file, shell=True)
                    execution_time += time
                    peak_memory = max(peak_memory, mem)
                except CalledProcessError as e:
                    stderr(f"Error: Analyzes of {full_file_name} failed. "
                           f"See {log_file.name} for details. "
                           f"Error code {e.returncode}.\n")
                    failed = True
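                # Remove the per-file log if the analysis succeeded.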
                if not failed:
                    os.remove(log_file.name)
        return execution_time, peak_memory
    def generate_config(self) -> str:
        out = "serialize-stats=true,stable-report-filename=true"
        if self.extra_analyzer_config:
            out += "," + self.extra_analyzer_config
        return out
    def _download_and_patch(self, directory: str, build_log_file: IO):
        """
        Download the project and apply the local patchfile if it exists.
        """
        cached_source = os.path.join(directory, CACHED_SOURCE_DIR_NAME)
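        # Download the source into the cache if it is not already there.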
        if not os.path.exists(cached_source):
            self._download(directory, build_log_file)
            if not os.path.exists(cached_source):
                stderr(f"Error: '{cached_source}' not found after download.\n")
                sys.exit(1)
        patched_source = os.path.join(directory, PATCHED_SOURCE_DIR_NAME)
        if os.path.exists(patched_source):
            shutil.rmtree(patched_source)
        shutil.copytree(cached_source, patched_source, symlinks=True)
        self._apply_patch(directory, build_log_file)
    def _download(self, directory: str, build_log_file: IO):
        """
        Download the project from its source.
        """
        if self.project.source == DownloadType.GIT:
            self._download_from_git(directory, build_log_file)
        elif self.project.source == DownloadType.ZIP:
            self._unpack_zip(directory, build_log_file)
        elif self.project.source == DownloadType.SCRIPT:
            self._run_download_script(directory, build_log_file)
        else:
            raise ValueError(
                f"Unknown source type '{self.project.source}' is found "
                f"for the '{self.project.name}' project")
    def _download_from_git(self, directory: str, build_log_file: IO):
        repo = self.project.origin
        cached_source = os.path.join(directory, CACHED_SOURCE_DIR_NAME)
        check_call(f"git clone --recursive {repo} {cached_source}",
                   cwd=directory, stderr=build_log_file,
                   stdout=build_log_file, shell=True)
        check_call(f"git checkout --quiet {self.project.commit}",
                   cwd=cached_source, stderr=build_log_file,
                   stdout=build_log_file, shell=True)
    def _unpack_zip(self, directory: str, build_log_file: IO):
        zip_files = list(glob.glob(directory + "/*.zip"))
        if len(zip_files) == 0:
            raise ValueError(
                f"Couldn't find any zip files to unpack for the "
                f"'{self.project.name}' project")
        if len(zip_files) > 1:
            raise ValueError(
                f"Couldn't decide which of the zip files ({zip_files}) "
                f"for the '{self.project.name}' project to unpack")
        with zipfile.ZipFile(zip_files[0], "r") as zip_file:
            zip_file.extractall(os.path.join(directory,
                                             CACHED_SOURCE_DIR_NAME))
    @staticmethod
    def _run_download_script(directory: str, build_log_file: IO):
        script_path = os.path.join(directory, DOWNLOAD_SCRIPT)
        utils.run_script(script_path, build_log_file, directory,
                         out=LOCAL.stdout, err=LOCAL.stderr,
                         verbose=VERBOSE)
    def _apply_patch(self, directory: str, build_log_file: IO):
        patchfile_path = os.path.join(directory, PATCHFILE_NAME)
        patched_source = os.path.join(directory, PATCHED_SOURCE_DIR_NAME)
        if not os.path.exists(patchfile_path):
            self.out("  No local patches.\n")
            return
        self.out("  Applying patch.\n")
        try:
            check_call(f"patch -p1 < '{patchfile_path}'",
                       cwd=patched_source,
                       stderr=build_log_file,
                       stdout=build_log_file,
                       shell=True)
        except CalledProcessError:
            stderr(f"Error: Patch failed. "
                   f"See {build_log_file.name} for details.\n")
            sys.exit(1)
    def out(self, what: str):
        if not self.silent:
            stdout(what)
    def vout(self, what: str):
        if VERBOSE >= 1:
            self.out(what)
class TestProjectThread(threading.Thread):
    def __init__(self, tasks_queue: TestQueue,
                 results_differ: threading.Event,
                 failure_flag: threading.Event):
        """
        :param results_differ: Used to signify that results differ from
               the canonical ones.
        :param failure_flag: Used to signify a failure during the run.
        """
        self.tasks_queue = tasks_queue
        self.results_differ = results_differ
        self.failure_flag = failure_flag
        super().__init__()
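        # Daemonize so a hanging worker cannot block interpreter shutdown.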
        self.daemon = True
    def run(self):
        while not self.tasks_queue.empty():
            try:
                test_info = self.tasks_queue.get()
                init_logger(test_info.project.name)
                tester = ProjectTester(test_info)
                if not tester.test():
                    self.results_differ.set()
                self.tasks_queue.task_done()
            except BaseException:
                self.failure_flag.set()
                raise
def check_build(output_dir: str):
    """
    Given the scan-build output directory, check whether the analysis failed
    by searching for failure logs.  On failure, print a summary of the first
    NUM_OF_FAILURES_IN_SUMMARY failures and exit.
    """
    failures = glob.glob(output_dir + "/*/failures/*.stderr.txt")
    total_failed = len(failures)
    if total_failed == 0:
        clean_up_empty_plists(output_dir)
        clean_up_empty_folders(output_dir)
        plists = glob.glob(output_dir + "/*/*.plist")
        stdout(f"Number of bug reports "
               f"(non-empty plist files) produced: {len(plists)}\n")
        return
    stderr("Error: analysis failed.\n")
    stderr(f"Total of {total_failed} failures discovered.\n")
    if total_failed > NUM_OF_FAILURES_IN_SUMMARY:
        stderr(f"See the first {NUM_OF_FAILURES_IN_SUMMARY} below.\n")
    for index, failed_log_path in enumerate(failures, start=1):
        if index > NUM_OF_FAILURES_IN_SUMMARY:
            break
        stderr(f"\n-- Error #{index} -----------\n")
        with open(failed_log_path, "r") as failed_log:
            shutil.copyfileobj(failed_log, LOCAL.stderr)
    if total_failed > NUM_OF_FAILURES_IN_SUMMARY:
        stderr("See the results folder for more.")
    sys.exit(1)
def cleanup_reference_results(output_dir: str):
    """
    Delete html, css, and js files from reference results.  These can
    include multiple copies of the benchmark source and so get very large.
    """
    extensions = ["html", "css", "js"]
    for extension in extensions:
        for file_to_rm in glob.glob(f"{output_dir}/*/*.{extension}"):
            os.remove(file_to_rm)
    # Remove the build log as well: it contains machine-specific paths.
    remove_log_file(output_dir)
def run_cmp_results(directory: str, strictness: int = 0) -> bool:
    """
    Compare the warnings produced by scan-build.

    strictness defines the success criteria for the test:
      0 - success if there are no crashes or analyzer failures.
      1 - success if there is no difference in the number of reported bugs.
      2 - success if all the bug reports are identical.

    :return tests_passed: whether tests have passed according to the
                          strictness criteria.
    """
    tests_passed = True
    start_time = time.time()
    ref_dir = os.path.join(directory, REF_PREFIX + OUTPUT_DIR_NAME)
    new_dir = os.path.join(directory, OUTPUT_DIR_NAME)
    ref_list = glob.glob(ref_dir + "/*")
    new_list = glob.glob(new_dir + "/*")
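    # Log folders are also stored in the results directories,
    # but they should not be compared.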
    ref_log_dir = os.path.join(ref_dir, LOG_DIR_NAME)
    if ref_log_dir in ref_list:
        ref_list.remove(ref_log_dir)
    new_list.remove(os.path.join(new_dir, LOG_DIR_NAME))
    if len(ref_list) != len(new_list):
        stderr(f"Mismatch in number of results folders: "
               f"{ref_list} vs {new_list}")
        sys.exit(1)
    if len(ref_list) > 1:
        ref_list.sort()
        new_list.sort()
    num_diffs = 0
    for ref_dir, new_dir in zip(ref_list, new_list):
        assert(ref_dir != new_dir)
        if VERBOSE >= 1:
            stdout(f"  Comparing Results: {ref_dir} {new_dir}\n")
        patched_source = os.path.join(directory, PATCHED_SOURCE_DIR_NAME)
        ref_results = CmpRuns.ResultsDirectory(ref_dir)
        new_results = CmpRuns.ResultsDirectory(new_dir, patched_source)
        num_diffs, reports_in_ref, reports_in_new = \
            CmpRuns.dump_scan_build_results_diff(ref_results, new_results,
                                                 delete_empty=False,
                                                 out=LOCAL.stdout)
        if num_diffs > 0:
            stdout(f"Warning: {num_diffs} differences in diagnostics.\n")
        if strictness >= 2 and num_diffs > 0:
            stdout("Error: Diffs found in strict mode (2).\n")
            tests_passed = False
        elif strictness >= 1 and reports_in_ref != reports_in_new:
            stdout("Error: The number of results are different "
                   " strict mode (1).\n")
            tests_passed = False
    stdout(f"Diagnostic comparison complete "
           f"(time: {time.time() - start_time:.2f}).\n")
    return tests_passed
def normalize_reference_results(directory: str, output_dir: str,
                                build_mode: int):
    """
    Make the absolute paths relative in the reference results.
    """
    for dir_path, _, filenames in os.walk(output_dir):
        for filename in filenames:
            if not filename.endswith('plist'):
                continue
            plist = os.path.join(dir_path, filename)
            with open(plist, "rb") as plist_file:
                data = plistlib.load(plist_file)
            path_prefix = directory
            if build_mode == 1:
                path_prefix = os.path.join(directory, PATCHED_SOURCE_DIR_NAME)
            paths = [source[len(path_prefix) + 1:]
                     if source.startswith(path_prefix) else source
                     for source in data['files']]
            data['files'] = paths
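            # Remove transient fields that change from run to run.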
            for diagnostic in data['diagnostics']:
                if 'HTMLDiagnostics_files' in diagnostic:
                    diagnostic.pop('HTMLDiagnostics_files')
            if 'clang_version' in data:
                data.pop('clang_version')
            with open(plist, "wb") as plist_file:
                plistlib.dump(data, plist_file)
def get_build_log_path(output_dir: str) -> str:
    return os.path.join(output_dir, LOG_DIR_NAME, BUILD_LOG_NAME)
def remove_log_file(output_dir: str):
    build_log_path = get_build_log_path(output_dir)
    if os.path.exists(build_log_path):
        if VERBOSE >= 1:
            stdout(f"  Removing log file: {build_log_path}\n")
        os.remove(build_log_path)
def clean_up_empty_plists(output_dir: str):
    """
    Remove empty plists from the given directory.
    """
    for plist in glob.glob(output_dir + "/*/*.plist"):
        try:
            with open(plist, "rb") as plist_file:
                data = plistlib.load(plist_file)
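            # Delete reports that reference no files.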
            if not data['files']:
                os.remove(plist)
                continue
        except InvalidFileException as e:
            stderr(f"Error parsing plist file {plist}: {str(e)}")
            continue
def clean_up_empty_folders(output_dir: str):
    """
    Remove empty folders from the results directory.
    """
    subdirs = glob.glob(output_dir + "/*")
    for subdir in subdirs:
        if not os.listdir(subdir):
            os.removedirs(subdir)
if __name__ == "__main__":
    print("SATestBuild.py should not be used on its own.")
    print("Please use 'SATest.py build' instead")
    sys.exit(1)