MHVIT4JYWUYD4UCGB2AHLXWLX6B5SYE22BREERNGANT7RGGDUFOAC
GYT6YOZND2F2XXYZYRLDEAZLBQMEW7JOAINGH2NZ6EASI4DF63JAC
NIXRSHRKEGTFYKVF3GF2TZO444PSVCB6AJJDPJWFZWE3OWRGOHUAC
O7MWBXF6EEZCQQV4EXUVEEZMHEHDVJIXN6OWPNFFLH4YQTTHRZ2AC
EXBGZYRJVE3S4WRICS3ZOLLKEWEPB6D2AOMPHEFOW7XHOTTD3LIQC
XHBBT2HEM4O6YL5BAM7HFUNBKOYPWIEWRGDMZFZXZC3Y4FPDQUNQC
WDWPHWQ2MGKFXQDNQIVCJEBGGWRR2KI4BXEAUDQUL5K6LL3365WQC
2AZSF326LF44FIXRMC3QU3D2TA3ZCNNAJYI23226GLMKVEGOPFUQC
RVHBV3B3RA3QTPQFCTICFFG5QGRUZ7EGXZNR4W2MOM2HPBHPZV7AC
NAYQT2GTCJPBFRSK7CBFX655F2NGTBPICJSCYG2CSCQ5NRDHZG6QC
24BMQDZAWDQ7VNIA7TIROXSOYLOJBNZ2E4264WHWNJAEN6ZB3UOAC
7VQ4ALFYKJBFR46T3WZDMGOXNRR3QNJEJQVBYJM4HSJUOOUD6WBQC
N4IROACVZ4MU73J5SM6WXJMKQSFR3VN5SOKENNNZNEGMTGB2Q3HAC
NJJ7H64SZOX5EGACDCQAUQ7R6UEWD5IIC35A2MWFOOJV55DJYPHAC
OCZ4LSGGSCMSLGC3C32D5JUYYHS5CIPOKOAMADEFAFZOFXJ3YY3AC
NNOCZ4ROWC64ZKSAE2MPHZ3LGLI34C5TJJFW4MHXN6OELK6VMWOQC
HJOEIMLRDVQ2KZI5HGL2HKGBM3AHP7YIKGKDAGFUNKRUXVRB24NAC
ENXUSMSVOU3AZFMH2ZXR4ZVPV2LRRQYQJ6IFX33YN6IH2ORSNSAAC
MB3TISH2KYBIGY6XJKMN4HO2S6TCN2GORJENMECCKLXGGIRS2O2AC
62MQPRXCZCP7ZQKOOAFU5V36P7DBS6RCFDYK53PDCFGI4FAOQWMQC
KBZHIGLGHGLST5AZZDEJTYBJSQNE2XYNHEN2FN6XMAMY5BJYZR6QC
5AIYUMTBY6TFQTBRP3MJ2PYWUMRF57I77NIVWYE74UMEVQMBWZVQC
T2EIYJNGPIANHKJ4HBJIPTINWKG7RDLHR3PVHFYAPPLHZAJQBVWAC
GKZN4UV75GV7GEHQGBM2O6UHN6CVSHA7BMASCUQSDIDYVUKKZL7AC
ATJ54SPXPE2IIFRERUOBFF42HBSEADP4QOI743ZBUNBQX3PYKRXQC
RQUAATWBGEP3YT4F555XLJYRRRGHDTEILHFORES7AM2XAOVMVJSAC
C6HOMHZWMSC7ORGFUF5YG2ACKV2SCP26HL3UH6VXH6RNDYRXH5DAC
2IQRXLWETU2RLXPYKBSZIUULDHS346BAHE2NLOMOWXC6WDAX2BYQC
YZAI5GQU3HNMK5MEGF2Y7WS445AN4YKD3HNJQVQP545ODN3F5DLAC
JAUB2FT5SNQWXR24TWTOHVT6UUJ6OQPAFPBZIQAQBHVWHWWFHLYAC
HHOMBU7GGRAEXODSDY3WUHQGOSQ35OTGRNBWKKAS2D4YEIZTTNUAC
WDNUKCTNTDUCQMZYHZIGKFGREVQPHW5DAZ5SASANIHBMFQF7GPQQC
N5O7VEEOY2IE27VCOYRBG7YCY3K7JMQEDEMRT4OQ2MUE3NWULHHQC
HPJKBFZ4C24DNBFESAVVFHOBHXOPYOP4XNPFVGOLA7EPLZCIRS6QC
PQFOMNTLFY4HINJFAQYIFTBTSDRST6W2WNCVKE5ITR2IDF4SWWXQC
4D7CHQ345ZSI52GPNW73JOYALRLFEGI2YUDWKG4UCDE55MO5LQ2AC
HUUZFPPKGHTXFZMZCO2UGWYNGEED3E2CFHQRFQVVBJGPQVGVY4UAC
22LDPAIPRVSZVZXEDCM54GUOH72VQ52EIY47PQV2VELZ3NORC5IQC
IWB3F4Z6QZYHQFJ6FWZTGWLCPBYEUTFLUS3F7QT7JOA4DV4YLYGAC
HLSHCK3CP4RB6VO6N44FBHWMXBJYB6XPNWM5EKCKWU3KISU37N3AC
FKLICOHYATLMF3Q6YBQAOVKBRTCDY2TLKHML3B4DUI7YX4H4MQCAC
SODOV2CMWA4JMIKRQNJ6MD3U3BS2XTSLINLRAG4SFY742IIJNI5QC
UQQ4IL55WHYMXNSPOXEFBTZAPMP7LQ726THOR7INRCJDSYVOP3ZAC
TKA75HWSC2NRTRYQU6UNT7V55VPH25CQIE7H3CKI34E4ZJEDMQQAC
WFYMBNWBSHJ6GF7MPAGKT5H2CMNLNTQLBVJGVA5N6J4IWEL4ND3QC
4LAUAXO5L34OXSUDUQI656GA7YGU54LIAG7GSPC5YJ4ZK24W7EAQC
SK6WHODMWAUL7LNH6QTUXZW7GQFNKKDYN4BU45ODTMMKYMG6O7KAC
ODCBSLFGJGNOMKPZV6TS2SVTQMVUXR3K2BQJHNAJHTR4HECDBBNAC
A3IIKGSGLY6TI4YCUMV6YCOE3DKXW6UTCDCLMQ6RRNGDOIZUWHFAC
UPZXMQDENQB7CDYOYRQ322OTFUIWEPESAKANJ23LA6QAYDIL25HQC
GS4BE6TB6GH2JUZJHDPHL6YG7J7YYESF3YOZJZ2CFABXUTO4VYPQC
WKJFPR77WNFJHZ65NV3DDMCYYUUWUSI3BHTVCZZDWQI6L3FVP5AQC
63W4T5PUSRHU53CVVTVRWAQX6T74RIHH636NCGUGPN3YFVMC3VTAC
FQQRJUO4C7655SFAKPRPILALVEVRQSIIVBLMQUFPODUBXPIMWYSAC
RYTQLATYOZ6ODIKYVJ63TC4OIQBXHSCV3NA2YD4NFP7443GQVSRQC
7LB6QBXYOGU43UDLJDJQZFGT4XDALULXDF3WX3WWHL7X3JTB54CQC
LE4VZIY5VZ52FOP5QQRIJINWIMWTAPRTZTGO77JXUEPGRPRSQYMAC
A2GL5FOZ3UJ2NM5RPRWTNPFTKLBA54B2UC6UIYO4M3N3RFNC4BTAC
QJRDO2B4RGTXPSSK2SM6PQ6VEJNOLNMG2EFNMBGNRSEI74KKLVRAC
UFUAO7NDDGSCKOALXA62AVDYHTMX4RWUDSYJ6H4QHOAGZRMELGAAC
7I7XHQAE62GIKLJ2JUBT4TYX4GTZRZVIQSDKRT52CM6SVUPNRDKAC
IE2PRAQUCQVFPJ4CAIJRPXXEFC5VBAE3EO5I5FG4XWEDRNONNHKQC
TM6WKSP3QPVJOZULDTV4ZKBT446IFYU76HJWXOFZ2H2JSKZRVLDAC
LJILHOJ7QYXNTMKDQS6OU4PVCXPQ27C5KMQWHDBELCYP7FG3ZFYAC
E7WP35SFE6G3UWCYUH7HLUUIXSVXPPQ5BQVYS6OD6IMIK7EX25CAC
O64P4XJSK56UN73VA5JUAM4RHFDWOZJDDYKVOKMUB736ODOAUYNAC
#include <cmath>
#include "state.hh"
#include "build-result.hh"
using namespace nix;
void State::builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation)
{
bool retry = true;
MaintainCount mc(nrActiveSteps);
try {
auto store = openStore(); // FIXME: pool
retry = doBuildStep(store, step, machine);
} catch (std::exception & e) {
printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%")
% step->drvPath % machine->sshName % e.what());
}
/* Release the machine and wake up the dispatcher. */
assert(reservation.unique());
reservation = 0;
wakeDispatcher();
/* If there was a temporary failure, retry the step after an
exponentially increasing interval. */
if (retry) {
{
auto step_(step->state.lock());
step_->tries++;
nrRetries++;
if (step_->tries > maxNrRetries) maxNrRetries = step_->tries; // yeah yeah, not atomic
int delta = retryInterval * powf(retryBackoff, step_->tries - 1);
printMsg(lvlInfo, format("will retry ‘%1%’ after %2%s") % step->drvPath % delta);
step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta);
}
makeRunnable(step);
}
}
bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
Machine::ptr machine)
{
{
auto step_(step->state.lock());
assert(step_->created);
assert(!step->finished);
}
/* There can be any number of builds in the database that depend
on this derivation. Arbitrarily pick one (though preferring a
build of which this is the top-level derivation) for the
purpose of creating build steps. We could create a build step
record for every build, but that could be very expensive
(e.g. a stdenv derivation can be a dependency of tens of
thousands of builds), so we don't. */
Build::ptr build;
{
std::set<Build::ptr> dependents;
std::set<Step::ptr> steps;
getDependents(step, dependents, steps);
if (dependents.empty()) {
/* Apparently all builds that depend on this derivation
are gone (e.g. cancelled). So don't bother. This is
very unlikely to happen, because normally Steps are
only kept alive by being reachable from a
Build. However, it's possible that a new Build just
created a reference to this step. So to handle that
possibility, we retry this step (putting it back in
the runnable queue). If there are really no strong
pointers to the step, it will be deleted. */
printMsg(lvlInfo, format("maybe cancelling build step ‘%1%’") % step->drvPath);
return true;
}
for (auto build2 : dependents)
if (build2->drvPath == step->drvPath) { build = build2; break; }
if (!build) build = *dependents.begin();
printMsg(lvlInfo, format("performing step ‘%1%’ on ‘%2%’ (needed by build %3% and %4% others)")
% step->drvPath % machine->sshName % build->id % (dependents.size() - 1));
}
bool quit = build->id == buildOne;
auto conn(dbPool.get());
RemoteResult result;
BuildOutput res;
int stepNr = 0;
time_t stepStartTime = result.startTime = time(0);
/* If any of the outputs have previously failed, then don't bother
building again. */
bool cachedFailure = checkCachedFailure(step, *conn);
if (cachedFailure)
result.status = BuildResult::CachedFailure;
else {
/* Create a build step record indicating that we started
building. Also, mark the selected build as busy. */
{
pqxx::work txn(*conn);
stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy);
txn.parameterized("update Builds set busy = 1 where id = $1")(build->id).exec();
txn.commit();
}
/* Do the build. */
try {
/* FIXME: referring builds may have conflicting timeouts. */
buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result);
} catch (Error & e) {
result.status = BuildResult::MiscFailure;
result.errorMsg = e.msg();
}
if (result.success()) res = getBuildOutput(store, step->drv);
}
time_t stepStopTime = time(0);
if (!result.stopTime) result.stopTime = stepStopTime;
/* Asynchronously compress the log. */
if (result.logFile != "") {
{
auto logCompressorQueue_(logCompressorQueue.lock());
logCompressorQueue_->push(result.logFile);
}
logCompressorWakeup.notify_one();
}
/* The step had a hopefully temporary failure (e.g. network
issue). Retry a number of times. */
if (result.canRetry()) {
printMsg(lvlError, format("possibly transient failure building ‘%1%’ on ‘%2%’: %3%")
% step->drvPath % machine->sshName % result.errorMsg);
bool retry;
{
auto step_(step->state.lock());
retry = step_->tries + 1 < maxTries;
}
if (retry) {
pqxx::work txn(*conn);
finishBuildStep(txn, result.startTime, result.stopTime, build->id,
stepNr, machine->sshName, bssAborted, result.errorMsg);
txn.commit();
if (quit) exit(1);
return true;
}
}
if (result.success()) {
/* Register success in the database for all Build objects that
have this step as the top-level step. Since the queue
monitor thread may be creating new referring Builds
concurrently, and updating the database may fail, we do
this in a loop, marking all known builds, repeating until
there are no unmarked builds.
*/
std::vector<BuildID> buildIDs;
while (true) {
/* Get the builds that have this one as the top-level. */
std::vector<Build::ptr> direct;
{
auto steps_(steps.lock());
auto step_(step->state.lock());
for (auto & b_ : step_->builds) {
auto b = b_.lock();
if (b && !b->finishedInDB) direct.push_back(b);
}
/* If there are no builds left to update in the DB,
then we're done (except for calling
finishBuildStep()). Delete the step from
‘steps’. Since we've been holding the ‘steps’ lock,
no new referrers can have been added in the
meantime or be added afterwards. */
if (direct.empty()) {
printMsg(lvlDebug, format("finishing build step ‘%1%’") % step->drvPath);
steps_->erase(step->drvPath);
}
}
/* Update the database. */
{
pqxx::work txn(*conn);
finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, machine->sshName, bssSuccess);
for (auto & b : direct)
markSucceededBuild(txn, b, res, build != b || result.status != BuildResult::Built,
result.startTime, result.stopTime);
txn.commit();
}
if (direct.empty()) break;
/* Remove the direct dependencies from ‘builds’. This will
cause them to be destroyed. */
for (auto & b : direct) {
auto builds_(builds.lock());
b->finishedInDB = true;
builds_->erase(b->id);
buildIDs.push_back(b->id);
}
}
/* Send notification about the builds that have this step as
the top-level. */
for (auto id : buildIDs) {
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->push(NotificationItem(id, std::vector<BuildID>()));
}
notificationSenderWakeup.notify_one();
}
/* Wake up any dependent steps that have no other
dependencies. */
{
auto step_(step->state.lock());
for (auto & rdepWeak : step_->rdeps) {
auto rdep = rdepWeak.lock();
if (!rdep) continue;
bool runnable = false;
{
auto rdep_(rdep->state.lock());
rdep_->deps.erase(step);
/* Note: if the step has not finished
initialisation yet, it will be made runnable in
createStep(), if appropriate. */
if (rdep_->deps.empty() && rdep_->created) runnable = true;
}
if (runnable) makeRunnable(rdep);
}
}
} else {
/* Register failure in the database for all Build objects that
directly or indirectly depend on this step. */
std::vector<BuildID> dependentIDs;
while (true) {
/* Get the builds and steps that depend on this step. */
std::set<Build::ptr> indirect;
{
auto steps_(steps.lock());
std::set<Step::ptr> steps;
getDependents(step, indirect, steps);
/* If there are no builds left, delete all referring
steps from ‘steps’. As for the success case, we can
be certain no new referrers can be added. */
if (indirect.empty()) {
for (auto & s : steps) {
printMsg(lvlDebug, format("finishing build step ‘%1%’") % s->drvPath);
steps_->erase(s->drvPath);
}
break;
}
}
/* Update the database. */
{
pqxx::work txn(*conn);
BuildStatus buildStatus =
result.status == BuildResult::TimedOut ? bsTimedOut :
result.canRetry() ? bsAborted :
bsFailed;
BuildStepStatus buildStepStatus =
result.status == BuildResult::TimedOut ? bssTimedOut :
result.canRetry() ? bssAborted :
bssFailed;
/* For standard failures, we don't care about the error
message. */
if (result.status == BuildResult::PermanentFailure ||
result.status == BuildResult::TransientFailure ||
result.status == BuildResult::CachedFailure ||
result.status == BuildResult::TimedOut)
result.errorMsg = "";
/* Create failed build steps for every build that depends
on this. For cached failures, only create a step for
builds that don't have this step as top-level
(otherwise the user won't be able to see what caused
the build to fail). */
for (auto & build2 : indirect) {
if ((cachedFailure && build2->drvPath == step->drvPath) ||
(!cachedFailure && build == build2) ||
build2->finishedInDB)
continue;
createBuildStep(txn, 0, build2, step, machine->sshName,
buildStepStatus, result.errorMsg, build == build2 ? 0 : build->id);
}
if (!cachedFailure)
finishBuildStep(txn, result.startTime, result.stopTime, build->id,
stepNr, machine->sshName, buildStepStatus, result.errorMsg);
/* Mark all builds that depend on this derivation as failed. */
for (auto & build2 : indirect) {
if (build2->finishedInDB) continue;
printMsg(lvlError, format("marking build %1% as failed") % build2->id);
txn.parameterized
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1 and finished = 0")
(build2->id)
((int) (build2->drvPath != step->drvPath && buildStatus == bsFailed ? bsDepFailed : buildStatus))
(result.startTime)
(result.stopTime)
(cachedFailure ? 1 : 0).exec();
nrBuildsDone++;
}
/* Remember failed paths in the database so that they
won't be built again. */
if (!cachedFailure && result.status == BuildResult::PermanentFailure)
for (auto & path : outputPaths(step->drv))
txn.parameterized("insert into FailedPaths values ($1)")(path).exec();
txn.commit();
}
/* Remove the indirect dependencies from ‘builds’. This
will cause them to be destroyed. */
for (auto & b : indirect) {
auto builds_(builds.lock());
b->finishedInDB = true;
builds_->erase(b->id);
dependentIDs.push_back(b->id);
if (buildOne == b->id) quit = true;
}
}
/* Send notification about this build and its dependents. */
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->push(NotificationItem(build->id, dependentIDs));
}
notificationSenderWakeup.notify_one();
}
// FIXME: keep stats about aborted steps?
nrStepsDone++;
totalStepTime += stepStopTime - stepStartTime;
totalStepBuildTime += result.stopTime - result.startTime;
machine->state->nrStepsDone++;
machine->state->totalStepTime += stepStopTime - stepStartTime;
machine->state->totalStepBuildTime += result.stopTime - result.startTime;
if (quit) exit(0); // testing hack
return false;
}
#include <algorithm>
#include <thread>
#include "state.hh"
using namespace nix;
void State::makeRunnable(Step::ptr step)
{
printMsg(lvlChatty, format("step ‘%1%’ is now runnable") % step->drvPath);
{
auto step_(step->state.lock());
assert(step_->created);
assert(!step->finished);
assert(step_->deps.empty());
}
{
auto runnable_(runnable.lock());
runnable_->push_back(step);
}
wakeDispatcher();
}
void State::dispatcher()
{
while (true) {
printMsg(lvlDebug, "dispatcher woken up");
auto sleepUntil = system_time::max();
bool keepGoing;
do {
/* Copy the currentJobs field of each machine. This is
necessary to ensure that the sort comparator below is
an ordering. std::sort() can segfault if it isn't. */
struct MachineInfo
{
Machine::ptr machine;
unsigned int currentJobs;
};
std::vector<MachineInfo> machinesSorted;
{
auto machines_(machines.lock());
for (auto & m : *machines_)
machinesSorted.push_back({m.second, m.second->state->currentJobs});
}
/* Sort the machines by a combination of speed factor and
available slots. Prioritise the available machines as
follows:
- First by load divided by speed factor, rounded to the
nearest integer. This causes fast machines to be
preferred over slow machines with similar loads.
- Then by speed factor.
- Finally by load. */
sort(machinesSorted.begin(), machinesSorted.end(),
[](const MachineInfo & a, const MachineInfo & b) -> bool
{
float ta = roundf(a.currentJobs / a.machine->speedFactor);
float tb = roundf(b.currentJobs / b.machine->speedFactor);
return
ta != tb ? ta < tb :
a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :
a.currentJobs > b.currentJobs;
});
/* Find a machine with a free slot and find a step to run
on it. Once we find such a pair, we restart the outer
loop because the machine sorting will have changed. */
keepGoing = false;
system_time now = std::chrono::system_clock::now();
for (auto & mi : machinesSorted) {
// FIXME: can we lose a wakeup if a builder exits concurrently?
if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;
auto runnable_(runnable.lock());
//printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
/* FIXME: we're holding the runnable lock too long
here. This could be more efficient. */
for (auto i = runnable_->begin(); i != runnable_->end(); ) {
auto step = i->lock();
/* Delete dead steps. */
if (!step) {
i = runnable_->erase(i);
continue;
}
/* Can this machine do this step? */
if (!mi.machine->supportsStep(step)) {
++i;
continue;
}
/* Skip previously failed steps that aren't ready
to be retried. */
{
auto step_(step->state.lock());
if (step_->tries > 0 && step_->after > now) {
if (step_->after < sleepUntil)
sleepUntil = step_->after;
++i;
continue;
}
}
/* Make a slot reservation and start a thread to
do the build. */
auto reservation = std::make_shared<MaintainCount>(mi.machine->state->currentJobs);
i = runnable_->erase(i);
auto builderThread = std::thread(&State::builder, this, step, mi.machine, reservation);
builderThread.detach(); // FIXME?
keepGoing = true;
break;
}
if (keepGoing) break;
}
} while (keepGoing);
/* Sleep until we're woken up (either because a runnable build
is added, or because a build finishes). */
{
std::unique_lock<std::mutex> lock(dispatcherMutex);
printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %
std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());
dispatcherWakeup.wait_until(lock, sleepUntil);
nrDispatcherWakeups++;
}
}
printMsg(lvlError, "dispatcher exits");
}
void State::wakeDispatcher()
{
{ std::lock_guard<std::mutex> lock(dispatcherMutex); } // barrier
dispatcherWakeup.notify_one();
}
// FIXME: Make configurable.
const unsigned int maxTries = 5;
const unsigned int retryInterval = 60; // seconds
const float retryBackoff = 3.0;
const unsigned int maxParallelCopyClosure = 4;
template <class C, class V>
bool has(const C & c, const V & v)
{
return c.find(v) != c.end();
}
}
void State::queueMonitor()
{
while (true) {
try {
queueMonitorLoop();
} catch (std::exception & e) {
printMsg(lvlError, format("queue monitor: %1%") % e.what());
sleep(10); // probably a DB problem, so don't retry right away
}
}
}
void State::queueMonitorLoop()
{
auto conn(dbPool.get());
receiver buildsAdded(*conn, "builds_added");
receiver buildsRestarted(*conn, "builds_restarted");
receiver buildsCancelled(*conn, "builds_cancelled");
receiver buildsDeleted(*conn, "builds_deleted");
auto store = openStore(); // FIXME: pool
unsigned int lastBuildId = 0;
while (true) {
getQueuedBuilds(*conn, store, lastBuildId);
/* Sleep until we get notification from the database about an
event. */
conn->await_notification();
nrQueueWakeups++;
if (buildsAdded.get())
printMsg(lvlTalkative, "got notification: new builds added to the queue");
if (buildsRestarted.get()) {
printMsg(lvlTalkative, "got notification: builds restarted");
lastBuildId = 0; // check all builds
}
if (buildsCancelled.get() || buildsDeleted.get()) {
printMsg(lvlTalkative, "got notification: builds cancelled");
removeCancelledBuilds(*conn);
}
}
}
void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId)
{
printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);
/* Grab the queued builds from the database, but don't process
them yet (since we don't want a long-running transaction). */
std::multimap<Path, Build::ptr> newBuilds;
{
pqxx::work txn(conn);
auto res = txn.parameterized("select id, project, jobset, job, drvPath, maxsilent, timeout from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec();
for (auto const & row : res) {
auto builds_(builds.lock());
BuildID id = row["id"].as<BuildID>();
if (buildOne && id != buildOne) continue;
if (id > lastBuildId) lastBuildId = id;
if (has(*builds_, id)) continue;
auto build = std::make_shared<Build>();
build->id = id;
build->drvPath = row["drvPath"].as<string>();
build->fullJobName = row["project"].as<string>() + ":" + row["jobset"].as<string>() + ":" + row["job"].as<string>();
build->maxSilentTime = row["maxsilent"].as<int>();
build->buildTimeout = row["timeout"].as<int>();
newBuilds.emplace(std::make_pair(build->drvPath, build));
}
}
std::set<Step::ptr> newRunnable;
unsigned int nrAdded;
std::function<void(Build::ptr)> createBuild;
createBuild = [&](Build::ptr build) {
printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName);
nrAdded++;
if (!store->isValidPath(build->drvPath)) {
/* Derivation has been GC'ed prematurely. */
printMsg(lvlError, format("aborting GC'ed build %1%") % build->id);
if (!build->finishedInDB) {
pqxx::work txn(conn);
txn.parameterized
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0")
(build->id)
((int) bsAborted)
(time(0))
("derivation was garbage-collected prior to build").exec();
txn.commit();
build->finishedInDB = true;
nrBuildsDone++;
}
return;
}
std::set<Step::ptr> newSteps;
std::set<Path> finishedDrvs; // FIXME: re-use?
Step::ptr step = createStep(store, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable);
/* Some of the new steps may be the top level of builds that
we haven't processed yet. So do them now. This ensures that
if build A depends on build B with top-level step X, then X
will be "accounted" to B in doBuildStep(). */
for (auto & r : newSteps) {
while (true) {
auto i = newBuilds.find(r->drvPath);
if (i == newBuilds.end()) break;
Build::ptr b = i->second;
newBuilds.erase(i);
createBuild(b);
}
}
/* If we didn't get a step, it means the step's outputs are
all valid. So we mark this as a finished, cached build. */
if (!step) {
Derivation drv = readDerivation(build->drvPath);
BuildOutput res = getBuildOutput(store, drv);
pqxx::work txn(conn);
time_t now = time(0);
markSucceededBuild(txn, build, res, true, now, now);
txn.commit();
build->finishedInDB = true;
return;
}
/* If any step has an unsupported system type or has a
previously failed output path, then fail the build right
away. */
bool badStep = false;
for (auto & r : newSteps) {
BuildStatus buildStatus = bsSuccess;
BuildStepStatus buildStepStatus = bssFailed;
if (checkCachedFailure(r, conn)) {
printMsg(lvlError, format("marking build %1% as cached failure") % build->id);
buildStatus = step == r ? bsFailed : bsDepFailed;
buildStepStatus = bssFailed;
}
if (buildStatus == bsSuccess) {
bool supported = false;
{
auto machines_(machines.lock()); // FIXME: use shared_mutex
for (auto & m : *machines_)
if (m.second->supportsStep(r)) { supported = true; break; }
}
if (!supported) {
printMsg(lvlError, format("aborting unsupported build %1%") % build->id);
buildStatus = bsUnsupported;
buildStepStatus = bssUnsupported;
}
}
if (buildStatus != bsSuccess) {
time_t now = time(0);
if (!build->finishedInDB) {
pqxx::work txn(conn);
createBuildStep(txn, 0, build, r, "", buildStepStatus);
txn.parameterized
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = $4 where id = $1 and finished = 0")
(build->id)
((int) buildStatus)
(now)
(buildStatus != bsUnsupported ? 1 : 0).exec();
txn.commit();
build->finishedInDB = true;
nrBuildsDone++;
}
badStep = true;
break;
}
}
if (badStep) return;
/* Note: if we exit this scope prior to this, the build and
all newly created steps are destroyed. */
{
auto builds_(builds.lock());
if (!build->finishedInDB) // FIXME: can this happen?
(*builds_)[build->id] = build;
build->toplevel = step;
}
printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)")
% build->id % step->drvPath % newSteps.size());
};
/* Now instantiate build steps for each new build. The builder
threads can start building the runnable build steps right away,
even while we're still processing other new builds. */
while (!newBuilds.empty()) {
auto build = newBuilds.begin()->second;
newBuilds.erase(newBuilds.begin());
newRunnable.clear();
nrAdded = 0;
try {
createBuild(build);
} catch (Error & e) {
e.addPrefix(format("while loading build %1%: ") % build->id);
throw;
}
/* Add the new runnable build steps to ‘runnable’ and wake up
the builder threads. */
printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded);
for (auto & r : newRunnable)
makeRunnable(r);
nrBuildsRead += nrAdded;
}
}
void State::removeCancelledBuilds(Connection & conn)
{
/* Get the current set of queued builds. */
std::set<BuildID> currentIds;
{
pqxx::work txn(conn);
auto res = txn.exec("select id from Builds where finished = 0");
for (auto const & row : res)
currentIds.insert(row["id"].as<BuildID>());
}
auto builds_(builds.lock());
for (auto i = builds_->begin(); i != builds_->end(); ) {
if (currentIds.find(i->first) == currentIds.end()) {
printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first);
i = builds_->erase(i);
// FIXME: ideally we would interrupt active build steps here.
} else
++i;
}
}
Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable)
{
if (finishedDrvs.find(drvPath) != finishedDrvs.end()) return 0;
/* Check if the requested step already exists. If not, create a
new step. In any case, make the step reachable from
referringBuild or referringStep. This is done atomically (with
‘steps’ locked), to ensure that this step can never become
reachable from a new build after doBuildStep has removed it
from ‘steps’. */
Step::ptr step;
bool isNew = false;
{
auto steps_(steps.lock());
/* See if the step already exists in ‘steps’ and is not
stale. */
auto prev = steps_->find(drvPath);
if (prev != steps_->end()) {
step = prev->second.lock();
/* Since ‘step’ is a strong pointer, the referred Step
object won't be deleted after this. */
if (!step) steps_->erase(drvPath); // remove stale entry
}
/* If it doesn't exist, create it. */
if (!step) {
step = std::make_shared<Step>();
step->drvPath = drvPath;
isNew = true;
}
auto step_(step->state.lock());
assert(step_->created != isNew);
if (referringBuild)
step_->builds.push_back(referringBuild);
if (referringStep)
step_->rdeps.push_back(referringStep);
(*steps_)[drvPath] = step;
}
if (!isNew) return step;
printMsg(lvlDebug, format("considering derivation ‘%1%’") % drvPath);
/* Initialize the step. Note that the step may be visible in
‘steps’ before this point, but that doesn't matter because
it's not runnable yet, and other threads won't make it
runnable while step->created == false. */
step->drv = readDerivation(drvPath);
{
auto i = step->drv.env.find("requiredSystemFeatures");
if (i != step->drv.env.end())
step->requiredSystemFeatures = tokenizeString<std::set<std::string>>(i->second);
}
auto attr = step->drv.env.find("preferLocalBuild");
step->preferLocalBuild =
attr != step->drv.env.end() && attr->second == "1"
&& has(localPlatforms, step->drv.platform);
/* Are all outputs valid? */
bool valid = true;
for (auto & i : step->drv.outputs) {
if (!store->isValidPath(i.second.path)) {
valid = false;
break;
}
}
// FIXME: check whether all outputs are in the binary cache.
if (valid) {
finishedDrvs.insert(drvPath);
return 0;
}
/* No, we need to build. */
printMsg(lvlDebug, format("creating build step ‘%1%’") % drvPath);
newSteps.insert(step);
/* Create steps for the dependencies. */
for (auto & i : step->drv.inputDrvs) {
auto dep = createStep(store, i.first, 0, step, finishedDrvs, newSteps, newRunnable);
if (dep) {
auto step_(step->state.lock());
step_->deps.insert(dep);
}
}
/* If the step has no (remaining) dependencies, make it
runnable. */
{
auto step_(step->state.lock());
assert(!step_->created);
step_->created = true;
if (step_->deps.empty())
newRunnable.insert(step);
}
return step;
}
void State::makeRunnable(Step::ptr step)
{
printMsg(lvlChatty, format("step ‘%1%’ is now runnable") % step->drvPath);
{
auto step_(step->state.lock());
assert(step_->created);
assert(!step->finished);
assert(step_->deps.empty());
}
{
auto runnable_(runnable.lock());
runnable_->push_back(step);
}
wakeDispatcher();
}
void State::dispatcher()
{
while (true) {
printMsg(lvlDebug, "dispatcher woken up");
auto sleepUntil = system_time::max();
bool keepGoing;
do {
/* Copy the currentJobs field of each machine. This is
necessary to ensure that the sort comparator below is
an ordering. std::sort() can segfault if it isn't. */
struct MachineInfo
{
Machine::ptr machine;
unsigned int currentJobs;
};
std::vector<MachineInfo> machinesSorted;
{
auto machines_(machines.lock());
for (auto & m : *machines_)
machinesSorted.push_back({m.second, m.second->state->currentJobs});
}
/* Sort the machines by a combination of speed factor and
available slots. Prioritise the available machines as
follows:
- First by load divided by speed factor, rounded to the
nearest integer. This causes fast machines to be
preferred over slow machines with similar loads.
- Then by speed factor.
- Finally by load. */
sort(machinesSorted.begin(), machinesSorted.end(),
[](const MachineInfo & a, const MachineInfo & b) -> bool
{
float ta = roundf(a.currentJobs / a.machine->speedFactor);
float tb = roundf(b.currentJobs / b.machine->speedFactor);
return
ta != tb ? ta < tb :
a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :
a.currentJobs > b.currentJobs;
});
/* Find a machine with a free slot and find a step to run
on it. Once we find such a pair, we restart the outer
loop because the machine sorting will have changed. */
keepGoing = false;
system_time now = std::chrono::system_clock::now();
for (auto & mi : machinesSorted) {
// FIXME: can we lose a wakeup if a builder exits concurrently?
if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;
auto runnable_(runnable.lock());
//printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
/* FIXME: we're holding the runnable lock too long
here. This could be more efficient. */
for (auto i = runnable_->begin(); i != runnable_->end(); ) {
auto step = i->lock();
/* Delete dead steps. */
if (!step) {
i = runnable_->erase(i);
continue;
}
/* Can this machine do this step? */
if (!mi.machine->supportsStep(step)) {
++i;
continue;
}
/* Skip previously failed steps that aren't ready
to be retried. */
{
auto step_(step->state.lock());
if (step_->tries > 0 && step_->after > now) {
if (step_->after < sleepUntil)
sleepUntil = step_->after;
++i;
continue;
}
}
/* Make a slot reservation and start a thread to
do the build. */
auto reservation = std::make_shared<MaintainCount>(mi.machine->state->currentJobs);
i = runnable_->erase(i);
auto builderThread = std::thread(&State::builder, this, step, mi.machine, reservation);
builderThread.detach(); // FIXME?
keepGoing = true;
break;
}
if (keepGoing) break;
}
} while (keepGoing);
/* Sleep until we're woken up (either because a runnable build
is added, or because a build finishes). */
{
std::unique_lock<std::mutex> lock(dispatcherMutex);
printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %
std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());
dispatcherWakeup.wait_until(lock, sleepUntil);
nrDispatcherWakeups++;
}
}
printMsg(lvlError, "dispatcher exits");
}
void State::wakeDispatcher()
{
{ std::lock_guard<std::mutex> lock(dispatcherMutex); } // barrier
dispatcherWakeup.notify_one();
}
void State::builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation)
{
bool retry = true;
MaintainCount mc(nrActiveSteps);
try {
auto store = openStore(); // FIXME: pool
retry = doBuildStep(store, step, machine);
} catch (std::exception & e) {
printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%")
% step->drvPath % machine->sshName % e.what());
}
/* Release the machine and wake up the dispatcher. */
assert(reservation.unique());
reservation = 0;
wakeDispatcher();
/* If there was a temporary failure, retry the step after an
exponentially increasing interval. */
if (retry) {
{
auto step_(step->state.lock());
step_->tries++;
nrRetries++;
if (step_->tries > maxNrRetries) maxNrRetries = step_->tries; // yeah yeah, not atomic
int delta = retryInterval * powf(retryBackoff, step_->tries - 1);
printMsg(lvlInfo, format("will retry ‘%1%’ after %2%s") % step->drvPath % delta);
step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta);
}
makeRunnable(step);
}
}
bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
Machine::ptr machine)
{
{
auto step_(step->state.lock());
assert(step_->created);
assert(!step->finished);
}
/* There can be any number of builds in the database that depend
on this derivation. Arbitrarily pick one (though preferring a
build of which this is the top-level derivation) for the
purpose of creating build steps. We could create a build step
record for every build, but that could be very expensive
(e.g. a stdenv derivation can be a dependency of tens of
thousands of builds), so we don't. */
Build::ptr build;
{
std::set<Build::ptr> dependents;
std::set<Step::ptr> steps;
getDependents(step, dependents, steps);
if (dependents.empty()) {
/* Apparently all builds that depend on this derivation
are gone (e.g. cancelled). So don't bother. This is
very unlikely to happen, because normally Steps are
only kept alive by being reachable from a
Build. However, it's possible that a new Build just
created a reference to this step. So to handle that
possibility, we retry this step (putting it back in
the runnable queue). If there are really no strong
pointers to the step, it will be deleted. */
printMsg(lvlInfo, format("maybe cancelling build step ‘%1%’") % step->drvPath);
return true;
}
for (auto build2 : dependents)
if (build2->drvPath == step->drvPath) { build = build2; break; }
if (!build) build = *dependents.begin();
printMsg(lvlInfo, format("performing step ‘%1%’ on ‘%2%’ (needed by build %3% and %4% others)")
% step->drvPath % machine->sshName % build->id % (dependents.size() - 1));
}
bool quit = build->id == buildOne;
auto conn(dbPool.get());
RemoteResult result;
BuildOutput res;
int stepNr = 0;
time_t stepStartTime = result.startTime = time(0);
/* If any of the outputs have previously failed, then don't bother
building again. */
bool cachedFailure = checkCachedFailure(step, *conn);
if (cachedFailure)
result.status = BuildResult::CachedFailure;
else {
/* Create a build step record indicating that we started
building. Also, mark the selected build as busy. */
{
pqxx::work txn(*conn);
stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy);
txn.parameterized("update Builds set busy = 1 where id = $1")(build->id).exec();
txn.commit();
}
/* Do the build. */
try {
/* FIXME: referring builds may have conflicting timeouts. */
buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result);
} catch (Error & e) {
result.status = BuildResult::MiscFailure;
result.errorMsg = e.msg();
}
if (result.success()) res = getBuildOutput(store, step->drv);
}
time_t stepStopTime = time(0);
if (!result.stopTime) result.stopTime = stepStopTime;
/* Asynchronously compress the log. */
if (result.logFile != "") {
{
auto logCompressorQueue_(logCompressorQueue.lock());
logCompressorQueue_->push(result.logFile);
}
logCompressorWakeup.notify_one();
}
/* The step had a hopefully temporary failure (e.g. network
issue). Retry a number of times. */
if (result.canRetry()) {
printMsg(lvlError, format("possibly transient failure building ‘%1%’ on ‘%2%’: %3%")
% step->drvPath % machine->sshName % result.errorMsg);
bool retry;
{
auto step_(step->state.lock());
retry = step_->tries + 1 < maxTries;
}
if (retry) {
pqxx::work txn(*conn);
finishBuildStep(txn, result.startTime, result.stopTime, build->id,
stepNr, machine->sshName, bssAborted, result.errorMsg);
txn.commit();
if (quit) exit(1);
return true;
}
}
if (result.success()) {
/* Register success in the database for all Build objects that
have this step as the top-level step. Since the queue
monitor thread may be creating new referring Builds
concurrently, and updating the database may fail, we do
this in a loop, marking all known builds, repeating until
there are no unmarked builds.
*/
std::vector<BuildID> buildIDs;
while (true) {
/* Get the builds that have this one as the top-level. */
std::vector<Build::ptr> direct;
{
auto steps_(steps.lock());
auto step_(step->state.lock());
for (auto & b_ : step_->builds) {
auto b = b_.lock();
if (b && !b->finishedInDB) direct.push_back(b);
}
/* If there are no builds left to update in the DB,
then we're done (except for calling
finishBuildStep()). Delete the step from
‘steps’. Since we've been holding the ‘steps’ lock,
no new referrers can have been added in the
meantime or be added afterwards. */
if (direct.empty()) {
printMsg(lvlDebug, format("finishing build step ‘%1%’") % step->drvPath);
steps_->erase(step->drvPath);
}
}
/* Update the database. */
{
pqxx::work txn(*conn);
finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, machine->sshName, bssSuccess);
for (auto & b : direct)
markSucceededBuild(txn, b, res, build != b || result.status != BuildResult::Built,
result.startTime, result.stopTime);
txn.commit();
}
if (direct.empty()) break;
/* Remove the direct dependencies from ‘builds’. This will
cause them to be destroyed. */
for (auto & b : direct) {
auto builds_(builds.lock());
b->finishedInDB = true;
builds_->erase(b->id);
buildIDs.push_back(b->id);
}
}
/* Send notification about the builds that have this step as
the top-level. */
for (auto id : buildIDs) {
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->push(NotificationItem(id, std::vector<BuildID>()));
}
notificationSenderWakeup.notify_one();
}
/* Wake up any dependent steps that have no other
dependencies. */
{
auto step_(step->state.lock());
for (auto & rdepWeak : step_->rdeps) {
auto rdep = rdepWeak.lock();
if (!rdep) continue;
bool runnable = false;
{
auto rdep_(rdep->state.lock());
rdep_->deps.erase(step);
/* Note: if the step has not finished
initialisation yet, it will be made runnable in
createStep(), if appropriate. */
if (rdep_->deps.empty() && rdep_->created) runnable = true;
}
if (runnable) makeRunnable(rdep);
}
}
} else {
/* Register failure in the database for all Build objects that
directly or indirectly depend on this step. */
std::vector<BuildID> dependentIDs;
while (true) {
/* Get the builds and steps that depend on this step. */
std::set<Build::ptr> indirect;
{
auto steps_(steps.lock());
std::set<Step::ptr> steps;
getDependents(step, indirect, steps);
/* If there are no builds left, delete all referring
steps from ‘steps’. As for the success case, we can
be certain no new referrers can be added. */
if (indirect.empty()) {
for (auto & s : steps) {
printMsg(lvlDebug, format("finishing build step ‘%1%’") % s->drvPath);
steps_->erase(s->drvPath);
}
break;
}
}
/* Update the database. */
{
pqxx::work txn(*conn);
BuildStatus buildStatus =
result.status == BuildResult::TimedOut ? bsTimedOut :
result.canRetry() ? bsAborted :
bsFailed;
BuildStepStatus buildStepStatus =
result.status == BuildResult::TimedOut ? bssTimedOut :
result.canRetry() ? bssAborted :
bssFailed;
/* For standard failures, we don't care about the error
message. */
if (result.status == BuildResult::PermanentFailure ||
result.status == BuildResult::TransientFailure ||
result.status == BuildResult::CachedFailure ||
result.status == BuildResult::TimedOut)
result.errorMsg = "";
/* Create failed build steps for every build that depends
on this. For cached failures, only create a step for
builds that don't have this step as top-level
(otherwise the user won't be able to see what caused
the build to fail). */
for (auto & build2 : indirect) {
if ((cachedFailure && build2->drvPath == step->drvPath) ||
(!cachedFailure && build == build2) ||
build2->finishedInDB)
continue;
createBuildStep(txn, 0, build2, step, machine->sshName,
buildStepStatus, result.errorMsg, build == build2 ? 0 : build->id);
}
if (!cachedFailure)
finishBuildStep(txn, result.startTime, result.stopTime, build->id,
stepNr, machine->sshName, buildStepStatus, result.errorMsg);
/* Mark all builds that depend on this derivation as failed. */
for (auto & build2 : indirect) {
if (build2->finishedInDB) continue;
printMsg(lvlError, format("marking build %1% as failed") % build2->id);
txn.parameterized
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1 and finished = 0")
(build2->id)
((int) (build2->drvPath != step->drvPath && buildStatus == bsFailed ? bsDepFailed : buildStatus))
(result.startTime)
(result.stopTime)
(cachedFailure ? 1 : 0).exec();
nrBuildsDone++;
}
/* Remember failed paths in the database so that they
won't be built again. */
if (!cachedFailure && result.status == BuildResult::PermanentFailure)
for (auto & path : outputPaths(step->drv))
txn.parameterized("insert into FailedPaths values ($1)")(path).exec();
txn.commit();
}
/* Remove the indirect dependencies from ‘builds’. This
will cause them to be destroyed. */
for (auto & b : indirect) {
auto builds_(builds.lock());
b->finishedInDB = true;
builds_->erase(b->id);
dependentIDs.push_back(b->id);
if (buildOne == b->id) quit = true;
}
}
/* Send notification about this build and its dependents. */
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->push(NotificationItem(build->id, dependentIDs));
}
notificationSenderWakeup.notify_one();
}
// FIXME: keep stats about aborted steps?
nrStepsDone++;
totalStepTime += stepStopTime - stepStartTime;
totalStepBuildTime += result.stopTime - result.startTime;
machine->state->nrStepsDone++;
machine->state->totalStepTime += stepStopTime - stepStartTime;
machine->state->totalStepBuildTime += result.stopTime - result.startTime;
if (quit) exit(0); // testing hack
return false;
#include "state.hh"
#include "build-result.hh"
using namespace nix;
void State::queueMonitor()
{
while (true) {
try {
queueMonitorLoop();
} catch (std::exception & e) {
printMsg(lvlError, format("queue monitor: %1%") % e.what());
sleep(10); // probably a DB problem, so don't retry right away
}
}
}
void State::queueMonitorLoop()
{
auto conn(dbPool.get());
receiver buildsAdded(*conn, "builds_added");
receiver buildsRestarted(*conn, "builds_restarted");
receiver buildsCancelled(*conn, "builds_cancelled");
receiver buildsDeleted(*conn, "builds_deleted");
auto store = openStore(); // FIXME: pool
unsigned int lastBuildId = 0;
while (true) {
getQueuedBuilds(*conn, store, lastBuildId);
/* Sleep until we get notification from the database about an
event. */
conn->await_notification();
nrQueueWakeups++;
if (buildsAdded.get())
printMsg(lvlTalkative, "got notification: new builds added to the queue");
if (buildsRestarted.get()) {
printMsg(lvlTalkative, "got notification: builds restarted");
lastBuildId = 0; // check all builds
}
if (buildsCancelled.get() || buildsDeleted.get()) {
printMsg(lvlTalkative, "got notification: builds cancelled");
removeCancelledBuilds(*conn);
}
}
}
void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId)
{
printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);
/* Grab the queued builds from the database, but don't process
them yet (since we don't want a long-running transaction). */
std::multimap<Path, Build::ptr> newBuilds;
{
pqxx::work txn(conn);
auto res = txn.parameterized("select id, project, jobset, job, drvPath, maxsilent, timeout from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec();
for (auto const & row : res) {
auto builds_(builds.lock());
BuildID id = row["id"].as<BuildID>();
if (buildOne && id != buildOne) continue;
if (id > lastBuildId) lastBuildId = id;
if (has(*builds_, id)) continue;
auto build = std::make_shared<Build>();
build->id = id;
build->drvPath = row["drvPath"].as<string>();
build->fullJobName = row["project"].as<string>() + ":" + row["jobset"].as<string>() + ":" + row["job"].as<string>();
build->maxSilentTime = row["maxsilent"].as<int>();
build->buildTimeout = row["timeout"].as<int>();
newBuilds.emplace(std::make_pair(build->drvPath, build));
}
}
std::set<Step::ptr> newRunnable;
unsigned int nrAdded;
std::function<void(Build::ptr)> createBuild;
createBuild = [&](Build::ptr build) {
printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName);
nrAdded++;
if (!store->isValidPath(build->drvPath)) {
/* Derivation has been GC'ed prematurely. */
printMsg(lvlError, format("aborting GC'ed build %1%") % build->id);
if (!build->finishedInDB) {
pqxx::work txn(conn);
txn.parameterized
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0")
(build->id)
((int) bsAborted)
(time(0))
("derivation was garbage-collected prior to build").exec();
txn.commit();
build->finishedInDB = true;
nrBuildsDone++;
}
return;
}
std::set<Step::ptr> newSteps;
std::set<Path> finishedDrvs; // FIXME: re-use?
Step::ptr step = createStep(store, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable);
/* Some of the new steps may be the top level of builds that
we haven't processed yet. So do them now. This ensures that
if build A depends on build B with top-level step X, then X
will be "accounted" to B in doBuildStep(). */
for (auto & r : newSteps) {
while (true) {
auto i = newBuilds.find(r->drvPath);
if (i == newBuilds.end()) break;
Build::ptr b = i->second;
newBuilds.erase(i);
createBuild(b);
}
}
/* If we didn't get a step, it means the step's outputs are
all valid. So we mark this as a finished, cached build. */
if (!step) {
Derivation drv = readDerivation(build->drvPath);
BuildOutput res = getBuildOutput(store, drv);
pqxx::work txn(conn);
time_t now = time(0);
markSucceededBuild(txn, build, res, true, now, now);
txn.commit();
build->finishedInDB = true;
return;
}
/* If any step has an unsupported system type or has a
previously failed output path, then fail the build right
away. */
bool badStep = false;
for (auto & r : newSteps) {
BuildStatus buildStatus = bsSuccess;
BuildStepStatus buildStepStatus = bssFailed;
if (checkCachedFailure(r, conn)) {
printMsg(lvlError, format("marking build %1% as cached failure") % build->id);
buildStatus = step == r ? bsFailed : bsDepFailed;
buildStepStatus = bssFailed;
}
if (buildStatus == bsSuccess) {
bool supported = false;
{
auto machines_(machines.lock()); // FIXME: use shared_mutex
for (auto & m : *machines_)
if (m.second->supportsStep(r)) { supported = true; break; }
}
if (!supported) {
printMsg(lvlError, format("aborting unsupported build %1%") % build->id);
buildStatus = bsUnsupported;
buildStepStatus = bssUnsupported;
}
}
if (buildStatus != bsSuccess) {
time_t now = time(0);
if (!build->finishedInDB) {
pqxx::work txn(conn);
createBuildStep(txn, 0, build, r, "", buildStepStatus);
txn.parameterized
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = $4 where id = $1 and finished = 0")
(build->id)
((int) buildStatus)
(now)
(buildStatus != bsUnsupported ? 1 : 0).exec();
txn.commit();
build->finishedInDB = true;
nrBuildsDone++;
}
badStep = true;
break;
}
}
if (badStep) return;
/* Note: if we exit this scope prior to this, the build and
all newly created steps are destroyed. */
{
auto builds_(builds.lock());
if (!build->finishedInDB) // FIXME: can this happen?
(*builds_)[build->id] = build;
build->toplevel = step;
}
printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)")
% build->id % step->drvPath % newSteps.size());
};
/* Now instantiate build steps for each new build. The builder
threads can start building the runnable build steps right away,
even while we're still processing other new builds. */
while (!newBuilds.empty()) {
auto build = newBuilds.begin()->second;
newBuilds.erase(newBuilds.begin());
newRunnable.clear();
nrAdded = 0;
try {
createBuild(build);
} catch (Error & e) {
e.addPrefix(format("while loading build %1%: ") % build->id);
throw;
}
/* Add the new runnable build steps to ‘runnable’ and wake up
the builder threads. */
printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded);
for (auto & r : newRunnable)
makeRunnable(r);
nrBuildsRead += nrAdded;
}
}
void State::removeCancelledBuilds(Connection & conn)
{
/* Get the current set of queued builds. */
std::set<BuildID> currentIds;
{
pqxx::work txn(conn);
auto res = txn.exec("select id from Builds where finished = 0");
for (auto const & row : res)
currentIds.insert(row["id"].as<BuildID>());
}
auto builds_(builds.lock());
for (auto i = builds_->begin(); i != builds_->end(); ) {
if (currentIds.find(i->first) == currentIds.end()) {
printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first);
i = builds_->erase(i);
// FIXME: ideally we would interrupt active build steps here.
} else
++i;
}
}
Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable)
{
if (finishedDrvs.find(drvPath) != finishedDrvs.end()) return 0;
/* Check if the requested step already exists. If not, create a
new step. In any case, make the step reachable from
referringBuild or referringStep. This is done atomically (with
‘steps’ locked), to ensure that this step can never become
reachable from a new build after doBuildStep has removed it
from ‘steps’. */
Step::ptr step;
bool isNew = false;
{
auto steps_(steps.lock());
/* See if the step already exists in ‘steps’ and is not
stale. */
auto prev = steps_->find(drvPath);
if (prev != steps_->end()) {
step = prev->second.lock();
/* Since ‘step’ is a strong pointer, the referred Step
object won't be deleted after this. */
if (!step) steps_->erase(drvPath); // remove stale entry
}
/* If it doesn't exist, create it. */
if (!step) {
step = std::make_shared<Step>();
step->drvPath = drvPath;
isNew = true;
}
auto step_(step->state.lock());
assert(step_->created != isNew);
if (referringBuild)
step_->builds.push_back(referringBuild);
if (referringStep)
step_->rdeps.push_back(referringStep);
(*steps_)[drvPath] = step;
}
if (!isNew) return step;
printMsg(lvlDebug, format("considering derivation ‘%1%’") % drvPath);
/* Initialize the step. Note that the step may be visible in
‘steps’ before this point, but that doesn't matter because
it's not runnable yet, and other threads won't make it
runnable while step->created == false. */
step->drv = readDerivation(drvPath);
{
auto i = step->drv.env.find("requiredSystemFeatures");
if (i != step->drv.env.end())
step->requiredSystemFeatures = tokenizeString<std::set<std::string>>(i->second);
}
auto attr = step->drv.env.find("preferLocalBuild");
step->preferLocalBuild =
attr != step->drv.env.end() && attr->second == "1"
&& has(localPlatforms, step->drv.platform);
/* Are all outputs valid? */
bool valid = true;
for (auto & i : step->drv.outputs) {
if (!store->isValidPath(i.second.path)) {
valid = false;
break;
}
}
// FIXME: check whether all outputs are in the binary cache.
if (valid) {
finishedDrvs.insert(drvPath);
return 0;
}
/* No, we need to build. */
printMsg(lvlDebug, format("creating build step ‘%1%’") % drvPath);
newSteps.insert(step);
/* Create steps for the dependencies. */
for (auto & i : step->drv.inputDrvs) {
auto dep = createStep(store, i.first, 0, step, finishedDrvs, newSteps, newRunnable);
if (dep) {
auto step_(step->state.lock());
step_->deps.insert(dep);
}
}
/* If the step has no (remaining) dependencies, make it
runnable. */
{
auto step_(step->state.lock());
assert(!step_->created);
step_->created = true;
if (step_->deps.empty())
newRunnable.insert(step);
}
return step;
}
template <class C, class V>
bool has(const C & c, const V & v)
{
return c.find(v) != c.end();
}