hydra-notify now receives notifications about started/finished builds/steps via PostgreSQL. This gets rid of the (substantial) overhead of starting hydra-notify for every event. It also allows other programs (even on other machines) to listen to Hydra notifications.
32KJOERMPFWZZZCIN6TGGVX72GMAJT6VCCIP7S65EHNF3KM42KWAC FHVJYJFEZEGR6DVIQ2HGE7DLWA65QAGAI3K2KACGNM2JTDJTRJ4AC SMUWBAD5D2WF2Q5B75YIUQL76FQHY3C4W25OZDFVFCF5AIYUD4YAC PXTSKX4GBDOBMI4MDO64DBIHSEVTM6IZ5O6GUFXK6UL63GHWBMBAC ZUXMEUX6PPFXU7ELM3DL7ICM44UBRDNFHOKYZUD2YEJQSVWLTRSQC HZZCYIYTIZ7PUGXNMZSEO463LS4VO7ADUH3QPZQCLMBTPPPOOJSQC B7ENVLRS2KLKEG66TY5G4EW6M274JZOBDUCRB7BFBXLTBG455STQC LSUX6IQR7TBTTCDQAVD3KDAVARLM7SUD7PPVTR5IOKA3BNZDYA7AC 7Q2PXRZAERQFX53OEZGY26ZELA2KRWGXDA5QEYQQVUF5ME65Z7TQC 7LD275CWWC7L5V4ETGIQRMOEJV5JRUSYQ7LUFINAUGAHWANCREGAC EPWEMRI23UN6C777AL2GWEOXYZRTPEPFC2SZVZ4JTWUGYVFDL5XAC MHVIT4JYWUYD4UCGB2AHLXWLX6B5SYE22BREERNGANT7RGGDUFOAC FCTX433OH7QIVWHXL23DKVSUKBQSLQTRK3PFCKKSMLX6A634456QC NTEDD7T4M7XXYCWNZN57EJOEH3KI3UYMHOANV4MY2LSUPWX3JRDQC UVQJBDHNWHTGOAEL426F6RZPD67QABBRP73E6QB5ND5OV2S3267QC LVQXQIYA7QMLVYOANYEFHDBTFAOSE3D2IYAVOG2DXURTASRCUNYQC 24BMQDZAWDQ7VNIA7TIROXSOYLOJBNZ2E4264WHWNJAEN6ZB3UOAC IE2PRAQUCQVFPJ4CAIJRPXXEFC5VBAE3EO5I5FG4XWEDRNONNHKQC GS4BE6TB6GH2JUZJHDPHL6YG7J7YYESF3YOZJZ2CFABXUTO4VYPQC K5G5GZY7D7KWVR5RAGZFHH3ZPG5OCLZT4HZ6XIJJ7YYVUMC2CTZQC JPHDKOMJEDZQYRACBIJ57SJME7IZNLDJY7RB6QR47KG7IL3PG7UAC X6FOUYFJ5ODEOMHLL6AHM6KI2JRZ2XOI76NYJYJNDUQBTV5UJLBAC X4KYZJBQ7WULUWTHAPX5BZTZR7C6HRCC4N4J5K726PGZKSY6FQOQC SL3WSRACCX2IMJHHLTRAUQT7QDLCOKYLVO2FEHWIHXM5GPKSRJTQC C6HOMHZWMSC7ORGFUF5YG2ACKV2SCP26HL3UH6VXH6RNDYRXH5DAC XV4AEKJCFTNCOR52IRFYHORCNNFKIHOADIUARTC2U5Z6ZQBEEFYQC YTAYNN7VNYZNLGUSGY3EF33MGQWMJW76FKV657SBKASQFQC7EB3AC BRAESISHTN4IIWUBVDMPDMY7QLMJDKX7GQ7K6NSJN66L5VPWSX3QC HJOEIMLRDVQ2KZI5HGL2HKGBM3AHP7YIKGKDAGFUNKRUXVRB24NAC B2L4T3X63XVYJQXEDU4WT5Y4R6PMDXGC6WN2KGOMHRQILSABNQOAC 6WRGCITDYP7JIBYP25QIWCHWRJWFPDP2D3TJS3WO3KUHQQJAWHMQC /* Asynchronously run plugins. FIXME: if we're killed,plugin actions might not be run. Need to ensureat-least-once semantics. */enqueueNotificationItem({NotificationItem::Type::StepFinished, buildId, {}, stepNr, result.logFile});
auto notificationSenderQueue_(notificationSenderQueue.lock());notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::BuildFinished, buildId, dependentIDs});
pqxx::work txn(*conn);notifyBuildFinished(txn, buildId, dependentIDs);txn.commit();
while (true) {try {NotificationItem item;{auto notificationSenderQueue_(notificationSenderQueue.lock());while (notificationSenderQueue_->empty())notificationSenderQueue_.wait(notificationSenderWakeup);item = notificationSenderQueue_->front();notificationSenderQueue_->pop();}MaintainCount<counter> mc(nrNotificationsInProgress);printMsg(lvlChatty, format("sending notification about build %1%") % item.id);auto now1 = std::chrono::steady_clock::now();Pid pid = startProcess([&]() {Strings argv;switch (item.type) {case NotificationItem::Type::BuildStarted:argv = {"hydra-notify", "build-started", std::to_string(item.id)};for (auto id : item.dependentIds)argv.push_back(std::to_string(id));break;case NotificationItem::Type::BuildFinished:argv = {"hydra-notify", "build-finished", std::to_string(item.id)};for (auto id : item.dependentIds)argv.push_back(std::to_string(id));break;case NotificationItem::Type::StepFinished:argv = {"hydra-notify", "step-finished", std::to_string(item.id), std::to_string(item.stepNr), item.logPath};break;};printMsg(lvlChatty, "Executing hydra-notify " + concatStringsSep(" ", argv));execvp("hydra-notify", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove castthrow SysError("cannot start hydra-notify");});int res = pid.wait();if (!statusOk(res))throw Error("notification about build %d failed: %s", item.id, statusToString(res));auto now2 = std::chrono::steady_clock::now();if (item.type == NotificationItem::Type::BuildFinished) {auto conn(dbPool.get());pqxx::work txn(*conn);txn.parameterized("update Builds set notificationPendingSince = null where id = $1")(item.id).exec();txn.commit();}
txn.exec(fmt("notify build_started, '%s'", buildId));}
} catch (std::exception & e) {nrNotificationsFailed++;printMsg(lvlError, format("notification sender: %1%") % e.what());sleep(5);}}
void State::notifyBuildFinished(pqxx::work & txn, BuildID buildId,const std::vector<BuildID> & dependentIds){auto payload = fmt("%d ", buildId);for (auto & d : dependentIds)payload += fmt("%d ", d);// FIXME: apparently parameterized() doesn't support NOTIFY.txn.exec(fmt("notify build_finished, '%s'", payload));
root.attr("nrNotificationsDone", nrNotificationsDone);root.attr("nrNotificationsFailed", nrNotificationsFailed);root.attr("nrNotificationsInProgress", nrNotificationsInProgress);root.attr("nrNotificationsPending", notificationSenderQueue.lock()->size());root.attr("nrNotificationTimeMs", nrNotificationTimeMs);uint64_t nrNotificationsTotal = nrNotificationsDone + nrNotificationsFailed;root.attr("nrNotificationTimeAvgMs", nrNotificationsTotal == 0 ? 0.0 : (float) nrNotificationTimeMs / nrNotificationsTotal);
/* Idem for notification sending. */auto maxConcurrentNotifications = config->getIntOption("max-concurrent-notifications", 2);for (uint64_t i = 0; i < maxConcurrentNotifications; ++i)std::thread(&State::notificationSender, this).detach();
/* Re-queue BuildFinished notification items for builds that finished
   previously, but for which we didn't manage to send notifications
   (selected as rows of Builds with notificationPendingSince > 0). */
{
    auto dbConn(dbPool.get());
    pqxx::work readTxn(*dbConn);

    auto pendingBuilds = readTxn.parameterized(
        "select id from Builds where notificationPendingSince > 0").exec();

    for (const auto & pending : pendingBuilds)
        enqueueNotificationItem({
            NotificationItem::Type::BuildFinished,
            pending["id"].as<BuildID>()
        });
}
counter nrNotificationsDone{0};counter nrNotificationsFailed{0};counter nrNotificationsInProgress{0};counter nrNotificationTimeMs{0};/* Notification sender work queue. FIXME: if hydra-queue-runner iskilled before it has finished sending notifications about abuild, then the notifications may be lost. It would be betterto mark builds with pending notification in the database. */struct NotificationItem{enum class Type : char {BuildStarted,BuildFinished,StepFinished,};Type type;BuildID id;std::vector<BuildID> dependentIds;unsigned int stepNr;nix::Path logPath;};nix::Sync<std::queue<NotificationItem>> notificationSenderQueue;std::condition_variable notificationSenderWakeup;void enqueueNotificationItem(const NotificationItem && item){{auto notificationSenderQueue_(notificationSenderQueue.lock());notificationSenderQueue_->emplace(item);}notificationSenderWakeup.notify_one();}
/* Thread that asynchronously invokes hydra-notify to send build notifications. */void notificationSender();
/* Emit PostgreSQL NOTIFY events inside the caller's transaction `txn`.
   notifyBuildFinished() executes "notify build_finished" with a payload
   made of buildId and then every entry of dependentIds, each followed
   by a single space.
   notifyBuildStarted() presumably executes "notify build_started" with
   buildId as the payload -- TODO confirm against its definition. */
void notifyBuildStarted(pqxx::work & txn, BuildID buildId);void notifyBuildFinished(pqxx::work & txn, BuildID buildId,const std::vector<BuildID> & dependentIds);
my $cmd = shift @ARGV or die "Syntax: hydra-notify build-started BUILD | build-finished BUILD-ID [BUILD-IDs...] | step-finished BUILD-ID STEP-NR LOG-PATH\n";
$dbh->do("listen build_started");$dbh->do("listen build_finished");$dbh->do("listen step_finished");sub buildStarted {my ($buildId) = @_;
my $buildId = shift @ARGV or die;my $build = $db->resultset('Builds')->find($buildId)or die "build $buildId does not exist\n";
my $build = $db->resultset('Builds')->find($buildId)or die "build $buildId does not exist\n";foreach my $plugin (@plugins) {eval { $plugin->buildStarted($build); };if ($@) {print STDERR "$plugin->buildStarted: $@\n";}}}
elsif ($cmd eq "build-started") {foreach my $plugin (@plugins) {eval { $plugin->buildStarted($build); };if ($@) {print STDERR "$plugin->buildStarted: $@\n";}}}
my $build = $db->resultset('Builds')->find($buildId)or die "build $buildId does not exist\n";
else {die "unknown action ‘$cmd’";
# Process builds that finished while hydra-notify wasn't running.for my $build ($db->resultset('Builds')->search({ notificationpendingsince => { '!=', undef } })){my $buildId = $build->id;print STDERR "sending notifications for build ${\$buildId}...\n";buildFinished($build);}# Process incoming notifications.my $fd = $dbh->func("getfd");my $sel = IO::Select->new($fd);while (1) {$sel->can_read;my $notify = $dbh->func("pg_notifies");next if !$notify;my ($channelName, $pid, $payload) = @$notify;#print STDERR "got '$channelName' from $pid: $payload\n";my @payload = split / /, $payload;eval {if ($channelName eq "build_started") {buildStarted(int($payload[0]));} elsif ($channelName eq "build_finished") {my $buildId = int($payload[0]);my $build = $db->resultset('Builds')->find($buildId)or die "build $buildId does not exist\n";buildFinished($build, @payload[1..$#payload]);} elsif ($channelName eq "step_finished") {stepFinished(int($payload[0]), int($payload[1]));}};if ($@) {print STDERR "error processing message '$payload' on channel '$channelName': $@\n";}