use anyhow::bail;
use futures::pin_mut;
use futures_util::future::Fuse;
use futures_util::FutureExt;
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::future::Future;
use std::path::Path;

use libpijul::HashMap;
use pijul_interaction::ProgressBar;
use tokio::sync::{mpsc, watch};

use crate::{RemoteRepo, CS};

impl RemoteRepo {
    /// Runs the control future produced by `op` concurrently with a change download.
    ///
    /// `op` receives a [`DownloadContext`]; every change it asks for through
    /// [`DownloadContext::download`] is queued on an unbounded request channel
    /// that is handed to `download_changes`, and the `(change, flag)` pairs
    /// coming back on the bounded result channel (capacity 100) are forwarded
    /// to the watch sender registered for that change.
    /// NOTE(review): the exact meaning of the reported flag is defined by
    /// `download_changes`, which is not visible here.
    ///
    /// Both futures are polled from this function (nothing is spawned), which is
    /// why the shared state lives in `RefCell`s.
    ///
    /// # Errors
    ///
    /// Returns the result of the control future. If the download also failed,
    /// a control error is wrapped with the download error as context. A download
    /// error is discarded when the control future succeeded.
    ///
    /// # Panics
    ///
    /// Panics only if the select loop exits without the control future having
    /// completed, which would be a bug in this function.
    pub async fn download_changes_with<C, F, R>(
        &mut self,
        progress_bar: ProgressBar,
        dest_dir: &Path,
        full: bool,
        op: C,
    ) -> anyhow::Result<R>
    where
        C: FnOnce(DownloadContext) -> F,
        F: Future<Output = anyhow::Result<R>>,
    {
        // Requests flow control -> download, results flow download -> this loop.
        let (in_tx, mut in_rx) = mpsc::unbounded_channel();
        let (mut out_tx, out_rx) = mpsc::channel(100);

        // One watch sender per requested change; `None` once the download is over.
        let notifiers: RefCell<Option<HashMap<CS, watch::Sender<Option<bool>>>>> =
            RefCell::new(Some(HashMap::new()));

        let dc = DownloadContext {
            data: Default::default(),
            notifiers: &notifiers,
            in_tx,
        };

        let download_task = self
            .download_changes(progress_bar, &mut in_rx, &mut out_tx, dest_dir, full)
            .fuse();
        let control_task = op(dc).fuse();

        pin_mut!(download_task);
        pin_mut!(control_task);

        // `None` stands for "result channel finished" and maps to a terminated future.
        let mut out_rx = Some(out_rx);
        let mut dl_err = None;
        let mut control_res = None;

        loop {
            // Biased: pending results are always delivered before either task is polled.
            futures::select_biased! {
                next = out_rx.as_mut().map(|v| v.recv().fuse()).unwrap_or(Fuse::terminated()) => {
                    let Some((cs, value)) = next else {
                        out_rx.take();
                        continue;
                    };

                    if let Some(ref notifiers) = *notifiers.borrow() {
                        if let Some(tx) = notifiers.get(&cs) {
                            // A send error only means nobody is waiting any more.
                            let _ = tx.send(Some(value));
                        }
                    };
                }
                res = download_task => {
                    // Dropping the senders makes outstanding `download` calls fail
                    // instead of waiting for a result that will never arrive.
                    notifiers.borrow_mut().take();
                    // `out_tx` is only borrowed by the download task and stays alive
                    // until this function returns, so `recv` would never yield `None`
                    // on its own and the `complete` branch could not be reached.
                    // With the notifiers gone any remaining result would be ignored
                    // anyway, so the receiver can be dropped here.
                    out_rx.take();
                    dl_err = res.err();
                }
                res = control_task => {
                    control_res = Some(res);
                }
                complete => break,
            }
        }

        let control_res =
            control_res.expect("select loop only completes after the control task has finished");

        match dl_err {
            None => control_res,
            Some(ce) => control_res.map_err(|e| e.context(ce)),
        }
    }
}

/// Handle passed to the control closure of `RemoteRepo::download_changes_with`,
/// used to request changes and wait for their download result.
pub struct DownloadContext<'a> {
    /// Changes already requested through this context, keyed by change,
    /// each with the receiver on which its result is published.
    data: RefCell<HashMap<CS, ChangeData>>,
    /// Watch senders shared with the driving loop, one per requested change.
    /// `None` once the driving loop has taken the map away.
    notifiers: &'a RefCell<Option<HashMap<CS, watch::Sender<Option<bool>>>>>,
    /// Sending half of the channel on which requested changes are queued.
    in_tx: mpsc::UnboundedSender<CS>,
}

/// Bookkeeping kept for a change that has already been requested.
struct ChangeData {
    /// Receiving half of the watch channel created for the change; the channel
    /// is created holding `None`.
    rx: watch::Receiver<Option<bool>>,
}

impl<'a> DownloadContext<'a> {
    pub async fn download(&self, cs: CS) -> anyhow::Result<bool> {
        let mut rx = {
            let mut lock = self.data.borrow_mut();

            match lock.entry(cs) {
                Entry::Occupied(v) => v.get().rx.clone(),
                Entry::Vacant(v) => {
                    let (tx, rx) = watch::channel(None);

                    match *self.notifiers.borrow_mut() {
                        None => {
                            bail!("map is gone");
                        }
                        Some(ref mut v) => {
                            assert!(v.insert(cs, tx).is_none());
                        }
                    }

                    self.in_tx.send(cs)?;

                    v.insert(ChangeData { rx }).rx.clone()
                }
            }
        };

        rx.changed().await?;
        let res = rx.borrow_and_update().unwrap();
        Ok(res)
    }
}