I7Q3M7V32AUNVHKPCRAMSOIZXSTTOOPZ2PR3A2UF4J7KRDGIWWOQC
The dispatcher's behavior is slightly different based on
if the Task has an associated record:
=over 1
=item *
If a task succeeds and there is no record, the Dispatcher
assumes there is no further accounting of the task to be
done.
=item *
If a task succeeds and there is a record, the Dispatcher
calls C<delete> on the record.
=item *
If a task fails and there is no record, the Dispatcher
calls C<$store_task> with the Task as its only argument.
It is the C<$store_task>'s responsibility to store the
task in some way for retrying.
=item *
If a task fails and there is a record, the Dispatcher
calls C<requeue> on the record.
=back
$prometheus->declare(
"notify_plugin_retry_success",
type => "counter",
help => "Number of successful executions of retried tasks."
);
$prometheus->declare(
"notify_plugin_drop",
type => "counter",
help => "Number of tasks that have been dropped after too many retries."
);
$prometheus->declare(
"notify_plugin_requeue",
type => "counter",
help => "Number of tasks that have been requeued after a failure."
);
}
}
=head2 success
Mark a task's execution as successful.
If the task has an associated record, the record is deleted.
Arguments:
=over 1
=item C<$task>
L<Hydra::Task> the task to mark as successful.
=back
=cut
sub success {
my ($self, $task) = @_;
my $event_labels = $self->prom_labels_for_task($task);
if (defined($task->{"record"})) {
$self->{"prometheus"}->inc("notify_plugin_retry_sucess", $event_labels);
$task->{"record"}->delete();
my $event_labels = $self->prom_labels_for_task($task);
if (defined($task->{"record"})) {
if ($task->{"record"}->{"attempts"} > 100) {
$self->{"prometheus"}->inc("notify_plugin_drop", $event_labels);
$task->{"record"}->delete();
} else {
$self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels);
$task->{"record"}->requeue();
}
} else {
$self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels);
$self->{"store_task"}($task);
}
}
=head2 prom_labels_for_task
Given a specific task, return a hash of standard labels to record with
Prometheus.
Arguments:
=over 1
=item C<$task>
L<Hydra::Task> the task to return labels for.
=back
=cut
sub prom_labels_for_task {
my ($self, $task) = @_;
my $channel_name = $task->{"event"}->{'channel_name'};
my $plugin_name = $task->{"plugin_name"};
return {
channel => $channel_name,
plugin => $plugin_name,
};
}
}
sub make_fake_record {
my %attrs = @_;
my $record = {
"attempts" => $attrs{"attempts"} || 0,
"requeued" => 0,
"deleted" => 0
};
my $mock_record = mock_obj $record => (
add => [
"delete" => sub {
my ($self, $db, $plugin) = @_;
$self->{"deleted"} = 1;
},
"requeue" => sub {
my ($self, $db, $plugin) = @_;
$self->{"requeued"} = 1;
}
]
);
return $mock_record;
};
subtest "a failed run without a record saves the task for later" => sub {
my $db = "bogus db";
my $record = make_fake_record();
my $bogus_plugin = make_noop_plugin("bogus-1");
my $task = {
"event" => make_failing_event("fail-event"),
"plugin_name" => ref $bogus_plugin,
"record" => undef,
};
my $save_hook_called = 0;
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin],
sub {
$save_hook_called = 1;
}
);
$dispatcher->dispatch_task($task);
is($save_hook_called, 1, "The record was requeued with the store hook.");
};
subtest "a successful run from a record deletes the record" => sub {
my $db = "bogus db";
my $record = make_fake_record();
my $bogus_plugin = make_noop_plugin("bogus-1");
my $task = {
"event" => make_fake_event("success-event"),
"plugin_name" => ref $bogus_plugin,
"record" => $record,
};
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]);
$dispatcher->dispatch_task($task);
is($record->{"deleted"}, 1, "The record was deleted.");
};
subtest "a failed run from a record re-queues the task" => sub {
my $db = "bogus db";
my $record = make_fake_record();
my $bogus_plugin = make_noop_plugin("bogus-1");
my $task = {
"event" => make_failing_event("fail-event"),
"plugin_name" => ref $bogus_plugin,
"record" => $record,
};
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]);
$dispatcher->dispatch_task($task);
is($record->{"requeued"}, 1, "The record was requeued.");
subtest "a failed run from a record with a lot of attempts deletes the task" => sub {
my $db = "bogus db";
my $record = make_fake_record(attempts => 101);
my $bogus_plugin = make_noop_plugin("bogus-1");
my $task = {
"event" => make_failing_event("fail-event"),
"plugin_name" => ref $bogus_plugin,
"record" => $record,
};
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]);
$dispatcher->dispatch_task($task);
is($record->{"deleted"}, 1, "The record was deleted.");
};