hydra-queue-runner: Write directly to a binary cache

[?]
Feb 15, 2016, 8:10 PM
73YR46NJNYZQKHA3QDJCAZYAKC2CGEF5LIS44NOIPDZU6FX6BDPQC

Dependencies

  • [2] DWFTK56E Keep track of how many threads are waiting
  • [3] X6FOUYFJ int2String -> std::to_string
  • [4] DKJFD6JN Process Nix API changes
  • [5] 7VQ4ALFY Update "make check" for the new queue runner
  • [6] H7SZRHUB Use nix::willBuildLocally()
  • [7] 6TY4LNHH Finish copyClosure
  • [8] MHVIT4JY Split hydra-queue-runner.cc more
  • [9] WE5Q2NVI Allow build to be bumped to the front of the queue via the web interface
  • [10] HJOEIMLR Refactor
  • [11] EYR3EW6J Keep stats for the Hydra auto scaler
  • [12] IK2UBDAU Revive jobset scheduling
  • [13] MB3TISH2 Rate-limit the number of threads copying closures at the same time
  • [14] 24BMQDZA Start of single-process hydra-queue-runner
  • [15] A2GL5FOZ Moar stats
  • [16] 4CQWOODY Don't abort steps that have an unsupported system type
  • [17] CNLNT3T4 Allow only 1 thread to send a closure to a given machine at the same time
  • [18] ZK76B5ZZ Load the queue in order of global priority
  • [19] LE4VZIY5 More stats
  • [20] YR2IM6Y5 Temporarily disable machines after a connection failure
  • [21] O3NM62IZ Support multiple machines files
  • [22] TPNHTE5V Remove obsolete Builds columns and provide accurate "Running builds"
  • [23] N4IROACV Move buildRemote() into State
  • [24] ACBS7C6Q hydra-queue-runner: Detect changes to the scheduling shares
  • [25] OCZ4LSGG Automatically retry aborted builds
  • [26] VQISTKOP hydra-queue-runner: Use substitutes
  • [27] OG3Z3QGC Namespace cleanup
  • [28] RND7XFNH getQueuedBuilds(): Periodically stop to handle priority bumps
  • [29] SODOV2CM Automatically reload $NIX_REMOTE_SYSTEMS when it changes
  • [30] NAYQT2GT hydra-queue-runner: Use cmdBuildDerivation
  • [31] 5AIYUMTB Basic remote building
  • [*] YHP5DSOO Improve parsing of hydra-build-products
  • [*] T5BIOVJE Add support for tracking custom metrics
  • [*] ENXUSMSV Make concurrency more robust

Change contents

  • replacement in src/hydra-queue-runner/Makefile.am at line 5
    [5.131][5.0:75](),[5.85][5.0:75]()
    build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh
    [5.131]
    [5.322]
    build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh \
    local-binary-cache.hh local-binary-cache.cc
  • replacement in src/hydra-queue-runner/build-remote.cc at line 76
    [5.1290][4.139:183]()
    static void copyClosureTo(ref<Store> store,
    [5.1290]
    [5.1349]
    static void copyClosureTo(ref<Store> destStore,
  • replacement in src/hydra-queue-runner/build-remote.cc at line 83
    [5.1492][4.184:232]()
    store->computeFSClosure(path, closure);
    [5.1492]
    [5.1593]
    destStore->computeFSClosure(path, closure);
  • replacement in src/hydra-queue-runner/build-remote.cc at line 98
    [5.83][4.233:283]()
    Paths sorted = store->topoSortPaths(closure);
    [5.83]
    [5.134]
    Paths sorted = destStore->topoSortPaths(closure);
  • replacement in src/hydra-queue-runner/build-remote.cc at line 107
    [5.94][5.94:148]()
    bytesSent += store->queryPathInfo(p).narSize;
    [5.94]
    [5.285]
    bytesSent += destStore->queryPathInfo(p).narSize;
  • replacement in src/hydra-queue-runner/build-remote.cc at line 110
    [5.91][4.284:328]()
    store->exportPaths(missing, false, to);
    [5.91]
    [5.365]
    destStore->exportPaths(missing, false, to);
  • replacement in src/hydra-queue-runner/build-remote.cc at line 118
    [5.2443][4.329:375]()
    static void copyClosureFrom(ref<Store> store,
    [5.2443]
    [5.149]
    static void copyClosureFrom(ref<Store> destStore,
  • replacement in src/hydra-queue-runner/build-remote.cc at line 123
    [5.2680][5.2680:2717]()
    store->importPaths(false, from);
    [5.2680]
    [5.232]
    destStore->importPaths(false, from);
  • replacement in src/hydra-queue-runner/build-remote.cc at line 126
    [5.260][5.260:318]()
    bytesReceived += store->queryPathInfo(p).narSize;
    [5.260]
    [5.2717]
    bytesReceived += destStore->queryPathInfo(p).narSize;
  • replacement in src/hydra-queue-runner/build-remote.cc at line 130
    [5.2721][4.376:418]()
    void State::buildRemote(ref<Store> store,
    [5.2721]
    [5.173]
    void State::buildRemote(ref<Store> destStore,
  • edit in src/hydra-queue-runner/build-remote.cc at line 224
    [5.755]
    [5.3876]
    /* Ensure that the inputs exist in the destination store. This is
    a no-op for regular stores, but for the binary cache store,
    this will copy the inputs to the binary cache from the local
    store. */
    destStore->buildPaths(basicDrv.inputSrcs);
  • replacement in src/hydra-queue-runner/build-remote.cc at line 232
    [5.3911][5.830:873]()
    if (machine->sshName != "localhost") {
    [5.3911]
    [2.0]
    if (/* machine->sshName != "localhost" */ true) {
  • replacement in src/hydra-queue-runner/build-remote.cc at line 238
    [2.251][5.98:157](),[5.98][5.98:157]()
    copyClosureTo(store, from, to, inputs, bytesSent);
    [2.251]
    [5.277]
    copyClosureTo(destStore, from, to, inputs, bytesSent);
  • replacement in src/hydra-queue-runner/build-remote.cc at line 286
    [5.4825][5.1243:1286]()
    if (machine->sshName != "localhost") {
    [5.4825]
    [5.1286]
    if (/* machine->sshName != "localhost" */ true) {
  • replacement in src/hydra-queue-runner/build-remote.cc at line 292
    [5.336][5.932:998]()
    copyClosureFrom(store, from, to, outputs, bytesReceived);
    [5.336]
    [5.387]
    copyClosureFrom(destStore, from, to, outputs, bytesReceived);
  • edit in src/hydra-queue-runner/build-result.cc at line 44
    [5.1146]
    [33.21]
    #if 0
  • edit in src/hydra-queue-runner/build-result.cc at line 101
    [5.3219]
    [5.3219]
    #endif
  • edit in src/hydra-queue-runner/build-result.cc at line 113
    [5.3665]
    [5.3665]
    #if 0
  • edit in src/hydra-queue-runner/build-result.cc at line 118
    [5.3865]
    [5.3865]
    #endif
  • edit in src/hydra-queue-runner/build-result.cc at line 123
    [5.3931]
    [5.3931]
    #if 0
  • edit in src/hydra-queue-runner/build-result.cc at line 147
    [34.1208]
    [34.1208]
    #endif
  • replacement in src/hydra-queue-runner/builder.cc at line 18
    [5.399][5.399:448](),[5.448][5.95:159]()
    auto store = openStore(); // FIXME: pool
    retry = doBuildStep(store, step, reservation->machine);
    [5.399]
    [5.499]
    auto destStore = getDestStore();
    retry = doBuildStep(destStore, step, reservation->machine);
  • replacement in src/hydra-queue-runner/builder.cc at line 48
    [5.1479][4.631:694]()
    bool State::doBuildStep(nix::ref<Store> store, Step::ptr step,
    [5.1479]
    [5.1552]
    bool State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
  • replacement in src/hydra-queue-runner/builder.cc at line 123
    [5.4376][5.4376:4474]()
    buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result);
    [5.4376]
    [5.4474]
    buildRemote(destStore, machine, step, build->maxSilentTime, build->buildTimeout, result);
  • replacement in src/hydra-queue-runner/builder.cc at line 129
    [5.4608][5.4608:4678]()
    if (result.success()) res = getBuildOutput(store, step->drv);
    [5.4608]
    [5.4678]
    if (result.success()) res = getBuildOutput(destStore, step->drv);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 10
    [5.20065]
    [35.19]
    #include "local-binary-cache.hh"
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 25
    [5.7892]
    [5.7503]
    }
    ref<Store> State::getLocalStore()
    {
    return openStore(); // FIXME: pool
    }
    ref<Store> State::getDestStore()
    {
    return make_ref<LocalBinaryCache>(getLocalStore(), "/tmp/binary-cache");
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 110
    [5.309][5.0:57]()
    parseMachines("localhost " + settings.thisSystem
    [5.309]
    [3.0]
    parseMachines("localhost " +
    (settings.thisSystem == "x86_64-linux" ? "x86_64-linux,i686-linux" : settings.thisSystem)
  • file addition: local-binary-cache.cc (----------)
    [5.187]
    #include "local-binary-cache.hh"
    #include "archive.hh"
    #include "derivations.hh"
    #include "globals.hh"
    #include "worker-protocol.hh"

    #include <atomic>
    namespace nix {
    LocalBinaryCache::LocalBinaryCache(ref<Store> localStore, const Path & binaryCacheDir)
        : localStore(localStore)
        , binaryCacheDir(binaryCacheDir)
    {
        /* NAR files are stored in the "nar" subdirectory of the cache
           directory; create it (via createDirs()) before first use. */
        Path narDir = binaryCacheDir + "/nar";
        createDirs(narDir);
    }
    Path LocalBinaryCache::narInfoFileFor(const Path & storePath)
    {
        /* Only genuine store paths may be mapped to a .narinfo name. */
        assertStorePath(storePath);

        /* The file is named after the hash part of the store path and
           lives at the top level of the cache directory. */
        std::string hashPart = storePathToHash(storePath);
        return binaryCacheDir + "/" + hashPart + ".narinfo";
    }
    /* Write ‘s’ to ‘path’ atomically. The data is first written to a
       temporary file next to ‘path` and then rename()d into place, so a
       reader never sees a partially written file. If anything fails
       before the rename, the temporary file is removed by AutoDelete.

       The temporary name contains the PID *and* a per-process counter.
       hydra-queue-runner is multi-threaded, and several threads may write
       the same cache entry at once (e.g. a store path needed by two build
       steps). With a PID-only suffix those threads would truncate and
       overwrite the same temporary file, and the losing rename() would
       fail because the file had already been moved away. */
    void atomicWrite(const Path & path, const std::string & s)
    {
        static std::atomic<unsigned int> tmpCounter{0};
        Path tmp = path + ".tmp." + std::to_string(getpid())
            + "-" + std::to_string(tmpCounter++);
        AutoDelete del(tmp, false);
        writeFile(tmp, s);
        if (rename(tmp.c_str(), path.c_str()))
            throw SysError(format("renaming ‘%1%’ to ‘%2%’") % tmp % path);
        del.cancel();
    }
    void LocalBinaryCache::addToCache(const ValidPathInfo & info,
    const string & nar)
    {
    size_t narSize = nar.size();
    Hash narHash = hashString(htSHA256, nar);
    if (info.hash.type != htUnknown && info.hash != narHash)
    throw Error(format("refusing to copy corrupted path ‘%1%’ to binary cache") % info.path);
    printMsg(lvlTalkative, format("copying path ‘%1%’ (%2% bytes) to binary cache")
    % info.path % narSize);
    /* Atomically write the NAR file. */
    string narFileRel = "nar/" + printHash(narHash) + ".nar";
    Path narFile = binaryCacheDir + "/" + narFileRel;
    if (!pathExists(narFile)) atomicWrite(narFile, nar);
    /* Atomically write the NAR info file.*/
    Path narInfoFile = narInfoFileFor(info.path);
    if (!pathExists(narInfoFile)) {
    Strings refs;
    for (auto & r : info.references)
    refs.push_back(baseNameOf(r));
    std::string narInfo;
    narInfo += "StorePath: " + info.path + "\n";
    narInfo += "URL: " + narFileRel + "\n";
    narInfo += "Compression: none\n";
    narInfo += "FileHash: sha256:" + printHash(narHash) + "\n";
    narInfo += "FileSize: " + std::to_string(narSize) + "\n";
    narInfo += "NarHash: sha256:" + printHash(narHash) + "\n";
    narInfo += "NarSize: " + std::to_string(narSize) + "\n";
    narInfo += "References: " + concatStringsSep(" ", refs) + "\n";
    // FIXME: add signature
    atomicWrite(narInfoFile, narInfo);
    }
    }
    /* Parse the .narinfo file for ‘storePath’, as written by addToCache().

       The file is a sequence of "Name: value" lines. StorePath,
       References, URL, NarHash and NarSize are interpreted; other fields
       are ignored.

       Throws Error if the path has no .narinfo (it is not in the cache)
       or if the file is malformed. Previously a missing file abort()ed
       the whole process, and NarHash/NarSize were dropped, so
       queryPathInfo() reported narSize == 0. */
    LocalBinaryCache::NarInfo LocalBinaryCache::readNarInfo(const Path & storePath)
    {
        NarInfo res;

        Path narInfoFile = narInfoFileFor(storePath);
        if (!pathExists(narInfoFile))
            throw Error(format("path ‘%1%’ is not valid") % storePath);

        std::string narInfo = readFile(narInfoFile);

        auto corrupt = [&]() {
            throw Error(format("corrupt NAR info file ‘%1%’") % narInfoFile);
        };

        size_t pos = 0;
        while (pos < narInfo.size()) {

            size_t eol = narInfo.find('\n', pos);
            if (eol == std::string::npos) corrupt();

            /* The "Name: value" separator must be on the current line.
               If colon < eol, then colon + 1 <= eol, so the index is in
               range. */
            size_t colon = narInfo.find(':', pos);
            if (colon == std::string::npos || colon > eol || narInfo[colon + 1] != ' ')
                corrupt();

            std::string name(narInfo, pos, colon - pos);
            std::string value(narInfo, colon + 2, eol - colon - 2);

            if (name == "StorePath") {
                if (value != storePath) corrupt();
                res.info.path = value;
            }
            else if (name == "References") {
                /* A second References line is an error. */
                if (!res.info.references.empty()) corrupt();
                for (auto & r : tokenizeString<Strings>(value, " "))
                    res.info.references.insert(settings.nixStore + "/" + r);
            }
            else if (name == "URL") {
                res.narUrl = value;
            }
            else if (name == "NarHash") {
                /* addToCache() writes "sha256:<base-16 hash>". */
                const std::string prefix = "sha256:";
                if (value.compare(0, prefix.size(), prefix) != 0) corrupt();
                res.info.hash = parseHash(htSHA256, std::string(value, prefix.size()));
            }
            else if (name == "NarSize") {
                if (!string2Int(value, res.info.narSize)) corrupt();
            }

            pos = eol + 1;
        }

        if (res.info.path.empty() || res.narUrl.empty()) corrupt();

        return res;
    }
    bool LocalBinaryCache::isValidPath(const Path & storePath)
    {
    Path narInfoFile = narInfoFileFor(storePath);
    printMsg(lvlDebug, format("checking %1% -> %2%") % storePath % narInfoFile);
    return pathExists(narInfoFile);
    }
    void LocalBinaryCache::exportPath(const Path & storePath, bool sign, Sink & sink)
    {
        /* Signed exports are not supported. */
        assert(!sign);

        auto narInfo = readNarInfo(storePath);
        std::string narPath = binaryCacheDir + "/" + narInfo.narUrl;
        auto nar = readFile(narPath);

        printMsg(lvlTalkative, format("exporting path ‘%1%’ (%2% bytes)") % storePath % nar.size());

        /* The stored NAR is expected to have a length that is a multiple
           of 8. */
        assert(nar.size() % 8 == 0);

        /* Emit the NAR, then the export trailer: magic, path,
           references, deriver and a 0 "no signature" flag. */
        sink((unsigned char *) nar.c_str(), nar.size());
        // FIXME: check integrity of NAR.
        sink << exportMagic << storePath << narInfo.info.references << narInfo.info.deriver << 0;
    }
    Paths LocalBinaryCache::importPaths(bool requireSignature, Source & source)
    {
        /* Signature checking is not supported. */
        assert(!requireSignature);

        Paths imported;

        /* The stream is a series of entries, each preceded by the marker
           1, and terminated by the marker 0. */
        for (unsigned long long marker = readLongLong(source);
             marker != 0;
             marker = readLongLong(source))
        {
            if (marker != 1)
                throw Error("input doesn't look like something created by ‘nix-store --export’");
            imported.push_back(importPath(source));
        }

        return imported;
    }
    struct TeeSource : Source
    {
    Source & readSource;
    std::string data;
    TeeSource(Source & readSource) : readSource(readSource)
    {
    }
    size_t read(unsigned char * data, size_t len)
    {
    size_t n = readSource.read(data, len);
    this->data.append((char *) data, n);
    return n;
    }
    };
    /* ParseSink that ignores every callback. importPath() uses it to walk
       the NAR stream when only the raw bytes (captured by TeeSource) are
       needed. */
    struct NopSink : public ParseSink
    {
    };
    Path LocalBinaryCache::importPath(Source & source)
    {
        /* FIXME: some cut&paste of LocalStore::importPath(). */

        /* Extract the NAR from the source. Parsing it through a TeeSource
           records its bytes in ‘tee.data’. */
        TeeSource tee(source);
        {
            NopSink discard;
            parseDump(discard, tee);
        }

        /* The trailer after the NAR is read directly from ‘source’. */
        const uint32_t magic = readInt(source);
        if (magic != exportMagic)
            throw Error("Nix archive cannot be imported; wrong format");

        ValidPathInfo info;
        info.path = readStorePath(source);
        info.references = readStorePaths<PathSet>(source);
        readString(source); // deriver, don't care

        const bool haveSignature = readInt(source) == 1;
        assert(!haveSignature);

        /* info.hash is not filled in here (presumably htUnknown by
           default), so addToCache() skips the expected-hash comparison. */
        addToCache(info, tee.data);
        return info.path;
    }
    ValidPathInfo LocalBinaryCache::queryPathInfo(const Path & storePath)
    {
        /* The path metadata is whatever readNarInfo() extracts from the
           path's .narinfo file. */
        auto narInfo = readNarInfo(storePath);
        return narInfo.info;
    }
    void LocalBinaryCache::querySubstitutablePathInfos(const PathSet & paths,
    SubstitutablePathInfos & infos)
    {
    PathSet left;
    for (auto & storePath : paths) {
    if (!localStore->isValidPath(storePath)) {
    left.insert(storePath);
    continue;
    }
    ValidPathInfo info = localStore->queryPathInfo(storePath);
    SubstitutablePathInfo sub;
    sub.references = info.references;
    sub.downloadSize = 0;
    sub.narSize = info.narSize;
    infos.emplace(storePath, sub);
    }
    localStore->querySubstitutablePathInfos(left, infos);
    }
    void LocalBinaryCache::buildPaths(const PathSet & paths, BuildMode buildMode)
    {
    for (auto & storePath : paths) {
    assert(!isDerivation(storePath));
    if (isValidPath(storePath)) continue;
    localStore->addTempRoot(storePath);
    if (!localStore->isValidPath(storePath))
    localStore->ensurePath(storePath);
    ValidPathInfo info = localStore->queryPathInfo(storePath);
    for (auto & ref : info.references)
    if (ref != storePath)
    ensurePath(ref);
    StringSink sink;
    dumpPath(storePath, sink);
    addToCache(info, sink.s);
    }
    }
    void LocalBinaryCache::ensurePath(const Path & path)
    {
    buildPaths({path});
    }
    }
  • file addition: local-binary-cache.hh (----------)
    [5.187]
    #pragma once
    #include "store-api.hh"
    namespace nix {
    /* A Store backed by a binary cache in a local directory
       (‘binaryCacheDir’). Each cached path is described by a
       ‘<hash part>.narinfo’ file at the top of the directory, and its
       NAR is stored under ‘nar/’. Paths enter the cache by being copied
       from ‘localStore’ (buildPaths()) or imported from an export stream
       (importPaths()). Only the operations hydra-queue-runner needs are
       implemented; most other Store methods abort(). */
    class LocalBinaryCache : public nix::Store
    {
    private:
    /* Store that paths are copied from when they are added to the cache. */
    ref<Store> localStore;
    /* Root directory of the binary cache. */
    Path binaryCacheDir;
    public:
    /* Ensures that ‘<binaryCacheDir>/nar’ exists. */
    LocalBinaryCache(ref<Store> localStore, const Path & binaryCacheDir);
    private:
    /* Location of the .narinfo file describing ‘storePath’. */
    Path narInfoFileFor(const Path & storePath);
    /* Writes the NAR ‘nar’ and a matching .narinfo for ‘info.path’ into
       the cache; existing files are not rewritten. */
    void addToCache(const ValidPathInfo & info, const string & nar);
    /* Parsed contents of a .narinfo file. */
    struct NarInfo
    {
    ValidPathInfo info;
    /* Location of the NAR file, relative to binaryCacheDir. */
    std::string narUrl;
    };
    /* Reads and parses the .narinfo file for ‘storePath’. */
    NarInfo readNarInfo(const Path & storePath);
    public:
    /* True if the cache contains a .narinfo for ‘path’. */
    bool isValidPath(const Path & path) override;
    /* Not implemented. */
    PathSet queryValidPaths(const PathSet & paths) override
    { abort(); }
    PathSet queryAllValidPaths() override
    { abort(); }
    /* Metadata parsed from the path's .narinfo. */
    ValidPathInfo queryPathInfo(const Path & path) override;
    /* Not implemented. */
    Hash queryPathHash(const Path & path) override
    { abort(); }
    void queryReferrers(const Path & path,
    PathSet & referrers) override
    { abort(); }
    Path queryDeriver(const Path & path) override
    { abort(); }
    PathSet queryValidDerivers(const Path & path) override
    { abort(); }
    PathSet queryDerivationOutputs(const Path & path) override
    { abort(); }
    StringSet queryDerivationOutputNames(const Path & path) override
    { abort(); }
    Path queryPathFromHashPart(const string & hashPart) override
    { abort(); }
    PathSet querySubstitutablePaths(const PathSet & paths) override
    { abort(); }
    /* Reports paths valid in the local store as substitutable and
       forwards the rest to the local store. */
    void querySubstitutablePathInfos(const PathSet & paths,
    SubstitutablePathInfos & infos) override;
    /* Not implemented. */
    Path addToStore(const string & name, const Path & srcPath,
    bool recursive = true, HashType hashAlgo = htSHA256,
    PathFilter & filter = defaultPathFilter, bool repair = false) override
    { abort(); }
    Path addTextToStore(const string & name, const string & s,
    const PathSet & references, bool repair = false) override
    { abort(); }
    /* Writes the cached NAR plus export trailer to ‘sink’; ‘sign’ must
       be false. */
    void exportPath(const Path & path, bool sign,
    Sink & sink) override;
    /* Imports a stream of exported paths into the cache;
       ‘requireSignature’ must be false. */
    Paths importPaths(bool requireSignature, Source & source) override;
    /* Imports a single exported path; helper for importPaths(). */
    Path importPath(Source & source);
    /* Copies the given (non-derivation) paths and their references from
       the local store into the cache. */
    void buildPaths(const PathSet & paths, BuildMode buildMode = bmNormal) override;
    /* Not implemented. */
    BuildResult buildDerivation(const Path & drvPath, const BasicDerivation & drv,
    BuildMode buildMode = bmNormal) override
    { abort(); }
    /* buildPaths() on a single path. */
    void ensurePath(const Path & path) override;
    /* Not implemented. */
    void addTempRoot(const Path & path) override
    { abort(); }
    void addIndirectRoot(const Path & path) override
    { abort(); }
    /* No-op. */
    void syncWithGC() override
    { }
    /* Not implemented. */
    Roots findRoots() override
    { abort(); }
    void collectGarbage(const GCOptions & options, GCResults & results) override
    { abort(); }
    /* Always returns an empty set. */
    PathSet queryFailedPaths() override
    { return PathSet(); }
    /* No-op. */
    void clearFailedPaths(const PathSet & paths) override
    { }
    void optimiseStore() override
    { }
    /* Not implemented; simply returns true.
       NOTE(review): check the Store::verifyStore contract for what a
       ‘true’ return means. */
    bool verifyStore(bool checkContents, bool repair) override
    { return true; }
    };
    }
  • replacement in src/hydra-queue-runner/queue-monitor.cc at line 33
    [5.20720][5.20720:20765]()
    auto store = openStore(); // FIXME: pool
    [5.20720]
    [5.20765]
    auto localStore = getLocalStore();
    auto destStore = getDestStore();
  • replacement in src/hydra-queue-runner/queue-monitor.cc at line 39
    [5.20820][5.0:64]()
    bool done = getQueuedBuilds(*conn, store, lastBuildId);
    [5.20820]
    [5.20872]
    bool done = getQueuedBuilds(*conn, localStore, destStore, lastBuildId);
  • replacement in src/hydra-queue-runner/queue-monitor.cc at line 67
    [5.21512][4.809:902]()
    bool State::getQueuedBuilds(Connection & conn, ref<Store> store, unsigned int & lastBuildId)
    [5.21512]
    [5.21620]
    bool State::getQueuedBuilds(Connection & conn, ref<Store> localStore,
    ref<Store> destStore, unsigned int & lastBuildId)
  • replacement in src/hydra-queue-runner/queue-monitor.cc at line 123
    [5.23144][5.23144:23195]()
    if (!store->isValidPath(build->drvPath)) {
    [5.23144]
    [5.23195]
    if (!localStore->isValidPath(build->drvPath)) {
  • replacement in src/hydra-queue-runner/queue-monitor.cc at line 143
    [5.24028][5.1282:1402]()
    Step::ptr step = createStep(store, conn, build, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable);
    [5.24028]
    [5.24135]
    Step::ptr step = createStep(destStore, conn, build, build->drvPath,
    build, 0, finishedDrvs, newSteps, newRunnable);
  • replacement in src/hydra-queue-runner/queue-monitor.cc at line 162
    [5.24924][5.24924:24982]()
    BuildOutput res = getBuildOutput(store, drv);
    [5.24924]
    [5.24982]
    BuildOutput res = getBuildOutput(destStore, drv);
  • replacement in src/hydra-queue-runner/queue-monitor.cc at line 320
    [5.29402][4.903:949]()
    Step::ptr State::createStep(ref<Store> store,
    [5.29402]
    [5.1464]
    Step::ptr State::createStep(ref<Store> destStore,
  • replacement in src/hydra-queue-runner/queue-monitor.cc at line 400
    [5.1670][5.31945:31995](),[5.31945][5.31945:31995]()
    if (!store->isValidPath(i.second.path)) {
    [5.1670]
    [5.31995]
    if (!destStore->isValidPath(i.second.path)) {
  • replacement in src/hydra-queue-runner/queue-monitor.cc at line 412
    [5.2120][5.2120:2185]()
    store->querySubstitutablePathInfos(missingPaths, infos);
    [5.2120]
    [5.2185]
    destStore->querySubstitutablePathInfos(missingPaths, infos); // FIXME
  • replacement in src/hydra-queue-runner/queue-monitor.cc at line 420
    [5.2491][5.2491:2545]()
    store->ensurePath(i.second.path);
    [5.2491]
    [5.2545]
    destStore->ensurePath(i.second.path);
  • replacement in src/hydra-queue-runner/queue-monitor.cc at line 449
    [5.32426][5.2967:3073]()
    auto dep = createStep(store, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable);
    [5.32426]
    [5.32519]
    auto dep = createStep(destStore, conn, build, i.first, 0, step, finishedDrvs, newSteps, newRunnable);
  • edit in src/hydra-queue-runner/state.hh at line 352
    [5.6809]
    [5.6809]
    /* Return a store object that can access derivations produced by
    hydra-evaluator. */
    nix::ref<nix::Store> getLocalStore();
    /* Return a store object to store build results. */
    nix::ref<nix::Store> getDestStore();
  • replacement in src/hydra-queue-runner/state.hh at line 387
    [5.2260][4.1058:1159]()
    bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> store, unsigned int & lastBuildId);
    [5.2260]
    [5.7732]
    bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> localStore,
    nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
  • replacement in src/hydra-queue-runner/state.hh at line 416
    [5.8377][4.1214:1279]()
    bool doBuildStep(nix::ref<nix::Store> store, Step::ptr step,
    [5.8377]
    [5.8447]
    bool doBuildStep(nix::ref<nix::Store> destStore, Step::ptr step,
  • replacement in src/hydra-queue-runner/state.hh at line 419
    [5.8479][4.1280:1329]()
    void buildRemote(nix::ref<nix::Store> store,
    [5.8479]
    [5.1875]
    void buildRemote(nix::ref<nix::Store> destStore,