This lets us test the event loop if we want to, and lets us test the listening behavior in isolation.
# Preserved verbatim from the head of the original line
# (NOTE(review): these look like change-dependency hashes -- confirm):
# P4SME2BCHKBETP4SFPOWOJM23K57VRFX3RAS634CQKTASXOXYRKQC 4ZCUCACYWDLCZUKIH2OLFC4NCPBE2YO7Y7AZ7WL3AESZQ3TSIYXQC 2JJP76737U2JWJWQ6UDFEAQCGWRAQH46HC6OCIKWMB5QYRXF6DQQC D5QIOJGPKQJIYBUCSC3MFJ3TXLPNZ2XMI37GXMFRVRFWWR2VMTFAC IE2PRAQUCQVFPJ4CAIJRPXXEFC5VBAE3EO5I5FG4XWEDRNONNHKQC 6WRGCITDYP7JIBYP25QIWCHWRJWFPDP2D3TJS3WO3KUHQQJAWHMQC 32KJOERMPFWZZZCIN6TGGVX72GMAJT6VCCIP7S65EHNF3KM42KWAC 3AKZKWCRQZTASQO5BZ4KCEFJ5OMP7LVF6YOPDJN5KCRIZIW75MCAC
package Hydra::PostgresListener;

use strict;
use warnings;
use IO::Select;

=head1 Hydra::PostgresListener

An abstraction around using Postgres' LISTEN / NOTIFY in an event loop.

=cut

=head2 new

Arguments:

=over 1

=item C<$dbh>

L<DBI::db> The database connection.

=back

=cut

sub new {
    my ($class, $dbh) = @_;

    # Watch the descriptor reported by the driver so block_for_messages can
    # sleep until there is something to read on the connection.
    my $sel = IO::Select->new($dbh->func("getfd"));

    return bless {
        "dbh"     => $dbh,
        "sel"     => $sel,
        # Notifications fetched from the driver but not yet handed out.
        "pending" => [],
    }, $class;
}

=head2 subscribe

Subscribe to the named channel for messages

Arguments:

=over 1

=item C<$channel>

The channel name.

=back

=cut

sub subscribe {
    my ($self, $channel) = @_;

    # Quote the name as an identifier: arbitrary channel names stay valid
    # and cannot inject SQL into the LISTEN statement.
    $channel = $self->{'dbh'}->quote_identifier($channel);

    return $self->{'dbh'}->do("listen $channel");
}

=head2 block_for_messages

Wait for messages to arrive within the specified timeout. If a message
is already available the call returns immediately instead of waiting.

Arguments:

=over 1

=item C<$timeout>

The maximum number of seconds to wait for messages.

Optional: if unspecified, block forever.

=back

Returns: a sub, call the sub repeatedly to get a message. The sub
will return undef when there are no pending messages.

Example:

    my $events = $listener->block_for_messages();
    while (my $message = $events->()) {
        ...
    }

=cut

sub block_for_messages {
    my ($self, $timeout) = @_;

    my $dbh     = $self->{'dbh'};
    my $pending = $self->{'pending'} ||= [];

    # A notification may already have been read off the socket while the
    # connection was busy with another statement. In that case the socket
    # is idle and waiting on it would stall for the whole timeout (or
    # forever) even though a message is ready. Ask the driver first, and
    # keep the answer on the object so it is not lost if the caller never
    # invokes the returned sub.
    if (!@$pending) {
        my $early = $dbh->func("pg_notifies");
        push @$pending, $early if defined $early;
    }

    $self->{'sel'}->can_read($timeout) unless @$pending;

    return sub {
        my $notify = @$pending
            ? shift @$pending
            : $dbh->func("pg_notifies");

        # Deliberately "return undef" rather than a bare return: callers
        # invoke this sub in list context inside argument lists and rely
        # on getting exactly one value back.
        return undef unless defined $notify;

        my ($channel_name, $pid, $payload) = @$notify;
        return {
            channel => $channel_name,
            pid     => $pid,
            payload => $payload,
        };
    };
}

1;
# Issue a LISTEN statement for each of the three channels, in the same
# order as before.
for my $channel (qw(build_started build_finished step_finished)) {
    $dbh->do("listen $channel");
}
# Wrap the connection in a listener and subscribe it to each of the three
# channels, in the same order as before.
my $listener = Hydra::PostgresListener->new($dbh);

for my $channel (qw(build_started build_finished step_finished)) {
    $listener->subscribe($channel);
}
use strict;
use warnings;
use Setup;

my %ctx = test_init();

require Hydra::Model::DB;
use Hydra::PostgresListener;
use Test2::V0;

# Exercises Hydra::PostgresListener against the test database: subscribing,
# receiving a notification, timeout behaviour, and channel-name quoting.
#
# Assertions are written as is($got, $expected, $name). The iterator is
# called under scalar() so it always contributes exactly one argument,
# whichever way it signals "no message".

my $db = Hydra::Model::DB->new;
my $dbh = $db->storage->dbh;

my $listener = Hydra::PostgresListener->new($dbh);

$listener->subscribe("foo");
$listener->subscribe("bar");

# Polling with a zero timeout must be repeatable while nothing was sent.
is(scalar $listener->block_for_messages(0)->(), undef, "There is no message");
is(scalar $listener->block_for_messages(0)->(), undef, "There is no message");
is(scalar $listener->block_for_messages(0)->(), undef, "There is no message");

$dbh->do("notify foo, ?", undef, "hi");
my $event = $listener->block_for_messages(0)->();
is($event->{'channel'}, "foo", "The channel matches");
isnt($event->{'pid'}, undef, "The pid is set");
is($event->{'payload'}, "hi", "The payload matches");

# The single notification has been consumed.
is(scalar $listener->block_for_messages(0)->(), undef, "There is no message");

# With no timeout the call should still be waiting when the alarm fires.
like(
    dies {
        local $SIG{ALRM} = sub { die "timeout" };
        alarm 1;
        $listener->block_for_messages->();
        alarm 0;
    },
    qr/timeout/,
    "An unspecified block should block forever"
);

# A two-second wait must outlast a one-second alarm.
like(
    dies {
        local $SIG{ALRM} = sub { die "timeout" };
        alarm 1;
        $listener->block_for_messages(2)->();
        alarm 0;
    },
    qr/timeout/,
    "A 2-second block goes longer than 1 second"
);

# A one-second wait must finish before a two-second alarm.
ok(
    lives {
        local $SIG{ALRM} = sub { die "timeout" };
        alarm 2;
        is(
            scalar $listener->block_for_messages(1)->(),
            undef,
            "A one second block returns undef data after timeout"
        );
        alarm 0;
    },
    "A 1-second block expires within 2 seconds"
);

subtest "with wacky channel names" => sub {
    my $channel = "foo! very weird channel names...; select * from t where 1 = 1";
    my $escapedChannel = $dbh->quote_identifier($channel);

    $listener->subscribe($channel);

    is(scalar $listener->block_for_messages(0)->(), undef, "There is no message");

    $dbh->do("notify $escapedChannel, ?", undef, "hi");
    my $event = $listener->block_for_messages(0)->();
    is($event->{'channel'}, $channel, "The channel matches");
    isnt($event->{'pid'}, undef, "The pid is set");
    is($event->{'payload'}, "hi", "The payload matches");
};

done_testing;