Split hydra-queue-runner.cc more
[?]
Jul 21, 2015, 1:14 PM
MHVIT4JYWUYD4UCGB2AHLXWLX6B5SYE22BREERNGANT7RGGDUFOAC
Dependencies
- [2]
GYT6YOZNLock builds for a shorter amount of time - [3]
NIXRSHRKFix finishing steps that are not top-level of any build - [4]
O7MWBXF6Doh - [5]
EXBGZYRJDon't create a propagated build step to the same build - [6]
XHBBT2HEDoh - [7]
WDWPHWQ2Fix incorrect debug message - [8]
2AZSF326getQueuedBuilds(): Don't catch errors while loading a build from the queue - [9]
RVHBV3B3Notify the queue runner when a build is deleted - [10]
NAYQT2GThydra-queue-runner: Use cmdBuildDerivation - [11]
YZAI5GQUImplement a database connection pool - [12]
UQQ4IL55Add a error type for "unsupported system type" - [13]
22LDPAIPCheck non-runnable steps for unsupported system type - [14]
MSIHMO45Tweak build steps - [15]
NNOCZ4ROhydra-queue-runner: Improve dispatcher - [16]
C6HOMHZWDon't try to handle SIGINT - [17]
OCZ4LSGGAutomatically retry aborted builds - [18]
UFUAO7NDImprove logging for aborts - [19]
NJJ7H64SVery basic multi-threaded queue runner - [20]
63W4T5PUhydra-queue-runner: More stats - [21]
LJILHOJ7Create BuildSteps race-free - [22]
O64P4XJSKeep per-machine stats - [23]
N4IROACVMove buildRemote() into State - [24]
4D7CHQ34createStep(): Cache finished derivations - [25]
IE2PRAQUhydra-queue-runner: Send build notifications - [26]
RYTQLATYKeep track of failed paths in the Hydra database - [27]
HHOMBU7Ghydra-queue-runner: Implement timeouts - [28]
ODCBSLFGhydra-queue-runner: Fix segfault sorting machines by load - [29]
WDNUKCTNQueue monitor: Get only the fields we need - [30]
WHULPA6SHandle failure with output - [31]
FQQRJUO4Mark builds as busy - [32]
WFYMBNWBMove "created" field into Step::State - [33]
QJRDO2B4Simplify retry handling - [34]
A3IIKGSGhydra-queue-runner: Fix assertion failure - [35]
2IQRXLWESupport cancelling builds - [36]
WKJFPR77hydra-queue-runner: Maintain count of active build steps - [37]
4LAUAXO5Less verbosity - [38]
TKA75HWSRobustness - [39]
HJOEIMLRRefactor - [40]
JAUB2FT5getQueuedBuilds(): Handle dependent builds first - [41]
SK6WHODMSupport preferLocalBuild - [42]
ATJ54SPXUse PostgreSQL notifications for queue events - [43]
XV4AEKJChydra-queue-runner: Handle status queries on the main thread - [44]
7LB6QBXYKeep track of the number of build steps that are being built - [45]
PQFOMNTLhydra-queue-runner: More stats - [46]
N5O7VEEOImmediately abort builds that require an unsupported system type - [47]
FKLICOHYPrefer cached failure over unsupported system type - [48]
T2EIYJNGOn SIGINT, shut down the builder threads - [49]
HPJKBFZ4Handle concurrent finishing of the same build - [50]
TM6WKSP3hydra-queue-runner: Set isCachedBuild - [51]
RQUAATWBAdd status dump facility - [52]
HUUZFPPKFix race between the queue monitor and the builder threads - [53]
7I7XHQAEFix sending notifications in the successful case - [54]
SODOV2CMAutomatically reload $NIX_REMOTE_SYSTEMS when it changes - [55]
62MQPRXCPass null values to libpqxx properly - [56]
24BMQDZAStart of single-process hydra-queue-runner - [57]
X7AZHNKGSet finishedInDB in a few more places - [58]
ENXUSMSVMake concurrency more robust - [59]
GS4BE6TBAsynchronously compress build logs - [60]
5AIYUMTBBasic remote building - [61]
KBZHIGLGRecord the machine used for a build step - [62]
HLSHCK3CSupport requiredSystemFeatures - [63]
MB3TISH2Rate-limit the number of threads copying closures at the same time - [64]
E7WP35SFCreate build step for non-top-level cached failures - [65]
IWB3F4Z6Fail builds with previously failed steps early - [66]
GKZN4UV7Make the queue monitor more robust, and better debug output - [67]
LE4VZIY5More stats - [68]
A2GL5FOZMoar stats - [69]
UPZXMQDEFix machine selection - [70]
7VQ4ALFYUpdate "make check" for the new queue runner
Change contents
- replacement in src/hydra-queue-runner/Makefile.am at line 3
hydra_queue_runner_SOURCES = hydra-queue-runner.cc build-result.cc build-remote.cc \hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \builder.cc build-result.cc build-remote.cc \ - file addition: builder.cc[11.187]
#include <cmath>

#include "state.hh"
#include "build-result.hh"

using namespace nix;


/* Run one build step on one machine. The dispatcher starts this as
   the entry point of a per-step thread.

   step:        the step to build.
   machine:     the machine selected for it.
   reservation: the slot reservation made by the dispatcher; it is
                dropped here once the build attempt is over.

   Calls doBuildStep(); if that returns true, or throws a
   std::exception (‘retry’ starts out true and is then never
   overwritten), the step's try counter is bumped and the step is
   put back on the runnable queue with a delayed ‘after’ time. */
void State::builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation)
{
    bool retry = true;

    /* Counts this step in nrActiveSteps for the lifetime of ‘mc’
       (NOTE(review): presumably MaintainCount increments on
       construction and decrements on destruction — confirm in
       state.hh). */
    MaintainCount mc(nrActiveSteps);

    try {
        auto store = openStore(); // FIXME: pool
        retry = doBuildStep(store, step, machine);
    } catch (std::exception & e) {
        /* Logged only; ‘retry’ is still true, so the step is retried below. */
        printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%")
            % step->drvPath % machine->sshName % e.what());
    }

    /* Release the machine and wake up the dispatcher. */
    assert(reservation.unique());
    reservation = 0;
    wakeDispatcher();

    /* If there was a temporary failure, retry the step after an
       exponentially increasing interval. */
    if (retry) {
        {
            auto step_(step->state.lock());
            step_->tries++;
            nrRetries++;
            if (step_->tries > maxNrRetries) maxNrRetries = step_->tries; // yeah yeah, not atomic
            /* Delay in seconds: retryInterval * retryBackoff^(tries - 1). */
            int delta = retryInterval * powf(retryBackoff, step_->tries - 1);
            printMsg(lvlInfo, format("will retry ‘%1%’ after %2%s") % step->drvPath % delta);
            step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta);
        }

        makeRunnable(step);
    }
}


/* Perform ‘step’ on ‘machine’ and record the outcome in the database.

   Returns true if the caller should retry the step: either no
   dependent builds were found, or the result is retryable
   (result.canRetry()) and the step has not yet used up ‘maxTries’.
   Returns false once success or failure has been registered for the
   step and the builds that refer to it.

   If the build picked for this step is ‘buildOne’ (or, in the failure
   case, any dependent build is), the process calls exit() instead of
   returning; see the "testing hack" at the end. */
bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
    Machine::ptr machine)
{
    {
        auto step_(step->state.lock());
        assert(step_->created);
        assert(!step->finished);
    }

    /* There can be any number of builds in the database that depend
       on this derivation. Arbitrarily pick one (though preferring a
       build of which this is the top-level derivation) for the
       purpose of creating build steps. We could create a build step
       record for every build, but that could be very expensive
       (e.g. a stdenv derivation can be a dependency of tens of
       thousands of builds), so we don't. */
    Build::ptr build;

    {
        std::set<Build::ptr> dependents;
        std::set<Step::ptr> steps;
        getDependents(step, dependents, steps);

        if (dependents.empty()) {
            /* Apparently all builds that depend on this derivation
               are gone (e.g. cancelled). So don't bother. This is
               very unlikely to happen, because normally Steps are
               only kept alive by being reachable from a
               Build. However, it's possible that a new Build just
               created a reference to this step. So to handle that
               possibility, we retry this step (putting it back in
               the runnable queue). If there are really no strong
               pointers to the step, it will be deleted. */
            printMsg(lvlInfo, format("maybe cancelling build step ‘%1%’") % step->drvPath);
            return true;
        }

        /* Prefer a build whose top-level derivation is this step. */
        for (auto build2 : dependents)
            if (build2->drvPath == step->drvPath) { build = build2; break; }

        if (!build) build = *dependents.begin();

        printMsg(lvlInfo, format("performing step ‘%1%’ on ‘%2%’ (needed by build %3% and %4% others)")
            % step->drvPath % machine->sshName % build->id % (dependents.size() - 1));
    }

    bool quit = build->id == buildOne;

    auto conn(dbPool.get());

    RemoteResult result;
    BuildOutput res;
    int stepNr = 0;

    time_t stepStartTime = result.startTime = time(0);

    /* If any of the outputs have previously failed, then don't bother
       building again. */
    bool cachedFailure = checkCachedFailure(step, *conn);

    if (cachedFailure)
        result.status = BuildResult::CachedFailure;
    else {

        /* Create a build step record indicating that we started
           building. Also, mark the selected build as busy. */
        {
            pqxx::work txn(*conn);
            stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy);
            txn.parameterized("update Builds set busy = 1 where id = $1")(build->id).exec();
            txn.commit();
        }

        /* Do the build. */
        try {
            /* FIXME: referring builds may have conflicting timeouts. */
            buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result);
        } catch (Error & e) {
            result.status = BuildResult::MiscFailure;
            result.errorMsg = e.msg();
        }

        if (result.success()) res = getBuildOutput(store, step->drv);
    }

    /* If buildRemote() did not set a stop time, use the time at which
       this function regained control. */
    time_t stepStopTime = time(0);
    if (!result.stopTime) result.stopTime = stepStopTime;

    /* Asynchronously compress the log. */
    if (result.logFile != "") {
        {
            auto logCompressorQueue_(logCompressorQueue.lock());
            logCompressorQueue_->push(result.logFile);
        }
        logCompressorWakeup.notify_one();
    }

    /* The step had a hopefully temporary failure (e.g. network
       issue). Retry a number of times. */
    if (result.canRetry()) {
        printMsg(lvlError, format("possibly transient failure building ‘%1%’ on ‘%2%’: %3%")
            % step->drvPath % machine->sshName % result.errorMsg);
        bool retry;
        {
            auto step_(step->state.lock());
            retry = step_->tries + 1 < maxTries;
        }
        if (retry) {
            /* Close the build step record as aborted and let the
               caller re-queue the step. */
            pqxx::work txn(*conn);
            finishBuildStep(txn, result.startTime, result.stopTime, build->id,
                stepNr, machine->sshName, bssAborted, result.errorMsg);
            txn.commit();
            if (quit) exit(1);
            return true;
        }
        /* Out of tries: fall through to the failure handling below. */
    }

    if (result.success()) {

        /* Register success in the database for all Build objects that
           have this step as the top-level step. Since the queue
           monitor thread may be creating new referring Builds
           concurrently, and updating the database may fail, we do
           this in a loop, marking all known builds, repeating until
           there are no unmarked builds.
        */

        std::vector<BuildID> buildIDs;

        while (true) {

            /* Get the builds that have this one as the top-level. */
            std::vector<Build::ptr> direct;
            {
                auto steps_(steps.lock());
                auto step_(step->state.lock());

                for (auto & b_ : step_->builds) {
                    auto b = b_.lock();
                    if (b && !b->finishedInDB) direct.push_back(b);
                }

                /* If there are no builds left to update in the DB,
                   then we're done (except for calling
                   finishBuildStep()). Delete the step from
                   ‘steps’. Since we've been holding the ‘steps’ lock,
                   no new referrers can have been added in the
                   meantime or be added afterwards. */
                if (direct.empty()) {
                    printMsg(lvlDebug, format("finishing build step ‘%1%’") % step->drvPath);
                    steps_->erase(step->drvPath);
                }
            }

            /* Update the database. */
            {
                pqxx::work txn(*conn);

                finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, machine->sshName, bssSuccess);

                for (auto & b : direct)
                    markSucceededBuild(txn, b, res, build != b || result.status != BuildResult::Built,
                        result.startTime, result.stopTime);

                txn.commit();
            }

            if (direct.empty()) break;

            /* Remove the direct dependencies from ‘builds’. This will
               cause them to be destroyed. */
            for (auto & b : direct) {
                auto builds_(builds.lock());
                b->finishedInDB = true;
                builds_->erase(b->id);
                buildIDs.push_back(b->id);
            }
        }

        /* Send notification about the builds that have this step as
           the top-level. */
        for (auto id : buildIDs) {
            {
                auto notificationSenderQueue_(notificationSenderQueue.lock());
                notificationSenderQueue_->push(NotificationItem(id, std::vector<BuildID>()));
            }
            notificationSenderWakeup.notify_one();
        }

        /* Wake up any dependent steps that have no other
           dependencies. */
        {
            auto step_(step->state.lock());
            for (auto & rdepWeak : step_->rdeps) {
                auto rdep = rdepWeak.lock();
                if (!rdep) continue;

                bool runnable = false;
                {
                    auto rdep_(rdep->state.lock());
                    rdep_->deps.erase(step);
                    /* Note: if the step has not finished
                       initialisation yet, it will be made runnable in
                       createStep(), if appropriate. */
                    if (rdep_->deps.empty() && rdep_->created) runnable = true;
                }

                /* Called after the rdep's state lock has been released. */
                if (runnable) makeRunnable(rdep);
            }
        }

    } else {

        /* Register failure in the database for all Build objects that
           directly or indirectly depend on this step. */

        std::vector<BuildID> dependentIDs;

        while (true) {

            /* Get the builds and steps that depend on this step. */
            std::set<Build::ptr> indirect;
            {
                auto steps_(steps.lock());
                /* This local shadows the member ‘steps’ locked on the
                   previous line. */
                std::set<Step::ptr> steps;
                getDependents(step, indirect, steps);

                /* If there are no builds left, delete all referring
                   steps from ‘steps’. As for the success case, we can
                   be certain no new referrers can be added. */
                if (indirect.empty()) {
                    for (auto & s : steps) {
                        printMsg(lvlDebug, format("finishing build step ‘%1%’") % s->drvPath);
                        steps_->erase(s->drvPath);
                    }
                    break;
                }
            }

            /* Update the database. */
            {
                pqxx::work txn(*conn);

                BuildStatus buildStatus =
                    result.status == BuildResult::TimedOut ? bsTimedOut :
                    result.canRetry() ? bsAborted :
                    bsFailed;
                BuildStepStatus buildStepStatus =
                    result.status == BuildResult::TimedOut ? bssTimedOut :
                    result.canRetry() ? bssAborted :
                    bssFailed;

                /* For standard failures, we don't care about the error
                   message. */
                if (result.status == BuildResult::PermanentFailure ||
                    result.status == BuildResult::TransientFailure ||
                    result.status == BuildResult::CachedFailure ||
                    result.status == BuildResult::TimedOut)
                    result.errorMsg = "";

                /* Create failed build steps for every build that depends
                   on this. For cached failures, only create a step for
                   builds that don't have this step as top-level
                   (otherwise the user won't be able to see what caused
                   the build to fail). */
                for (auto & build2 : indirect) {
                    if ((cachedFailure && build2->drvPath == step->drvPath) ||
                        (!cachedFailure && build == build2) ||
                        build2->finishedInDB)
                        continue;
                    createBuildStep(txn, 0, build2, step, machine->sshName,
                        buildStepStatus, result.errorMsg, build == build2 ? 0 : build->id);
                }

                /* A build step record was only created above when this
                   was not a cached failure, so only then is there one
                   to finish. */
                if (!cachedFailure)
                    finishBuildStep(txn, result.startTime, result.stopTime, build->id,
                        stepNr, machine->sshName, buildStepStatus, result.errorMsg);

                /* Mark all builds that depend on this derivation as failed. */
                for (auto & build2 : indirect) {
                    if (build2->finishedInDB) continue;
                    printMsg(lvlError, format("marking build %1% as failed") % build2->id);
                    /* Builds for which this step is not the top-level
                       derivation get bsDepFailed instead of bsFailed. */
                    txn.parameterized
                        ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1 and finished = 0")
                        (build2->id)
                        ((int) (build2->drvPath != step->drvPath && buildStatus == bsFailed ? bsDepFailed : buildStatus))
                        (result.startTime)
                        (result.stopTime)
                        (cachedFailure ? 1 : 0).exec();
                    nrBuildsDone++;
                }

                /* Remember failed paths in the database so that they
                   won't be built again. */
                if (!cachedFailure && result.status == BuildResult::PermanentFailure)
                    for (auto & path : outputPaths(step->drv))
                        txn.parameterized("insert into FailedPaths values ($1)")(path).exec();

                txn.commit();
            }

            /* Remove the indirect dependencies from ‘builds’. This
               will cause them to be destroyed. */
            for (auto & b : indirect) {
                auto builds_(builds.lock());
                b->finishedInDB = true;
                builds_->erase(b->id);
                dependentIDs.push_back(b->id);
                if (buildOne == b->id) quit = true;
            }
        }

        /* Send notification about this build and its dependents. */
        {
            auto notificationSenderQueue_(notificationSenderQueue.lock());
            notificationSenderQueue_->push(NotificationItem(build->id, dependentIDs));
        }
        notificationSenderWakeup.notify_one();

    }

    // FIXME: keep stats about aborted steps?
    /* totalStepTime uses the wall-clock interval measured in this
       function; totalStepBuildTime uses the start/stop times stored
       in ‘result’. */
    nrStepsDone++;
    totalStepTime += stepStopTime - stepStartTime;
    totalStepBuildTime += result.stopTime - result.startTime;
    machine->state->nrStepsDone++;
    machine->state->totalStepTime += stepStopTime - stepStartTime;
    machine->state->totalStepBuildTime += result.stopTime - result.startTime;

    if (quit) exit(0); // testing hack

    return false;
}
- file addition: dispatcher.cc[11.187]
#include <algorithm>#include <thread>#include "state.hh"using namespace nix;void State::makeRunnable(Step::ptr step){printMsg(lvlChatty, format("step ‘%1%’ is now runnable") % step->drvPath);{auto step_(step->state.lock());assert(step_->created);assert(!step->finished);assert(step_->deps.empty());}{auto runnable_(runnable.lock());runnable_->push_back(step);}wakeDispatcher();}void State::dispatcher(){while (true) {printMsg(lvlDebug, "dispatcher woken up");auto sleepUntil = system_time::max();bool keepGoing;do {/* Copy the currentJobs field of each machine. This isnecessary to ensure that the sort comparator below isan ordering. std::sort() can segfault if it isn't. */struct MachineInfo{Machine::ptr machine;unsigned int currentJobs;};std::vector<MachineInfo> machinesSorted;{auto machines_(machines.lock());for (auto & m : *machines_)machinesSorted.push_back({m.second, m.second->state->currentJobs});}/* Sort the machines by a combination of speed factor andavailable slots. Prioritise the available machines asfollows:- First by load divided by speed factor, rounded to thenearest integer. This causes fast machines to bepreferred over slow machines with similar loads.- Then by speed factor.- Finally by load. */sort(machinesSorted.begin(), machinesSorted.end(),[](const MachineInfo & a, const MachineInfo & b) -> bool{float ta = roundf(a.currentJobs / a.machine->speedFactor);float tb = roundf(b.currentJobs / b.machine->speedFactor);returnta != tb ? ta < tb :a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :a.currentJobs > b.currentJobs;});/* Find a machine with a free slot and find a step to runon it. Once we find such a pair, we restart the outerloop because the machine sorting will have changed. 
*/keepGoing = false;system_time now = std::chrono::system_clock::now();for (auto & mi : machinesSorted) {// FIXME: can we lose a wakeup if a builder exits concurrently?if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;auto runnable_(runnable.lock());//printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());/* FIXME: we're holding the runnable lock too longhere. This could be more efficient. */for (auto i = runnable_->begin(); i != runnable_->end(); ) {auto step = i->lock();/* Delete dead steps. */if (!step) {i = runnable_->erase(i);continue;}/* Can this machine do this step? */if (!mi.machine->supportsStep(step)) {++i;continue;}/* Skip previously failed steps that aren't readyto be retried. */{auto step_(step->state.lock());if (step_->tries > 0 && step_->after > now) {if (step_->after < sleepUntil)sleepUntil = step_->after;++i;continue;}}/* Make a slot reservation and start a thread todo the build. */auto reservation = std::make_shared<MaintainCount>(mi.machine->state->currentJobs);i = runnable_->erase(i);auto builderThread = std::thread(&State::builder, this, step, mi.machine, reservation);builderThread.detach(); // FIXME?keepGoing = true;break;}if (keepGoing) break;}} while (keepGoing);/* Sleep until we're woken up (either because a runnable buildis added, or because a build finishes). */{std::unique_lock<std::mutex> lock(dispatcherMutex);printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());dispatcherWakeup.wait_until(lock, sleepUntil);nrDispatcherWakeups++;}}printMsg(lvlError, "dispatcher exits");}void State::wakeDispatcher(){{ std::lock_guard<std::mutex> lock(dispatcherMutex); } // barrierdispatcherWakeup.notify_one();} - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 3
#include <cmath>#include <algorithm> - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 8
#include "build-result.hh" - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 9
#include "build-result.hh" - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 18[11.22]→[11.826:938](∅→∅),[11.938]→[11.1145:1177](∅→∅),[11.1145]→[11.1145:1177](∅→∅),[11.1177]→[11.939:986](∅→∅),[11.23]→[11.1178:1179](∅→∅),[11.1251]→[11.1251:1252](∅→∅),[11.1253]→[11.23:121](∅→∅),[11.23]→[11.23:121](∅→∅),[11.905]→[11.905:907](∅→∅),[11.626]→[11.1254:1255](∅→∅),[11.1340]→[11.1340:1341](∅→∅)
// FIXME: Make configurable.const unsigned int maxTries = 5;const unsigned int retryInterval = 60; // secondsconst float retryBackoff = 3.0;const unsigned int maxParallelCopyClosure = 4;template <class C, class V>bool has(const C & c, const V & v){return c.find(v) != c.end();} - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 170[11.577]→[11.9821:9825](∅→∅),[11.1014]→[11.9821:9825](∅→∅),[11.9796]→[11.9821:9825](∅→∅),[11.9821]→[11.9821:9825](∅→∅),[11.9825]→[11.199:226](∅→∅),[11.226]→[11.238:543](∅→∅),[11.226]→[11.1433:1435](∅→∅),[11.543]→[11.1433:1435](∅→∅),[11.1433]→[11.1433:1435](∅→∅),[11.1435]→[11.88:117](∅→∅),[11.573]→[11.573:737](∅→∅),[11.737]→[9.0:53](∅→∅),[9.53]→[11.737:738](∅→∅),[11.163]→[11.737:738](∅→∅),[11.737]→[11.737:738](∅→∅),[11.738]→[11.1435:1480](∅→∅),[11.1435]→[11.1435:1480](∅→∅),[11.1480]→[11.739:774](∅→∅),[11.774]→[11.1502:1503](∅→∅),[11.1502]→[11.1502:1503](∅→∅),[11.1503]→[11.0:19](∅→∅),[11.19]→[11.160:212](∅→∅),[11.212]→[11.820:948](∅→∅),[11.820]→[11.820:948](∅→∅),[11.948]→[11.164:190](∅→∅),[11.190]→[11.1560:1561](∅→∅),[11.480]→[11.1560:1561](∅→∅),[11.948]→[11.1560:1561](∅→∅),[11.1560]→[11.1560:1561](∅→∅),[11.1561]→[11.949:980](∅→∅),[11.980]→[11.544:631](∅→∅),[11.631]→[11.1063:1100](∅→∅),[11.1063]→[11.1063:1100](∅→∅),[11.1100]→[11.632:706](∅→∅),[11.706]→[11.1170:1229](∅→∅),[11.1170]→[11.1170:1229](∅→∅),[11.1229]→[9.54:114](∅→∅),[9.114]→[11.707:781](∅→∅),[11.1266]→[11.707:781](∅→∅),[11.781]→[11.213:255](∅→∅),[11.1336]→[11.213:255](∅→∅),[11.255]→[11.1739:1749](∅→∅),[11.395]→[11.1739:1749](∅→∅),[11.1336]→[11.1739:1749](∅→∅),[11.1739]→[11.1739:1749](∅→∅),[11.1749]→[11.191:192](∅→∅),[11.255]→[11.1749:1755](∅→∅),[11.1749]→[11.1749:1755](∅→∅),[11.1803]→[11.1803:1807](∅→∅),[11.1807]→[11.256:364](∅→∅),[11.364]→[11.9878:9880](∅→∅),[11.542]→[11.9878:9880](∅→∅),[11.1426]→[11.9878:9880](∅→∅),[11.1893]→[11.9878:9880](∅→∅),[11.9878]→[11.9878:9880](∅→∅),[11.9880]→[11.782:869](∅→∅),[11.573]→[11.1943:1944](∅→∅),[11.869]→[11.1943:1944](∅→∅),[11.1943]→
[11.1943:1944](∅→∅),[11.1605]→[11.1605:1741](∅→∅),[11.1741]→[11.0:47](∅→∅),[11.47]→[11.1798:1805](∅→∅),[11.1798]→[11.1798:1805](∅→∅),[11.1805]→[11.365:395](∅→∅),[11.395]→[11.1835:1836](∅→∅),[11.605]→[11.1835:1836](∅→∅),[11.1835]→[11.1835:1836](∅→∅),[11.1836]→[11.414:591](∅→∅),[11.157]→[11.9906:9907](∅→∅),[11.591]→[11.9906:9907](∅→∅),[11.1641]→[11.9906:9907](∅→∅),[11.2009]→[11.9906:9907](∅→∅),[11.9906]→[11.9906:9907](∅→∅),[11.2048]→[11.2048:2087](∅→∅),[11.2087]→[2.0:41](∅→∅),[2.41]→[11.2087:2137](∅→∅),[11.2087]→[11.2087:2137](∅→∅),[11.2137]→[11.999:1053](∅→∅),[11.1053]→[11.1642:1694](∅→∅),[11.2137]→[11.1642:1694](∅→∅),[11.1694]→[11.2137:2450](∅→∅),[11.2137]→[11.2137:2450](∅→∅),[11.2450]→[11.592:715](∅→∅),[11.795]→[11.10056:10057](∅→∅),[11.2450]→[11.10056:10057](∅→∅),[11.10056]→[11.10056:10057](∅→∅),[11.10057]→[11.48:118](∅→∅),[11.118]→[11.2491:2507](∅→∅),[11.2491]→[11.2491:2507](∅→∅),[11.2507]→[11.10235:10236](∅→∅),[11.10235]→[11.10235:10236](∅→∅),[11.10236]→[11.119:231](∅→∅),[11.231]→[11.10351:10352](∅→∅),[11.2856]→[11.10351:10352](∅→∅),[11.10351]→[11.10351:10352](∅→∅),[11.10352]→[11.232:274](∅→∅),[11.274]→[11.870:970](∅→∅),[11.10352]→[11.870:970](∅→∅),[11.970]→[11.275:294](∅→∅),[11.294]→[11.10473:10582](∅→∅),[11.970]→[11.10473:10582](∅→∅),[11.2952]→[11.10473:10582](∅→∅),[11.10473]→[11.10473:10582](∅→∅),[11.10582]→[11.191:271](∅→∅),[11.271]→[11.72:644](∅→∅),[11.283]→[11.295:315](∅→∅),[11.644]→[11.295:315](∅→∅),[11.10994]→[11.295:315](∅→∅),[11.315]→[11.11016:11027](∅→∅),[11.11016]→[11.11016:11027](∅→∅),[11.11027]→[11.316:354](∅→∅),[11.354]→[11.92:254](∅→∅),[11.254]→[11.355:923](∅→∅),[11.460]→[11.355:923](∅→∅),[11.211]→[11.355:923](∅→∅),[11.211]→[11.3067:3206](∅→∅),[11.923]→[11.3067:3206](∅→∅),[11.3067]→[11.3067:3206](∅→∅),[11.2005]→[11.11080:11162](∅→∅),[11.3206]→[11.11080:11162](∅→∅),[11.11080]→[11.11080:11162](∅→∅),[11.11162]→[10.2225:2283](∅→∅),[10.2283]→[11.9877:9878](∅→∅),[11.2064]→[11.9877:9878](∅→∅),[11.11214]→[11.431:465](∅→∅),[11.465]→[11.11277:11402](∅→∅),[
11.677]→[11.11277:11402](∅→∅),[11.11277]→[11.11277:11402](∅→∅),[11.11402]→[11.272:273](∅→∅),[11.273]→[11.461:502](∅→∅),[11.502]→[11.924:944](∅→∅),[11.273]→[11.924:944](∅→∅),[11.944]→[11.295:306](∅→∅),[11.295]→[11.295:306](∅→∅),[11.306]→[11.66:247](∅→∅),[11.247]→[11.212:248](∅→∅),[11.430]→[11.212:248](∅→∅),[11.248]→[11.248:297](∅→∅),[11.297]→[11.450:507](∅→∅),[11.455]→[11.455:503](∅→∅),[11.503]→[11.1065:1160](∅→∅),[11.1160]→[6.0:66](∅→∅),[6.66]→[11.667:727](∅→∅),[11.667]→[11.667:727](∅→∅),[11.727]→[11.0:230](∅→∅),[11.230]→[11.3592:3676](∅→∅),[11.3676]→[11.307:590](∅→∅),[11.307]→[11.307:590](∅→∅),[11.590]→[11.727:771](∅→∅),[11.727]→[11.727:771](∅→∅),[11.771]→[11.272:310](∅→∅),[11.272]→[11.272:310](∅→∅),[11.310]→[11.645:1331](∅→∅),[11.316]→[11.1168:1200](∅→∅),[11.1331]→[11.1168:1200](∅→∅),[11.683]→[11.1168:1200](∅→∅),[11.1200]→[11.683:720](∅→∅),[11.683]→[11.683:720](∅→∅),[11.720]→[11.787:797](∅→∅),[11.787]→[11.787:797](∅→∅),[11.797]→[11.11402:11403](∅→∅),[11.11402]→[11.11402:11403](∅→∅),[11.11403]→[11.945:974](∅→∅),[11.758]→[11.11435:11436](∅→∅),[11.974]→[11.11435:11436](∅→∅),[11.1232]→[11.11435:11436](∅→∅),[11.11435]→[11.11435:11436](∅→∅),[11.11436]→[11.3207:3328](∅→∅),[11.3328]→[11.11475:11476](∅→∅),[11.11475]→[11.11475:11476](∅→∅),[11.11476]→[11.3329:3380](∅→∅),[11.3380]→[11.1332:1444](∅→∅),[11.1444]→[11.3511:3558](∅→∅),[11.3511]→[11.3511:3558](∅→∅),[11.3558]→[11.975:1125](∅→∅),[11.3806]→[11.1126:1510](∅→∅),[11.1510]→[11.0:76](∅→∅),[11.76]→[8.0:93](∅→∅),[8.93]→[11.216:226](∅→∅),[11.216]→[11.216:226](∅→∅),[11.226]→[11.1538:1539](∅→∅),[11.1538]→[11.1538:1539](∅→∅),[11.1539]→[11.3806:3915](∅→∅),[11.3806]→[11.3806:3915](∅→∅),[11.3915]→[11.1540:1658](∅→∅),[11.1658]→[11.3915:3981](∅→∅),[11.3915]→[11.3915:3981](∅→∅),[11.3981]→[11.317:351](∅→∅),[11.351]→[11.11504:11514](∅→∅),[11.3981]→[11.11504:11514](∅→∅),[11.11504]→[11.11504:11514](∅→∅),[11.11514]→[11.466:1203](∅→∅),[11.1203]→[11.3982:4065](∅→∅),[11.11514]→[11.3982:4065](∅→∅),[11.4065]→[11.255:342](∅→∅),[11.342]→[11.443:5
14](∅→∅),[11.606]→[11.443:514](∅→∅),[11.4065]→[11.443:514](∅→∅),[11.514]→[11.11564:11566](∅→∅),[11.2148]→[11.11564:11566](∅→∅),[11.4104]→[11.11564:11566](∅→∅),[11.11564]→[11.11564:11566](∅→∅),[11.11566]→[11.343:412](∅→∅),[11.412]→[11.607:1013](∅→∅),[11.11566]→[11.607:1013](∅→∅),[11.1013]→[11.4159:4200](∅→∅),[11.4159]→[11.4159:4200](∅→∅),[11.4200]→[11.1014:1104](∅→∅),[11.1104]→[11.4200:4280](∅→∅),[11.4200]→[11.4200:4280](∅→∅),[11.4280]→[11.1105:1145](∅→∅),[11.1145]→[11.4325:4450](∅→∅),[11.4325]→[11.4325:4450](∅→∅),[11.4450]→[11.1146:1401](∅→∅),[11.1401]→[11.4543:4553](∅→∅),[11.4543]→[11.4543:4553](∅→∅),[11.4553]→[11.1402:1444](∅→∅),[11.1444]→[11.92:134](∅→∅),[11.134]→[11.1444:1640](∅→∅),[11.1444]→[11.1444:1640](∅→∅),[11.1640]→[11.4553:4559](∅→∅),[11.4553]→[11.4553:4559](∅→∅),[11.1354]→[11.11731:11732](∅→∅),[11.11731]→[11.11731:11732](∅→∅),[11.11732]→[11.135:164](∅→∅),[11.164]→[11.0:79](∅→∅),[11.1717]→[11.0:79](∅→∅),[11.79]→[11.1717:1964](∅→∅),[11.1717]→[11.1717:1964](∅→∅),[11.1964]→[11.11791:11832](∅→∅),[11.11791]→[11.11791:11832](∅→∅),[11.11832]→[11.508:714](∅→∅),[11.542]→[11.11832:11833](∅→∅),[11.11832]→[11.11832:11833](∅→∅),[11.11833]→[11.549:746](∅→∅),[11.746]→[11.11833:12108](∅→∅),[11.11833]→[11.11833:12108](∅→∅),[11.12108]→[11.413:492](∅→∅),[11.492]→[11.12133:12166](∅→∅),[11.12133]→[11.12133:12166](∅→∅),[11.12166]→[11.1355:1430](∅→∅),[11.1430]→[11.1965:1992](∅→∅),[11.1430]→[11.12240:12286](∅→∅),[11.1992]→[11.12240:12286](∅→∅),[11.12240]→[11.12240:12286](∅→∅),[11.4629]→[11.12286:12329](∅→∅),[11.12286]→[11.12286:12329](∅→∅),[11.12329]→[11.493:586](∅→∅),[11.586]→[11.12374:12393](∅→∅),[11.618]→[11.12374:12393](∅→∅),[11.2072]→[11.12374:12393](∅→∅),[11.2201]→[11.12374:12393](∅→∅),[11.4695]→[11.12374:12393](∅→∅),[11.12374]→[11.12374:12393](∅→∅),[11.4724]→[11.4724:4768](∅→∅),[11.4810]→[11.4810:4847](∅→∅),[11.4888]→[11.12469:12479](∅→∅),[11.12469]→[11.12469:12479](∅→∅),[11.5235]→[11.5235:5242](∅→∅),[11.5242]→[11.2073:2153](∅→∅),[11.2153]→[11.5278:5324](∅→∅),[11.5278]→[1
1.5278:5324](∅→∅),[11.5324]→[11.165:229](∅→∅),[11.229]→[11.2216:2287](∅→∅),[11.2216]→[11.2216:2287](∅→∅),[11.2287]→[11.5608:5615](∅→∅),[11.5608]→[11.5608:5615](∅→∅),[11.5615]→[11.2288:2305](∅→∅)
}void State::queueMonitor(){while (true) {try {queueMonitorLoop();} catch (std::exception & e) {printMsg(lvlError, format("queue monitor: %1%") % e.what());sleep(10); // probably a DB problem, so don't retry right away}}}void State::queueMonitorLoop(){auto conn(dbPool.get());receiver buildsAdded(*conn, "builds_added");receiver buildsRestarted(*conn, "builds_restarted");receiver buildsCancelled(*conn, "builds_cancelled");receiver buildsDeleted(*conn, "builds_deleted");auto store = openStore(); // FIXME: poolunsigned int lastBuildId = 0;while (true) {getQueuedBuilds(*conn, store, lastBuildId);/* Sleep until we get notification from the database about anevent. */conn->await_notification();nrQueueWakeups++;if (buildsAdded.get())printMsg(lvlTalkative, "got notification: new builds added to the queue");if (buildsRestarted.get()) {printMsg(lvlTalkative, "got notification: builds restarted");lastBuildId = 0; // check all builds}if (buildsCancelled.get() || buildsDeleted.get()) {printMsg(lvlTalkative, "got notification: builds cancelled");removeCancelledBuilds(*conn);}}}void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId){printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);/* Grab the queued builds from the database, but don't processthem yet (since we don't want a long-running transaction). 
*/std::multimap<Path, Build::ptr> newBuilds;{pqxx::work txn(conn);auto res = txn.parameterized("select id, project, jobset, job, drvPath, maxsilent, timeout from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec();for (auto const & row : res) {auto builds_(builds.lock());BuildID id = row["id"].as<BuildID>();if (buildOne && id != buildOne) continue;if (id > lastBuildId) lastBuildId = id;if (has(*builds_, id)) continue;auto build = std::make_shared<Build>();build->id = id;build->drvPath = row["drvPath"].as<string>();build->fullJobName = row["project"].as<string>() + ":" + row["jobset"].as<string>() + ":" + row["job"].as<string>();build->maxSilentTime = row["maxsilent"].as<int>();build->buildTimeout = row["timeout"].as<int>();newBuilds.emplace(std::make_pair(build->drvPath, build));}}std::set<Step::ptr> newRunnable;unsigned int nrAdded;std::function<void(Build::ptr)> createBuild;createBuild = [&](Build::ptr build) {printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName);nrAdded++;if (!store->isValidPath(build->drvPath)) {/* Derivation has been GC'ed prematurely. */printMsg(lvlError, format("aborting GC'ed build %1%") % build->id);if (!build->finishedInDB) {pqxx::work txn(conn);txn.parameterized("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0")(build->id)((int) bsAborted)(time(0))("derivation was garbage-collected prior to build").exec();txn.commit();build->finishedInDB = true;nrBuildsDone++;}return;}std::set<Step::ptr> newSteps;std::set<Path> finishedDrvs; // FIXME: re-use?Step::ptr step = createStep(store, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable);/* Some of the new steps may be the top level of builds thatwe haven't processed yet. So do them now. This ensures thatif build A depends on build B with top-level step X, then Xwill be "accounted" to B in doBuildStep(). 
*/for (auto & r : newSteps) {while (true) {auto i = newBuilds.find(r->drvPath);if (i == newBuilds.end()) break;Build::ptr b = i->second;newBuilds.erase(i);createBuild(b);}}/* If we didn't get a step, it means the step's outputs areall valid. So we mark this as a finished, cached build. */if (!step) {Derivation drv = readDerivation(build->drvPath);BuildOutput res = getBuildOutput(store, drv);pqxx::work txn(conn);time_t now = time(0);markSucceededBuild(txn, build, res, true, now, now);txn.commit();build->finishedInDB = true;return;}/* If any step has an unsupported system type or has apreviously failed output path, then fail the build rightaway. */bool badStep = false;for (auto & r : newSteps) {BuildStatus buildStatus = bsSuccess;BuildStepStatus buildStepStatus = bssFailed;if (checkCachedFailure(r, conn)) {printMsg(lvlError, format("marking build %1% as cached failure") % build->id);buildStatus = step == r ? bsFailed : bsDepFailed;buildStepStatus = bssFailed;}if (buildStatus == bsSuccess) {bool supported = false;{auto machines_(machines.lock()); // FIXME: use shared_mutexfor (auto & m : *machines_)if (m.second->supportsStep(r)) { supported = true; break; }}if (!supported) {printMsg(lvlError, format("aborting unsupported build %1%") % build->id);buildStatus = bsUnsupported;buildStepStatus = bssUnsupported;}}if (buildStatus != bsSuccess) {time_t now = time(0);if (!build->finishedInDB) {pqxx::work txn(conn);createBuildStep(txn, 0, build, r, "", buildStepStatus);txn.parameterized("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = $4 where id = $1 and finished = 0")(build->id)((int) buildStatus)(now)(buildStatus != bsUnsupported ? 1 : 0).exec();txn.commit();build->finishedInDB = true;nrBuildsDone++;}badStep = true;break;}}if (badStep) return;/* Note: if we exit this scope prior to this, the build andall newly created steps are destroyed. 
*/{auto builds_(builds.lock());if (!build->finishedInDB) // FIXME: can this happen?(*builds_)[build->id] = build;build->toplevel = step;}printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)")% build->id % step->drvPath % newSteps.size());};/* Now instantiate build steps for each new build. The builderthreads can start building the runnable build steps right away,even while we're still processing other new builds. */while (!newBuilds.empty()) {auto build = newBuilds.begin()->second;newBuilds.erase(newBuilds.begin());newRunnable.clear();nrAdded = 0;try {createBuild(build);} catch (Error & e) {e.addPrefix(format("while loading build %1%: ") % build->id);throw;}/* Add the new runnable build steps to ‘runnable’ and wake upthe builder threads. */printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded);for (auto & r : newRunnable)makeRunnable(r);nrBuildsRead += nrAdded;}}void State::removeCancelledBuilds(Connection & conn){/* Get the current set of queued builds. */std::set<BuildID> currentIds;{pqxx::work txn(conn);auto res = txn.exec("select id from Builds where finished = 0");for (auto const & row : res)currentIds.insert(row["id"].as<BuildID>());}auto builds_(builds.lock());for (auto i = builds_->begin(); i != builds_->end(); ) {if (currentIds.find(i->first) == currentIds.end()) {printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first);i = builds_->erase(i);// FIXME: ideally we would interrupt active build steps here.} else++i;}}Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable){if (finishedDrvs.find(drvPath) != finishedDrvs.end()) return 0;/* Check if the requested step already exists. If not, create anew step. In any case, make the step reachable fromreferringBuild or referringStep. 
This is done atomically (with‘steps’ locked), to ensure that this step can never becomereachable from a new build after doBuildStep has removed itfrom ‘steps’. */Step::ptr step;bool isNew = false;{auto steps_(steps.lock());/* See if the step already exists in ‘steps’ and is notstale. */auto prev = steps_->find(drvPath);if (prev != steps_->end()) {step = prev->second.lock();/* Since ‘step’ is a strong pointer, the referred Stepobject won't be deleted after this. */if (!step) steps_->erase(drvPath); // remove stale entry}/* If it doesn't exist, create it. */if (!step) {step = std::make_shared<Step>();step->drvPath = drvPath;isNew = true;}auto step_(step->state.lock());assert(step_->created != isNew);if (referringBuild)step_->builds.push_back(referringBuild);if (referringStep)step_->rdeps.push_back(referringStep);(*steps_)[drvPath] = step;}if (!isNew) return step;printMsg(lvlDebug, format("considering derivation ‘%1%’") % drvPath);/* Initialize the step. Note that the step may be visible in‘steps’ before this point, but that doesn't matter becauseit's not runnable yet, and other threads won't make itrunnable while step->created == false. */step->drv = readDerivation(drvPath);{auto i = step->drv.env.find("requiredSystemFeatures");if (i != step->drv.env.end())step->requiredSystemFeatures = tokenizeString<std::set<std::string>>(i->second);}auto attr = step->drv.env.find("preferLocalBuild");step->preferLocalBuild =attr != step->drv.env.end() && attr->second == "1"&& has(localPlatforms, step->drv.platform);/* Are all outputs valid? */bool valid = true;for (auto & i : step->drv.outputs) {if (!store->isValidPath(i.second.path)) {valid = false;break;}}// FIXME: check whether all outputs are in the binary cache.if (valid) {finishedDrvs.insert(drvPath);return 0;}/* No, we need to build. */printMsg(lvlDebug, format("creating build step ‘%1%’") % drvPath);newSteps.insert(step);/* Create steps for the dependencies. 
*/for (auto & i : step->drv.inputDrvs) {auto dep = createStep(store, i.first, 0, step, finishedDrvs, newSteps, newRunnable);if (dep) {auto step_(step->state.lock());step_->deps.insert(dep);}}/* If the step has no (remaining) dependencies, make itrunnable. */{auto step_(step->state.lock());assert(!step_->created);step_->created = true;if (step_->deps.empty())newRunnable.insert(step);}return step; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 204[11.14015]→[11.14015:14019](∅→∅),[11.14019]→[11.2288:2329](∅→∅),[11.2329]→[11.14046:14048](∅→∅),[11.14046]→[11.14046:14048](∅→∅),[11.14048]→[11.1515:1598](∅→∅),[11.1598]→[11.10256:10257](∅→∅),[11.10256]→[11.10256:10257](∅→∅),[11.10257]→[11.6558:6604](∅→∅),[11.14048]→[11.6558:6604](∅→∅),[11.6604]→[11.230:262](∅→∅),[11.262]→[11.2650:2683](∅→∅),[11.2650]→[11.2650:2683](∅→∅),[11.2683]→[11.6604:6647](∅→∅),[11.6604]→[11.6604:6647](∅→∅),[11.6647]→[11.2362:2369](∅→∅),[11.2362]→[11.2362:2369](∅→∅),[11.2369]→[11.6648:6725](∅→∅),[11.2457]→[11.14256:14262](∅→∅),[11.6725]→[11.14256:14262](∅→∅),[11.14256]→[11.14256:14262](∅→∅),[11.14262]→[11.2458:2459](∅→∅),[11.2459]→[11.10258:10280](∅→∅),[11.429]→[11.14262:14266](∅→∅),[11.2488]→[11.14262:14266](∅→∅),[11.10280]→[11.14262:14266](∅→∅),[11.14262]→[11.14262:14266](∅→∅),[11.14266]→[11.10281:10306](∅→∅),[11.10306]→[11.2530:2532](∅→∅),[11.2530]→[11.2530:2532](∅→∅),[11.2532]→[11.20:39](∅→∅),[11.39]→[11.1599:1650](∅→∅),[11.1650]→[11.2577:2578](∅→∅),[11.10387]→[11.2577:2578](∅→∅),[11.2577]→[11.2577:2578](∅→∅),[11.2578]→[11.1868:1915](∅→∅),[11.1915]→[11.135:159](∅→∅),[11.159]→[11.10471:10472](∅→∅),[11.1734]→[11.10471:10472](∅→∅),[11.10471]→[11.10471:10472](∅→∅),[11.10472]→[11.160:173](∅→∅),[11.173]→[11.0:67](∅→∅),[11.67]→[11.0:138](∅→∅),[11.138]→[11.204:397](∅→∅),[11.204]→[11.204:397](∅→∅),[11.397]→[11.284:347](∅→∅),[11.284]→[11.284:347](∅→∅),[11.347]→[11.398:442](∅→∅),[11.442]→[11.3677:3765](∅→∅),[11.509]→[11.466:480](∅→∅),[11.3765]→[11.466:480](∅→∅),[11.466]→[11.466:480](∅→∅),[11.48
0]→[11.10589:10590](∅→∅),[11.10589]→[11.10589:10590](∅→∅),[11.10590]→[11.481:644](∅→∅),[11.644]→[11.1980:1981](∅→∅),[11.1980]→[11.1980:1981](∅→∅),[11.1981]→[11.645:849](∅→∅),[11.849]→[11.10702:10703](∅→∅),[11.10702]→[11.10702:10703](∅→∅),[11.10703]→[11.850:889](∅→∅),[11.889]→[11.2000:2001](∅→∅),[11.2000]→[11.2000:2001](∅→∅),[11.2001]→[11.890:990](∅→∅),[11.990]→[11.510:583](∅→∅),[11.583]→[11.2104:2122](∅→∅),[11.1065]→[11.2104:2122](∅→∅),[11.2104]→[11.2104:2122](∅→∅),[11.2122]→[11.584:742](∅→∅),[11.742]→[11.1210:1237](∅→∅),[11.1210]→[11.1210:1237](∅→∅),[11.1237]→[11.0:45](∅→∅),[11.45]→[11.743:923](∅→∅),[11.103]→[11.1424:1750](∅→∅),[11.923]→[11.1424:1750](∅→∅),[11.1424]→[11.1424:1750](∅→∅),[11.1750]→[11.924:971](∅→∅),[11.971]→[11.1802:1882](∅→∅),[11.1802]→[11.1802:1882](∅→∅),[11.1882]→[11.3766:3851](∅→∅),[11.1050]→[11.1954:2004](∅→∅),[11.3851]→[11.1954:2004](∅→∅),[11.1954]→[11.1954:2004](∅→∅),[11.2004]→[11.2684:2773](∅→∅),[11.2773]→[11.2091:2580](∅→∅),[11.2091]→[11.2091:2580](∅→∅),[11.2580]→[11.1051:1110](∅→∅),[11.1110]→[11.2350:2435](∅→∅),[11.2636]→[11.2350:2435](∅→∅),[11.2350]→[11.2350:2435](∅→∅),[11.2435]→[11.2637:3244](∅→∅),[11.3244]→[11.3852:3956](∅→∅),[11.1200]→[11.3330:3376](∅→∅),[11.3956]→[11.3330:3376](∅→∅),[11.3330]→[11.3330:3376](∅→∅),[11.3376]→[11.3957:4065](∅→∅),[11.4065]→[11.3472:3526](∅→∅),[11.3472]→[11.3472:3526](∅→∅),[11.3526]→[11.10866:10867](∅→∅),[11.10866]→[11.10866:10867](∅→∅),[11.10867]→[11.3527:3592](∅→∅),[11.3592]→[11.11119:11137](∅→∅),[11.11119]→[11.11119:11137](∅→∅),[11.11346]→[11.11346:11347](∅→∅),[11.11347]→[11.3593:3631](∅→∅),[11.3631]→[11.11489:11503](∅→∅),[11.11489]→[11.11489:11503](∅→∅),[11.11503]→[11.3632:3662](∅→∅),[11.3662]→[11.2852:2853](∅→∅),[11.2852]→[11.2852:2853](∅→∅),[11.2853]→[11.11504:11703](∅→∅),[11.11703]→[11.2436:2724](∅→∅),[11.2724]→[11.11744:11754](∅→∅),[11.11744]→[11.11744:11754](∅→∅),[11.11754]→[11.2985:2992](∅→∅),[11.2985]→[11.2985:2992](∅→∅),[11.2992]→[11.11755:11904](∅→∅),[11.11904]→[11.1031:1066](∅→∅),[11.1066]→[11.
12574:12578](∅→∅),[11.12574]→[11.12574:12578](∅→∅),[11.12578]→[11.4066:4168](∅→∅),[11.4168]→[11.12651:12653](∅→∅),[11.12651]→[11.12651:12653](∅→∅),[11.12653]→[11.2725:2749](∅→∅),[11.2749]→[11.238:276](∅→∅),[11.276]→[11.12653:12712](∅→∅),[11.2749]→[11.12653:12712](∅→∅),[11.12653]→[11.12653:12712](∅→∅),[11.12712]→[11.4169:4220](∅→∅),[11.2814]→[11.12768:12803](∅→∅),[11.4220]→[11.12768:12803](∅→∅),[11.12768]→[11.12768:12803](∅→∅),[11.12803]→[11.2815:2908](∅→∅),[11.2908]→[11.4221:4281](∅→∅),[11.2981]→[11.12955:13097](∅→∅),[11.4281]→[11.12955:13097](∅→∅),[11.12955]→[11.12955:13097](∅→∅),[11.13097]→[11.2982:3628](∅→∅),[11.3628]→[11.13139:13143](∅→∅),[11.13139]→[11.13139:13143](∅→∅),[11.13143]→[11.3629:3702](∅→∅),[11.3702]→[11.13216:13242](∅→∅),[11.13216]→[11.13216:13242](∅→∅),[11.3117]→[11.14306:14308](∅→∅),[11.13242]→[11.14306:14308](∅→∅),[11.14306]→[11.14306:14308](∅→∅),[11.14308]→[11.2774:2820](∅→∅),[11.2820]→[11.263:295](∅→∅),[11.295]→[11.2851:2891](∅→∅),[11.2851]→[11.2851:2891](∅→∅),[11.2891]→[11.14341:14410](∅→∅),[11.14341]→[11.14341:14410](∅→∅),[11.14410]→[11.7094:7227](∅→∅),[11.7227]→[11.14547:14812](∅→∅),[11.14547]→[11.14547:14812](∅→∅),[11.14812]→[11.7228:7234](∅→∅),[11.7234]→[11.2892:3016](∅→∅),[11.3016]→[11.14856:14857](∅→∅),[11.7286]→[11.14856:14857](∅→∅),[11.14856]→[11.14856:14857](∅→∅),[11.14857]→[11.7287:7389](∅→∅),[11.7389]→[11.3703:3770](∅→∅),[11.3770]→[11.7457:7581](∅→∅),[11.7457]→[11.7457:7581](∅→∅),[11.7581]→[11.3771:4098](∅→∅),[11.4098]→[11.147:243](∅→∅),[11.243]→[11.4099:4124](∅→∅),[11.7770]→[11.4099:4124](∅→∅),[11.4124]→[11.7828:7838](∅→∅),[11.7828]→[11.7828:7838](∅→∅),[11.7838]→[11.15146:15147](∅→∅),[11.15146]→[11.15146:15147](∅→∅),[11.15147]→[11.7839:7955](∅→∅),[11.7955]→[11.15251:15252](∅→∅),[11.15251]→[11.15251:15252](∅→∅),[11.15252]→[11.7956:8005](∅→∅),[11.8005]→[11.15293:15294](∅→∅),[11.15293]→[11.15293:15294](∅→∅),[11.15294]→[11.796:995](∅→∅),[11.995]→[11.8135:8141](∅→∅),[11.2010]→[11.8135:8141](∅→∅),[11.8135]→[11.8135:8141](∅→∅),[11.8141]→[1
1.1054:1094](∅→∅),[11.1094]→[11.15415:15416](∅→∅),[11.8141]→[11.15415:15416](∅→∅),[11.15415]→[11.15415:15416](∅→∅),[11.454]→[11.678:707](∅→∅),[11.15497]→[11.678:707](∅→∅),[11.707]→[11.97:98](∅→∅),[11.98]→[11.13243:13268](∅→∅),[11.707]→[11.13243:13268](∅→∅),[11.13268]→[10.2284:2305](∅→∅),[10.2305]→[11.120:141](∅→∅),[11.120]→[11.120:141](∅→∅),[11.141]→[11.244:299](∅→∅),[11.299]→[11.142:143](∅→∅),[11.13300]→[11.142:143](∅→∅),[11.143]→[11.4125:4222](∅→∅),[11.4222]→[11.1233:1291](∅→∅),[11.224]→[11.1233:1291](∅→∅),[11.1291]→[11.15702:15703](∅→∅),[11.15702]→[11.15702:15703](∅→∅),[11.15703]→[11.488:511](∅→∅),[11.511]→[10.2306:2358](∅→∅),[10.2358]→[11.569:580](∅→∅),[11.569]→[11.569:580](∅→∅),[11.580]→[11.15902:15903](∅→∅),[11.15902]→[11.15902:15903](∅→∅),[11.15903]→[11.581:984](∅→∅),[11.984]→[11.15934:15935](∅→∅),[11.13706]→[11.15934:15935](∅→∅),[11.15934]→[11.15934:15935](∅→∅),[11.15935]→[11.1067:1095](∅→∅),[11.1095]→[11.985:999](∅→∅),[11.15935]→[11.985:999](∅→∅),[11.999]→[11.996:1069](∅→∅),[11.1069]→[11.1454:1552](∅→∅),[11.617]→[11.1108:1138](∅→∅),[11.699]→[11.1108:1138](∅→∅),[11.1237]→[11.1108:1138](∅→∅),[11.1307]→[11.1108:1138](∅→∅),[11.1364]→[11.1108:1138](∅→∅),[11.1552]→[11.1108:1138](∅→∅),[11.1108]→[11.1108:1138](∅→∅),[11.1138]→[10.2359:2413](∅→∅),[10.2413]→[11.1195:1234](∅→∅),[11.1195]→[11.1195:1234](∅→∅),[11.2041]→[11.1319:1329](∅→∅),[11.4382]→[11.1319:1329](∅→∅),[11.1319]→[11.1319:1329](∅→∅),[11.1329]→[11.16006:16007](∅→∅),[11.3175]→[11.16006:16007](∅→∅),[11.13797]→[11.16006:16007](∅→∅),[11.16006]→[11.16006:16007](∅→∅),[11.16007]→[10.2414:2484](∅→∅),[10.2484]→[11.1469:1475](∅→∅),[11.1469]→[11.1469:1475](∅→∅),[11.1475]→[11.16047:16048](∅→∅),[11.16047]→[11.16047:16048](∅→∅),[11.16048]→[11.300:393](∅→∅),[11.393]→[11.1096:1360](∅→∅),[11.1529]→[11.1096:1360](∅→∅),[11.1360]→[11.1529:1530](∅→∅),[11.1529]→[11.1529:1530](∅→∅),[11.1530]→[11.0:107](∅→∅),[11.107]→[10.2485:2615](∅→∅),[10.2615]→[11.92:159](∅→∅),[11.92]→[11.92:159](∅→∅),[11.159]→[11.108:474](∅→∅),[11.4463]→[11.10
8:474](∅→∅),[11.474]→[11.1095:1126](∅→∅),[11.1126]→[11.474:509](∅→∅),[11.474]→[11.474:509](∅→∅),[11.509]→[11.4548:4555](∅→∅),[11.4548]→[11.4548:4555](∅→∅),[11.4555]→[10.2616:2644](∅→∅),[10.2644]→[11.3069:3457](∅→∅),[11.3069]→[11.3069:3457](∅→∅),[11.3457]→[11.0:41](∅→∅),[11.41]→[11.3457:3946](∅→∅),[11.3457]→[11.3457:3946](∅→∅),[11.3946]→[3.0:115](∅→∅),[3.115]→[11.4003:4336](∅→∅),[11.4003]→[11.4003:4336](∅→∅),[11.4371]→[11.4371:4421](∅→∅),[11.4448]→[11.4448:4737](∅→∅),[11.4737]→[10.2645:2748](∅→∅),[10.2748]→[11.4801:4861](∅→∅),[11.4801]→[11.4801:4861](∅→∅),[11.575]→[11.8648:8649](∅→∅),[11.4861]→[11.8648:8649](∅→∅),[11.8648]→[11.8648:8649](∅→∅),[11.8649]→[11.4862:4906](∅→∅),[11.4906]→[11.16115:16116](∅→∅),[11.8746]→[11.16115:16116](∅→∅),[11.16115]→[11.16115:16116](∅→∅),[11.16116]→[3.116:156](∅→∅),[3.156]→[11.4907:5190](∅→∅),[11.16116]→[11.4907:5190](∅→∅),[11.5190]→[11.42:85](∅→∅),[11.85]→[11.5190:5204](∅→∅),[11.5190]→[11.5190:5204](∅→∅),[11.5204]→[11.597:608](∅→∅),[11.608]→[11.86:471](∅→∅),[11.471]→[11.8953:8963](∅→∅),[11.840]→[11.8953:8963](∅→∅),[11.5204]→[11.8953:8963](∅→∅),[11.8953]→[11.8953:8963](∅→∅),[11.888]→[11.8969:8970](∅→∅),[11.8969]→[11.8969:8970](∅→∅),[11.8970]→[11.5205:5478](∅→∅),[11.771]→[11.16152:16153](∅→∅),[11.5478]→[11.16152:16153](∅→∅),[11.16152]→[11.16152:16153](∅→∅),[11.16153]→[11.5479:5633](∅→∅),[11.5633]→[11.139:324](∅→∅),[11.324]→[11.296:376](∅→∅),[11.376]→[11.5695:5713](∅→∅),[11.403]→[11.5695:5713](∅→∅),[11.5695]→[11.5695:5713](∅→∅),[11.5713]→[11.16176:16177](∅→∅),[11.13854]→[11.16176:16177](∅→∅),[11.16176]→[11.16176:16177](∅→∅),[11.16177]→[11.5714:5788](∅→∅),[11.1231]→[11.16252:16253](∅→∅),[11.5788]→[11.16252:16253](∅→∅),[11.13955]→[11.16252:16253](∅→∅),[11.16252]→[11.16252:16253](∅→∅),[11.16253]→[11.5789:5802](∅→∅),[11.124]→[11.16587:16588](∅→∅),[11.5802]→[11.16587:16588](∅→∅),[11.14052]→[11.16587:16588](∅→∅),[11.16587]→[11.16587:16588](∅→∅),[11.16588]→[11.5803:5932](∅→∅),[11.5932]→[11.889:933](∅→∅),[11.933]→[11.199:200](∅→∅),[11.1563]→[11.19
9:200](∅→∅),[11.5932]→[11.199:200](∅→∅),[11.16983]→[11.199:200](∅→∅),[11.200]→[11.5933:5956](∅→∅),[11.1600]→[11.4841:4842](∅→∅),[11.5956]→[11.4841:4842](∅→∅),[11.4841]→[11.4841:4842](∅→∅),[11.4842]→[11.5957:6223](∅→∅),[11.334]→[11.16983:16984](∅→∅),[11.2131]→[11.16983:16984](∅→∅),[11.6223]→[11.16983:16984](∅→∅),[11.16983]→[11.16983:16984](∅→∅),[11.16984]→[11.6224:6517](∅→∅),[11.6517]→[7.0:99](∅→∅),[7.99]→[11.6658:6776](∅→∅),[11.6658]→[11.6658:6776](∅→∅),[11.1192]→[11.2270:2284](∅→∅),[11.6776]→[11.2270:2284](∅→∅),[11.2270]→[11.2270:2284](∅→∅),[11.2284]→[11.6777:6870](∅→∅),[11.6870]→[11.5100:5101](∅→∅),[11.2284]→[11.5100:5101](∅→∅),[11.5101]→[11.6871:6913](∅→∅),[11.6913]→[10.2749:2905](∅→∅),[10.2905]→[11.7104:7154](∅→∅),[11.7104]→[11.7104:7154](∅→∅),[11.7154]→[10.2906:3065](∅→∅),[10.3065]→[11.7348:7349](∅→∅),[11.7348]→[11.7348:7349](∅→∅),[11.7349]→[10.3066:3138](∅→∅),[10.3138]→[11.7420:7451](∅→∅),[11.7420]→[11.7420:7451](∅→∅),[11.7451]→[10.3139:3448](∅→∅),[10.3448]→[11.7519:7894](∅→∅),[11.7519]→[11.7519:7894](∅→∅),[11.7894]→[11.0:79](∅→∅),[11.79]→[4.0:109](∅→∅),[4.109]→[11.140:174](∅→∅),[11.140]→[11.140:174](∅→∅),[11.174]→[11.8030:8106](∅→∅),[11.8030]→[11.8030:8106](∅→∅),[11.8106]→[5.0:92](∅→∅),[5.92]→[11.8176:8533](∅→∅),[11.8176]→[11.8176:8533](∅→∅),[11.8533]→[4.110:166](∅→∅),[4.166]→[11.8533:8625](∅→∅),[11.8533]→[11.8533:8625](∅→∅),[11.990]→[11.8675:8713](∅→∅),[11.1500]→[11.8675:8713](∅→∅),[11.8675]→[11.8675:8713](∅→∅),[11.8713]→[11.1501:1670](∅→∅),[11.1670]→[11.8865:9334](∅→∅),[11.8865]→[11.8865:9334](∅→∅),[11.9334]→[10.3449:3535](∅→∅),[10.3535]→[11.9423:9581](∅→∅),[11.9423]→[11.9423:9581](∅→∅),[11.5297]→[11.2284:2285](∅→∅),[11.9581]→[11.2284:2285](∅→∅),[11.2284]→[11.2284:2285](∅→∅),[11.2285]→[11.9582:9612](∅→∅),[11.9612]→[11.1890:1904](∅→∅),[11.1890]→[11.1890:1904](∅→∅),[11.1904]→[11.2526:2527](∅→∅),[11.6045]→[11.2526:2527](∅→∅),[11.17548]→[11.2526:2527](∅→∅),[11.2527]→[11.9613:9900](∅→∅),[11.9900]→[11.472:519](∅→∅),[11.519]→[11.1127:1179](∅→∅),[11.519]→[11.9900:9
914](∅→∅),[11.1179]→[11.9900:9914](∅→∅),[11.9900]→[11.9900:9914](∅→∅),[11.9914]→[11.991:1243](∅→∅),[11.1243]→[11.17548:17558](∅→∅),[11.2868]→[11.17548:17558](∅→∅),[11.9914]→[11.17548:17558](∅→∅),[11.17548]→[11.17548:17558](∅→∅),[11.17558]→[11.1244:1291](∅→∅),[11.1291]→[11.17558:17559](∅→∅),[11.17558]→[11.17558:17559](∅→∅),[11.17582]→[11.17582:17588](∅→∅),[11.2115]→[11.9569:9570](∅→∅),[11.6292]→[11.9569:9570](∅→∅),[11.9569]→[11.9569:9570](∅→∅),[11.9570]→[11.226:272](∅→∅),[11.272]→[11.394:526](∅→∅),[11.9570]→[11.394:526](∅→∅),[11.526]→[11.4282:4462](∅→∅),[11.432]→[11.526:527](∅→∅),[11.4462]→[11.526:527](∅→∅),[11.526]→[11.526:527](∅→∅),[11.527]→[11.1180:1220](∅→∅),[11.527]→[11.2182:2200](∅→∅),[11.1220]→[11.2182:2200](∅→∅),[11.6379]→[11.2182:2200](∅→∅)
}void State::makeRunnable(Step::ptr step){printMsg(lvlChatty, format("step ‘%1%’ is now runnable") % step->drvPath);{auto step_(step->state.lock());assert(step_->created);assert(!step->finished);assert(step_->deps.empty());}{auto runnable_(runnable.lock());runnable_->push_back(step);}wakeDispatcher();}void State::dispatcher(){while (true) {printMsg(lvlDebug, "dispatcher woken up");auto sleepUntil = system_time::max();bool keepGoing;do {/* Copy the currentJobs field of each machine. This isnecessary to ensure that the sort comparator below isan ordering. std::sort() can segfault if it isn't. */struct MachineInfo{Machine::ptr machine;unsigned int currentJobs;};std::vector<MachineInfo> machinesSorted;{auto machines_(machines.lock());for (auto & m : *machines_)machinesSorted.push_back({m.second, m.second->state->currentJobs});}/* Sort the machines by a combination of speed factor andavailable slots. Prioritise the available machines asfollows:- First by load divided by speed factor, rounded to thenearest integer. This causes fast machines to bepreferred over slow machines with similar loads.- Then by speed factor.- Finally by load. */sort(machinesSorted.begin(), machinesSorted.end(),[](const MachineInfo & a, const MachineInfo & b) -> bool{float ta = roundf(a.currentJobs / a.machine->speedFactor);float tb = roundf(b.currentJobs / b.machine->speedFactor);returnta != tb ? ta < tb :a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :a.currentJobs > b.currentJobs;});/* Find a machine with a free slot and find a step to runon it. Once we find such a pair, we restart the outerloop because the machine sorting will have changed. 
*/keepGoing = false;system_time now = std::chrono::system_clock::now();for (auto & mi : machinesSorted) {// FIXME: can we lose a wakeup if a builder exits concurrently?if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;auto runnable_(runnable.lock());//printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());/* FIXME: we're holding the runnable lock too longhere. This could be more efficient. */for (auto i = runnable_->begin(); i != runnable_->end(); ) {auto step = i->lock();/* Delete dead steps. */if (!step) {i = runnable_->erase(i);continue;}/* Can this machine do this step? */if (!mi.machine->supportsStep(step)) {++i;continue;}/* Skip previously failed steps that aren't readyto be retried. */{auto step_(step->state.lock());if (step_->tries > 0 && step_->after > now) {if (step_->after < sleepUntil)sleepUntil = step_->after;++i;continue;}}/* Make a slot reservation and start a thread todo the build. */auto reservation = std::make_shared<MaintainCount>(mi.machine->state->currentJobs);i = runnable_->erase(i);auto builderThread = std::thread(&State::builder, this, step, mi.machine, reservation);builderThread.detach(); // FIXME?keepGoing = true;break;}if (keepGoing) break;}} while (keepGoing);/* Sleep until we're woken up (either because a runnable buildis added, or because a build finishes). 
*/{std::unique_lock<std::mutex> lock(dispatcherMutex);printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());dispatcherWakeup.wait_until(lock, sleepUntil);nrDispatcherWakeups++;}}printMsg(lvlError, "dispatcher exits");}void State::wakeDispatcher(){{ std::lock_guard<std::mutex> lock(dispatcherMutex); } // barrierdispatcherWakeup.notify_one();}void State::builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation){bool retry = true;MaintainCount mc(nrActiveSteps);try {auto store = openStore(); // FIXME: poolretry = doBuildStep(store, step, machine);} catch (std::exception & e) {printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%")% step->drvPath % machine->sshName % e.what());}/* Release the machine and wake up the dispatcher. */assert(reservation.unique());reservation = 0;wakeDispatcher();/* If there was a temporary failure, retry the step after anexponentially increasing interval. */if (retry) {{auto step_(step->state.lock());step_->tries++;nrRetries++;if (step_->tries > maxNrRetries) maxNrRetries = step_->tries; // yeah yeah, not atomicint delta = retryInterval * powf(retryBackoff, step_->tries - 1);printMsg(lvlInfo, format("will retry ‘%1%’ after %2%s") % step->drvPath % delta);step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta);}makeRunnable(step);}}bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,Machine::ptr machine){{auto step_(step->state.lock());assert(step_->created);assert(!step->finished);}/* There can be any number of builds in the database that dependon this derivation. Arbitrarily pick one (though preferring abuild of which this is the top-level derivation) for thepurpose of creating build steps. We could create a build steprecord for every build, but that could be very expensive(e.g. 
a stdenv derivation can be a dependency of tens ofthousands of builds), so we don't. */Build::ptr build;{std::set<Build::ptr> dependents;std::set<Step::ptr> steps;getDependents(step, dependents, steps);if (dependents.empty()) {/* Apparently all builds that depend on this derivationare gone (e.g. cancelled). So don't bother. This isvery unlikely to happen, because normally Steps areonly kept alive by being reachable from aBuild. However, it's possible that a new Build justcreated a reference to this step. So to handle thatpossibility, we retry this step (putting it back inthe runnable queue). If there are really no strongpointers to the step, it will be deleted. */printMsg(lvlInfo, format("maybe cancelling build step ‘%1%’") % step->drvPath);return true;}for (auto build2 : dependents)if (build2->drvPath == step->drvPath) { build = build2; break; }if (!build) build = *dependents.begin();printMsg(lvlInfo, format("performing step ‘%1%’ on ‘%2%’ (needed by build %3% and %4% others)")% step->drvPath % machine->sshName % build->id % (dependents.size() - 1));}bool quit = build->id == buildOne;auto conn(dbPool.get());RemoteResult result;BuildOutput res;int stepNr = 0;time_t stepStartTime = result.startTime = time(0);/* If any of the outputs have previously failed, then don't botherbuilding again. */bool cachedFailure = checkCachedFailure(step, *conn);if (cachedFailure)result.status = BuildResult::CachedFailure;else {/* Create a build step record indicating that we startedbuilding. Also, mark the selected build as busy. */{pqxx::work txn(*conn);stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy);txn.parameterized("update Builds set busy = 1 where id = $1")(build->id).exec();txn.commit();}/* Do the build. */try {/* FIXME: referring builds may have conflicting timeouts. 
*/buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result);} catch (Error & e) {result.status = BuildResult::MiscFailure;result.errorMsg = e.msg();}if (result.success()) res = getBuildOutput(store, step->drv);}time_t stepStopTime = time(0);if (!result.stopTime) result.stopTime = stepStopTime;/* Asynchronously compress the log. */if (result.logFile != "") {{auto logCompressorQueue_(logCompressorQueue.lock());logCompressorQueue_->push(result.logFile);}logCompressorWakeup.notify_one();}/* The step had a hopefully temporary failure (e.g. networkissue). Retry a number of times. */if (result.canRetry()) {printMsg(lvlError, format("possibly transient failure building ‘%1%’ on ‘%2%’: %3%")% step->drvPath % machine->sshName % result.errorMsg);bool retry;{auto step_(step->state.lock());retry = step_->tries + 1 < maxTries;}if (retry) {pqxx::work txn(*conn);finishBuildStep(txn, result.startTime, result.stopTime, build->id,stepNr, machine->sshName, bssAborted, result.errorMsg);txn.commit();if (quit) exit(1);return true;}}if (result.success()) {/* Register success in the database for all Build objects thathave this step as the top-level step. Since the queuemonitor thread may be creating new referring Buildsconcurrently, and updating the database may fail, we dothis in a loop, marking all known builds, repeating untilthere are no unmarked builds.*/std::vector<BuildID> buildIDs;while (true) {/* Get the builds that have this one as the top-level. */std::vector<Build::ptr> direct;{auto steps_(steps.lock());auto step_(step->state.lock());for (auto & b_ : step_->builds) {auto b = b_.lock();if (b && !b->finishedInDB) direct.push_back(b);}/* If there are no builds left to update in the DB,then we're done (except for callingfinishBuildStep()). Delete the step from‘steps’. Since we've been holding the ‘steps’ lock,no new referrers can have been added in themeantime or be added afterwards. 
*/if (direct.empty()) {printMsg(lvlDebug, format("finishing build step ‘%1%’") % step->drvPath);steps_->erase(step->drvPath);}}/* Update the database. */{pqxx::work txn(*conn);finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, machine->sshName, bssSuccess);for (auto & b : direct)markSucceededBuild(txn, b, res, build != b || result.status != BuildResult::Built,result.startTime, result.stopTime);txn.commit();}if (direct.empty()) break;/* Remove the direct dependencies from ‘builds’. This willcause them to be destroyed. */for (auto & b : direct) {auto builds_(builds.lock());b->finishedInDB = true;builds_->erase(b->id);buildIDs.push_back(b->id);}}/* Send notification about the builds that have this step asthe top-level. */for (auto id : buildIDs) {{auto notificationSenderQueue_(notificationSenderQueue.lock());notificationSenderQueue_->push(NotificationItem(id, std::vector<BuildID>()));}notificationSenderWakeup.notify_one();}/* Wake up any dependent steps that have no otherdependencies. */{auto step_(step->state.lock());for (auto & rdepWeak : step_->rdeps) {auto rdep = rdepWeak.lock();if (!rdep) continue;bool runnable = false;{auto rdep_(rdep->state.lock());rdep_->deps.erase(step);/* Note: if the step has not finishedinitialisation yet, it will be made runnable increateStep(), if appropriate. */if (rdep_->deps.empty() && rdep_->created) runnable = true;}if (runnable) makeRunnable(rdep);}}} else {/* Register failure in the database for all Build objects thatdirectly or indirectly depend on this step. */std::vector<BuildID> dependentIDs;while (true) {/* Get the builds and steps that depend on this step. */std::set<Build::ptr> indirect;{auto steps_(steps.lock());std::set<Step::ptr> steps;getDependents(step, indirect, steps);/* If there are no builds left, delete all referringsteps from ‘steps’. As for the success case, we canbe certain no new referrers can be added. 
*/if (indirect.empty()) {for (auto & s : steps) {printMsg(lvlDebug, format("finishing build step ‘%1%’") % s->drvPath);steps_->erase(s->drvPath);}break;}}/* Update the database. */{pqxx::work txn(*conn);BuildStatus buildStatus =result.status == BuildResult::TimedOut ? bsTimedOut :result.canRetry() ? bsAborted :bsFailed;BuildStepStatus buildStepStatus =result.status == BuildResult::TimedOut ? bssTimedOut :result.canRetry() ? bssAborted :bssFailed;/* For standard failures, we don't care about the errormessage. */if (result.status == BuildResult::PermanentFailure ||result.status == BuildResult::TransientFailure ||result.status == BuildResult::CachedFailure ||result.status == BuildResult::TimedOut)result.errorMsg = "";/* Create failed build steps for every build that dependson this. For cached failures, only create a step forbuilds that don't have this step as top-level(otherwise the user won't be able to see what causedthe build to fail). */for (auto & build2 : indirect) {if ((cachedFailure && build2->drvPath == step->drvPath) ||(!cachedFailure && build == build2) ||build2->finishedInDB)continue;createBuildStep(txn, 0, build2, step, machine->sshName,buildStepStatus, result.errorMsg, build == build2 ? 0 : build->id);}if (!cachedFailure)finishBuildStep(txn, result.startTime, result.stopTime, build->id,stepNr, machine->sshName, buildStepStatus, result.errorMsg);/* Mark all builds that depend on this derivation as failed. */for (auto & build2 : indirect) {if (build2->finishedInDB) continue;printMsg(lvlError, format("marking build %1% as failed") % build2->id);txn.parameterized("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1 and finished = 0")(build2->id)((int) (build2->drvPath != step->drvPath && buildStatus == bsFailed ? bsDepFailed : buildStatus))(result.startTime)(result.stopTime)(cachedFailure ? 
1 : 0).exec();nrBuildsDone++;}/* Remember failed paths in the database so that theywon't be built again. */if (!cachedFailure && result.status == BuildResult::PermanentFailure)for (auto & path : outputPaths(step->drv))txn.parameterized("insert into FailedPaths values ($1)")(path).exec();txn.commit();}/* Remove the indirect dependencies from ‘builds’. Thiswill cause them to be destroyed. */for (auto & b : indirect) {auto builds_(builds.lock());b->finishedInDB = true;builds_->erase(b->id);dependentIDs.push_back(b->id);if (buildOne == b->id) quit = true;}}/* Send notification about this build and its dependents. */{auto notificationSenderQueue_(notificationSenderQueue.lock());notificationSenderQueue_->push(NotificationItem(build->id, dependentIDs));}notificationSenderWakeup.notify_one();}// FIXME: keep stats about aborted steps?nrStepsDone++;totalStepTime += stepStopTime - stepStartTime;totalStepBuildTime += result.stopTime - result.startTime;machine->state->nrStepsDone++;machine->state->totalStepTime += stepStopTime - stepStartTime;machine->state->totalStepBuildTime += result.stopTime - result.startTime;if (quit) exit(0); // testing hackreturn false; - file addition: queue-monitor.cc[11.187]
#include "state.hh"
#include "build-result.hh"

using namespace nix;


/* Run queueMonitorLoop() forever. If it throws a std::exception, the
   error is logged and the loop is restarted after a 10 second
   pause. */
void State::queueMonitor()
{
    while (true) {
        try {
            queueMonitorLoop();
        } catch (std::exception & e) {
            printMsg(lvlError, format("queue monitor: %1%") % e.what());
            sleep(10); // probably a DB problem, so don't retry right away
        }
    }
}


/* Subscribe to the ‘builds_added’, ‘builds_restarted’,
   ‘builds_cancelled’ and ‘builds_deleted’ notifications on a pooled
   database connection. Then loop: load queued builds with
   getQueuedBuilds(), block in await_notification(), and react to
   whichever receivers fired. */
void State::queueMonitorLoop()
{
    auto conn(dbPool.get());

    receiver buildsAdded(*conn, "builds_added");
    receiver buildsRestarted(*conn, "builds_restarted");
    receiver buildsCancelled(*conn, "builds_cancelled");
    receiver buildsDeleted(*conn, "builds_deleted");

    auto store = openStore(); // FIXME: pool

    /* Highest build ID seen so far; getQueuedBuilds() only queries
       builds with a larger ID and advances this value. */
    unsigned int lastBuildId = 0;

    while (true) {
        getQueuedBuilds(*conn, store, lastBuildId);

        /* Sleep until we get notification from the database about an
           event. */
        conn->await_notification();
        nrQueueWakeups++;

        if (buildsAdded.get())
            printMsg(lvlTalkative, "got notification: new builds added to the queue");
        if (buildsRestarted.get()) {
            printMsg(lvlTalkative, "got notification: builds restarted");
            lastBuildId = 0; // check all builds
        }
        if (buildsCancelled.get() || buildsDeleted.get()) {
            printMsg(lvlTalkative, "got notification: builds cancelled");
            removeCancelledBuilds(*conn);
        }

    }
}


/* Load the unfinished builds with an ID greater than ‘lastBuildId’
   from the database, create Build and Step objects for them, and
   make the resulting runnable steps available to the dispatcher.

   ‘lastBuildId’ is an in/out parameter: it is raised to the highest
   build ID returned by the query. Builds already present in
   ‘builds’ are skipped. If ‘buildOne’ is non-zero, every other
   build ID is skipped as well. */
void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId)
{
    printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);

    /* Grab the queued builds from the database, but don't process
       them yet (since we don't want a long-running transaction).
       The map is keyed by derivation path so that createBuild() below
       can look up builds by the path of a newly created step. */
    std::multimap<Path, Build::ptr> newBuilds;

    {
        pqxx::work txn(conn);

        auto res = txn.parameterized("select id, project, jobset, job, drvPath, maxsilent, timeout from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec();

        for (auto const & row : res) {
            auto builds_(builds.lock());
            BuildID id = row["id"].as<BuildID>();
            if (buildOne && id != buildOne) continue;
            if (id > lastBuildId) lastBuildId = id;
            if (has(*builds_, id)) continue;

            auto build = std::make_shared<Build>();
            build->id = id;
            build->drvPath = row["drvPath"].as<string>();
            build->fullJobName = row["project"].as<string>() + ":" + row["jobset"].as<string>() + ":" + row["job"].as<string>();
            build->maxSilentTime = row["maxsilent"].as<int>();
            build->buildTimeout = row["timeout"].as<int>();

            newBuilds.emplace(std::make_pair(build->drvPath, build));
        }
    }

    std::set<Step::ptr> newRunnable;
    unsigned int nrAdded; // reset to 0 before each top-level createBuild() call below
    std::function<void(Build::ptr)> createBuild;

    /* Instantiate the steps of one build. This is a std::function
       rather than a plain lambda because it calls itself for builds
       whose top-level derivation is one of the newly created
       steps. */
    createBuild = [&](Build::ptr build) {
        printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName);
        nrAdded++;

        if (!store->isValidPath(build->drvPath)) {
            /* Derivation has been GC'ed prematurely. */
            printMsg(lvlError, format("aborting GC'ed build %1%") % build->id);
            if (!build->finishedInDB) {
                pqxx::work txn(conn);
                txn.parameterized
                    ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0")
                    (build->id)
                    ((int) bsAborted)
                    (time(0))
                    ("derivation was garbage-collected prior to build").exec();
                txn.commit();
                build->finishedInDB = true;
                nrBuildsDone++;
            }
            return;
        }

        std::set<Step::ptr> newSteps;
        std::set<Path> finishedDrvs; // FIXME: re-use?
        Step::ptr step = createStep(store, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable);

        /* Some of the new steps may be the top level of builds that
           we haven't processed yet. So do them now. This ensures that
           if build A depends on build B with top-level step X, then X
           will be "accounted" to B in doBuildStep(). */
        for (auto & r : newSteps) {
            while (true) {
                auto i = newBuilds.find(r->drvPath);
                if (i == newBuilds.end()) break;
                Build::ptr b = i->second;
                newBuilds.erase(i);
                createBuild(b);
            }
        }

        /* If we didn't get a step, it means the step's outputs are
           all valid. So we mark this as a finished, cached build. */
        if (!step) {
            Derivation drv = readDerivation(build->drvPath);
            BuildOutput res = getBuildOutput(store, drv);

            pqxx::work txn(conn);
            time_t now = time(0);
            markSucceededBuild(txn, build, res, true, now, now);
            txn.commit();

            build->finishedInDB = true;

            return;
        }

        /* If any step has an unsupported system type or has a
           previously failed output path, then fail the build right
           away. */
        bool badStep = false;
        for (auto & r : newSteps) {
            /* bsSuccess serves as "no problem found so far". */
            BuildStatus buildStatus = bsSuccess;
            BuildStepStatus buildStepStatus = bssFailed;

            if (checkCachedFailure(r, conn)) {
                printMsg(lvlError, format("marking build %1% as cached failure") % build->id);
                buildStatus = step == r ? bsFailed : bsDepFailed;
                buildStepStatus = bssFailed;
            }

            if (buildStatus == bsSuccess) {
                bool supported = false;
                {
                    auto machines_(machines.lock()); // FIXME: use shared_mutex
                    for (auto & m : *machines_)
                        if (m.second->supportsStep(r)) { supported = true; break; }
                }

                if (!supported) {
                    printMsg(lvlError, format("aborting unsupported build %1%") % build->id);
                    buildStatus = bsUnsupported;
                    buildStepStatus = bssUnsupported;
                }
            }

            if (buildStatus != bsSuccess) {
                time_t now = time(0);
                if (!build->finishedInDB) {
                    pqxx::work txn(conn);
                    createBuildStep(txn, 0, build, r, "", buildStepStatus);
                    txn.parameterized
                        ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = $4 where id = $1 and finished = 0")
                        (build->id)
                        ((int) buildStatus)
                        (now)
                        (buildStatus != bsUnsupported ? 1 : 0).exec();
                    txn.commit();
                    build->finishedInDB = true;
                    nrBuildsDone++;
                }
                badStep = true;
                break;
            }
        }

        if (badStep) return;

        /* Note: if we exit this scope prior to this, the build and
           all newly created steps are destroyed. */

        {
            auto builds_(builds.lock());
            if (!build->finishedInDB) // FIXME: can this happen?
                (*builds_)[build->id] = build;
            build->toplevel = step;
        }

        printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)")
            % build->id % step->drvPath % newSteps.size());
    };

    /* Now instantiate build steps for each new build. The builder
       threads can start building the runnable build steps right away,
       even while we're still processing other new builds. */
    while (!newBuilds.empty()) {
        auto build = newBuilds.begin()->second;
        newBuilds.erase(newBuilds.begin());

        newRunnable.clear();
        nrAdded = 0;
        try {
            createBuild(build);
        } catch (Error & e) {
            e.addPrefix(format("while loading build %1%: ") % build->id);
            throw;
        }

        /* Add the new runnable build steps to ‘runnable’ and wake up
           the builder threads. */
        printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded);
        for (auto & r : newRunnable)
            makeRunnable(r);

        nrBuildsRead += nrAdded;
    }
}


/* Drop from ‘builds’ every build whose ID is no longer among the
   unfinished builds in the database. Active build steps are not
   interrupted (see the FIXME below). */
void State::removeCancelledBuilds(Connection & conn)
{
    /* Get the current set of queued builds. */
    std::set<BuildID> currentIds;
    {
        pqxx::work txn(conn);
        auto res = txn.exec("select id from Builds where finished = 0");
        for (auto const & row : res)
            currentIds.insert(row["id"].as<BuildID>());
    }

    auto builds_(builds.lock());

    for (auto i = builds_->begin(); i != builds_->end(); ) {
        if (currentIds.find(i->first) == currentIds.end()) {
            printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first);
            i = builds_->erase(i);
            // FIXME: ideally we would interrupt active build steps here.
        } else
            ++i;
    }
}


/* Return the Step for ‘drvPath’, creating it (and, recursively, the
   steps for its input derivations) if it does not exist yet.

   referringBuild / referringStep: if non-null, recorded in the
     step's ‘builds’ / ‘rdeps’ list respectively.
   finishedDrvs: derivation paths already found to have all outputs
     valid; consulted first and extended by this function.
   newSteps: receives every step newly created by this call that
     still needs to be built.
   newRunnable: receives newly created steps that have no remaining
     dependencies.

   Returns 0 if ‘drvPath’ is in ‘finishedDrvs’ or all of its outputs
   are valid in ‘store’. An already existing step is returned as is,
   without being initialised again. */
Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
    Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
    std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable)
{
    if (finishedDrvs.find(drvPath) != finishedDrvs.end()) return 0;

    /* Check if the requested step already exists. If not, create a
       new step. In any case, make the step reachable from
       referringBuild or referringStep. This is done atomically (with
       ‘steps’ locked), to ensure that this step can never become
       reachable from a new build after doBuildStep has removed it
       from ‘steps’. */
    Step::ptr step;
    bool isNew = false;
    {
        auto steps_(steps.lock());

        /* See if the step already exists in ‘steps’ and is not
           stale. */
        auto prev = steps_->find(drvPath);
        if (prev != steps_->end()) {
            step = prev->second.lock();
            /* Since ‘step’ is a strong pointer, the referred Step
               object won't be deleted after this. */
            if (!step) steps_->erase(drvPath); // remove stale entry
        }

        /* If it doesn't exist, create it. */
        if (!step) {
            step = std::make_shared<Step>();
            step->drvPath = drvPath;
            isNew = true;
        }

        auto step_(step->state.lock());

        /* A new step must not be marked created yet; an existing one
           must be. */
        assert(step_->created != isNew);

        if (referringBuild)
            step_->builds.push_back(referringBuild);

        if (referringStep)
            step_->rdeps.push_back(referringStep);

        (*steps_)[drvPath] = step;
    }

    if (!isNew) return step;

    printMsg(lvlDebug, format("considering derivation ‘%1%’") % drvPath);

    /* Initialize the step. Note that the step may be visible in
       ‘steps’ before this point, but that doesn't matter because
       it's not runnable yet, and other threads won't make it
       runnable while step->created == false. */
    step->drv = readDerivation(drvPath);
    {
        auto i = step->drv.env.find("requiredSystemFeatures");
        if (i != step->drv.env.end())
            step->requiredSystemFeatures = tokenizeString<std::set<std::string>>(i->second);
    }

    auto attr = step->drv.env.find("preferLocalBuild");
    step->preferLocalBuild =
        attr != step->drv.env.end() && attr->second == "1"
        && has(localPlatforms, step->drv.platform);

    /* Are all outputs valid? */
    bool valid = true;
    for (auto & i : step->drv.outputs) {
        if (!store->isValidPath(i.second.path)) {
            valid = false;
            break;
        }
    }

    // FIXME: check whether all outputs are in the binary cache.
    if (valid) {
        finishedDrvs.insert(drvPath);
        return 0;
    }

    /* No, we need to build. */
    printMsg(lvlDebug, format("creating build step ‘%1%’") % drvPath);
    newSteps.insert(step);

    /* Create steps for the dependencies. */
    for (auto & i : step->drv.inputDrvs) {
        auto dep = createStep(store, i.first, 0, step, finishedDrvs, newSteps, newRunnable);
        if (dep) {
            auto step_(step->state.lock());
            step_->deps.insert(dep);
        }
    }

    /* If the step has no (remaining) dependencies, make it
       runnable. */
    {
        auto step_(step->state.lock());
        assert(!step_->created);
        step_->created = true;
        if (step_->deps.empty())
            newRunnable.insert(step);
    }

    return step;
}
// - edit in src/hydra-queue-runner/state.hh at line 119
/* Gather the builds and steps that depend on ‘step’ into the
   caller-supplied sets ‘builds’ and ‘steps’ (this is how
   doBuildStep() uses the results). NOTE(review): the definition is
   not visible here — confirm whether ‘step’ itself is included in
   ‘steps’. */
void getDependents(Step::ptr step, std::set<Build::ptr> & builds, std::set<Step::ptr> & steps);
// - edit in src/hydra-queue-runner/state.hh at line 164
// FIXME: Make configurable.

/* Limit on attempts per step: doBuildStep() asks for a retry only
   while ‘tries + 1 < maxTries’. */
const unsigned int maxTries = 5;

/* Base delay before the first retry; builder() waits
   retryInterval * retryBackoff^(tries - 1) seconds. */
const unsigned int retryInterval = 60; // seconds

/* Multiplier applied to the retry delay for each further try. */
const float retryBackoff = 3.0;

/* NOTE(review): not used in the code visible here; by its name,
   presumably a cap on concurrent closure copies — confirm. */
const unsigned int maxParallelCopyClosure = 4;
// - edit in src/hydra-queue-runner/state.hh at line 318[11.9202]
/* Membership test for any container offering find() and end():
   true iff c.find(v) does not return c.end(). */
template <class Container, class Value>
bool has(const Container & c, const Value & v)
{
    const auto pos = c.find(v);
    return pos != c.end();
}