#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <map>
#include <memory>
#include <queue>
#include "db.hh"
#include "parsed-derivations.hh"
#include "pathlocks.hh"
#include "pool.hh"
#include "store-api.hh"
#include "sync.hh"
#include "nar-extractor.hh"
/* Numeric identifier of a build; used e.g. as the key of State::Builds. */
using BuildID = unsigned int;

/* Wall-clock time point based on std::chrono::system_clock. */
using system_time = std::chrono::system_clock::time_point;

/* Atomic unsigned counter that can be updated without taking a lock. */
using counter = std::atomic<unsigned long>;
/* Outcome codes for builds and build steps.

   The numeric values are assigned explicitly rather than sequentially
   (note that 5 is unused). NOTE(review): presumably these values are
   persisted outside the process, so confirm before renumbering. */
enum BuildStatus {
    bsSuccess = 0,
    bsFailed = 1,
    bsDepFailed = 2,
    bsAborted = 3,
    bsCancelled = 4,
    bsFailedWithOutput = 6,
    bsTimedOut = 7,
    /* Reported as bsFailed by RemoteResult::buildStatus(). */
    bsCachedFailure = 8,
    bsUnsupported = 9,
    bsLogLimitExceeded = 10,
    bsNarSizeLimitExceeded = 11,
    bsNotDeterministic = 12,
    bsBusy = 100,
};
/* Phases a build step passes through. The values are spaced in steps of
   ten (after ssPreparing = 1) and increase in declaration order. */
enum StepState {
    ssPreparing = 1,
    ssConnecting = 10,
    ssSendingInputs = 20,
    ssBuilding = 30,
    ssReceivingOutputs = 40,
    ssPostProcessing = 50,
};
/* Result record for a single build step. It is passed by non-const
   reference to State::buildRemote() and by const reference to
   State::finishBuildStep() and State::failStep(). */
struct RemoteResult
{
    /* Status of the step; defaults to bsAborted until set otherwise. */
    BuildStatus stepStatus = bsAborted;

    bool canRetry = false;
    bool isCached = false;
    bool canCache = false;
    std::string errorMsg;

    unsigned int timesBuilt = 0;
    bool isNonDeterministic = false;

    time_t startTime = 0;
    time_t stopTime = 0;
    unsigned int overhead = 0;

    nix::Path logFile;

    /* Status to report for the build as a whole: identical to the step
       status, except that bsCachedFailure is reported as bsFailed. */
    BuildStatus buildStatus() const
    {
        if (stepStatus == bsCachedFailure)
            return bsFailed;
        return stepStatus;
    }
};
/* Forward declarations. Step is defined further down in this header but
   is already referenced by Build (Build::toplevel). BuildOutput is only
   named in member-function declarations of State. */
struct Step;
struct BuildOutput;
/* Per-jobset accounting: an accumulated number of seconds and a share
   count, both atomics, plus a lock-protected map of recorded steps. */
class Jobset
{
public:

    using ptr = std::shared_ptr<Jobset>;
    using wptr = std::weak_ptr<Jobset>;

    /* 24 * 60 * 60, i.e. the number of seconds in one day. */
    static const time_t schedulingWindow = 24 * 60 * 60;

private:

    std::atomic<time_t> seconds{0};
    std::atomic<unsigned int> shares{1};

    /* NOTE(review): judging by addStep(startTime, duration), this
       presumably maps a step's start time to its duration — confirm
       against the definition of addStep(). */
    nix::Sync<std::map<time_t, time_t>> steps;

public:

    /* Ratio of accumulated seconds to the number of shares. */
    double shareUsed()
    {
        const double used = static_cast<double>(seconds.load());
        return used / shares.load();
    }

    /* Set the share count; `shares_` must be strictly positive. */
    void setShares(int shares_)
    {
        assert(shares_ > 0);
        shares.store(shares_);
    }

    time_t getSeconds()
    {
        return seconds.load();
    }

    /* Defined elsewhere. */
    void addStep(time_t startTime, time_t duration);

    /* Defined elsewhere. */
    void pruneSteps();
};
/* A build: a derivation path plus the job it belongs to and its
   scheduling parameters.

   The constructor only initialises drvPath, so every scalar member
   carries a default member initialiser; otherwise it would hold an
   indeterminate value until the creator assigns it. */
struct Build
{
    typedef std::shared_ptr<Build> ptr;
    typedef std::weak_ptr<Build> wptr;

    BuildID id{0};
    nix::StorePath drvPath;
    std::map<std::string, nix::StorePath> outputs;
    int jobsetId{0};
    std::string projectName, jobsetName, jobName;
    time_t timestamp{0};
    unsigned int maxSilentTime{0}, buildTimeout{0};
    int localPriority{0}, globalPriority{0};

    /* The top-level step of this build. */
    std::shared_ptr<Step> toplevel;

    Jobset::ptr jobset;

    std::atomic_bool finishedInDB{false};

    /* Takes ownership of `drvPath`; all other fields keep their defaults. */
    Build(nix::StorePath && drvPath) : drvPath(std::move(drvPath))
    { }

    /* Returns "<projectName>:<jobsetName>:<jobName>". */
    std::string fullJobName() const
    {
        return projectName + ":" + jobsetName + ":" + jobName;
    }

    /* Defined elsewhere. */
    void propagatePriorities();
};
/* A build step: one derivation together with its requirements and a
   lock-protected block of mutable scheduling state.

   The constructor only initialises drvPath, so the two bool members
   carry default member initialisers; otherwise they would be
   indeterminate until assigned. */
struct Step
{
    typedef std::shared_ptr<Step> ptr;
    typedef std::weak_ptr<Step> wptr;

    nix::StorePath drvPath;
    std::unique_ptr<nix::Derivation> drv;
    std::unique_ptr<nix::ParsedDerivation> parsedDrv;
    std::set<std::string> requiredSystemFeatures;
    bool preferLocalBuild = false;
    bool isDeterministic = false;
    std::string systemType;

    /* Mutable part of a step; only accessed through the `state` member
       below, which wraps it in nix::Sync. */
    struct State
    {
        bool created = false;

        /* Steps this step depends on (strong references). */
        std::set<Step::ptr> deps;

        /* Reverse dependencies, held weakly. */
        std::vector<Step::wptr> rdeps;

        /* Builds associated with this step, held weakly. */
        std::vector<Build::wptr> builds;

        std::set<Jobset::ptr> jobsets;

        unsigned int tries = 0;

        system_time after;

        int highestGlobalPriority{0};
        int highestLocalPriority{0};

        /* Starts at the largest representable BuildID. */
        BuildID lowestBuildID{std::numeric_limits<BuildID>::max()};

        system_time runnableSince;

        /* Initialised to the time at which this State object is constructed. */
        system_time lastSupported = std::chrono::system_clock::now();
    };

    std::atomic_bool finished{false};

    nix::Sync<State> state;

    Step(const nix::StorePath & drvPath) : drvPath(drvPath)
    { }

    /* No explicit cleanup; the members release their own resources. */
    ~Step()
    {
    }
};
/* Declared here, defined elsewhere. NOTE(review): judging by the
   signature, this presumably adds the builds and steps that depend on
   `step` to the `builds` and `steps` out-parameters — confirm against
   the definition. */
void getDependents(Step::ptr step, std::set<Build::ptr> & builds, std::set<Step::ptr> & steps);
/* Declared here, defined elsewhere. NOTE(review): presumably invokes
   `visitor` on `step` and the steps it depends on — confirm against
   the definition. */
void visitDependencies(std::function<void(Step::ptr)> visitor, Step::ptr step);
/* Description of a build machine plus a shared block of runtime state. */
struct Machine
{
    typedef std::shared_ptr<Machine> ptr;

    bool enabled{true};

    std::string sshName, sshKey;
    std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;
    unsigned int maxJobs = 1;
    float speedFactor = 1.0;
    std::string sshPublicHostKey;

    /* Runtime state, held through a shared_ptr (see `state` below). */
    struct State {
        typedef std::shared_ptr<State> ptr;
        counter currentJobs{0};
        counter nrStepsDone{0};
        counter totalStepTime{0};
        counter totalStepBuildTime{0};
        std::atomic<time_t> idleSince{0};

        struct ConnectInfo
        {
            system_time lastFailure, disabledUntil;
            /* Explicitly zeroed: without an initialiser this field is
               only zero when the enclosing object happens to be
               value-initialised. */
            unsigned int consecutiveFailures{0};
        };
        nix::Sync<ConnectInfo> connectInfo;

        std::timed_mutex sendLock;
    };

    State::ptr state;

    /* Returns true if this machine can build `step`:
       - the step's platform is listed in systemTypes, where the
         platform "builtin" is looked up as nix::settings.thisSystem;
       - every mandatory feature of this machine is required by the
         step, except that "local" is also satisfied by a step with
         preferLocalBuild set;
       - every feature required by the step is in supportedFeatures.
       `step->drv` is dereferenced unconditionally, so it must be set. */
    bool supportsStep(Step::ptr step) const
    {
        /* Check that this machine is of the type required by the step. */
        if (!systemTypes.count(step->drv->platform == "builtin" ? nix::settings.thisSystem : step->drv->platform))
            return false;

        /* Check that the step requires all mandatory features of this machine. */
        for (auto & f : mandatoryFeatures)
            if (!step->requiredSystemFeatures.count(f)
                && !(f == "local" && step->preferLocalBuild))
                return false;

        /* Check that the machine supports all features required by the step. */
        for (auto & f : step->requiredSystemFeatures)
            if (!supportedFeatures.count(f)) return false;

        return true;
    }

    /* True if sshName is exactly "localhost". */
    bool isLocalhost() const
    {
        return sshName == "localhost";
    }
};
/* Forward declaration; State only holds it through std::unique_ptr. */
class HydraConfig;
/* Top-level object tying together the build queue, the dispatcher, the
   build machines and database access. Only the constructor,
   showStatus(), unlock() and run() are public.

   Scalar members without an in-class value (startedAt, buildOne,
   MachineType::waitTime, maxOutputSize, maxLogSize,
   uploadLogsToBinaryCache) carry default member initialisers so they
   never hold an indeterminate value. A constructor mem-initialiser
   still takes precedence over these defaults. */
class State
{
private:

    std::unique_ptr<HydraConfig> config;

    /* Tunables. NOTE(review): units are not visible here;
       retryInterval is presumably in seconds — confirm. */
    const unsigned int maxTries = 5;
    const unsigned int retryInterval = 60;
    const float retryBackoff = 3.0;
    const unsigned int maxParallelCopyClosure = 4;
    const unsigned int maxUnsupportedTime = 0;

    nix::Path hydraData, logDir;

    bool useSubstitutes = false;

    /* Builds, keyed by build ID. */
    typedef std::map<BuildID, Build::ptr> Builds;
    nix::Sync<Builds> builds;

    /* Jobsets, keyed by a pair of strings. NOTE(review): presumably
       (project name, jobset name), cf. createJobset() — confirm. */
    typedef std::map<std::pair<std::string, std::string>, Jobset::ptr> Jobsets;
    nix::Sync<Jobsets> jobsets;

    /* Steps, keyed by store path and held weakly, so this map does not
       keep a step alive by itself. */
    typedef std::map<nix::StorePath, Step::wptr> Steps;
    nix::Sync<Steps> steps;

    /* List of weakly-held steps. */
    typedef std::list<Step::wptr> Runnable;
    nix::Sync<Runnable> runnable;

    /* Flag and condition variable; see wakeDispatcher(). */
    nix::Sync<bool> dispatcherWakeup;
    std::condition_variable dispatcherWakeupCV;

    /* Pool of database connections. */
    nix::Pool<Connection> dbPool;

    /* Machines, keyed by a string. */
    typedef std::map<std::string, Machine::ptr> Machines;
    nix::Sync<Machines> machines;

    time_t startedAt{0};

    /* Atomic counters. NOTE(review): presumably statistics reported by
       dumpStatus() — confirm. */
    counter nrBuildsRead{0};
    counter buildReadTimeMs{0};
    counter nrBuildsDone{0};
    counter nrStepsStarted{0};
    counter nrStepsDone{0};
    counter nrStepsBuilding{0};
    counter nrStepsCopyingTo{0};
    counter nrStepsCopyingFrom{0};
    counter nrStepsWaiting{0};
    counter nrUnsupportedSteps{0};
    counter nrRetries{0};
    counter maxNrRetries{0};
    counter totalStepTime{0};
    counter totalStepBuildTime{0};
    counter nrQueueWakeups{0};
    counter nrDispatcherWakeups{0};
    counter dispatchTimeMs{0};
    counter bytesSent{0};
    counter bytesReceived{0};
    counter nrActiveDbUpdates{0};

    /* run() takes a parameter of the same name that defaults to 0. */
    BuildID buildOne{0};
    bool buildOneDone = false;

    /* Per-machine-type bookkeeping; see `machineTypes` below. */
    struct MachineType
    {
        unsigned int runnable{0}, running{0};
        system_time lastActive;
        /* std::chrono::seconds has a defaulted default constructor, so
           it must be initialised explicitly. */
        std::chrono::seconds waitTime{0};
    };

    nix::Sync<std::map<std::string, MachineType>> machineTypes;

    /* Pairs a step with a machine; construction and destruction are
       defined elsewhere. */
    struct MachineReservation
    {
        typedef std::shared_ptr<MachineReservation> ptr;
        State & state;
        Step::ptr step;
        Machine::ptr machine;
        MachineReservation(State & state, Step::ptr step, Machine::ptr machine);
        ~MachineReservation();
    };

    /* A step together with a lock-protected pid and cancellation flag. */
    struct ActiveStep
    {
        Step::ptr step;

        struct State
        {
            /* -1 until a pid has been recorded. */
            pid_t pid = -1;
            bool cancelled = false;
        };

        nix::Sync<State> state_;
    };

    nix::Sync<std::set<std::shared_ptr<ActiveStep>>> activeSteps_;

    std::atomic<time_t> lastDispatcherCheck{0};

    std::shared_ptr<nix::Store> localStore;
    std::shared_ptr<nix::Store> _destStore;

    size_t maxOutputSize{0};
    size_t maxLogSize{0};

    /* Set of (build ID, int) pairs. NOTE(review): the int is
       presumably a step number — confirm. */
    nix::Sync<std::set<std::pair<BuildID, int>>> orphanedSteps;

    /* Keyed like Jobsets above. */
    std::map<std::pair<std::string, std::string>, unsigned int> jobsetRepeats;

    bool uploadLogsToBinaryCache{false};

    nix::Path rootsDir;

public:
    State();

private:

    nix::MaintainCount<counter> startDbUpdate();

    nix::ref<nix::Store> getDestStore();

    void clearBusy(Connection & conn, time_t stopTime);

    void parseMachines(const std::string & contents);

    void monitorMachinesFile();

    unsigned int allocBuildStep(pqxx::work & txn, BuildID buildId);

    unsigned int createBuildStep(pqxx::work & txn, time_t startTime, BuildID buildId, Step::ptr step,
        const std::string & machine, BuildStatus status, const std::string & errorMsg = "",
        BuildID propagatedFrom = 0);

    void updateBuildStep(pqxx::work & txn, BuildID buildId, unsigned int stepNr, StepState stepState);

    void finishBuildStep(pqxx::work & txn, const RemoteResult & result, BuildID buildId, unsigned int stepNr,
        const std::string & machine);

    int createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime,
        Build::ptr build, const nix::StorePath & drvPath, const std::string & outputName, const nix::StorePath & storePath);

    void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);

    void queueMonitor();

    void queueMonitorLoop();

    bool getQueuedBuilds(Connection & conn,
        nix::ref<nix::Store> destStore, unsigned int & lastBuildId);

    void processQueueChange(Connection & conn);

    BuildOutput getBuildOutputCached(Connection & conn, nix::ref<nix::Store> destStore,
        const nix::Derivation & drv);

    Step::ptr createStep(nix::ref<nix::Store> store,
        Connection & conn, Build::ptr build, const nix::StorePath & drvPath,
        Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::StorePath> & finishedDrvs,
        std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);

    void failStep(
        Connection & conn,
        Step::ptr step,
        BuildID buildId,
        const RemoteResult & result,
        Machine::ptr machine,
        bool & stepFinished);

    Jobset::ptr createJobset(pqxx::work & txn,
        const std::string & projectName, const std::string & jobsetName, const int & jobsetID);

    void processJobsetSharesChange(Connection & conn);

    void makeRunnable(Step::ptr step);

    void dispatcher();

    system_time doDispatch();

    void wakeDispatcher();

    void abortUnsupported();

    void builder(MachineReservation::ptr reservation);

    /* Possible results of doBuildStep(). */
    enum StepResult { sDone, sRetry, sMaybeCancelled };
    StepResult doBuildStep(nix::ref<nix::Store> destStore,
        MachineReservation::ptr reservation,
        std::shared_ptr<ActiveStep> activeStep);

    void buildRemote(nix::ref<nix::Store> destStore,
        Machine::ptr machine, Step::ptr step,
        unsigned int maxSilentTime, unsigned int buildTimeout,
        unsigned int repeats,
        RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
        std::function<void(StepState)> updateStep,
        NarMemberDatas & narMembers);

    void markSucceededBuild(pqxx::work & txn, Build::ptr build,
        const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime);

    bool checkCachedFailure(Step::ptr step, Connection & conn);

    void notifyBuildStarted(pqxx::work & txn, BuildID buildId);

    void notifyBuildFinished(pqxx::work & txn, BuildID buildId,
        const std::vector<BuildID> & dependentIds);

    std::shared_ptr<nix::PathLocks> acquireGlobalLock();

    void dumpStatus(Connection & conn);

    void addRoot(const nix::StorePath & storePath);

public:

    void showStatus();

    void unlock();

    /* `buildOne` defaults to 0. */
    void run(BuildID buildOne = 0);
};