Rate-limit the number of threads copying closures at the same time

[?]
Jun 22, 2015, 11:49 PM
MB3TISH2KYBIGY6XJKMN4HO2S6TCN2GORJENMECCKLXGGIRS2O2AC

Dependencies

  • [2] 5LBMP7GA Fix remote building
  • [3] 7LB6QBXY Keep track of the number of build steps that are being built
  • [4] OCZ4LSGG Automatically retry aborted builds
  • [5] ENXUSMSV Make concurrency more robust
  • [6] RYTQLATY Keep track of failed paths in the Hydra database
  • [7] HHOMBU7G hydra-queue-runner: Implement timeouts
  • [8] 5AIYUMTB Basic remote building
  • [9] 24BMQDZA Start of single-process hydra-queue-runner

Change contents

  • edit in src/hydra-queue-runner/build-remote.cc at line 63
    [4.1406]
    [4.1406]
    TokenServer & copyClosureTokenServer,
  • edit in src/hydra-queue-runner/build-remote.cc at line 92
    [4.2296]
    [4.0]
    /* Ensure that only a limited number of threads can copy closures
       at the same time. However, proceed anyway after a timeout to
       prevent starvation by a handful of really huge closures. */
    time_t start = time(0);

    /* Randomise the timeout (600..840 seconds) so that threads that
       started waiting at the same moment do not all give up at once. */
    int timeout = 60 * (10 + rand() % 5);

    /* 'token' gives its slot back (if it got one) when it is destroyed. */
    auto token(copyClosureTokenServer.get(timeout));
    time_t stop = time(0);

    /* Not getting a token is not an error: it is only logged, and
       execution continues either way. */
    if (token())
        printMsg(lvlDebug, format("got copy closure token after %1%s") % (stop - start));
    else
        printMsg(lvlDebug, format("did not get copy closure token after %1%s") % (stop - start));
  • edit in src/hydra-queue-runner/build-remote.cc at line 131
    [4.85]
    [3.0]
    TokenServer & copyClosureTokenServer,
  • replacement in src/hydra-queue-runner/build-remote.cc at line 181
    [4.853][2.335:379]()
    copyClosureTo(store, from, to, inputs);
    [4.853]
    [4.4064]
    copyClosureTo(store, from, to, inputs, copyClosureTokenServer);
  • edit in src/hydra-queue-runner/build-remote.hh at line 7
    [3.176]
    [4.5314]
    #include "token-server.hh"
  • edit in src/hydra-queue-runner/build-remote.hh at line 26
    [4.295]
    [3.177]
    TokenServer & copyClosureTokenServer,
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 23
    [3.490]
    [4.19]
    #include "token-server.hh"
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 35
    [4.22][4.1080:1145]()
    const int maxTries = 5;
    const int retryInterval = 60; // seconds
    [4.22]
    [4.1145]
    // FIXME: Make configurable.
    const unsigned int maxTries = 5;
    const unsigned int retryInterval = 60; // seconds
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 39
    [4.1177]
    [4.22]
    const unsigned int maxParallelCopyClosure = 4;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 248
    [4.7045]
    [4.7045]
    /* Token server limiting the number of threads copying closures in
    parallel to prevent excessive I/O load. */
    TokenServer copyClosureTokenServer{maxParallelCopyClosure};
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 1110
    [4.1161][3.524:617]()
    logDir, build->maxSilentTime, build->buildTimeout, result, nrStepsBuilding);
    [4.1161]
    [4.1108]
    logDir, build->maxSilentTime, build->buildTimeout, copyClosureTokenServer,
    result, nrStepsBuilding);
  • edit in src/hydra-queue-runner/sync.hh at line 5
    [4.9782]
    [4.9782]
    #include <cassert>
  • edit in src/hydra-queue-runner/sync.hh at line 53
    [4.10721]
    [4.10721]
    }
    template<class Rep, class Period, class Predicate>
    bool wait_for(std::condition_variable_any & cv,
    const std::chrono::duration<Rep, Period> & duration,
    Predicate pred)
    {
    assert(s);
    return cv.wait_for(s->mutex, duration, pred);
  • file addition: token-server.hh (----------)
    [4.187]
    #pragma once
    #include <atomic>
    #include "sync.hh"
    /* This class hands out tokens. There are only ‘maxTokens’ tokens
       available. Calling get() will return a Token object, representing
       ownership of a token. If no token is available, get() will sleep
       until another thread returns a token or, when a non-zero timeout
       (in seconds) was given, until that timeout expires. In the latter
       case the returned Token does not own a token; use operator () to
       find out. */
    class TokenServer
    {
        /* Maximum number of tokens that may be handed out at once. */
        unsigned int maxTokens;

        /* Number of tokens currently handed out. Only accessed through
           the lock object returned by Sync::lock(). */
        Sync<unsigned int> curTokens{0};

        /* Notified every time a token is given back. */
        std::condition_variable_any wakeup;

    public:
        TokenServer(unsigned int maxTokens) : maxTokens(maxTokens) { }

        class Token
        {
            friend TokenServer;

            /* Owning server; null once this object has been moved from. */
            TokenServer * ts;

            /* Whether this object currently holds one of the tokens. */
            bool acquired = false;

            /* Try to take a token from 'ts'. A 'timeout' of 0 means wait
               indefinitely; otherwise give up after 'timeout' seconds,
               leaving 'acquired' false. */
            Token(TokenServer * ts, unsigned int timeout) : ts(ts)
            {
                auto curTokens(ts->curTokens.lock());
                while (*curTokens >= ts->maxTokens)
                    if (timeout) {
                        /* The predicate form returns false only if no
                           token became available before the timeout. */
                        if (!curTokens.wait_for(ts->wakeup, std::chrono::seconds(timeout),
                                [&]() { return *curTokens < ts->maxTokens; }))
                            return;
                    } else
                        curTokens.wait(ts->wakeup);
                (*curTokens)++;
                acquired = true;
            }

        public:

            /* Transfer ownership from 't'. The 'acquired' flag has to
               travel with the pointer: otherwise the new object would
               report that no token is held and would never give it
               back, permanently leaking a slot whenever the move is not
               elided. */
            Token(Token && t) noexcept : ts(t.ts), acquired(t.acquired)
            {
                t.ts = nullptr;
                t.acquired = false;
            }

            Token(const Token & l) = delete;

            /* Give the token back (if one is held) and wake up one
               waiter. */
            ~Token()
            {
                if (!ts || !acquired) return;
                {
                    auto curTokens(ts->curTokens.lock());
                    assert(*curTokens);
                    (*curTokens)--;
                }
                /* Notify after the lock object above has gone out of
                   scope. */
                ts->wakeup.notify_one();
            }

            /* Returns true iff this object holds a token. */
            bool operator ()() const { return acquired; }
        };

        /* Obtain a token, waiting at most 'timeout' seconds (0 = wait
           forever). Check the result with operator () when a timeout is
           used. */
        Token get(unsigned int timeout = 0)
        {
            return Token(this, timeout);
        }
    };