Abstract over postgres' LISTEN/NOTIFY

[?]
Jun 17, 2021, 8:27 PM
P4SME2BCHKBETP4SFPOWOJM23K57VRFX3RAS634CQKTASXOXYRKQC

Dependencies

  • [2] 4ZCUCACY hydra-notify: Fix processing notifications
  • [3] 2JJP7673 tests: move to t, allow `yath test` from root
  • [4] 32KJOERM Turn hydra-notify into a daemon
  • [5] FCTX433O Add buildStarted plugin hook
  • [6] WYELTQMO Separate payload elements using \t
  • [7] 6WRGCITD Enable declarative projects.
  • [8] NTEDD7T4 Provide a plugin hook for when build steps finish
  • [9] 3AKZKWCR RunCommand: Test
  • [10] IE2PRAQU hydra-queue-runner: Send build notifications
  • [*] D5QIOJGP * Move everything up one directory.

Change contents

  • file addition: PostgresListener.pm (----------)
    [12.53]
    package Hydra::PostgresListener;
    use strict;
    use warnings;
    use IO::Select;
    =head1 Hydra::PostgresListener
    An abstraction around using Postgres' LISTEN / NOTIFY in an event loop.
    =cut
    =head2 new
    Arguments:
    =over 1
    =item C<$dbh>
    L<DBI::db> The database connection.
    =back
    =cut
    # Build a listener around an existing database handle.
    #
    # $class - package to bless the new object into.
    # $dbh   - database handle; must answer func("getfd").
    #
    # The object stores the handle under "dbh" and an IO::Select built from
    # the value of func("getfd") under "sel".
    sub new {
        my ($class, $dbh) = @_;

        my $selector = IO::Select->new($dbh->func("getfd"));
        my %state = (
            "dbh" => $dbh,
            "sel" => $selector,
        );

        return bless \%state, $class;
    }
    =head2 subscribe
    Subscribe to the named channel for messages.
    Arguments:
    =over 1
    =item C<$channel>
    The channel name.
    =back
    =cut
    # Start listening on the given channel.
    #
    # $channel - channel name; it is passed through the handle's
    #            quote_identifier before being placed in the statement.
    #
    # Returns whatever the handle's do() returns for the "listen" statement.
    sub subscribe {
        my ($self, $channel) = @_;
        my $dbh = $self->{'dbh'};

        $channel = $dbh->quote_identifier($channel);
        return $dbh->do("listen $channel");
    }
    =head2 block_for_messages
    Wait for messages to arrive within the specified timeout.
    Arguments:
    =over 1
    =item C<$timeout>
    The maximum number of seconds to wait for messages.
    Optional: if unspecified, block forever.
    =back
    Returns: a sub. Call the sub repeatedly to get messages one at a time;
    it returns undef when there are no pending messages.
    Example:
    my $events = $listener->block_for_messages();
    while (my $message = $events->()) {
    ...
    }
    =cut
    # Wait for notifications and return an iterator over them.
    #
    # $timeout - maximum number of seconds to wait; undef blocks until the
    #            socket becomes readable.
    #
    # Returns a code ref. Each call yields the next notification as a hash
    # ref with the keys channel, pid and payload, or undef once nothing is
    # pending.
    sub block_for_messages {
        my ($self, $timeout) = @_;
        my $dbh = $self->{'dbh'};

        # Look at the already-queued notifications before sleeping on the
        # socket: a notification delivered while an earlier command ran on
        # this handle has been consumed from the socket already, so waiting
        # for readability alone would stall (forever without a timeout)
        # even though a message is ready.
        my $pending = $dbh->func("pg_notifies");
        $self->{'sel'}->can_read($timeout) unless defined $pending;

        return sub {
            # Hand out the notification fetched above first, then go back
            # to asking the handle.
            my $notify = $pending;
            undef $pending;
            $notify = $dbh->func("pg_notifies") unless defined $notify;

            # Explicit undef on purpose: callers invoke the iterator in
            # list context and expect exactly one value back.
            return undef unless defined $notify;

            my ($channelName, $pid, $payload) = @$notify;
            return {
                channel => $channelName,
                pid => $pid,
                payload => $payload,
            };
        };
    }
    1;
  • edit in src/script/hydra-notify at line 6
    [4.3072]
    [4.3072]
    use Hydra::PostgresListener;
  • edit in src/script/hydra-notify at line 9
    [4.5666][4.1440:1456]()
    use IO::Select;
  • replacement in src/script/hydra-notify at line 29
    [4.3327][4.1488:1591]()
    $dbh->do("listen build_started");
    $dbh->do("listen build_finished");
    $dbh->do("listen step_finished");
    [4.3327]
    [4.1591]
    my $listener = Hydra::PostgresListener->new($dbh);
    $listener->subscribe("build_started");
    $listener->subscribe("build_finished");
    $listener->subscribe("step_finished");
  • edit in src/script/hydra-notify at line 116
    [4.2630][4.2630:2693]()
    my $fd = $dbh->func("getfd");
    my $sel = IO::Select->new($fd);
  • replacement in src/script/hydra-notify at line 117
    [4.125][4.2705:2725](),[4.2705][4.2705:2725](),[4.2791][4.2791:2792](),[4.2792][2.0:53]()
    $sel->can_read;
    while (my $notify = $dbh->func("pg_notifies")) {
    [4.125]
    [4.2904]
    my $messages = $listener->block_for_messages();
    while (my $message = $messages->()) {
  • replacement in src/script/hydra-notify at line 120
    [4.2905][2.54:108]()
    my ($channelName, $pid, $payload) = @$notify;
    [4.2905]
    [2.108]
    my $channelName = $message->{"channel"};
    my $pid = $message->{"pid"};
    my $payload = $message->{"payload"};
  • file addition: PostgresListener.t (----------)
    [3.697]
    use strict;
    use warnings;
    use Setup;
    my %ctx = test_init();
    require Hydra::Model::DB;
    use Hydra::PostgresListener;
    use Test2::V0;

    # Exercises Hydra::PostgresListener against the database handle obtained
    # from Hydra::Model::DB after test_init().
    my $db = Hydra::Model::DB->new;
    my $dbh = $db->storage->dbh;
    my $listener = Hydra::PostgresListener->new($dbh);
    $listener->subscribe("foo");
    $listener->subscribe("bar");

    # Nothing has been sent yet, so a zero-timeout poll yields no message.
    # Assertions are written as is($got, $expected, $name).
    is($listener->block_for_messages(0)->(), undef, "There is no message");
    is($listener->block_for_messages(0)->(), undef, "There is no message");
    is($listener->block_for_messages(0)->(), undef, "There is no message");

    # A single notification is delivered once and then the queue is empty.
    $dbh->do("notify foo, ?", undef, "hi");
    my $event = $listener->block_for_messages(0)->();
    is($event->{'channel'}, "foo", "The channel matches");
    isnt($event->{'pid'}, undef, "The pid is set");
    is($event->{'payload'}, "hi", "The payload matches");
    is($listener->block_for_messages(0)->(), undef, "There is no message");

    # Timeout behaviour: SIGALRM interrupts the wait by dying with "timeout".
    like(
        dies {
            local $SIG{ALRM} = sub { die "timeout" };
            alarm 1;
            $listener->block_for_messages->();
            alarm 0;
        },
        qr/timeout/,
        "An unspecified block should block forever"
    );
    like(
        dies {
            local $SIG{ALRM} = sub { die "timeout" };
            alarm 1;
            $listener->block_for_messages(2)->();
            alarm 0;
        },
        qr/timeout/,
        "A 2-second block goes longer than 1 second"
    );
    ok(
        lives {
            local $SIG{ALRM} = sub { die "timeout" };
            alarm 2;
            is($listener->block_for_messages(1)->(), undef, "A one second block returns undef data after timeout");
            alarm 0;
        },
        "A 1-second block expires within 2 seconds"
    );

    # Channel names containing spaces, punctuation and SQL-looking text must
    # round-trip unchanged through subscribe and notification delivery.
    subtest "with wacky channel names" => sub {
        my $channel = "foo! very weird channel names...; select * from t where 1 = 1";
        my $escapedChannel = $dbh->quote_identifier($channel);
        $listener->subscribe($channel);
        is($listener->block_for_messages(0)->(), undef, "There is no message");
        $dbh->do("notify $escapedChannel, ?", undef, "hi");
        my $event = $listener->block_for_messages(0)->();
        is($event->{'channel'}, $channel, "The channel matches");
        isnt($event->{'pid'}, undef, "The pid is set");
        is($event->{'payload'}, "hi", "The payload matches");
    };
    done_testing;