#! /usr/bin/env perl

use strict;
use warnings;
use utf8;
use Getopt::Long;
use Time::HiRes qw( gettimeofday tv_interval );
use HTTP::Server::PSGI;
use Hydra::Event;
use Hydra::Event::BuildFinished;
use Hydra::Helper::AddBuilds;
use Hydra::Helper::Nix;
use Hydra::Plugin;
use Hydra::PostgresListener;
use Hydra::TaskDispatcher;
use Parallel::ForkManager;
use Prometheus::Tiny::Shared;

# Flush both standard streams after every write, and encode everything
# printed to STDERR as UTF-8.
for my $stream (\*STDERR, \*STDOUT) {
    $stream->autoflush(1);
}
binmode(STDERR, ":encoding(utf8)");

# Hydra configuration as returned by getHydraConfig(); it is handed to the
# Prometheus exporter setup and to the plugins below.
my $config = getHydraConfig();

# Shared metrics registry. It is updated by the event loop below and served
# by the optional Prometheus exporter.
my $prom = Prometheus::Tiny::Shared->new;
# Note: It is very important to pre-declare any metrics before using them.
# Add a new declaration for any new metrics you create. Metrics which are
# not pre-declared disappear when their value is null. See:
# https://metacpan.org/pod/Prometheus::Tiny#declare
$prom->declare(
    "event_loop_iterations",
    type => "counter",
    help => "Number of iterations through the event loop. Incremented at the start of the event loop."
);
# This metric is written with set() to the current time instead of being
# incremented, so it is a gauge rather than a counter.
$prom->declare(
    "event_received",
    type => "gauge",
    help => "Timestamp of the last time a new event was received."
);
$prom->declare(
    "notify_event",
    type => "counter",
    help => "Number of events received on the given channel."
);
$prom->declare(
    "notify_event_error",
    type => "counter",
    help => "Number of events received that were unprocessable by channel."
);
$prom->declare(
    "notify_event_runtime",
    type => "histogram",
    help => "Number of seconds spent executing events by channel."
);

# Optionally expose the metrics over HTTP. The exporter only starts when
# getHydraNotifyPrometheusConfig() returns a defined configuration.
my $promCfg = Hydra::Helper::Nix::getHydraNotifyPrometheusConfig($config);
if (!defined($promCfg)) {
    print STDERR "Not starting the hydra-notify Prometheus exporter.\n";
} else {
    my $listen_address = $promCfg->{"listen_address"};
    my $listen_port = $promCfg->{"port"};
    print STDERR "Starting the Prometheus exporter, listening on http://${listen_address}:${listen_port}/metrics.\n";

    # The PSGI server runs in a forked child, so serving /metrics does not
    # block the notification loop in this process.
    my $serve_metrics = sub {
        my $server = HTTP::Server::PSGI->new(
            host => $listen_address,
            port => $listen_port,
            timeout => 1,
        );

        $server->run($prom->psgi);
    };

    my $fork_manager = Parallel::ForkManager->new(1);
    $fork_manager->start_child("metrics_exporter", $serve_metrics);
}

# Command-line flags.
#   --queued-only  when set, the `while (!$queued_only)` notification loop
#                  is never entered.
my $queued_only;

# Exit with status 1 when option parsing fails.
unless (GetOptions("queued-only" => \$queued_only)) {
    exit 1;
}


my $db = Hydra::Model::DB->new();

my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config);

# Callback passed as the dispatcher's fourth constructor argument. It hands
# the given task to the TaskRetries resultset's save_task().
# NOTE(review): presumably invoked for tasks that should be retried later;
# confirm in Hydra::TaskDispatcher.
my $save_task = sub {
    my $task = shift;
    return $db->resultset("TaskRetries")->save_task($task);
};

my $task_dispatcher = Hydra::TaskDispatcher->new($db, $prom, [@plugins], $save_task);

my $dbh = $db->storage->dbh;

# Subscribe to every notification channel the main loop handles.
my $listener = Hydra::PostgresListener->new($dbh);
for my $channel (qw(
    build_queued
    build_started
    build_finished
    step_finished
    hydra_notify_dump_metrics
)) {
    $listener->subscribe($channel);
}

# Catch up on builds that finished while hydra-notify wasn't running: every
# build whose notificationpendingsince column is non-NULL gets a
# "build_finished" event dispatched for it.
my @pending_builds = $db->resultset('Builds')->search(
    { notificationpendingsince => { '!=' => undef } }
);

for my $build (@pending_builds) {
    my ($build_id) = $build->id;
    print STDERR "sending notifications for build ${build_id}...\n";

    $task_dispatcher->dispatch_event(
        Hydra::Event->new_event("build_finished", $build_id)
    );
}

my $taskretries = $db->resultset('TaskRetries');

# Process incoming notifications. The loop is skipped entirely when
# --queued-only was given; otherwise it runs until the process is terminated.
while (!$queued_only) {
    $prom->inc("event_loop_iterations");

    # block_for_messages() returns an iterator (a code ref) that yields one
    # message per call and a false value once drained.
    # NOTE(review): the argument is presumably a timeout in seconds so that
    # the retry check below still runs without traffic; confirm in
    # Hydra::PostgresListener.
    my $messages = $listener->block_for_messages($taskretries->get_seconds_to_next_retry());
    while (my $message = $messages->()) {
        my $start_time = [gettimeofday()];
        $prom->set("event_received", time());
        my $channelName = $message->{"channel"};
        my $payload = $message->{"payload"};

        $prom->inc("notify_event", { channel => $channelName });

        # Diagnostic channel: print the current metrics and move on. The
        # `next` means no runtime observation is recorded for it.
        if ($channelName eq "hydra_notify_dump_metrics") {
            print STDERR "Dumping prometheus metrics:\n${\$prom->format}\n";
            next;
        }

        # The eval block ends with a true value, so the `or do` branch runs
        # only when parsing or dispatching the event died.
        eval {
            my $event = Hydra::Event->new_event($channelName, $payload);
            $task_dispatcher->dispatch_event($event);

            1;
        } or do {
            $prom->inc("notify_event_error", { channel => $channelName });
            print STDERR "error processing message '$payload' on channel '$channelName': $@\n";
        };

        # Recorded for successful and failed events alike.
        $prom->histogram_observe("notify_event_runtime", tv_interval($start_time), { channel => $channelName });
    }

    # After draining the messages, dispatch at most one retryable task per
    # loop iteration.
    my $task = $taskretries->get_retryable_task();
    if (defined($task)) {
        $task_dispatcher->dispatch_task($task);
    }
}