HJOEIMLRDVQ2KZI5HGL2HKGBM3AHP7YIKGKDAGFUNKRUXVRB24NAC
HPJKBFZ4C24DNBFESAVVFHOBHXOPYOP4XNPFVGOLA7EPLZCIRS6QC
WFYMBNWBSHJ6GF7MPAGKT5H2CMNLNTQLBVJGVA5N6J4IWEL4ND3QC
FULDVXE2XVKVC3BB2GOE2N2YZ4GW7LT26TIBXKKOVPUXGDOA5WYQC
4D7CHQ345ZSI52GPNW73JOYALRLFEGI2YUDWKG4UCDE55MO5LQ2AC
7VQ4ALFYKJBFR46T3WZDMGOXNRR3QNJEJQVBYJM4HSJUOOUD6WBQC
A2GL5FOZ3UJ2NM5RPRWTNPFTKLBA54B2UC6UIYO4M3N3RFNC4BTAC
SK6WHODMWAUL7LNH6QTUXZW7GQFNKKDYN4BU45ODTMMKYMG6O7KAC
24BMQDZAWDQ7VNIA7TIROXSOYLOJBNZ2E4264WHWNJAEN6ZB3UOAC
NJJ7H64SZOX5EGACDCQAUQ7R6UEWD5IIC35A2MWFOOJV55DJYPHAC
GS4BE6TB6GH2JUZJHDPHL6YG7J7YYESF3YOZJZ2CFABXUTO4VYPQC
OCZ4LSGGSCMSLGC3C32D5JUYYHS5CIPOKOAMADEFAFZOFXJ3YY3AC
5AIYUMTBY6TFQTBRP3MJ2PYWUMRF57I77NIVWYE74UMEVQMBWZVQC
ENXUSMSVOU3AZFMH2ZXR4ZVPV2LRRQYQJ6IFX33YN6IH2ORSNSAAC
YZAI5GQU3HNMK5MEGF2Y7WS445AN4YKD3HNJQVQP545ODN3F5DLAC
7LB6QBXYOGU43UDLJDJQZFGT4XDALULXDF3WX3WWHL7X3JTB54CQC
MB3TISH2KYBIGY6XJKMN4HO2S6TCN2GORJENMECCKLXGGIRS2O2AC
PLOZBRTR6USSGJX7GR2RZKNPVYG2Q6QM7LW6IA35MKL63ZTQVD7QC
K5G5GZY7D7KWVR5RAGZFHH3ZPG5OCLZT4HZ6XIJJ7YYVUMC2CTZQC
PQFOMNTLFY4HINJFAQYIFTBTSDRST6W2WNCVKE5ITR2IDF4SWWXQC
WKJFPR77WNFJHZ65NV3DDMCYYUUWUSI3BHTVCZZDWQI6L3FVP5AQC
HHOMBU7GGRAEXODSDY3WUHQGOSQ35OTGRNBWKKAS2D4YEIZTTNUAC
UQQ4IL55WHYMXNSPOXEFBTZAPMP7LQ726THOR7INRCJDSYVOP3ZAC
CCHPYTCPXUHLSQWDBDVZNFTUSMRLW45DKYTVCU3XKYE3OU5RPUEQC
XV4AEKJCFTNCOR52IRFYHORCNNFKIHOADIUARTC2U5Z6ZQBEEFYQC
HUUZFPPKGHTXFZMZCO2UGWYNGEED3E2CFHQRFQVVBJGPQVGVY4UAC
GKZN4UV75GV7GEHQGBM2O6UHN6CVSHA7BMASCUQSDIDYVUKKZL7AC
HLSHCK3CP4RB6VO6N44FBHWMXBJYB6XPNWM5EKCKWU3KISU37N3AC
SODOV2CMWA4JMIKRQNJ6MD3U3BS2XTSLINLRAG4SFY742IIJNI5QC
NNOCZ4ROWC64ZKSAE2MPHZ3LGLI34C5TJJFW4MHXN6OELK6VMWOQC
N5O7VEEOY2IE27VCOYRBG7YCY3K7JMQEDEMRT4OQ2MUE3NWULHHQC
O64P4XJSK56UN73VA5JUAM4RHFDWOZJDDYKVOKMUB736ODOAUYNAC
T2EIYJNGPIANHKJ4HBJIPTINWKG7RDLHR3PVHFYAPPLHZAJQBVWAC
RQUAATWBGEP3YT4F555XLJYRRRGHDTEILHFORES7AM2XAOVMVJSAC
LE4VZIY5VZ52FOP5QQRIJINWIMWTAPRTZTGO77JXUEPGRPRSQYMAC
63W4T5PUSRHU53CVVTVRWAQX6T74RIHH636NCGUGPN3YFVMC3VTAC
IE2PRAQUCQVFPJ4CAIJRPXXEFC5VBAE3EO5I5FG4XWEDRNONNHKQC
FQQRJUO4C7655SFAKPRPILALVEVRQSIIVBLMQUFPODUBXPIMWYSAC
KBZHIGLGHGLST5AZZDEJTYBJSQNE2XYNHEN2FN6XMAMY5BJYZR6QC
2IQRXLWETU2RLXPYKBSZIUULDHS346BAHE2NLOMOWXC6WDAX2BYQC
ATJ54SPXPE2IIFRERUOBFF42HBSEADP4QOI743ZBUNBQX3PYKRXQC
22LDPAIPRVSZVZXEDCM54GUOH72VQ52EIY47PQV2VELZ3NORC5IQC
IWB3F4Z6QZYHQFJ6FWZTGWLCPBYEUTFLUS3F7QT7JOA4DV4YLYGAC
#pragma once
#include <pqxx/pqxx>
#include "util.hh"
using namespace nix;
struct Connection : pqxx::connection
{
Connection() : pqxx::connection(getFlags()) { };
string getFlags()
{
string s = getEnv("HYDRA_DBI", "dbi:Pg:dbname=hydra;");
string prefix = "dbi:Pg:";
if (string(s, 0, prefix.size()) != prefix)
throw Error("$HYDRA_DBI does not denote a PostgreSQL database");
return concatStringsSep(" ", tokenizeString<Strings>(string(s, prefix.size()), ";"));
}
};
struct receiver : public pqxx::notification_receiver
{
bool status = false;
receiver(pqxx::connection_base & c, const std::string & channel)
: pqxx::notification_receiver(c, channel) { }
void operator() (const string & payload, int pid) override
{
status = true;
};
bool get() {
bool b = status;
status = false;
return b;
}
};
typedef enum {
bsSuccess = 0,
bsFailed = 1,
bsDepFailed = 2,
bsAborted = 3,
bsFailedWithOutput = 6,
bsTimedOut = 7,
bsUnsupported = 9,
} BuildStatus;
typedef enum {
bssSuccess = 0,
bssFailed = 1,
bssAborted = 4,
bssTimedOut = 7,
bssUnsupported = 9,
bssBusy = 100, // not stored
} BuildStepStatus;
struct Connection : pqxx::connection
{
Connection() : pqxx::connection(getFlags()) { };
string getFlags()
{
string s = getEnv("HYDRA_DBI", "dbi:Pg:dbname=hydra;");
string prefix = "dbi:Pg:";
if (string(s, 0, prefix.size()) != prefix)
throw Error("$HYDRA_DBI does not denote a PostgreSQL database");
return concatStringsSep(" ", tokenizeString<Strings>(string(s, prefix.size()), ";"));
}
};
struct receiver : public pqxx::notification_receiver
{
bool status = false;
receiver(pqxx::connection_base & c, const std::string & channel)
: pqxx::notification_receiver(c, channel) { }
void operator() (const string & payload, int pid) override
{
status = true;
};
bool get() {
bool b = status;
status = false;
return b;
}
};
typedef unsigned int BuildID;
struct Step;
struct Build
{
typedef std::shared_ptr<Build> ptr;
typedef std::weak_ptr<Build> wptr;
BuildID id;
Path drvPath;
std::map<string, Path> outputs;
std::string fullJobName;
unsigned int maxSilentTime, buildTimeout;
std::shared_ptr<Step> toplevel;
std::atomic_bool finishedInDB{false};
~Build()
{
printMsg(lvlDebug, format("destroying build %1%") % id);
}
};
struct Step
{
typedef std::shared_ptr<Step> ptr;
typedef std::weak_ptr<Step> wptr;
Path drvPath;
Derivation drv;
std::set<std::string> requiredSystemFeatures;
bool preferLocalBuild;
struct State
{
/* Whether the step has finished initialisation. */
bool created = false;
/* The build steps on which this step depends. */
std::set<Step::ptr> deps;
/* The build steps that depend on this step. */
std::vector<Step::wptr> rdeps;
/* Builds that have this step as the top-level derivation. */
std::vector<Build::wptr> builds;
/* Point in time after which the step can be retried. */
system_time after;
};
std::atomic_bool finished{false}; // debugging
Sync<State> state;
~Step()
{
//printMsg(lvlError, format("destroying step %1%") % drvPath);
}
};
struct Machine
{
typedef std::shared_ptr<Machine> ptr;
std::string sshName, sshKey;
std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;
unsigned int maxJobs = 1;
float speedFactor = 1.0;
struct State {
typedef std::shared_ptr<State> ptr;
counter currentJobs{0};
counter nrStepsDone{0};
counter totalStepTime{0}; // total time for steps, including closure copying
counter totalStepBuildTime{0}; // total build time for steps
};
State::ptr state;
bool supportsStep(Step::ptr step)
{
if (systemTypes.find(step->drv.platform) == systemTypes.end()) return false;
for (auto & f : mandatoryFeatures)
if (step->requiredSystemFeatures.find(f) == step->requiredSystemFeatures.end()
&& !(step->preferLocalBuild && f == "local"))
return false;
for (auto & f : step->requiredSystemFeatures)
if (supportedFeatures.find(f) == supportedFeatures.end()) return false;
return true;
}
};
class State
{
private:
Path hydraData, logDir;
StringSet localPlatforms;
/* The queued builds. */
typedef std::map<BuildID, Build::ptr> Builds;
Sync<Builds> builds;
/* All active or pending build steps (i.e. dependencies of the
queued builds). Note that these are weak pointers. Steps are
kept alive by being reachable from Builds or by being in
progress. */
typedef std::map<Path, Step::wptr> Steps;
Sync<Steps> steps;
/* Build steps that have no unbuilt dependencies. */
typedef std::list<Step::wptr> Runnable;
Sync<Runnable> runnable;
/* CV for waking up the dispatcher. */
std::condition_variable dispatcherWakeup;
std::mutex dispatcherMutex;
/* PostgreSQL connection pool. */
Pool<Connection> dbPool;
/* The build machines. */
typedef std::map<string, Machine::ptr> Machines;
Sync<Machines> machines; // FIXME: use atomic_shared_ptr
Path machinesFile;
struct stat machinesFileStat;
/* Token server limiting the number of threads copying closures in
parallel to prevent excessive I/O load. */
TokenServer copyClosureTokenServer{maxParallelCopyClosure};
/* Various stats. */
time_t startedAt;
counter nrBuildsRead{0};
counter nrBuildsDone{0};
counter nrStepsDone{0};
counter nrActiveSteps{0};
counter nrStepsBuilding{0};
counter nrStepsCopyingTo{0};
counter nrStepsCopyingFrom{0};
counter nrRetries{0};
counter maxNrRetries{0};
counter totalStepTime{0}; // total time for steps, including closure copying
counter totalStepBuildTime{0}; // total build time for steps
counter nrQueueWakeups{0};
counter nrDispatcherWakeups{0};
counter bytesSent{0};
counter bytesReceived{0};
/* Log compressor work queue. */
Sync<std::queue<Path>> logCompressorQueue;
std::condition_variable_any logCompressorWakeup;
/* Notification sender work queue. FIXME: if hydra-queue-runner is
killed before it has finished sending notifications about a
build, then the notifications may be lost. It would be better
to mark builds with pending notification in the database. */
typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem;
Sync<std::queue<NotificationItem>> notificationSenderQueue;
std::condition_variable_any notificationSenderWakeup;
/* Specific build to do for --build-one (testing only). */
BuildID buildOne;
public:
State();
private:
void clearBusy(Connection & conn, time_t stopTime);
/* (Re)load /etc/nix/machines. */
void loadMachinesFile();
/* Thread to reload /etc/nix/machines periodically. */
void monitorMachinesFile();
int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",
BuildID propagatedFrom = 0);
void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr,
const std::string & machine, BuildStepStatus status, const string & errorMsg = "",
BuildID propagatedFrom = 0);
void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);
void queueMonitor();
void queueMonitorLoop();
void getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);
void removeCancelledBuilds(Connection & conn);
Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);
void makeRunnable(Step::ptr step);
/* The thread that selects and starts runnable builds. */
void dispatcher();
void wakeDispatcher();
void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation);
/* Perform the given build step. Return true if the step is to be
retried. */
bool doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
Machine::ptr machine);
void markSucceededBuild(pqxx::work & txn, Build::ptr build,
const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime);
bool checkCachedFailure(Step::ptr step, Connection & conn);
/* Thread that asynchronously bzips logs of finished steps. */
void logCompressor();
/* Thread that asynchronously invokes hydra-notify to send build
notifications. */
void notificationSender();
/* Acquire the global queue runner lock, or null if somebody else
has it. */
std::shared_ptr<PathLocks> acquireGlobalLock();
void dumpStatus(Connection & conn, bool log);
public:
void showStatus();
void unlock();
void run(BuildID buildOne = 0);
};
#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <map>
#include <memory>
#include <queue>
#include "db.hh"
#include "counter.hh"
#include "pathlocks.hh"
#include "pool.hh"
#include "sync.hh"
#include "token-server.hh"
#include "store-api.hh"
#include "derivations.hh"
using namespace nix;
typedef unsigned int BuildID;
typedef std::chrono::time_point<std::chrono::system_clock> system_time;
typedef enum {
bsSuccess = 0,
bsFailed = 1,
bsDepFailed = 2,
bsAborted = 3,
bsFailedWithOutput = 6,
bsTimedOut = 7,
bsUnsupported = 9,
} BuildStatus;
typedef enum {
bssSuccess = 0,
bssFailed = 1,
bssAborted = 4,
bssTimedOut = 7,
bssUnsupported = 9,
bssBusy = 100, // not stored
} BuildStepStatus;
struct Step;
struct BuildResult;
struct Build
{
typedef std::shared_ptr<Build> ptr;
typedef std::weak_ptr<Build> wptr;
BuildID id;
Path drvPath;
std::map<string, Path> outputs;
std::string fullJobName;
unsigned int maxSilentTime, buildTimeout;
std::shared_ptr<Step> toplevel;
std::atomic_bool finishedInDB{false};
};
struct Step
{
typedef std::shared_ptr<Step> ptr;
typedef std::weak_ptr<Step> wptr;
Path drvPath;
Derivation drv;
std::set<std::string> requiredSystemFeatures;
bool preferLocalBuild;
struct State
{
/* Whether the step has finished initialisation. */
bool created = false;
/* The build steps on which this step depends. */
std::set<Step::ptr> deps;
/* The build steps that depend on this step. */
std::vector<Step::wptr> rdeps;
/* Builds that have this step as the top-level derivation. */
std::vector<Build::wptr> builds;
/* Number of times we've tried this step. */
unsigned int tries = 0;
/* Point in time after which the step can be retried. */
system_time after;
};
std::atomic_bool finished{false}; // debugging
Sync<State> state;
~Step()
{
//printMsg(lvlError, format("destroying step %1%") % drvPath);
}
};
struct Machine
{
typedef std::shared_ptr<Machine> ptr;
std::string sshName, sshKey;
std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;
unsigned int maxJobs = 1;
float speedFactor = 1.0;
struct State {
typedef std::shared_ptr<State> ptr;
counter currentJobs{0};
counter nrStepsDone{0};
counter totalStepTime{0}; // total time for steps, including closure copying
counter totalStepBuildTime{0}; // total build time for steps
};
State::ptr state;
bool supportsStep(Step::ptr step)
{
if (systemTypes.find(step->drv.platform) == systemTypes.end()) return false;
for (auto & f : mandatoryFeatures)
if (step->requiredSystemFeatures.find(f) == step->requiredSystemFeatures.end()
&& !(step->preferLocalBuild && f == "local"))
return false;
for (auto & f : step->requiredSystemFeatures)
if (supportedFeatures.find(f) == supportedFeatures.end()) return false;
return true;
}
};
class State
{
private:
Path hydraData, logDir;
StringSet localPlatforms;
/* The queued builds. */
typedef std::map<BuildID, Build::ptr> Builds;
Sync<Builds> builds;
/* All active or pending build steps (i.e. dependencies of the
queued builds). Note that these are weak pointers. Steps are
kept alive by being reachable from Builds or by being in
progress. */
typedef std::map<Path, Step::wptr> Steps;
Sync<Steps> steps;
/* Build steps that have no unbuilt dependencies. */
typedef std::list<Step::wptr> Runnable;
Sync<Runnable> runnable;
/* CV for waking up the dispatcher. */
std::condition_variable dispatcherWakeup;
std::mutex dispatcherMutex;
/* PostgreSQL connection pool. */
Pool<Connection> dbPool;
/* The build machines. */
typedef std::map<string, Machine::ptr> Machines;
Sync<Machines> machines; // FIXME: use atomic_shared_ptr
Path machinesFile;
struct stat machinesFileStat;
/* Token server limiting the number of threads copying closures in
parallel to prevent excessive I/O load. */
TokenServer copyClosureTokenServer;
/* Various stats. */
time_t startedAt;
counter nrBuildsRead{0};
counter nrBuildsDone{0};
counter nrStepsDone{0};
counter nrActiveSteps{0};
counter nrStepsBuilding{0};
counter nrStepsCopyingTo{0};
counter nrStepsCopyingFrom{0};
counter nrRetries{0};
counter maxNrRetries{0};
counter totalStepTime{0}; // total time for steps, including closure copying
counter totalStepBuildTime{0}; // total build time for steps
counter nrQueueWakeups{0};
counter nrDispatcherWakeups{0};
counter bytesSent{0};
counter bytesReceived{0};
/* Log compressor work queue. */
Sync<std::queue<Path>> logCompressorQueue;
std::condition_variable_any logCompressorWakeup;
/* Notification sender work queue. FIXME: if hydra-queue-runner is
killed before it has finished sending notifications about a
build, then the notifications may be lost. It would be better
to mark builds with pending notification in the database. */
typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem;
Sync<std::queue<NotificationItem>> notificationSenderQueue;
std::condition_variable_any notificationSenderWakeup;
/* Specific build to do for --build-one (testing only). */
BuildID buildOne;
public:
State();
private:
void clearBusy(Connection & conn, time_t stopTime);
/* (Re)load /etc/nix/machines. */
void loadMachinesFile();
/* Thread to reload /etc/nix/machines periodically. */
void monitorMachinesFile();
int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",
BuildID propagatedFrom = 0);
void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr,
const std::string & machine, BuildStepStatus status, const string & errorMsg = "",
BuildID propagatedFrom = 0);
void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);
void queueMonitor();
void queueMonitorLoop();
void getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);
void removeCancelledBuilds(Connection & conn);
Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);
void makeRunnable(Step::ptr step);
/* The thread that selects and starts runnable builds. */
void dispatcher();
void wakeDispatcher();
void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation);
/* Perform the given build step. Return true if the step is to be
retried. */
bool doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
Machine::ptr machine);
void markSucceededBuild(pqxx::work & txn, Build::ptr build,
const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime);
bool checkCachedFailure(Step::ptr step, Connection & conn);
/* Thread that asynchronously bzips logs of finished steps. */
void logCompressor();
/* Thread that asynchronously invokes hydra-notify to send build
notifications. */
void notificationSender();
/* Acquire the global queue runner lock, or null if somebody else
has it. */
std::shared_ptr<PathLocks> acquireGlobalLock();
void dumpStatus(Connection & conn, bool log);
public:
void showStatus();
void unlock();
void run(BuildID buildOne = 0);
};