#! /usr/bin/env perl use strict; use warnings; use utf8; use Getopt::Long; use Time::HiRes qw( gettimeofday tv_interval ); use HTTP::Server::PSGI; use Hydra::Event; use Hydra::Event::BuildFinished; use Hydra::Helper::AddBuilds; use Hydra::Helper::Nix; use Hydra::Plugin; use Hydra::PostgresListener; use Hydra::TaskDispatcher; use Parallel::ForkManager; use Prometheus::Tiny::Shared; STDERR->autoflush(1); STDOUT->autoflush(1); binmode STDERR, ":encoding(utf8)"; my $config = getHydraConfig(); my $prom = Prometheus::Tiny::Shared->new; # Note: It is very important to pre-declare any metrics before using them. # Add a new declaration for any new metrics you create. Metrics which are # not pre-declared disappear when their value is null. See: # https://metacpan.org/pod/Prometheus::Tiny#declare $prom->declare( "event_loop_iterations", type => "counter", help => "Number of iterations through the event loop. Incremented at the start of the event loop." ); $prom->declare( "event_received", type => "counter", help => "Timestamp of the last time a new event was received." ); $prom->declare( "notify_event", type => "counter", help => "Number of events received on the given channel." ); $prom->declare( "notify_event_error", type => "counter", help => "Number of events received that were unprocessable by channel." ); $prom->declare( "notify_event_runtime", type => "histogram", help => "Number of seconds spent executing events by channel." ); my $promCfg = Hydra::Helper::Nix::getHydraNotifyPrometheusConfig($config); if (defined($promCfg)) { print STDERR "Starting the Prometheus exporter, listening on http://${\$promCfg->{'listen_address'}}:${\$promCfg->{'port'}}/metrics.\n"; my $fork_manager = Parallel::ForkManager->new(1); $fork_manager->start_child("metrics_exporter", sub { my $server = HTTP::Server::PSGI->new( host => $promCfg->{"listen_address"}, port => $promCfg->{"port"}, timeout => 1, ); $server->run($prom->psgi); }); } else { print STDERR "Not starting the hydra-notify Prometheus exporter.\n"; } my $queued_only; GetOptions( "queued-only" => \$queued_only ) or exit 1; my $db = Hydra::Model::DB->new(); my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config); my $task_dispatcher = Hydra::TaskDispatcher->new( $db, $prom, [@plugins], sub { my ($task) = @_; $db->resultset("TaskRetries")->save_task($task); } ); my $dbh = $db->storage->dbh; my $listener = Hydra::PostgresListener->new($dbh); $listener->subscribe("build_queued"); $listener->subscribe("build_started"); $listener->subscribe("build_finished"); $listener->subscribe("step_finished"); $listener->subscribe("hydra_notify_dump_metrics"); # Process builds that finished while hydra-notify wasn't running. for my $build ($db->resultset('Builds')->search( { notificationpendingsince => { '!=', undef } })) { print STDERR "sending notifications for build ${\$build->id}...\n"; my $event = Hydra::Event->new_event("build_finished", $build->id); $task_dispatcher->dispatch_event($event); } my $taskretries = $db->resultset('TaskRetries'); # Process incoming notifications. while (!$queued_only) { $prom->inc("event_loop_iterations"); my $messages = $listener->block_for_messages($taskretries->get_seconds_to_next_retry()); while (my $message = $messages->()) { my $start_time = [gettimeofday()]; $prom->set("event_received", time()); my $channelName = $message->{"channel"}; my $pid = $message->{"pid"}; my $payload = $message->{"payload"}; $prom->inc("notify_event", { channel => $channelName }); if ($channelName eq "hydra_notify_dump_metrics") { print STDERR "Dumping prometheus metrics:\n${\$prom->format}\n"; next; } eval { my $event = Hydra::Event->new_event($channelName, $message->{"payload"}); $task_dispatcher->dispatch_event($event); 1; } or do { $prom->inc("notify_event_error", { channel => $channelName }); print STDERR "error processing message '$payload' on channel '$channelName': $@\n"; }; $prom->histogram_observe("notify_event_runtime", tv_interval($start_time), { channel => $channelName }); } my $task = $taskretries->get_retryable_task(); if (defined($task)) { $task_dispatcher->dispatch_task($task); } }