Rate-limit the number of threads copying closures at the same time
[?]
Jun 22, 2015, 11:49 PM
MB3TISH2KYBIGY6XJKMN4HO2S6TCN2GORJENMECCKLXGGIRS2O2ACDependencies
- [2]
5LBMP7GAFix remote building - [3]
7LB6QBXYKeep track of the number of build steps that are being built - [4]
OCZ4LSGGAutomatically retry aborted builds - [5]
ENXUSMSVMake concurrency more robust - [6]
RYTQLATYKeep track of failed paths in the Hydra database - [7]
HHOMBU7Ghydra-queue-runner: Implement timeouts - [8]
5AIYUMTBBasic remote building - [9]
24BMQDZAStart of single-process hydra-queue-runner
Change contents
- edit in src/hydra-queue-runner/build-remote.cc at line 63
TokenServer & copyClosureTokenServer, - edit in src/hydra-queue-runner/build-remote.cc at line 92
/* Ensure that only a limited number of threads can copy closuresat the same time. However, proceed anyway after a timeout toprevent starvation by a handful of really huge closures. */time_t start = time(0);int timeout = 60 * (10 + rand() % 5);auto token(copyClosureTokenServer.get(timeout));time_t stop = time(0);if (token())printMsg(lvlDebug, format("got copy closure token after %1%s") % (stop - start));elseprintMsg(lvlDebug, format("dit not get copy closure token after %1%s") % (stop - start)); - edit in src/hydra-queue-runner/build-remote.cc at line 131
TokenServer & copyClosureTokenServer, - replacement in src/hydra-queue-runner/build-remote.cc at line 181
copyClosureTo(store, from, to, inputs);copyClosureTo(store, from, to, inputs, copyClosureTokenServer); - edit in src/hydra-queue-runner/build-remote.hh at line 7
#include "token-server.hh" - edit in src/hydra-queue-runner/build-remote.hh at line 26
TokenServer & copyClosureTokenServer, - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 23
#include "token-server.hh" - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 35
const int maxTries = 5;const int retryInterval = 60; // seconds// FIXME: Make configurable.const unsigned int maxTries = 5;const unsigned int retryInterval = 60; // seconds - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 39
const unsigned int maxParallelCopyClosure = 4; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 248
/* Token server limiting the number of threads copying closures inparallel to prevent excessive I/O load. */TokenServer copyClosureTokenServer{maxParallelCopyClosure}; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 1110
logDir, build->maxSilentTime, build->buildTimeout, result, nrStepsBuilding);logDir, build->maxSilentTime, build->buildTimeout, copyClosureTokenServer,result, nrStepsBuilding); - edit in src/hydra-queue-runner/sync.hh at line 5
#include <cassert> - edit in src/hydra-queue-runner/sync.hh at line 53
}template<class Rep, class Period, class Predicate>bool wait_for(std::condition_variable_any & cv,const std::chrono::duration<Rep, Period> & duration,Predicate pred){assert(s);return cv.wait_for(s->mutex, duration, pred); - file addition: token-server.hh[4.187]
#pragma once#include <atomic>#include "sync.hh"/* This class hands out tokens. There are only ‘maxTokens’ tokensavailable. Calling get() will return a Token object, representingownership of a token. If no token is available, get() will sleepuntil another thread returns a token. */class TokenServer{unsigned int maxTokens;Sync<unsigned int> curTokens{0};std::condition_variable_any wakeup;public:TokenServer(unsigned int maxTokens) : maxTokens(maxTokens) { }class Token{friend TokenServer;TokenServer * ts;bool acquired = false;Token(TokenServer * ts, unsigned int timeout) : ts(ts){auto curTokens(ts->curTokens.lock());while (*curTokens >= ts->maxTokens)if (timeout) {if (!curTokens.wait_for(ts->wakeup, std::chrono::seconds(timeout),[&]() { return *curTokens < ts->maxTokens; }))return;} elsecurTokens.wait(ts->wakeup);(*curTokens)++;acquired = true;}public:Token(Token && t) : ts(t.ts) { t.ts = 0; }Token(const Token & l) = delete;~Token(){if (!ts || !acquired) return;{auto curTokens(ts->curTokens.lock());assert(*curTokens);(*curTokens)--;}ts->wakeup.notify_one();}bool operator ()() { return acquired; }};Token get(unsigned int timeout = 0){return Token(this, timeout);}};