Refactor
[?]
Jul 7, 2015, 8:17 AM
HJOEIMLRDVQ2KZI5HGL2HKGBM3AHP7YIKGKDAGFUNKRUXVRB24NAC
Dependencies
- [2]
HPJKBFZ4Handle concurrent finishing of the same build - [3]
WFYMBNWBMove "created" field into Step::State - [4]
FULDVXE2Periodically dump/log status - [5]
4D7CHQ34createStep(): Cache finished derivations - [6]
7VQ4ALFYUpdate "make check" for the new queue runner - [7]
A2GL5FOZMoar stats - [8]
SK6WHODMSupport preferLocalBuild - [9]
WKJFPR77hydra-queue-runner: Maintain count of active build steps - [10]
XV4AEKJChydra-queue-runner: Handle status queries on the main thread - [11]
SODOV2CMAutomatically reload $NIX_REMOTE_SYSTEMS when it changes - [12]
UQQ4IL55Add a error type for "unsupported system type" - [13]
22LDPAIPCheck non-runnable steps for unsupported system type - [14]
RQUAATWBAdd status dump facility - [15]
GKZN4UV7Make the queue monitor more robust, and better debug output - [16]
MB3TISH2Rate-limit the number of threads copying closures at the same time - [17]
ATJ54SPXUse PostgreSQL notifications for queue events - [18]
ENXUSMSVMake concurrency more robust - [19]
OCZ4LSGGAutomatically retry aborted builds - [20]
NNOCZ4ROhydra-queue-runner: Improve dispatcher - [21]
LE4VZIY5More stats - [22]
CCHPYTCPhydra-queue-runner: Handle $HYDRA_DBI - [23]
PLOZBRTRAdd command ‘hydra-queue-runner --status’ to show current status - [24]
K5G5GZY7Guard against concurrent invocations of hydra-queue-runner - [25]
63W4T5PUhydra-queue-runner: More stats - [26]
T2EIYJNGOn SIGINT, shut down the builder threads - [27]
C6HOMHZWDon't try to handle SIGINT - [28]
O64P4XJSKeep per-machine stats - [29]
HUUZFPPKFix race between the queue monitor and the builder threads - [30]
HHOMBU7Ghydra-queue-runner: Implement timeouts - [31]
FKLICOHYPrefer cached failure over unsupported system type - [32]
GS4BE6TBAsynchronously compress build logs - [33]
PQFOMNTLhydra-queue-runner: More stats - [34]
24BMQDZAStart of single-process hydra-queue-runner - [35]
IE2PRAQUhydra-queue-runner: Send build notifications - [36]
YZAI5GQUImplement a database connection pool - [37]
IWB3F4Z6Fail builds with previously failed steps early - [38]
N5O7VEEOImmediately abort builds that require an unsupported system type - [39]
7LB6QBXYKeep track of the number of build steps that are being built - [40]
FQQRJUO4Mark builds as busy - [41]
5AIYUMTBBasic remote building - [42]
KBZHIGLGRecord the machine used for a build step - [43]
NJJ7H64SVery basic multi-threaded queue runner - [44]
HLSHCK3CSupport requiredSystemFeatures - [45]
2IQRXLWESupport cancelling builds
Change contents
- replacement in src/hydra-queue-runner/Makefile.am at line 4
build-remote.hh build-result.hh counter.hh pool.hh sync.hh token-server.hh
build-remote.hh build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh - file addition: db.hh[9.187]
#pragma once#include <pqxx/pqxx>#include "util.hh"using namespace nix;struct Connection : pqxx::connection{Connection() : pqxx::connection(getFlags()) { };string getFlags(){string s = getEnv("HYDRA_DBI", "dbi:Pg:dbname=hydra;");string prefix = "dbi:Pg:";if (string(s, 0, prefix.size()) != prefix)throw Error("$HYDRA_DBI does not denote a PostgreSQL database");return concatStringsSep(" ", tokenizeString<Strings>(string(s, prefix.size()), ";"));}};struct receiver : public pqxx::notification_receiver{bool status = false;receiver(pqxx::connection_base & c, const std::string & channel): pqxx::notification_receiver(c, channel) { }void operator() (const string & payload, int pid) override{status = true;};bool get() {bool b = status;status = false;return b;}}; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 1
#include <atomic>#include <condition_variable> - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 2[9.4879]→[9.4879:4894](∅→∅),[9.4894]→[9.630:647](∅→∅),[9.647]→[9.250:268](∅→∅),[9.4894]→[9.250:268](∅→∅)
#include <map>#include <queue>#include <memory> - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 4
#include <chrono> - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 6
#include <pqxx/pqxx> - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 12[9.5851]→[9.0:19](∅→∅),[9.4943]→[9.0:19](∅→∅),[9.19]→[9.0:19](∅→∅),[9.19]→[9.468:490](∅→∅),[9.490]→[9.798:825](∅→∅)
#include "sync.hh"#include "pool.hh"#include "counter.hh"#include "token-server.hh"#include "state.hh" - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 14
#include "store-api.hh"#include "derivations.hh" - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 17
#include "pathlocks.hh" - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 26
- edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 27
/* Wall-clock time point type based on std::chrono::system_clock. */
using system_time = std::chrono::time_point<std::chrono::system_clock>;
- edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 28
- edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 33[9.1]→[9.1:2](∅→∅),[9.45]→[9.907:908](∅→∅),[9.907]→[9.907:908](∅→∅),[9.206]→[9.5060:5180](∅→∅),[9.909]→[9.5060:5180](∅→∅),[9.5060]→[9.5060:5180](∅→∅),[9.5180]→[9.324:344](∅→∅),[9.344]→[9.0:23](∅→∅),[9.5180]→[9.0:23](∅→∅),[9.23]→[9.5180:5271](∅→∅),[9.5180]→[9.5180:5271](∅→∅),[9.5271]→[9.345:366](∅→∅),[9.366]→[9.24:48](∅→∅),[9.5271]→[9.24:48](∅→∅),[9.48]→[9.5271:5364](∅→∅),[9.5271]→[9.5271:5364](∅→∅),[9.5364]→[9.0:403](∅→∅),[9.403]→[9.0:397](∅→∅),[9.397]→[9.403:409](∅→∅),[9.403]→[9.403:409](∅→∅),[9.409]→[9.5421:5456](∅→∅),[9.5421]→[9.5421:5456](∅→∅),[9.5456]→[9.124:125](∅→∅),[9.125]→[9.5456:5457](∅→∅),[9.5456]→[9.5456:5457](∅→∅),[9.5457]→[9.126:139](∅→∅),[9.139]→[9.5457:5458](∅→∅),[9.5457]→[9.5457:5458](∅→∅),[9.5458]→[9.140:141](∅→∅),[9.141]→[9.5458:5623](∅→∅),[9.5458]→[9.5458:5623](∅→∅),[9.5623]→[9.142:171](∅→∅),[9.171]→[9.367:413](∅→∅),[9.413]→[9.171:208](∅→∅),[9.171]→[9.171:208](∅→∅),[9.5647]→[9.5647:5648](∅→∅),[9.5648]→[9.0:42](∅→∅),[9.42]→[9.209:229](∅→∅),[9.5686]→[9.209:229](∅→∅),[9.229]→[9.0:65](∅→∅),[9.65]→[9.294:300](∅→∅),[9.294]→[9.294:300](∅→∅),[9.300]→[9.5686:5782](∅→∅),[9.5686]→[9.5686:5782](∅→∅),[9.5782]→[9.301:302](∅→∅),[9.302]→[9.5782:5820](∅→∅),[9.5782]→[9.5782:5820](∅→∅),[9.5820]→[9.0:50](∅→∅),[9.50]→[8.0:27](∅→∅),[8.27]→[9.5820:5821](∅→∅),[9.50]→[9.5820:5821](∅→∅),[9.5820]→[9.5820:5821](∅→∅),[9.5821]→[9.303:326](∅→∅),[9.326]→[3.0:91](∅→∅),[3.91]→[9.326:626](∅→∅),[9.326]→[9.326:626](∅→∅)
typedef enum {bsSuccess = 0,bsFailed = 1,bsDepFailed = 2,bsAborted = 3,bsFailedWithOutput = 6,bsTimedOut = 7,bsUnsupported = 9,} BuildStatus;typedef enum {bssSuccess = 0,bssFailed = 1,bssAborted = 4,bssTimedOut = 7,bssUnsupported = 9,bssBusy = 100, // not stored} BuildStepStatus;struct Connection : pqxx::connection{Connection() : pqxx::connection(getFlags()) { };string getFlags(){string s = getEnv("HYDRA_DBI", "dbi:Pg:dbname=hydra;");string prefix = "dbi:Pg:";if (string(s, 0, prefix.size()) != prefix)throw Error("$HYDRA_DBI does not denote a PostgreSQL database");return concatStringsSep(" ", tokenizeString<Strings>(string(s, prefix.size()), ";"));}};struct receiver : public pqxx::notification_receiver{bool status = false;receiver(pqxx::connection_base & c, const std::string & channel): pqxx::notification_receiver(c, channel) { }void operator() (const string & payload, int pid) override{status = true;};bool get() {bool b = status;status = false;return b;}};typedef unsigned int BuildID;struct Step;struct Build{typedef std::shared_ptr<Build> ptr;typedef std::weak_ptr<Build> wptr;BuildID id;Path drvPath;std::map<string, Path> outputs;std::string fullJobName;unsigned int maxSilentTime, buildTimeout;std::shared_ptr<Step> toplevel;std::atomic_bool finishedInDB{false};~Build(){printMsg(lvlDebug, format("destroying build %1%") % id);}};struct Step{typedef std::shared_ptr<Step> ptr;typedef std::weak_ptr<Step> wptr;Path drvPath;Derivation drv;std::set<std::string> requiredSystemFeatures;bool preferLocalBuild;struct State{/* Whether the step has finished initialisation. */bool created = false;/* The build steps on which this step depends. */std::set<Step::ptr> deps;/* The build steps that depend on this step. */std::vector<Step::wptr> rdeps;/* Builds that have this step as the top-level derivation. */std::vector<Build::wptr> builds; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 34
/* Number of times we've tried this step. */unsigned int tries = 0; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 35[9.1341]→[9.1341:1433](∅→∅),[9.1433]→[9.626:633](∅→∅),[9.626]→[9.626:633](∅→∅),[9.633]→[9.5905:5906](∅→∅),[9.5905]→[9.5905:5906](∅→∅),[9.93]→[9.93:144](∅→∅),[9.144]→[9.5852:5853](∅→∅),[9.657]→[9.5852:5853](∅→∅),[9.5853]→[9.145:168](∅→∅),[9.168]→[9.5885:5886](∅→∅),[9.5885]→[9.5885:5886](∅→∅),[9.5886]→[9.169:187](∅→∅),[9.187]→[2.0:71](∅→∅),[2.71]→[9.256:262](∅→∅),[9.256]→[9.256:262](∅→∅),[9.262]→[9.5927:6162](∅→∅),[9.1450]→[9.5927:6162](∅→∅),[9.5927]→[9.5927:6162](∅→∅),[9.6162]→[9.0:311](∅→∅),[9.68]→[9.6:136](∅→∅),[9.202]→[9.6:136](∅→∅),[9.311]→[9.6:136](∅→∅),[9.6]→[9.6:136](∅→∅),[9.136]→[9.51:94](∅→∅),[9.94]→[8.28:211](∅→∅),[8.211]→[9.200:338](∅→∅),[9.200]→[9.200:338](∅→∅),[9.338]→[9.169:190](∅→∅),[9.169]→[9.169:190](∅→∅),[9.134]→[9.6776:6787](∅→∅),[9.6776]→[9.6776:6787](∅→∅),[9.6787]→[9.6102:6125](∅→∅),[9.6102]→[9.6102:6125](∅→∅),[9.6125]→[9.910:911](∅→∅),[9.911]→[9.6788:6816](∅→∅),[9.6816]→[8.212:243](∅→∅),[9.94]→[9.947:948](∅→∅),[9.123]→[9.947:948](∅→∅),[8.243]→[9.947:948](∅→∅),[9.947]→[9.947:948](∅→∅),[9.948]→[9.6125:6154](∅→∅),[9.6125]→[9.6125:6154](∅→∅),[9.6154]→[9.752:827](∅→∅),[9.827]→[9.6196:6264](∅→∅),[9.6196]→[9.6196:6264](∅→∅),[9.6264]→[9.828:1049](∅→∅),[9.1049]→[9.6327:6385](∅→∅),[9.6327]→[9.6327:6385](∅→∅),[9.6385]→[9.1050:1123](∅→∅),[9.172]→[9.95:96](∅→∅),[9.96]→[9.6817:6939](∅→∅),[9.6939]→[9.96:163](∅→∅),[9.96]→[9.96:163](∅→∅),[9.163]→[9.1019:1020](∅→∅),[9.172]→[9.1019:1020](∅→∅),[9.1168]→[9.1019:1020](∅→∅),[9.1019]→[9.1019:1020](∅→∅),[9.1020]→[9.6940:6970](∅→∅),[9.6970]→[9.312:365](∅→∅),[9.365]→[6.813:874](∅→∅),[6.874]→[9.366:424](∅→∅),[9.7045]→[9.366:424](∅→∅),[9.424]→[9.987:1173](∅→∅),[9.7045]→[9.987:1173](∅→∅),[9.1173]→[9.7045:7046](∅→∅),[9.7045]→[9.7045:7046](∅→∅),[9.2]→[9.2:27](∅→∅),[9.27]→[9.203:225](∅→∅),[9.225]→[9.46:132](∅→∅),[9.27]→[9.46:132](∅→∅),[9.132]→[9.207:237](∅→∅),[9.237]→[9.491:523](∅→∅),[9.523]→[9.55
0:618](∅→∅),[9.237]→[9.132:187](∅→∅),[9.523]→[9.132:187](∅→∅),[9.618]→[9.132:187](∅→∅),[9.132]→[9.132:187](∅→∅),[9.187]→[9.0:146](∅→∅),[9.146]→[9.187:254](∅→∅),[9.187]→[9.187:254](∅→∅),[9.254]→[7.1185:1241](∅→∅),[7.1241]→[9.714:852](∅→∅),[9.254]→[9.714:852](∅→∅),[9.852]→[9.0:469](∅→∅),[9.64]→[9.7253:7254](∅→∅),[9.254]→[9.7253:7254](∅→∅),[9.469]→[9.7253:7254](∅→∅),[9.852]→[9.7253:7254](∅→∅),[9.1633]→[9.7253:7254](∅→∅),[9.7253]→[9.7253:7254](∅→∅),[9.7254]→[6.875:961](∅→∅),[6.961]→[9.6420:6441](∅→∅),[9.1020]→[9.6420:6441](∅→∅),[9.7254]→[9.6420:6441](∅→∅),[9.6420]→[9.6420:6441](∅→∅),[9.6456]→[9.6456:6457](∅→∅),[9.6457]→[9.29:38](∅→∅),[9.38]→[9.7280:7281](∅→∅),[9.890]→[9.7280:7281](∅→∅),[9.7280]→[9.7280:7281](∅→∅),[9.7281]→[9.39:95](∅→∅),[9.95]→[9.900:901](∅→∅),[9.900]→[9.900:901](∅→∅),[9.901]→[9.425:584](∅→∅),[9.37]→[9.6539:6634](∅→∅),[9.221]→[9.6539:6634](∅→∅),[9.584]→[9.6539:6634](∅→∅),[9.926]→[9.6539:6634](∅→∅),[9.6539]→[9.6539:6634](∅→∅),[9.6634]→[9.0:133](∅→∅),[9.133]→[9.6730:6731](∅→∅),[9.6730]→[9.6730:6731](∅→∅),[9.6731]→[9.7282:7389](∅→∅),[9.7389]→[9.134:262](∅→∅),[9.262]→[9.6911:6991](∅→∅),[9.6911]→[9.6911:6991](∅→∅),[9.6991]→[9.173:198](∅→∅),[9.198]→[9.0:1](∅→∅),[9.1]→[9.136:166](∅→∅),[9.166]→[9.1:107](∅→∅),[9.1]→[9.1:107](∅→∅),[9.107]→[9.1057:1058](∅→∅),[9.198]→[9.1057:1058](∅→∅),[9.1057]→[9.1057:1058](∅→∅),[9.1058]→[9.108:159](∅→∅),[9.87]→[9.7042:7043](∅→∅),[9.159]→[9.7042:7043](∅→∅),[9.281]→[9.7042:7043](∅→∅),[9.1142]→[9.7042:7043](∅→∅),[9.7042]→[9.7042:7043](∅→∅),[9.7043]→[9.1169:1249](∅→∅),[9.1249]→[5.0:91](∅→∅),[5.91]→[9.0:76](∅→∅),[9.323]→[9.0:76](∅→∅),[9.1249]→[9.0:76](∅→∅),[9.76]→[9.7091:7092](∅→∅),[9.1224]→[9.7091:7092](∅→∅),[9.1293]→[9.7091:7092](∅→∅),[9.7091]→[9.7091:7092](∅→∅),[9.7263]→[9.1225:1264](∅→∅),[9.1264]→[9.7288:7289](∅→∅),[9.7288]→[9.7288:7289](∅→∅),[9.7289]→[9.7390:7475](∅→∅),[9.7475]→[9.1304:1305](∅→∅),[9.1304]→[9.1304:1305](∅→∅),[9.1305]→[9.7476:7503](∅→∅),[9.1376]→[9.7327:7328](∅→∅),[9.7503]→[9.7327:7328](∅→∅),[9.7327]→[9.7327:7328](
∅→∅),[9.7328]→[9.585:685](∅→∅),[9.685]→[9.7633:7634](∅→∅),[9.7633]→[9.7633:7634](∅→∅),[9.7634]→[9.1634:1793](∅→∅),[9.1793]→[9.7704:7736](∅→∅),[9.7704]→[9.7704:7736](∅→∅),[9.7736]→[9.7328:7481](∅→∅),[9.7328]→[9.7328:7481](∅→∅),[9.7481]→[9.0:65](∅→∅),[9.65]→[9.65:66](∅→∅),[9.66]→[9.927:1020](∅→∅),[9.1020]→[9.470:596](∅→∅),[9.596]→[9.25:166](∅→∅),[9.1020]→[9.25:166](∅→∅),[9.166]→[9.96:97](∅→∅),[9.1020]→[9.96:97](∅→∅),[9.97]→[4.0:50](∅→∅),[4.50]→[9.1020:1029](∅→∅),[9.137]→[9.1020:1029](∅→∅),[9.1020]→[9.1020:1029](∅→∅),[9.1029]→[9.138:162](∅→∅),[9.162]→[9.1029:1030](∅→∅),[9.1029]→[9.1029:1030](∅→∅),[9.1030]→[9.163:182](∅→∅),[9.65]→[9.1377:1378](∅→∅),[9.89]→[9.1377:1378](∅→∅),[9.182]→[9.1377:1378](∅→∅),[9.7481]→[9.1377:1378](∅→∅),[9.1378]→[6.962:998](∅→∅),[6.998]→[9.7481:7486](∅→∅),[9.1394]→[9.7481:7486](∅→∅),[9.7481]→[9.7481:7486](∅→∅)
/* Point in time after which the step can be retried. */system_time after;};std::atomic_bool finished{false}; // debuggingSync<State> state;~Step(){//printMsg(lvlError, format("destroying step %1%") % drvPath);}};struct Machine{typedef std::shared_ptr<Machine> ptr;std::string sshName, sshKey;std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;unsigned int maxJobs = 1;float speedFactor = 1.0;struct State {typedef std::shared_ptr<State> ptr;counter currentJobs{0};counter nrStepsDone{0};counter totalStepTime{0}; // total time for steps, including closure copyingcounter totalStepBuildTime{0}; // total build time for steps};State::ptr state;bool supportsStep(Step::ptr step){if (systemTypes.find(step->drv.platform) == systemTypes.end()) return false;for (auto & f : mandatoryFeatures)if (step->requiredSystemFeatures.find(f) == step->requiredSystemFeatures.end()&& !(step->preferLocalBuild && f == "local"))return false;for (auto & f : step->requiredSystemFeatures)if (supportedFeatures.find(f) == supportedFeatures.end()) return false;return true;}};class State{private:Path hydraData, logDir;StringSet localPlatforms;/* The queued builds. */typedef std::map<BuildID, Build::ptr> Builds;Sync<Builds> builds;/* All active or pending build steps (i.e. dependencies of thequeued builds). Note that these are weak pointers. Steps arekept alive by being reachable from Builds or by being inprogress. */typedef std::map<Path, Step::wptr> Steps;Sync<Steps> steps;/* Build steps that have no unbuilt dependencies. */typedef std::list<Step::wptr> Runnable;Sync<Runnable> runnable;/* CV for waking up the dispatcher. */std::condition_variable dispatcherWakeup;std::mutex dispatcherMutex;/* PostgreSQL connection pool. */Pool<Connection> dbPool;/* The build machines. 
*/typedef std::map<string, Machine::ptr> Machines;Sync<Machines> machines; // FIXME: use atomic_shared_ptrPath machinesFile;struct stat machinesFileStat;/* Token server limiting the number of threads copying closures inparallel to prevent excessive I/O load. */TokenServer copyClosureTokenServer{maxParallelCopyClosure};/* Various stats. */time_t startedAt;counter nrBuildsRead{0};counter nrBuildsDone{0};counter nrStepsDone{0};counter nrActiveSteps{0};counter nrStepsBuilding{0};counter nrStepsCopyingTo{0};counter nrStepsCopyingFrom{0};counter nrRetries{0};counter maxNrRetries{0};counter totalStepTime{0}; // total time for steps, including closure copyingcounter totalStepBuildTime{0}; // total build time for stepscounter nrQueueWakeups{0};counter nrDispatcherWakeups{0};counter bytesSent{0};counter bytesReceived{0};/* Log compressor work queue. */Sync<std::queue<Path>> logCompressorQueue;std::condition_variable_any logCompressorWakeup;/* Notification sender work queue. FIXME: if hydra-queue-runner iskilled before it has finished sending notifications about abuild, then the notifications may be lost. It would be betterto mark builds with pending notification in the database. */typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem;Sync<std::queue<NotificationItem>> notificationSenderQueue;std::condition_variable_any notificationSenderWakeup;/* Specific build to do for --build-one (testing only). */BuildID buildOne;public:State();private:void clearBusy(Connection & conn, time_t stopTime);/* (Re)load /etc/nix/machines. */void loadMachinesFile();/* Thread to reload /etc/nix/machines periodically. 
*/void monitorMachinesFile();int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",BuildID propagatedFrom = 0);void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr,const std::string & machine, BuildStepStatus status, const string & errorMsg = "",BuildID propagatedFrom = 0);void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);void queueMonitor();void queueMonitorLoop();void getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);void removeCancelledBuilds(Connection & conn);Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);void makeRunnable(Step::ptr step);/* The thread that selects and starts runnable builds. */void dispatcher();void wakeDispatcher();void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation);/* Perform the given build step. Return true if the step is to beretried. */bool doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,Machine::ptr machine);void markSucceededBuild(pqxx::work & txn, Build::ptr build,const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime);bool checkCachedFailure(Step::ptr step, Connection & conn);/* Thread that asynchronously bzips logs of finished steps. */void logCompressor();/* Thread that asynchronously invokes hydra-notify to send buildnotifications. */void notificationSender();/* Acquire the global queue runner lock, or null if somebody elsehas it. 
*/std::shared_ptr<PathLocks> acquireGlobalLock();void dumpStatus(Connection & conn, bool log);public:void showStatus();void unlock();void run(BuildID buildOne = 0);}; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 36
: copyClosureTokenServer{maxParallelCopyClosure} - file addition: state.hh[9.187]
#pragma once#include <atomic>#include <chrono>#include <condition_variable>#include <map>#include <memory>#include <queue>#include "db.hh"#include "counter.hh"#include "pathlocks.hh"#include "pool.hh"#include "sync.hh"#include "token-server.hh"#include "store-api.hh"#include "derivations.hh"using namespace nix;typedef unsigned int BuildID;typedef std::chrono::time_point<std::chrono::system_clock> system_time;typedef enum {bsSuccess = 0,bsFailed = 1,bsDepFailed = 2,bsAborted = 3,bsFailedWithOutput = 6,bsTimedOut = 7,bsUnsupported = 9,} BuildStatus;typedef enum {bssSuccess = 0,bssFailed = 1,bssAborted = 4,bssTimedOut = 7,bssUnsupported = 9,bssBusy = 100, // not stored} BuildStepStatus;struct Step;struct BuildResult;struct Build{typedef std::shared_ptr<Build> ptr;typedef std::weak_ptr<Build> wptr;BuildID id;Path drvPath;std::map<string, Path> outputs;std::string fullJobName;unsigned int maxSilentTime, buildTimeout;std::shared_ptr<Step> toplevel;std::atomic_bool finishedInDB{false};};struct Step{typedef std::shared_ptr<Step> ptr;typedef std::weak_ptr<Step> wptr;Path drvPath;Derivation drv;std::set<std::string> requiredSystemFeatures;bool preferLocalBuild;struct State{/* Whether the step has finished initialisation. */bool created = false;/* The build steps on which this step depends. */std::set<Step::ptr> deps;/* The build steps that depend on this step. */std::vector<Step::wptr> rdeps;/* Builds that have this step as the top-level derivation. */std::vector<Build::wptr> builds;/* Number of times we've tried this step. */unsigned int tries = 0;/* Point in time after which the step can be retried. 
*/system_time after;};std::atomic_bool finished{false}; // debuggingSync<State> state;~Step(){//printMsg(lvlError, format("destroying step %1%") % drvPath);}};struct Machine{typedef std::shared_ptr<Machine> ptr;std::string sshName, sshKey;std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;unsigned int maxJobs = 1;float speedFactor = 1.0;struct State {typedef std::shared_ptr<State> ptr;counter currentJobs{0};counter nrStepsDone{0};counter totalStepTime{0}; // total time for steps, including closure copyingcounter totalStepBuildTime{0}; // total build time for steps};State::ptr state;bool supportsStep(Step::ptr step){if (systemTypes.find(step->drv.platform) == systemTypes.end()) return false;for (auto & f : mandatoryFeatures)if (step->requiredSystemFeatures.find(f) == step->requiredSystemFeatures.end()&& !(step->preferLocalBuild && f == "local"))return false;for (auto & f : step->requiredSystemFeatures)if (supportedFeatures.find(f) == supportedFeatures.end()) return false;return true;}};class State{private:Path hydraData, logDir;StringSet localPlatforms;/* The queued builds. */typedef std::map<BuildID, Build::ptr> Builds;Sync<Builds> builds;/* All active or pending build steps (i.e. dependencies of thequeued builds). Note that these are weak pointers. Steps arekept alive by being reachable from Builds or by being inprogress. */typedef std::map<Path, Step::wptr> Steps;Sync<Steps> steps;/* Build steps that have no unbuilt dependencies. */typedef std::list<Step::wptr> Runnable;Sync<Runnable> runnable;/* CV for waking up the dispatcher. */std::condition_variable dispatcherWakeup;std::mutex dispatcherMutex;/* PostgreSQL connection pool. */Pool<Connection> dbPool;/* The build machines. */typedef std::map<string, Machine::ptr> Machines;Sync<Machines> machines; // FIXME: use atomic_shared_ptrPath machinesFile;struct stat machinesFileStat;/* Token server limiting the number of threads copying closures inparallel to prevent excessive I/O load. 
*/TokenServer copyClosureTokenServer;/* Various stats. */time_t startedAt;counter nrBuildsRead{0};counter nrBuildsDone{0};counter nrStepsDone{0};counter nrActiveSteps{0};counter nrStepsBuilding{0};counter nrStepsCopyingTo{0};counter nrStepsCopyingFrom{0};counter nrRetries{0};counter maxNrRetries{0};counter totalStepTime{0}; // total time for steps, including closure copyingcounter totalStepBuildTime{0}; // total build time for stepscounter nrQueueWakeups{0};counter nrDispatcherWakeups{0};counter bytesSent{0};counter bytesReceived{0};/* Log compressor work queue. */Sync<std::queue<Path>> logCompressorQueue;std::condition_variable_any logCompressorWakeup;/* Notification sender work queue. FIXME: if hydra-queue-runner iskilled before it has finished sending notifications about abuild, then the notifications may be lost. It would be betterto mark builds with pending notification in the database. */typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem;Sync<std::queue<NotificationItem>> notificationSenderQueue;std::condition_variable_any notificationSenderWakeup;/* Specific build to do for --build-one (testing only). */BuildID buildOne;public:State();private:void clearBusy(Connection & conn, time_t stopTime);/* (Re)load /etc/nix/machines. */void loadMachinesFile();/* Thread to reload /etc/nix/machines periodically. 
*/void monitorMachinesFile();int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",BuildID propagatedFrom = 0);void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr,const std::string & machine, BuildStepStatus status, const string & errorMsg = "",BuildID propagatedFrom = 0);void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);void queueMonitor();void queueMonitorLoop();void getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);void removeCancelledBuilds(Connection & conn);Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);void makeRunnable(Step::ptr step);/* The thread that selects and starts runnable builds. */void dispatcher();void wakeDispatcher();void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation);/* Perform the given build step. Return true if the step is to beretried. */bool doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,Machine::ptr machine);void markSucceededBuild(pqxx::work & txn, Build::ptr build,const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime);bool checkCachedFailure(Step::ptr step, Connection & conn);/* Thread that asynchronously bzips logs of finished steps. */void logCompressor();/* Thread that asynchronously invokes hydra-notify to send buildnotifications. */void notificationSender();/* Acquire the global queue runner lock, or null if somebody elsehas it. */std::shared_ptr<PathLocks> acquireGlobalLock();void dumpStatus(Connection & conn, bool log);public:void showStatus();void unlock();void run(BuildID buildOne = 0);};