TaskDispatcher: Support re-queueing tasks, and dropping tasks after 100 failures.

[?]
Aug 26, 2021, 9:00 PM
I7Q3M7V32AUNVHKPCRAMSOIZXSTTOOPZ2PR3A2UF4J7KRDGIWWOQC

Dependencies

  • [2] SWXGVPJN hydra-notify: extract runPluginsForEvent to a TaskDispatcher

Change contents

  • edit in src/lib/Hydra/TaskDispatcher.pm at line 20
    [2.817]
    [2.817]
    The dispatcher's behavior differs slightly depending on
    whether the Task has an associated record:
    =over 1
    =item *
    If a task succeeds and there is no record, the Dispatcher
    assumes there is no further accounting of the task to be
    done.
    =item *
    If a task succeeds and there is a record, the Dispatcher
    calls C<delete> on the record.
    =item *
    If a task fails and there is no record, the Dispatcher
    calls C<$store_task> with the Task as its only argument.
    It is C<$store_task>'s responsibility to store the
    task somewhere so that it can be retried later.
    =item *
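    If a task fails and there is a record, the Dispatcher
    calls C<requeue> on the record. If the record has already
    been attempted more than 100 times, the Dispatcher instead
    calls C<delete> on the record and the task is dropped.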
    =back
  • edit in src/lib/Hydra/TaskDispatcher.pm at line 58
    [2.908][2.908:915]()
    =back
  • edit in src/lib/Hydra/TaskDispatcher.pm at line 61
    [2.1075][2.1075:1082]()
    =back
  • edit in src/lib/Hydra/TaskDispatcher.pm at line 64
    [2.1179]
    [2.1179]
    =item C<$store_task> (Optional)
    A sub to call when storing a task for the first time. This sub is called
    after a L<Hydra::Task>'s execution fails without an associated record.
    The sub is called with the failing task, and is responsible for storing
    the task for another attempt.
  • edit in src/lib/Hydra/TaskDispatcher.pm at line 71
    [2.1180]
    [2.1180]
    If no C<$store_task> sub is provided, failed tasks that have no
    associated record are dropped.
  • replacement in src/lib/Hydra/TaskDispatcher.pm at line 78
    [2.1203][2.1203:1252]()
    my ($self, $db, $prometheus, $plugins) = @_;
    [2.1203]
    [2.1252]
    my ($self, $db, $prometheus, $plugins, $store_task) = @_;
  • edit in src/lib/Hydra/TaskDispatcher.pm at line 100
    [2.1938]
    [2.1938]
    $prometheus->declare(
    "notify_plugin_retry_success",
    type => "counter",
    help => "Number of successful executions of retried tasks."
    );
    $prometheus->declare(
    "notify_plugin_drop",
    type => "counter",
    help => "Number of tasks that have been dropped after too many retries."
    );
    $prometheus->declare(
    "notify_plugin_requeue",
    type => "counter",
    help => "Number of tasks that have been requeued after a failure."
    );
  • edit in src/lib/Hydra/TaskDispatcher.pm at line 118
    [2.2000]
    [2.2000]
    if (!defined($store_task)) {
    $store_task = sub {};
    }
  • edit in src/lib/Hydra/TaskDispatcher.pm at line 126
    [2.2128]
    [2.2128]
    "store_task" => $store_task,
  • replacement in src/lib/Hydra/TaskDispatcher.pm at line 178
    [2.3112][2.3112:3210]()
    my $event_labels = {
    channel => $channel_name,
    plugin => $plugin_name,
    };
    [2.3112]
    [2.3210]
    my $event_labels = $self->prom_labels_for_task($task);
  • edit in src/lib/Hydra/TaskDispatcher.pm at line 196
    [2.3847]
    [2.3847]
    $self->success($task);
  • edit in src/lib/Hydra/TaskDispatcher.pm at line 199
    [2.3872]
    [2.3872]
    $self->failure($task);
  • edit in src/lib/Hydra/TaskDispatcher.pm at line 203
    [2.4028]
    [2.4028]
    }
    }
    =head2 success
    Mark a task's execution as successful.
    If the task has an associated record, the record is deleted.
    Arguments:
    =over 1
    =item C<$task>
    L<Hydra::Task> the task to mark as successful.
    =back
    =cut
    sub success {
    my ($self, $task) = @_;
    my $event_labels = $self->prom_labels_for_task($task);
    if (defined($task->{"record"})) {
    # A task with a record is a retry that finally succeeded. The metric name
    # must match the "notify_plugin_retry_success" counter declared with the
    # other notify_plugin_* counters; it was previously misspelled "sucess".
    $self->{"prometheus"}->inc("notify_plugin_retry_success", $event_labels);
    $task->{"record"}->delete();
  • edit in src/lib/Hydra/TaskDispatcher.pm at line 233
    [2.4036]
    [2.4036]
    =head2 failure
    Mark a task's execution as failed.
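    If the task has an associated record, the record is requeued unless
    it has already been attempted more than 100 times, in which case the
    record is deleted and the task is dropped. A task without a record is
    passed to the C<$store_task> sub.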
    Arguments:
    =over 1
    =item C<$task>
    L<Hydra::Task> the task to mark as failed.
    =back
    =cut
    sub failure {
    my ($self, $task) = @_;
  • edit in src/lib/Hydra/TaskDispatcher.pm at line 254
    [2.4037]
    [2.4037]
    my $event_labels = $self->prom_labels_for_task($task);
    if (defined($task->{"record"})) {
    if ($task->{"record"}->{"attempts"} > 100) {
    $self->{"prometheus"}->inc("notify_plugin_drop", $event_labels);
    $task->{"record"}->delete();
    } else {
    $self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels);
    $task->{"record"}->requeue();
    }
    } else {
    $self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels);
    $self->{"store_task"}($task);
    }
    }
    =head2 prom_labels_for_task
    Given a specific task, return a hash of standard labels to record with
    Prometheus.
    Arguments:
    =over 1
    =item C<$task>
    L<Hydra::Task> the task to return labels for.
    =back
    =cut
    sub prom_labels_for_task {
        my ($self, $task) = @_;

        # Build a fresh label hashref on every call: the channel comes from the
        # task's event, and the plugin label is the task's plugin_name.
        return {
            channel => $task->{event}{channel_name},
            plugin  => $task->{plugin_name},
        };
    }
  • edit in t/TaskDispatcher.t at line 61
    [2.5581]
    [2.5581]
    }
    sub make_fake_record {
    my %attrs = @_;
    my $record = {
    "attempts" => $attrs{"attempts"} || 0,
    "requeued" => 0,
    "deleted" => 0
    };
    my $mock_record = mock_obj $record => (
    add => [
    "delete" => sub {
    my ($self, $db, $plugin) = @_;
    $self->{"deleted"} = 1;
    },
    "requeue" => sub {
    my ($self, $db, $plugin) = @_;
    $self->{"requeued"} = 1;
    }
    ]
    );
    return $mock_record;
  • edit in t/TaskDispatcher.t at line 132
    [2.7147]
    [2.7147]
  • edit in t/TaskDispatcher.t at line 144
    [2.7709]
    [2.7709]
    };
    subtest "a failed run without a record saves the task for later" => sub {
        my $db = "bogus db";
        my $bogus_plugin = make_noop_plugin("bogus-1");
        my $task = {
            "event" => make_failing_event("fail-event"),
            "plugin_name" => ref $bogus_plugin,
            "record" => undef,
        };

        # Capture what the dispatcher hands to the store hook, so we can check
        # both that the hook ran and that it received the failing task itself.
        my $save_hook_called = 0;
        my $saved_task;
        my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin],
            sub {
                ($saved_task) = @_;
                $save_hook_called = 1;
            }
        );
        $dispatcher->dispatch_task($task);
        is($save_hook_called, 1, "The task was passed to the store hook.");
        is($saved_task, $task, "The store hook received the failing task.");
    };
    subtest "a successful run from a record deletes the record" => sub {
        my $record = make_fake_record();
        my $plugin = make_noop_plugin("bogus-1");

        # A succeeding event with an attached record: the dispatcher should
        # call delete on the record once the task succeeds.
        my $task = {
            "event" => make_fake_event("success-event"),
            "plugin_name" => ref $plugin,
            "record" => $record,
        };
        Hydra::TaskDispatcher->new("bogus db", $prometheus, [$plugin])
            ->dispatch_task($task);

        is($record->{"deleted"}, 1, "The record was deleted.");
    };
    subtest "a failed run from a record re-queues the task" => sub {
    my $db = "bogus db";
    my $record = make_fake_record();
    my $bogus_plugin = make_noop_plugin("bogus-1");
    my $task = {
    "event" => make_failing_event("fail-event"),
    "plugin_name" => ref $bogus_plugin,
    "record" => $record,
    };
    my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]);
    $dispatcher->dispatch_task($task);
    is($record->{"requeued"}, 1, "The record was requeued.");
  • edit in t/TaskDispatcher.t at line 201
    [2.7716]
    [2.7716]
    subtest "a failed run from a record with a lot of attempts deletes the task" => sub {
        my $db = "bogus db";
        # 101 attempts is past the "> 100" limit checked in failure(), so the
        # record must be dropped (deleted) rather than requeued.
        my $record = make_fake_record(attempts => 101);
        my $bogus_plugin = make_noop_plugin("bogus-1");
        my $task = {
            "event" => make_failing_event("fail-event"),
            "plugin_name" => ref $bogus_plugin,
            "record" => $record,
        };
        my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]);
        $dispatcher->dispatch_task($task);
        is($record->{"deleted"}, 1, "The record was deleted.");
        is($record->{"requeued"}, 0, "The record was not requeued.");
    };