Abstract over postgres' LISTEN/NOTIFY
[?]
Jun 17, 2021, 8:27 PM
P4SME2BCHKBETP4SFPOWOJM23K57VRFX3RAS634CQKTASXOXYRKQCDependencies
- [2]
4ZCUCACYhydra-notify: Fix processing notifications - [3]
2JJP7673tests: move to t, allow `yath test` from root - [4]
32KJOERMTurn hydra-notify into a daemon - [5]
FCTX433OAdd buildStarted plugin hook - [6]
WYELTQMOSeparate payload elements using \t - [7]
6WRGCITDEnable declarative projects. - [8]
NTEDD7T4Provide a plugin hook for when build steps finish - [9]
3AKZKWCRRunCommand: Test - [10]
IE2PRAQUhydra-queue-runner: Send build notifications - [*]
D5QIOJGP* Move everything up one directory.
Change contents
- file addition: PostgresListener.pm[12.53]
package Hydra::PostgresListener;use strict;use warnings;use IO::Select;=head1 Hydra::PostgresListenerAn abstraction around using Postgres' LISTEN / NOTIFY in an event loop.=cut=head2 newArguments:=over 1=item C<$dbh>L<DBI::db> The database connection.=back=cutsub new {my ($self, $dbh) = @_;my $sel = IO::Select->new($dbh->func("getfd"));return bless {"dbh" => $dbh,"sel" => $sel,}, $self;}=head2 subscribeSubscribe to the named channel for messagesArguments:=over 1=item C<$channel>The channel name.=back=cutsub subscribe {my ($self, $channel) = @_;$channel = $self->{'dbh'}->quote_identifier($channel);$self->{'dbh'}->do("listen $channel");}=head2 block_for_messagesWait for messages to arrive within the specified timeout.Arguments:=over 1=item C<$timeout>The maximum number of seconds to wait for messages.Optional: if unspecified, block forever.=backReturns: a sub, call the sub repeatedly to get a message. The subwill return undef when there are no pending messages.Example:my $events = $listener->block_for_messages();while (my $message = $events->()) {...}=cutsub block_for_messages {my ($self, $timeout) = @_;$self->{'sel'}->can_read($timeout);return sub {my $notify = $self->{'dbh'}->func("pg_notifies");if (defined($notify)) {my ($channelName, $pid, $payload) = @$notify;return {channel => $channelName,pid => $pid,payload => $payload,}} else {return undef}}}1; - edit in src/script/hydra-notify at line 6
use Hydra::PostgresListener; - edit in src/script/hydra-notify at line 9
use IO::Select; - replacement in src/script/hydra-notify at line 29
$dbh->do("listen build_started");$dbh->do("listen build_finished");$dbh->do("listen step_finished");my $listener = Hydra::PostgresListener->new($dbh);$listener->subscribe("build_started");$listener->subscribe("build_finished");$listener->subscribe("step_finished"); - edit in src/script/hydra-notify at line 116
my $fd = $dbh->func("getfd");my $sel = IO::Select->new($fd); - replacement in src/script/hydra-notify at line 117[4.125]→[4.2705:2725](∅→∅),[4.2705]→[4.2705:2725](∅→∅),[4.2791]→[4.2791:2792](∅→∅),[4.2792]→[2.0:53](∅→∅)
$sel->can_read;while (my $notify = $dbh->func("pg_notifies")) {my $messages = $listener->block_for_messages();while (my $message = $messages->()) { - replacement in src/script/hydra-notify at line 120
my ($channelName, $pid, $payload) = @$notify;my $channelName = $message->{"channel"};my $pid = $message->{"pid"};my $payload = $message->{"payload"}; - file addition: PostgresListener.t[3.697]
use strict;use Setup;my %ctx = test_init();require Hydra::Model::DB;use Hydra::PostgresListener;use Test2::V0;my $db = Hydra::Model::DB->new;my $dbh = $db->storage->dbh;my $listener = Hydra::PostgresListener->new($dbh);$listener->subscribe("foo");$listener->subscribe("bar");is(undef, $listener->block_for_messages(0)->(), "There is no message");is(undef, $listener->block_for_messages(0)->(), "There is no message");is(undef, $listener->block_for_messages(0)->(), "There is no message");$dbh->do("notify foo, ?", undef, "hi");my $event = $listener->block_for_messages(0)->();is($event->{'channel'}, "foo", "The channel matches");isnt($event->{'pid'}, undef, "The pid is set");is($event->{'payload'}, "hi", "The payload matches");is(undef, $listener->block_for_messages(0)->(), "There is no message");like(dies {local $SIG{ALRM} = sub { die "timeout" };alarm 1;$listener->block_for_messages->();alarm 0;},qr/timeout/,"An unspecified block should block forever");like(dies {local $SIG{ALRM} = sub { die "timeout" };alarm 1;$listener->block_for_messages(2)->();alarm 0;},qr/timeout/,"A 2-second block goes longer than 1 second");ok(lives {local $SIG{ALRM} = sub { die "timeout" };alarm 2;is(undef, $listener->block_for_messages(1)->(), "A one second block returns undef data after timeout");alarm 0;},"A 1-second block expires within 2 seconds");subtest "with wacky channel names" => sub {my $channel = "foo! very weird channel names...; select * from t where 1 = 1";my $escapedChannel = $dbh->quote_identifier($channel);$listener->subscribe($channel);is(undef, $listener->block_for_messages(0)->(), "There is no message");$dbh->do("notify $escapedChannel, ?", undef, "hi");my $event = $listener->block_for_messages(0)->();is($event->{'channel'}, $channel, "The channel matches");isnt($event->{'pid'}, undef, "The pid is set");is($event->{'payload'}, "hi", "The payload matches");};done_testing;