This removes the need for Nix's
Build logs are now written to $HYDRA_DATA/build-logs because hydra-queue-runner doesn't have write permission to /nix/var/log.
#include <algorithm>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "build-remote.hh"
#include "util.hh"
#include "misc.hh"
#include "serve-protocol.hh"
#include "worker-protocol.hh"
using namespace nix;
/* Handle for a spawned child process: its Pid plus the two file
   descriptors the parent uses to talk to it. openConnection() fills
   these in; `from` receives the read side of the pipe attached to the
   child's stdout. */
struct Child
Pid pid;
AutoCloseFD to, from;
/* Start an `ssh` child that runs `nix-store --serve --write` on host
   `sshName`. The child's stdin and stdout are connected to two pipes
   and its stderr is redirected to `stderrFD`; the parent-side pipe
   ends are handed over to `child`.

   Inside the child, a failing dup2() or execvp() raises SysError.

   NOTE(review): `sshKey` is not referenced in the lines shown here.
   NOTE(review): this listing appears to have lost tokens -- the two
   `= ...` expressions below have no left-hand side, and only
   `from.create()` is visible although both pipes are used. Restore
   them from the original file. */
static void openConnection(const string & sshName, const string & sshKey,
int stderrFD, Child & child)
Pipe to, from;
from.create(); = startProcess([&]() {
// Child side: wire the pipe ends and the log descriptor onto fds 0/1/2.
if (dup2(to.readSide, STDIN_FILENO) == -1)
throw SysError("cannot dup input pipe to stdin");
if (dup2(from.writeSide, STDOUT_FILENO) == -1)
throw SysError("cannot dup output pipe to stdout");
if (dup2(stderrFD, STDERR_FILENO) == -1)
throw SysError("cannot dup stderr");
// "-x" and "-a" are passed to ssh; "--" ends ssh option parsing before the remote command.
Strings argv({"ssh", "-x", "-a", sshName, "--", "nix-store", "--serve", "--write"});
execvp("ssh", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast
// Only reached when execvp() returned, i.e. failed.
throw SysError("cannot start ssh");
// Parent side: close the end the child owns and keep the ends used for talking to it.
from.writeSide.close(); = to.writeSide.borrow();
child.from = from.readSide.borrow();
/* Work out which store paths in the closure of `paths` are missing on
   the remote side of the `to`/`from` connection.

   The closure is computed in `store`, sent in topologically sorted
   order with a cmdQueryValidPaths request, and compared against the
   set of paths the remote reports back. The function returns when
   nothing is missing; otherwise it throws Error("NOT IMPL 1"),
   because actually transferring the missing paths is not implemented
   in the code shown.

   `useSubstitutes` is forwarded to the remote as part of the query. */
static void copyClosureTo(std::shared_ptr<StoreAPI> store,
FdSource & from, FdSink & to, const PathSet & paths,
bool useSubstitutes = false)
PathSet closure;
for (auto & path : paths)
computeFSClosure(*store, path, closure);
Paths sorted = topoSortPaths(*store, closure);
/* Send the "query valid paths" command with the "lock" option
   enabled. This prevents a race where the remote host
   garbage-collects paths that are already there. Optionally, ask
   the remote host to substitute missing paths. */
writeInt(cmdQueryValidPaths, to);
writeInt(1, to); // == lock paths
writeInt(useSubstitutes, to);
writeStrings(sorted, to);
/* Get back the set of paths that are already valid on the remote
   host. */
auto present = readStorePaths<PathSet>(from);
// missing = closure minus present.
// NOTE(review): std::set_difference needs sorted ranges; presumably PathSet is an ordered set -- confirm.
PathSet missing;
std::set_difference(closure.begin(), closure.end(), present.begin(), present.end(),
std::inserter(missing, missing.end()));
// Logged at lvlError although the message is informational.
printMsg(lvlError, format("sending %1% missing paths") % missing.size());
if (missing.empty()) return;
// Transfer of missing paths is not implemented here.
throw Error("NOT IMPL 1");
/* Ask the remote side to export `paths` (cmdExportPaths, unsigned)
   and import the resulting stream from `from` into `store`. */
static void copyClosureFrom(std::shared_ptr<StoreAPI> store,
FdSource & from, FdSink & to, const PathSet & paths)
writeInt(cmdExportPaths, to);
writeInt(0, to); // == don't sign
writeStrings(paths, to);
// NOTE(review): the `false` argument presumably means "do not require a signature" -- confirm against StoreAPI::importPaths.
store->importPaths(false, from);
/* Build the derivation `drvPath` on remote host `sshName` by speaking
   the `nix-store --serve` protocol over ssh, recording the outcome in
   `result`.

   Steps visible below: create/truncate a log file under `logDir`,
   start the ssh child with that file as its stderr, perform the
   magic/version handshake, send the closure of `drvPath`, issue
   cmdBuildPaths, translate the reply code into `result.status` and
   `result.errorMsg`, and fetch the outputs.

   Throws SysError when the log file cannot be created and Error on a
   handshake or protocol-version mismatch.

   NOTE(review): this listing looks truncated in places (see the
   inline notes); confirm the exact control flow against the original
   file. */
void buildRemote(std::shared_ptr<StoreAPI> store,
const string & sshName, const string & sshKey,
const Path & drvPath, const Derivation & drv,
const nix::Path & logDir, RemoteResult & result)
// Log path is <logDir>/<first two characters of base>/<remaining characters>.
string base = baseNameOf(drvPath);
Path logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2);
// NOTE(review): no directory creation is visible here, so the two-character
// bucket directory presumably has to exist already -- confirm.
AutoCloseFD logFD(open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666));
if (logFD == -1) throw SysError(format("creating log file ‘%1%’") % logFile);
Child child;
// The log file descriptor becomes the ssh child's stderr.
openConnection(sshName, sshKey, logFD, child);
FdSource from(child.from);
// NOTE(review): the FdSink constructor argument is missing in this listing.
FdSink to(;
/* Handshake. */
writeInt(SERVE_MAGIC_1, to);
unsigned int magic = readInt(from);
if (magic != SERVE_MAGIC_2)
throw Error(format("protocol mismatch with ‘nix-store --serve’ on ‘%1%’") % sshName);
unsigned int version = readInt(from);
// Only major protocol version 0x200 is accepted.
if (GET_PROTOCOL_MAJOR(version) != 0x200)
throw Error(format("unsupported ‘nix-store --serve’ protocol version on ‘%1%’") % sshName);
/* Copy the input closure. */
printMsg(lvlError, format("sending closure of ‘%1%’ to ‘%2%’") % drvPath % sshName);
copyClosureTo(store, from, to, PathSet({drvPath}));
/* Do the build. */
printMsg(lvlError, format("building ‘%1%’ on ‘%2%’") % drvPath % sshName);
writeInt(cmdBuildPaths, to);
writeStrings(PathSet({drvPath}), to);
writeInt(3600, to); // == maxSilentTime, FIXME
writeInt(7200, to); // == buildTimeout, FIXME
// startTime/stopTime bracket the wait for the remote side's reply.
result.startTime = time(0);
int res = readInt(from);
result.stopTime = time(0);
// A non-zero reply means failure: 100 maps to a permanent failure, 101 to
// a timeout, anything else to a miscellaneous failure.
// NOTE(review): neither a closing brace nor a return for this `if` is
// visible in this listing.
if (res) {
result.errorMsg = (format("%1% on ‘%2%’") % readString(from) % sshName).str();
if (res == 100) result.status = RemoteResult::rrPermanentFailure;
else if (res == 101) result.status = RemoteResult::rrTimedOut;
else result.status = RemoteResult::rrMiscFailure;
/* Copy the output paths. */
printMsg(lvlError, format("copying outputs of ‘%1%’ from ‘%2%’") % drvPath % sshName);
// NOTE(review): nothing visible here inserts into `outputs`; as listed, the
// statement following the `for` header is the copyClosureFrom() call itself.
PathSet outputs;
for (auto & output : drv.outputs)
copyClosureFrom(store, from, to, outputs);
/* Shut down the connection. */;;
result.status = RemoteResult::rrSuccess;
#pragma once
#include "store-api.hh"
#include "derivations.hh"
/* Outcome of one remote build, filled in by buildRemote(). */
struct RemoteResult
// buildRemote() maps remote reply code 100 to rrPermanentFailure, 101 to
// rrTimedOut, any other non-zero code to rrMiscFailure, and sets rrSuccess
// at the end of a successful run.
enum {
rrSuccess = 0,
rrPermanentFailure = 1,
rrTimedOut = 2,
rrMiscFailure = 3
} status = rrMiscFailure; // a default-constructed result counts as a failure
// Error text; buildRemote() stores the remote message followed by the host name.
std::string errorMsg;
// time(0) values taken immediately before and after waiting for the build reply.
time_t startTime = 0, stopTime = 0;
/* Build `drvPath` on the machine reachable as `sshName` and report
   the outcome through `result`.

   store    -- local store; used to compute the closure that is sent
               and to import the outputs that come back.
   sshName  -- host argument passed to ssh.
   sshKey   -- forwarded to the connection helper.
   drvPath  -- store path of the derivation to build.
   drv      -- the derivation; its `outputs` are iterated after the build.
   logDir   -- directory under which the build log file is created.
   result   -- receives status, error message and start/stop times. */
void buildRemote(std::shared_ptr<nix::StoreAPI> store,
const std::string & sshName, const std::string & sshKey,
const nix::Path & drvPath, const nix::Derivation & drv,
const nix::Path & logDir, RemoteResult & result);
/* Description of one build machine, together with a counter of the
   jobs currently assigned to it. */
struct Machine
typedef std::shared_ptr<Machine> ptr;
// sshName is the host handed to ssh; both fields are read from the machines file.
std::string sshName, sshKey;
// systemTypes is matched against a step's platform by findMachine().
std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;
// Upper bound that findMachine() compares currentJobs against.
unsigned int maxJobs = 1;
float speedFactor = 1.0;
// Job counter wrapped in Sync<>; every access below goes through lock().
Sync<unsigned int> currentJobs;
// NOTE(review): the next two statements look like a constructor body whose
// header is missing from this listing; they reset currentJobs to 0 under its lock.
auto currentJobs_(currentJobs.lock());
*currentJobs_ = 0;
/* A RAII helper that manages the currentJobs field of Machine
   objects. It holds a shared pointer to the reserved machine.

   NOTE(review): only the locking statements and the guarded decrement
   are visible in this listing; the increment in the constructor and
   the destructor header are presumably missing -- confirm against the
   original file. */
struct MachineReservation
typedef std::shared_ptr<MachineReservation> ptr;
Machine::ptr machine;
MachineReservation(Machine::ptr machine) : machine(machine)
auto currentJobs_(machine->currentJobs.lock());
auto currentJobs_(machine->currentJobs.lock());
// Never decrement below zero.
if (*currentJobs_ > 0) (*currentJobs_)--;
/* Build a fresh machine list and publish it under the `machines` lock.
   If `machinesFile` exists it is parsed line by line; otherwise a
   single "localhost" entry for the current system is created.

   Line format as read below (whitespace-separated; `#` starts a comment):
     0 sshName   1 system types (comma-separated)   2 sshKey
     3 maxJobs   4 speedFactor
     5 supported features (comma-separated)
     6 mandatory features (comma-separated)

   NOTE(review): the enclosing function header and some statements
   (e.g. adding `machine` to `newMachines`) are not visible in this
   listing. */
Machines newMachines;
if (pathExists(machinesFile)) {
for (auto line : tokenizeString<Strings>(readFile(machinesFile), "\n")) {
// Strip everything from the first '#' onwards, then surrounding whitespace.
line = trim(string(line, 0, line.find('#')));
auto tokens = tokenizeString<std::vector<std::string>>(line);
// Lines with fewer than three fields (including blank lines) are skipped.
if (tokens.size() < 3) continue;
auto machine = std::make_shared<Machine>();
machine->sshName = tokens[0];
machine->systemTypes = tokenizeString<StringSet>(tokens[1], ",");
machine->sshKey = tokens[2];
// NOTE(review): only tokens.size() >= 3 has been established, yet
// tokens[3] through tokens[6] are indexed below; a line with 3 to 6 fields
// reads past the end of the vector. Guard each access on tokens.size().
if (tokens[3] != "")
string2Int(tokens[3], machine->maxJobs);
// NOTE(review): as listed, this assignment unconditionally overwrites the
// value parsed above; presumably an `else` is missing -- confirm.
machine->maxJobs = 1;
machine->speedFactor = atof(tokens[4].c_str());
machine->supportedFeatures = tokenizeString<StringSet>(tokens[5], ",");
machine->mandatoryFeatures = tokenizeString<StringSet>(tokens[6], ",");
} else {
// No machines file: fall back to building on this host.
auto machine = std::make_shared<Machine>();
machine->sshName = "localhost";
machine->systemTypes = StringSet({settings.thisSystem});
// NOTE(review): as listed, this condition guards the maxJobs assignment;
// confirm that no statement is missing between the two lines.
if (settings.thisSystem == "x86_64-linux")
machine->maxJobs = settings.maxBuildJobs;
// Replace the shared list while holding its lock.
auto machines_(machines.lock());
*machines_ = newMachines;
// Two variants of the parameterised UPDATE that clears `busy` on a BuildSteps
// row selected by build ($2) and stepnr ($3). The second variant also writes
// startTime ($6), which moves stopTime from $6 to $7.
// NOTE(review): the call these string arguments belong to is not visible in this listing.
("update BuildSteps set busy = 0, status = $1, propagatedFrom = $4, errorMsg = $5, stopTime = $6 where build = $2 and stepnr = $3")
("update BuildSteps set busy = 0, status = $1, propagatedFrom = $4, errorMsg = $5, startTime = $6, stopTime = $7 where build = $2 and stepnr = $3")
/* Fragments of the loops that take steps from the `runnable_`
   collection and execute them.
   NOTE(review): the enclosing function headers, several closing
   braces and the loop bodies' tails are not visible in this listing;
   the comments below describe only the statements shown. */
// Wait while there is nothing runnable and no exit has been requested (loop body not shown).
while (runnable_->empty() && !exitRequested)
if (exitRequested) break;
// Take the first entry; it is a weak reference, so the step may already be gone.
auto weak = *runnable_->begin();
step = weak.lock();
if (!step) continue;
printMsg(lvlError, format("%1% runnable builds") % runnable_->size());
/* FIXME: we're holding the runnable lock too long
   here. This could be more efficient. */
for (auto i = runnable_->begin(); i != runnable_->end(); ) {
auto step = i->lock();
/* Delete dead steps. */
if (!step) {
i = runnable_->erase(i);
// Try to reserve a machine for this step; a null result means none is available.
auto reservation = findMachine(step);
if (!reservation) {
printMsg(lvlError, format("cannot execute step ‘%1%’ right now") % step->drvPath);
printMsg(lvlInfo, format("WOOHOO: starting step ‘%1%’ on machine ‘%2%’")
% step->drvPath % reservation->machine->sshName);
// The step leaves the runnable collection and is handed to its own detached thread.
i = runnable_->erase(i);
auto builderThread = std::thread(&State::builder, this, step, reservation);
builderThread.detach(); // FIXME?
/* Build it. */
printMsg(lvlError, format("slot %1%: got build step ‘%2%’") % slot % step->drvPath);
doBuildStep(store, step);
/* Sleep until we're woken up (either because a runnable build
   is added, or because a build finishes). */
std::unique_lock<std::mutex> lock(dispatcherMutex);
// Two-argument signature of doBuildStep; its body is not visible in this listing.
// A three-argument signature taking a Machine::ptr also appears later in this file.
void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)
/* Return a reservation on the first machine in `machines` whose
   systemTypes include the step's platform and whose currentJobs is
   below its maxJobs. Returns 0 (a null pointer) when no machine
   qualifies. The machine list is locked for the duration of the scan.

   NOTE(review): scoping braces are not visible in this listing, so it
   cannot be told from here whether `currentJobs_` is released before
   the MachineReservation is constructed (its constructor locks
   currentJobs as well) -- confirm against the original file. */
MachineReservation::ptr State::findMachine(Step::ptr step)
auto machines_(machines.lock());
for (auto & machine : *machines_) {
// Skip machines that do not list the step's platform.
if (!has(machine->systemTypes, step->drv.platform)) continue;
// FIXME: check features
// Skip machines that are already at capacity.
auto currentJobs_(machine->currentJobs.lock());
if (*currentJobs_ >= machine->maxJobs) continue;
return std::make_shared<MachineReservation>(machine);
/* FIXME: distinguish between permanent failures (a matching
   machine doesn't exist) and temporary failures (a matching
   machine is not available). */
return 0;
/* Entry point of a per-step builder thread: opens a store, runs
   doBuildStep() for `step` on the reserved machine, and logs any
   std::exception instead of letting it escape. Afterwards the local
   reference to the reservation is dropped. */
void State::builder(Step::ptr step, MachineReservation::ptr reservation)
try {
auto store = openStore(); // FIXME: pool
doBuildStep(store, step, reservation->machine);
} catch (std::exception & e) {
printMsg(lvlError, format("build thread for ‘%1%’: %2%") % step->drvPath % e.what());
// FIXME: put step back in runnable and retry
/* Release the machine and wake up the dispatcher. */
// NOTE(review): only the release is visible here; the dispatcher wake-up is not shown in this listing.
reservation = 0;
printMsg(lvlError, "builder exits");
// Three-argument signature of doBuildStep, taking the machine to build on;
// State::builder() calls it with reservation->machine. Its body is not
// visible in this listing.
void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
Machine::ptr machine)
// Thread start-up fragment: four builder threads (each passed its index n),
// plus one queue-monitor thread and one dispatcher thread.
// NOTE(review): the enclosing function and any join/detach handling are not visible in this listing.
std::vector<std::thread> builderThreads;
for (int n = 0; n < 4; n++)
builderThreads.push_back(std::thread(&State::builderThreadEntry, this, n));
auto queueMonitorThread = std::thread(&State::queueMonitor, this);
auto dispatcherThread = std::thread(&State::dispatcher, this);
# Perl fragment: probe candidate log file locations in order and return the
# first one that is a regular file (-f).
# NOTE(review): the enclosing sub and the definitions of $fn, $bucketed and
# $base are not visible in this listing; the two loops look like two versions
# of the same lookup.
for ($fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) {
return $_ if (-f $_);
# Second variant: also look under <hydra path>/build-logs/ first.
my $fn2 = Hydra::Model::DB::getHydraPath . "/build-logs/";
for ($fn2 . $bucketed, $fn . $bucketed . ".bz2", $fn . $bucketed, $fn . $base . ".bz2", $fn . $base) {
return $_ if -f $_;