hydra-notify: extract runPluginsForEvent to a TaskDispatcher

[?]
Aug 26, 2021, 7:39 PM
SWXGVPJNE35OLCZD7HOZOJLZZPTWPOKQ6W56Y5EM2TZMNBZJESEQC

Dependencies

  • [2] JXJPLS22 hydra-notify: properly call new_event
  • [3] BJJLHGFC hydra-notify: pre-declare metrics
  • [4] 3ELC3W2D hydra-notify: support sending diagnostic dumps to STDERR on request
  • [5] T3OHZDYP hydra-notify: move BuildFinished processing to an Event
  • [6] 4H6FVIWG hydra-notify: make the prometheus endpoint configurable, default-off
  • [7] FN3QFV6V hydra-notify: Create a helper for running each plugin on an event
  • [8] IE2PRAQU hydra-queue-runner: Send build notifications
  • [9] 4ZCUCACY hydra-notify: Fix processing notifications
  • [10] OPSWSU4L hydra-notify: move BuildStarted processing to an Event
  • [11] P4SME2BC Abstract over postgres' LISTEN/NOTIFY
  • [12] IZVNPQAS fixup! hydra-notify: pre-declare metrics
  • [13] 32KJOERM Turn hydra-notify into a daemon
  • [14] CQZQE32V Improve handling of Perl's block eval errors
  • [15] 2HCWJE5W hydra-notify: fixup printing of build IDs
  • [16] 3AKZKWCR RunCommand: Test
  • [17] GE7LFZHQ hydra-notify: move buildFinished query in to the function impl
  • [18] I7DFJWL6 hydra-notify: initial scratch take of prometheus events
  • [*] D5QIOJGP * Move everything up one directory.
  • [*] 2JJP7673 tests: move to t, allow `yath test` from root

Change contents

  • file addition: Task.pm (----------)
    [20.53]
    package Hydra::Task;
    use strict;
    use warnings;
    sub new {
    my ($self, $event, $plugin_name) = @_;
    return bless {
    "event" => $event,
    "plugin_name" => $plugin_name,
    }, $self;
    }
    1;
  • file addition: TaskDispatcher.pm (----------)
    [20.53]
    package Hydra::TaskDispatcher;
    use strict;
    use warnings;
    use Hydra::Task;
    use Time::HiRes qw( gettimeofday tv_interval );
    =head1 Hydra::TaskDispatcher
    Excecute many plugins with Hydra::Event as its input.
    The TaskDispatcher is responsible for dealing with fanout
    from one incoming Event being executed across many plugins,
    or one Event being executed against a single plugin by first
    wrapping it in a Task.
    Its execution model is based on creating a Hydra::Task for
    each plugin's execution. The task represents the name of
    the plugin to run and the Event to process.
    =cut
    =head2 new
    Arguments:
    =over 1
    =item C<$dbh>
    L<DBI::db> The database connection.
    =back
    =item C<$prometheus>
    L<Prometheus::Tiny> A Promethues implementation, either Prometheus::Tiny
    or Prometheus::Tiny::Shared. Not compatible with Net::Prometheus.
    =back
    =item C<%plugins>
    L<Hydra::Plugin> A list of Hydra plugins to execute events and tasks against.
    =back
    =cut
    sub new {
    my ($self, $db, $prometheus, $plugins) = @_;
    $prometheus->declare(
    "notify_plugin_executions",
    type => "counter",
    help => "Number of times each plugin has been called by channel."
    );
    $prometheus->declare(
    "notify_plugin_runtime",
    type => "histogram",
    help => "Number of seconds spent executing each plugin by channel."
    );
    $prometheus->declare(
    "notify_plugin_success",
    type => "counter",
    help => "Number of successful executions of this plugin on this channel."
    );
    $prometheus->declare(
    "notify_plugin_error",
    type => "counter",
    help => "Number of failed executions of this plugin on this channel."
    );
    my %plugins_by_name = map { ref $_ => $_ } @{$plugins};
    my $obj = bless {
    "db" => $db,
    "prometheus" => $prometheus,
    "plugins_by_name" => \%plugins_by_name,
    }, $self;
    }
    =head2 dispatch_event
    Execute each configured plugin against the provided L<Hydra::Event>.
    Arguments:
    =over 1
    =item C<$event>
    L<Hydra::Event> the event, usually from L<Hydra::PostgresListener>.
    =back
    =cut
    sub dispatch_event {
    my ($self, $event) = @_;
    foreach my $plugin_name (keys %{$self->{"plugins_by_name"}}) {
    my $task = Hydra::Task->new($event, $plugin_name);
    $self->dispatch_task($task);
    }
    }
    =head2 dispatch_task
    Execute a specifi plugin against the provided L<Hydra::Task>.
    The Task includes information about what plugin should be executed.
    If the provided plugin does not exist, an error logged is logged and the
    function returns falsey.
    Arguments:
    =over 1
    =item C<$task>
    L<Hydra::Task> the task, usually from L<Hydra::Shema::Result::TaskRetries>.
    =back
    =cut
    sub dispatch_task {
    my ($self, $task) = @_;
    my $channel_name = $task->{"event"}->{'channel_name'};
    my $plugin_name = $task->{"plugin_name"};
    my $event_labels = {
    channel => $channel_name,
    plugin => $plugin_name,
    };
    my $plugin = $self->{"plugins_by_name"}->{$plugin_name};
    if (!defined($plugin)) {
    $self->{"prometheus"}->inc("notify_plugin_no_such_plugin", $event_labels);
    print STDERR "No plugin named $plugin_name\n";
    return 0;
    }
    $self->{"prometheus"}->inc("notify_plugin_executions", $event_labels);
    eval {
    my $start_time = [gettimeofday()];
    $task->{"event"}->execute($self->{"db"}, $plugin);
    $self->{"prometheus"}->histogram_observe("notify_plugin_runtime", tv_interval($start_time), $event_labels);
    $self->{"prometheus"}->inc("notify_plugin_success", $event_labels);
    1;
    } or do {
    $self->{"prometheus"}->inc("notify_plugin_error", $event_labels);
    print STDERR "error running $channel_name hooks: $@\n";
    return 0;
    }
    }
    1;
  • edit in src/script/hydra-notify at line 13
    [5.1752]
    [5.62]
    use Hydra::TaskDispatcher;
  • edit in src/script/hydra-notify at line 16
    [5.119][5.119:167]()
    use Time::HiRes qw( gettimeofday tv_interval );
  • edit in src/script/hydra-notify at line 28
    [3.188][3.188:626]()
    $prom->declare(
    "notify_plugin_executions",
    type => "counter",
    help => "Number of times each plugin has been called by channel."
    );
    $prom->declare(
    "notify_plugin_runtime",
    type => "histogram",
    help => "Number of seconds spent executing each plugin by channel."
    );
    $prom->declare(
    "notify_plugin_success",
    type => "counter",
    help => "Number of successful executions of this plugin on this channel."
    );
  • edit in src/script/hydra-notify at line 29
    [3.642][3.642:785]()
    "notify_plugin_error",
    type => "counter",
    help => "Number of failed executions of this plugin on this channel."
    );
    $prom->declare(
  • edit in src/script/hydra-notify at line 76
    [5.3326]
    [5.1457]
    my $task_dispatcher = Hydra::TaskDispatcher->new($db, $prom, [@plugins]);
  • edit in src/script/hydra-notify at line 85
    [4.482][5.0:48](),[5.1922][5.0:48](),[5.48][5.475:524](),[5.48][5.1591:1592](),[5.524][5.1591:1592](),[5.1922][5.1591:1592](),[5.1591][5.1591:1592](),[5.1592][5.49:85](),[5.85][5.525:625](),[5.625][5.85:100](),[5.85][5.85:100](),[5.100][5.626:672](),[5.672][5.100:143](),[5.100][5.100:143](),[5.143][5.673:915](),[5.915][5.143:176](),[5.143][5.143:176](),[5.176][5.916:1015](),[5.1015][5.176:273](),[5.176][5.176:273]()
    sub runPluginsForEvent {
    my ($event) = @_;
    my $channelName = $event->{'channel_name'};
    foreach my $plugin (@plugins) {
    $prom->inc("notify_plugin_executions", { channel => $channelName, plugin => ref $plugin });
    eval {
    my $startTime = [gettimeofday()];
    $event->execute($db, $plugin);
    $prom->histogram_observe("notify_plugin_runtime", tv_interval($startTime), { channel => $channelName, plugin => ref $plugin });
    $prom->inc("notify_plugin_success", { channel => $channelName, plugin => ref $plugin });
    1;
    } or do {
    $prom->inc("notify_plugin_error", { channel => $channelName, plugin => ref $plugin });
    print STDERR "error running $event->{'channel_name'} hooks: $@\n";
    }
    }
    }
  • replacement in src/script/hydra-notify at line 92
    [5.1460][5.1460:1555]()
    my $event = Hydra::Event::BuildFinished->new($build->id);
    runPluginsForEvent($event);
    [5.1460]
    [5.2593]
    my $event = Hydra::Event->new_event("build_finished", $build->id);
    $task_dispatcher->dispatch_event($event);
  • edit in src/script/hydra-notify at line 95
    [5.2595][5.2595:2596]()
  • replacement in src/script/hydra-notify at line 115
    [2.86][5.1642:1682](),[5.1642][5.1642:1682]()
    runPluginsForEvent($event);
    [2.86]
    [5.1682]
    $task_dispatcher->dispatch_event($event);
  • file addition: TaskDispatcher.t (----------)
    [21.697]
    use strict;
    use warnings;
    use Setup;
    use Hydra::TaskDispatcher;
    use Prometheus::Tiny::Shared;
    use Test2::V0;
    use Test2::Tools::Mock qw(mock_obj);
    my $db = "bogus db";
    my $prometheus = Prometheus::Tiny::Shared->new;
    sub make_noop_plugin {
    my ($name) = @_;
    my $plugin = {
    "name" => $name,
    };
    my $mock_plugin = mock_obj $plugin => ();
    return $mock_plugin;
    }
    sub make_fake_event {
    my ($channel_name) = @_;
    my $event = {
    channel_name => $channel_name,
    called_with => [],
    };
    my $mock_event = mock_obj $event => (
    add => [
    "execute" => sub {
    my ($self, $db, $plugin) = @_;
    push @{$self->{"called_with"}}, $plugin;
    }
    ]
    );
    return $mock_event;
    }
    sub make_failing_event {
    my ($channel_name) = @_;
    my $event = {
    channel_name => $channel_name,
    called_with => [],
    };
    my $mock_event = mock_obj $event => (
    add => [
    "execute" => sub {
    my ($self, $db, $plugin) = @_;
    push @{$self->{"called_with"}}, $plugin;
    die "Failing plugin."
    }
    ]
    );
    return $mock_event;
    }
    subtest "dispatch_event" => sub {
    subtest "every plugin gets called once, even if it fails all of them." => sub {
    my @plugins = [make_noop_plugin("bogus-1"), make_noop_plugin("bogus-2")];
    my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins);
    my $event = make_failing_event("bogus-channel");
    $dispatcher->dispatch_event($event);
    is(@{$event->{"called_with"}}, 2, "Both plugins should be called");
    my @expected_names = [ "bogus-1", "bogus-2" ];
    my @actual_names = sort([
    $event->{"called_with"}[0]->name,
    $event->{"called_with"}[1]->name
    ]);
    is(
    @actual_names,
    @expected_names,
    "Both plugins should be executed, but not in any particular order."
    );
    };
    };
    subtest "dispatch_task" => sub {
    subtest "every plugin gets called once" => sub {
    my $bogus_plugin = make_noop_plugin("bogus-1");
    my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")];
    my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins);
    my $event = make_fake_event("bogus-channel");
    my $task = Hydra::Task->new($event, ref $bogus_plugin);
    is($dispatcher->dispatch_task($task), 1, "Calling dispatch_task returns truthy.");
    is(@{$event->{"called_with"}}, 1, "Just one plugin should be called");
    is(
    $event->{"called_with"}[0]->name,
    "bogus-1",
    "Just bogus-1 should be executed."
    );
    };
    subtest "a task with an invalid plugin is not fatal" => sub {
    my $bogus_plugin = make_noop_plugin("bogus-1");
    my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")];
    my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins);
    my $event = make_fake_event("bogus-channel");
    my $task = Hydra::Task->new($event, "this-plugin-does-not-exist");
    is($dispatcher->dispatch_task($task), 0, "Calling dispatch_task returns falsey.");
    is(@{$event->{"called_with"}}, 0, "No plugins are called");
    };
    };
    done_testing;