Fix potential race in dispatcher wakeup

[?]
Aug 10, 2015, 9:58 AM
RKW3XA4UGUXX6L4JPA2ZLDFEF2RS6MZYR6C2GK23SYTRWMUADV3QC

Dependencies

  • [2] 4I2HF4L3 Unindent
  • [3] HJOEIMLR Refactor
  • [4] MHVIT4JY Split hydra-queue-runner.cc more
  • [5] OG3Z3QGC Namespace cleanup
  • [*] ENXUSMSV Make concurrency more robust
  • [*] MB3TISH2 Rate-limit the number of threads copying closures at the same time

Change contents

  • replacement in src/hydra-queue-runner/dispatcher.cc at line 39
    [2.176][2.176:493]()
    std::unique_lock<std::mutex> lock(dispatcherMutex);
    printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %
    std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());
    dispatcherWakeup.wait_until(lock, sleepUntil);
    [2.176]
    [2.493]
    auto dispatcherWakeup_(dispatcherWakeup.lock());
    if (!*dispatcherWakeup_) {
    printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %
    std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());
    dispatcherWakeup_.wait_until(dispatcherWakeupCV, sleepUntil);
    }
  • edit in src/hydra-queue-runner/dispatcher.cc at line 46
    [2.528]
    [2.528]
    *dispatcherWakeup_ = false;
  • replacement in src/hydra-queue-runner/dispatcher.cc at line 173
    [3.19892][3.19892:19997]()
    { std::lock_guard<std::mutex> lock(dispatcherMutex); } // barrier
    dispatcherWakeup.notify_one();
    [3.19892]
    [3.19997]
    {
    auto dispatcherWakeup_(dispatcherWakeup.lock());
    *dispatcherWakeup_ = true;
    }
    dispatcherWakeupCV.notify_one();
  • replacement in src/hydra-queue-runner/state.hh at line 205
    [3.4985][3.4985:5063]()
    std::condition_variable dispatcherWakeup;
    std::mutex dispatcherMutex;
    [3.4985]
    [3.5063]
    Sync<bool> dispatcherWakeup;
    std::condition_variable_any dispatcherWakeupCV;
  • edit in src/hydra-queue-runner/sync.hh at line 62
    [8.1638]
    [7.10721]
    }
    template<class Clock, class Duration>
    std::cv_status wait_until(std::condition_variable_any & cv,
    const std::chrono::time_point<Clock, Duration> & duration)
    {
    assert(s);
    return cv.wait_until(s->mutex, duration);