Very basic multi-threaded queue runner

[?]
May 28, 2015, 11:31 PM
NJJ7H64SZOX5EGACDCQAUQ7R6UEWD5IIC35A2MWFOOJV55DJYPHAC

Dependencies

  • [2] 24BMQDZA Start of single-process hydra-queue-runner
  • [3] 62MQPRXC Pass null values to libpqxx properly

Change contents

  • replacement in src/hydra-queue-runner/build-result.cc at line 9
    [2.555][2.555:606]()
    BuildResult getBuildResult(const Derivation & drv)
    [2.555]
    [2.606]
    BuildResult getBuildResult(std::shared_ptr<StoreAPI> store, const Derivation & drv)
  • edit in src/hydra-queue-runner/build-result.hh at line 2
    [2.4343]
    [2.4343]
    #include <memory>
  • replacement in src/hydra-queue-runner/build-result.hh at line 27
    [2.4741][2.4741:4798]()
    BuildResult getBuildResult(const nix::Derivation & drv);
    [2.4741]
    BuildResult getBuildResult(std::shared_ptr<nix::StoreAPI> store, const nix::Derivation & drv);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 1
    [2.4840]
    [2.4841]
    #include <atomic>
    #include <condition_variable>
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 4
    [2.4861][2.4861:4879]()
    #include <memory>
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 5
    [2.4894]
    [2.4894]
    #include <memory>
    #include <thread>
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 17
    [2.5058]
    [2.5058]
    std::mutex exitRequestMutex;
    std::condition_variable exitRequest;
    bool exitRequested(false);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 23
    [2.5059]
    [2.5059]
    static std::atomic_int _int(0);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 25
    [2.5060]
    [2.5060]
    void sigintHandler(int signo)
    {
    _int = 1;
    }
    void signalThread()
    {
    struct sigaction act;
    act.sa_handler = sigintHandler;
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    if (sigaction(SIGINT, &act, 0))
    throw SysError("installing handler for SIGINT");
    while (true) {
    sleep(1000000);
    if (_int) break;
    }
    {
    std::lock_guard<std::mutex> lock(exitRequestMutex);
    exitRequested = true;
    }
    exitRequest.notify_all();
    }
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 115
    [2.6125]
    [2.6125]
    std::thread queueMonitorThread;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 128
    [2.6420]
    [2.6420]
    std::mutex runnableMutex;
    std::condition_variable runnableCV;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 146
    [2.6991][2.6991:7042]()
    void getQueuedBuilds(pqxx::connection & conn);
    [2.6991]
    [2.7042]
    void queueMonitorThreadEntry();
    void getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection & conn);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 150
    [2.7043][2.7043:7091]()
    Step::ptr createStep(const Path & drvPath);
    [2.7043]
    [2.7091]
    Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 157
    [2.7263][2.7263:7288]()
    void doBuildSteps();
    [2.7263]
    [2.7288]
    void makeRunnable(Step::ptr step);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 159
    [2.7289][2.7289:7327]()
    void doBuildStep(Step::ptr step);
    [2.7289]
    [2.7327]
    void builderThreadEntry(int slot);
    void doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 165
    [2.7481]
    [2.7481]
    void run();
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 233
    [2.9825][2.9825:9878]()
    void State::getQueuedBuilds(pqxx::connection & conn)
    [2.9825]
    [2.9878]
    void State::queueMonitorThreadEntry()
    {
    auto store = openStore(); // FIXME: pool
    Connection conn;
    while (true) {
    getQueuedBuilds(store, conn);
    {
    std::unique_lock<std::mutex> lock(exitRequestMutex);
    exitRequest.wait_for(lock, std::chrono::seconds(5));
    if (exitRequested) break;
    }
    }
    printMsg(lvlError, "queue monitor exits");
    }
    void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection & conn)
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 255
    [2.9880]
    [2.9880]
    printMsg(lvlError, "checking the queue...");
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 288
    [2.11027][2.11027:11080]()
    Step::ptr step = createStep(build->drvPath);
    [2.11027]
    [2.11080]
    Step::ptr step = createStep(store, build->drvPath);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 291
    [2.11162][2.11162:11213]()
    BuildResult res = getBuildResult(drv);
    [2.11162]
    [2.11213]
    BuildResult res = getBuildResult(store, drv);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 309
    [2.11514][2.11514:11564]()
    Step::ptr State::createStep(const Path & drvPath)
    [2.11514]
    [2.11564]
    Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath)
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 337
    [2.12329][2.12329:12374]()
    Step::ptr dep = createStep(i.first);
    [2.12329]
    [2.12374]
    Step::ptr dep = createStep(store, i.first);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 346
    [2.12514][2.12514:12565]()
    if (step->deps.empty()) runnable.insert(step);
    [2.12514]
    [2.12565]
    if (step->deps.empty()) makeRunnable(step);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 365
    [2.13027][2.13027:13066]()
    runnable.insert(rdep);
    [2.13027]
    [2.13066]
    makeRunnable(rdep);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 409
    [2.14019][2.14019:14046]()
    void State::doBuildSteps()
    [2.14019]
    [2.14046]
    void State::makeRunnable(Step::ptr step)
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 411
    [2.14048][2.14048:14256]()
    while (!runnable.empty()) {
    printMsg(lvlInfo, format("%1% runnable steps") % runnable.size());
    Step::ptr step = *runnable.begin();
    runnable.erase(step);
    doBuildStep(step);
    [2.14048]
    [2.14256]
    assert(step->deps.empty());
    {
    std::lock_guard<std::mutex> lock(runnableMutex);
    runnable.insert(step);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 417
    [2.14262]
    [2.14262]
    runnableCV.notify_one();
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 422
    [2.14266][2.14266:14306]()
    void State::doBuildStep(Step::ptr step)
    [2.14266]
    [2.14306]
    void State::builderThreadEntry(int slot)
    {
    auto store = openStore(); // FIXME: pool
    while (true) {
    Step::ptr step;
    {
    std::unique_lock<std::mutex> lock(runnableMutex);
    while (runnable.empty())
    runnableCV.wait(lock);
    step = *runnable.begin();
    runnable.erase(step);
    }
    printMsg(lvlError, format("slot %1%: got build step ‘%2%’") % slot % step->drvPath);
    doBuildStep(store, step);
    }
    printMsg(lvlError, "builder thread exits");
    }
    void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 498
    [2.15956][2.15956:16006]()
    if (success) res = getBuildResult(step->drv);
    [2.15956]
    [2.16006]
    if (success) res = getBuildResult(store, step->drv);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 581
    [2.19072]
    [2.19072]
    }
    void State::run()
    {
    {
    Connection conn;
    markActiveBuildStepsAsAborted(conn, 0);
    }
    queueMonitorThread = std::thread(&State::queueMonitorThreadEntry, this);
    sleep(1);
    for (int n = 0; n < 4; n++)
    std::thread(&State::builderThreadEntry, this, n).detach();
    queueMonitorThread.join();
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 606
    [2.19176]
    [2.19176]
    std::thread(signalThread).detach();
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 609
    [2.19177]
    [2.19177]
    /* Ignore signals. This is inherited by the other threads. */
    sigset_t set;
    sigemptyset(&set);
    sigaddset(&set, SIGHUP);
    sigaddset(&set, SIGINT);
    sigaddset(&set, SIGTERM);
    sigprocmask(SIG_BLOCK, &set, NULL);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 619
    [2.19262][2.19262:19292]()
    store = openStore();
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 622
    [2.19396][2.19396:19423]()
    Connection conn;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 623
    [2.19444][2.19444:19568]()
    state.markActiveBuildStepsAsAborted(conn, 0);
    state.getQueuedBuilds(conn);
    state.doBuildSteps();
    [2.19444]
    [2.19568]
    state.run();