Turn hydra-notify into a daemon

[?]
Aug 9, 2019, 5:11 PM
32KJOERMPFWZZZCIN6TGGVX72GMAJT6VCCIP7S65EHNF3KM42KWAC

Dependencies

  • [2] FHVJYJFE Upload build logs to the binary cache
  • [3] SMUWBAD5 Send BuildFinished notifications on cached build results.
  • [4] PXTSKX4G Add buildQueued plugin hook
  • [5] ZUXMEUX6 Improve error message
  • [6] HZZCYIYT hydra-queue-runner: Allow concurrent notifications
  • [7] B7ENVLRS hydra-queue-runner: Make build notification more reliable
  • [8] LSUX6IQR Update to latest nixUnstable
  • [9] 7Q2PXRZA hydra-notify step-finished: Don't barf if the step has no log file
  • [10] 7LD275CW allow using a shorter context and increase hydra-notify debug
  • [11] FCTX433O Add buildStarted plugin hook
  • [12] YTAYNN7V Queue monitor: Bail out earlier if a step has failed previously
  • [13] EPWEMRI2 Allow determinism checking for entire jobsets
  • [14] 6WRGCITD Enable declarative projects.
  • [15] 5AIYUMTB Basic remote building
  • [16] XV4AEKJC hydra-queue-runner: Handle status queries on the main thread
  • [17] UYUVQWXQ Fix hydra-queue-runner --build-one
  • [18] HJOEIMLR Refactor
  • [19] NTEDD7T4 Provide a plugin hook for when build steps finish
  • [20] T2EIYJNG On SIGINT, shut down the builder threads
  • [21] X4KYZJBQ Use latest nixUnstable
  • [22] B2L4T3X6 Sync with Nix
  • [23] UVQJBDHN Move log compression to a plugin
  • [24] OG3Z3QGC Namespace cleanup
  • [25] IE2PRAQU hydra-queue-runner: Send build notifications
  • [26] BCDHO4OU Set propagatedFrom for cached failed build steps
  • [27] X6FOUYFJ int2String -> std::to_string
  • [28] K5G5GZY7 Guard against concurrent invocations of hydra-queue-runner
  • [29] RNJILKTW Upload log files to the right location
  • [30] MHVIT4JY Split hydra-queue-runner.cc more
  • [31] OKQLN5AG Set proper charset on log files
  • [32] SL3WSRAC hydra-queue-runner: Limit memory usage
  • [33] LVQXQIYA Kill active build steps when builds are cancelled
  • [34] JPHDKOMJ hydra-queue-runner: Keep some notification statistics
  • [35] HHOMBU7G hydra-queue-runner: Implement timeouts
  • [36] KQ3EGUQY Add some instrumentation to keep track of dispatcher cost
  • [37] 7LWB2J2Z Periodically clear orphaned build steps
  • [38] C6HOMHZW Don't try to handle SIGINT
  • [39] GS4BE6TB Asynchronously compress build logs
  • [40] BRAESISH Warn if PostgreSQL appears stalled
  • [*] 24BMQDZA Start of single-process hydra-queue-runner

Change contents

  • edit in src/hydra-queue-runner/builder.cc at line 101
    [11.500]
    [11.2164]
    auto conn(dbPool.get());
  • replacement in src/hydra-queue-runner/builder.cc at line 127
    [11.93][11.93:123](),[11.123][11.0:90]()
    build = build2;
    enqueueNotificationItem({NotificationItem::Type::BuildStarted, build->id});
    [11.93]
    [11.501]
    build = build2;
    pqxx::work txn(*conn);
    notifyBuildStarted(txn, build->id);
    txn.commit();
  • edit in src/hydra-queue-runner/builder.cc at line 151
    [11.3461][11.3461:3491]()
    auto conn(dbPool.get());
  • edit in src/hydra-queue-runner/builder.cc at line 175
    [2.596][2.596:597](),[2.597][11.33:320](),[11.33][11.33:320]()
    /* Asynchronously run plugins. FIXME: if we're killed,
    plugin actions might not be run. Need to ensure
    at-least-once semantics. */
    enqueueNotificationItem({NotificationItem::Type::StepFinished, buildId, {}, stepNr, result.logFile});
  • replacement in src/hydra-queue-runner/builder.cc at line 342
    [11.8217][11.226:341]()
    for (auto id : buildIDs)
    enqueueNotificationItem({NotificationItem::Type::BuildFinished, id});
    [11.8217]
    [11.8514]
    {
    pqxx::work txn(*conn);
    for (auto id : buildIDs)
    notifyBuildFinished(txn, id, {});
    txn.commit();
    }
  • replacement in src/hydra-queue-runner/builder.cc at line 466
    [11.13945][11.13945:14020](),[11.14020][11.2506:2630]()
    auto notificationSenderQueue_(notificationSenderQueue.lock());
    notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::BuildFinished, buildId, dependentIDs});
    [11.13945]
    [11.14107]
    pqxx::work txn(*conn);
    notifyBuildFinished(txn, buildId, dependentIDs);
    txn.commit();
  • edit in src/hydra-queue-runner/builder.cc at line 470
    [11.14117][11.14117:14165]()
    notificationSenderWakeup.notify_one();
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 302
    [11.2024]
    [42.13421]
    assert(result.logFile.find('\'') == std::string::npos);
    txn.exec(fmt("notify step_finished, '%d %d %s'", buildId, stepNr,
    result.logFile.empty() ? "-" : result.logFile));
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 456
    [11.1348][11.1348:1381]()
    void State::notificationSender()
    [11.1820]
    [11.1381]
    void State::notifyBuildStarted(pqxx::work & txn, BuildID buildId)
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 458
    [11.1383][11.1383:1416](),[11.1416][11.170:171](),[11.170][11.170:171](),[11.171][11.1417:1801](),[11.1801][11.0:1](),[11.1][8.512:578](),[11.58][11.1801:1802](),[8.578][11.1801:1802](),[11.1801][11.1801:1802](),[11.1802][11.653:744](),[11.744][11.1896:1897](),[11.1896][11.1896:1897](),[11.1897][11.59:118](),[11.118][11.1897:1940](),[11.1897][11.1897:1940](),[11.1940][11.1732:2662](),[11.2662][10.0:94](),[10.94][11.2117:2300](),[11.212][11.2117:2300](),[11.2662][11.2117:2300](),[11.2117][11.2117:2300](),[11.2300][11.184:218](),[11.218][11.2338:2339](),[11.2338][11.2338:2339](),[11.2339][5.0:133](),[5.133][11.2505:2506](),[11.990][11.2505:2506](),[11.2505][11.2505:2506](),[11.2506][11.119:178](),[11.178][7.396:771]()
    while (true) {
    try {
    NotificationItem item;
    {
    auto notificationSenderQueue_(notificationSenderQueue.lock());
    while (notificationSenderQueue_->empty())
    notificationSenderQueue_.wait(notificationSenderWakeup);
    item = notificationSenderQueue_->front();
    notificationSenderQueue_->pop();
    }
    MaintainCount<counter> mc(nrNotificationsInProgress);
    printMsg(lvlChatty, format("sending notification about build %1%") % item.id);
    auto now1 = std::chrono::steady_clock::now();
    Pid pid = startProcess([&]() {
    Strings argv;
    switch (item.type) {
    case NotificationItem::Type::BuildStarted:
    argv = {"hydra-notify", "build-started", std::to_string(item.id)};
    for (auto id : item.dependentIds)
    argv.push_back(std::to_string(id));
    break;
    case NotificationItem::Type::BuildFinished:
    argv = {"hydra-notify", "build-finished", std::to_string(item.id)};
    for (auto id : item.dependentIds)
    argv.push_back(std::to_string(id));
    break;
    case NotificationItem::Type::StepFinished:
    argv = {"hydra-notify", "step-finished", std::to_string(item.id), std::to_string(item.stepNr), item.logPath};
    break;
    };
    printMsg(lvlChatty, "Executing hydra-notify " + concatStringsSep(" ", argv));
    execvp("hydra-notify", (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast
    throw SysError("cannot start hydra-notify");
    });
    int res = pid.wait();
    if (!statusOk(res))
    throw Error("notification about build %d failed: %s", item.id, statusToString(res));
    auto now2 = std::chrono::steady_clock::now();
    if (item.type == NotificationItem::Type::BuildFinished) {
    auto conn(dbPool.get());
    pqxx::work txn(*conn);
    txn.parameterized
    ("update Builds set notificationPendingSince = null where id = $1")
    (item.id)
    .exec();
    txn.commit();
    }
    [11.1383]
    [7.771]
    txn.exec(fmt("notify build_started, '%s'", buildId));
    }
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 461
    [7.772][11.178:325](),[11.178][11.178:325]()
    nrNotificationTimeMs += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
    nrNotificationsDone++;
  • replacement in src/hydra-queue-runner/hydra-queue-runner.cc at line 462
    [11.326][11.2506:2545](),[11.2506][11.2506:2545](),[11.2545][11.327:364](),[11.364][11.2545:2662](),[11.2545][11.2545:2662]()
    } catch (std::exception & e) {
    nrNotificationsFailed++;
    printMsg(lvlError, format("notification sender: %1%") % e.what());
    sleep(5);
    }
    }
    [11.326]
    [11.2662]
    void State::notifyBuildFinished(pqxx::work & txn, BuildID buildId,
    const std::vector<BuildID> & dependentIds)
    {
    auto payload = fmt("%d ", buildId);
    for (auto & d : dependentIds)
    payload += fmt("%d ", d);
    // FIXME: apparently parameterized() doesn't support NOTIFY.
    txn.exec(fmt("notify build_finished, '%s'", payload));
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 541
    [11.1487][11.365:938]()
    root.attr("nrNotificationsDone", nrNotificationsDone);
    root.attr("nrNotificationsFailed", nrNotificationsFailed);
    root.attr("nrNotificationsInProgress", nrNotificationsInProgress);
    root.attr("nrNotificationsPending", notificationSenderQueue.lock()->size());
    root.attr("nrNotificationTimeMs", nrNotificationTimeMs);
    uint64_t nrNotificationsTotal = nrNotificationsDone + nrNotificationsFailed;
    root.attr("nrNotificationTimeAvgMs", nrNotificationsTotal == 0 ? 0.0 : (float) nrNotificationTimeMs / nrNotificationsTotal);
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 787
    [11.92][11.398:399](),[11.3153][11.2722:2763](),[11.2763][6.0:221]()
    /* Idem for notification sending. */
    auto maxConcurrentNotifications = config->getIntOption("max-concurrent-notifications", 2);
    for (uint64_t i = 0; i < maxConcurrentNotifications; ++i)
    std::thread(&State::notificationSender, this).detach();
  • edit in src/hydra-queue-runner/hydra-queue-runner.cc at line 788
    [11.2824][7.773:1280]()
    /* Enqueue notification items for builds that were finished
    previously, but for which we didn't manage to send
    notifications. */
    {
    auto conn(dbPool.get());
    pqxx::work txn(*conn);
    auto res = txn.parameterized("select id from Builds where notificationPendingSince > 0").exec();
    for (auto const & row : res) {
    auto id = row["id"].as<BuildID>();
    enqueueNotificationItem({NotificationItem::Type::BuildFinished, id});
    }
    }
  • edit in src/hydra-queue-runner/queue-monitor.cc at line 196
    [11.2304]
    [11.2304]
    notifyBuildFinished(txn, build->id, {});
  • edit in src/hydra-queue-runner/queue-monitor.cc at line 201
    [11.2424][3.0:90]()
    enqueueNotificationItem({NotificationItem::Type::BuildFinished, build->id});
  • edit in src/hydra-queue-runner/queue-monitor.cc at line 232
    [11.25116]
    [11.25116]
    notifyBuildFinished(txn, build->id, {});
  • edit in src/hydra-queue-runner/queue-monitor.cc at line 238
    [3.92][3.92:181](),[3.181][11.25183:25184](),[11.25183][11.25183:25184]()
    enqueueNotificationItem({NotificationItem::Type::BuildFinished, build->id});
  • edit in src/hydra-queue-runner/state.hh at line 350
    [11.910][11.939:1092](),[11.910][11.6084:6085](),[11.1092][11.6084:6085](),[11.6084][11.6084:6085](),[11.6223][11.6223:6498](),[11.6498][11.991:1058](),[11.1058][11.2843:2919](),[11.2919][11.1098:1191](),[11.1098][11.1098:1191](),[11.1191][11.2920:2976](),[11.2976][11.1191:1198](),[11.1191][11.1191:1198](),[11.1198][11.2097:2220](),[11.6569][11.2097:2220](),[11.2220][11.6691:6692](),[11.6691][11.6691:6692](),[11.6692][11.2977:3249]()
    counter nrNotificationsDone{0};
    counter nrNotificationsFailed{0};
    counter nrNotificationsInProgress{0};
    counter nrNotificationTimeMs{0};
    /* Notification sender work queue. FIXME: if hydra-queue-runner is
    killed before it has finished sending notifications about a
    build, then the notifications may be lost. It would be better
    to mark builds with pending notification in the database. */
    struct NotificationItem
    {
    enum class Type : char {
    BuildStarted,
    BuildFinished,
    StepFinished,
    };
    Type type;
    BuildID id;
    std::vector<BuildID> dependentIds;
    unsigned int stepNr;
    nix::Path logPath;
    };
    nix::Sync<std::queue<NotificationItem>> notificationSenderQueue;
    std::condition_variable notificationSenderWakeup;
    void enqueueNotificationItem(const NotificationItem && item)
    {
    {
    auto notificationSenderQueue_(notificationSenderQueue.lock());
    notificationSenderQueue_->emplace(item);
    }
    notificationSenderWakeup.notify_one();
    }
  • replacement in src/hydra-queue-runner/state.hh at line 510
    [11.8792][11.8792:8917]()
    /* Thread that asynchronously invokes hydra-notify to send build
    notifications. */
    void notificationSender();
    [11.8698]
    [11.8917]
    void notifyBuildStarted(pqxx::work & txn, BuildID buildId);
    void notifyBuildFinished(pqxx::work & txn, BuildID buildId,
    const std::vector<BuildID> & dependentIds);
  • edit in src/script/hydra-notify at line 8
    [11.5666]
    [11.3128]
    use IO::Select;
  • edit in src/script/hydra-notify at line 18
    [11.3326]
    [11.3326]
    my $dbh = $db->storage->dbh;
  • replacement in src/script/hydra-notify at line 21
    [11.3327][11.3662:3820]()
    my $cmd = shift @ARGV or die "Syntax: hydra-notify build-started BUILD | build-finished BUILD-ID [BUILD-IDs...] | step-finished BUILD-ID STEP-NR LOG-PATH\n";
    [11.3327]
    [11.3412]
    $dbh->do("listen build_started");
    $dbh->do("listen build_finished");
    $dbh->do("listen step_finished");
    sub buildStarted {
    my ($buildId) = @_;
  • replacement in src/script/hydra-notify at line 28
    [11.3413][11.1658:1791]()
    my $buildId = shift @ARGV or die;
    my $build = $db->resultset('Builds')->find($buildId)
    or die "build $buildId does not exist\n";
    [11.3413]
    [11.3821]
    my $build = $db->resultset('Builds')->find($buildId)
    or die "build $buildId does not exist\n";
    foreach my $plugin (@plugins) {
    eval { $plugin->buildStarted($build); };
    if ($@) {
    print STDERR "$plugin->buildStarted: $@\n";
    }
    }
    }
  • replacement in src/script/hydra-notify at line 39
    [11.3822][11.1791:1823](),[11.1791][11.1791:1823]()
    if ($cmd eq "build-finished") {
    [11.3822]
    [11.5667]
    sub buildFinished {
    my ($build, @deps) = @_;
  • edit in src/script/hydra-notify at line 47
    [11.5892]
    [11.3581]
  • replacement in src/script/hydra-notify at line 49
    [11.3601][11.3601:3630]()
    foreach my $id (@ARGV) {
    [11.3601]
    [11.3630]
    foreach my $id (@deps) {
  • edit in src/script/hydra-notify at line 61
    [11.4016]
    [11.4016]
    $build->update({ notificationpendingsince => undef });
  • replacement in src/script/hydra-notify at line 65
    [11.4019][4.265:473]()
    elsif ($cmd eq "build-queued") {
    foreach my $plugin (@plugins) {
    eval { $plugin->buildQueued($build); };
    if ($@) {
    print STDERR "$plugin->buildQueued: $@\n";
    }
    }
    }
    [11.4019]
    [4.473]
    sub stepFinished {
    my ($buildId, $stepNr, $logPath) = @_;
  • replacement in src/script/hydra-notify at line 68
    [4.474][11.4019:4230](),[11.4019][11.4019:4230]()
    elsif ($cmd eq "build-started") {
    foreach my $plugin (@plugins) {
    eval { $plugin->buildStarted($build); };
    if ($@) {
    print STDERR "$plugin->buildStarted: $@\n";
    }
    }
    }
    [4.474]
    [11.4230]
    my $build = $db->resultset('Builds')->find($buildId)
    or die "build $buildId does not exist\n";
  • edit in src/script/hydra-notify at line 71
    [11.4231][11.4231:4265](),[11.4265][9.0:59]()
    elsif ($cmd eq "step-finished") {
    die if scalar @ARGV < 2;
    my $stepNr = shift @ARGV;
  • replacement in src/script/hydra-notify at line 73
    [11.4411][9.60:131]()
    my $logPath = shift @ARGV;
    $logPath = undef if $logPath eq "";
    [11.4411]
    [11.4449]
    $logPath = undef if $logPath eq "-";
  • replacement in src/script/hydra-notify at line 84
    [11.3833][11.3833:3877]()
    else {
    die "unknown action ‘$cmd’";
    [11.3833]
    [11.3877]
    # Process builds that finished while hydra-notify wasn't running.
    for my $build ($db->resultset('Builds')->search(
    { notificationpendingsince => { '!=', undef } }))
    {
    my $buildId = $build->id;
    print STDERR "sending notifications for build ${\$buildId}...\n";
    buildFinished($build);
    }
    # Process incoming notifications.
    my $fd = $dbh->func("getfd");
    my $sel = IO::Select->new($fd);
    while (1) {
    $sel->can_read;
    my $notify = $dbh->func("pg_notifies");
    next if !$notify;
    my ($channelName, $pid, $payload) = @$notify;
    #print STDERR "got '$channelName' from $pid: $payload\n";
    my @payload = split / /, $payload;
    eval {
    if ($channelName eq "build_started") {
    buildStarted(int($payload[0]));
    } elsif ($channelName eq "build_finished") {
    my $buildId = int($payload[0]);
    my $build = $db->resultset('Builds')->find($buildId)
    or die "build $buildId does not exist\n";
    buildFinished($build, @payload[1..$#payload]);
    } elsif ($channelName eq "step_finished") {
    stepFinished(int($payload[0]), int($payload[1]));
    }
    };
    if ($@) {
    print STDERR "error processing message '$payload' on channel '$channelName': $@\n";
    }