hydra-notify is now a long-running daemon that receives notifications about started and finished builds and finished build steps via PostgreSQL LISTEN/NOTIFY. This eliminates the (substantial) overhead of starting a new hydra-notify process for every event. It also allows other programs (even on other machines) to listen to Hydra notifications.
32KJOERMPFWZZZCIN6TGGVX72GMAJT6VCCIP7S65EHNF3KM42KWAC
FHVJYJFEZEGR6DVIQ2HGE7DLWA65QAGAI3K2KACGNM2JTDJTRJ4AC
SMUWBAD5D2WF2Q5B75YIUQL76FQHY3C4W25OZDFVFCF5AIYUD4YAC
PXTSKX4GBDOBMI4MDO64DBIHSEVTM6IZ5O6GUFXK6UL63GHWBMBAC
ZUXMEUX6PPFXU7ELM3DL7ICM44UBRDNFHOKYZUD2YEJQSVWLTRSQC
HZZCYIYTIZ7PUGXNMZSEO463LS4VO7ADUH3QPZQCLMBTPPPOOJSQC
B7ENVLRS2KLKEG66TY5G4EW6M274JZOBDUCRB7BFBXLTBG455STQC
LSUX6IQR7TBTTCDQAVD3KDAVARLM7SUD7PPVTR5IOKA3BNZDYA7AC
7Q2PXRZAERQFX53OEZGY26ZELA2KRWGXDA5QEYQQVUF5ME65Z7TQC
7LD275CWWC7L5V4ETGIQRMOEJV5JRUSYQ7LUFINAUGAHWANCREGAC
EPWEMRI23UN6C777AL2GWEOXYZRTPEPFC2SZVZ4JTWUGYVFDL5XAC
MHVIT4JYWUYD4UCGB2AHLXWLX6B5SYE22BREERNGANT7RGGDUFOAC
FCTX433OH7QIVWHXL23DKVSUKBQSLQTRK3PFCKKSMLX6A634456QC
NTEDD7T4M7XXYCWNZN57EJOEH3KI3UYMHOANV4MY2LSUPWX3JRDQC
UVQJBDHNWHTGOAEL426F6RZPD67QABBRP73E6QB5ND5OV2S3267QC
LVQXQIYA7QMLVYOANYEFHDBTFAOSE3D2IYAVOG2DXURTASRCUNYQC
24BMQDZAWDQ7VNIA7TIROXSOYLOJBNZ2E4264WHWNJAEN6ZB3UOAC
IE2PRAQUCQVFPJ4CAIJRPXXEFC5VBAE3EO5I5FG4XWEDRNONNHKQC
GS4BE6TB6GH2JUZJHDPHL6YG7J7YYESF3YOZJZ2CFABXUTO4VYPQC
K5G5GZY7D7KWVR5RAGZFHH3ZPG5OCLZT4HZ6XIJJ7YYVUMC2CTZQC
JPHDKOMJEDZQYRACBIJ57SJME7IZNLDJY7RB6QR47KG7IL3PG7UAC
X6FOUYFJ5ODEOMHLL6AHM6KI2JRZ2XOI76NYJYJNDUQBTV5UJLBAC
X4KYZJBQ7WULUWTHAPX5BZTZR7C6HRCC4N4J5K726PGZKSY6FQOQC
SL3WSRACCX2IMJHHLTRAUQT7QDLCOKYLVO2FEHWIHXM5GPKSRJTQC
C6HOMHZWMSC7ORGFUF5YG2ACKV2SCP26HL3UH6VXH6RNDYRXH5DAC
XV4AEKJCFTNCOR52IRFYHORCNNFKIHOADIUARTC2U5Z6ZQBEEFYQC
YTAYNN7VNYZNLGUSGY3EF33MGQWMJW76FKV657SBKASQFQC7EB3AC
BRAESISHTN4IIWUBVDMPDMY7QLMJDKX7GQ7K6NSJN66L5VPWSX3QC
HJOEIMLRDVQ2KZI5HGL2HKGBM3AHP7YIKGKDAGFUNKRUXVRB24NAC
B2L4T3X63XVYJQXEDU4WT5Y4R6PMDXGC6WN2KGOMHRQILSABNQOAC
6WRGCITDYP7JIBYP25QIWCHWRJWFPDP2D3TJS3WO3KUHQQJAWHMQC
/* Asynchronously run plugins. FIXME: if we're killed,
plugin actions might not be run. Need to ensure
at-least-once semantics. */
enqueueNotificationItem({NotificationItem::Type::StepFinished, buildId, {}, stepNr, result.logFile});
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::BuildFinished, buildId, dependentIDs});
pqxx::work txn(*conn);
notifyBuildFinished(txn, buildId, dependentIDs);
txn.commit();
while (true) {
try {
NotificationItem item;
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
while (notificationSenderQueue_->empty())
notificationSenderQueue_.wait(notificationSenderWakeup);
item = notificationSenderQueue_->front();
notificationSenderQueue_->pop();
}
MaintainCount<counter> mc(nrNotificationsInProgress);
printMsg(lvlChatty, format("sending notification about build %1%") % item.id);
auto now1 = std::chrono::steady_clock::now();
Pid pid = startProcess([&]() {
Strings argv;
switch (item.type) {
case NotificationItem::Type::BuildStarted:
argv = {"hydra-notify", "build-started", std::to_string(item.id)};
for (auto id : item.dependentIds)
argv.push_back(std::to_string(id));
break;
case NotificationItem::Type::BuildFinished:
argv = {"hydra-notify", "build-finished", std::to_string(item.id)};
for (auto id : item.dependentIds)
argv.push_back(std::to_string(id));
break;
case NotificationItem::Type::StepFinished:
argv = {"hydra-notify", "step-finished", std::to_string(item.id), std::to_string(item.stepNr), item.logPath};
break;
};
printMsg(lvlChatty, "Executing hydra-notify " + concatStringsSep(" ", argv));
execvp("hydra-notify", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast
throw SysError("cannot start hydra-notify");
});
int res = pid.wait();
if (!statusOk(res))
throw Error("notification about build %d failed: %s", item.id, statusToString(res));
auto now2 = std::chrono::steady_clock::now();
if (item.type == NotificationItem::Type::BuildFinished) {
auto conn(dbPool.get());
pqxx::work txn(*conn);
txn.parameterized
("update Builds set notificationPendingSince = null where id = $1")
(item.id)
.exec();
txn.commit();
}
txn.exec(fmt("notify build_started, '%s'", buildId));
}
} catch (std::exception & e) {
nrNotificationsFailed++;
printMsg(lvlError, format("notification sender: %1%") % e.what());
sleep(5);
}
}
void State::notifyBuildFinished(pqxx::work & txn, BuildID buildId,
const std::vector<BuildID> & dependentIds)
{
auto payload = fmt("%d ", buildId);
for (auto & d : dependentIds)
payload += fmt("%d ", d);
// FIXME: apparently parameterized() doesn't support NOTIFY.
txn.exec(fmt("notify build_finished, '%s'", payload));
root.attr("nrNotificationsDone", nrNotificationsDone);
root.attr("nrNotificationsFailed", nrNotificationsFailed);
root.attr("nrNotificationsInProgress", nrNotificationsInProgress);
root.attr("nrNotificationsPending", notificationSenderQueue.lock()->size());
root.attr("nrNotificationTimeMs", nrNotificationTimeMs);
uint64_t nrNotificationsTotal = nrNotificationsDone + nrNotificationsFailed;
root.attr("nrNotificationTimeAvgMs", nrNotificationsTotal == 0 ? 0.0 : (float) nrNotificationTimeMs / nrNotificationsTotal);
/* Idem for notification sending. */
auto maxConcurrentNotifications = config->getIntOption("max-concurrent-notifications", 2);
for (uint64_t i = 0; i < maxConcurrentNotifications; ++i)
std::thread(&State::notificationSender, this).detach();
/* Enqueue notification items for builds that were finished
previously, but for which we didn't manage to send
notifications. */
{
auto conn(dbPool.get());
pqxx::work txn(*conn);
auto res = txn.parameterized("select id from Builds where notificationPendingSince > 0").exec();
for (auto const & row : res) {
auto id = row["id"].as<BuildID>();
enqueueNotificationItem({NotificationItem::Type::BuildFinished, id});
}
}
/* Notification delivery statistics, exported in the status output as
   nrNotificationsDone, nrNotificationsFailed, nrNotificationsInProgress,
   nrNotificationTimeMs and the derived nrNotificationTimeAvgMs. */
counter nrNotificationsDone{0};
/* Incremented by the notification sender when sending throws. */
counter nrNotificationsFailed{0};
/* Maintained via MaintainCount while a notification is being sent. */
counter nrNotificationsInProgress{0};
/* Total time spent sending notifications, in milliseconds. */
counter nrNotificationTimeMs{0};
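/* Notification sender work queue. FIXME: if hydra-queue-runner is
killed before it has finished sending notifications about a
build, then those notifications may be lost. Pending BuildFinished
notifications are recovered at startup from
Builds.notificationPendingSince (which is cleared once the
notification succeeds), but BuildStarted and StepFinished
notifications are not tracked this way and can still be lost. */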
/* One unit of work for the notification sender threads. Callers build
   instances with aggregate initialisation in member order
   (type, id, dependentIds, stepNr, logPath), so that order must not
   change. */
struct NotificationItem
{
    /* Selects the hydra-notify sub-command the item is sent as. */
    enum class Type : char { BuildStarted, BuildFinished, StepFinished };

    Type type;                          // kind of event
    BuildID id;                         // build the event is about
    std::vector<BuildID> dependentIds;  // extra build IDs (BuildStarted / BuildFinished)
    unsigned int stepNr;                // step number (StepFinished only)
    nix::Path logPath;                  // step log file (StepFinished only)
};
/* Pending notifications, protected by nix::Sync; sender threads take
   items from the front (FIFO). */
nix::Sync<std::queue<NotificationItem>> notificationSenderQueue;
/* Signalled by enqueueNotificationItem; idle sender threads wait on it
   while the queue is empty. */
std::condition_variable notificationSenderWakeup;
void enqueueNotificationItem(const NotificationItem && item)
{
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->emplace(item);
}
notificationSenderWakeup.notify_one();
}
/* Thread that asynchronously invokes hydra-notify to send build
   notifications. */
void notificationSender();
/* Issue a PostgreSQL NOTIFY on channel 'build_started' as part of txn;
   the payload is presumably just buildId. NOTE(review): PostgreSQL only
   delivers NOTIFYs when the transaction commits, so the caller must
   commit txn. */
void notifyBuildStarted(pqxx::work & txn, BuildID buildId);
/* Issue a PostgreSQL NOTIFY on channel 'build_finished' as part of txn.
   The payload is buildId followed by each of dependentIds, each value
   followed by a single space. */
void notifyBuildFinished(pqxx::work & txn, BuildID buildId,
const std::vector<BuildID> & dependentIds);
# NOTE(review): this is the usage check of the old one-shot command-line
# interface. The notification loop below dispatches on the channel name,
# not on $cmd, so this line looks obsolete. TODO confirm and remove.
my $cmd = shift @ARGV or die "Syntax: hydra-notify build-started BUILD | build-finished BUILD-ID [BUILD-IDs...] | step-finished BUILD-ID STEP-NR LOG-PATH\n";
# Subscribe to the notification channels; incoming messages are fetched
# with pg_notifies and dispatched on the channel name in the main loop.
$dbh->do("listen build_started");
$dbh->do("listen build_finished");
$dbh->do("listen step_finished");
# Run the buildStarted hook of every loaded plugin for one build.
#
# $buildId - ID of the build that has started (the notification loop
#            passes the first word of a 'build_started' payload).
#
# Dies if no build with that ID exists; the caller wraps the call in
# eval. Errors from individual plugins are caught and logged to STDERR,
# so one failing plugin does not stop the remaining plugins from running.
sub buildStarted {
    my ($buildId) = @_;

    # Use only the argument. The former 'shift @ARGV' re-declaration
    # masked it and read the ID from the command line, which is wrong
    # once this runs inside the notification daemon.
    my $build = $db->resultset('Builds')->find($buildId)
        or die "build $buildId does not exist\n";

    foreach my $plugin (@plugins) {
        eval { $plugin->buildStarted($build); };
        if ($@) {
            print STDERR "$plugin->buildStarted: $@\n";
        }
    }
}
elsif ($cmd eq "build-started") {
foreach my $plugin (@plugins) {
eval { $plugin->buildStarted($build); };
if ($@) {
print STDERR "$plugin->buildStarted: $@\n";
}
}
}
my $build = $db->resultset('Builds')->find($buildId)
or die "build $buildId does not exist\n";
else {
die "unknown action ‘$cmd’";
# Process builds that finished while hydra-notify wasn't running, i.e.
# builds whose notificationpendingsince column is not NULL.
# Each build is handled inside its own eval, matching how incoming
# notifications are processed, so that one failing build is logged
# instead of aborting the daemon at startup.
for my $build ($db->resultset('Builds')->search(
    { notificationpendingsince => { '!=', undef } }))
{
    my $buildId = $build->id;
    print STDERR "sending notifications for build ${\$buildId}...\n";
    eval { buildFinished($build); };
    if ($@) {
        print STDERR "error sending notifications for build $buildId: $@\n";
    }
}

# Process incoming notifications: wait on the database connection's
# socket and drain pending NOTIFY messages.
my $fd = $dbh->func("getfd");
my $sel = IO::Select->new($fd);
while (1) {
$sel->can_read;
my $notify = $dbh->func("pg_notifies");
next if !$notify;
my ($channelName, $pid, $payload) = @$notify;
#print STDERR "got '$channelName' from $pid: $payload\n";
my @payload = split / /, $payload;
eval {
if ($channelName eq "build_started") {
buildStarted(int($payload[0]));
} elsif ($channelName eq "build_finished") {
my $buildId = int($payload[0]);
my $build = $db->resultset('Builds')->find($buildId)
or die "build $buildId does not exist\n";
buildFinished($build, @payload[1..$#payload]);
} elsif ($channelName eq "step_finished") {
stepFinished(int($payload[0]), int($payload[1]));
}
};
if ($@) {
print STDERR "error processing message '$payload' on channel '$channelName': $@\n";
}