ENXUSMSVOU3AZFMH2ZXR4ZVPV2LRRQYQJ6IFX33YN6IH2ORSNSAAC
/* The build steps on which this step depends. */
std::set<Step::ptr> deps;
struct State
{
/* The build steps on which this step depends. */
std::set<Step::ptr> deps;
/* The build steps that depend on this step. */
std::vector<Step::wptr> rdeps;
/* Builds that have this step as the top-level derivation. */
std::vector<Build::wptr> builds;
};
pqxx::work txn(conn);
#if 0
{
auto runnable_(runnable.lock());
auto builds_(builds.lock());
auto steps_(steps.lock());
printMsg(lvlError, format("%1% builds, %2% steps, %3% runnable steps")
% builds_->size()
% steps_->size()
% runnable_->size());
}
#endif
/* Grab the queued builds from the database, but don't process
them yet (since we don't want a long-running transaction). */
std::list<Build::ptr> newBuilds; // FIXME: use queue
{
pqxx::work txn(conn);
// FIXME: query only builds with ID higher than the previous
// highest.
auto res = txn.exec("select * from Builds where finished = 0 order by id");
// FIXME: query only builds with ID higher than the previous
// highest.
auto res = txn.exec("select * from Builds where finished = 0");
auto builds_(builds.lock());
for (auto const & row : res) {
BuildID id = row["id"].as<BuildID>();
if (has(*builds_, id)) continue;
auto build = std::make_shared<Build>();
build->id = id;
build->drvPath = row["drvPath"].as<string>();
build->fullJobName = row["project"].as<string>() + ":" + row["jobset"].as<string>() + ":" + row["job"].as<string>();
Build::ptr build(new Build);
build->id = id;
build->drvPath = row["drvPath"].as<string>();
/* Now instantiate build steps for each new build. The builder
threads can start building the runnable build steps right away,
even while we're still processing other new builds. */
for (auto & build : newBuilds) {
// FIXME: remove build from newBuilds to ensure quick destruction
// FIXME: exception handling
Step::ptr step = createStep(store, build->drvPath);
std::set<Step::ptr> newRunnable;
Step::ptr step = createStep(store, build->drvPath, newRunnable);
/* If we didn't get a step, it means the step's outputs are
all valid. So we mark this as a finished, cached build. */
builds[id] = build;
{
auto builds_(builds.lock());
auto step_(step->state.lock());
(*builds_)[build->id] = build;
step_->builds.push_back(build);
build->toplevel = step;
}
/* Prior to this, the build is not visible to
getDependentBuilds(). Now it is, so the build can be
failed if a dependency fails. (It can't succeed right away
because its top-level is not runnable yet). */
/* Add the new runnable build steps to ‘runnable’ and wake up
the builder threads. */
for (auto & r : newRunnable)
makeRunnable(r);
auto prev = steps.find(drvPath);
if (prev != steps.end()) return prev->second;
/* Check if the requested step already exists. */
{
auto steps_(steps.lock());
auto prev = steps_->find(drvPath);
if (prev != steps_->end()) {
auto step = prev->second.lock();
/* Since ‘step’ is a strong pointer, the referred Step
object won't be deleted after this. */
if (step) return step;
steps_->erase(drvPath); // remove stale entry
}
}
for (auto & build : step->builds) {
auto build2 = build.lock();
if (build2) res.insert(build2);
for (auto & build : step_->builds) {
auto build_ = build.lock();
if (build_) res.insert(build_);
}
/* Make a copy of rdeps so that we don't hold the lock for
very long. */
rdeps = step_->rdeps;
std::unique_lock<std::mutex> lock(runnableMutex);
while (runnable.empty())
runnableCV.wait(lock);
step = *runnable.begin();
runnable.erase(step);
auto runnable_(runnable.lock());
while (runnable_->empty())
runnable_.wait(runnableCV);
auto weak = *runnable_->begin();
runnable_->pop_front();
step = weak.lock();
if (!step) continue;
if (builds.empty()) {
/* Apparently all builds that depend on this derivation are
gone (e.g. cancelled). So don't bother. */
printMsg(lvlInfo, format("cancelling build step ‘%1%’") % step->drvPath);
destroyStep(step, true);
return;
}
if (dependents.empty()) {
/* Apparently all builds that depend on this derivation
are gone (e.g. cancelled). So don't bother. (This is
very unlikely to happen, because normally Steps are
only kept alive by being reachable from a
Build). FIXME: what if a new Build gets a reference to
this step? */
printMsg(lvlInfo, format("cancelling build step ‘%1%’") % step->drvPath);
destroyStep(step, false);
return;
}
// FIXME: handle new builds having been added in the meantime.
/* Remove this step. After this, incoming builds that depend on
drvPath will either see that the output paths exist, or will
create a new build step for drvPath. The latter is fine - it
won't conflict with this one, because we're removing it. In any
case, the set of dependent builds for ‘step’ can't increase
anymore because ‘step’ is no longer visible to createStep(). */
{
auto steps_(steps.lock());
steps_->erase(step->drvPath);
}
/* Get the final set of dependent builds. */
auto dependents = getDependentBuilds(step);
/* Remove the build step from the graph. */
/* In case of success, destroy all Build objects of which ‘step’
is the top-level derivation. In case of failure, destroy all
dependent Build objects. Any Steps not referenced by other
Builds will be destroyed as well. */
for (auto build2 : dependents)
if (build2->toplevel == step || !success) {
auto builds_(builds.lock());
builds_->erase(build2->id);
}
/* Remove the step from the graph. In case of success, make
dependent build steps runnable if they have no other
dependencies. */
#pragma once
#include <mutex>
#include <condition_variable>
/* This template class ensures synchronized access to a value of type
T. It is used as follows:
struct Data { int x; ... };
Sync<Data> data;
{
auto data_(data.lock());
data_->x = 123;
}
Here, "data" is automatically unlocked when "data_" goes out of
scope.
*/
template <class T>
class Sync
{
private:
std::mutex mutex;
T data;
public:
class Lock
{
private:
Sync * s;
friend Sync;
Lock(Sync * s) : s(s) { s->mutex.lock(); }
public:
Lock(Lock && l) : s(l.s) { l.s = 0; }
Lock(const Lock & l) = delete;
~Lock() { if (s) s->mutex.unlock(); }
T * operator -> () { return &s->data; }
T & operator * () { return s->data; }
/* FIXME: performance impact of condition_variable_any? */
void wait(std::condition_variable_any & cv)
{
assert(s);
cv.wait(s->mutex);
}
};
Lock lock() { return Lock(this); }
};