Make concurrency more robust
[?]
May 29, 2015, 3:14 PM
ENXUSMSVOU3AZFMH2ZXR4ZVPV2LRRQYQJ6IFX33YN6IH2ORSNSAAC
Dependencies
- [2]
NJJ7H64SVery basic multi-threaded queue runner - [3]
24BMQDZAStart of single-process hydra-queue-runner - [4]
62MQPRXCPass null values to libpqxx properly
Change contents
- edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 11
#include "sync.hh" - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 19
template <class C, class V>bool has(const C & c, const V & v){return c.find(v) != c.end();} - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 86
- edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 88
struct Step; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 90
- edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 99
std::string fullJobName;std::shared_ptr<Step> toplevel; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 106
~Build(){printMsg(lvlError, format("destroying build %1%") % id);} - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 118
- replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 122
/* The build steps on which this step depends. */std::set<Step::ptr> deps;struct State{/* The build steps on which this step depends. */std::set<Step::ptr> deps;/* The build steps that depend on this step. */std::vector<Step::wptr> rdeps;/* Builds that have this step as the top-level derivation. */std::vector<Build::wptr> builds;}; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 134
/* The build steps that depend on this step. */std::vector<Step::wptr> rdeps;Sync<State> state; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 136
/* Builds that have this step as the top-level derivation. */std::vector<Build::wptr> builds;~Step(){printMsg(lvlError, format("destroying step %1%") % drvPath);} - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 150
std::map<BuildID, Build::ptr> builds;typedef std::map<BuildID, Build::ptr> Builds;Sync<Builds> builds; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 154
queued builds). */std::map<Path, Step::ptr> steps;queued builds). Note that these are weak pointers. Steps arekept alive by being reachable from Builds or by being inprogress. */typedef std::map<Path, Step::wptr> Steps;Sync<Steps> steps; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 161
std::set<Step::ptr> runnable;typedef std::list<Step::wptr> Runnable;Sync<Runnable> runnable; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 164
std::mutex runnableMutex;std::condition_variable runnableCV;std::condition_variable_any runnableCV; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 185
Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath);Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,std::set<Step::ptr> & newRunnable); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 293
pqxx::work txn(conn);#if 0{auto runnable_(runnable.lock());auto builds_(builds.lock());auto steps_(steps.lock());printMsg(lvlError, format("%1% builds, %2% steps, %3% runnable steps")% builds_->size()% steps_->size()% runnable_->size());}#endif/* Grab the queued builds from the database, but don't processthem yet (since we don't want a long-running transaction). */std::list<Build::ptr> newBuilds; // FIXME: use queue{pqxx::work txn(conn);// FIXME: query only builds with ID higher than the previous// highest.auto res = txn.exec("select * from Builds where finished = 0 order by id"); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 316
// FIXME: query only builds with ID higher than the previous// highest.auto res = txn.exec("select * from Builds where finished = 0");auto builds_(builds.lock());for (auto const & row : res) {BuildID id = row["id"].as<BuildID>();if (has(*builds_, id)) continue;auto build = std::make_shared<Build>();build->id = id;build->drvPath = row["drvPath"].as<string>();build->fullJobName = row["project"].as<string>() + ":" + row["jobset"].as<string>() + ":" + row["job"].as<string>(); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 327
// FIXME: don't process inside a txn.for (auto const & row : res) {BuildID id = row["id"].as<BuildID>();if (builds.find(id) != builds.end()) continue;newBuilds.push_back(build);}} - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 331
Build::ptr build(new Build);build->id = id;build->drvPath = row["drvPath"].as<string>();/* Now instantiate build steps for each new build. The builderthreads can start building the runnable build steps right away,even while we're still processing other new builds. */for (auto & build : newBuilds) {// FIXME: remove build from newBuilds to ensure quick destruction// FIXME: exception handling - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 338
printMsg(lvlInfo, format("loading build %1% (%2%:%3%:%4%)") % id % row["project"] % row["jobset"] % row["job"]);printMsg(lvlInfo, format("loading build %1% (%2%)") % build->id % build->fullJobName); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 342
Connection conn; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 353
Step::ptr step = createStep(store, build->drvPath);std::set<Step::ptr> newRunnable;Step::ptr step = createStep(store, build->drvPath, newRunnable);/* If we didn't get a step, it means the step's outputs areall valid. So we mark this as a finished, cached build. */ - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 362
Connection conn; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 370
step->builds.push_back(build);/* Note: if we exit this scope prior to this, the build andall newly created steps are destroyed. */ - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 373
builds[id] = build;{auto builds_(builds.lock());auto step_(step->state.lock());(*builds_)[build->id] = build;step_->builds.push_back(build);build->toplevel = step;}/* Prior to this, the build is not visible togetDependentBuilds(). Now it is, so the build can befailed if a dependency fails. (It can't succeed right awaybecause its top-level is not runnable yet). *//* Add the new runnable build steps to ‘runnable’ and wake upthe builder threads. */for (auto & r : newRunnable)makeRunnable(r); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 394
Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath)Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,std::set<Step::ptr> & newRunnable) - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 397
auto prev = steps.find(drvPath);if (prev != steps.end()) return prev->second;/* Check if the requested step already exists. */{auto steps_(steps.lock());auto prev = steps_->find(drvPath);if (prev != steps_->end()) {auto step = prev->second.lock();/* Since ‘step’ is a strong pointer, the referred Stepobject won't be deleted after this. */if (step) return step;steps_->erase(drvPath); // remove stale entry}} - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 412
Step::ptr step(new Step);auto step = std::make_shared<Step>(); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 432
bool hasDeps = false; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 434
Step::ptr dep = createStep(store, i.first);Step::ptr dep = createStep(store, i.first, newRunnable); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 436
step->deps.insert(dep);dep->rdeps.push_back(step);hasDeps = true;auto step_(step->state.lock());auto dep_(dep->state.lock());step_->deps.insert(dep);dep_->rdeps.push_back(step); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 444
steps[drvPath] = step;{auto steps_(steps.lock());assert(steps_->find(drvPath) == steps_->end());(*steps_)[drvPath] = step;} - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 450
if (step->deps.empty()) makeRunnable(step);if (!hasDeps) newRunnable.insert(step); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 458
steps.erase(step->drvPath);printMsg(lvlInfo, format("destroying build step ‘%1%’") % step->drvPath);{auto steps_(steps.lock());steps_->erase(step->drvPath);}std::vector<Step::wptr> rdeps;{auto step_(step->state.lock());rdeps = step_->rdeps; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 471
for (auto & rdep_ : step->rdeps) {/* Sanity checks. */for (auto & build_ : step_->builds) {auto build = build_.lock();if (!build) continue;assert(build->drvPath == step->drvPath);assert(build->finishedInDB);}}for (auto & rdep_ : rdeps) { - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 483
assert(rdep->deps.find(step) != rdep->deps.end());rdep->deps.erase(step);bool runnable = false;{auto rdep_(rdep->state.lock());assert(has(rdep_->deps, step));rdep_->deps.erase(step);if (rdep_->deps.empty()) runnable = true;} - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 493
if (rdep->deps.empty())if (runnable) - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 496
/* If ‘step’ failed, then delete all dependent steps aswell. *//* If ‘step’ failed or was cancelled, then delete alldependent steps as well. */ - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 500
for (auto & build_ : step->builds) {auto build = build_.lock();if (!build) continue;assert(build->drvPath == step->drvPath);assert(build->finishedInDB);} - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 511
if (done.find(step) != done.end()) return;if (has(done, step)) return; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 513
std::vector<Step::wptr> rdeps;{auto step_(step->state.lock()); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 519
for (auto & build : step->builds) {auto build2 = build.lock();if (build2) res.insert(build2);for (auto & build : step_->builds) {auto build_ = build.lock();if (build_) res.insert(build_);}/* Make a copy of rdeps so that we don't hold the lock forvery long. */rdeps = step_->rdeps; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 529
for (auto & rdep : step->rdeps) {auto rdep2 = rdep.lock();if (rdep2) visit(rdep2);for (auto & rdep : rdeps) {auto rdep_ = rdep.lock();if (rdep_) visit(rdep_); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 543
assert(step->deps.empty());{auto step_(step->state.lock());assert(step_->deps.empty());} - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 549
std::lock_guard<std::mutex> lock(runnableMutex);runnable.insert(step);auto runnable_(runnable.lock());runnable_->push_back(step); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 562
/* Sleep until a runnable build step becomes available. */ - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 565
std::unique_lock<std::mutex> lock(runnableMutex);while (runnable.empty())runnableCV.wait(lock);step = *runnable.begin();runnable.erase(step);auto runnable_(runnable.lock());while (runnable_->empty())runnable_.wait(runnableCV);auto weak = *runnable_->begin();runnable_->pop_front();step = weak.lock();if (!step) continue; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 574
/* Build it. */ - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 576
- edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 585
assert(step->deps.empty()); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 586
on this derivation. Arbitrarily pick one (though preferringthose build of which this is the top-level derivation) for theon this derivation. Arbitrarily pick one (though preferring abuild of which this is the top-level derivation) for the - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 594
auto builds = getDependentBuilds(step);{auto dependents = getDependentBuilds(step); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 597
if (builds.empty()) {/* Apparently all builds that depend on this derivation aregone (e.g. cancelled). So don't bother. */printMsg(lvlInfo, format("cancelling build step ‘%1%’") % step->drvPath);destroyStep(step, true);return;}if (dependents.empty()) {/* Apparently all builds that depend on this derivationare gone (e.g. cancelled). So don't bother. (This isvery unlikely to happen, because normally Steps areonly kept alive by being reachable from aBuild). FIXME: what if a new Build gets a reference tothis step? */printMsg(lvlInfo, format("cancelling build step ‘%1%’") % step->drvPath);destroyStep(step, false);return;} - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 609
for (auto build2 : builds)if (build2->drvPath == step->drvPath) { build = build2; break; }for (auto build2 : dependents)if (build2->drvPath == step->drvPath) { build = build2; break; } - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 612
if (!build) build = *builds.begin();if (!build) build = *dependents.begin(); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 614
printMsg(lvlInfo, format("performing build step ‘%1%’ (needed by %2% builds)") % step->drvPath % builds.size());printMsg(lvlInfo, format("performing build step ‘%1%’ (needed by %2% builds)") % step->drvPath % dependents.size());} - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 644
// FIXME: handle new builds having been added in the meantime./* Remove this step. After this, incoming builds that depend ondrvPath will either see that the output paths exist, or willcreate a new build step for drvPath. The latter is fine - itwon't conflict with this one, because we're removing it. In anycase, the set of dependent builds for ‘step’ can't increaseanymore because ‘step’ is no longer visible to createStep(). */{auto steps_(steps.lock());steps_->erase(step->drvPath);}/* Get the final set of dependent builds. */auto dependents = getDependentBuilds(step); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 658
std::set<Build::ptr> direct; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 660
auto step_(step->state.lock());for (auto & build : step_->builds) {auto build_ = build.lock();if (build_) direct.insert(build_);}}/* Update the database. */{ - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 677
for (auto build2_ : step->builds) {auto build2 = build2_.lock();if (!build2) continue;for (auto build2 : direct) - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 679
} - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 685
for (auto build2 : builds) {for (auto build2 : dependents) { - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 691
for (auto build2 : builds) {for (auto build2 : dependents) { - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 703
- replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 705
/* Remove the build step from the graph. *//* In case of success, destroy all Build objects of which ‘step’is the top-level derivation. In case of failure, destroy alldependent Build objects. Any Steps not referenced by otherBuilds will be destroyed as well. */for (auto build2 : dependents)if (build2->toplevel == step || !success) {auto builds_(builds.lock());builds_->erase(build2->id);}/* Remove the step from the graph. In case of success, makedependent build steps runnable if they have no otherdependencies. */ - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 765
sleep(1); - file addition: sync.hh[3.187]
#pragma once

#include <cassert>
#include <condition_variable>
#include <mutex>

/* This template class ensures synchronized access to a value of type
   T. It is used as follows:

     struct Data { int x; ... };

     Sync<Data> data;

     {
       auto data_(data.lock());
       data_->x = 123;
     }

   Here, "data" is automatically unlocked when "data_" goes out of
   scope.
*/

template <class T>
class Sync
{
private:
    /* Guards every access to 'data'. */
    std::mutex mutex;

    /* The protected value; only reachable through a Lock. */
    T data;

public:

    /* Scoped handle that holds the mutex of the owning Sync for as
       long as it is alive and gives access to the protected value.
       Only Sync::lock() can create one. A Lock can be moved (the
       moved-from object then owns nothing and must not be
       dereferenced) but never copied or assigned. */
    class Lock
    {
    private:
        /* The Sync whose mutex we hold, or nullptr after being moved
           from. */
        Sync * s;

        friend Sync;

        /* Acquire the mutex of 's'; blocks until it is available. */
        Lock(Sync * s) : s(s) { s->mutex.lock(); }

    public:
        /* Take over the mutex ownership from 'l'; 'l' no longer
           unlocks anything when it is destroyed. */
        Lock(Lock && l) noexcept : s(l.s) { l.s = nullptr; }

        Lock(const Lock & l) = delete;
        Lock & operator = (const Lock & l) = delete;
        Lock & operator = (Lock && l) = delete;

        /* Release the mutex, unless ownership was moved away. */
        ~Lock() { if (s) s->mutex.unlock(); }

        /* Access the protected value. Using a moved-from Lock here is
           a programming error, so check for it just like wait()
           does. */
        T * operator -> () { assert(s); return &s->data; }
        T & operator * () { assert(s); return s->data; }

        /* Block on 'cv', handing it the underlying mutex to release
           while waiting. This hands the raw mutex to the condition
           variable, which is why a condition_variable_any is
           required. */
        /* FIXME: performance impact of condition_variable_any? */
        void wait(std::condition_variable_any & cv)
        {
            assert(s);
            cv.wait(s->mutex);
        }
    };

    /* Acquire the mutex and return a handle to the protected value.
       The mutex is released when the returned Lock is destroyed. */
    Lock lock() { return Lock(this); }
};