#include <cmath>
#include "state.hh"
#include "build-result.hh"
#include "finally.hh"
#include "binary-cache-store.hh"
using namespace nix;
/* Give the calling thread the name `name`. The call is only made on
   Linux; on other platforms this function does nothing. */
void setThreadName(const std::string & name)
{
#ifdef __linux__
    /* Only the first 15 characters are passed on (Linux thread names
       are limited to 16 bytes including the terminating NUL). */
    const std::string shortName = name.substr(0, 15);
    pthread_setname_np(pthread_self(), shortName.c_str());
#endif
}
/* Run the build step held by `reservation`, then release the
   reservation and, unless the step finished (sDone), make the step
   runnable again. For an sRetry result the step's earliest restart
   time (`after`) is pushed back by a growing, jittered delay first.

   Exceptions derived from std::exception that escape doBuildStep()
   are logged and not propagated; in that case the result stays at
   its initial value, sRetry. */
void State::builder(MachineReservation::ptr reservation)
{
    setThreadName("bld~" + std::string(reservation->step->drvPath.to_string()));

    /* Outcome assumed if doBuildStep() throws before returning. */
    StepResult res = sRetry;

    nrStepsStarted++;

    /* Hold the step only weakly so that this function does not keep
       it alive once the reservation has been dropped. */
    Step::wptr wstep = reservation->step;

    {
        /* Register the step as active for the duration of the build;
           the Finally object unregisters it on every exit path. */
        auto activeStep = std::make_shared<ActiveStep>();
        activeStep->step = reservation->step;
        activeSteps_.lock()->insert(activeStep);

        Finally removeActiveStep([&]() {
            activeSteps_.lock()->erase(activeStep);
        });

        try {
            auto destStore = getDestStore();
            res = doBuildStep(destStore, reservation, activeStep);
        } catch (const std::exception & e) {
            printMsg(lvlError, "uncaught exception building ‘%s’ on ‘%s’: %s",
                localStore->printStorePath(reservation->step->drvPath),
                reservation->machine->sshName,
                e.what());
        }
    }

    /* Drop the reservation and wake up the dispatcher. We must be the
       sole owner here. (`use_count() == 1` replaces
       `shared_ptr::unique()`, which is deprecated in C++17 and
       removed in C++20.) */
    assert(reservation.use_count() == 1);
    reservation = nullptr;
    wakeDispatcher();

    /* If the step still exists and is not done, reschedule it. */
    Step::ptr step = wstep.lock();
    if (res != sDone && step) {

        if (res == sRetry) {
            auto step_(step->state.lock());
            step_->tries++;
            nrRetries++;

            /* NOTE(review): this check-then-set is not atomic as a
               whole if several builder threads run concurrently —
               confirm that an occasionally stale maximum is fine. */
            if (step_->tries > maxNrRetries)
                maxNrRetries = step_->tries;

            /* Delay in seconds: retryInterval * retryBackoff^(tries - 1),
               plus 0-9 seconds from rand(). */
            int delta = retryInterval * std::pow(retryBackoff, step_->tries - 1) + (rand() % 10);
            printMsg(lvlInfo, "will retry ‘%s’ after %ss", localStore->printStorePath(step->drvPath), delta);
            step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta);
        }

        makeRunnable(step);
    }
}
/* Perform the build step held by `reservation` on its reserved machine
   and record the outcome in the database via `dbPool`.

   Returns:
   - sMaybeCancelled if getDependents() reports no dependent builds;
   - sRetry if the result is marked `canRetry` and the step has
     tries left (`tries + 1 < maxTries`);
   - sDone otherwise (both on success and after failStep()).

   Note that when `buildOneDone` is set this function terminates the
   whole process via exit() (status 1 on the retry path, 0 at the end)
   instead of returning. */
State::StepResult State::doBuildStep(nix::ref<Store> destStore,
MachineReservation::ptr reservation,
std::shared_ptr<ActiveStep> activeStep)
{
auto & step(reservation->step);
auto & machine(reservation->machine);
/* Sanity checks: the step must have been created and must not be
   finished yet. */
{
auto step_(step->state.lock());
assert(step_->created);
assert(!step->finished);
}
/* These are filled in below from one of the dependent builds. The
   early return for "no dependents" happens before any of them is
   read. */
BuildID buildId;
std::optional<StorePath> buildDrvPath;
unsigned int maxSilentTime, buildTimeout;
/* `repeats + 1` is the number of times reported in the log message
   below. */
unsigned int repeats = step->isDeterministic ? 1 : 0;
auto conn(dbPool.get());
{
std::set<Build::ptr> dependents;
std::set<Step::ptr> steps;
getDependents(step, dependents, steps);
if (dependents.empty()) {
/* No build refers to this step; let the caller decide what to do
   with it. */
printMsg(lvlInfo, "maybe cancelling build step ‘%s’", localStore->printStorePath(step->drvPath));
return sMaybeCancelled;
}
/* Choose the build this step is accounted to. A dependent whose
   drvPath equals the step's drvPath is preferred (and gets a
   "build started" notification); if several match, the last one
   iterated wins. Otherwise the first dependent is used. */
Build::ptr build;
for (auto build2 : dependents) {
if (build2->drvPath == step->drvPath) {
build = build2;
pqxx::work txn(*conn);
notifyBuildStarted(txn, build->id);
txn.commit();
}
/* Raise `repeats` to the largest value registered in jobsetRepeats
   for any dependent's (project, jobset) pair. */
{
auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName));
if (i != jobsetRepeats.end())
repeats = std::max(repeats, i->second);
}
}
if (!build) build = *dependents.begin();
buildId = build->id;
buildDrvPath = build->drvPath;
maxSilentTime = build->maxSilentTime;
buildTimeout = build->buildTimeout;
printInfo("performing step ‘%s’ %d times on ‘%s’ (needed by build %d and %d others)",
localStore->printStorePath(step->drvPath), repeats + 1, machine->sshName, buildId, (dependents.size() - 1));
}
/* Latch buildOneDone once we are performing the top-level derivation
   of the build with id `buildOne`. */
if (!buildOneDone)
buildOneDone = buildId == buildOne && step->drvPath == *buildDrvPath;
RemoteResult result;
BuildOutput res;
/* stepNr stays 0 until a build step row has been created. */
unsigned int stepNr = 0;
bool stepFinished = false;
/* Runs on every exit from this function (it is declared after
   `result` and `stepNr`, so both are still alive when it runs):
   - a created but unfinished step is recorded in orphanedSteps;
   - if a step row exists, the log file is uploaded to the destination
     store when that store is a BinaryCacheStore, uploading is enabled
     and the file exists; the local file is removed afterwards.
   Exceptions from the upload are handed to ignoreException(). */
Finally clearStep([&]() {
if (stepNr && !stepFinished) {
printError("marking step %d of build %d as orphaned", stepNr, buildId);
auto orphanedSteps_(orphanedSteps.lock());
orphanedSteps_->emplace(buildId, stepNr);
}
if (stepNr) {
try {
auto store = destStore.dynamic_pointer_cast<BinaryCacheStore>();
if (uploadLogsToBinaryCache && store && pathExists(result.logFile)) {
store->upsertFile("log/" + std::string(step->drvPath.to_string()), readFile(result.logFile), "text/plain; charset=utf-8");
unlink(result.logFile.c_str());
}
} catch (...) {
ignoreException();
}
}
});
time_t stepStartTime = result.startTime = time(0);
/* A cached failure short-circuits the build; no step row is created
   here in that case, so stepNr remains 0. */
if (checkCachedFailure(step, *conn))
result.stepStatus = bsCachedFailure;
else {
/* Create the build step row with status bsBusy. */
{
auto mc = startDbUpdate();
pqxx::work txn(*conn);
stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->sshName, bsBusy);
txn.commit();
}
/* Callback handed to buildRemote(); each call updates the step's
   state in its own transaction. */
auto updateStep = [&](StepState stepState) {
pqxx::work txn(*conn);
updateBuildStep(txn, buildId, stepNr, stepState);
txn.commit();
};
NarMemberDatas narMembers;
try {
buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, repeats, result, activeStep, updateStep, narMembers);
} catch (Error & e) {
/* An Error while the active step is flagged as cancelled is recorded
   as a cancellation and is not retried; any other Error is recorded
   as an abort that may be retried. */
if (activeStep->state_.lock()->cancelled) {
printInfo("marking step %d of build %d as cancelled", stepNr, buildId);
result.stepStatus = bsCancelled;
result.canRetry = false;
} else {
result.stepStatus = bsAborted;
result.errorMsg = e.msg();
result.canRetry = true;
}
}
if (result.stepStatus == bsSuccess) {
updateStep(ssPostProcessing);
res = getBuildOutput(destStore, narMembers, *step->drv);
}
}
time_t stepStopTime = time(0);
/* Keep a stop time that was already set on the result; otherwise use
   the time measured here. */
if (!result.stopTime) result.stopTime = stepStopTime;
/* The error message is only kept for aborted steps. */
if (result.stepStatus != bsAborted)
result.errorMsg = "";
/* Split the elapsed time evenly (integer division) over the jobsets
   attached to the step and report it to each of them. */
{
auto step_(step->state.lock());
if (!step_->jobsets.empty()) {
time_t charge = (result.stopTime - result.startTime) / step_->jobsets.size();
for (auto & jobset : step_->jobsets)
jobset->addStep(result.startTime, charge);
}
}
/* Finish the build step row, if one was created. */
if (stepNr) {
pqxx::work txn(*conn);
finishBuildStep(txn, result, buildId, stepNr, machine->sshName);
txn.commit();
}
/* Retryable failure: return sRetry if the step has tries left;
   otherwise fall through and treat it like any other outcome. */
if (result.canRetry) {
printMsg(lvlError, "possibly transient failure building ‘%s’ on ‘%s’: %s",
localStore->printStorePath(step->drvPath), machine->sshName, result.errorMsg);
assert(stepNr);
bool retry;
{
auto step_(step->state.lock());
retry = step_->tries + 1 < maxTries;
}
if (retry) {
/* NOTE(review): `mc` is not used further; presumably startDbUpdate()
   returns a scope guard — confirm. */
auto mc = startDbUpdate();
/* Prevents clearStep from recording the step as orphaned. */
stepFinished = true;
if (buildOneDone) exit(1);
return sRetry;
}
}
if (result.stepStatus == bsSuccess) {
assert(stepNr);
/* Call addRoot() for every output whose store path is known. */
for (auto & i : step->drv->outputsAndOptPaths(*localStore)) {
if (i.second.second)
addRoot(*i.second.second);
}
/* Mark the builds attached directly to this step as succeeded. The
   loop repeats until a pass finds no unfinished attached build; on
   that pass the step is removed from `steps` while both the `steps`
   and the step-state locks are held. (Presumably further builds can
   get attached between passes — confirm.) */
std::vector<BuildID> buildIDs;
while (true) {
std::vector<Build::ptr> direct;
{
auto steps_(steps.lock());
auto step_(step->state.lock());
for (auto & b_ : step_->builds) {
auto b = b_.lock();
if (b && !b->finishedInDB) direct.push_back(b);
}
if (direct.empty()) {
printMsg(lvlDebug, "finishing build step ‘%s’",
localStore->printStorePath(step->drvPath));
steps_->erase(step->drvPath);
}
}
/* Write the results for this pass in a single transaction. */
{
auto mc = startDbUpdate();
pqxx::work txn(*conn);
for (auto & b : direct) {
printMsg(lvlInfo, format("marking build %1% as succeeded") % b->id);
/* The fourth argument is true for every build other than the one this
   step was accounted to, or when the result itself is cached. */
markSucceededBuild(txn, b, res, buildId != b->id || result.isCached,
result.startTime, result.stopTime);
}
txn.commit();
}
stepFinished = true;
if (direct.empty()) break;
/* Only after the commit: flag the builds as finished and drop them
   from `builds`. */
for (auto & b : direct) {
auto builds_(builds.lock());
b->finishedInDB = true;
builds_->erase(b->id);
buildIDs.push_back(b->id);
}
}
/* Send "build finished" notifications for all builds marked above. */
{
pqxx::work txn(*conn);
for (auto id : buildIDs)
notifyBuildFinished(txn, id, {});
txn.commit();
}
/* Remove this step from the `deps` of each step in `rdeps` that is
   still alive, and make runnable those that are created and have no
   dependencies left. makeRunnable() is called after the rdep's state
   lock has been released. */
{
auto step_(step->state.lock());
for (auto & rdepWeak : step_->rdeps) {
auto rdep = rdepWeak.lock();
if (!rdep) continue;
bool runnable = false;
{
auto rdep_(rdep->state.lock());
rdep_->deps.erase(step);
if (rdep_->deps.empty() && rdep_->created) runnable = true;
}
if (runnable) makeRunnable(rdep);
}
}
} else
/* Any non-success status is handled by failStep(). */
failStep(*conn, step, buildId, result, machine, stepFinished);
/* Update global and per-machine statistics. */
nrStepsDone++;
totalStepTime += stepStopTime - stepStartTime;
totalStepBuildTime += result.stopTime - result.startTime;
machine->state->nrStepsDone++;
machine->state->totalStepTime += stepStopTime - stepStartTime;
machine->state->totalStepBuildTime += result.stopTime - result.startTime;
if (buildOneDone) exit(0);
return sDone;
}
/* Record the failure of `step` for every build that getDependents()
   reports for it.

   Each pass collects the dependent builds, writes their build step
   rows and failed status to the database in one transaction, and then
   flags them as finished and removes them from `builds`. Passes are
   repeated until one finds no dependent builds while `stepFinished`
   is already set; on a pass that finds none, the steps returned by
   getDependents() are removed from `steps`.

   `stepFinished` is set to true after the first committed pass.
   Finally a "build finished" notification is sent for `buildId`
   together with the ids of all builds marked as failed here. */
void State::failStep(
    Connection & conn,
    Step::ptr step,
    BuildID buildId,
    const RemoteResult & result,
    Machine::ptr machine,
    bool & stepFinished)
{
    std::vector<BuildID> failedBuildIDs;

    const bool cachedFailure = result.stepStatus == bsCachedFailure;

    for (;;) {

        /* Builds that depend on the failed step. */
        std::set<Build::ptr> affected;

        {
            auto steps_(steps.lock());
            std::set<Step::ptr> visitedSteps;
            getDependents(step, affected, visitedSteps);

            /* Nothing depends on these steps any more: drop them. */
            if (affected.empty())
                for (auto & visited : visitedSteps) {
                    printMsg(lvlDebug, "finishing build step ‘%s’",
                        localStore->printStorePath(visited->drvPath));
                    steps_->erase(visited->drvPath);
                }
        }

        if (affected.empty() && stepFinished)
            break;

        /* Write everything for this pass in a single transaction. */
        {
            auto mc = startDbUpdate();
            pqxx::work txn(conn);

            /* Create a build step row for each affected build, except:
               for a cached failure, builds whose own derivation is the
               step; for any status other than cached failure or
               unsupported, the build identified by `buildId`; and
               builds already finished in the database. */
            for (auto & build : affected) {
                const bool skipStepRow = cachedFailure
                    ? build->drvPath == step->drvPath
                    : (result.stepStatus != bsUnsupported && buildId == build->id);
                if (skipStepRow || build->finishedInDB)
                    continue;

                createBuildStep(txn,
                    0, build->id, step, machine ? machine->sshName : "",
                    result.stepStatus, result.errorMsg, buildId == build->id ? 0 : buildId);
            }

            /* Mark the affected builds themselves as finished. A build
               whose derivation differs from the step's gets
               bsDepFailed instead of bsFailed. */
            for (auto & build : affected) {
                if (build->finishedInDB)
                    continue;

                printMsg(lvlError, format("marking build %1% as failed") % build->id);
                txn.exec_params0
                    ("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5, notificationPendingSince = $4 where id = $1 and finished = 0",
                     build->id,
                     static_cast<int>(
                         (build->drvPath != step->drvPath && result.buildStatus() == bsFailed)
                             ? bsDepFailed
                             : result.buildStatus()),
                     result.startTime,
                     result.stopTime,
                     cachedFailure ? 1 : 0);
                nrBuildsDone++;
            }

            /* Record the known output paths in FailedPaths, unless this
               already was a cached failure or the result may not be
               cached. */
            if (!cachedFailure && result.canCache) {
                for (auto & output : step->drv->outputsAndOptPaths(*localStore)) {
                    auto & optPath = output.second.second;
                    if (optPath)
                        txn.exec_params0("insert into FailedPaths values ($1)", localStore->printStorePath(*optPath));
                }
            }

            txn.commit();
        }

        stepFinished = true;

        /* Only after the commit: flag the builds as finished and drop
           them from `builds`. */
        for (auto & failed : affected) {
            auto builds_(builds.lock());
            failed->finishedInDB = true;
            builds_->erase(failed->id);
            failedBuildIDs.push_back(failed->id);
            if (!buildOneDone && buildOne == failed->id)
                buildOneDone = true;
        }
    }

    /* Send the notification in a transaction of its own. */
    {
        pqxx::work txn(conn);
        notifyBuildFinished(txn, buildId, failedBuildIDs);
        txn.commit();
    }
}
/* Make sure a file named after `storePath` exists in `rootsDir`. An
   existing path is left untouched; otherwise an empty file is
   written. */
void State::addRoot(const StorePath & storePath)
{
    const auto rootFile = rootsDir + "/" + std::string(storePath.to_string());

    if (pathExists(rootFile))
        return;

    writeFile(rootFile, "");
}