This rewrites the top-level loop of hydra-evaluator in C++. The Perl stuff is moved into hydra-eval-jobset. (Rewriting the entire evaluator would be nice but is a bit too much work.) The new version has some advantages:
It can run multiple jobset evaluations in parallel.
It uses PostgreSQL notifications so it doesn't have to poll the database. So if a jobset is triggered via the web interface or from a GitHub / Bitbucket webhook, evaluation of the jobset will start almost instantaneously (assuming the evaluator is not at its concurrency limit).
It imposes a timeout on evaluations. So if e.g. hydra-eval-jobset hangs connecting to a Mercurial server, it will eventually be killed.
4YCF3KBGI4VYKHJXAREJLCJLY3UWB2FX447CJ4XQWFRKRFKG5WCQC
Z4Y3TVEEN6LIGBRCMWVFZYFFE6NKHUSO76RZ3GSPTZEME3CBLUTAC
3CVY4QJXRFFA6V4JVBPMSPRKL2KGBCCE3HSOFWVUK76GIGO3NDPAC
N66HHKLQGRAVLPEO2LTAOGGXKZUYKYQSSOG4MQAMQVUIXVHIYP3QC
ACBS7C6QGXAMPLQ336WAQMMGLZXBDNNYPE26ZBPUQ24GL4BF643AC
Q3VTDUSQ75JD5L55VWQV6MGAS6IC4AWGSADL2ISMX5E7R3LBVBHAC
4VYY2ADPFENPAC36FJCSEMEHOPL6E2T4RPXJKYVF47DFKAO6PLIAC
3X6FN7RGF2MNQDBVXQHWOCVGWBMGAJXTXHH2B4N7JIRSZZSWU3UAC
PKTKSVTLHM5L6MOKXDQOJZ35FV2VXAD556S26QMICR6SNIY56LBQC
FYXZQ55GY3O5B4HHKUDU2UQZPTB7RXF4GXBF4ZZL5IIM4TVL7Y3QC
WJTP6VZILETQ4GWUNE7QPHQK3HWJJ6J5ILX2E7KYKUANAA4DBF4QC
FV2M6MOTAP4BJMEKU5XUDVEACWEJGEIRCCE2MRY3F6SF2SFOE3MQC
RBQEBVT5RL4VVP6Y2V3IIKIX4E7USU5COI3W7Y2Y36VV3QFCTQQQC
34UH6B6LBITNUL6HXSPLL3657R73LC6C34QMJJLKTMLUGHFTLUSQC
T4LLYESZ2HUXSLKZ6GNBLVWUVG7R5IDFHYHYO773QIZ6QTOOXR2AC
YQWH4POV22KYCCKOTZXD36QKUOWEQ2DSPYPO5DNZDZ344RI25OAAC
6L3ZM55SJNE7HNUZ46SFEXV6MGDZNI26XJTEDTGTP5ZPACNF43AAC
24BMQDZAWDQ7VNIA7TIROXSOYLOJBNZ2E4264WHWNJAEN6ZB3UOAC
CLXEECMFKLUIN5QBV3BPPPSU6G5UF6MXRSNWA6LXUBNQGSJY4U5QC
HJOEIMLRDVQ2KZI5HGL2HKGBM3AHP7YIKGKDAGFUNKRUXVRB24NAC
Y6H7Y3OTXVLF6PJ5BR6WXS7KVS4VMTUKRYVM6FALKMY3DCBYEJSQC
D5QIOJGPKQJIYBUCSC3MFJ3TXLPNZ2XMI37GXMFRVRFWWR2VMTFAC
L4AI5YL6CBMVS5GEPME3WIL2CHACMU7PGEKKNKLHHF5SP4ID3LCQC
N22GPKYTOLZLBGTGDATQDVZ4R5APZEAOIA7L32X4UXBH4XNI7MWAC
OVR2RWBIUXNW2XSG63KFL2S3Q7UVTLHOEYO3573LZBY7AMLUKKTQC
PCKLFRT5IZVLG47GQQ23GBSROKUR4CUEZW4PRVGREHNDFTCZ7VBAC
X27GNHDV5KPZ5GSH6DCAJMNCEMZLCP7M43JWF2X3O5QWXMOX273AC
QLOLZHRXOUSNVLJG2SWVC5EISFUOUKJIT32XN6DNW4VFFJMN5PMAC
C3AG65SWAGDSML5MBD4ZPRCHI2LNNDN6IQZOI3HLYRJRPAW3QPAAC
J74UTA3I7FHYJBO7FZMTQ3AQTZPD36DD7RFVGJHLHLOZ5YBPNONQC
5GRJZZORI57MXOZPWMCEZQNBYDXJUAC4QVEO5APLECV5UFXAJHYAC
J7WIAUWTYAUXLXVND7I45NOGHMDT4GGB5GPRJCXJJDGKQHWSVVGQC
53PW36WSP3WZOGHI4OSMX2I26O32WWZOZJH3B2W67JZMRKAOWTXAC
TULPZ62YXEHXUWGBZMLCLYILEXPQS5ADPT22574BIRFU4CZMBSKAC
SQQSV4NBN6QB6BZVC7FCUFMWEKSGSNATQ3IFOVXJNTGSSGP4YM3QC
TIOBBINAK6IEDTCFBP67QDP3U7BHPVJOHBD6FEHASSSIQGNLCUXAC
RXVJFQ5AV3WME4HDVBPSRCALQTXROT4KQPOQVO6KTWTBNZIZZGPAC
D6YQQQCNJT7RJ4KF5JN76NM5HT4FVROBGSNB26LYPRYVLEG576MAC
GAZHOBWEXOSWFNJYOYXASH3LCOLREWB3DZXELIAP2Q6L2X2ZSI4QC
UQYHPQ6UO7LIQ4P6NSCAEV677K52RQOQKCF4KSGNXMMFOQKY7IPQC
G2ZB6464XGPBIMSZIPSB24EIXSCCGV4XWC3IWPS2CXYPDSUZSU5QC
3PNG7NIBQQURUUPRVQXYL342OT7JUUYOMY2JJNP6YDX7SYJDZMYAC
QIW2WZKWE3ULO4O4QB6EQWUKZY2UFNN7WA464FMIZOAHP3UHFGSAC
UGA45FNCYAHX77QI3MTMSRF676N2BUW2ZUUI4OZCI2C3EJOYKY3QC
HX4QYOYAKKFKK4KF6SZ6FQCM5ZF7ZFDTQUR4GS5WPPTYZASYZZUAC
YI3BZG5NWMKBT7T2HEOHVHSZBQ5KPDWSI3VRYOMQ346LRFB4MR2AC
LZVO64YG43JD7YMZSCTZNOBS5ROZA4FMPKJW2YOMHX2V5PTGBVWQC
# Build and install a single program, hydra-evaluator, from one C++ source file.
bin_PROGRAMS = hydra-evaluator
hydra_evaluator_SOURCES = hydra-evaluator.cc
# Link against the Nix libraries and libpqxx (the PostgreSQL C++ client).
hydra_evaluator_LDADD = $(NIX_LIBS) -lpqxx
# ../libhydra is put on the include path; presumably that is where the
# "db.hh" and "pool.hh" headers used by hydra-evaluator.cc live.
hydra_evaluator_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra
#include "shared.hh"
#include "db.hh"
#include "pool.hh"
#include <algorithm>
#include <thread>
#include <cstring>
#include <sys/types.h>
#include <sys/wait.h>
using namespace nix;
/* The evaluator scheduler. It keeps an in-memory copy of the enabled
   jobsets, decides which of them are due for evaluation, and runs
   `hydra-eval-jobset' child processes for them, at most `maxEvals' at
   a time. Three threads cooperate through `state_': the main loop
   (starts evaluations), the database monitor (refreshes the jobset
   map) and the reaper (collects finished children). */
struct Evaluator
{
    nix::Pool<Connection> dbPool;

    /* A jobset is identified by (project name, jobset name). */
    typedef std::pair<std::string, std::string> JobsetName;

    struct Jobset
    {
        JobsetName name;

        /* Copies of the corresponding Jobsets columns. A NULL
           lastCheckedTime is read as 0 and a NULL triggerTime as
           `notTriggered'. */
        time_t lastCheckedTime, triggerTime;

        /* Seconds between periodic evaluations. Values <= 0 disable
           periodic evaluation, so the jobset only runs when
           triggered. */
        int checkInterval;

        /* Child process evaluating this jobset; compares equal to -1
           when no evaluation is running. */
        Pid pid;
    };

    typedef std::map<JobsetName, Jobset> Jobsets;

    /* Passed as the duration argument of timeout(1), which sends
       SIGKILL to the evaluation once it expires. */
    int evalTimeout = 3600;

    /* Maximum number of evaluations running at the same time. */
    size_t maxEvals = 4;

    struct State
    {
        size_t runningEvals = 0;
        Jobsets jobsets;
    };

    Sync<State> state_;

    /* Signalled by startEval() to wake up the reaper. */
    std::condition_variable childStarted;

    /* Signalled by the database monitor and the reaper to wake up the
       main loop. */
    std::condition_variable maybeDoWork;

    /* Sentinel trigger time of a jobset that has not been triggered. */
    const time_t notTriggered = std::numeric_limits<time_t>::max();

    /* Synchronise `state_->jobsets' with the database: insert or
       refresh every jobset that is enabled in an enabled project, and
       forget the ones that no longer show up in the query. */
    void readJobsets()
    {
        auto conn(dbPool.get());

        pqxx::work txn(*conn);

        auto rows = txn.parameterized
            ("select project, j.name, lastCheckedTime, triggerTime, checkInterval from Jobsets j join Projects p on j.project = p.name "
             "where j.enabled != 0 and p.enabled != 0").exec();

        auto state(state_.lock());

        std::set<JobsetName> seen;

        for (auto const & row : rows) {
            auto name = JobsetName{row["project"].as<std::string>(), row["name"].as<std::string>()};

            /* try_emplace() leaves an existing entry (and thus its
               `pid') alone; only the scheduling fields are refreshed. */
            auto & jobset = state->jobsets.try_emplace(name, Jobset{name}).first->second;
            jobset.lastCheckedTime = row["lastCheckedTime"].as<time_t>(0);
            jobset.triggerTime = row["triggerTime"].as<time_t>(notTriggered);
            /* Read with the member's own type rather than narrowing
               a time_t. */
            jobset.checkInterval = row["checkInterval"].as<int>();

            seen.insert(name);
        }

        /* NOTE(review): a jobset whose evaluation is still running can
           be erased here. What then happens to the child, and whether
           `runningEvals' is still decremented for it, depends on
           Pid's destructor, which is not visible here -- confirm. */
        for (auto i = state->jobsets.begin(); i != state->jobsets.end(); )
            if (seen.count(i->first))
                ++i;
            else {
                printInfo("forgetting jobset ‘%s:%s’", i->first.first, i->first.second);
                i = state->jobsets.erase(i);
            }
    }

    /* Start `hydra-eval-jobset' for `jobset' under timeout(1), count
       it as running, wake the reaper, and record the evaluation in
       the database by setting lastCheckedTime to the current time and
       clearing triggerTime. The caller must hold the lock on
       `state'. */
    void startEval(State & state, Jobset & jobset)
    {
        printInfo("starting evaluation of jobset ‘%s:%s’", jobset.name.first, jobset.name.second);

        assert(jobset.pid == -1);

        jobset.pid = startProcess([&]() {
            Strings args = { "timeout", "-s", "KILL", std::to_string(evalTimeout), "hydra-eval-jobset", jobset.name.first, jobset.name.second };
            execvp(args.front().c_str(), stringsToCharPtrs(args).data());
            /* Only reached if execvp() failed. */
            throw SysError(format("executing ‘%1%’") % args.front());
        });

        state.runningEvals++;

        childStarted.notify_one();

        time_t now = time(0);

        {
            auto conn(dbPool.get());
            pqxx::work txn(*conn);
            txn.parameterized
                ("update Jobsets set lastCheckedTime = $1, triggerTime = null where project = $2 and name = $3")
                (now)
                (jobset.name.first)
                (jobset.name.second)
                .exec();
            txn.commit();

            /* Mirror the committed values in memory. */
            jobset.lastCheckedTime = now;
            jobset.triggerTime = notTriggered;
        }
    }

    /* Start evaluations for the jobsets that are due, most urgent
       first, until `maxEvals' evaluations are running. The caller
       must hold the lock on `state'. */
    void startEvals(State & state)
    {
        std::vector<Jobsets::iterator> sorted;

        time_t now = time(0);

        /* Filter out jobsets that have been evaluated recently and have
           not been triggered. */
        for (auto i = state.jobsets.begin(); i != state.jobsets.end(); ++i)
            if (i->second.pid == -1 &&
                (i->second.triggerTime != notTriggered ||
                 (i->second.checkInterval > 0 && i->second.lastCheckedTime + i->second.checkInterval <= now)))
                sorted.push_back(i);

        /* Put jobsets in order of ascending trigger time, last checked
           time, and name. Untriggered jobsets carry the maximal
           trigger time and therefore sort after all triggered ones. */
        std::sort(sorted.begin(), sorted.end(),
            [](const Jobsets::iterator & a, const Jobsets::iterator & b) {
                return
                    a->second.triggerTime != b->second.triggerTime
                    ? a->second.triggerTime < b->second.triggerTime
                    : a->second.lastCheckedTime != b->second.lastCheckedTime
                    ? a->second.lastCheckedTime < b->second.lastCheckedTime
                    : a->first < b->first;
            });

        /* Start jobset evaluations up to the concurrency limit. */
        for (auto & i : sorted) {
            if (state.runningEvals >= maxEvals) break;
            startEval(state, i->second);
        }
    }

    /* Main scheduling loop: start whatever is due, then sleep until
       the next periodic check is due or another thread signals
       `maybeDoWork'. Only leaves by throwing. */
    void loop()
    {
        auto state(state_.lock());

        while (true) {

            /* Start due evaluations *before* going to sleep.
               Condition variable notifications are not queued: one
               sent while this thread was not waiting (for instance
               during the back-off in run() after an exception) is
               lost, so waiting first could leave a triggered jobset
               idle until the next database event. */
            startEvals(*state);

            time_t now = time(0);

            std::chrono::seconds sleepTime = std::chrono::seconds::max();

            /* A timed wake-up is only useful while there is a free
               evaluation slot; otherwise wait for the reaper. */
            if (state->runningEvals < maxEvals) {
                for (auto & i : state->jobsets)
                    if (i.second.pid == -1 &&
                        i.second.checkInterval > 0)
                        sleepTime = std::min(sleepTime, std::chrono::seconds(
                                std::max((time_t) 1, i.second.lastCheckedTime - now + i.second.checkInterval)));
            }

            debug("waiting for %d s", sleepTime.count());
            if (sleepTime == std::chrono::seconds::max())
                state.wait(maybeDoWork);
            else
                state.wait_for(maybeDoWork, sleepTime);
        }
    }

    /* A thread that listens to PostgreSQL notifications about jobset
       changes, updates the jobsets map, and signals the main thread
       to start evaluations. On any exception it logs the error,
       sleeps for 30 seconds and sets everything up again. */
    void databaseMonitor()
    {
        while (true) {

            try {

                auto conn(dbPool.get());

                receiver jobsetsAdded(*conn, "jobsets_added");
                receiver jobsetsDeleted(*conn, "jobsets_deleted");
                receiver jobsetsChanged(*conn, "jobset_scheduling_changed");

                while (true) {
                    /* Note: we read/notify before
                       await_notification() to ensure we don't miss a
                       state change. */
                    readJobsets();
                    maybeDoWork.notify_one();
                    conn->await_notification();
                    printInfo("received jobset event");
                }

            } catch (const std::exception & e) {
                printError("exception in database monitor thread: %s", e.what());
                sleep(30);
            }
        }
    }

    /* A thread that reaps child processes. It sleeps until at least
       one evaluation is running, waits for any child to exit, then
       frees its slot and wakes the main loop. A SysError thrown here
       is not caught anywhere in this thread. */
    void reaper()
    {
        while (true) {
            {
                auto state(state_.lock());
                while (!state->runningEvals)
                    state.wait(childStarted);
            }

            /* Wait without holding the lock so that the other threads
               can keep working. */
            int status;
            pid_t pid = waitpid(-1, &status, 0);
            if (pid == -1) {
                if (errno == EINTR) continue;
                throw SysError("waiting for children");
            }

            {
                auto state(state_.lock());
                assert(state->runningEvals);
                state->runningEvals--;
                for (auto & jobset : state->jobsets)
                    if (jobset.second.pid == pid) {
                        /* `status' is the raw wait status as filled
                           in by waitpid(). */
                        printInfo("evaluation of jobset ‘%s:%s’ finished with status %d",
                            jobset.first.first, jobset.first.second, status);
                        /* Presumably resets `pid' to -1 without
                           touching the already reaped child --
                           confirm against Pid::release(). */
                        jobset.second.pid.release();
                        maybeDoWork.notify_one();
                        break;
                    }
            }
        }
    }

    /* Start the reaper and database monitor threads, then run the
       main loop forever, restarting it 30 seconds after any
       exception. Never returns. */
    void run()
    {
        std::thread reaperThread([&]() { reaper(); });

        std::thread monitorThread([&]() { databaseMonitor(); });

        while (true) {
            try {
                loop();
            } catch (const std::exception & e) {
                printError("exception in main loop: %s", e.what());
                sleep(30);
            }
        }
    }
};
/* Program entry point: initialise Nix, put SIGINT, SIGTERM and SIGHUP
   back to their default disposition, parse the command line (no
   option is accepted) and run the evaluator. Everything runs inside
   handleExceptions(), whose result becomes the exit status. */
int main(int argc, char * * argv)
{
    auto realMain = [&]() {
        initNix();

        /* Same order as always: INT, TERM, HUP. */
        for (int sig : { SIGINT, SIGTERM, SIGHUP })
            signal(sig, SIG_DFL);

        /* The handler claims no argument at all. */
        auto acceptNothing = [&](Strings::iterator & arg, const Strings::iterator & end) {
            return false;
        };
        parseCmdLine(argc, argv, acceptNothing);

        Evaluator().run();
    };

    return handleExceptions(argv[0], realMain);
}
# Pick at most one jobset that is due for evaluation and evaluate it by
# re-running this script ($0) with the jobset's project and name as
# arguments. Returns 0 if no jobset is due; otherwise returns whether the
# child invocation exited with status 0.
#
# No argument check is done here: this sub is only called from the polling
# loop, i.e. when the script itself was started without a project/jobset
# pair, so requiring two arguments at this point would make every call die.
sub checkSomeJobset {
    # If any jobset has been triggered by a push, check it (oldest
    # trigger first).
    my ($jobset) = $db->resultset('Jobsets')->search(
        { 'triggertime' => { '!=', undef } },
        { join => 'project', order_by => [ 'triggertime' ], rows => 1 });

    # Otherwise, check the jobset that hasn't been checked for the
    # longest time (but don't check more often than the jobset's
    # minimal check interval).
    ($jobset) = $db->resultset('Jobsets')->search(
        { 'project.enabled' => 1, 'me.enabled' => { '!=' => 0 }
        , 'checkinterval' => { '!=', 0 }
        , -or => [ 'lastcheckedtime' => undef, 'lastcheckedtime' => { '<', \ (time() . " - me.checkinterval") } ] },
        { join => 'project', order_by => [ 'lastcheckedtime nulls first' ], rows => 1 })
        unless defined $jobset;

    return 0 unless defined $jobset;

    return system($0, $jobset->project->name, $jobset->name) == 0;
}
# Single-jobset mode: when exactly two arguments (project, jobset) are
# given, look that jobset up, evaluate it, and exit with checkJobset's
# result instead of entering the polling loop.
if (@ARGV == 2) {
    my ($projectName, $jobsetName) = @ARGV;
    my $jobset = $db->resultset('Jobsets')->find($projectName, $jobsetName);
    die "$0: specified jobset \"$projectName:$jobsetName\" does not exist\n"
        unless $jobset;
    exit checkJobset($jobset);
}
# Polling mode: keep checking jobsets forever. Errors raised during a
# round are printed to STDERR and do not stop the loop.
while (1) {
    eval {
        # After a successful check pause for one second only (just so we
        # don't go completely crazy if lastcheckedtime isn't updated
        # properly); when nothing was checked, idle for 30 seconds.
        sleep(checkSomeJobset() ? 1 : 30);
    };
    print STDERR "$@" if $@;
}
# Evaluate the jobset named by the first two command-line arguments
# (project, jobset) and exit with checkJobset's result; die if the
# jobset cannot be found.
my ($projectName, $jobsetName) = @ARGV[0, 1];
my $jobset = $db->resultset('Jobsets')->find($projectName, $jobsetName);
$jobset or die "$0: specified jobset \"$projectName:$jobsetName\" does not exist\n";
exit checkJobset($jobset);
-- Send a 'jobsets_deleted' notification after a DELETE on Jobsets.
-- The trigger has no FOR EACH ROW clause, so it fires once per statement.
-- hydra-evaluator listens on this channel to re-read the jobsets.
create function notifyJobsetsDeleted() returns trigger as 'begin notify jobsets_deleted; return null; end;' language plpgsql;
create trigger JobsetsDeleted after delete on Jobsets execute procedure notifyJobsetsDeleted();
-- Send a 'jobset_scheduling_changed' notification after an UPDATE of a
-- Jobsets row that affects scheduling: triggerTime changed to a non-null
-- value, or checkInterval or enabled changed. Clearing triggerTime (setting
-- it to null) on its own does not notify.
create function notifyJobsetSchedulingChanged() returns trigger as 'begin notify jobset_scheduling_changed; return null; end;' language plpgsql;
create trigger JobsetSchedulingChanged after update on Jobsets for each row
-- The whole condition must sit inside one pair of parentheses.
when (((old.triggerTime is distinct from new.triggerTime) and (new.triggerTime is not null))
or old.checkInterval != new.checkInterval
or old.enabled != new.enabled)
execute procedure notifyJobsetSchedulingChanged();
-- Send a 'jobsets_added' notification after an INSERT on Jobsets.
-- The trigger has no FOR EACH ROW clause, so it fires once per statement.
-- hydra-evaluator listens on this channel to re-read the jobsets.
create function notifyJobsetsAdded() returns trigger as 'begin notify jobsets_added; return null; end;' language plpgsql;
create trigger JobsetsAdded after insert on Jobsets execute procedure notifyJobsetsAdded();
-- Send a 'jobsets_deleted' notification after a DELETE on Jobsets.
-- The trigger has no FOR EACH ROW clause, so it fires once per statement.
-- hydra-evaluator listens on this channel to re-read the jobsets.
create function notifyJobsetsDeleted() returns trigger as 'begin notify jobsets_deleted; return null; end;' language plpgsql;
create trigger JobsetsDeleted after delete on Jobsets execute procedure notifyJobsetsDeleted();
-- Send a 'jobset_scheduling_changed' notification after an UPDATE of a
-- Jobsets row that affects scheduling: triggerTime changed to a non-null
-- value, or checkInterval or enabled changed. Clearing triggerTime (setting
-- it to null) on its own does not notify.
create function notifyJobsetSchedulingChanged() returns trigger as 'begin notify jobset_scheduling_changed; return null; end;' language plpgsql;
create trigger JobsetSchedulingChanged after update on Jobsets for each row
  -- PostgreSQL requires the whole WHEN condition to be enclosed in one
  -- pair of parentheses, so the "or" terms must be inside it as well.
  when (((old.triggerTime is distinct from new.triggerTime) and (new.triggerTime is not null))
        or old.checkInterval != new.checkInterval
        or old.enabled != new.enabled)
  execute procedure notifyJobsetSchedulingChanged();
PATH=$(abs_top_srcdir)/src/script:$(abs_top_srcdir)/src/hydra-eval-jobs:$(abs_top_srcdir)/src/hydra-queue-runner:$$PATH \
PATH=$(abs_top_srcdir)/src/hydra-evaluator:$(abs_top_srcdir)/src/script:$(abs_top_srcdir)/src/hydra-eval-jobs:$(abs_top_srcdir)/src/hydra-queue-runner:$$PATH \