use std::future::Future;
use std::pin::Pin;

use futures_util::stream::Stream;

use tokio::sync::mpsc::Receiver;

use pin_project::pin_project;

#[pin_project(project = StoppableReceiverProj)]
pub struct StoppableReceiver<T, F: Future<Output = ()>> {
    #[pin]
    receiver: Receiver<T>,
    #[pin]
    stop_future: F,
    finished: bool,
}

impl<T, F: Future<Output = ()>> Stream for StoppableReceiver<T, F> {
    type Item = T;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context,
    ) -> std::task::Poll<Option<Self::Item>> {
        let StoppableReceiverProj {
            mut receiver,
            stop_future,
            finished,
        } = self.project();

        if *finished {
            return std::task::Poll::Ready(None);
        }

        if stop_future.poll(cx).is_ready() {
            *finished = true;
            receiver.close();
            return std::task::Poll::Ready(None);
        }

        receiver.poll_recv(cx)
    }
}

pub fn stop_receiver<T, F: Future<Output = ()>>(
    receiver: Receiver<T>,
    stop_future: F,
) -> StoppableReceiver<T, F> {
    StoppableReceiver {
        receiver,
        stop_future,
        finished: false,
    }
}