Refactor

[?]
Jul 7, 2015, 8:17 AM
HJOEIMLRDVQ2KZI5HGL2HKGBM3AHP7YIKGKDAGFUNKRUXVRB24NAC

Dependencies

  • [2] HPJKBFZ4 Handle concurrent finishing of the same build
  • [3] WFYMBNWB Move "created" field into Step::State
  • [4] FULDVXE2 Periodically dump/log status
  • [5] 4D7CHQ34 createStep(): Cache finished derivations
  • [6] 7VQ4ALFY Update "make check" for the new queue runner
  • [7] A2GL5FOZ Moar stats
  • [8] SK6WHODM Support preferLocalBuild
  • [9] GS4BE6TB Asynchronously compress build logs
  • [10] HUUZFPPK Fix race between the queue monitor and the builder threads
  • [11] T2EIYJNG On SIGINT, shut down the builder threads
  • [12] YZAI5GQU Implement a database connection pool
  • [13] 2IQRXLWE Support cancelling builds
  • [14] XV4AEKJC hydra-queue-runner: Handle status queries on the main thread
  • [15] SODOV2CM Automatically reload $NIX_REMOTE_SYSTEMS when it changes
  • [16] PQFOMNTL hydra-queue-runner: More stats
  • [17] 5AIYUMTB Basic remote building
  • [18] UQQ4IL55 Add a error type for "unsupported system type"
  • [19] O64P4XJS Keep per-machine stats
  • [20] 24BMQDZA Start of single-process hydra-queue-runner
  • [21] PLOZBRTR Add command ‘hydra-queue-runner --status’ to show current status
  • [22] K5G5GZY7 Guard against concurrent invocations of hydra-queue-runner
  • [23] ATJ54SPX Use PostgreSQL notifications for queue events
  • [24] HLSHCK3C Support requiredSystemFeatures
  • [25] N5O7VEEO Immediately abort builds that require an unsupported system type
  • [26] ENXUSMSV Make concurrency more robust
  • [27] IWB3F4Z6 Fail builds with previously failed steps early
  • [28] HHOMBU7G hydra-queue-runner: Implement timeouts
  • [29] OCZ4LSGG Automatically retry aborted builds
  • [30] NJJ7H64S Very basic multi-threaded queue runner
  • [31] NNOCZ4RO hydra-queue-runner: Improve dispatcher
  • [32] 22LDPAIP Check non-runnable steps for unsupported system type
  • [33] C6HOMHZW Don't try to handle SIGINT
  • [34] 7LB6QBXY Keep track of the number of build steps that are being built
  • [35] GKZN4UV7 Make the queue monitor more robust, and better debug output
  • [36] RQUAATWB Add status dump facility
  • [37] LE4VZIY5 More stats
  • [38] FQQRJUO4 Mark builds as busy
  • [39] MB3TISH2 Rate-limit the number of threads copying closures at the same time
  • [40] 63W4T5PU hydra-queue-runner: More stats
  • [41] IE2PRAQU hydra-queue-runner: Send build notifications
  • [42] KBZHIGLG Record the machine used for a build step
  • [43] FKLICOHY Prefer cached failure over unsupported system type
  • [44] CCHPYTCP hydra-queue-runner: Handle $HYDRA_DBI
  • [45] WKJFPR77 hydra-queue-runner: Maintain count of active build steps

Change contents

  • replacement in src/hydra-queue-runner/Makefile.am at line 4
    [6.85][6.85:161]()
    build-remote.hh build-result.hh counter.hh pool.hh sync.hh token-server.hh
    [6.85]
    [9.322]
    build-remote.hh build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh
  • file addition: db.hh (----------)
    [9.187]
    #pragma once
    #include <pqxx/pqxx>
    #include "util.hh"
    using namespace nix;
    struct Connection : pqxx::connection
    {
    Connection() : pqxx::connection(getFlags()) { };
    string getFlags()
    {
    string s = getEnv("HYDRA_DBI", "dbi:Pg:dbname=hydra;");
    string prefix = "dbi:Pg:";
    if (string(s, 0, prefix.size()) != prefix)
    throw Error("$HYDRA_DBI does not denote a PostgreSQL database");
    return concatStringsSep(" ", tokenizeString<Strings>(string(s, prefix.size()), ";"));
    }
    };
    struct receiver : public pqxx::notification_receiver
    {
    bool status = false;
    receiver(pqxx::connection_base & c, const std::string & channel)
    : pqxx::notification_receiver(c, channel) { }
    void operator() (const string & payload, int pid) override
    {
    status = true;
    };
    bool get() {
    bool b = status;
    status = false;
    return b;
    }
    };
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 1
    [9.4840][9.201:249]()
    #include <atomic>
    #include <condition_variable>
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 2
    [9.4879][9.4879:4894](),[9.4894][9.630:647](),[9.647][9.250:268](),[9.4894][9.250:268]()
    #include <map>
    #include <queue>
    #include <memory>
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 4
    [9.1059][9.1059:1077]()
    #include <chrono>
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 6
    [9.287][9.4894:4915](),[9.4894][9.4894:4915](),[9.4915][9.648:649]()
    #include <pqxx/pqxx>
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 12
    [9.5851][9.0:19](),[9.4943][9.0:19](),[9.19][9.0:19](),[9.19][9.468:490](),[9.490][9.798:825]()
    #include "sync.hh"
    #include "pool.hh"
    #include "counter.hh"
    #include "token-server.hh"
    [9.5851]
    [9.19]
    #include "state.hh"
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 14
    [9.20][9.4943:4993](),[9.4943][9.4943:4993]()
    #include "store-api.hh"
    #include "derivations.hh"
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 17
    [9.28][9.0:24]()
    #include "pathlocks.hh"
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 26
    [9.986][9.22:23](),[9.1177][9.22:23](),[9.22][9.22:23]()
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 27
    [9.1179][9.1179:1251]()
    typedef std::chrono::time_point<std::chrono::system_clock> system_time;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 28
    [9.1252][9.1252:1253]()
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 33
    [9.1][9.1:2](),[9.45][9.907:908](),[9.907][9.907:908](),[9.206][9.5060:5180](),[9.909][9.5060:5180](),[9.5060][9.5060:5180](),[9.5180][9.324:344](),[9.344][9.0:23](),[9.5180][9.0:23](),[9.23][9.5180:5271](),[9.5180][9.5180:5271](),[9.5271][9.345:366](),[9.366][9.24:48](),[9.5271][9.24:48](),[9.48][9.5271:5364](),[9.5271][9.5271:5364](),[9.5364][9.0:403](),[9.403][9.0:397](),[9.397][9.403:409](),[9.403][9.403:409](),[9.409][9.5421:5456](),[9.5421][9.5421:5456](),[9.5456][9.124:125](),[9.125][9.5456:5457](),[9.5456][9.5456:5457](),[9.5457][9.126:139](),[9.139][9.5457:5458](),[9.5457][9.5457:5458](),[9.5458][9.140:141](),[9.141][9.5458:5623](),[9.5458][9.5458:5623](),[9.5623][9.142:171](),[9.171][9.367:413](),[9.413][9.171:208](),[9.171][9.171:208](),[9.5647][9.5647:5648](),[9.5648][9.0:42](),[9.42][9.209:229](),[9.5686][9.209:229](),[9.229][9.0:65](),[9.65][9.294:300](),[9.294][9.294:300](),[9.300][9.5686:5782](),[9.5686][9.5686:5782](),[9.5782][9.301:302](),[9.302][9.5782:5820](),[9.5782][9.5782:5820](),[9.5820][9.0:50](),[9.50][8.0:27](),[8.27][9.5820:5821](),[9.50][9.5820:5821](),[9.5820][9.5820:5821](),[9.5821][9.303:326](),[9.326][3.0:91](),[3.91][9.326:626](),[9.326][9.326:626]()
    typedef enum {
    bsSuccess = 0,
    bsFailed = 1,
    bsDepFailed = 2,
    bsAborted = 3,
    bsFailedWithOutput = 6,
    bsTimedOut = 7,
    bsUnsupported = 9,
    } BuildStatus;
    typedef enum {
    bssSuccess = 0,
    bssFailed = 1,
    bssAborted = 4,
    bssTimedOut = 7,
    bssUnsupported = 9,
    bssBusy = 100, // not stored
    } BuildStepStatus;
    struct Connection : pqxx::connection
    {
    Connection() : pqxx::connection(getFlags()) { };
    string getFlags()
    {
    string s = getEnv("HYDRA_DBI", "dbi:Pg:dbname=hydra;");
    string prefix = "dbi:Pg:";
    if (string(s, 0, prefix.size()) != prefix)
    throw Error("$HYDRA_DBI does not denote a PostgreSQL database");
    return concatStringsSep(" ", tokenizeString<Strings>(string(s, prefix.size()), ";"));
    }
    };
    struct receiver : public pqxx::notification_receiver
    {
    bool status = false;
    receiver(pqxx::connection_base & c, const std::string & channel)
    : pqxx::notification_receiver(c, channel) { }
    void operator() (const string & payload, int pid) override
    {
    status = true;
    };
    bool get() {
    bool b = status;
    status = false;
    return b;
    }
    };
    typedef unsigned int BuildID;
    struct Step;
    struct Build
    {
    typedef std::shared_ptr<Build> ptr;
    typedef std::weak_ptr<Build> wptr;
    BuildID id;
    Path drvPath;
    std::map<string, Path> outputs;
    std::string fullJobName;
    unsigned int maxSilentTime, buildTimeout;
    std::shared_ptr<Step> toplevel;
    std::atomic_bool finishedInDB{false};
    ~Build()
    {
    printMsg(lvlDebug, format("destroying build %1%") % id);
    }
    };
    struct Step
    {
    typedef std::shared_ptr<Step> ptr;
    typedef std::weak_ptr<Step> wptr;
    Path drvPath;
    Derivation drv;
    std::set<std::string> requiredSystemFeatures;
    bool preferLocalBuild;
    struct State
    {
    /* Whether the step has finished initialisation. */
    bool created = false;
    /* The build steps on which this step depends. */
    std::set<Step::ptr> deps;
    /* The build steps that depend on this step. */
    std::vector<Step::wptr> rdeps;
    /* Builds that have this step as the top-level derivation. */
    std::vector<Build::wptr> builds;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 34
    [9.1255][9.1255:1340]()
    /* Number of times we've tried this step. */
    unsigned int tries = 0;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 35
    [9.1341][9.1341:1433](),[9.1433][9.626:633](),[9.626][9.626:633](),[9.633][9.5905:5906](),[9.5905][9.5905:5906](),[9.93][9.93:144](),[9.144][9.5852:5853](),[9.657][9.5852:5853](),[9.5853][9.145:168](),[9.168][9.5885:5886](),[9.5885][9.5885:5886](),[9.5886][9.169:187](),[9.187][2.0:71](),[2.71][9.256:262](),[9.256][9.256:262](),[9.262][9.5927:6162](),[9.1450][9.5927:6162](),[9.5927][9.5927:6162](),[9.6162][9.0:311](),[9.68][9.6:136](),[9.202][9.6:136](),[9.311][9.6:136](),[9.6][9.6:136](),[9.136][9.51:94](),[9.94][8.28:211](),[8.211][9.200:338](),[9.200][9.200:338](),[9.338][9.169:190](),[9.169][9.169:190](),[9.134][9.6776:6787](),[9.6776][9.6776:6787](),[9.6787][9.6102:6125](),[9.6102][9.6102:6125](),[9.6125][9.910:911](),[9.911][9.6788:6816](),[9.6816][8.212:243](),[9.94][9.947:948](),[9.123][9.947:948](),[8.243][9.947:948](),[9.947][9.947:948](),[9.948][9.6125:6154](),[9.6125][9.6125:6154](),[9.6154][9.752:827](),[9.827][9.6196:6264](),[9.6196][9.6196:6264](),[9.6264][9.828:1049](),[9.1049][9.6327:6385](),[9.6327][9.6327:6385](),[9.6385][9.1050:1123](),[9.172][9.95:96](),[9.96][9.6817:6939](),[9.6939][9.96:163](),[9.96][9.96:163](),[9.163][9.1019:1020](),[9.172][9.1019:1020](),[9.1168][9.1019:1020](),[9.1019][9.1019:1020](),[9.1020][9.6940:6970](),[9.6970][9.312:365](),[9.365][6.813:874](),[6.874][9.366:424](),[9.7045][9.366:424](),[9.424][9.987:1173](),[9.7045][9.987:1173](),[9.1173][9.7045:7046](),[9.7045][9.7045:7046](),[9.2][9.2:27](),[9.27][9.203:225](),[9.225][9.46:132](),[9.27][9.46:132](),[9.132][9.207:237](),[9.237][9.491:523](),[9.523][9.550:618](),[9.237][9.132:187](),[9.523][9.132:187](),[9.618][9.132:187](),[9.132][9.132:187](),[9.187][9.0:146](),[9.146][9.187:254](),[9.187][9.187:254](),[9.254][7.1185:1241](),[7.1241][9.714:852](),[9.254][9.714:852](),[9.852][9.0:469](),[9.64][9.7253:7254](),[9.254][9.7253:7254](),[9.469][9.7253:7254](),[9.852][9.7253:7254](),[9.1633][9.7253:7254](),[9.7253][9.7253:7254](),[9.7254][6.875:961](),[6.961][9.6420:6441](),[9.1020][9.6420:6441](),[9.7254][9.6420:6441](),[9.6420][9.6420:6441](),[9.6456][9.6456:6457](),[9.6457][9.29:38](),[9.38][9.7280:7281](),[9.890][9.7280:7281](),[9.7280][9.7280:7281](),[9.7281][9.39:95](),[9.95][9.900:901](),[9.900][9.900:901](),[9.901][9.425:584](),[9.37][9.6539:6634](),[9.221][9.6539:6634](),[9.584][9.6539:6634](),[9.926][9.6539:6634](),[9.6539][9.6539:6634](),[9.6634][9.0:133](),[9.133][9.6730:6731](),[9.6730][9.6730:6731](),[9.6731][9.7282:7389](),[9.7389][9.134:262](),[9.262][9.6911:6991](),[9.6911][9.6911:6991](),[9.6991][9.173:198](),[9.198][9.0:1](),[9.1][9.136:166](),[9.166][9.1:107](),[9.1][9.1:107](),[9.107][9.1057:1058](),[9.198][9.1057:1058](),[9.1057][9.1057:1058](),[9.1058][9.108:159](),[9.87][9.7042:7043](),[9.159][9.7042:7043](),[9.281][9.7042:7043](),[9.1142][9.7042:7043](),[9.7042][9.7042:7043](),[9.7043][9.1169:1249](),[9.1249][5.0:91](),[5.91][9.0:76](),[9.323][9.0:76](),[9.1249][9.0:76](),[9.76][9.7091:7092](),[9.1224][9.7091:7092](),[9.1293][9.7091:7092](),[9.7091][9.7091:7092](),[9.7263][9.1225:1264](),[9.1264][9.7288:7289](),[9.7288][9.7288:7289](),[9.7289][9.7390:7475](),[9.7475][9.1304:1305](),[9.1304][9.1304:1305](),[9.1305][9.7476:7503](),[9.1376][9.7327:7328](),[9.7503][9.7327:7328](),[9.7327][9.7327:7328](),[9.7328][9.585:685](),[9.685][9.7633:7634](),[9.7633][9.7633:7634](),[9.7634][9.1634:1793](),[9.1793][9.7704:7736](),[9.7704][9.7704:7736](),[9.7736][9.7328:7481](),[9.7328][9.7328:7481](),[9.7481][9.0:65](),[9.65][9.65:66](),[9.66][9.927:1020](),[9.1020][9.470:596](),[9.596][9.25:166](),[9.1020][9.25:166](),[9.166][9.96:97](),[9.1020][9.96:97](),[9.97][4.0:50](),[4.50][9.1020:1029](),[9.137][9.1020:1029](),[9.1020][9.1020:1029](),[9.1029][9.138:162](),[9.162][9.1029:1030](),[9.1029][9.1029:1030](),[9.1030][9.163:182](),[9.65][9.1377:1378](),[9.89][9.1377:1378](),[9.182][9.1377:1378](),[9.7481][9.1377:1378](),[9.1378][6.962:998](),[6.998][9.7481:7486](),[9.1394][9.7481:7486](),[9.7481][9.7481:7486]()
    /* Point in time after which the step can be retried. */
    system_time after;
    };
    std::atomic_bool finished{false}; // debugging
    Sync<State> state;
    ~Step()
    {
    //printMsg(lvlError, format("destroying step %1%") % drvPath);
    }
    };
    struct Machine
    {
    typedef std::shared_ptr<Machine> ptr;
    std::string sshName, sshKey;
    std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;
    unsigned int maxJobs = 1;
    float speedFactor = 1.0;
    struct State {
    typedef std::shared_ptr<State> ptr;
    counter currentJobs{0};
    counter nrStepsDone{0};
    counter totalStepTime{0}; // total time for steps, including closure copying
    counter totalStepBuildTime{0}; // total build time for steps
    };
    State::ptr state;
    bool supportsStep(Step::ptr step)
    {
    if (systemTypes.find(step->drv.platform) == systemTypes.end()) return false;
    for (auto & f : mandatoryFeatures)
    if (step->requiredSystemFeatures.find(f) == step->requiredSystemFeatures.end()
    && !(step->preferLocalBuild && f == "local"))
    return false;
    for (auto & f : step->requiredSystemFeatures)
    if (supportedFeatures.find(f) == supportedFeatures.end()) return false;
    return true;
    }
    };
    class State
    {
    private:
    Path hydraData, logDir;
    StringSet localPlatforms;
    /* The queued builds. */
    typedef std::map<BuildID, Build::ptr> Builds;
    Sync<Builds> builds;
    /* All active or pending build steps (i.e. dependencies of the
    queued builds). Note that these are weak pointers. Steps are
    kept alive by being reachable from Builds or by being in
    progress. */
    typedef std::map<Path, Step::wptr> Steps;
    Sync<Steps> steps;
    /* Build steps that have no unbuilt dependencies. */
    typedef std::list<Step::wptr> Runnable;
    Sync<Runnable> runnable;
    /* CV for waking up the dispatcher. */
    std::condition_variable dispatcherWakeup;
    std::mutex dispatcherMutex;
    /* PostgreSQL connection pool. */
    Pool<Connection> dbPool;
    /* The build machines. */
    typedef std::map<string, Machine::ptr> Machines;
    Sync<Machines> machines; // FIXME: use atomic_shared_ptr
    Path machinesFile;
    struct stat machinesFileStat;
    /* Token server limiting the number of threads copying closures in
    parallel to prevent excessive I/O load. */
    TokenServer copyClosureTokenServer{maxParallelCopyClosure};
    /* Various stats. */
    time_t startedAt;
    counter nrBuildsRead{0};
    counter nrBuildsDone{0};
    counter nrStepsDone{0};
    counter nrActiveSteps{0};
    counter nrStepsBuilding{0};
    counter nrStepsCopyingTo{0};
    counter nrStepsCopyingFrom{0};
    counter nrRetries{0};
    counter maxNrRetries{0};
    counter totalStepTime{0}; // total time for steps, including closure copying
    counter totalStepBuildTime{0}; // total build time for steps
    counter nrQueueWakeups{0};
    counter nrDispatcherWakeups{0};
    counter bytesSent{0};
    counter bytesReceived{0};
    /* Log compressor work queue. */
    Sync<std::queue<Path>> logCompressorQueue;
    std::condition_variable_any logCompressorWakeup;
    /* Notification sender work queue. FIXME: if hydra-queue-runner is
    killed before it has finished sending notifications about a
    build, then the notifications may be lost. It would be better
    to mark builds with pending notification in the database. */
    typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem;
    Sync<std::queue<NotificationItem>> notificationSenderQueue;
    std::condition_variable_any notificationSenderWakeup;
    /* Specific build to do for --build-one (testing only). */
    BuildID buildOne;
    public:
    State();
    private:
    void clearBusy(Connection & conn, time_t stopTime);
    /* (Re)load /etc/nix/machines. */
    void loadMachinesFile();
    /* Thread to reload /etc/nix/machines periodically. */
    void monitorMachinesFile();
    int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
    const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",
    BuildID propagatedFrom = 0);
    void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr,
    const std::string & machine, BuildStepStatus status, const string & errorMsg = "",
    BuildID propagatedFrom = 0);
    void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);
    void queueMonitor();
    void queueMonitorLoop();
    void getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);
    void removeCancelledBuilds(Connection & conn);
    Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
    Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
    std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);
    void makeRunnable(Step::ptr step);
    /* The thread that selects and starts runnable builds. */
    void dispatcher();
    void wakeDispatcher();
    void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation);
    /* Perform the given build step. Return true if the step is to be
    retried. */
    bool doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
    Machine::ptr machine);
    void markSucceededBuild(pqxx::work & txn, Build::ptr build,
    const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime);
    bool checkCachedFailure(Step::ptr step, Connection & conn);
    /* Thread that asynchronously bzips logs of finished steps. */
    void logCompressor();
    /* Thread that asynchronously invokes hydra-notify to send build
    notifications. */
    void notificationSender();
    /* Acquire the global queue runner lock, or null if somebody else
    has it. */
    std::shared_ptr<PathLocks> acquireGlobalLock();
    void dumpStatus(Connection & conn, bool log);
    public:
    void showStatus();
    void unlock();
    void run(BuildID buildOne = 0);
    };
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 36
    [9.7501]
    [9.7501]
    : copyClosureTokenServer{maxParallelCopyClosure}
  • file addition: state.hh (----------)
    [9.187]
    #pragma once
    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <map>
    #include <memory>
    #include <queue>
    #include "db.hh"
    #include "counter.hh"
    #include "pathlocks.hh"
    #include "pool.hh"
    #include "sync.hh"
    #include "token-server.hh"
    #include "store-api.hh"
    #include "derivations.hh"
    using namespace nix;
    typedef unsigned int BuildID;
    typedef std::chrono::time_point<std::chrono::system_clock> system_time;
    typedef enum {
    bsSuccess = 0,
    bsFailed = 1,
    bsDepFailed = 2,
    bsAborted = 3,
    bsFailedWithOutput = 6,
    bsTimedOut = 7,
    bsUnsupported = 9,
    } BuildStatus;
    typedef enum {
    bssSuccess = 0,
    bssFailed = 1,
    bssAborted = 4,
    bssTimedOut = 7,
    bssUnsupported = 9,
    bssBusy = 100, // not stored
    } BuildStepStatus;
    struct Step;
    struct BuildResult;
    struct Build
    {
    typedef std::shared_ptr<Build> ptr;
    typedef std::weak_ptr<Build> wptr;
    BuildID id;
    Path drvPath;
    std::map<string, Path> outputs;
    std::string fullJobName;
    unsigned int maxSilentTime, buildTimeout;
    std::shared_ptr<Step> toplevel;
    std::atomic_bool finishedInDB{false};
    };
    struct Step
    {
    typedef std::shared_ptr<Step> ptr;
    typedef std::weak_ptr<Step> wptr;
    Path drvPath;
    Derivation drv;
    std::set<std::string> requiredSystemFeatures;
    bool preferLocalBuild;
    struct State
    {
    /* Whether the step has finished initialisation. */
    bool created = false;
    /* The build steps on which this step depends. */
    std::set<Step::ptr> deps;
    /* The build steps that depend on this step. */
    std::vector<Step::wptr> rdeps;
    /* Builds that have this step as the top-level derivation. */
    std::vector<Build::wptr> builds;
    /* Number of times we've tried this step. */
    unsigned int tries = 0;
    /* Point in time after which the step can be retried. */
    system_time after;
    };
    std::atomic_bool finished{false}; // debugging
    Sync<State> state;
    ~Step()
    {
    //printMsg(lvlError, format("destroying step %1%") % drvPath);
    }
    };
    struct Machine
    {
    typedef std::shared_ptr<Machine> ptr;
    std::string sshName, sshKey;
    std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;
    unsigned int maxJobs = 1;
    float speedFactor = 1.0;
    struct State {
    typedef std::shared_ptr<State> ptr;
    counter currentJobs{0};
    counter nrStepsDone{0};
    counter totalStepTime{0}; // total time for steps, including closure copying
    counter totalStepBuildTime{0}; // total build time for steps
    };
    State::ptr state;
    bool supportsStep(Step::ptr step)
    {
    if (systemTypes.find(step->drv.platform) == systemTypes.end()) return false;
    for (auto & f : mandatoryFeatures)
    if (step->requiredSystemFeatures.find(f) == step->requiredSystemFeatures.end()
    && !(step->preferLocalBuild && f == "local"))
    return false;
    for (auto & f : step->requiredSystemFeatures)
    if (supportedFeatures.find(f) == supportedFeatures.end()) return false;
    return true;
    }
    };
    class State
    {
    private:
    Path hydraData, logDir;
    StringSet localPlatforms;
    /* The queued builds. */
    typedef std::map<BuildID, Build::ptr> Builds;
    Sync<Builds> builds;
    /* All active or pending build steps (i.e. dependencies of the
    queued builds). Note that these are weak pointers. Steps are
    kept alive by being reachable from Builds or by being in
    progress. */
    typedef std::map<Path, Step::wptr> Steps;
    Sync<Steps> steps;
    /* Build steps that have no unbuilt dependencies. */
    typedef std::list<Step::wptr> Runnable;
    Sync<Runnable> runnable;
    /* CV for waking up the dispatcher. */
    std::condition_variable dispatcherWakeup;
    std::mutex dispatcherMutex;
    /* PostgreSQL connection pool. */
    Pool<Connection> dbPool;
    /* The build machines. */
    typedef std::map<string, Machine::ptr> Machines;
    Sync<Machines> machines; // FIXME: use atomic_shared_ptr
    Path machinesFile;
    struct stat machinesFileStat;
    /* Token server limiting the number of threads copying closures in
    parallel to prevent excessive I/O load. */
    TokenServer copyClosureTokenServer;
    /* Various stats. */
    time_t startedAt;
    counter nrBuildsRead{0};
    counter nrBuildsDone{0};
    counter nrStepsDone{0};
    counter nrActiveSteps{0};
    counter nrStepsBuilding{0};
    counter nrStepsCopyingTo{0};
    counter nrStepsCopyingFrom{0};
    counter nrRetries{0};
    counter maxNrRetries{0};
    counter totalStepTime{0}; // total time for steps, including closure copying
    counter totalStepBuildTime{0}; // total build time for steps
    counter nrQueueWakeups{0};
    counter nrDispatcherWakeups{0};
    counter bytesSent{0};
    counter bytesReceived{0};
    /* Log compressor work queue. */
    Sync<std::queue<Path>> logCompressorQueue;
    std::condition_variable_any logCompressorWakeup;
    /* Notification sender work queue. FIXME: if hydra-queue-runner is
    killed before it has finished sending notifications about a
    build, then the notifications may be lost. It would be better
    to mark builds with pending notification in the database. */
    typedef std::pair<BuildID, std::vector<BuildID>> NotificationItem;
    Sync<std::queue<NotificationItem>> notificationSenderQueue;
    std::condition_variable_any notificationSenderWakeup;
    /* Specific build to do for --build-one (testing only). */
    BuildID buildOne;
    public:
    State();
    private:
    void clearBusy(Connection & conn, time_t stopTime);
    /* (Re)load /etc/nix/machines. */
    void loadMachinesFile();
    /* Thread to reload /etc/nix/machines periodically. */
    void monitorMachinesFile();
    int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
    const std::string & machine, BuildStepStatus status, const std::string & errorMsg = "",
    BuildID propagatedFrom = 0);
    void finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, BuildID buildId, int stepNr,
    const std::string & machine, BuildStepStatus status, const string & errorMsg = "",
    BuildID propagatedFrom = 0);
    void updateBuild(pqxx::work & txn, Build::ptr build, BuildStatus status);
    void queueMonitor();
    void queueMonitorLoop();
    void getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);
    void removeCancelledBuilds(Connection & conn);
    Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
    Build::ptr referringBuild, Step::ptr referringStep, std::set<Path> & finishedDrvs,
    std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);
    void makeRunnable(Step::ptr step);
    /* The thread that selects and starts runnable builds. */
    void dispatcher();
    void wakeDispatcher();
    void builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<MaintainCount> reservation);
    /* Perform the given build step. Return true if the step is to be
    retried. */
    bool doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
    Machine::ptr machine);
    void markSucceededBuild(pqxx::work & txn, Build::ptr build,
    const BuildResult & res, bool isCachedBuild, time_t startTime, time_t stopTime);
    bool checkCachedFailure(Step::ptr step, Connection & conn);
    /* Thread that asynchronously bzips logs of finished steps. */
    void logCompressor();
    /* Thread that asynchronously invokes hydra-notify to send build
    notifications. */
    void notificationSender();
    /* Acquire the global queue runner lock, or null if somebody else
    has it. */
    std::shared_ptr<PathLocks> acquireGlobalLock();
    void dumpStatus(Connection & conn, bool log);
    public:
    void showStatus();
    void unlock();
    void run(BuildID buildOne = 0);
    };