This lets us test the event loop if we wanted, and lets us test the listening behavior in isolation.
P4SME2BCHKBETP4SFPOWOJM23K57VRFX3RAS634CQKTASXOXYRKQC
4ZCUCACYWDLCZUKIH2OLFC4NCPBE2YO7Y7AZ7WL3AESZQ3TSIYXQC
2JJP76737U2JWJWQ6UDFEAQCGWRAQH46HC6OCIKWMB5QYRXF6DQQC
D5QIOJGPKQJIYBUCSC3MFJ3TXLPNZ2XMI37GXMFRVRFWWR2VMTFAC
IE2PRAQUCQVFPJ4CAIJRPXXEFC5VBAE3EO5I5FG4XWEDRNONNHKQC
6WRGCITDYP7JIBYP25QIWCHWRJWFPDP2D3TJS3WO3KUHQQJAWHMQC
32KJOERMPFWZZZCIN6TGGVX72GMAJT6VCCIP7S65EHNF3KM42KWAC
3AKZKWCRQZTASQO5BZ4KCEFJ5OMP7LVF6YOPDJN5KCRIZIW75MCAC
package Hydra::PostgresListener;
use strict;
use warnings;
use IO::Select;
=head1 Hydra::PostgresListener
An abstraction around using Postgres' LISTEN / NOTIFY in an event loop.
=cut
=head2 new
Arguments:
=over 1
=item C<$dbh>
L<DBI::db> The database connection.
=back
=cut
sub new {
my ($self, $dbh) = @_;
my $sel = IO::Select->new($dbh->func("getfd"));
return bless {
"dbh" => $dbh,
"sel" => $sel,
}, $self;
}
=head2 subscribe
Subscribe to the named channel for messages
Arguments:
=over 1
=item C<$channel>
The channel name.
=back
=cut
sub subscribe {
my ($self, $channel) = @_;
$channel = $self->{'dbh'}->quote_identifier($channel);
$self->{'dbh'}->do("listen $channel");
}
=head2 block_for_messages
Wait for messages to arrive within the specified timeout.
Arguments:
=over 1
=item C<$timeout>
The maximum number of seconds to wait for messages.
Optional: if unspecified, block forever.
=back
Returns: a sub, call the sub repeatedly to get a message. The sub
will return undef when there are no pending messages.
Example:
my $events = $listener->block_for_messages();
while (my $message = $events->()) {
...
}
=cut
sub block_for_messages {
my ($self, $timeout) = @_;
$self->{'sel'}->can_read($timeout);
return sub {
my $notify = $self->{'dbh'}->func("pg_notifies");
if (defined($notify)) {
my ($channelName, $pid, $payload) = @$notify;
return {
channel => $channelName,
pid => $pid,
payload => $payload,
}
} else {
return undef
}
}
}
1;
$dbh->do("listen build_started");
$dbh->do("listen build_finished");
$dbh->do("listen step_finished");
my $listener = Hydra::PostgresListener->new($dbh);
$listener->subscribe("build_started");
$listener->subscribe("build_finished");
$listener->subscribe("step_finished");
use strict;
use Setup;
my %ctx = test_init();
require Hydra::Model::DB;
use Hydra::PostgresListener;
use Test2::V0;
my $db = Hydra::Model::DB->new;
my $dbh = $db->storage->dbh;
my $listener = Hydra::PostgresListener->new($dbh);
$listener->subscribe("foo");
$listener->subscribe("bar");
is(undef, $listener->block_for_messages(0)->(), "There is no message");
is(undef, $listener->block_for_messages(0)->(), "There is no message");
is(undef, $listener->block_for_messages(0)->(), "There is no message");
$dbh->do("notify foo, ?", undef, "hi");
my $event = $listener->block_for_messages(0)->();
is($event->{'channel'}, "foo", "The channel matches");
isnt($event->{'pid'}, undef, "The pid is set");
is($event->{'payload'}, "hi", "The payload matches");
is(undef, $listener->block_for_messages(0)->(), "There is no message");
like(
dies {
local $SIG{ALRM} = sub { die "timeout" };
alarm 1;
$listener->block_for_messages->();
alarm 0;
},
qr/timeout/,
"An unspecified block should block forever"
);
like(
dies {
local $SIG{ALRM} = sub { die "timeout" };
alarm 1;
$listener->block_for_messages(2)->();
alarm 0;
},
qr/timeout/,
"A 2-second block goes longer than 1 second"
);
ok(
lives {
local $SIG{ALRM} = sub { die "timeout" };
alarm 2;
is(undef, $listener->block_for_messages(1)->(), "A one second block returns undef data after timeout");
alarm 0;
},
"A 1-second block expires within 2 seconds"
);
subtest "with wacky channel names" => sub {
my $channel = "foo! very weird channel names...; select * from t where 1 = 1";
my $escapedChannel = $dbh->quote_identifier($channel);
$listener->subscribe($channel);
is(undef, $listener->block_for_messages(0)->(), "There is no message");
$dbh->do("notify $escapedChannel, ?", undef, "hi");
my $event = $listener->block_for_messages(0)->();
is($event->{'channel'}, $channel, "The channel matches");
isnt($event->{'pid'}, undef, "The pid is set");
is($event->{'payload'}, "hi", "The payload matches");
};
done_testing;