use std::future::Future;
use std::pin::Pin;
use futures_util::stream::Stream;
use tokio::sync::mpsc::Receiver;
use pin_project::pin_project;
/// A stream adapter that couples an mpsc [`Receiver`] with a "stop" future.
///
/// Instances are created with [`stop_receiver`]; the [`Stream`] implementation
/// reads both halves through the pin projection generated below.
#[pin_project(project = StoppableReceiverProj)]
pub struct StoppableReceiver<T, F>
where
    F: Future<Output = ()>,
{
    /// Channel half that messages are pulled from.
    #[pin]
    receiver: Receiver<T>,
    /// Future whose completion signals that the stream should stop.
    #[pin]
    stop_future: F,
    /// Once `true`, `poll_next` returns `None` immediately without polling
    /// either of the fields above.
    finished: bool,
}
/// Forwards messages from the wrapped receiver until either the stop future
/// completes or the channel itself reports that it is exhausted.
impl<T, F: Future<Output = ()>> Stream for StoppableReceiver<T, F> {
    type Item = T;

    /// Polls for the next message.
    ///
    /// Returns `Poll::Ready(None)` once the stream has ended, which happens
    /// when the stop future resolves or when the receiver yields `None`.
    /// After that point neither the stop future nor the receiver is polled
    /// again.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let StoppableReceiverProj {
            mut receiver,
            stop_future,
            finished,
        } = self.project();

        // Already terminated: never touch the (possibly completed) stop future again.
        if *finished {
            return std::task::Poll::Ready(None);
        }

        // The stop future is checked before the receiver, so a stop request wins
        // even if a message is ready. The stream ends right away here; nothing
        // further is read from the receiver after `close()`.
        if stop_future.poll(cx).is_ready() {
            *finished = true;
            receiver.close();
            return std::task::Poll::Ready(None);
        }

        let next = receiver.poll_recv(cx);
        // Latch termination on the channel-exhausted path as well, so later polls
        // take the early return above instead of re-polling the stop future.
        if matches!(next, std::task::Poll::Ready(None)) {
            *finished = true;
        }
        next
    }
}
/// Wraps `receiver` in a [`StoppableReceiver`] driven by `stop_future`.
///
/// The returned value starts out not finished; nothing is polled here.
pub fn stop_receiver<T, F>(receiver: Receiver<T>, stop_future: F) -> StoppableReceiver<T, F>
where
    F: Future<Output = ()>,
{
    // A freshly built wrapper has not terminated yet.
    let finished = false;
    StoppableReceiver {
        receiver,
        stop_future,
        finished,
    }
}