Make concurrency more robust

[?]
May 29, 2015, 3:14 PM
ENXUSMSVOU3AZFMH2ZXR4ZVPV2LRRQYQJ6IFX33YN6IH2ORSNSAAC

Dependencies

  • [2] NJJ7H64S Very basic multi-threaded queue runner
  • [3] 24BMQDZA Start of single-process hydra-queue-runner
  • [4] 62MQPRXC Pass null values to libpqxx properly

Change contents

  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 11
    [3.4943]
    [3.4943]
    #include "sync.hh"
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 19
    [3.5058]
    [2.288]
    /* Membership test for associative containers: returns true iff looking
       up 'v' in 'c' with the container's own find() yields an element. */
    template <class Container, class Key>
    bool has(const Container & c, const Key & v)
    {
        const auto pos = c.find(v);
        return pos != c.end();
    }
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 86
    [3.5456]
    [3.5456]
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 88
    [3.5457]
    [3.5457]
    struct Step;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 90
    [3.5458]
    [3.5458]
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 99
    [3.5623]
    [3.5623]
    std::string fullJobName;
    std::shared_ptr<Step> toplevel;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 106
    [3.5686]
    [3.5686]
    /* Destructor: logs a message naming the id of the build being
       destroyed.
       NOTE(review): the message is emitted at lvlError although it does
       not describe an error; presumably temporary lifetime-debugging
       output -- confirm before release. */
    ~Build()
    {
    printMsg(lvlError, format("destroying build %1%") % id);
    }
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 118
    [3.5782]
    [3.5782]
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 122
    [3.5821][3.5821:5905]()
    /* The build steps on which this step depends. */
    std::set<Step::ptr> deps;
    [3.5821]
    [3.5905]
    /* The dependency-graph fields of a step, grouped into one struct.
       NOTE(review): presumably grouped so that they can be guarded
       together by a single lock -- confirm against the member that holds
       this struct. */
    struct State
    {
    /* The build steps on which this step depends. */
    std::set<Step::ptr> deps;
    /* The build steps that depend on this step. Stored as Step::wptr
       (presumably weak references, so a dependent may already be gone
       when this is read -- callers must check). */
    std::vector<Step::wptr> rdeps;
    /* Builds that have this step as the top-level derivation. Stored as
       Build::wptr (presumably weak references as well). */
    std::vector<Build::wptr> builds;
    };
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 134
    [3.5906][3.5906:5993]()
    /* The build steps that depend on this step. */
    std::vector<Step::wptr> rdeps;
    [3.5906]
    [3.5993]
    Sync<State> state;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 136
    [3.5994][3.5994:6097]()
    /* Builds that have this step as the top-level derivation. */
    std::vector<Build::wptr> builds;
    [3.5994]
    [3.6097]
    /* Destructor: logs a message naming the drvPath of the step being
       destroyed.
       NOTE(review): the message is emitted at lvlError although it does
       not describe an error; presumably temporary lifetime-debugging
       output -- confirm before release. */
    ~Step()
    {
    printMsg(lvlError, format("destroying step %1%") % drvPath);
    }
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 150
    [3.6154][3.6154:6196]()
    std::map<BuildID, Build::ptr> builds;
    [3.6154]
    [3.6196]
    typedef std::map<BuildID, Build::ptr> Builds;
    Sync<Builds> builds;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 154
    [3.6264][3.6264:6327]()
    queued builds). */
    std::map<Path, Step::ptr> steps;
    [3.6264]
    [3.6327]
    queued builds). Note that these are weak pointers. Steps are
    kept alive by being reachable from Builds or by being in
    progress. */
    typedef std::map<Path, Step::wptr> Steps;
    Sync<Steps> steps;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 161
    [3.6385][3.6385:6419]()
    std::set<Step::ptr> runnable;
    [3.6385]
    [3.6419]
    typedef std::list<Step::wptr> Runnable;
    Sync<Runnable> runnable;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 164
    [3.6420][2.949:1019]()
    std::mutex runnableMutex;
    std::condition_variable runnableCV;
    [3.6420]
    [2.1019]
    std::condition_variable_any runnableCV;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 185
    [3.7043][2.1143:1224]()
    Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath);
    [3.7043]
    [3.7091]
    Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
    std::set<Step::ptr> & newRunnable);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 293
    [2.1944][3.9880:9906](),[3.9880][3.9880:9906]()
    pqxx::work txn(conn);
    [2.1944]
    [3.9906]
    #if 0
    {
    auto runnable_(runnable.lock());
    auto builds_(builds.lock());
    auto steps_(steps.lock());
    printMsg(lvlError, format("%1% builds, %2% steps, %3% runnable steps")
    % builds_->size()
    % steps_->size()
    % runnable_->size());
    }
    #endif
    /* Grab the queued builds from the database, but don't process
    them yet (since we don't want a long-running transaction). */
    std::list<Build::ptr> newBuilds; // FIXME: use queue
    {
    pqxx::work txn(conn);
    // FIXME: query only builds with ID higher than the previous
    // highest.
    auto res = txn.exec("select * from Builds where finished = 0 order by id");
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 316
    [3.9907][3.9907:10056]()
    // FIXME: query only builds with ID higher than the previous
    // highest.
    auto res = txn.exec("select * from Builds where finished = 0");
    [3.9907]
    [3.10056]
    auto builds_(builds.lock());
    for (auto const & row : res) {
    BuildID id = row["id"].as<BuildID>();
    if (has(*builds_, id)) continue;
    auto build = std::make_shared<Build>();
    build->id = id;
    build->drvPath = row["drvPath"].as<string>();
    build->fullJobName = row["project"].as<string>() + ":" + row["jobset"].as<string>() + ":" + row["job"].as<string>();
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 327
    [3.10057][3.10057:10235]()
    // FIXME: don't process inside a txn.
    for (auto const & row : res) {
    BuildID id = row["id"].as<BuildID>();
    if (builds.find(id) != builds.end()) continue;
    [3.10057]
    [3.10235]
    newBuilds.push_back(build);
    }
    }
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 331
    [3.10236][3.10236:10351]()
    Build::ptr build(new Build);
    build->id = id;
    build->drvPath = row["drvPath"].as<string>();
    [3.10236]
    [3.10351]
    /* Now instantiate build steps for each new build. The builder
    threads can start building the runnable build steps right away,
    even while we're still processing other new builds. */
    for (auto & build : newBuilds) {
    // FIXME: remove build from newBuilds to ensure quick destruction
    // FIXME: exception handling
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 338
    [3.10352][3.10352:10473]()
    printMsg(lvlInfo, format("loading build %1% (%2%:%3%:%4%)") % id % row["project"] % row["jobset"] % row["job"]);
    [3.10352]
    [3.10473]
    printMsg(lvlInfo, format("loading build %1% (%2%)") % build->id % build->fullJobName);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 342
    [3.10582][3.10582:10611]()
    Connection conn;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 353
    [3.11027][2.1945:2005]()
    Step::ptr step = createStep(store, build->drvPath);
    [3.11027]
    [3.11080]
    std::set<Step::ptr> newRunnable;
    Step::ptr step = createStep(store, build->drvPath, newRunnable);
    /* If we didn't get a step, it means the step's outputs are
    all valid. So we mark this as a finished, cached build. */
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 362
    [3.11214][3.11214:11243]()
    Connection conn;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 370
    [3.11436][3.11436:11475]()
    step->builds.push_back(build);
    [3.11436]
    [3.11475]
    /* Note: if we exit this scope prior to this, the build and
    all newly created steps are destroyed. */
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 373
    [3.11476][3.11476:11504]()
    builds[id] = build;
    [3.11476]
    [3.11504]
    {
    auto builds_(builds.lock());
    auto step_(step->state.lock());
    (*builds_)[build->id] = build;
    step_->builds.push_back(build);
    build->toplevel = step;
    }
    /* Prior to this, the build is not visible to
    getDependentBuilds(). Now it is, so the build can be
    failed if a dependency fails. (It can't succeed right away
    because its top-level is not runnable yet). */
    /* Add the new runnable build steps to ‘runnable’ and wake up
    the builder threads. */
    for (auto & r : newRunnable)
    makeRunnable(r);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 394
    [3.11514][2.2065:2148]()
    Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath)
    [3.11514]
    [3.11564]
    Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
    std::set<Step::ptr> & newRunnable)
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 397
    [3.11566][3.11566:11653]()
    auto prev = steps.find(drvPath);
    if (prev != steps.end()) return prev->second;
    [3.11566]
    [3.11653]
    /* Check if the requested step already exists. */
    {
    auto steps_(steps.lock());
    auto prev = steps_->find(drvPath);
    if (prev != steps_->end()) {
    auto step = prev->second.lock();
    /* Since ‘step’ is a strong pointer, the referred Step
    object won't be deleted after this. */
    if (step) return step;
    steps_->erase(drvPath); // remove stale entry
    }
    }
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 412
    [3.11732][3.11732:11762]()
    Step::ptr step(new Step);
    [3.11732]
    [3.11762]
    auto step = std::make_shared<Step>();
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 432
    [3.12286]
    [3.12286]
    bool hasDeps = false;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 434
    [3.12329][2.2149:2201]()
    Step::ptr dep = createStep(store, i.first);
    [3.12329]
    [3.12374]
    Step::ptr dep = createStep(store, i.first, newRunnable);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 436
    [3.12393][3.12393:12469]()
    step->deps.insert(dep);
    dep->rdeps.push_back(step);
    [3.12393]
    [3.12469]
    hasDeps = true;
    auto step_(step->state.lock());
    auto dep_(dep->state.lock());
    step_->deps.insert(dep);
    dep_->rdeps.push_back(step);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 444
    [3.12486][3.12486:12513]()
    steps[drvPath] = step;
    [3.12486]
    [3.12513]
    {
    auto steps_(steps.lock());
    assert(steps_->find(drvPath) == steps_->end());
    (*steps_)[drvPath] = step;
    }
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 450
    [3.12514][2.2202:2250]()
    if (step->deps.empty()) makeRunnable(step);
    [3.12514]
    [3.12565]
    if (!hasDeps) newRunnable.insert(step);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 458
    [3.12643][3.12643:12675]()
    steps.erase(step->drvPath);
    [3.12643]
    [3.12675]
    printMsg(lvlInfo, format("destroying build step ‘%1%’") % step->drvPath);
    {
    auto steps_(steps.lock());
    steps_->erase(step->drvPath);
    }
    std::vector<Step::wptr> rdeps;
    {
    auto step_(step->state.lock());
    rdeps = step_->rdeps;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 471
    [3.12676][3.12676:12715]()
    for (auto & rdep_ : step->rdeps) {
    [3.12676]
    [3.12715]
    /* Sanity checks. */
    for (auto & build_ : step_->builds) {
    auto build = build_.lock();
    if (!build) continue;
    assert(build->drvPath == step->drvPath);
    assert(build->finishedInDB);
    }
    }
    for (auto & rdep_ : rdeps) {
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 483
    [3.12778][3.12778:12869]()
    assert(rdep->deps.find(step) != rdep->deps.end());
    rdep->deps.erase(step);
    [3.12778]
    [3.12869]
    bool runnable = false;
    {
    auto rdep_(rdep->state.lock());
    assert(has(rdep_->deps, step));
    rdep_->deps.erase(step);
    if (rdep_->deps.empty()) runnable = true;
    }
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 493
    [3.12991][3.12991:13027]()
    if (rdep->deps.empty())
    [3.12991]
    [2.2251]
    if (runnable)
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 496
    [3.13081][3.13081:13177]()
    /* If ‘step’ failed, then delete all dependent steps as
    well. */
    [3.13081]
    [3.13177]
    /* If ‘step’ failed or was cancelled, then delete all
    dependent steps as well. */
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 500
    [3.13221][3.13221:13421]()
    for (auto & build_ : step->builds) {
    auto build = build_.lock();
    if (!build) continue;
    assert(build->drvPath == step->drvPath);
    assert(build->finishedInDB);
    }
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 511
    [3.13628][3.13628:13679]()
    if (done.find(step) != done.end()) return;
    [3.13628]
    [3.13679]
    if (has(done, step)) return;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 513
    [3.13706]
    [3.13706]
    std::vector<Step::wptr> rdeps;
    {
    auto step_(step->state.lock());
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 519
    [3.13707][3.13707:13835]()
    for (auto & build : step->builds) {
    auto build2 = build.lock();
    if (build2) res.insert(build2);
    [3.13707]
    [3.13835]
    for (auto & build : step_->builds) {
    auto build_ = build.lock();
    if (build_) res.insert(build_);
    }
    /* Make a copy of rdeps so that we don't hold the lock for
    very long. */
    rdeps = step_->rdeps;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 529
    [3.13846][3.13846:13963]()
    for (auto & rdep : step->rdeps) {
    auto rdep2 = rdep.lock();
    if (rdep2) visit(rdep2);
    [3.13846]
    [3.13963]
    for (auto & rdep : rdeps) {
    auto rdep_ = rdep.lock();
    if (rdep_) visit(rdep_);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 543
    [3.14048][2.2330:2362]()
    assert(step->deps.empty());
    [3.14048]
    [2.2362]
    {
    auto step_(step->state.lock());
    assert(step_->deps.empty());
    }
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 549
    [2.2369][2.2369:2457]()
    std::lock_guard<std::mutex> lock(runnableMutex);
    runnable.insert(step);
    [2.2369]
    [3.14256]
    auto runnable_(runnable.lock());
    runnable_->push_back(step);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 562
    [2.2598]
    [2.2598]
    /* Sleep until a runnable build step becomes available. */
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 565
    [2.2632][2.2632:2842]()
    std::unique_lock<std::mutex> lock(runnableMutex);
    while (runnable.empty())
    runnableCV.wait(lock);
    step = *runnable.begin();
    runnable.erase(step);
    [2.2632]
    [2.2842]
    auto runnable_(runnable.lock());
    while (runnable_->empty())
    runnable_.wait(runnableCV);
    auto weak = *runnable_->begin();
    runnable_->pop_front();
    step = weak.lock();
    if (!step) continue;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 574
    [2.2853]
    [2.2853]
    /* Build it. */
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 576
    [2.2950][2.2950:2951]()
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 585
    [3.14308][3.14308:14341]()
    assert(step->deps.empty());
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 586
    [3.14410][3.14410:14547]()
    on this derivation. Arbitrarily pick one (though preferring
    those build of which this is the top-level derivation) for the
    [3.14410]
    [3.14547]
    on this derivation. Arbitrarily pick one (though preferring a
    build of which this is the top-level derivation) for the
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 594
    [3.14812][3.14812:14856]()
    auto builds = getDependentBuilds(step);
    [3.14812]
    [3.14856]
    {
    auto dependents = getDependentBuilds(step);
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 597
    [3.14857][3.14857:15146]()
    if (builds.empty()) {
    /* Apparently all builds that depend on this derivation are
    gone (e.g. cancelled). So don't bother. */
    printMsg(lvlInfo, format("cancelling build step ‘%1%’") % step->drvPath);
    destroyStep(step, true);
    return;
    }
    [3.14857]
    [3.15146]
    if (dependents.empty()) {
    /* Apparently all builds that depend on this derivation
    are gone (e.g. cancelled). So don't bother. (This is
    very unlikely to happen, because normally Steps are
    only kept alive by being reachable from a
    Build). FIXME: what if a new Build gets a reference to
    this step? */
    printMsg(lvlInfo, format("cancelling build step ‘%1%’") % step->drvPath);
    destroyStep(step, false);
    return;
    }
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 609
    [3.15147][3.15147:15251]()
    for (auto build2 : builds)
    if (build2->drvPath == step->drvPath) { build = build2; break; }
    [3.15147]
    [3.15251]
    for (auto build2 : dependents)
    if (build2->drvPath == step->drvPath) { build = build2; break; }
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 612
    [3.15252][3.15252:15293]()
    if (!build) build = *builds.begin();
    [3.15252]
    [3.15293]
    if (!build) build = *dependents.begin();
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 614
    [3.15294][3.15294:15415]()
    printMsg(lvlInfo, format("performing build step ‘%1%’ (needed by %2% builds)") % step->drvPath % builds.size());
    [3.15294]
    [3.15415]
    printMsg(lvlInfo, format("performing build step ‘%1%’ (needed by %2% builds)") % step->drvPath % dependents.size());
    }
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 644
    [3.16048][3.16048:16115]()
    // FIXME: handle new builds having been added in the meantime.
    [3.16048]
    [3.16115]
    /* Remove this step. After this, incoming builds that depend on
    drvPath will either see that the output paths exist, or will
    create a new build step for drvPath. The latter is fine - it
    won't conflict with this one, because we're removing it. In any
    case, the set of dependent builds for ‘step’ can't increase
    anymore because ‘step’ is no longer visible to createStep(). */
    {
    auto steps_(steps.lock());
    steps_->erase(step->drvPath);
    }
    /* Get the final set of dependent builds. */
    auto dependents = getDependentBuilds(step);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 658
    [3.16116]
    [3.16116]
    std::set<Build::ptr> direct;
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 660
    [3.16122]
    [3.16122]
    auto step_(step->state.lock());
    for (auto & build : step_->builds) {
    auto build_ = build.lock();
    if (build_) direct.insert(build_);
    }
    }
    /* Update the database. */
    {
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 677
    [3.16358][3.16358:16491]()
    for (auto build2_ : step->builds) {
    auto build2 = build2_.lock();
    if (!build2) continue;
    [3.16358]
    [3.16491]
    for (auto build2 : direct)
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 679
    [3.16573][3.16573:16587]()
    }
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 685
    [3.16787][3.16787:16828]()
    for (auto build2 : builds) {
    [3.16787]
    [3.16828]
    for (auto build2 : dependents) {
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 691
    [3.17060][3.17060:17101]()
    for (auto build2 : builds) {
    [3.17060]
    [3.17101]
    for (auto build2 : dependents) {
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 703
    [3.17581][3.17581:17582]()
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 705
    [3.17589][3.17589:17637]()
    /* Remove the build step from the graph. */
    [3.17589]
    [3.17637]
    /* In case of success, destroy all Build objects of which ‘step’
    is the top-level derivation. In case of failure, destroy all
    dependent Build objects. Any Steps not referenced by other
    Builds will be destroyed as well. */
    for (auto build2 : dependents)
    if (build2->toplevel == step || !success) {
    auto builds_(builds.lock());
    builds_->erase(build2->id);
    }
    /* Remove the step from the graph. In case of success, make
    dependent build steps runnable if they have no other
    dependencies. */
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 765
    [2.3364][2.3364:3379]()
    sleep(1);
  • file addition: sync.hh (----------)
    [3.187]
    #pragma once
    #include <cassert>
    #include <condition_variable>
    #include <mutex>
    /* This template class ensures synchronized access to a value of type
    T. It is used as follows:
    struct Data { int x; ... };
    Sync<Data> data;
    {
    auto data_(data.lock());
    data_->x = 123;
    }
    Here, "data" is automatically unlocked when "data_" goes out of
    scope.
    */
    template <class T>
    class Sync
    {
    private:
        /* Guards 'data'. It is only locked and unlocked through Lock. */
        std::mutex mutex;
        T data;

    public:

        /* A handle that holds the mutex of a Sync object for as long as it
           exists and gives access to the protected value. Locks can be
           moved but not copied or assigned. A moved-from Lock holds
           nothing: it must not be dereferenced or waited on, and its
           destructor does not unlock. */
        class Lock
        {
        private:
            Sync * s;
            friend Sync;
            /* Blocks until the mutex of 's' has been acquired. */
            Lock(Sync * s) : s(s) { s->mutex.lock(); }
        public:
            /* Take over the held mutex from 'l' without unlocking it. */
            Lock(Lock && l) noexcept : s(l.s) { l.s = nullptr; }
            Lock(const Lock & l) = delete;
            Lock & operator = (const Lock & l) = delete;
            Lock & operator = (Lock && l) = delete;
            ~Lock() { if (s) s->mutex.unlock(); }

            /* Access the protected value. Asserts that this Lock has not
               been moved from, instead of dereferencing a null pointer. */
            T * operator -> () { assert(s); return &s->data; }
            T & operator * () { assert(s); return s->data; }

            /* Release the mutex and block on 'cv'; the mutex is held again
               when this returns. Condition variables may wake up
               spuriously, so callers should re-check their condition in a
               loop. */
            /* FIXME: performance impact of condition_variable_any? */
            void wait(std::condition_variable_any & cv)
            {
                assert(s);
                cv.wait(s->mutex);
            }
        };

        /* Acquire the mutex (blocking if necessary) and return a Lock that
           releases it again when it goes out of scope. */
        Lock lock() { return Lock(this); }
    };