package Hydra::TaskDispatcher;
use strict;
use warnings;
use Hydra::Task;
use Time::HiRes qw( gettimeofday tv_interval );
sub new {
my ($self, $db, $prometheus, $plugins, $store_task) = @_;
$prometheus->declare(
"notify_plugin_executions",
type => "counter",
help => "Number of times each plugin has been called by channel."
);
$prometheus->declare(
"notify_plugin_runtime",
type => "histogram",
help => "Number of seconds spent executing each plugin by channel."
);
$prometheus->declare(
"notify_plugin_success",
type => "counter",
help => "Number of successful executions of this plugin on this channel."
);
$prometheus->declare(
"notify_plugin_error",
type => "counter",
help => "Number of failed executions of this plugin on this channel."
);
$prometheus->declare(
"notify_plugin_retry_success",
type => "counter",
help => "Number of successful executions of retried tasks."
);
$prometheus->declare(
"notify_plugin_drop",
type => "counter",
help => "Number of tasks that have been dropped after too many retries."
);
$prometheus->declare(
"notify_plugin_requeue",
type => "counter",
help => "Number of tasks that have been requeued after a failure."
);
$prometheus->declare(
"notify_plugin_no_such_plugin",
type => "counter",
help => "Number of tasks that have not been processed because the plugin does not exist."
);
$prometheus->declare(
"notify_plugin_not_interested",
type => "counter",
help => "Number of tasks that have not been processed because the plugin was not interested in the event."
);
my %plugins_by_name = map { ref $_ => $_ } @{$plugins};
if (!defined($store_task)) {
$store_task = sub {};
}
my $obj = bless {
"db" => $db,
"prometheus" => $prometheus,
"plugins_by_name" => \%plugins_by_name,
"store_task" => $store_task,
}, $self;
}
sub dispatch_event {
my ($self, $event) = @_;
foreach my $plugin_name (keys %{$self->{"plugins_by_name"}}) {
my $task = Hydra::Task->new($event, $plugin_name);
$self->dispatch_task($task);
}
}
sub dispatch_task {
my ($self, $task) = @_;
my $channel_name = $task->{"event"}->{'channel_name'};
my $plugin_name = $task->{"plugin_name"};
my $event_labels = $self->prom_labels_for_task($task);
my $plugin = $self->{"plugins_by_name"}->{$plugin_name};
if (!defined($plugin)) {
$self->{"prometheus"}->inc("notify_plugin_no_such_plugin", $event_labels);
print STDERR "No plugin named $plugin_name\n";
return 0;
}
if (!$task->{"event"}->interestedIn($plugin)) {
$self->{"prometheus"}->inc("notify_plugin_not_interested", $event_labels);
return 0;
}
$self->{"prometheus"}->inc("notify_plugin_executions", $event_labels);
eval {
my $start_time = [gettimeofday()];
$task->{"event"}->execute($self->{"db"}, $plugin);
$self->{"prometheus"}->histogram_observe("notify_plugin_runtime", tv_interval($start_time), $event_labels);
$self->{"prometheus"}->inc("notify_plugin_success", $event_labels);
$self->success($task);
1;
} or do {
$self->failure($task);
$self->{"prometheus"}->inc("notify_plugin_error", $event_labels);
print STDERR "error running $channel_name hooks: $@\n";
return 0;
}
}
sub success {
my ($self, $task) = @_;
my $event_labels = $self->prom_labels_for_task($task);
if (defined($task->{"record"})) {
$self->{"prometheus"}->inc("notify_plugin_retry_sucess", $event_labels);
$task->{"record"}->delete();
}
}
sub failure {
my ($self, $task) = @_;
my $event_labels = $self->prom_labels_for_task($task);
if (defined($task->{"record"})) {
if ($task->{"record"}->attempts > 100) {
$self->{"prometheus"}->inc("notify_plugin_drop", $event_labels);
$task->{"record"}->delete();
} else {
$self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels);
$task->{"record"}->requeue();
}
} else {
$self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels);
$self->{"store_task"}($task);
}
}
sub prom_labels_for_task {
my ($self, $task) = @_;
my $channel_name = $task->{"event"}->{'channel_name'};
my $plugin_name = $task->{"plugin_name"};
return {
channel => $channel_name,
plugin => $plugin_name,
};
}
1;