Basic remote building
[?]
Jun 9, 2015, 12:21 PM
5AIYUMTBY6TFQTBRP3MJ2PYWUMRF57I77NIVWYE74UMEVQMBWZVQC
Dependencies
- [2] O776XDS2 Make getDrvLogPath work with both bucketed and non-bucketed nix logs.
- [3] YZAI5GQU Implement a database connection pool
- [4] NJJ7H64S Very basic multi-threaded queue runner
- [5] T2EIYJNG On SIGINT, shut down the builder threads
- [6] Y6AHH4TH Remove the logfile and logSize columns from the database
- [7] 24BMQDZA Start of single-process hydra-queue-runner
- [8] ZH6B56XR Try harder to find build logs
- [9] 62MQPRXC Pass null values to libpqxx properly
- [10] ENXUSMSV Make concurrency more robust
- [*] 2GK5DOU7 * Downloading closures.
Change contents
- replacement in src/hydra-queue-runner/Makefile.am at line 3
hydra_queue_runner_SOURCES = hydra-queue-runner.cc build-result.cchydra_queue_runner_SOURCES = hydra-queue-runner.cc build-result.cc build-remote.cc - file addition: build-remote.cc[4.187]
#include <algorithm>#include <sys/types.h>#include <sys/stat.h>#include <fcntl.h>#include "build-remote.hh"#include "util.hh"#include "misc.hh"#include "serve-protocol.hh"#include "worker-protocol.hh"using namespace nix;struct Child{Pid pid;AutoCloseFD to, from;};static void openConnection(const string & sshName, const string & sshKey,int stderrFD, Child & child){Pipe to, from;to.create();from.create();child.pid = startProcess([&]() {if (dup2(to.readSide, STDIN_FILENO) == -1)throw SysError("cannot dup input pipe to stdin");if (dup2(from.writeSide, STDOUT_FILENO) == -1)throw SysError("cannot dup output pipe to stdout");if (dup2(stderrFD, STDERR_FILENO) == -1)throw SysError("cannot dup stderr");Strings argv({"ssh", "-x", "-a", sshName, "--", "nix-store", "--serve", "--write"});execvp("ssh", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove castthrow SysError("cannot start ssh");});to.readSide.close();from.writeSide.close();child.to = to.writeSide.borrow();child.from = from.readSide.borrow();}static void copyClosureTo(std::shared_ptr<StoreAPI> store,FdSource & from, FdSink & to, const PathSet & paths,bool useSubstitutes = false){PathSet closure;for (auto & path : paths)computeFSClosure(*store, path, closure);Paths sorted = topoSortPaths(*store, closure);/* Send the "query valid paths" command with the "lock" optionenabled. This prevents a race where the remote hostgarbage-collect paths that are already there. Optionally, askthe remote host to substitute missing paths. */writeInt(cmdQueryValidPaths, to);writeInt(1, to); // == lock pathswriteInt(useSubstitutes, to);writeStrings(sorted, to);to.flush();/* Get back the set of paths that are already valid on the remotehost. 
*/auto present = readStorePaths<PathSet>(from);PathSet missing;std::set_difference(closure.begin(), closure.end(), present.begin(), present.end(),std::inserter(missing, missing.end()));printMsg(lvlError, format("sending %1% missing paths") % missing.size());if (missing.empty()) return;throw Error("NOT IMPL 1");}static void copyClosureFrom(std::shared_ptr<StoreAPI> store,FdSource & from, FdSink & to, const PathSet & paths){writeInt(cmdExportPaths, to);writeInt(0, to); // == don't signwriteStrings(paths, to);to.flush();store->importPaths(false, from);}void buildRemote(std::shared_ptr<StoreAPI> store,const string & sshName, const string & sshKey,const Path & drvPath, const Derivation & drv,const nix::Path & logDir, RemoteResult & result){string base = baseNameOf(drvPath);Path logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2);createDirs(dirOf(logFile));AutoCloseFD logFD(open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666));if (logFD == -1) throw SysError(format("creating log file ‘%1%’") % logFile);Child child;openConnection(sshName, sshKey, logFD, child);logFD.close();FdSource from(child.from);FdSink to(child.to);/* Handshake. */writeInt(SERVE_MAGIC_1, to);writeInt(SERVE_PROTOCOL_VERSION, to);to.flush();unsigned int magic = readInt(from);if (magic != SERVE_MAGIC_2)throw Error(format("protocol mismatch with ‘nix-store --serve’ on ‘%1%’") % sshName);unsigned int version = readInt(from);if (GET_PROTOCOL_MAJOR(version) != 0x200)throw Error(format("unsupported ‘nix-store --serve’ protocol version on ‘%1%’") % sshName);/* Copy the input closure. */printMsg(lvlError, format("sending closure of ‘%1%’ to ‘%2%’") % drvPath % sshName);copyClosureTo(store, from, to, PathSet({drvPath}));/* Do the build. 
*/printMsg(lvlError, format("building ‘%1%’ on ‘%2%’") % drvPath % sshName);writeInt(cmdBuildPaths, to);writeStrings(PathSet({drvPath}), to);writeInt(3600, to); // == maxSilentTime, FIXMEwriteInt(7200, to); // == buildTimeout, FIXMEto.flush();result.startTime = time(0);int res = readInt(from);result.stopTime = time(0);if (res) {result.errorMsg = (format("%1% on ‘%2%’") % readString(from) % sshName).str();if (res == 100) result.status = RemoteResult::rrPermanentFailure;else if (res == 101) result.status = RemoteResult::rrTimedOut;else result.status = RemoteResult::rrMiscFailure;return;}/* Copy the output paths. */printMsg(lvlError, format("copying outputs of ‘%1%’ from ‘%2%’") % drvPath % sshName);PathSet outputs;for (auto & output : drv.outputs)outputs.insert(output.second.path);copyClosureFrom(store, from, to, outputs);/* Shut down the connection. */child.to.close();child.pid.wait(true);result.status = RemoteResult::rrSuccess;} - file addition: build-remote.hh[4.187]
#pragma once#include "store-api.hh"#include "derivations.hh"struct RemoteResult{enum {rrSuccess = 0,rrPermanentFailure = 1,rrTimedOut = 2,rrMiscFailure = 3} status = rrMiscFailure;std::string errorMsg;time_t startTime = 0, stopTime = 0;};void buildRemote(std::shared_ptr<nix::StoreAPI> store,const std::string & sshName, const std::string & sshKey,const nix::Path & drvPath, const nix::Derivation & drv,const nix::Path & logDir, RemoteResult & result); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 11
#include "build-remote.hh" - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 137
std::atomic_bool destroyed;Step() : destroyed(false) { } - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 145
}};struct Machine{typedef std::shared_ptr<Machine> ptr;std::string sshName, sshKey;std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;unsigned int maxJobs = 1;float speedFactor = 1.0;Sync<unsigned int> currentJobs;Machine(){auto currentJobs_(currentJobs.lock());*currentJobs_ = 0; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 168
/* An RAII helper that manages the currentJobs field of Machine
   objects: constructing a reservation increments the machine's
   currentJobs counter, destroying it decrements the counter again
   (never below zero).  Reservations are handed around through
   MachineReservation::ptr. */
struct MachineReservation
{
    typedef std::shared_ptr<MachineReservation> ptr;
    Machine::ptr machine;

    MachineReservation(Machine::ptr machine) : machine(machine)
    {
        auto currentJobs_(machine->currentJobs.lock());
        (*currentJobs_)++;
    }

    /* A copy would decrement currentJobs a second time for a single
       increment, so copying is disabled; share the reservation
       through ‘ptr’ instead. */
    MachineReservation(const MachineReservation &) = delete;
    MachineReservation & operator =(const MachineReservation &) = delete;

    ~MachineReservation()
    {
        auto currentJobs_(machine->currentJobs.lock());
        if (*currentJobs_ > 0) (*currentJobs_)--;
    }
};
- replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 191
std::thread queueMonitorThread;Path hydraData, logDir; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 214
/* CV for waking up the dispatcher. */std::condition_variable dispatcherWakeup;std::mutex dispatcherMutex; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 221
/* The build machines. */typedef std::list<Machine::ptr> Machines;Sync<Machines> machines;/* The currently active builder threads. FIXME: We could re-usethese, but since they're fairly long-running, it's probably notworth it. */// std::vector<std::thread> builderThreads; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 235
void loadMachines(); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 242
void finishBuildStep(pqxx::work & txn, time_t stopTime, BuildID buildId, int stepNr,void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr, - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 261
void builderThreadEntry(int slot);/* The thread that selects and starts runnable builds. */void dispatcher(); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 264
void doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step);void wakeDispatcher(); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 266
MachineReservation::ptr findMachine(Step::ptr step);void builder(Step::ptr step, MachineReservation::ptr reservation);void doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,Machine::ptr machine); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 282
hydraData = getEnv("HYDRA_DATA");if (hydraData == "") throw Error("$HYDRA_DATA must be set");logDir = canonPath(hydraData + "/build-logs"); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 298
void State::loadMachines(){Path machinesFile = getEnv("NIX_REMOTE_SYSTEMS", "/etc/nix/machines"); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 304
Machines newMachines;if (pathExists(machinesFile)) {for (auto line : tokenizeString<Strings>(readFile(machinesFile), "\n")) {line = trim(string(line, 0, line.find('#')));auto tokens = tokenizeString<std::vector<std::string>>(line);if (tokens.size() < 3) continue;tokens.resize(7);auto machine = std::make_shared<Machine>();machine->sshName = tokens[0];machine->systemTypes = tokenizeString<StringSet>(tokens[1], ",");machine->sshKey = tokens[2];if (tokens[3] != "")string2Int(tokens[3], machine->maxJobs);elsemachine->maxJobs = 1;machine->speedFactor = atof(tokens[4].c_str());machine->supportedFeatures = tokenizeString<StringSet>(tokens[5], ",");machine->mandatoryFeatures = tokenizeString<StringSet>(tokens[6], ",");newMachines.push_back(machine);} - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 328
} else {auto machine = std::make_shared<Machine>();machine->sshName = "localhost";machine->systemTypes = StringSet({settings.thisSystem});if (settings.thisSystem == "x86_64-linux")machine->systemTypes.insert("i686-linux");machine->maxJobs = settings.maxBuildJobs;newMachines.push_back(machine);}auto machines_(machines.lock());*machines_ = newMachines;} - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 378
void State::finishBuildStep(pqxx::work & txn, time_t stopTime, BuildID buildId, int stepNr,void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr, - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 381
assert(startTime);assert(stopTime); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 384
("update BuildSteps set busy = 0, status = $1, propagatedFrom = $4, errorMsg = $5, stopTime = $6 where build = $2 and stepnr = $3")("update BuildSteps set busy = 0, status = $1, propagatedFrom = $4, errorMsg = $5, startTime = $6, stopTime = $7 where build = $2 and stepnr = $3") - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 388
(stopTime, stopTime != 0).exec();(startTime)(stopTime).exec(); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 464
printMsg(lvlInfo, format("aborting GC'ed build %1%") % build->id); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 484
printMsg(lvlInfo, format("cached build %1%") % build->id); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 506
printMsg(lvlInfo, format("added build %1% (top-level step %2%, %3% new runnable steps)")% build->id % step->drvPath % newRunnable.size()); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 586
if (step->destroyed) return;step->destroyed = true; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 674
printMsg(lvlInfo, format("step ‘%1%’ is now runnable") % step->drvPath); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 686
runnableWakeup.notify_one();wakeDispatcher(); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 690
void State::builderThreadEntry(int slot)void State::dispatcher() - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 692
auto store = openStore(); // FIXME: poolwhile (!exitRequested) {printMsg(lvlError, "dispatcher woken up"); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 695[4.2578]→[4.2578:2598](∅→∅),[4.2598]→[4.6726:6793](∅→∅),[4.6793]→[4.2598:2622](∅→∅),[4.2598]→[4.2598:2622](∅→∅)
while (true) {/* Sleep until a runnable build step becomes available. */Step::ptr step; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 697
while (runnable_->empty() && !exitRequested)runnable_.wait(runnableWakeup);if (exitRequested) break;auto weak = *runnable_->begin();runnable_->pop_front();step = weak.lock();if (!step) continue;printMsg(lvlError, format("%1% runnable builds") % runnable_->size());/* FIXME: we're holding the runnable lock too longhere. This could be more efficient. */for (auto i = runnable_->begin(); i != runnable_->end(); ) {auto step = i->lock();/* Delete dead steps. */if (!step) {i = runnable_->erase(i);continue;}auto reservation = findMachine(step);if (!reservation) {printMsg(lvlError, format("cannot execute step ‘%1%’ right now") % step->drvPath);++i;continue;}printMsg(lvlInfo, format("WOOHOO: starting step ‘%1%’ on machine ‘%2%’")% step->drvPath % reservation->machine->sshName);i = runnable_->erase(i);auto builderThread = std::thread(&State::builder, this, step, reservation);builderThread.detach(); // FIXME?} - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 727[4.2853]→[4.7069:7093](∅→∅),[4.7093]→[4.2853:2950](∅→∅),[4.2853]→[4.2853:2950](∅→∅),[4.2951]→[4.2951:2985](∅→∅)
/* Build it. */printMsg(lvlError, format("slot %1%: got build step ‘%2%’") % slot % step->drvPath);doBuildStep(store, step);/* Sleep until we're woken up (either because a runnable buildis added, or because a build finishes). */{std::unique_lock<std::mutex> lock(dispatcherMutex);dispatcherWakeup.wait(lock);} - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 735
printMsg(lvlError, "builder thread exits");printMsg(lvlError, "dispatcher exits");}void State::wakeDispatcher(){{ std::lock_guard<std::mutex> lock(dispatcherMutex); } // barrierdispatcherWakeup.notify_all(); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 746
void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)MachineReservation::ptr State::findMachine(Step::ptr step){auto machines_(machines.lock());for (auto & machine : *machines_) {if (!has(machine->systemTypes, step->drv.platform)) continue;// FIXME: check features{auto currentJobs_(machine->currentJobs.lock());if (*currentJobs_ >= machine->maxJobs) continue;}return std::make_shared<MachineReservation>(machine);}/* FIXME: distinguish between permanent failures (a matchingmachine doesn't exist) and temporary failures (a matchingmachine is not available). */return 0;}void State::builder(Step::ptr step, MachineReservation::ptr reservation){try {auto store = openStore(); // FIXME: pooldoBuildStep(store, step, reservation->machine);} catch (std::exception & e) {printMsg(lvlError, format("build thread for ‘%1%’: %2%") % step->drvPath % e.what());// FIXME: put step back in runnable and retry}/* Release the machine and wake up the dispatcher. */assert(reservation.unique());reservation = 0;wakeDispatcher();printMsg(lvlError, "builder exits");}void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,Machine::ptr machine) - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 825
time_t startTime = time(0);RemoteResult result;result.startTime = time(0); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 830
stepNr = createBuildStep(txn, startTime, build, step, bssBusy);stepNr = createBuildStep(txn, result.startTime, build, step, bssBusy); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 834
bool success = false;std::string errorMsg; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 835
store->buildPaths(PathSet({step->drvPath}));success = true;buildRemote(store, machine->sshName, machine->sshKey, step->drvPath, step->drv, logDir, result); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 837
errorMsg = e.msg();result.status = RemoteResult::rrMiscFailure;result.errorMsg = e.msg();printMsg(lvlError, format("ERROR: %1%") % e.msg());abort(); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 843
time_t stopTime = time(0);if (!result.stopTime) result.stopTime = time(0); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 846
if (success) res = getBuildResult(store, step->drv);if (result.status == RemoteResult::rrSuccess) res = getBuildResult(store, step->drv); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 877
if (success) {if (result.status == RemoteResult::rrSuccess) { - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 879
finishBuildStep(txn, stopTime, build->id, stepNr, bssSuccess);finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, bssSuccess); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 884
markSucceededBuild(txn, build2, res, false, startTime, stopTime);markSucceededBuild(txn, build2, res, false, result.startTime, result.stopTime); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 889
finishBuildStep(txn, stopTime, build->id, stepNr, bssFailed, errorMsg);finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, bssFailed, result.errorMsg); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 893
createBuildStep(txn, stopTime, build2, step, bssFailed, errorMsg, build->id);createBuildStep(txn, result.stopTime, build2, step, bssFailed, result.errorMsg, build->id); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 898
printMsg(lvlError, format("marking build %1% as failed") % build2->id); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 903
(startTime)(stopTime).exec();(result.startTime)(result.stopTime).exec(); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 917
if (build2->toplevel == step || !success) {if (build2->toplevel == step || result.status != RemoteResult::rrSuccess) { - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 925
destroyStep(step, success);destroyStep(step, result.status == RemoteResult::rrSuccess); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 932
printMsg(lvlError, format("marking build %1% as succeeded") % build->id); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 969
queueMonitorThread = std::thread(&State::queueMonitor, this);loadMachines(); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 971[4.3364]→[4.641:686](∅→∅),[4.686]→[4.3379:3411](∅→∅),[4.3379]→[4.3379:3411](∅→∅),[4.3411]→[4.687:771](∅→∅)
std::vector<std::thread> builderThreads;for (int n = 0; n < 4; n++)builderThreads.push_back(std::thread(&State::builderThreadEntry, this, n));auto queueMonitorThread = std::thread(&State::queueMonitor, this);auto dispatcherThread = std::thread(&State::dispatcher, this); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 989
{ auto runnable_(runnable.lock()); } // barrierrunnableWakeup.notify_all();for (auto & thread : builderThreads) thread.join();wakeDispatcher();dispatcherThread.join(); - replacement in src/lib/Hydra/Helper/Nix.pm at line 136
for ($fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) {return $_ if (-f $_);my $fn2 = Hydra::Model::DB::getHydraPath . "/build-logs/";for ($fn2 . $bucketed, $fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) {return $_ if -f $_;