On SIGINT, shut down the builder threads

[?]
May 29, 2015, 6:02 PM
T2EIYJNGPIANHKJ4HBJIPTINWKG7RDLHR3PVHFYAPPLHZAJQBVWAC

Dependencies

  • [2] ENXUSMSV Make concurrency more robust
  • [3] 62MQPRXC Pass null values to libpqxx properly
  • [4] NJJ7H64S Very basic multi-threaded queue runner
  • [5] 24BMQDZA Start of single-process hydra-queue-runner

Change contents

  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 30
    [3.356][3.356:383]()
    bool exitRequested(false);
    [3.356]
    [3.5058]
    std::atomic<bool> exitRequested(false);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 148
    [3.947]
    [3.947]
    std::mutex queueMonitorMutex;
    std::condition_variable queueMonitorWakeup;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 166
    [3.6420][2.1124:1168]()
    std::condition_variable_any runnableCV;
    [3.6420]
    [3.1019]
    std::condition_variable_any runnableWakeup;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 183
    [3.6991][3.1021:1057]()
    void queueMonitorThreadEntry();
    [3.6991]
    [3.1057]
    void queueMonitor();
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 271
    [3.9825][3.1395:1433]()
    void State::queueMonitorThreadEntry()
    [3.9825]
    [3.1433]
    void State::queueMonitor()
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 277
    [3.1503][3.1503:1522]()
    while (true) {
    [3.1503]
    [3.1522]
    while (!exitRequested) {
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 281
    [3.1571][3.1571:1739]()
    std::unique_lock<std::mutex> lock(exitRequestMutex);
    exitRequest.wait_for(lock, std::chrono::seconds(5));
    if (exitRequested) break;
    [3.1571]
    [3.1739]
    std::unique_lock<std::mutex> lock(queueMonitorMutex);
    queueMonitorWakeup.wait_for(lock, std::chrono::seconds(5));
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 554
    [3.2459][3.2459:2488]()
    runnableCV.notify_one();
    [3.2459]
    [3.14262]
    runnableWakeup.notify_one();
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 567
    [2.6839][2.6839:6922]()
    while (runnable_->empty())
    runnable_.wait(runnableCV);
    [2.6839]
    [2.6922]
    while (runnable_->empty() && !exitRequested)
    runnable_.wait(runnableWakeup);
    if (exitRequested) break;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 765
    [3.3286][3.3286:3363]()
    queueMonitorThread = std::thread(&State::queueMonitorThreadEntry, this);
    [3.3286]
    [3.3363]
    queueMonitorThread = std::thread(&State::queueMonitor, this);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 767
    [3.3364]
    [3.3379]
    std::vector<std::thread> builderThreads;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 769
    [3.3411][3.3411:3478]()
    std::thread(&State::builderThreadEntry, this, n).detach();
    [3.3411]
    [3.3478]
    builderThreads.push_back(std::thread(&State::builderThreadEntry, this, n));
    /* Wait for SIGINT. */
    {
    std::unique_lock<std::mutex> lock(exitRequestMutex);
    while (!exitRequested)
    exitRequest.wait(lock);
    }
    printMsg(lvlError, "exiting...");
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 780
    [3.3479]
    [3.3479]
    /* Shut down the various threads. */
    { std::lock_guard<std::mutex> lock(queueMonitorMutex); } // barrier
    queueMonitorWakeup.notify_all();
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 784
    [3.3510]
    [3.19072]
    { auto runnable_(runnable.lock()); } // barrier
    runnableWakeup.notify_all();
    for (auto & thread : builderThreads) thread.join();