SWXGVPJNE35OLCZD7HOZOJLZZPTWPOKQ6W56Y5EM2TZMNBZJESEQC
JXJPLS22LTPDVUEM6AELNRQOKFUXCO36LRULKOMMTSL5ONCYO3AQC
BJJLHGFCE6RDMBF3HEEJ4DB6VOLTNA72UA5PWTMFJHVATEKV4EDAC
3ELC3W2DNZLEMIIOGJ4VPWSYDPVTVP5SXRDCAGBKVAW66DBJWNIAC
D5QIOJGPKQJIYBUCSC3MFJ3TXLPNZ2XMI37GXMFRVRFWWR2VMTFAC
P4SME2BCHKBETP4SFPOWOJM23K57VRFX3RAS634CQKTASXOXYRKQC
I7DFJWL64QNNEZVTYQYJO2YI6NYKMWXXBV2TG6G3YAVID5ANPZ2AC
IE2PRAQUCQVFPJ4CAIJRPXXEFC5VBAE3EO5I5FG4XWEDRNONNHKQC
32KJOERMPFWZZZCIN6TGGVX72GMAJT6VCCIP7S65EHNF3KM42KWAC
FN3QFV6VB2BPYLKNYBUCV7HTSXCLKKLBD43TSEQUZNNAJVQACDZQC
T3OHZDYPABJBSN2G2BMQ47RL5PI5WVMAYQJVPT6TCPEFVGRLPEFQC
2JJP76737U2JWJWQ6UDFEAQCGWRAQH46HC6OCIKWMB5QYRXF6DQQC
package Hydra::Task;
use strict;
use warnings;
sub new {
my ($self, $event, $plugin_name) = @_;
return bless {
"event" => $event,
"plugin_name" => $plugin_name,
}, $self;
}
1;
package Hydra::TaskDispatcher;
use strict;
use warnings;
use Hydra::Task;
use Time::HiRes qw( gettimeofday tv_interval );
=head1 Hydra::TaskDispatcher
Excecute many plugins with Hydra::Event as its input.
The TaskDispatcher is responsible for dealing with fanout
from one incoming Event being executed across many plugins,
or one Event being executed against a single plugin by first
wrapping it in a Task.
Its execution model is based on creating a Hydra::Task for
each plugin's execution. The task represents the name of
the plugin to run and the Event to process.
=cut
=head2 new
Arguments:
=over 1
=item C<$dbh>
L<DBI::db> The database connection.
=back
=item C<$prometheus>
L<Prometheus::Tiny> A Promethues implementation, either Prometheus::Tiny
or Prometheus::Tiny::Shared. Not compatible with Net::Prometheus.
=back
=item C<%plugins>
L<Hydra::Plugin> A list of Hydra plugins to execute events and tasks against.
=back
=cut
sub new {
my ($self, $db, $prometheus, $plugins) = @_;
$prometheus->declare(
"notify_plugin_executions",
type => "counter",
help => "Number of times each plugin has been called by channel."
);
$prometheus->declare(
"notify_plugin_runtime",
type => "histogram",
help => "Number of seconds spent executing each plugin by channel."
);
$prometheus->declare(
"notify_plugin_success",
type => "counter",
help => "Number of successful executions of this plugin on this channel."
);
$prometheus->declare(
"notify_plugin_error",
type => "counter",
help => "Number of failed executions of this plugin on this channel."
);
my %plugins_by_name = map { ref $_ => $_ } @{$plugins};
my $obj = bless {
"db" => $db,
"prometheus" => $prometheus,
"plugins_by_name" => \%plugins_by_name,
}, $self;
}
=head2 dispatch_event
Execute each configured plugin against the provided L<Hydra::Event>.
Arguments:
=over 1
=item C<$event>
L<Hydra::Event> the event, usually from L<Hydra::PostgresListener>.
=back
=cut
sub dispatch_event {
my ($self, $event) = @_;
foreach my $plugin_name (keys %{$self->{"plugins_by_name"}}) {
my $task = Hydra::Task->new($event, $plugin_name);
$self->dispatch_task($task);
}
}
=head2 dispatch_task
Execute a specifi plugin against the provided L<Hydra::Task>.
The Task includes information about what plugin should be executed.
If the provided plugin does not exist, an error logged is logged and the
function returns falsey.
Arguments:
=over 1
=item C<$task>
L<Hydra::Task> the task, usually from L<Hydra::Shema::Result::TaskRetries>.
=back
=cut
sub dispatch_task {
my ($self, $task) = @_;
my $channel_name = $task->{"event"}->{'channel_name'};
my $plugin_name = $task->{"plugin_name"};
my $event_labels = {
channel => $channel_name,
plugin => $plugin_name,
};
my $plugin = $self->{"plugins_by_name"}->{$plugin_name};
if (!defined($plugin)) {
$self->{"prometheus"}->inc("notify_plugin_no_such_plugin", $event_labels);
print STDERR "No plugin named $plugin_name\n";
return 0;
}
$self->{"prometheus"}->inc("notify_plugin_executions", $event_labels);
eval {
my $start_time = [gettimeofday()];
$task->{"event"}->execute($self->{"db"}, $plugin);
$self->{"prometheus"}->histogram_observe("notify_plugin_runtime", tv_interval($start_time), $event_labels);
$self->{"prometheus"}->inc("notify_plugin_success", $event_labels);
1;
} or do {
$self->{"prometheus"}->inc("notify_plugin_error", $event_labels);
print STDERR "error running $channel_name hooks: $@\n";
return 0;
}
}
1;
$prom->declare(
"notify_plugin_executions",
type => "counter",
help => "Number of times each plugin has been called by channel."
);
$prom->declare(
"notify_plugin_runtime",
type => "histogram",
help => "Number of seconds spent executing each plugin by channel."
);
$prom->declare(
"notify_plugin_success",
type => "counter",
help => "Number of successful executions of this plugin on this channel."
);
sub runPluginsForEvent {
my ($event) = @_;
my $channelName = $event->{'channel_name'};
foreach my $plugin (@plugins) {
$prom->inc("notify_plugin_executions", { channel => $channelName, plugin => ref $plugin });
eval {
my $startTime = [gettimeofday()];
$event->execute($db, $plugin);
$prom->histogram_observe("notify_plugin_runtime", tv_interval($startTime), { channel => $channelName, plugin => ref $plugin });
$prom->inc("notify_plugin_success", { channel => $channelName, plugin => ref $plugin });
1;
} or do {
$prom->inc("notify_plugin_error", { channel => $channelName, plugin => ref $plugin });
print STDERR "error running $event->{'channel_name'} hooks: $@\n";
}
}
}
use strict;
use warnings;
use Setup;
use Hydra::TaskDispatcher;
use Prometheus::Tiny::Shared;
use Test2::V0;
use Test2::Tools::Mock qw(mock_obj);
my $db = "bogus db";
my $prometheus = Prometheus::Tiny::Shared->new;
sub make_noop_plugin {
my ($name) = @_;
my $plugin = {
"name" => $name,
};
my $mock_plugin = mock_obj $plugin => ();
return $mock_plugin;
}
sub make_fake_event {
my ($channel_name) = @_;
my $event = {
channel_name => $channel_name,
called_with => [],
};
my $mock_event = mock_obj $event => (
add => [
"execute" => sub {
my ($self, $db, $plugin) = @_;
push @{$self->{"called_with"}}, $plugin;
}
]
);
return $mock_event;
}
sub make_failing_event {
my ($channel_name) = @_;
my $event = {
channel_name => $channel_name,
called_with => [],
};
my $mock_event = mock_obj $event => (
add => [
"execute" => sub {
my ($self, $db, $plugin) = @_;
push @{$self->{"called_with"}}, $plugin;
die "Failing plugin."
}
]
);
return $mock_event;
}
subtest "dispatch_event" => sub {
subtest "every plugin gets called once, even if it fails all of them." => sub {
my @plugins = [make_noop_plugin("bogus-1"), make_noop_plugin("bogus-2")];
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins);
my $event = make_failing_event("bogus-channel");
$dispatcher->dispatch_event($event);
is(@{$event->{"called_with"}}, 2, "Both plugins should be called");
my @expected_names = [ "bogus-1", "bogus-2" ];
my @actual_names = sort([
$event->{"called_with"}[0]->name,
$event->{"called_with"}[1]->name
]);
is(
@actual_names,
@expected_names,
"Both plugins should be executed, but not in any particular order."
);
};
};
subtest "dispatch_task" => sub {
subtest "every plugin gets called once" => sub {
my $bogus_plugin = make_noop_plugin("bogus-1");
my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")];
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins);
my $event = make_fake_event("bogus-channel");
my $task = Hydra::Task->new($event, ref $bogus_plugin);
is($dispatcher->dispatch_task($task), 1, "Calling dispatch_task returns truthy.");
is(@{$event->{"called_with"}}, 1, "Just one plugin should be called");
is(
$event->{"called_with"}[0]->name,
"bogus-1",
"Just bogus-1 should be executed."
);
};
subtest "a task with an invalid plugin is not fatal" => sub {
my $bogus_plugin = make_noop_plugin("bogus-1");
my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")];
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins);
my $event = make_fake_event("bogus-channel");
my $task = Hydra::Task->new($event, "this-plugin-does-not-exist");
is($dispatcher->dispatch_task($task), 0, "Calling dispatch_task returns falsey.");
is(@{$event->{"called_with"}}, 0, "No plugins are called");
};
};
done_testing;