Split hydra-queue-runner.cc more

Jul 21, 2015, 1:14 PM
MHVIT4JYWUYD4UCGB2AHLXWLX6B5SYE22BREERNGANT7RGGDUFOAC

Dependencies

  • [2] GYT6YOZN Lock builds for a shorter amount of time
  • [3] NIXRSHRK Fix finishing steps that are not top-level of any build
  • [4] O7MWBXF6 Doh
  • [5] EXBGZYRJ Don't create a propagated build step to the same build
  • [6] XHBBT2HE Doh
  • [7] WDWPHWQ2 Fix incorrect debug message
  • [8] 2AZSF326 getQueuedBuilds(): Don't catch errors while loading a build from the queue
  • [9] RVHBV3B3 Notify the queue runner when a build is deleted
  • [10] NAYQT2GT hydra-queue-runner: Use cmdBuildDerivation
  • [11] NNOCZ4RO hydra-queue-runner: Improve dispatcher
  • [12] RYTQLATY Keep track of failed paths in the Hydra database
  • [13] T2EIYJNG On SIGINT, shut down the builder threads
  • [14] 24BMQDZA Start of single-process hydra-queue-runner
  • [15] ATJ54SPX Use PostgreSQL notifications for queue events
  • [16] 2IQRXLWE Support cancelling builds
  • [17] IE2PRAQU hydra-queue-runner: Send build notifications
  • [18] KBZHIGLG Record the machine used for a build step
  • [19] 22LDPAIP Check non-runnable steps for unsupported system type
  • [20] QJRDO2B4 Simplify retry handling
  • [21] ENXUSMSV Make concurrency more robust
  • [22] TM6WKSP3 hydra-queue-runner: Set isCachedBuild
  • [23] HUUZFPPK Fix race between the queue monitor and the builder threads
  • [24] 4LAUAXO5 Less verbosity
  • [25] WFYMBNWB Move "created" field into Step::State
  • [26] 7VQ4ALFY Update "make check" for the new queue runner
  • [27] HHOMBU7G hydra-queue-runner: Implement timeouts
  • [28] PQFOMNTL hydra-queue-runner: More stats
  • [29] JAUB2FT5 getQueuedBuilds(): Handle dependent builds first
  • [30] A2GL5FOZ Moar stats
  • [31] XV4AEKJC hydra-queue-runner: Handle status queries on the main thread
  • [32] 4D7CHQ34 createStep(): Cache finished derivations
  • [33] E7WP35SF Create build step for non-top-level cached failures
  • [34] GS4BE6TB Asynchronously compress build logs
  • [35] HPJKBFZ4 Handle concurrent finishing of the same build
  • [36] C6HOMHZW Don't try to handle SIGINT
  • [37] 62MQPRXC Pass null values to libpqxx properly
  • [38] OCZ4LSGG Automatically retry aborted builds
  • [39] WDNUKCTN Queue monitor: Get only the fields we need
  • [40] TKA75HWS Robustness
  • [41] HJOEIMLR Refactor
  • [42] MB3TISH2 Rate-limit the number of threads copying closures at the same time
  • [43] UFUAO7ND Improve logging for aborts
  • [44] MSIHMO45 Tweak build steps
  • [45] IWB3F4Z6 Fail builds with previously failed steps early
  • [46] A3IIKGSG hydra-queue-runner: Fix assertion failure
  • [47] WKJFPR77 hydra-queue-runner: Maintain count of active build steps
  • [48] O64P4XJS Keep per-machine stats
  • [49] WHULPA6S Handle failure with output
  • [50] LE4VZIY5 More stats
  • [51] NJJ7H64S Very basic multi-threaded queue runner
  • [52] FKLICOHY Prefer cached failure over unsupported system type
  • [53] FQQRJUO4 Mark builds as busy
  • [54] SODOV2CM Automatically reload $NIX_REMOTE_SYSTEMS when it changes
  • [55] N5O7VEEO Immediately abort builds that require an unsupported system type
  • [56] 5AIYUMTB Basic remote building
  • [57] UPZXMQDE Fix machine selection
  • [58] UQQ4IL55 Add a error type for "unsupported system type"
  • [59] SK6WHODM Support preferLocalBuild
  • [60] 7I7XHQAE Fix sending notifications in the successful case
  • [61] 7LB6QBXY Keep track of the number of build steps that are being built
  • [62] LJILHOJ7 Create BuildSteps race-free
  • [63] HLSHCK3C Support requiredSystemFeatures
  • [64] ODCBSLFG hydra-queue-runner: Fix segfault sorting machines by load
  • [65] YZAI5GQU Implement a database connection pool
  • [66] GKZN4UV7 Make the queue monitor more robust, and better debug output
  • [67] X7AZHNKG Set finishedInDB in a few more places
  • [68] 63W4T5PU hydra-queue-runner: More stats
  • [69] RQUAATWB Add status dump facility
  • [70] N4IROACV Move buildRemote() into State

Change contents

  • replacement in src/hydra-queue-runner/Makefile.am at line 3
    [11.255][11.0:85]()
    hydra_queue_runner_SOURCES = hydra-queue-runner.cc build-result.cc build-remote.cc \
    [11.255]
    [11.0]
    hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \
    builder.cc build-result.cc build-remote.cc \
  • file addition: builder.cc (----------)
    [11.187]
    #include <cmath>
    #include "state.hh"
    #include "build-result.hh"
    using namespace nix;
    void State::builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation)
    {
        bool retry = true;
        MaintainCount mc(nrActiveSteps);
        try {
            auto store = openStore(); // FIXME: pool
            retry = doBuildStep(store, step, machine);
        } catch (std::exception & e) {
            printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%")
                % step->drvPath % machine->sshName % e.what());
        }
        /* Release the machine and wake up the dispatcher. */
        assert(reservation.unique());
        reservation = 0;
        wakeDispatcher();
        /* If there was a temporary failure, retry the step after an
           exponentially increasing interval. */
        if (retry) {
            {
                auto step_(step->state.lock());
                step_->tries++;
                nrRetries++;
                if (step_->tries > maxNrRetries) maxNrRetries = step_->tries; // yeah yeah, not atomic
                int delta = retryInterval * powf(retryBackoff, step_->tries - 1);
                printMsg(lvlInfo, format("will retry ‘%1%’ after %2%s") % step->drvPath % delta);
                step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta);
            }
            makeRunnable(step);
        }
    }
    bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
        Machine::ptr machine)
    {
        {
            auto step_(step->state.lock());
            assert(step_->created);
            assert(!step->finished);
        }
        /* There can be any number of builds in the database that depend
           on this derivation. Arbitrarily pick one (though preferring a
           build of which this is the top-level derivation) for the
           purpose of creating build steps. We could create a build step
           record for every build, but that could be very expensive
           (e.g. a stdenv derivation can be a dependency of tens of
           thousands of builds), so we don't. */
        Build::ptr build;
        {
            std::set<Build::ptr> dependents;
            std::set<Step::ptr> steps;
            getDependents(step, dependents, steps);
            if (dependents.empty()) {
                /* Apparently all builds that depend on this derivation
                   are gone (e.g. cancelled). So don't bother. This is
                   very unlikely to happen, because normally Steps are
                   only kept alive by being reachable from a
                   Build. However, it's possible that a new Build just
                   created a reference to this step. So to handle that
                   possibility, we retry this step (putting it back in
                   the runnable queue). If there are really no strong
                   pointers to the step, it will be deleted. */
                printMsg(lvlInfo, format("maybe cancelling build step ‘%1%’") % step->drvPath);
                return true;
            }
            for (auto build2 : dependents)
                if (build2->drvPath == step->drvPath) { build = build2; break; }
            if (!build) build = *dependents.begin();
            printMsg(lvlInfo, format("performing step ‘%1%’ on ‘%2%’ (needed by build %3% and %4% others)")
                % step->drvPath % machine->sshName % build->id % (dependents.size() - 1));
        }
        bool quit = build->id == buildOne;
        auto conn(dbPool.get());
        RemoteResult result;
        BuildOutput res;
        int stepNr = 0;
        time_t stepStartTime = result.startTime = time(0);
        /* If any of the outputs have previously failed, then don't bother
           building again. */
        bool cachedFailure = checkCachedFailure(step, *conn);
        if (cachedFailure)
            result.status = BuildResult::CachedFailure;
        else {
            /* Create a build step record indicating that we started
               building. Also, mark the selected build as busy. */
            {
                pqxx::work txn(*conn);
                stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy);
                txn.parameterized("update Builds set busy = 1 where id = $1")(build->id).exec();
                txn.commit();
            }
            /* Do the build. */
            try {
                /* FIXME: referring builds may have conflicting timeouts. */
                buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result);
            } catch (Error & e) {
                result.status = BuildResult::MiscFailure;
                result.errorMsg = e.msg();
            }
            if (result.success()) res = getBuildOutput(store, step->drv);
        }
        time_t stepStopTime = time(0);
        if (!result.stopTime) result.stopTime = stepStopTime;
        /* Asynchronously compress the log. */
        if (result.logFile != "") {
            {
                auto logCompressorQueue_(logCompressorQueue.lock());
                logCompressorQueue_->push(result.logFile);
            }
            logCompressorWakeup.notify_one();
        }
        /* The step had a hopefully temporary failure (e.g. network
           issue). Retry a number of times. */
        if (result.canRetry()) {
            printMsg(lvlError, format("possibly transient failure building ‘%1%’ on ‘%2%’: %3%")
                % step->drvPath % machine->sshName % result.errorMsg);
            bool retry;
            {
                auto step_(step->state.lock());
                retry = step_->tries + 1 < maxTries;
            }
            if (retry) {
                pqxx::work txn(*conn);
                finishBuildStep(txn, result.startTime, result.stopTime, build->id,
                    stepNr, machine->sshName, bssAborted, result.errorMsg);
                txn.commit();
                if (quit) exit(1);
                return true;
            }
        }
        if (result.success()) {
            /* Register success in the database for all Build objects that
               have this step as the top-level step. Since the queue
               monitor thread may be creating new referring Builds
               concurrently, and updating the database may fail, we do
               this in a loop, marking all known builds, repeating until
               there are no unmarked builds.
            */
            std::vector<BuildID> buildIDs;
            while (true) {
                /* Get the builds that have this one as the top-level. */
                std::vector<Build::ptr> direct;
                {
                    auto steps_(steps.lock());
                    auto step_(step->state.lock());
                    for (auto & b_ : step_->builds) {
                        auto b = b_.lock();
                        if (b && !b->finishedInDB) direct.push_back(b);
                    }
                    /* If there are no builds left to update in the DB,
                       then we're done (except for calling
                       finishBuildStep()). Delete the step from
                       ‘steps’. Since we've been holding the ‘steps’ lock,
                       no new referrers can have been added in the
                       meantime or be added afterwards. */
                    if (direct.empty()) {
                        printMsg(lvlDebug, format("finishing build step ‘%1%’") % step->drvPath);
                        steps_->erase(step->drvPath);
                    }
                }
                /* Update the database. */
                {
                    pqxx::work txn(*conn);
                    finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, machine->sshName, bssSuccess);
                    for (auto & b : direct)
                        markSucceededBuild(txn, b, res, build != b || result.status != BuildResult::Built,
                            result.startTime, result.stopTime);
                    txn.commit();
                }
                if (direct.empty()) break;
                /* Remove the direct dependencies from ‘builds’. This will
                   cause them to be destroyed. */
                for (auto & b : direct) {
                    auto builds_(builds.lock());
                    b->finishedInDB = true;
                    builds_->erase(b->id);
                    buildIDs.push_back(b->id);
                }
            }
            /* Send notification about the builds that have this step as
               the top-level. */
            for (auto id : buildIDs) {
                {
                    auto notificationSenderQueue_(notificationSenderQueue.lock());
                    notificationSenderQueue_->push(NotificationItem(id, std::vector<BuildID>()));
                }
                notificationSenderWakeup.notify_one();
            }
            /* Wake up any dependent steps that have no other
               dependencies. */
            {
                auto step_(step->state.lock());
                for (auto & rdepWeak : step_->rdeps) {
                    auto rdep = rdepWeak.lock();
                    if (!rdep) continue;
                    bool runnable = false;
                    {
                        auto rdep_(rdep->state.lock());
                        rdep_->deps.erase(step);
                        /* Note: if the step has not finished
                           initialisation yet, it will be made runnable in
                           createStep(), if appropriate. */
                        if (rdep_->deps.empty() && rdep_->created) runnable = true;
                    }
                    if (runnable) makeRunnable(rdep);
                }
            }
        } else {
            /* Register failure in the database for all Build objects that
               directly or indirectly depend on this step. */
            std::vector<BuildID> dependentIDs;
            while (true) {
                /* Get the builds and steps that depend on this step. */
                std::set<Build::ptr> indirect;
                {
                    auto steps_(steps.lock());
                    std::set<Step::ptr> steps;
                    getDependents(step, indirect, steps);
                    /* If there are no builds left, delete all referring
                       steps from ‘steps’. As for the success case, we can
                       be certain no new referrers can be added. */
                    if (indirect.empty()) {
                        for (auto & s : steps) {
                            printMsg(lvlDebug, format("finishing build step ‘%1%’") % s->drvPath);
                            steps_->erase(s->drvPath);
                        }
                        break;
                    }
                }
                /* Update the database. */
                {
                    pqxx::work txn(*conn);
                    BuildStatus buildStatus =
                        result.status == BuildResult::TimedOut ? bsTimedOut :
                        result.canRetry() ? bsAborted :
                        bsFailed;
                    BuildStepStatus buildStepStatus =
                        result.status == BuildResult::TimedOut ? bssTimedOut :
                        result.canRetry() ? bssAborted :
                        bssFailed;
                    /* For standard failures, we don't care about the error
                       message. */
                    if (result.status == BuildResult::PermanentFailure ||
                        result.status == BuildResult::TransientFailure ||
                        result.status == BuildResult::CachedFailure ||
                        result.status == BuildResult::TimedOut)
                        result.errorMsg = "";
                    /* Create failed build steps for every build that depends
                       on this. For cached failures, only create a step for
                       builds that don't have this step as top-level
                       (otherwise the user won't be able to see what caused
                       the build to fail). */
                    for (auto & build2 : indirect) {
                        if ((cachedFailure && build2->drvPath == step->drvPath) ||
                            (!cachedFailure && build == build2) ||
                            build2->finishedInDB)
                            continue;
                        createBuildStep(txn, 0, build2, step, machine->sshName,
                            buildStepStatus, result.errorMsg, build == build2 ? 0 : build->id);
                    }
                    if (!cachedFailure)
                        finishBuildStep(txn, result.startTime, result.stopTime, build->id,
                            stepNr, machine->sshName, buildStepStatus, result.errorMsg);
                    /* Mark all builds that depend on this derivation as failed. */
                    for (auto & build2 : indirect) {
                        if (build2->finishedInDB) continue;
                        printMsg(lvlError, format("marking build %1% as failed") % build2->id);
                        txn.parameterized
                            ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1 and finished = 0")
                            (build2->id)
                            ((int) (build2->drvPath != step->drvPath && buildStatus == bsFailed ? bsDepFailed : buildStatus))
                            (result.startTime)
                            (result.stopTime)
                            (cachedFailure ? 1 : 0).exec();
                        nrBuildsDone++;
                    }
                    /* Remember failed paths in the database so that they
                       won't be built again. */
                    if (!cachedFailure && result.status == BuildResult::PermanentFailure)
                        for (auto & path : outputPaths(step->drv))
                            txn.parameterized("insert into FailedPaths values ($1)")(path).exec();
                    txn.commit();
                }
                /* Remove the indirect dependencies from ‘builds’. This
                   will cause them to be destroyed. */
                for (auto & b : indirect) {
                    auto builds_(builds.lock());
                    b->finishedInDB = true;
                    builds_->erase(b->id);
                    dependentIDs.push_back(b->id);
                    if (buildOne == b->id) quit = true;
                }
            }
            /* Send notification about this build and its dependents. */
            {
                auto notificationSenderQueue_(notificationSenderQueue.lock());
                notificationSenderQueue_->push(NotificationItem(build->id, dependentIDs));
            }
            notificationSenderWakeup.notify_one();
        }
        // FIXME: keep stats about aborted steps?
        nrStepsDone++;
        totalStepTime += stepStopTime - stepStartTime;
        totalStepBuildTime += result.stopTime - result.startTime;
        machine->state->nrStepsDone++;
        machine->state->totalStepTime += stepStopTime - stepStartTime;
        machine->state->totalStepBuildTime += result.stopTime - result.startTime;
        if (quit) exit(0); // testing hack
        return false;
    }
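In builder(), the machine slot is held by the `std::shared_ptr<MaintainCount>` that the dispatcher created; resetting it (`reservation = 0`) frees the slot before `wakeDispatcher()` runs. The sketch below illustrates that RAII slot-reservation idea under stated assumptions: `MaintainCount` is assumed to increment a counter in its constructor and decrement it in its destructor (its definition is not part of this change), and `MachineState`, `reserveSlot()` and `releaseSlot()` are hypothetical names used only for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <memory>

// Assumed shape of MaintainCount: bumps a counter for as long as it lives.
struct MaintainCount
{
    std::atomic<unsigned int> & counter;
    explicit MaintainCount(std::atomic<unsigned int> & c) : counter(c) { counter++; }
    ~MaintainCount() { counter--; }
    MaintainCount(const MaintainCount &) = delete;
    MaintainCount & operator=(const MaintainCount &) = delete;
};

// Hypothetical per-machine state: number of slots currently in use.
struct MachineState
{
    std::atomic<unsigned int> currentJobs{0};
};

// Dispatcher side (hypothetical helper): creating the object occupies a slot.
std::shared_ptr<MaintainCount> reserveSlot(MachineState & state)
{
    return std::make_shared<MaintainCount>(state.currentJobs);
}

// Builder side (hypothetical helper): dropping the last reference runs
// ~MaintainCount and frees the slot. shared_ptr::unique(), as used in
// builder(), is deprecated in C++17 and removed in C++20; use_count() == 1
// is the equivalent check.
void releaseSlot(std::shared_ptr<MaintainCount> & reservation)
{
    assert(reservation.use_count() == 1);
    reservation.reset();
}
```

Tying the slot to an object's lifetime means it is released on every path that destroys the last reference, not only on the explicit reset.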
  • file addition: dispatcher.cc (----------)
    [11.187]
    #include <algorithm>
    #include <thread>
    #include "state.hh"
    using namespace nix;
    void State::makeRunnable(Step::ptr step)
    {
        printMsg(lvlChatty, format("step ‘%1%’ is now runnable") % step->drvPath);
        {
            auto step_(step->state.lock());
            assert(step_->created);
            assert(!step->finished);
            assert(step_->deps.empty());
        }
        {
            auto runnable_(runnable.lock());
            runnable_->push_back(step);
        }
        wakeDispatcher();
    }
    void State::dispatcher()
    {
        while (true) {
            printMsg(lvlDebug, "dispatcher woken up");
            auto sleepUntil = system_time::max();
            bool keepGoing;
            do {
                /* Copy the currentJobs field of each machine. This is
                   necessary to ensure that the sort comparator below is
                   an ordering. std::sort() can segfault if it isn't. */
                struct MachineInfo
                {
                    Machine::ptr machine;
                    unsigned int currentJobs;
                };
                std::vector<MachineInfo> machinesSorted;
                {
                    auto machines_(machines.lock());
                    for (auto & m : *machines_)
                        machinesSorted.push_back({m.second, m.second->state->currentJobs});
                }
                /* Sort the machines by a combination of speed factor and
                   available slots. Prioritise the available machines as
                   follows:
                   - First by load divided by speed factor, rounded to the
                     nearest integer. This causes fast machines to be
                     preferred over slow machines with similar loads.
                   - Then by speed factor.
                   - Finally by load. */
                sort(machinesSorted.begin(), machinesSorted.end(),
                    [](const MachineInfo & a, const MachineInfo & b) -> bool
                    {
                        float ta = roundf(a.currentJobs / a.machine->speedFactor);
                        float tb = roundf(b.currentJobs / b.machine->speedFactor);
                        return
                            ta != tb ? ta < tb :
                            a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :
                            a.currentJobs > b.currentJobs;
                    });
                /* Find a machine with a free slot and find a step to run
                   on it. Once we find such a pair, we restart the outer
                   loop because the machine sorting will have changed. */
                keepGoing = false;
                system_time now = std::chrono::system_clock::now();
                for (auto & mi : machinesSorted) {
                    // FIXME: can we lose a wakeup if a builder exits concurrently?
                    if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;
                    auto runnable_(runnable.lock());
                    //printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
                    /* FIXME: we're holding the runnable lock too long
                       here. This could be more efficient. */
                    for (auto i = runnable_->begin(); i != runnable_->end(); ) {
                        auto step = i->lock();
                        /* Delete dead steps. */
                        if (!step) {
                            i = runnable_->erase(i);
                            continue;
                        }
                        /* Can this machine do this step? */
                        if (!mi.machine->supportsStep(step)) {
                            ++i;
                            continue;
                        }
                        /* Skip previously failed steps that aren't ready
                           to be retried. */
                        {
                            auto step_(step->state.lock());
                            if (step_->tries > 0 && step_->after > now) {
                                if (step_->after < sleepUntil)
                                    sleepUntil = step_->after;
                                ++i;
                                continue;
                            }
                        }
                        /* Make a slot reservation and start a thread to
                           do the build. */
                        auto reservation = std::make_shared<MaintainCount>(mi.machine->state->currentJobs);
                        i = runnable_->erase(i);
                        auto builderThread = std::thread(&State::builder, this, step, mi.machine, reservation);
                        builderThread.detach(); // FIXME?
                        keepGoing = true;
                        break;
                    }
                    if (keepGoing) break;
                }
            } while (keepGoing);
            /* Sleep until we're woken up (either because a runnable build
               is added, or because a build finishes). */
            {
                std::unique_lock<std::mutex> lock(dispatcherMutex);
                printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %
                    std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());
                dispatcherWakeup.wait_until(lock, sleepUntil);
                nrDispatcherWakeups++;
            }
        }
        printMsg(lvlError, "dispatcher exits");
    }
    void State::wakeDispatcher()
    {
        { std::lock_guard<std::mutex> lock(dispatcherMutex); } // barrier
        dispatcherWakeup.notify_one();
    }
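The dispatcher copies each machine's `currentJobs` into a `MachineInfo` snapshot before sorting, so the comparator works on values that cannot change while `std::sort()` runs and therefore stays a consistent ordering. A self-contained sketch of the same three-key ordering, assuming a simplified `MachineInfo` that stores a name and speed factor directly instead of a `Machine::ptr` (`sortMachines()` is a hypothetical helper):

```cpp
#include <algorithm>
#include <cmath>
#include <string>
#include <vector>

// Simplified snapshot of one machine, taken before sorting.
struct MachineInfo
{
    std::string name;
    float speedFactor;
    unsigned int currentJobs;
};

// Same keys as the comparator in State::dispatcher():
// 1. load / speedFactor rounded to the nearest integer, ascending;
// 2. speedFactor, descending (faster machines first);
// 3. currentJobs, descending.
// Every key is a pure function of the snapshot, so this is a strict weak
// ordering as std::sort() requires.
void sortMachines(std::vector<MachineInfo> & machines)
{
    std::sort(machines.begin(), machines.end(),
        [](const MachineInfo & a, const MachineInfo & b) {
            float ta = std::round(a.currentJobs / a.speedFactor);
            float tb = std::round(b.currentJobs / b.speedFactor);
            if (ta != tb) return ta < tb;
            if (a.speedFactor != b.speedFactor) return a.speedFactor > b.speedFactor;
            return a.currentJobs > b.currentJobs;
        });
}
```

For example, an idle machine with speed factor 1 sorts first (rounded load 0), and a machine with speed factor 4 running three jobs (load 0.75, rounded to 1) is preferred over a speed-factor-1 machine running one job (load 1), because the tie on rounded load is broken by speed.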
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 3
    [11.286][11.1042:1059](),[11.1077][11.0:21]()
    #include <cmath>
    #include <algorithm>
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 8
    [11.4916][11.4916:4943]()
    #include "build-result.hh"
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 9
    [11.1072]
    [11.19]
    #include "build-result.hh"
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 18
    [11.22][11.826:938](),[11.938][11.1145:1177](),[11.1145][11.1145:1177](),[11.1177][11.939:986](),[11.23][11.1178:1179](),[11.1251][11.1251:1252](),[11.1253][11.23:121](),[11.23][11.23:121](),[11.905][11.905:907](),[11.626][11.1254:1255](),[11.1340][11.1340:1341]()
    // FIXME: Make configurable.
    const unsigned int maxTries = 5;
    const unsigned int retryInterval = 60; // seconds
    const float retryBackoff = 3.0;
    const unsigned int maxParallelCopyClosure = 4;
    template <class C, class V>
    bool has(const C & c, const V & v)
    {
        return c.find(v) != c.end();
    }
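builder() computes the retry delay as `retryInterval * retryBackoff^(tries - 1)` after incrementing `tries`, and doBuildStep() only allows a retry while `tries + 1 < maxTries`. The sketch below enumerates the resulting schedule using the constant values shown in this hunk; `retrySchedule()` is a hypothetical helper, not part of Hydra.

```cpp
#include <cmath>
#include <vector>

// Values taken from the constants in this hunk ("FIXME: Make configurable").
const unsigned int maxTries = 5;
const unsigned int retryInterval = 60; // seconds
const float retryBackoff = 3.0;

// Returns the delay in seconds before each retry of a step that keeps
// failing transiently: doBuildStep() permits a retry while
// tries + 1 < maxTries, then builder() increments tries and waits
// retryInterval * retryBackoff^(tries - 1) seconds.
std::vector<int> retrySchedule()
{
    std::vector<int> delays;
    unsigned int tries = 0;
    while (tries + 1 < maxTries) {
        tries++;
        int delta = retryInterval * std::pow(retryBackoff, tries - 1);
        delays.push_back(delta);
    }
    return delays;
}
```

With `maxTries = 5` this gives four retries (five attempts in total) after delays of 60, 180, 540 and 1620 seconds; a fifth transient failure falls through to the failure path in doBuildStep(), which records the build as aborted.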
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 170
    [11.577][11.9821:9825](),[11.1014][11.9821:9825](),[11.9796][11.9821:9825](),[11.9821][11.9821:9825](),[11.9825][11.199:226](),[11.226][11.238:543](),[11.226][11.1433:1435](),[11.543][11.1433:1435](),[11.1433][11.1433:1435](),[11.1435][11.88:117](),[11.573][11.573:737](),[11.737][9.0:53](),[9.53][11.737:738](),[11.163][11.737:738](),[11.737][11.737:738](),[11.738][11.1435:1480](),[11.1435][11.1435:1480](),[11.1480][11.739:774](),[11.774][11.1502:1503](),[11.1502][11.1502:1503](),[11.1503][11.0:19](),[11.19][11.160:212](),[11.212][11.820:948](),[11.820][11.820:948](),[11.948][11.164:190](),[11.190][11.1560:1561](),[11.480][11.1560:1561](),[11.948][11.1560:1561](),[11.1560][11.1560:1561](),[11.1561][11.949:980](),[11.980][11.544:631](),[11.631][11.1063:1100](),[11.1063][11.1063:1100](),[11.1100][11.632:706](),[11.706][11.1170:1229](),[11.1170][11.1170:1229](),[11.1229][9.54:114](),[9.114][11.707:781](),[11.1266][11.707:781](),[11.781][11.213:255](),[11.1336][11.213:255](),[11.255][11.1739:1749](),[11.395][11.1739:1749](),[11.1336][11.1739:1749](),[11.1739][11.1739:1749](),[11.1749][11.191:192](),[11.255][11.1749:1755](),[11.1749][11.1749:1755](),[11.1803][11.1803:1807](),[11.1807][11.256:364](),[11.364][11.9878:9880](),[11.542][11.9878:9880](),[11.1426][11.9878:9880](),[11.1893][11.9878:9880](),[11.9878][11.9878:9880](),[11.9880][11.782:869](),[11.573][11.1943:1944](),[11.869][11.1943:1944](),[11.1943][11.1943:1944](),[11.1605][11.1605:1741](),[11.1741][11.0:47](),[11.47][11.1798:1805](),[11.1798][11.1798:1805](),[11.1805][11.365:395](),[11.395][11.1835:1836](),[11.605][11.1835:1836](),[11.1835][11.1835:1836](),[11.1836][11.414:591](),[11.157][11.9906:9907](),[11.591][11.9906:9907](),[11.1641][11.9906:9907](),[11.2009][11.9906:9907](),[11.9906][11.9906:9907](),[11.2048][11.2048:2087](),[11.2087][2.0:41](),[2.41][11.2087:2137](),[11.2087][11.2087:2137](),[11.2137][11.999:1053](),[11.1053][11.1642:1694](),[11.2137][11.1642:1694](),[11.1694][11.2137:2450](),[11.2137]
[11.2137:2450](),[11.2450][11.592:715](),[11.795][11.10056:10057](),[11.2450][11.10056:10057](),[11.10056][11.10056:10057](),[11.10057][11.48:118](),[11.118][11.2491:2507](),[11.2491][11.2491:2507](),[11.2507][11.10235:10236](),[11.10235][11.10235:10236](),[11.10236][11.119:231](),[11.231][11.10351:10352](),[11.2856][11.10351:10352](),[11.10351][11.10351:10352](),[11.10352][11.232:274](),[11.274][11.870:970](),[11.10352][11.870:970](),[11.970][11.275:294](),[11.294][11.10473:10582](),[11.970][11.10473:10582](),[11.2952][11.10473:10582](),[11.10473][11.10473:10582](),[11.10582][11.191:271](),[11.271][11.72:644](),[11.283][11.295:315](),[11.644][11.295:315](),[11.10994][11.295:315](),[11.315][11.11016:11027](),[11.11016][11.11016:11027](),[11.11027][11.316:354](),[11.354][11.92:254](),[11.254][11.355:923](),[11.460][11.355:923](),[11.211][11.355:923](),[11.211][11.3067:3206](),[11.923][11.3067:3206](),[11.3067][11.3067:3206](),[11.2005][11.11080:11162](),[11.3206][11.11080:11162](),[11.11080][11.11080:11162](),[11.11162][10.2225:2283](),[10.2283][11.9877:9878](),[11.2064][11.9877:9878](),[11.11214][11.431:465](),[11.465][11.11277:11402](),[11.677][11.11277:11402](),[11.11277][11.11277:11402](),[11.11402][11.272:273](),[11.273][11.461:502](),[11.502][11.924:944](),[11.273][11.924:944](),[11.944][11.295:306](),[11.295][11.295:306](),[11.306][11.66:247](),[11.247][11.212:248](),[11.430][11.212:248](),[11.248][11.248:297](),[11.297][11.450:507](),[11.455][11.455:503](),[11.503][11.1065:1160](),[11.1160][6.0:66](),[6.66][11.667:727](),[11.667][11.667:727](),[11.727][11.0:230](),[11.230][11.3592:3676](),[11.3676][11.307:590](),[11.307][11.307:590](),[11.590][11.727:771](),[11.727][11.727:771](),[11.771][11.272:310](),[11.272][11.272:310](),[11.310][11.645:1331](),[11.316][11.1168:1200](),[11.1331][11.1168:1200](),[11.683][11.1168:1200](),[11.1200][11.683:720](),[11.683][11.683:720](),[11.720][11.787:797](),[11.787][11.787:797](),[11.797][11.11402:11403](),[11.11402][11.1140
2:11403](),[11.11403][11.945:974](),[11.758][11.11435:11436](),[11.974][11.11435:11436](),[11.1232][11.11435:11436](),[11.11435][11.11435:11436](),[11.11436][11.3207:3328](),[11.3328][11.11475:11476](),[11.11475][11.11475:11476](),[11.11476][11.3329:3380](),[11.3380][11.1332:1444](),[11.1444][11.3511:3558](),[11.3511][11.3511:3558](),[11.3558][11.975:1125](),[11.3806][11.1126:1510](),[11.1510][11.0:76](),[11.76][8.0:93](),[8.93][11.216:226](),[11.216][11.216:226](),[11.226][11.1538:1539](),[11.1538][11.1538:1539](),[11.1539][11.3806:3915](),[11.3806][11.3806:3915](),[11.3915][11.1540:1658](),[11.1658][11.3915:3981](),[11.3915][11.3915:3981](),[11.3981][11.317:351](),[11.351][11.11504:11514](),[11.3981][11.11504:11514](),[11.11504][11.11504:11514](),[11.11514][11.466:1203](),[11.1203][11.3982:4065](),[11.11514][11.3982:4065](),[11.4065][11.255:342](),[11.342][11.443:514](),[11.606][11.443:514](),[11.4065][11.443:514](),[11.514][11.11564:11566](),[11.2148][11.11564:11566](),[11.4104][11.11564:11566](),[11.11564][11.11564:11566](),[11.11566][11.343:412](),[11.412][11.607:1013](),[11.11566][11.607:1013](),[11.1013][11.4159:4200](),[11.4159][11.4159:4200](),[11.4200][11.1014:1104](),[11.1104][11.4200:4280](),[11.4200][11.4200:4280](),[11.4280][11.1105:1145](),[11.1145][11.4325:4450](),[11.4325][11.4325:4450](),[11.4450][11.1146:1401](),[11.1401][11.4543:4553](),[11.4543][11.4543:4553](),[11.4553][11.1402:1444](),[11.1444][11.92:134](),[11.134][11.1444:1640](),[11.1444][11.1444:1640](),[11.1640][11.4553:4559](),[11.4553][11.4553:4559](),[11.1354][11.11731:11732](),[11.11731][11.11731:11732](),[11.11732][11.135:164](),[11.164][11.0:79](),[11.1717][11.0:79](),[11.79][11.1717:1964](),[11.1717][11.1717:1964](),[11.1964][11.11791:11832](),[11.11791][11.11791:11832](),[11.11832][11.508:714](),[11.542][11.11832:11833](),[11.11832][11.11832:11833](),[11.11833][11.549:746](),[11.746][11.11833:12108](),[11.11833][11.11833:12108](),[11.12108][11.413:492](),[11.492][11.12133:12166]()
,[11.12133][11.12133:12166](),[11.12166][11.1355:1430](),[11.1430][11.1965:1992](),[11.1430][11.12240:12286](),[11.1992][11.12240:12286](),[11.12240][11.12240:12286](),[11.4629][11.12286:12329](),[11.12286][11.12286:12329](),[11.12329][11.493:586](),[11.586][11.12374:12393](),[11.618][11.12374:12393](),[11.2072][11.12374:12393](),[11.2201][11.12374:12393](),[11.4695][11.12374:12393](),[11.12374][11.12374:12393](),[11.4724][11.4724:4768](),[11.4810][11.4810:4847](),[11.4888][11.12469:12479](),[11.12469][11.12469:12479](),[11.5235][11.5235:5242](),[11.5242][11.2073:2153](),[11.2153][11.5278:5324](),[11.5278][11.5278:5324](),[11.5324][11.165:229](),[11.229][11.2216:2287](),[11.2216][11.2216:2287](),[11.2287][11.5608:5615](),[11.5608][11.5608:5615](),[11.5615][11.2288:2305]()
    }
    void State::queueMonitor()
    {
        while (true) {
            try {
                queueMonitorLoop();
            } catch (std::exception & e) {
                printMsg(lvlError, format("queue monitor: %1%") % e.what());
                sleep(10); // probably a DB problem, so don't retry right away
            }
        }
    }
    void State::queueMonitorLoop()
    {
        auto conn(dbPool.get());
        receiver buildsAdded(*conn, "builds_added");
        receiver buildsRestarted(*conn, "builds_restarted");
        receiver buildsCancelled(*conn, "builds_cancelled");
        receiver buildsDeleted(*conn, "builds_deleted");
        auto store = openStore(); // FIXME: pool
        unsigned int lastBuildId = 0;
        while (true) {
            getQueuedBuilds(*conn, store, lastBuildId);
            /* Sleep until we get notification from the database about an
               event. */
            conn->await_notification();
            nrQueueWakeups++;
            if (buildsAdded.get())
                printMsg(lvlTalkative, "got notification: new builds added to the queue");
            if (buildsRestarted.get()) {
                printMsg(lvlTalkative, "got notification: builds restarted");
                lastBuildId = 0; // check all builds
            }
            if (buildsCancelled.get() || buildsDeleted.get()) {
                printMsg(lvlTalkative, "got notification: builds cancelled");
                removeCancelledBuilds(*conn);
            }
        }
    }
    void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId)
    {
        printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);
        /* Grab the queued builds from the database, but don't process
           them yet (since we don't want a long-running transaction). */
        std::multimap<Path, Build::ptr> newBuilds;
        {
            pqxx::work txn(conn);
            auto res = txn.parameterized("select id, project, jobset, job, drvPath, maxsilent, timeout from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec();
            for (auto const & row : res) {
                auto builds_(builds.lock());
                BuildID id = row["id"].as<BuildID>();
                if (buildOne && id != buildOne) continue;
                if (id > lastBuildId) lastBuildId = id;
                if (has(*builds_, id)) continue;
                auto build = std::make_shared<Build>();
                build->id = id;
                build->drvPath = row["drvPath"].as<string>();
                build->fullJobName = row["project"].as<string>() + ":" + row["jobset"].as<string>() + ":" + row["job"].as<string>();
                build->maxSilentTime = row["maxsilent"].as<int>();
                build->buildTimeout = row["timeout"].as<int>();
                newBuilds.emplace(std::make_pair(build->drvPath, build));
            }
        }
        std::set<Step::ptr> newRunnable;
        unsigned int nrAdded;
        std::function<void(Build::ptr)> createBuild;
        createBuild = [&](Build::ptr build) {
            printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName);
            nrAdded++;
            if (!store->isValidPath(build->drvPath)) {
                /* Derivation has been GC'ed prematurely. */
                printMsg(lvlError, format("aborting GC'ed build %1%") % build->id);
                if (!build->finishedInDB) {
                    pqxx::work txn(conn);
                    txn.parameterized
                        ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0")
                        (build->id)
                        ((int) bsAborted)
                        (time(0))
                        ("derivation was garbage-collected prior to build").exec();
                    txn.commit();
                    build->finishedInDB = true;
                    nrBuildsDone++;
                }
                return;
            }
            std::set<Step::ptr> newSteps;
            std::set<Path> finishedDrvs; // FIXME: re-use?
            Step::ptr step = createStep(store, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable);
            /* Some of the new steps may be the top level of builds that
               we haven't processed yet. So do them now. This ensures that
               if build A depends on build B with top-level step X, then X
               will be "accounted" to B in doBuildStep(). */
            for (auto & r : newSteps) {
                while (true) {
                    auto i = newBuilds.find(r->drvPath);
                    if (i == newBuilds.end()) break;
                    Build::ptr b = i->second;
                    newBuilds.erase(i);
                    createBuild(b);
                }
            }
            /* If we didn't get a step, it means the step's outputs are
               all valid. So we mark this as a finished, cached build. */
            if (!step) {
                Derivation drv = readDerivation(build->drvPath);
                BuildOutput res = getBuildOutput(store, drv);
                pqxx::work txn(conn);
                time_t now = time(0);
                markSucceededBuild(txn, build, res, true, now, now);
                txn.commit();
                build->finishedInDB = true;
                return;
            }
            /* If any step has an unsupported system type or has a
               previously failed output path, then fail the build right
               away. */
            bool badStep = false;
            for (auto & r : newSteps) {
                BuildStatus buildStatus = bsSuccess;
                BuildStepStatus buildStepStatus = bssFailed;
                if (checkCachedFailure(r, conn)) {
                    printMsg(lvlError, format("marking build %1% as cached failure") % build->id);
                    buildStatus = step == r ? bsFailed : bsDepFailed;
                    buildStepStatus = bssFailed;
                }
                if (buildStatus == bsSuccess) {
                    bool supported = false;
                    {
                        auto machines_(machines.lock()); // FIXME: use shared_mutex
                        for (auto & m : *machines_)
                            if (m.second->supportsStep(r)) { supported = true; break; }
                    }
                    if (!supported) {
                        printMsg(lvlError, format("aborting unsupported build %1%") % build->id);
                        buildStatus = bsUnsupported;
                        buildStepStatus = bssUnsupported;
                    }
                }
                if (buildStatus != bsSuccess) {
                    time_t now = time(0);
                    if (!build->finishedInDB) {
                        pqxx::work txn(conn);
                        createBuildStep(txn, 0, build, r, "", buildStepStatus);
                        txn.parameterized
                            ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = $4 where id = $1 and finished = 0")
    (build->id)
    ((int) buildStatus)
    (now)
    (buildStatus != bsUnsupported ? 1 : 0).exec();
    txn.commit();
    build->finishedInDB = true;
    nrBuildsDone++;
    }
    badStep = true;
    break;
    }
    }
    if (badStep) return;
    /* Note: if we exit this scope prior to this, the build and
    all newly created steps are destroyed. */
    {
    auto builds_(builds.lock());
    if (!build->finishedInDB) // FIXME: can this happen?
    (*builds_)[build->id] = build;
    build->toplevel = step;
    }
    printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)")
    % build->id % step->drvPath % newSteps.size());
    };
    /* Now instantiate build steps for each new build. The builder
    threads can start building the runnable build steps right away,
    even while we're still processing other new builds. */
    while (!newBuilds.empty()) {
    auto build = newBuilds.begin()->second;
    newBuilds.erase(newBuilds.begin());
    newRunnable.clear();
    nrAdded = 0;
    try {
    createBuild(build);
    } catch (Error & e) {
    e.addPrefix(format("while loading build %1%: ") % build->id);
    throw;
    }
    /* Add the new runnable build steps to ‘runnable’ and wake up
    the builder threads. */
    printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded);
    for (auto & r : newRunnable)
    makeRunnable(r);
    nrBuildsRead += nrAdded;
    }
    }
    void State::removeCancelledBuilds(Connection & conn)
    {
        /* Get the current set of queued builds. */
        std::set<BuildID> currentIds;
        {
            pqxx::work txn(conn);
            auto res = txn.exec("select id from Builds where finished = 0");
            for (auto const & row : res)
                currentIds.insert(row["id"].as<BuildID>());
        }
        auto builds_(builds.lock());
        for (auto i = builds_->begin(); i != builds_->end(); ) {
            if (currentIds.find(i->first) == currentIds.end()) {
                printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first);
                i = builds_->erase(i);
                // FIXME: ideally we would interrupt active build steps here.
            } else
                ++i;
        }
    }
    Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
        Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
        std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable)
    {
        if (finishedDrvs.find(drvPath) != finishedDrvs.end()) return 0;
        /* Check if the requested step already exists. If not, create a
           new step. In any case, make the step reachable from
           referringBuild or referringStep. This is done atomically (with
           ‘steps’ locked), to ensure that this step can never become
           reachable from a new build after doBuildStep has removed it
           from ‘steps’. */
        Step::ptr step;
        bool isNew = false;
        {
            auto steps_(steps.lock());
            /* See if the step already exists in ‘steps’ and is not
               stale. */
            auto prev = steps_->find(drvPath);
            if (prev != steps_->end()) {
                step = prev->second.lock();
                /* Since ‘step’ is a strong pointer, the referred Step
                   object won't be deleted after this. */
                if (!step) steps_->erase(drvPath); // remove stale entry
            }
            /* If it doesn't exist, create it. */
            if (!step) {
                step = std::make_shared<Step>();
                step->drvPath = drvPath;
                isNew = true;
            }
            auto step_(step->state.lock());
            assert(step_->created != isNew);
            if (referringBuild)
                step_->builds.push_back(referringBuild);
            if (referringStep)
                step_->rdeps.push_back(referringStep);
            (*steps_)[drvPath] = step;
        }
        if (!isNew) return step;
        printMsg(lvlDebug, format("considering derivation ‘%1%’") % drvPath);
        /* Initialize the step. Note that the step may be visible in
           ‘steps’ before this point, but that doesn't matter because
           it's not runnable yet, and other threads won't make it
           runnable while step->created == false. */
        step->drv = readDerivation(drvPath);
        {
            auto i = step->drv.env.find("requiredSystemFeatures");
            if (i != step->drv.env.end())
                step->requiredSystemFeatures = tokenizeString<std::set<std::string>>(i->second);
        }
        auto attr = step->drv.env.find("preferLocalBuild");
        step->preferLocalBuild =
            attr != step->drv.env.end() && attr->second == "1"
            && has(localPlatforms, step->drv.platform);
        /* Are all outputs valid? */
        bool valid = true;
        for (auto & i : step->drv.outputs) {
            if (!store->isValidPath(i.second.path)) {
                valid = false;
                break;
            }
        }
        // FIXME: check whether all outputs are in the binary cache.
        if (valid) {
            finishedDrvs.insert(drvPath);
            return 0;
        }
        /* No, we need to build. */
        printMsg(lvlDebug, format("creating build step ‘%1%’") % drvPath);
        newSteps.insert(step);
        /* Create steps for the dependencies. */
        for (auto & i : step->drv.inputDrvs) {
            auto dep = createStep(store, i.first, 0, step, finishedDrvs, newSteps, newRunnable);
            if (dep) {
                auto step_(step->state.lock());
                step_->deps.insert(dep);
            }
        }
        /* If the step has no (remaining) dependencies, make it
           runnable. */
        {
            auto step_(step->state.lock());
            assert(!step_->created);
            step_->created = true;
            if (step_->deps.empty())
                newRunnable.insert(step);
        }
        return step;
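The deduplicated step graph that createStep() builds can be explored in isolation. The following is a minimal, single-threaded sketch, not Hydra's implementation: `FakeDrv`, `Graph` and the cut-down `Step` are hypothetical, derivations come from an in-memory map instead of `readDerivation()`, output validity is a flag instead of `store->isValidPath()`, and all locking is left out. It mirrors three behaviours of the code above: steps are shared by derivation path through a map of weak pointers, a derivation whose outputs are all valid yields no step, and a step with no unfinished dependencies is put in the runnable set.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for a parsed derivation (not Hydra's Derivation):
// its input derivations and whether all of its outputs are already valid.
struct FakeDrv {
    std::vector<std::string> inputDrvs;
    bool outputsValid = false;
};

// Cut-down Step: strong pointers to unfinished dependencies,
// weak pointers back to the steps that are waiting on this one.
struct Step {
    using ptr = std::shared_ptr<Step>;
    std::string drvPath;
    std::set<Step::ptr> deps;
    std::vector<std::weak_ptr<Step>> rdeps;
};

struct Graph {
    std::map<std::string, FakeDrv> drvs;               // hypothetical "store"
    std::map<std::string, std::weak_ptr<Step>> steps;  // plays the role of State::steps

    // Returns nullptr if drvPath needs no building (all outputs valid).
    Step::ptr createStep(const std::string & drvPath, Step::ptr referringStep,
        std::set<std::string> & finishedDrvs, std::set<Step::ptr> & newSteps,
        std::set<Step::ptr> & newRunnable)
    {
        if (finishedDrvs.count(drvPath)) return nullptr;
        assert(drvs.count(drvPath) && "unknown derivation");

        // Reuse a live step for this path, otherwise create a new one.
        Step::ptr step;
        auto prev = steps.find(drvPath);
        if (prev != steps.end()) step = prev->second.lock();
        bool isNew = !step;
        if (isNew) {
            step = std::make_shared<Step>();
            step->drvPath = drvPath;
        }
        if (referringStep) step->rdeps.push_back(referringStep);
        steps[drvPath] = step;
        if (!isNew) return step;

        // Nothing to build: remember that and drop the step.
        if (drvs.at(drvPath).outputsValid) {
            finishedDrvs.insert(drvPath);
            return nullptr;
        }
        newSteps.insert(step);

        // Recurse into the inputs; only steps that need building become deps.
        for (auto & input : drvs.at(drvPath).inputDrvs) {
            auto dep = createStep(input, step, finishedDrvs, newSteps, newRunnable);
            if (dep) step->deps.insert(dep);
        }
        if (step->deps.empty()) newRunnable.insert(step);
        return step;
    }
};
```

Because the map holds only weak pointers, a step in this sketch lives only while something holds a strong reference to it (a dependent step's `deps` or the caller); in the queue runner the top-level pointer held by a Build plays that role, which appears to be why erasing a build from `builds` is enough to let its unshared steps disappear.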
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 204
    }
    void State::makeRunnable(Step::ptr step)
    {
        printMsg(lvlChatty, format("step ‘%1%’ is now runnable") % step->drvPath);
        {
            auto step_(step->state.lock());
            assert(step_->created);
            assert(!step->finished);
            assert(step_->deps.empty());
        }
        {
            auto runnable_(runnable.lock());
            runnable_->push_back(step);
        }
        wakeDispatcher();
    }
    void State::dispatcher()
    {
        while (true) {
            printMsg(lvlDebug, "dispatcher woken up");
            auto sleepUntil = system_time::max();
            bool keepGoing;
            do {
                /* Copy the currentJobs field of each machine. This is
                   necessary to ensure that the sort comparator below is
                   an ordering. std::sort() can segfault if it isn't. */
                struct MachineInfo
                {
                    Machine::ptr machine;
                    unsigned int currentJobs;
                };
                std::vector<MachineInfo> machinesSorted;
                {
                    auto machines_(machines.lock());
                    for (auto & m : *machines_)
                        machinesSorted.push_back({m.second, m.second->state->currentJobs});
                }
                /* Sort the machines by a combination of speed factor and
                   available slots. Prioritise the available machines as
                   follows:
                   - First by load divided by speed factor, rounded to the
                     nearest integer. This causes fast machines to be
                     preferred over slow machines with similar loads.
                   - Then by speed factor.
                   - Finally by load. */
                sort(machinesSorted.begin(), machinesSorted.end(),
                    [](const MachineInfo & a, const MachineInfo & b) -> bool
                    {
                        float ta = roundf(a.currentJobs / a.machine->speedFactor);
                        float tb = roundf(b.currentJobs / b.machine->speedFactor);
                        return
                            ta != tb ? ta < tb :
                            a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :
                            a.currentJobs > b.currentJobs;
                    });
                /* Find a machine with a free slot and find a step to run
                   on it. Once we find such a pair, we restart the outer
                   loop because the machine sorting will have changed. */
                keepGoing = false;
                system_time now = std::chrono::system_clock::now();
                for (auto & mi : machinesSorted) {
                    // FIXME: can we lose a wakeup if a builder exits concurrently?
                    if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;
                    auto runnable_(runnable.lock());
                    //printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
                    /* FIXME: we're holding the runnable lock too long
                       here. This could be more efficient. */
                    for (auto i = runnable_->begin(); i != runnable_->end(); ) {
                        auto step = i->lock();
                        /* Delete dead steps. */
                        if (!step) {
                            i = runnable_->erase(i);
                            continue;
                        }
                        /* Can this machine do this step? */
                        if (!mi.machine->supportsStep(step)) {
                            ++i;
                            continue;
                        }
                        /* Skip previously failed steps that aren't ready
                           to be retried. */
                        {
                            auto step_(step->state.lock());
                            if (step_->tries > 0 && step_->after > now) {
                                if (step_->after < sleepUntil)
                                    sleepUntil = step_->after;
                                ++i;
                                continue;
                            }
                        }
                        /* Make a slot reservation and start a thread to
                           do the build. */
                        auto reservation = std::make_shared<MaintainCount>(mi.machine->state->currentJobs);
                        i = runnable_->erase(i);
                        auto builderThread = std::thread(&State::builder, this, step, mi.machine, reservation);
                        builderThread.detach(); // FIXME?
                        keepGoing = true;
                        break;
                    }
                    if (keepGoing) break;
                }
            } while (keepGoing);
            /* Sleep until we're woken up (either because a runnable build
               is added, or because a build finishes). */
            {
                std::unique_lock<std::mutex> lock(dispatcherMutex);
                printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %
                    std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());
                dispatcherWakeup.wait_until(lock, sleepUntil);
                nrDispatcherWakeups++;
            }
        }
        printMsg(lvlError, "dispatcher exits");
    }
    void State::wakeDispatcher()
    {
        { std::lock_guard<std::mutex> lock(dispatcherMutex); } // barrier
        dispatcherWakeup.notify_one();
    }
    void State::builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation)
    {
        bool retry = true;
        MaintainCount mc(nrActiveSteps);
        try {
            auto store = openStore(); // FIXME: pool
            retry = doBuildStep(store, step, machine);
        } catch (std::exception & e) {
            printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%")
                % step->drvPath % machine->sshName % e.what());
        }
        /* Release the machine and wake up the dispatcher. */
        assert(reservation.unique());
        reservation = 0;
        wakeDispatcher();
        /* If there was a temporary failure, retry the step after an
           exponentially increasing interval. */
        if (retry) {
            {
                auto step_(step->state.lock());
                step_->tries++;
                nrRetries++;
                if (step_->tries > maxNrRetries) maxNrRetries = step_->tries; // yeah yeah, not atomic
                int delta = retryInterval * powf(retryBackoff, step_->tries - 1);
                printMsg(lvlInfo, format("will retry ‘%1%’ after %2%s") % step->drvPath % delta);
                step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta);
            }
            makeRunnable(step);
        }
    }
    bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
        Machine::ptr machine)
    {
        {
            auto step_(step->state.lock());
            assert(step_->created);
            assert(!step->finished);
        }
        /* There can be any number of builds in the database that depend
           on this derivation. Arbitrarily pick one (though preferring a
           build of which this is the top-level derivation) for the
           purpose of creating build steps. We could create a build step
           record for every build, but that could be very expensive
           (e.g. a stdenv derivation can be a dependency of tens of
           thousands of builds), so we don't. */
        Build::ptr build;
        {
            std::set<Build::ptr> dependents;
            std::set<Step::ptr> steps;
            getDependents(step, dependents, steps);
            if (dependents.empty()) {
                /* Apparently all builds that depend on this derivation
                   are gone (e.g. cancelled). So don't bother. This is
                   very unlikely to happen, because normally Steps are
                   only kept alive by being reachable from a
                   Build. However, it's possible that a new Build just
                   created a reference to this step. So to handle that
                   possibility, we retry this step (putting it back in
                   the runnable queue). If there are really no strong
                   pointers to the step, it will be deleted. */
                printMsg(lvlInfo, format("maybe cancelling build step ‘%1%’") % step->drvPath);
                return true;
            }
            for (auto build2 : dependents)
                if (build2->drvPath == step->drvPath) { build = build2; break; }
            if (!build) build = *dependents.begin();
            printMsg(lvlInfo, format("performing step ‘%1%’ on ‘%2%’ (needed by build %3% and %4% others)")
                % step->drvPath % machine->sshName % build->id % (dependents.size() - 1));
        }
        bool quit = build->id == buildOne;
        auto conn(dbPool.get());
        RemoteResult result;
        BuildOutput res;
        int stepNr = 0;
        time_t stepStartTime = result.startTime = time(0);
        /* If any of the outputs have previously failed, then don't bother
           building again. */
        bool cachedFailure = checkCachedFailure(step, *conn);
        if (cachedFailure)
            result.status = BuildResult::CachedFailure;
        else {
            /* Create a build step record indicating that we started
               building. Also, mark the selected build as busy. */
            {
                pqxx::work txn(*conn);
                stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy);
                txn.parameterized("update Builds set busy = 1 where id = $1")(build->id).exec();
                txn.commit();
            }
            /* Do the build. */
            try {
                /* FIXME: referring builds may have conflicting timeouts. */
                buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result);
            } catch (Error & e) {
                result.status = BuildResult::MiscFailure;
                result.errorMsg = e.msg();
            }
            if (result.success()) res = getBuildOutput(store, step->drv);
        }
        time_t stepStopTime = time(0);
        if (!result.stopTime) result.stopTime = stepStopTime;
        /* Asynchronously compress the log. */
        if (result.logFile != "") {
            {
                auto logCompressorQueue_(logCompressorQueue.lock());
                logCompressorQueue_->push(result.logFile);
            }
            logCompressorWakeup.notify_one();
        }
        /* The step had a hopefully temporary failure (e.g. network
           issue). Retry a number of times. */
        if (result.canRetry()) {
            printMsg(lvlError, format("possibly transient failure building ‘%1%’ on ‘%2%’: %3%")
                % step->drvPath % machine->sshName % result.errorMsg);
            bool retry;
            {
                auto step_(step->state.lock());
                retry = step_->tries + 1 < maxTries;
            }
            if (retry) {
                pqxx::work txn(*conn);
                finishBuildStep(txn, result.startTime, result.stopTime, build->id,
                    stepNr, machine->sshName, bssAborted, result.errorMsg);
                txn.commit();
                if (quit) exit(1);
                return true;
            }
        }
        if (result.success()) {
            /* Register success in the database for all Build objects that
               have this step as the top-level step. Since the queue
               monitor thread may be creating new referring Builds
               concurrently, and updating the database may fail, we do
               this in a loop, marking all known builds, repeating until
               there are no unmarked builds.
            */
            std::vector<BuildID> buildIDs;
            while (true) {
                /* Get the builds that have this one as the top-level. */
                std::vector<Build::ptr> direct;
                {
                    auto steps_(steps.lock());
                    auto step_(step->state.lock());
                    for (auto & b_ : step_->builds) {
                        auto b = b_.lock();
                        if (b && !b->finishedInDB) direct.push_back(b);
                    }
                    /* If there are no builds left to update in the DB,
                       then we're done (except for calling
                       finishBuildStep()). Delete the step from
                       ‘steps’. Since we've been holding the ‘steps’ lock,
                       no new referrers can have been added in the
                       meantime or be added afterwards. */
                    if (direct.empty()) {
                        printMsg(lvlDebug, format("finishing build step ‘%1%’") % step->drvPath);
                        steps_->erase(step->drvPath);
                    }
                }
                /* Update the database. */
                {
                    pqxx::work txn(*conn);
                    finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, machine->sshName, bssSuccess);
                    for (auto & b : direct)
                        markSucceededBuild(txn, b, res, build != b || result.status != BuildResult::Built,
                            result.startTime, result.stopTime);
                    txn.commit();
                }
                if (direct.empty()) break;
                /* Remove the direct dependencies from ‘builds’. This will
                   cause them to be destroyed. */
                for (auto & b : direct) {
                    auto builds_(builds.lock());
                    b->finishedInDB = true;
                    builds_->erase(b->id);
                    buildIDs.push_back(b->id);
                }
            }
            /* Send notification about the builds that have this step as
               the top-level. */
            for (auto id : buildIDs) {
                {
                    auto notificationSenderQueue_(notificationSenderQueue.lock());
                    notificationSenderQueue_->push(NotificationItem(id, std::vector<BuildID>()));
                }
                notificationSenderWakeup.notify_one();
            }
            /* Wake up any dependent steps that have no other
               dependencies. */
            {
                auto step_(step->state.lock());
                for (auto & rdepWeak : step_->rdeps) {
                    auto rdep = rdepWeak.lock();
                    if (!rdep) continue;
                    bool runnable = false;
                    {
                        auto rdep_(rdep->state.lock());
                        rdep_->deps.erase(step);
                        /* Note: if the step has not finished
                           initialisation yet, it will be made runnable in
                           createStep(), if appropriate. */
                        if (rdep_->deps.empty() && rdep_->created) runnable = true;
                    }
                    if (runnable) makeRunnable(rdep);
                }
            }
        } else {
            /* Register failure in the database for all Build objects that
               directly or indirectly depend on this step. */
            std::vector<BuildID> dependentIDs;
            while (true) {
                /* Get the builds and steps that depend on this step. */
                std::set<Build::ptr> indirect;
                {
                    auto steps_(steps.lock());
                    std::set<Step::ptr> steps;
                    getDependents(step, indirect, steps);
                    /* If there are no builds left, delete all referring
                       steps from ‘steps’. As for the success case, we can
                       be certain no new referrers can be added. */
                    if (indirect.empty()) {
                        for (auto & s : steps) {
                            printMsg(lvlDebug, format("finishing build step ‘%1%’") % s->drvPath);
                            steps_->erase(s->drvPath);
                        }
                        break;
                    }
                }
                /* Update the database. */
                {
                    pqxx::work txn(*conn);
                    BuildStatus buildStatus =
                        result.status == BuildResult::TimedOut ? bsTimedOut :
                        result.canRetry() ? bsAborted :
                        bsFailed;
                    BuildStepStatus buildStepStatus =
                        result.status == BuildResult::TimedOut ? bssTimedOut :
                        result.canRetry() ? bssAborted :
                        bssFailed;
                    /* For standard failures, we don't care about the error
                       message. */
                    if (result.status == BuildResult::PermanentFailure ||
                        result.status == BuildResult::TransientFailure ||
                        result.status == BuildResult::CachedFailure ||
                        result.status == BuildResult::TimedOut)
                        result.errorMsg = "";
                    /* Create failed build steps for every build that depends
                       on this. For cached failures, only create a step for
                       builds that don't have this step as top-level
                       (otherwise the user won't be able to see what caused
                       the build to fail). */
                    for (auto & build2 : indirect) {
                        if ((cachedFailure && build2->drvPath == step->drvPath) ||
                            (!cachedFailure && build == build2) ||
                            build2->finishedInDB)
                            continue;
                        createBuildStep(txn, 0, build2, step, machine->sshName,
                            buildStepStatus, result.errorMsg, build == build2 ? 0 : build->id);
                    }
                    if (!cachedFailure)
                        finishBuildStep(txn, result.startTime, result.stopTime, build->id,
                            stepNr, machine->sshName, buildStepStatus, result.errorMsg);
                    /* Mark all builds that depend on this derivation as failed. */
                    for (auto & build2 : indirect) {
                        if (build2->finishedInDB) continue;
                        printMsg(lvlError, format("marking build %1% as failed") % build2->id);
                        txn.parameterized
                            ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1 and finished = 0")
                            (build2->id)
                            ((int) (build2->drvPath != step->drvPath && buildStatus == bsFailed ? bsDepFailed : buildStatus))
                            (result.startTime)
                            (result.stopTime)
                            (cachedFailure ? 1 : 0).exec();
                        nrBuildsDone++;
                    }
                    /* Remember failed paths in the database so that they
                       won't be built again. */
                    if (!cachedFailure && result.status == BuildResult::PermanentFailure)
                        for (auto & path : outputPaths(step->drv))
                            txn.parameterized("insert into FailedPaths values ($1)")(path).exec();
                    txn.commit();
                }
                /* Remove the indirect dependencies from ‘builds’. This
                   will cause them to be destroyed. */
                for (auto & b : indirect) {
                    auto builds_(builds.lock());
                    b->finishedInDB = true;
                    builds_->erase(b->id);
                    dependentIDs.push_back(b->id);
                    if (buildOne == b->id) quit = true;
                }
            }
            /* Send notification about this build and its dependents. */
            {
                auto notificationSenderQueue_(notificationSenderQueue.lock());
                notificationSenderQueue_->push(NotificationItem(build->id, dependentIDs));
            }
            notificationSenderWakeup.notify_one();
        }
    // FIXME: keep stats about aborted steps?
    nrStepsDone++;
    totalStepTime += stepStopTime - stepStartTime;
    totalStepBuildTime += result.stopTime - result.startTime;
    machine->state->nrStepsDone++;
    machine->state->totalStepTime += stepStopTime - stepStartTime;
    machine->state->totalStepBuildTime += result.stopTime - result.startTime;
    if (quit) exit(0); // testing hack
    return false;
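The failure path above picks a status with a fixed precedence: a timeout wins, a retryable result counts as aborted, and anything else is a plain failure. A dependent build whose top-level derivation is not the failed step is then recorded as a dependency failure, but only when the step failed outright. A minimal sketch of that classification, assuming simplified, hypothetical enums in place of Hydra's `BuildStatus` and `BuildResult`, with `canRetry()` modeled as a `bool` argument:

```cpp
#include <cassert>
#include <string>

// Hypothetical, simplified stand-ins for Hydra's enums; the real ones
// have more members and different names.
enum class ResultStatus { Built, PermanentFailure, TransientFailure, CachedFailure, TimedOut, MiscFailure };
enum class BuildStatus { Success, Failed, DepFailed, Aborted, TimedOut };

// Same precedence as the ternary chain above: timeout first, then
// retryable results (aborted), otherwise a plain failure.
BuildStatus classify(ResultStatus status, bool canRetry)
{
    if (status == ResultStatus::TimedOut) return BuildStatus::TimedOut;
    if (canRetry) return BuildStatus::Aborted;
    return BuildStatus::Failed;
}

// Status written for one dependent build: builds whose top-level
// derivation differs from the failed step get DepFailed, but only when
// the step itself failed outright (not on timeouts or aborts).
BuildStatus statusForBuild(const std::string & buildDrv, const std::string & stepDrv,
    BuildStatus stepStatus)
{
    return buildDrv != stepDrv && stepStatus == BuildStatus::Failed
        ? BuildStatus::DepFailed : stepStatus;
}
```

A timed-out dependency therefore leaves its dependents marked as timed out rather than as dependency failures.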
  • file addition: queue-monitor.cc (----------)
    [11.187]
    #include "state.hh"
    #include "build-result.hh"
    using namespace nix;
    void State::queueMonitor()
    {
        while (true) {
            try {
                queueMonitorLoop();
            } catch (std::exception & e) {
                printMsg(lvlError, format("queue monitor: %1%") % e.what());
                sleep(10); // probably a DB problem, so don't retry right away
            }
        }
    }
    void State::queueMonitorLoop()
    {
        auto conn(dbPool.get());
        receiver buildsAdded(*conn, "builds_added");
        receiver buildsRestarted(*conn, "builds_restarted");
        receiver buildsCancelled(*conn, "builds_cancelled");
        receiver buildsDeleted(*conn, "builds_deleted");
        auto store = openStore(); // FIXME: pool
        unsigned int lastBuildId = 0;
        while (true) {
            getQueuedBuilds(*conn, store, lastBuildId);
            /* Sleep until we get notification from the database about an
               event. */
            conn->await_notification();
            nrQueueWakeups++;
            if (buildsAdded.get())
                printMsg(lvlTalkative, "got notification: new builds added to the queue");
            if (buildsRestarted.get()) {
                printMsg(lvlTalkative, "got notification: builds restarted");
                lastBuildId = 0; // check all builds
            }
            if (buildsCancelled.get() || buildsDeleted.get()) {
                printMsg(lvlTalkative, "got notification: builds cancelled");
                removeCancelledBuilds(*conn);
            }
        }
    }
    void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId)
    {
        printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);
        /* Grab the queued builds from the database, but don't process
           them yet (since we don't want a long-running transaction). */
        std::multimap<Path, Build::ptr> newBuilds;
        {
            pqxx::work txn(conn);
            auto res = txn.parameterized("select id, project, jobset, job, drvPath, maxsilent, timeout from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec();
            for (auto const & row : res) {
                auto builds_(builds.lock());
                BuildID id = row["id"].as<BuildID>();
                if (buildOne && id != buildOne) continue;
                if (id > lastBuildId) lastBuildId = id;
                if (has(*builds_, id)) continue;
                auto build = std::make_shared<Build>();
                build->id = id;
                build->drvPath = row["drvPath"].as<string>();
                build->fullJobName = row["project"].as<string>() + ":" + row["jobset"].as<string>() + ":" + row["job"].as<string>();
                build->maxSilentTime = row["maxsilent"].as<int>();
                build->buildTimeout = row["timeout"].as<int>();
                newBuilds.emplace(std::make_pair(build->drvPath, build));
            }
        }
        std::set<Step::ptr> newRunnable;
        unsigned int nrAdded;
        std::function<void(Build::ptr)> createBuild;
        createBuild = [&](Build::ptr build) {
            printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName);
            nrAdded++;
            if (!store->isValidPath(build->drvPath)) {
                /* Derivation has been GC'ed prematurely. */
                printMsg(lvlError, format("aborting GC'ed build %1%") % build->id);
                if (!build->finishedInDB) {
                    pqxx::work txn(conn);
                    txn.parameterized
                        ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0")
                        (build->id)
                        ((int) bsAborted)
                        (time(0))
                        ("derivation was garbage-collected prior to build").exec();
                    txn.commit();
                    build->finishedInDB = true;
                    nrBuildsDone++;
                }
                return;
            }
            std::set<Step::ptr> newSteps;
            std::set<Path> finishedDrvs; // FIXME: re-use?
            Step::ptr step = createStep(store, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable);
            /* Some of the new steps may be the top level of builds that
               we haven't processed yet. So do them now. This ensures that
               if build A depends on build B with top-level step X, then X
               will be "accounted" to B in doBuildStep(). */
            for (auto & r : newSteps) {
                while (true) {
                    auto i = newBuilds.find(r->drvPath);
                    if (i == newBuilds.end()) break;
                    Build::ptr b = i->second;
                    newBuilds.erase(i);
                    createBuild(b);
                }
            }
            /* If we didn't get a step, it means the step's outputs are
               all valid. So we mark this as a finished, cached build. */
            if (!step) {
                Derivation drv = readDerivation(build->drvPath);
                BuildOutput res = getBuildOutput(store, drv);
                pqxx::work txn(conn);
                time_t now = time(0);
                markSucceededBuild(txn, build, res, true, now, now);
                txn.commit();
                build->finishedInDB = true;
                return;
            }
            /* If any step has an unsupported system type or has a
               previously failed output path, then fail the build right
               away. */
            bool badStep = false;
            for (auto & r : newSteps) {
                BuildStatus buildStatus = bsSuccess;
                BuildStepStatus buildStepStatus = bssFailed;
                if (checkCachedFailure(r, conn)) {
                    printMsg(lvlError, format("marking build %1% as cached failure") % build->id);
                    buildStatus = step == r ? bsFailed : bsDepFailed;
                    buildStepStatus = bssFailed;
                }
                if (buildStatus == bsSuccess) {
                    bool supported = false;
                    {
                        auto machines_(machines.lock()); // FIXME: use shared_mutex
                        for (auto & m : *machines_)
                            if (m.second->supportsStep(r)) { supported = true; break; }
                    }
                    if (!supported) {
                        printMsg(lvlError, format("aborting unsupported build %1%") % build->id);
                        buildStatus = bsUnsupported;
                        buildStepStatus = bssUnsupported;
                    }
                }
                if (buildStatus != bsSuccess) {
                    time_t now = time(0);
                    if (!build->finishedInDB) {
                        pqxx::work txn(conn);
                        createBuildStep(txn, 0, build, r, "", buildStepStatus);
                        txn.parameterized
                            ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = $4 where id = $1 and finished = 0")
                            (build->id)
                            ((int) buildStatus)
                            (now)
                            (buildStatus != bsUnsupported ? 1 : 0).exec();
                        txn.commit();
                        build->finishedInDB = true;
                        nrBuildsDone++;
                    }
                    badStep = true;
                    break;
                }
            }
            if (badStep) return;
            /* Note: if we exit this scope prior to this, the build and
               all newly created steps are destroyed. */
            {
                auto builds_(builds.lock());
                if (!build->finishedInDB) // FIXME: can this happen?
                    (*builds_)[build->id] = build;
                build->toplevel = step;
            }
            printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)")
                % build->id % step->drvPath % newSteps.size());
        };
        /* Now instantiate build steps for each new build. The builder
           threads can start building the runnable build steps right away,
           even while we're still processing other new builds. */
        while (!newBuilds.empty()) {
            auto build = newBuilds.begin()->second;
            newBuilds.erase(newBuilds.begin());
            newRunnable.clear();
            nrAdded = 0;
            try {
                createBuild(build);
            } catch (Error & e) {
                e.addPrefix(format("while loading build %1%: ") % build->id);
                throw;
            }
            /* Add the new runnable build steps to ‘runnable’ and wake up
               the builder threads. */
            printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded);
            for (auto & r : newRunnable)
                makeRunnable(r);
            nrBuildsRead += nrAdded;
        }
    }
    void State::removeCancelledBuilds(Connection & conn)
    {
        /* Get the current set of queued builds. */
        std::set<BuildID> currentIds;
        {
            pqxx::work txn(conn);
            auto res = txn.exec("select id from Builds where finished = 0");
            for (auto const & row : res)
                currentIds.insert(row["id"].as<BuildID>());
        }
        auto builds_(builds.lock());
        for (auto i = builds_->begin(); i != builds_->end(); ) {
            if (currentIds.find(i->first) == currentIds.end()) {
                printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first);
                i = builds_->erase(i);
                // FIXME: ideally we would interrupt active build steps here.
            } else
                ++i;
        }
    }
    Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
        Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
        std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable)
    {
        if (finishedDrvs.find(drvPath) != finishedDrvs.end()) return 0;
        /* Check if the requested step already exists. If not, create a
           new step. In any case, make the step reachable from
           referringBuild or referringStep. This is done atomically (with
           ‘steps’ locked), to ensure that this step can never become
           reachable from a new build after doBuildStep has removed it
           from ‘steps’. */
        Step::ptr step;
        bool isNew = false;
        {
            auto steps_(steps.lock());
            /* See if the step already exists in ‘steps’ and is not
               stale. */
            auto prev = steps_->find(drvPath);
            if (prev != steps_->end()) {
                step = prev->second.lock();
                /* Since ‘step’ is a strong pointer, the referred Step
                   object won't be deleted after this. */
                if (!step) steps_->erase(drvPath); // remove stale entry
            }
            /* If it doesn't exist, create it. */
            if (!step) {
                step = std::make_shared<Step>();
                step->drvPath = drvPath;
                isNew = true;
            }
            auto step_(step->state.lock());
            assert(step_->created != isNew);
            if (referringBuild)
                step_->builds.push_back(referringBuild);
            if (referringStep)
                step_->rdeps.push_back(referringStep);
            (*steps_)[drvPath] = step;
        }
        if (!isNew) return step;
        printMsg(lvlDebug, format("considering derivation ‘%1%’") % drvPath);
        /* Initialize the step. Note that the step may be visible in
           ‘steps’ before this point, but that doesn't matter because
           it's not runnable yet, and other threads won't make it
           runnable while step->created == false. */
        step->drv = readDerivation(drvPath);
        {
            auto i = step->drv.env.find("requiredSystemFeatures");
            if (i != step->drv.env.end())
                step->requiredSystemFeatures = tokenizeString<std::set<std::string>>(i->second);
        }
        auto attr = step->drv.env.find("preferLocalBuild");
        step->preferLocalBuild =
            attr != step->drv.env.end() && attr->second == "1"
            && has(localPlatforms, step->drv.platform);
        /* Are all outputs valid? */
        bool valid = true;
        for (auto & i : step->drv.outputs) {
            if (!store->isValidPath(i.second.path)) {
                valid = false;
                break;
            }
        }
        // FIXME: check whether all outputs are in the binary cache.
        if (valid) {
            finishedDrvs.insert(drvPath);
            return 0;
        }
        /* No, we need to build. */
        printMsg(lvlDebug, format("creating build step ‘%1%’") % drvPath);
        newSteps.insert(step);
        /* Create steps for the dependencies. */
        for (auto & i : step->drv.inputDrvs) {
            auto dep = createStep(store, i.first, 0, step, finishedDrvs, newSteps, newRunnable);
            if (dep) {
                auto step_(step->state.lock());
                step_->deps.insert(dep);
            }
        }
        /* If the step has no (remaining) dependencies, make it
           runnable. */
        {
            auto step_(step->state.lock());
            assert(!step_->created);
            step_->created = true;
            if (step_->deps.empty())
                newRunnable.insert(step);
        }
        return step;
    }
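`createStep()` builds a shared dependency graph: each derivation gets at most one live `Step`, indexed by path through weak pointers, so builds that share a dependency share its step; derivations whose outputs are already valid produce no step; and a step with no unbuilt dependencies goes straight into `newRunnable`. A single-threaded sketch of that recursion, assuming a hypothetical in-memory `Graph` in place of the Nix store and `readDerivation()`, and leaving out locking, `Build` bookkeeping and `newSteps`:

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>

// Hypothetical single-threaded model: no locks, no Build objects, and a
// plain map of input derivations instead of reading .drv files.
struct Step {
    std::string drvPath;
    std::set<std::shared_ptr<Step>> deps;   // unbuilt inputs (strong)
    std::vector<std::weak_ptr<Step>> rdeps; // steps waiting on this one (weak)
};
using StepPtr = std::shared_ptr<Step>;

struct Graph {
    std::map<std::string, std::vector<std::string>> inputDrvs; // drv -> its input drvs
    std::set<std::string> validDrvs;                           // drvs whose outputs exist
    std::map<std::string, std::weak_ptr<Step>> steps;          // shared step index
};

// Returns the step for drvPath, or nullptr if nothing needs building.
// Unlike the real createStep(), validity is checked before the step is
// registered, since there are no concurrent readers in this sketch.
StepPtr createStep(Graph & g, const std::string & drvPath, StepPtr referringStep,
    std::set<std::string> & finishedDrvs, std::set<StepPtr> & newRunnable)
{
    if (finishedDrvs.count(drvPath)) return nullptr;

    // Reuse a live step so that builds sharing a dependency share its step.
    auto prev = g.steps.find(drvPath);
    if (prev != g.steps.end()) {
        if (StepPtr existing = prev->second.lock()) {
            if (referringStep) existing->rdeps.push_back(referringStep);
            return existing;
        }
        g.steps.erase(prev); // stale entry
    }

    if (g.validDrvs.count(drvPath)) {
        finishedDrvs.insert(drvPath);
        return nullptr;
    }

    auto step = std::make_shared<Step>();
    step->drvPath = drvPath;
    if (referringStep) step->rdeps.push_back(referringStep);
    g.steps[drvPath] = step;

    // Recurse into the inputs; only unbuilt inputs become dependencies.
    auto inputs = g.inputDrvs.find(drvPath);
    if (inputs != g.inputDrvs.end())
        for (auto & input : inputs->second)
            if (StepPtr dep = createStep(g, input, step, finishedDrvs, newRunnable))
                step->deps.insert(dep);

    // A step with no unbuilt dependencies can be handed to a builder.
    if (step->deps.empty()) newRunnable.insert(step);
    return step;
}
```

In a diamond where `app` depends on `libA` and `libB`, which both depend on `base`, only `base` starts out runnable, and its single `Step` records both `libA` and `libB` as reverse dependencies. If `base` is already valid, `libA` and `libB` become runnable instead.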
  • edit in src/hydra-queue-runner/state.hh at line 119
    [11.3264]
    [11.3264]
    void getDependents(Step::ptr step, std::set<Build::ptr> & builds, std::set<Step::ptr> & steps);
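`getDependents()` is what the failure path uses to find everything affected by a failed step. A sketch of the traversal its signature implies, walking reverse dependencies depth-first with a visited set. The `Build` and `Step` types here are hypothetical reductions, locking is omitted, and skipping builds already marked `finishedInDB` is an assumption of this sketch rather than something this hunk shows:

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <set>
#include <vector>

// Hypothetical minimal types; Hydra's Build and Step hold much more
// state, and Step's fields are guarded by a lock.
struct Build {
    int id = 0;
    bool finishedInDB = false;
};
struct Step {
    std::vector<std::weak_ptr<Build>> builds; // builds with this as top-level step
    std::vector<std::weak_ptr<Step>> rdeps;   // steps that depend on this one
};

// Collects `step` and every step that directly or indirectly depends on
// it (via rdeps), plus every unfinished build whose top-level step is
// among them. Each step is visited once.
void getDependents(std::shared_ptr<Step> step,
    std::set<std::shared_ptr<Build>> & builds,
    std::set<std::shared_ptr<Step>> & steps)
{
    std::function<void(std::shared_ptr<Step>)> visit;
    visit = [&](std::shared_ptr<Step> s) {
        if (!steps.insert(s).second) return; // already visited
        for (auto & wb : s->builds) {
            auto b = wb.lock();
            if (b && !b->finishedInDB) builds.insert(b);
        }
        for (auto & wr : s->rdeps)
            if (auto r = wr.lock()) visit(r);
    };
    visit(step);
}
```

Because the edges are weak pointers, builds and steps that have already been destroyed simply drop out of the result, which is what lets the failure loop terminate once `indirect` comes back empty.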
  • edit in src/hydra-queue-runner/state.hh at line 164
    [11.4356]
    [11.4356]
    // FIXME: Make configurable.
    const unsigned int maxTries = 5;
    const unsigned int retryInterval = 60; // seconds
    const float retryBackoff = 3.0;
    const unsigned int maxParallelCopyClosure = 4;
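The retry constants suggest an exponential backoff. A sketch of one way they could combine, assuming (not shown in this change) that a step that has failed n times waits `retryInterval * retryBackoff^(n-1)` seconds and that `maxTries` caps the total number of attempts; `retryDelay` is a hypothetical helper and any random jitter is left out:

```cpp
#include <cassert>
#include <cmath>

// Values copied from the state.hh hunk.
const unsigned int maxTries = 5;
const unsigned int retryInterval = 60; // seconds
const float retryBackoff = 3.0;

// Hypothetical helper: seconds to wait before retrying a step that has
// failed `failedAttempts` times, or -1 once maxTries attempts are used up.
// Assumed schedule: retryInterval * retryBackoff^(failedAttempts - 1).
long retryDelay(unsigned int failedAttempts)
{
    if (failedAttempts == 0) return 0;         // nothing has failed yet
    if (failedAttempts >= maxTries) return -1; // give up
    return std::lround(retryInterval * std::pow(retryBackoff, failedAttempts - 1));
}
```

Under these assumptions a step is retried after 60, 180, 540 and 1620 seconds, and the fifth failure is final.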
  • edit in src/hydra-queue-runner/state.hh at line 318
    [11.9202]
    template <class C, class V>
    bool has(const C & c, const V & v)
    {
        return c.find(v) != c.end();
    }
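`has()` works with any container that has a `find()` member returning `end()` on a miss, which is why the queue monitor can use it both on the `builds` map and on `localPlatforms`. For a map, the argument is a key, not a value. A short usage sketch; the platform names and map contents in the usage are made up for illustration:

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Same helper as in the state.hh hunk: true if c.find(v) succeeds.
// For associative maps this tests for a key.
template <class C, class V>
bool has(const C & c, const V & v)
{
    return c.find(v) != c.end();
}
```

It does not compile for containers without a member `find()`, such as `std::vector`; those would need `std::find` instead.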