Concurrent hydra-evaluator

Oct 13, 2016, 1:53 PM
4YCF3KBGI4VYKHJXAREJLCJLY3UWB2FX447CJ4XQWFRKRFKG5WCQC

Dependencies

  • [2] Z4Y3TVEE hydra-queue-{runner,evaluator}: don't clutter the system log with debug messages
  • [3] 3CVY4QJX hydra-evaluator: Do the actual work in a subprocess
  • [4] N66HHKLQ Drop the 5 minute minimum interval between triggered evals
  • [5] ACBS7C6Q hydra-queue-runner: Detect changes to the scheduling shares
  • [6] Q3VTDUSQ Fix the API test
  • [7] 4VYY2ADP Allow the machines file to specify host public keys
  • [8] 3X6FN7RG Print jobset name when evaluator fails
  • [9] PKTKSVTL Bump Nix
  • [10] FYXZQ55G Drop obsolete -laws-cpp-sdk-s3
  • [11] WJTP6VZI Fix building against the latest Nix
  • [12] UXRNODRJ Remove hydra-build and the old hydra-queue-runner
  • [13] HJOEIMLR Refactor
  • [14] 3YSJ3LYK Remove finally.hh
  • [15] IE2PRAQU hydra-queue-runner: Send build notifications
  • [16] 7LB6QBXY Keep track of the number of build steps that are being built
  • [17] J7WIAUWT Better error message
  • [18] XLYHZUHT Cache .narinfo lookups
  • [19] B2L4T3X6 Sync with Nix
  • [20] FS3HUMVU Fix Makefile.am
  • [21] 34UH6B6L Build against the bleeding edge of Nix
  • [22] X27GNHDV * Basic job info in the database.
  • [23] RXVJFQ5A Evaluator cleanups
  • [24] QIW2WZKW Fix indentation
  • [25] DIEY5USN Keep better bytesReceived/bytesSent stats
  • [26] OB7GB5DL Test environment cleanups
  • [27] C3AG65SW Add one-shot jobsets
  • [28] GH4S4AWM Rename file
  • [29] KX5L74EY add nix-prefetch- scripts for now, were externals in svn
  • [30] L4AI5YL6 Rename hydra_*.pl to hydra-*
  • [31] TULPZ62Y * Perform builds in parallel.
  • [32] GAZHOBWE Better fix
  • [33] OVR2RWBI hydra-evaluator: Always pick the jobset that hasn't been evaluated longest
  • [34] 3YHNO5H2 Don't use Perl's -w flag
  • [35] YZAI5GQU Implement a database connection pool
  • [36] 5AIYUMTB Basic remote building
  • [37] N2NKSKHS Refactor local binary cache code into a subclass
  • [38] QMW24O5S Add support for Guile & Guix.
  • [39] RBZRCTAL hydra: fixed and re-enabled evaluation tests
  • [40] PCKLFRT5 Support push notification of repository changes
  • [41] SBKX3YH2 cleanup
  • [42] QLOLZHRX Allow a per-jobset check interval
  • [43] 6L3ZM55S Add font for the captcha
  • [44] LZVO64YG Merge in the first bits of the API work
  • [45] YI3BZG5N Cleanup Jobset serialization
  • [46] 7VQ4ALFY Update "make check" for the new queue runner
  • [47] 73YR46NJ hydra-queue-runner: Write directly to a binary cache
  • [48] GTUZLZRH Add an S3-backed binary cache store
  • [49] RBQEBVT5 Doh
  • [50] MS676RZW Remove s3binarystore (moved to nix in d155d80)
  • [51] 5QYMALGU Fix Perl error in tests.api
  • [52] D5QIOJGP * Move everything up one directory.
  • [53] Q6SOGMDQ Hydra/28: Rename "scheduler" to "evaluator"
  • [54] T4LLYESZ * Nix expression for building Hydra.
  • [55] ENXUSMSV Make concurrency more robust
  • [56] 5GRJZZOR
  • [57] D6YQQQCN * Don't ignore SIGCHLD after all, Perl doesn't like it. Just do
  • [58] WQ2VQ7H3 Use hydra-module.nix in the tests
  • [59] NI5BVF2V * In job inputs of type "build", allow the project and jobset names of
  • [60] 24BMQDZA Start of single-process hydra-queue-runner
  • [61] MQYHIUEE Cleanup eval serialization
  • [62] UGA45FNC Add a plugin for backing up builds in s3
  • [63] FV2M6MOT hydra: use autoconf/-make
  • [64] SQQSV4NB * Top-level: don't exit on errors.
  • [65] AMFMXR52 Provide a command ‘hydra-init’ to initialise/upgrade the database
  • [66] JK2QWPH6
  • [67] 4VYSDP4I Add eager fetching of relations and enable that for jobset->jobsetinputs
  • [68] KYSBJAYN Allow dashes in jobset input names
  • [69] 53PW36WS hydra-evaluator: When evaluating a single jobset, exit with a failure code if evaluation fails
  • [70] KS7NNPQW Bleh Automake
  • [71] 6K5PBUUN Use buildEnv to combine Hydra's Perl dependencies
  • [72] V6H6BWMK Sync with Nix
  • [73] 6MGFQDR2 deleted some old scripts
  • [74] XHOZT4WT Add a command `hydra-create-user' for managing user accounts
  • [75] UN2KZL3A Rename c -> hydra-eval-jobs
  • [76] N22GPKYT * Put info about logs / build products in the DB.
  • [77] QAJK5MCE Remove obsolete hydra-control script
  • [78] MB3TISH2 Rate-limit the number of threads copying closures at the same time
  • [79] Y6H7Y3OT Capture the path to `guile', when available.
  • [80] ARD6Z67T Do incremental SVN checkouts
  • [81] MHVIT4JY Split hydra-queue-runner.cc more
  • [82] ERNOO5ZZ * Reorganising.
  • [83] N4IROACV Move buildRemote() into State
  • [84] 3PNG7NIB Remove trailing whitespace
  • [85] DODOGD7M Send queue runner stats to statsd
  • [86] UQYHPQ6U Run PostgreSQL during "make check"
  • [87] 7VHPMFAG Use /usr/bin/env to find perl
  • [88] NP7IOJ4Q Flesh out the API a bit
  • [89] M2CFFNJY Remove unused file
  • [90] TIOBBINA * Some renaming.
  • [91] MDAIP23T hydra-evaluator: Respect triggers of disabled jobsets
  • [92] J74UTA3I Handle the case where a jobset has never been evaluated
  • [*] YQWH4POV * Simplify.
  • [*] CLXEECMF * Start putting build results in a database.
  • [*] G2ZB6464 first test, not yet in buildprocess
  • [*] HX4QYOYA add first evaluations tests

Change contents

  • replacement in configure.ac at line 14
    [11.1][7.0:24]()
    CXXFLAGS+=" -std=c++11"
    [11.1]
    [11.16096]
    CXXFLAGS+=" -std=c++17"
  • edit in configure.ac at line 73
    [11.16466]
    [11.38]
    src/hydra-evaluator/Makefile
  • replacement in release.nix at line 61
    [11.565][9.0:135]()
    rev = "edf9eb8181e01f6b2123e5690019cfeeb44fc1c2";
    sha256 = "1a00q9pypfziyi9hxl4rsammhwj7991wm4b1z9zcgl7zqksr3582";
    [11.565]
    [11.700]
    rev = "5e61b422c58baac26b232233d39f5814cc35d52a";
    sha256 = "0awic5zwibgpj5shpgjf2364imp2f84c8xi5r0x4p351q4kpg9z4";
  • edit in release.nix at line 127
    [11.926]
    [94.1760]
    stdenv = overrideCC stdenv gcc6;
  • replacement in src/Makefile.am at line 1
    [11.21][11.112:185]()
    SUBDIRS = hydra-eval-jobs hydra-queue-runner sql script lib root xsl ttf
    [11.21]
    [11.62]
    SUBDIRS = hydra-evaluator hydra-eval-jobs hydra-queue-runner sql script lib root xsl ttf
  • file addition: hydra-evaluator (d--r------)
    [95.4]
  • file addition: Makefile.am (----------)
    [0.325]
    bin_PROGRAMS = hydra-evaluator
    hydra_evaluator_SOURCES = hydra-evaluator.cc
    hydra_evaluator_LDADD = $(NIX_LIBS) -lpqxx
    hydra_evaluator_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra
  • file addition: hydra-evaluator.cc (----------)
    [0.325]
    #include "shared.hh"
    #include "db.hh"
    #include "pool.hh"
    #include <algorithm>
    #include <thread>
    #include <cstring>
    #include <sys/types.h>
    #include <sys/wait.h>
    using namespace nix;
    struct Evaluator
    {
        nix::Pool<Connection> dbPool;
        typedef std::pair<std::string, std::string> JobsetName;
        struct Jobset
        {
            JobsetName name;
            time_t lastCheckedTime, triggerTime;
            int checkInterval;
            Pid pid;
        };
        typedef std::map<JobsetName, Jobset> Jobsets;
        int evalTimeout = 3600;
        size_t maxEvals = 4;
        struct State
        {
            size_t runningEvals = 0;
            Jobsets jobsets;
        };
        Sync<State> state_;
        std::condition_variable childStarted;
        std::condition_variable maybeDoWork;
        const time_t notTriggered = std::numeric_limits<time_t>::max();
        void readJobsets()
        {
            auto conn(dbPool.get());
            pqxx::work txn(*conn);
            auto res = txn.parameterized
                ("select project, j.name, lastCheckedTime, triggerTime, checkInterval from Jobsets j join Projects p on j.project = p.name "
                 "where j.enabled != 0 and p.enabled != 0").exec();
            auto state(state_.lock());
            std::set<JobsetName> seen;
            for (auto const & row : res) {
                auto name = JobsetName{row["project"].as<std::string>(), row["name"].as<std::string>()};
                auto res = state->jobsets.try_emplace(name, Jobset{name});
                auto & jobset = res.first->second;
                jobset.lastCheckedTime = row["lastCheckedTime"].as<time_t>(0);
                jobset.triggerTime = row["triggerTime"].as<time_t>(notTriggered);
                jobset.checkInterval = row["checkInterval"].as<time_t>();
                seen.insert(name);
            }
            for (auto i = state->jobsets.begin(); i != state->jobsets.end(); )
                if (seen.count(i->first))
                    ++i;
                else {
                    printInfo("forgetting jobset ‘%s:%s’", i->first.first, i->first.second);
                    i = state->jobsets.erase(i);
                }
        }
        void startEval(State & state, Jobset & jobset)
        {
            printInfo("starting evaluation of jobset ‘%s:%s’", jobset.name.first, jobset.name.second);
            assert(jobset.pid == -1);
            jobset.pid = startProcess([&]() {
                Strings args = { "timeout", "-s", "KILL", std::to_string(evalTimeout), "hydra-eval-jobset", jobset.name.first, jobset.name.second };
                execvp(args.front().c_str(), stringsToCharPtrs(args).data());
                throw SysError(format("executing ‘%1%’") % args.front());
            });
            state.runningEvals++;
            childStarted.notify_one();
            time_t now = time(0);
            {
                auto conn(dbPool.get());
                pqxx::work txn(*conn);
                txn.parameterized
                    ("update Jobsets set lastCheckedTime = $1, triggerTime = null where project = $2 and name = $3")
                    (now)
                    (jobset.name.first)
                    (jobset.name.second)
                    .exec();
                txn.commit();
                jobset.lastCheckedTime = now;
                jobset.triggerTime = notTriggered;
            }
        }
        void startEvals(State & state)
        {
            std::vector<Jobsets::iterator> sorted;
            time_t now = time(0);
            /* Filter out jobsets that have been evaluated recently and have
               not been triggered. */
            for (auto i = state.jobsets.begin(); i != state.jobsets.end(); ++i)
                if (i->second.pid == -1 &&
                    (i->second.triggerTime != std::numeric_limits<time_t>::max() ||
                     (i->second.checkInterval > 0 && i->second.lastCheckedTime + i->second.checkInterval <= now)))
                    sorted.push_back(i);
            /* Put jobsets in order of ascending trigger time, last checked
               time, and name. */
            std::sort(sorted.begin(), sorted.end(),
                [](const Jobsets::iterator & a, const Jobsets::iterator & b) {
                    return
                        a->second.triggerTime != b->second.triggerTime
                        ? a->second.triggerTime < b->second.triggerTime
                        : a->second.lastCheckedTime != b->second.lastCheckedTime
                        ? a->second.lastCheckedTime < b->second.lastCheckedTime
                        : a->first < b->first;
                });
            /* Start jobset evaluations up to the concurrency limit.*/
            for (auto & i : sorted) {
                if (state.runningEvals >= maxEvals) break;
                startEval(state, i->second);
            }
        }
        void loop()
        {
            auto state(state_.lock());
            while (true) {
                time_t now = time(0);
                std::chrono::seconds sleepTime = std::chrono::seconds::max();
                if (state->runningEvals < maxEvals) {
                    for (auto & i : state->jobsets)
                        if (i.second.pid == -1 &&
                            i.second.checkInterval > 0)
                            sleepTime = std::min(sleepTime, std::chrono::seconds(
                                std::max((time_t) 1, i.second.lastCheckedTime - now + i.second.checkInterval)));
                }
                debug("waiting for %d s", sleepTime.count());
                if (sleepTime == std::chrono::seconds::max())
                    state.wait(maybeDoWork);
                else
                    state.wait_for(maybeDoWork, sleepTime);
                startEvals(*state);
            }
        }
        /* A thread that listens to PostgreSQL notifications about jobset
           changes, updates the jobsets map, and signals the main thread
           to start evaluations. */
        void databaseMonitor()
        {
            while (true) {
                try {
                    auto conn(dbPool.get());
                    receiver jobsetsAdded(*conn, "jobsets_added");
                    receiver jobsetsDeleted(*conn, "jobsets_deleted");
                    receiver jobsetsChanged(*conn, "jobset_scheduling_changed");
                    while (true) {
                        /* Note: we read/notify before
                           await_notification() to ensure we don't miss a
                           state change. */
                        readJobsets();
                        maybeDoWork.notify_one();
                        conn->await_notification();
                        printInfo("received jobset event");
                    }
                } catch (std::exception & e) {
                    printError("exception in database monitor thread: %s", e.what());
                    sleep(30);
                }
            }
        }
        /* A thread that reaps child processes.*/
        void reaper()
        {
            while (true) {
                {
                    auto state(state_.lock());
                    while (!state->runningEvals)
                        state.wait(childStarted);
                }
                int status;
                pid_t pid = waitpid(-1, &status, 0);
                if (pid == -1) {
                    if (errno == EINTR) continue;
                    throw SysError("waiting for children");
                }
                {
                    auto state(state_.lock());
                    assert(state->runningEvals);
                    state->runningEvals--;
                    for (auto & jobset : state->jobsets)
                        if (jobset.second.pid == pid) {
                            printInfo("evaluation of jobset ‘%s:%s’ finished with status %d",
                                jobset.first.first, jobset.first.second, status);
                            jobset.second.pid.release();
                            maybeDoWork.notify_one();
                            break;
                        }
                }
            }
        }
        void run()
        {
            std::thread reaperThread([&]() { reaper(); });
            std::thread monitorThread([&]() { databaseMonitor(); });
            while (true) {
                try {
                    loop();
                } catch (std::exception & e) {
                    printError("exception in main loop: %s", e.what());
                    sleep(30);
                }
            }
        }
    };
    int main(int argc, char * * argv)
    {
        return handleExceptions(argv[0], [&]() {
            initNix();
            signal(SIGINT, SIG_DFL);
            signal(SIGTERM, SIG_DFL);
            signal(SIGHUP, SIG_DFL);
            parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) {
                return false;
            });
            Evaluator().run();
        });
    }
  • replacement in src/hydra-queue-runner/Makefile.am at line 7
    [11.368][11.368:369](),[11.369][10.0:34]()
    AM_CXXFLAGS = $(NIX_CFLAGS) -Wall
    [11.368]
    hydra_queue_runner_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra
  • file addition: libhydra (d--r------)
    [95.4]
  • file move: db.hh (----------)db.hh (----------)
    [0.9041]
    [11.93]
  • replacement in src/script/Makefile.am at line 7
    [11.442][11.442:465]()
    hydra-evaluator \
    [11.422]
    [11.491]
    hydra-eval-jobset \
  • file move: hydra-evaluator (---r------)hydra-eval-jobset (---r------)
    [11.2543]
    [11.1319]
  • replacement in src/script/hydra-eval-jobset at line 348
    [11.477][11.477:499](),[11.499][11.2600:2660](),[11.2660][11.499:553](),[11.499][11.499:553](),[11.553][4.0:46](),[4.46][11.2857:2931](),[11.2857][11.2857:2931]()
    sub checkSomeJobset {
        # If any jobset has been triggered by a push, check it.
        my ($jobset) = $db->resultset('Jobsets')->search(
            { 'triggertime' => { '!=', undef } },
            { join => 'project', order_by => [ 'triggertime' ], rows => 1 });
    [11.9901]
    [11.2931]
    die "syntax: $0 <PROJECT> <JOBSET>\n" unless @ARGV == 2;
  • replacement in src/script/hydra-eval-jobset at line 350
    [11.2932][11.2932:2999](),[11.2999][11.1015:1111](),[11.1111][11.3086:3137](),[11.3086][11.3086:3137](),[11.3137][11.1609:1674](),[11.1674][11.1112:1270](),[11.3190][11.1112:1270](),[11.1270][11.3191:3312](),[11.308][11.3191:3312](),[11.398][11.744:783](),[11.3312][11.744:783](),[11.744][11.744:783](),[11.783][3.0:67](),[3.67][11.0:4](),[11.824][11.0:4](),[11.10180][11.0:4](),[11.56][11.56:144](),[11.144][8.0:160](),[11.124][11.64:95](),[8.160][11.64:95](),[11.228][11.64:95](),[11.95][11.10180:10184](),[11.284][11.10180:10184](),[11.10180][11.10180:10184](),[11.10184][11.968:980](),[11.980][11.0:11](),[11.11][11.852:1029](),[11.1029][2.0:44](),[2.44][11.1071:1103](),[11.1071][11.1071:1103](),[11.1103][11.30:37](),[11.1461][11.30:37](),[11.30][11.30:37](),[11.37][11.3067:3102](),[11.869][11.1036:1038](),[11.1036][11.1036:1038]()
        # Otherwise, check the jobset that hasn't been checked for the
        # longest time (but don't check more often than the jobset's
        # minimal check interval).
        ($jobset) = $db->resultset('Jobsets')->search(
            { 'project.enabled' => 1, 'me.enabled' => { '!=' => 0 },
            , 'checkinterval' => { '!=', 0 }
            , -or => [ 'lastcheckedtime' => undef, 'lastcheckedtime' => { '<', \ (time() . " - me.checkinterval") } ] },
            { join => 'project', order_by => [ 'lastcheckedtime nulls first' ], rows => 1 })
            unless defined $jobset;
        return 0 unless defined $jobset;
        return system($0, $jobset->project->name, $jobset->name) == 0;
    }
    if (scalar @ARGV == 2) {
        my $projectName = $ARGV[0];
        my $jobsetName = $ARGV[1];
        my $jobset = $db->resultset('Jobsets')->find($projectName, $jobsetName) or
            die "$0: specified jobset \"$projectName:$jobsetName\" does not exist\n";
        exit checkJobset($jobset);
    }
    while (1) {
        eval {
            if (checkSomeJobset) {
                # Just so we don't go completely crazy if lastcheckedtime
                # isn't updated properly.
                sleep 1;
            } else {
                # print STDERR "sleeping...\n";
                sleep 30;
            }
        };
        if ($@) { print STDERR "$@"; }
    }
    [11.2932]
    my $projectName = $ARGV[0];
    my $jobsetName = $ARGV[1];
    my $jobset = $db->resultset('Jobsets')->find($projectName, $jobsetName) or
        die "$0: specified jobset \"$projectName:$jobsetName\" does not exist\n";
    exit checkJobset($jobset);
  • edit in src/sql/hydra.sql at line 85
    [5.1688]
    [5.1688]
    create function notifyJobsetsAdded() returns trigger as 'begin notify jobsets_added; return null; end;' language plpgsql;
    create trigger JobsetsAdded after insert on Jobsets execute procedure notifyJobsetsAdded();
  • edit in src/sql/hydra.sql at line 89
    [5.1689]
    [5.1689]
    create function notifyJobsetsDeleted() returns trigger as 'begin notify jobsets_deleted; return null; end;' language plpgsql;
    create trigger JobsetsDeleted after delete on Jobsets execute procedure notifyJobsetsDeleted();
    create function notifyJobsetSchedulingChanged() returns trigger as 'begin notify jobset_scheduling_changed; return null; end;' language plpgsql;
    create trigger JobsetSchedulingChanged after update on Jobsets for each row
      when (((old.triggerTime is distinct from new.triggerTime) and (new.triggerTime is not null))
            or old.checkInterval != new.checkInterval
            or old.enabled != new.enabled)
      execute procedure notifyJobsetSchedulingChanged();
  • file addition: upgrade-50.sql (----------)
    [11.3004]
    create function notifyJobsetsAdded() returns trigger as 'begin notify jobsets_added; return null; end;' language plpgsql;
    create trigger JobsetsAdded after insert on Jobsets execute procedure notifyJobsetsAdded();
    create function notifyJobsetsDeleted() returns trigger as 'begin notify jobsets_deleted; return null; end;' language plpgsql;
    create trigger JobsetsDeleted after delete on Jobsets execute procedure notifyJobsetsDeleted();
    create function notifyJobsetSchedulingChanged() returns trigger as 'begin notify jobset_scheduling_changed; return null; end;' language plpgsql;
    create trigger JobsetSchedulingChanged after update on Jobsets for each row
      when (((old.triggerTime is distinct from new.triggerTime) and (new.triggerTime is not null))
            or old.checkInterval != new.checkInterval
            or old.enabled != new.enabled)
      execute procedure notifyJobsetSchedulingChanged();
  • replacement in tests/Makefile.am at line 16
    [11.58][11.106:230]()
    PATH=$(abs_top_srcdir)/src/script:$(abs_top_srcdir)/src/hydra-eval-jobs:$(abs_top_srcdir)/src/hydra-queue-runner:$$PATH \
    [11.58]
    [11.37753]
    PATH=$(abs_top_srcdir)/src/hydra-evaluator:$(abs_top_srcdir)/src/script:$(abs_top_srcdir)/src/hydra-eval-jobs:$(abs_top_srcdir)/src/hydra-queue-runner:$$PATH \
  • replacement in tests/Setup.pm at line 64
    [11.1336][11.9344:9463]()
    my ($res, $stdout, $stderr) = captureStdoutStderr(60, ("hydra-evaluator", $jobset->project->name, $jobset->name));
    [11.1336]
    [11.1469]
    my ($res, $stdout, $stderr) = captureStdoutStderr(60, ("hydra-eval-jobset", $jobset->project->name, $jobset->name));
  • replacement in tests/api-test.pl at line 52
    [11.245][6.175:217]()
    system("hydra-evaluator sample default");
    [11.245]
    [11.38737]
    system("hydra-eval-jobset sample default");
  • replacement in tests/api-test.pl at line 59
    [11.39017][6.218:293]()
    system("echo >> /run/jobset/default.nix; hydra-evaluator sample default");
    [11.39017]
    [11.39299]
    system("echo >> /run/jobset/default.nix; hydra-eval-jobset sample default");
  • replacement in tests/s3-backup-test.pl at line 22
    [11.10569][11.10569:10644]()
    system("hydra-evaluator " . $jobset->project->name . " " . $jobset->name);
    [11.10569]
    [11.10644]
    system("hydra-eval-jobset " . $jobset->project->name . " " . $jobset->name);
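
The selection rule in startEvals() can be read in isolation from the database and process handling. What follows is a minimal standalone sketch of that rule, not the code from this change: JobsetInfo, isDue, pickJobsets, kNotTriggered and the `running` flag (standing in for `pid != -1`) are hypothetical names introduced for illustration, and `freeSlots` is assumed to be maxEvals minus the number of running evaluations.

```cpp
#include <algorithm>
#include <cassert>
#include <ctime>
#include <limits>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical, simplified stand-in for the Jobset struct in the change.
struct JobsetInfo {
    std::string project, name;
    time_t lastCheckedTime;
    time_t triggerTime;   // kNotTriggered when no push trigger is pending
    int checkInterval;    // seconds; 0 disables periodic checks
    bool running;         // assumption: replaces the "pid != -1" test
};

const time_t kNotTriggered = std::numeric_limits<time_t>::max();

// A jobset is due if it is idle and either triggered or past its interval.
bool isDue(const JobsetInfo & j, time_t now)
{
    if (j.running) return false;
    if (j.triggerTime != kNotTriggered) return true;
    return j.checkInterval > 0 && j.lastCheckedTime + j.checkInterval <= now;
}

// Return "project:jobset" names to start, in priority order, using at
// most freeSlots entries.
std::vector<std::string> pickJobsets(const std::vector<JobsetInfo> & jobsets,
    time_t now, size_t freeSlots)
{
    std::vector<JobsetInfo> due;
    for (const auto & j : jobsets)
        if (isDue(j, now)) due.push_back(j);

    // Ascending trigger time, then last checked time, then name.
    // Untriggered jobsets carry kNotTriggered and therefore sort last.
    std::sort(due.begin(), due.end(),
        [](const JobsetInfo & a, const JobsetInfo & b) {
            return std::tie(a.triggerTime, a.lastCheckedTime, a.project, a.name)
                 < std::tie(b.triggerTime, b.lastCheckedTime, b.project, b.name);
        });

    std::vector<std::string> picked;
    for (const auto & j : due) {
        if (picked.size() >= freeSlots) break;
        picked.push_back(j.project + ":" + j.name);
    }
    return picked;
}
```

Because an untriggered jobset uses the maximum time_t value as its trigger time, a single ascending sort places every triggered jobset ahead of the periodically checked ones, which is why the comparator needs no separate "is triggered" branch.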