Very basic multi-threaded queue runner
[?]
May 28, 2015, 11:31 PM
NJJ7H64SZOX5EGACDCQAUQ7R6UEWD5IIC35A2MWFOOJV55DJYPHACDependencies
- [2]
24BMQDZAStart of single-process hydra-queue-runner - [3]
62MQPRXCPass null values to libpqxx properly
Change contents
- replacement in src/hydra-queue-runner/build-result.cc at line 9
BuildResult getBuildResult(const Derivation & drv)BuildResult getBuildResult(std::shared_ptr<StoreAPI> store, const Derivation & drv) - edit in src/hydra-queue-runner/build-result.hh at line 2
#include <memory> - replacement in src/hydra-queue-runner/build-result.hh at line 27
BuildResult getBuildResult(const nix::Derivation & drv);[2.4741]BuildResult getBuildResult(std::shared_ptr<nix::StoreAPI> store, const nix::Derivation & drv); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 1
#include <atomic>#include <condition_variable> - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 4
#include <memory> - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 5
#include <memory>#include <thread> - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 17
std::mutex exitRequestMutex;std::condition_variable exitRequest;bool exitRequested(false); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 23
static std::atomic_int _int(0); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 25
void sigintHandler(int signo){_int = 1;}void signalThread(){struct sigaction act;act.sa_handler = sigintHandler;sigemptyset(&act.sa_mask);act.sa_flags = 0;if (sigaction(SIGINT, &act, 0))throw SysError("installing handler for SIGINT");while (true) {sleep(1000000);if (_int) break;}{std::lock_guard<std::mutex> lock(exitRequestMutex);exitRequested = true;}exitRequest.notify_all();} - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 115
std::thread queueMonitorThread; - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 128
std::mutex runnableMutex;std::condition_variable runnableCV; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 146
void getQueuedBuilds(pqxx::connection & conn);void queueMonitorThreadEntry();void getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection & conn); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 150
Step::ptr createStep(const Path & drvPath);Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 157
void doBuildSteps();void makeRunnable(Step::ptr step); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 159
void doBuildStep(Step::ptr step);void builderThreadEntry(int slot);void doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 165
void run(); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 233
void State::getQueuedBuilds(pqxx::connection & conn)void State::queueMonitorThreadEntry(){auto store = openStore(); // FIXME: poolConnection conn;while (true) {getQueuedBuilds(store, conn);{std::unique_lock<std::mutex> lock(exitRequestMutex);exitRequest.wait_for(lock, std::chrono::seconds(5));if (exitRequested) break;}}printMsg(lvlError, "queue monitor exits");}void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection & conn) - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 255
printMsg(lvlError, "checking the queue..."); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 288
Step::ptr step = createStep(build->drvPath);Step::ptr step = createStep(store, build->drvPath); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 291
BuildResult res = getBuildResult(drv);BuildResult res = getBuildResult(store, drv); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 309
Step::ptr State::createStep(const Path & drvPath)Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath) - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 337
Step::ptr dep = createStep(i.first);Step::ptr dep = createStep(store, i.first); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 346
if (step->deps.empty()) runnable.insert(step);if (step->deps.empty()) makeRunnable(step); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 365
runnable.insert(rdep);makeRunnable(rdep); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 409
void State::doBuildSteps()void State::makeRunnable(Step::ptr step) - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 411
while (!runnable.empty()) {printMsg(lvlInfo, format("%1% runnable steps") % runnable.size());Step::ptr step = *runnable.begin();runnable.erase(step);doBuildStep(step);assert(step->deps.empty());{std::lock_guard<std::mutex> lock(runnableMutex);runnable.insert(step); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 417
runnableCV.notify_one(); - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 422
void State::doBuildStep(Step::ptr step)void State::builderThreadEntry(int slot){auto store = openStore(); // FIXME: poolwhile (true) {Step::ptr step;{std::unique_lock<std::mutex> lock(runnableMutex);while (runnable.empty())runnableCV.wait(lock);step = *runnable.begin();runnable.erase(step);}printMsg(lvlError, format("slot %1%: got build step ‘%2%’") % slot % step->drvPath);doBuildStep(store, step);}printMsg(lvlError, "builder thread exits");}void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step) - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 498
if (success) res = getBuildResult(step->drv);if (success) res = getBuildResult(store, step->drv); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 581
}void State::run(){{Connection conn;markActiveBuildStepsAsAborted(conn, 0);}queueMonitorThread = std::thread(&State::queueMonitorThreadEntry, this);sleep(1);for (int n = 0; n < 4; n++)std::thread(&State::builderThreadEntry, this, n).detach();queueMonitorThread.join(); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 606
std::thread(signalThread).detach(); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 609
/* Ignore signals. This is inherited by the other threads. */sigset_t set;sigemptyset(&set);sigaddset(&set, SIGHUP);sigaddset(&set, SIGINT);sigaddset(&set, SIGTERM);sigprocmask(SIG_BLOCK, &set, NULL); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 619
store = openStore(); - edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 622
Connection conn; - replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 623
state.markActiveBuildStepsAsAborted(conn, 0);state.getQueuedBuilds(conn);state.doBuildSteps();state.run();