use anyhow::bail;
use futures::pin_mut;
use futures_util::future::Fuse;
use futures_util::FutureExt;
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::future::Future;
use std::path::Path;
use libpijul::HashMap;
use pijul_interaction::ProgressBar;
use tokio::sync::{mpsc, watch};
use crate::{RemoteRepo, CS};
impl RemoteRepo {
    /// Runs `op` concurrently with this remote's change downloader.
    ///
    /// `op` is handed a [`DownloadContext`] through which it requests changes.
    /// Requests travel over an unbounded channel to `download_changes`, and
    /// every `(change, outcome)` pair the downloader reports back is forwarded
    /// to the `watch` channel registered for that change.
    ///
    /// * `progress_bar`, `dest_dir`, `full` — passed through unchanged to
    ///   `download_changes`.
    /// * `op` — builds the controlling future from the download context.
    ///
    /// # Errors
    ///
    /// Returns whatever `op`'s future returns. If the download task also
    /// failed, its error is attached as context to the controlling future's
    /// error; a download error is not surfaced when `op`'s future succeeds.
    ///
    /// # Panics
    ///
    /// Panics if the select loop finishes without the controlling future
    /// having produced a result (an internal invariant violation).
    pub async fn download_changes_with<C, F, R>(
        &mut self,
        progress_bar: ProgressBar,
        dest_dir: &Path,
        full: bool,
        op: C,
    ) -> anyhow::Result<R>
    where
        C: FnOnce(DownloadContext) -> F,
        F: Future<Output = anyhow::Result<R>>,
    {
        // Requests flow control task -> downloader; results flow back.
        let (in_tx, mut in_rx) = mpsc::unbounded_channel();
        let (mut out_tx, out_rx) = mpsc::channel(100);

        // One `watch` sender per requested change. The map is replaced by
        // `None` once the download task has finished, which drops every sender
        // and lets pending `DownloadContext::download` calls fail instead of
        // waiting forever.
        let notifiers: RefCell<Option<HashMap<CS, watch::Sender<Option<bool>>>>> =
            RefCell::new(Some(HashMap::new()));
        let dc = DownloadContext {
            data: Default::default(),
            notifiers: &notifiers,
            in_tx,
        };

        let download_task = self
            .download_changes(progress_bar, &mut in_rx, &mut out_tx, dest_dir, full)
            .fuse();
        let control_task = op(dc).fuse();
        pin_mut!(download_task);
        pin_mut!(control_task);

        // `None` once the result channel has reported that it is closed.
        let mut out_rx = Some(out_rx);
        let mut dl_err = None;
        let mut control_res = None;

        // NOTE(review): `complete` is only reached after `out_rx` has yielded
        // `None`, i.e. after the result channel is closed. `out_tx` is merely
        // lent to `download_changes` here — confirm that the channel does get
        // closed once downloading ends.
        loop {
            futures::select_biased! {
                // Biased: drain reported results before looking at either task.
                next = out_rx.as_mut().map(|v| v.recv().fuse()).unwrap_or(Fuse::terminated()) => {
                    let Some((cs, value)) = next else {
                        // Channel closed: stop polling it from now on.
                        out_rx.take();
                        continue;
                    };
                    if let Some(ref notifiers) = *notifiers.borrow() {
                        if let Some(tx) = notifiers.get(&cs) {
                            // A send error only means nobody is waiting for
                            // this change any more.
                            let _ = tx.send(Some(value));
                        }
                    };
                }
                res = download_task => {
                    // Drop all senders so that waiters are released.
                    notifiers.borrow_mut().take();
                    dl_err = res.err();
                }
                res = control_task => {
                    control_res = Some(res);
                }
                complete => break,
            }
        }

        let control_res = control_res
            .expect("the control task must have finished before the select loop completes");
        match dl_err {
            None => control_res,
            Some(ce) => control_res.map_err(|e| e.context(ce)),
        }
    }
}
/// Handle given to the controlling future of
/// `RemoteRepo::download_changes_with`, used to request changes through
/// [`DownloadContext::download`].
pub struct DownloadContext<'a> {
// Receiver for every change requested so far, so that repeated requests for
// the same change reuse the existing channel instead of asking again.
data: RefCell<HashMap<CS, ChangeData>>,
// Sender side of the per-change channels, owned by `download_changes_with`.
// Set to `None` there once the download task has finished.
notifiers: &'a RefCell<Option<HashMap<CS, watch::Sender<Option<bool>>>>>,
// Queue of change requests consumed by the download task.
in_tx: mpsc::UnboundedSender<CS>,
}
/// Per-change bookkeeping kept by `DownloadContext`.
struct ChangeData {
// Receiver on which the reported result for the change arrives. It starts as
// `None` and is set to `Some(result)` when the result is forwarded. This
// stored receiver is only ever cloned, never awaited directly.
rx: watch::Receiver<Option<bool>>,
}
impl<'a> DownloadContext<'a> {
pub async fn download(&self, cs: CS) -> anyhow::Result<bool> {
let mut rx = {
let mut lock = self.data.borrow_mut();
match lock.entry(cs) {
Entry::Occupied(v) => v.get().rx.clone(),
Entry::Vacant(v) => {
let (tx, rx) = watch::channel(None);
match *self.notifiers.borrow_mut() {
None => {
bail!("map is gone");
}
Some(ref mut v) => {
assert!(v.insert(cs, tx).is_none());
}
}
self.in_tx.send(cs)?;
v.insert(ChangeData { rx }).rx.clone()
}
}
};
rx.changed().await?;
let res = rx.borrow_and_update().unwrap();
Ok(res)
}
}