Asynchronously compress build logs

[?]
Jun 19, 2015, 12:51 PM
GS4BE6TB6GH2JUZJHDPHL6YG7J7YYESF3YOZJZ2CFABXUTO4VYPQC

Dependencies

  • [2] 5LBMP7GA Fix remote building
  • [3] PQFOMNTL hydra-queue-runner: More stats
  • [4] XV4AEKJC hydra-queue-runner: Handle status queries on the main thread
  • [5] WHULPA6S Handle failure with output
  • [6] O776XDS2 Make getDrvLogPath work with both bucketed and non-bucketed nix logs.
  • [7] NJJ7H64S Very basic multi-threaded queue runner
  • [8] T2EIYJNG On SIGINT, shut down the builder threads
  • [9] 5AIYUMTB Basic remote building
  • [10] ENXUSMSV Make concurrency more robust
  • [11] 24BMQDZA Start of single-process hydra-queue-runner
  • [12] OCZ4LSGG Automatically retry aborted builds
  • [13] NNOCZ4RO hydra-queue-runner: Improve dispatcher
  • [14] FQQRJUO4 Mark builds as busy
  • [15] YZAI5GQU Implement a database connection pool
  • [16] HHOMBU7G hydra-queue-runner: Implement timeouts
  • [*] RQUAATWB Add status dump facility
  • [*] RYTQLATY Keep track of failed paths in the Hydra database
  • [*] 2GK5DOU7 * Downloading closures.

Change contents

  • replacement in src/hydra-queue-runner/build-remote.cc at line 120
    [5.2966][5.2966:3044]()
    Path logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2);
    [5.2966]
    [5.3044]
    result.logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2);
    AutoDelete autoDelete(result.logFile, false);
  • replacement in src/hydra-queue-runner/build-remote.cc at line 123
    [5.3045][5.3045:3077]()
    createDirs(dirOf(logFile));
    [5.3045]
    [5.3077]
    createDirs(dirOf(result.logFile));
  • replacement in src/hydra-queue-runner/build-remote.cc at line 125
    [5.3078][5.3078:3246]()
    AutoCloseFD logFD(open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666));
    if (logFD == -1) throw SysError(format("creating log file ‘%1%’") % logFile);
    [5.3078]
    [5.3246]
    AutoCloseFD logFD(open(result.logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666));
    if (logFD == -1) throw SysError(format("creating log file ‘%1%’") % result.logFile);
  • replacement in src/hydra-queue-runner/build-remote.cc at line 150
    [5.647][5.647:749]()
    throw Error(format("cannot connect to ‘%1%’: %2%") % sshName % chomp(readFile(logFile)));
    [5.647]
    [2.0]
    string s = chomp(readFile(result.logFile));
    throw Error(format("cannot connect to ‘%1%’: %2%") % sshName % s);
  • edit in src/hydra-queue-runner/build-remote.cc at line 168
    [5.4065]
    [5.4065]
    autoDelete.cancel();
  • edit in src/hydra-queue-runner/build-remote.hh at line 16
    [5.5549]
    [5.5549]
    nix::Path logFile;
  • replacement in src/hydra-queue-runner/build-result.cc at line 61
    [5.2222][5.2222:2316]()
    /* Ensure that the path exists and points into the
    Nix store. */
    [5.2222]
    [5.2316]
    /* Ensure that the path exists and points into the Nix
    store. */
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 5
    [5.4894]
    [5.250]
    #include <queue>
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 13
    [5.4915]
    [5.4915]
    #include <sys/types.h>
    #include <sys/stat.h>
    #include <fcntl.h>
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 236
    [5.1123][5.6419:6420](),[5.6419][5.6419:6420](),[5.6420][5.124:172]()
    std::condition_variable_any runnableWakeup;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 257
    [3.254]
    [5.7253]
    /* Log compressor work queue. */
    Sync<std::queue<Path>> logCompressorQueue;
    std::condition_variable_any logCompressorWakeup;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 267
    [5.6457][5.7255:7280]()
    void loadMachines();
    [5.6457]
    [5.7280]
    void clearBusy(time_t stopTime);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 269
    [5.7281][5.0:37]()
    void clearBusy(time_t stopTime);
    [5.7281]
    [5.6539]
    private:
    void loadMachines();
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 314
    [18.66]
    [18.66]
    /* Thread that asynchronously bzips logs of finished steps. */
    void logCompressor();
    public:
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 968
    [5.11904][5.11904:11939]()
    dispatcherWakeup.notify_all();
    [5.11904]
    [5.12574]
    dispatcherWakeup.notify_one();
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 1080
    [5.15935]
    [19.985]
    /* Do the build. */
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 1094
    [19.1529]
    [19.1529]
    /* Asynchronously compress the log. */
    if (result.logFile != "") {
    {
    auto logCompressorQueue_(logCompressorQueue.lock());
    logCompressorQueue_->push(result.logFile);
    }
    logCompressorWakeup.notify_one();
    }
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 1346
    [5.3178]
    [5.3178]
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 1348
    [5.3179]
    [5.3179]
    void State::logCompressor()
    {
    while (true) {
    try {
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 1353
    [5.3180]
    [18.256]
    Path logPath;
    {
    auto logCompressorQueue_(logCompressorQueue.lock());
    while (logCompressorQueue_->empty())
    logCompressorQueue_.wait(logCompressorWakeup);
    logPath = logCompressorQueue_->front();
    logCompressorQueue_->pop();
    }
    if (!pathExists(logPath)) continue;
    printMsg(lvlChatty, format("compressing log file ‘%1%’") % logPath);
    Path tmpPath = logPath + ".bz2.tmp";
    AutoCloseFD fd = open(tmpPath.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0644);
    // FIXME: use libbz2
    Pid pid = startProcess([&]() {
    if (dup2(fd, STDOUT_FILENO) == -1)
    throw SysError("cannot dup output pipe to stdout");
    execlp("bzip2", "bzip2", "-c", logPath.c_str(), nullptr);
    throw SysError("cannot start ssh");
    });
    int res = pid.wait(true);
    if (res != 0)
    throw Error(format("bzip2 returned exit code %1% while compressing ‘%2%’")
    % res % logPath);
    if (rename(tmpPath.c_str(), (logPath + ".bz2").c_str()) != 0)
    throw SysError(format("renaming ‘%1%’") % tmpPath);
    if (unlink(logPath.c_str()) != 0)
    throw SysError(format("unlinking ‘%1%’") % logPath);
    } catch (std::exception & e) {
    printMsg(lvlError, format("log compressor: %1%") % e.what());
    sleep(5);
    }
    }
    }
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 1446
    [4.399]
    [4.399]
    /* Run a log compressor thread. If needed, we could start more
    than one. */
    std::thread(&State::logCompressor, this).detach();
  • replacement in src/lib/Hydra/Helper/Nix.pm at line 137
    [5.14961][5.14961:15068]()
    for ($fn2 . $bucketed, $fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) {
    [5.14961]
    [5.15068]
    for ($fn2 . $bucketed, $fn2 . $bucketed . ".bz2", $fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) {