hydra-queue-runner: Write directly to a binary cache
[?]
Feb 15, 2016, 8:10 PM
73YR46NJNYZQKHA3QDJCAZYAKC2CGEF5LIS44NOIPDZU6FX6BDPQCDependencies
- [2]
DWFTK56EKeep track of how many threads are waiting - [3]
X6FOUYFJint2String -> std::to_string - [4]
DKJFD6JNProcess Nix API changes - [5]
7VQ4ALFYUpdate "make check" for the new queue runner - [6]
H7SZRHUBUse nix::willBuildLocally() - [7]
6TY4LNHHFinish copyClosure - [8]
MHVIT4JYSplit hydra-queue-runner.cc more - [9]
WE5Q2NVIAllow build to be bumped to the front of the queue via the web interface - [10]
HJOEIMLRRefactor - [11]
EYR3EW6JKeep stats for the Hydra auto scaler - [12]
IK2UBDAURevive jobset scheduling - [13]
MB3TISH2Rate-limit the number of threads copying closures at the same time - [14]
24BMQDZAStart of single-process hydra-queue-runner - [15]
A2GL5FOZMoar stats - [16]
4CQWOODYDon't abort steps that have an unsupported system type - [17]
CNLNT3T4Allow only 1 thread to send a closure to a given machine at the same time - [18]
ZK76B5ZZLoad the queue in order of global priority - [19]
LE4VZIY5More stats - [20]
YR2IM6Y5Temporarily disable machines after a connection failure - [21]
O3NM62IZSupport multiple machines files - [22]
TPNHTE5VRemove obsolete Builds columns and provide accurate "Running builds" - [23]
N4IROACVMove buildRemote() into State - [24]
ACBS7C6Qhydra-queue-runner: Detect changes to the scheduling shares - [25]
OCZ4LSGGAutomatically retry aborted builds - [26]
VQISTKOPhydra-queue-runner: Use substitutes - [27]
OG3Z3QGCNamespace cleanup - [28]
RND7XFNHgetQueuedBuilds(): Periodically stop to handle priority bumps - [29]
SODOV2CMAutomatically reload $NIX_REMOTE_SYSTEMS when it changes - [30]
NAYQT2GThydra-queue-runner: Use cmdBuildDerivation - [31]
5AIYUMTBBasic remote building - [*]
YHP5DSOOImprove parsing of hydra-build-products - [*]
T5BIOVJEAdd support for tracking custom metrics - [*]
ENXUSMSVMake concurrency more robust
Change contents
- replacement in src/hydra-queue-runner/Makefile.am at line 5
build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hhbuild-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh \local-binary-cache.hh local-binary-cache.cc - replacement in src/hydra-queue-runner/build-remote.cc at line 76
static void copyClosureTo(ref<Store> store,static void copyClosureTo(ref<Store> destStore, - replacement in src/hydra-queue-runner/build-remote.cc at line 83
store->computeFSClosure(path, closure);destStore->computeFSClosure(path, closure); - replacement in src/hydra-queue-runner/build-remote.cc at line 98
Paths sorted = store->topoSortPaths(closure);Paths sorted = destStore->topoSortPaths(closure); - replacement in src/hydra-queue-runner/build-remote.cc at line 107
bytesSent += store->queryPathInfo(p).narSize;bytesSent += destStore->queryPathInfo(p).narSize; - replacement in src/hydra-queue-runner/build-remote.cc at line 110
store->exportPaths(missing, false, to);destStore->exportPaths(missing, false, to); - replacement in src/hydra-queue-runner/build-remote.cc at line 118
static void copyClosureFrom(ref<Store> store,static void copyClosureFrom(ref<Store> destStore, - replacement in src/hydra-queue-runner/build-remote.cc at line 123
store->importPaths(false, from);destStore->importPaths(false, from); - replacement in src/hydra-queue-runner/build-remote.cc at line 126
bytesReceived += store->queryPathInfo(p).narSize;bytesReceived += destStore->queryPathInfo(p).narSize; - replacement in src/hydra-queue-runner/build-remote.cc at line 130
void State::buildRemote(ref<Store> store,void State::buildRemote(ref<Store> destStore, - edit in src/hydra-queue-runner/build-remote.cc at line 224
/* Ensure that the inputs exist in the destination store. This isa no-op for regular stores, but for the binary cache store,this will copy the inputs to the binary cache from the localstore. */destStore->buildPaths(basicDrv.inputSrcs); - replacement in src/hydra-queue-runner/build-remote.cc at line 232
if (machine->sshName != "localhost") {if (/* machine->sshName != "localhost" */ true) { - replacement in src/hydra-queue-runner/build-remote.cc at line 238
copyClosureTo(store, from, to, inputs, bytesSent);copyClosureTo(destStore, from, to, inputs, bytesSent); - replacement in src/hydra-queue-runner/build-remote.cc at line 286
if (machine->sshName != "localhost") {if (/* machine->sshName != "localhost" */ true) { - replacement in src/hydra-queue-runner/build-remote.cc at line 292
copyClosureFrom(store, from, to, outputs, bytesReceived);copyClosureFrom(destStore, from, to, outputs, bytesReceived); - edit in src/hydra-queue-runner/build-result.cc at line 44
#if 0 - edit in src/hydra-queue-runner/build-result.cc at line 101
#endif - edit in src/hydra-queue-runner/build-result.cc at line 113
#if 0 - edit in src/hydra-queue-runner/build-result.cc at line 118
#endif - edit in src/hydra-queue-runner/build-result.cc at line 123
#if 0 - edit in src/hydra-queue-runner/build-result.cc at line 147
#endif - replacement in src/hydra-queue-runner/builder.cc at line 18
auto store = openStore(); // FIXME: poolretry = doBuildStep(store, step, reservation->machine);auto destStore = getDestStore();retry = doBuildStep(destStore, step, reservation->machine); - replacement in src/hydra-queue-runner/builder.cc at line 48
bool State::doBuildStep(nix::ref<Store> store, Step::ptr step,bool State::doBuildStep(nix::ref<Store> destStore, Step::ptr step, - replacement in src/hydra-queue-runner/builder.cc at line 123
buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result);buildRemote(destStore, machine, step, build->maxSilentTime, build->buildTimeout, result); - replacement in src/hydra-queue-runner/builder.cc at line 129
if (result.success()) res = getBuildOutput(store, step->drv);if (result.success()) res = getBuildOutput(destStore, step->drv); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 10[5.20065][35.19]
#include "local-binary-cache.hh" - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 25
}ref<Store> State::getLocalStore(){return openStore(); // FIXME: pool}ref<Store> State::getDestStore(){return make_ref<LocalBinaryCache>(getLocalStore(), "/tmp/binary-cache"); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 110
parseMachines("localhost " + settings.thisSystemparseMachines("localhost " +(settings.thisSystem == "x86_64-linux" ? "x86_64-linux,i686-linux" : settings.thisSystem) - file addition: local-binary-cache.cc[5.187]
#include "local-binary-cache.hh"#include "archive.hh"#include "derivations.hh"#include "globals.hh"#include "worker-protocol.hh"namespace nix {LocalBinaryCache::LocalBinaryCache(ref<Store> localStore, const Path & binaryCacheDir): localStore(localStore), binaryCacheDir(binaryCacheDir){createDirs(binaryCacheDir + "/nar");}Path LocalBinaryCache::narInfoFileFor(const Path & storePath){assertStorePath(storePath);return binaryCacheDir + "/" + storePathToHash(storePath) + ".narinfo";}void atomicWrite(const Path & path, const std::string & s){Path tmp = path + ".tmp." + std::to_string(getpid());AutoDelete del(tmp, false);writeFile(tmp, s);if (rename(tmp.c_str(), path.c_str()))throw SysError(format("renaming ‘%1%’ to ‘%2%’") % tmp % path);del.cancel();}void LocalBinaryCache::addToCache(const ValidPathInfo & info,const string & nar){size_t narSize = nar.size();Hash narHash = hashString(htSHA256, nar);if (info.hash.type != htUnknown && info.hash != narHash)throw Error(format("refusing to copy corrupted path ‘%1%’ to binary cache") % info.path);printMsg(lvlTalkative, format("copying path ‘%1%’ (%2% bytes) to binary cache")% info.path % narSize);/* Atomically write the NAR file. */string narFileRel = "nar/" + printHash(narHash) + ".nar";Path narFile = binaryCacheDir + "/" + narFileRel;if (!pathExists(narFile)) atomicWrite(narFile, nar);/* Atomically write the NAR info file.*/Path narInfoFile = narInfoFileFor(info.path);if (!pathExists(narInfoFile)) {Strings refs;for (auto & r : info.references)refs.push_back(baseNameOf(r));std::string narInfo;narInfo += "StorePath: " + info.path + "\n";narInfo += "URL: " + narFileRel + "\n";narInfo += "Compression: none\n";narInfo += "FileHash: sha256:" + printHash(narHash) + "\n";narInfo += "FileSize: " + std::to_string(narSize) + "\n";narInfo += "NarHash: sha256:" + printHash(narHash) + "\n";narInfo += "NarSize: " + std::to_string(narSize) + "\n";narInfo += "References: " + concatStringsSep(" ", refs) + "\n";// FIXME: add signatureatomicWrite(narInfoFile, narInfo);}}LocalBinaryCache::NarInfo LocalBinaryCache::readNarInfo(const Path & storePath){NarInfo res;Path narInfoFile = narInfoFileFor(storePath);if (!pathExists(narInfoFile))abort();std::string narInfo = readFile(narInfoFile);auto corrupt = [&]() {throw Error(format("corrupt NAR info file ‘%1%’") % narInfoFile);};size_t pos = 0;while (pos < narInfo.size()) {size_t colon = narInfo.find(':', pos);if (colon == std::string::npos) corrupt();std::string name(narInfo, pos, colon - pos);size_t eol = narInfo.find('\n', colon + 2);if (eol == std::string::npos) corrupt();std::string value(narInfo, colon + 2, eol - colon - 2);if (name == "StorePath") {res.info.path = value;if (value != storePath) corrupt();res.info.path = value;}else if (name == "References") {auto refs = tokenizeString<Strings>(value, " ");if (!res.info.references.empty()) corrupt();for (auto & r : refs)res.info.references.insert(settings.nixStore + "/" + r);}else if (name == "URL") {res.narUrl = value;}pos = eol + 1;}if (res.info.path.empty() || res.narUrl.empty()) corrupt();return res;}bool LocalBinaryCache::isValidPath(const Path & storePath){Path narInfoFile = narInfoFileFor(storePath);printMsg(lvlDebug, format("checking %1% -> %2%") % storePath % narInfoFile);return pathExists(narInfoFile);}void LocalBinaryCache::exportPath(const Path & storePath, bool sign, Sink & sink){assert(!sign);auto res = readNarInfo(storePath);auto nar = readFile(binaryCacheDir + "/" + res.narUrl);printMsg(lvlTalkative, format("exporting path ‘%1%’ (%2% bytes)") % storePath % nar.size());assert(nar.size() % 8 == 0);sink((unsigned char *) nar.c_str(), nar.size());// FIXME: check integrity of NAR.sink << exportMagic << storePath << res.info.references << res.info.deriver << 0;}Paths LocalBinaryCache::importPaths(bool requireSignature, Source & source){assert(!requireSignature);Paths res;while (true) {unsigned long long n = readLongLong(source);if (n == 0) break;if (n != 1) throw Error("input doesn't look like something created by ‘nix-store --export’");res.push_back(importPath(source));}return res;}struct TeeSource : Source{Source & readSource;std::string data;TeeSource(Source & readSource) : readSource(readSource){}size_t read(unsigned char * data, size_t len){size_t n = readSource.read(data, len);this->data.append((char *) data, n);return n;}};struct NopSink : ParseSink{};Path LocalBinaryCache::importPath(Source & source){/* FIXME: some cut&paste of LocalStore::importPath(). *//* Extract the NAR from the source. */TeeSource tee(source);NopSink sink;parseDump(sink, tee);uint32_t magic = readInt(source);if (magic != exportMagic)throw Error("Nix archive cannot be imported; wrong format");ValidPathInfo info;info.path = readStorePath(source);info.references = readStorePaths<PathSet>(source);readString(source); // deriver, don't carebool haveSignature = readInt(source) == 1;assert(!haveSignature);addToCache(info, tee.data);return info.path;}ValidPathInfo LocalBinaryCache::queryPathInfo(const Path & storePath){return readNarInfo(storePath).info;}void LocalBinaryCache::querySubstitutablePathInfos(const PathSet & paths,SubstitutablePathInfos & infos){PathSet left;for (auto & storePath : paths) {if (!localStore->isValidPath(storePath)) {left.insert(storePath);continue;}ValidPathInfo info = localStore->queryPathInfo(storePath);SubstitutablePathInfo sub;sub.references = info.references;sub.downloadSize = 0;sub.narSize = info.narSize;infos.emplace(storePath, sub);}localStore->querySubstitutablePathInfos(left, infos);}void LocalBinaryCache::buildPaths(const PathSet & paths, BuildMode buildMode){for (auto & storePath : paths) {assert(!isDerivation(storePath));if (isValidPath(storePath)) continue;localStore->addTempRoot(storePath);if (!localStore->isValidPath(storePath))localStore->ensurePath(storePath);ValidPathInfo info = localStore->queryPathInfo(storePath);for (auto & ref : info.references)if (ref != storePath)ensurePath(ref);StringSink sink;dumpPath(storePath, sink);addToCache(info, sink.s);}}void LocalBinaryCache::ensurePath(const Path & path){buildPaths({path});}} - file addition: local-binary-cache.hh[5.187]
#pragma once#include "store-api.hh"namespace nix {class LocalBinaryCache : public nix::Store{private:ref<Store> localStore;Path binaryCacheDir;public:LocalBinaryCache(ref<Store> localStore, const Path & binaryCacheDir);private:Path narInfoFileFor(const Path & storePath);void addToCache(const ValidPathInfo & info, const string & nar);struct NarInfo{ValidPathInfo info;std::string narUrl;};NarInfo readNarInfo(const Path & storePath);public:bool isValidPath(const Path & path) override;PathSet queryValidPaths(const PathSet & paths) override{ abort(); }PathSet queryAllValidPaths() override{ abort(); }ValidPathInfo queryPathInfo(const Path & path) override;Hash queryPathHash(const Path & path) override{ abort(); }void queryReferrers(const Path & path,PathSet & referrers) override{ abort(); }Path queryDeriver(const Path & path) override{ abort(); }PathSet queryValidDerivers(const Path & path) override{ abort(); }PathSet queryDerivationOutputs(const Path & path) override{ abort(); }StringSet queryDerivationOutputNames(const Path & path) override{ abort(); }Path queryPathFromHashPart(const string & hashPart) override{ abort(); }PathSet querySubstitutablePaths(const PathSet & paths) override{ abort(); }void querySubstitutablePathInfos(const PathSet & paths,SubstitutablePathInfos & infos) override;Path addToStore(const string & name, const Path & srcPath,bool recursive = true, HashType hashAlgo = htSHA256,PathFilter & filter = defaultPathFilter, bool repair = false) override{ abort(); }Path addTextToStore(const string & name, const string & s,const PathSet & references, bool repair = false) override{ abort(); }void exportPath(const Path & path, bool sign,Sink & sink) override;Paths importPaths(bool requireSignature, Source & source) override;Path importPath(Source & source);void buildPaths(const PathSet & paths, BuildMode buildMode = bmNormal) override;BuildResult buildDerivation(const Path & drvPath, const BasicDerivation & drv,BuildMode buildMode = bmNormal) override{ abort(); }void ensurePath(const Path & path) override;void addTempRoot(const Path & path) override{ abort(); }void addIndirectRoot(const Path & path) override{ abort(); }void syncWithGC() override{ }Roots findRoots() override{ abort(); }void collectGarbage(const GCOptions & options, GCResults & results) override{ abort(); }PathSet queryFailedPaths() override{ return PathSet(); }void clearFailedPaths(const PathSet & paths) override{ }void optimiseStore() override{ }bool verifyStore(bool checkContents, bool repair) override{ return true; }};} - replacement in src/hydra-queue-runner/queue-monitor.cc at line 33
auto store = openStore(); // FIXME: poolauto localStore = getLocalStore();auto destStore = getDestStore(); - replacement in src/hydra-queue-runner/queue-monitor.cc at line 39
bool done = getQueuedBuilds(*conn, store, lastBuildId);bool done = getQueuedBuilds(*conn, localStore, destStore, lastBuildId); - replacement in src/hydra-queue-runner/queue-monitor.cc at line 67
bool State::getQueuedBuilds(Connection & conn, ref<Store> store, unsigned int & lastBuildId)bool State::getQueuedBuilds(Connection & conn, ref<Store> localStore,ref<Store> destStore, unsigned int & lastBuildId) - replacement in src/hydra-queue-runner/queue-monitor.cc at line 123
if (!store->isValidPath(build->drvPath)) {if (!localStore->isValidPath(build->drvPath)) { - replacement in src/hydra-queue-runner/queue-monitor.cc at line 143
Step::ptr step = createStep(store, conn, build, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable);Step::ptr step = createStep(destStore, conn, build, build->drvPath,build, 0, finishedDrvs, newSteps, newRunnable); - replacement in src/hydra-queue-runner/queue-monitor.cc at line 162
BuildOutput res = getBuildOutput(store, drv);BuildOutput res = getBuildOutput(destStore, drv); - replacement in src/hydra-queue-runner/queue-monitor.cc at line 320
Step::ptr State::createStep(ref<Store> store,Step::ptr State::createStep(ref<Store> destStore, - replacement in src/hydra-queue-runner/queue-monitor.cc at line 400
if (!store->isValidPath(i.second.path)) {if (!destStore->isValidPath(i.second.path)) { - replacement in src/hydra-queue-runner/queue-monitor.cc at line 412
store->querySubstitutablePathInfos(missingPaths, infos);destStore->querySubstitutablePathInfos(missingPaths, infos); // FIXME - replacement in src/hydra-queue-runner/queue-monitor.cc at line 420
store->ensurePath(i.second.path);destStore->ensurePath(i.second.path); - replacement in src/hydra-queue-runner/queue-monitor.cc at line 449
auto dep = createStep(store, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable);auto dep = createStep(destStore, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable); - edit in src/hydra-queue-runner/state.hh at line 352
/* Return a store object that can access derivations produced byhydra-evaluator. */nix::ref<nix::Store> getLocalStore();/* Return a store object to store build results. */nix::ref<nix::Store> getDestStore(); - replacement in src/hydra-queue-runner/state.hh at line 387
bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> store, unsigned int & lastBuildId);bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> localStore,nix::ref<nix::Store> destStore, unsigned int & lastBuildId); - replacement in src/hydra-queue-runner/state.hh at line 416
bool doBuildStep(nix::ref<nix::Store> store, Step::ptr step,bool doBuildStep(nix::ref<nix::Store> destStore, Step::ptr step, - replacement in src/hydra-queue-runner/state.hh at line 419
void buildRemote(nix::ref<nix::Store> store,void buildRemote(nix::ref<nix::Store> destStore,