ZLDJRMVBOGTLW6BAS3IOVYXR4BLUZ2YWGWIIPA3NFUG4RRTOGZKQC
3E2KY6Y4SQ2UO6PLA4K3MQKHIRHSQCBAJIPJT7NQ6BIETMAQX5QAC
5Z2Y7VGVHJ7A6UPSEAPFVMIB5J7YHSI6WWHUUUYVLDJ56XSNCXKQC
RIZ4IP76N4PNGXMMZSFPOQIUQICFMGSHJNJMYZGHM4WODM45QOHAC
LKIKT4FRKVZAYITMKO23RRFZG25UFX7J6GBOOUQOD24YBRNLHLOAC
6WIH3U43354JLREJ2CU7RSFPXJ4ZOWKY6T2KCZRH5FBD6S3MZ5BAC
SXEYMYF7P4RZMZ46WPL4IZUTSQ2ATBWYZX7QNVMS3SGOYXYOHAGQC
MU5GSJAW65PEG3BRYUKZ7O37BPHW3MOX3S5E2RFOXKGUOJEEDQ5AC
5QTMRUXNE2XNJCMLN6MQN24UEZ55EFC3LIR4PO6OPNTT5KEL7WXQC
UDGL7ER2R6SY2CBSPA4O4ULQ5PCUDWQBGYOWTN3MQGSS5L2EI2LQC
IVLLXQ5ZWZDKHO4TNQG3TPXN34H6Y2WXPAGSO4PWCYNSKUZWOEJQC
I52XSRUH5RVHQBFWVMAQPTUSPAJ4KNVID2RMI3UGCVKFLYUO6WZAC
TKEVOH7HXON7SOBGXTUDHAHO2U2GPTQRNESP6ERKUQAS526OZIRAC
2D7P2VKJASU7QDQZHGCLBIT6G2V5WUFYLWTCEVVEI2EZHGM6XYRAC
UDHP4ZVBQZT2VBURB2MDCU2IZDNMCAFSIUKWRBDQ5BWMFKSN2LYQC
BNPSVXIC72C3WT33YKCH766OBLLNCS7POX6U6JXZSQQPJF2M22MQC
C3L2TLQWREYOM3YHL37L7PS74YGLHBEDQRSCVMYIU6HKBEPNN2SAC
JUYSZJSHULJFR4HUJF72TEKKFMBPG4ZOGAGOJ2BX6P3D4DRZAU5QC
IBPVOKM5MXTGB2P7LCD75MISAYUNDPEKQAUEVCXJJWLWCX2TJZBAC
4HTHYIA3GLMUBUMAI7CQMZA4KE47EUXOP24XLUEYFRMOG57CJLMAC
X6YFD4WVMUYJCR5IYPJH6UKYVWSA7DKBRVJ6XQFXHOE2TRYUTAHAC
ZWVYH7WPYOGDKWODFSAJ6R5U64DON2AVVJ2XZJKHAOMLJEFTYF3QC
HXEIH4UQ6EX3MAY33JK4WQUE5GUSZ673OX57JKNFXC2N2QLTXKXAC
3WO4H2MMMQTYKCPBWYE67IRAEX7DFA2XAOMIKICA6BYDN23K6DQQC
DO2Y5TY5JQISUHCVNPI2FXO7WWZVJQ3LGPWF4DNADMGZRIO6PT2QC
PYTC7DPVCWKYDXXBY44BBNB4DHZ3N4OQW3EOEQ7H6Z5P5XBG2EIAC
G7HJHNFDZCGOPGVETNYK7BDDPJXHEIPGZJEJXBGBXSWPWEX3BIQQC
Q45QHPO4HDTEZF2W4UDZSYYQ46BPEIWSW4GJILZR5HTJNLKXJABQC
GNMZNKB46GTPTWBR452FITHPBCMYPSDLV5VZQSY7BX6OJHWTWTZAC
5SLOJYHGPMZVCOE3IS7ICNMJJYX3RBT6CDG5MAV6T4CJIOW7YZ6QC
YN63NUZO4LVJ7XPMURDULTXBVJKW5MVCTZ24R7Z52QMHO3HPDUVQC
A3RM526Y7LUXNYW4TL56YKQ5GVOK2R5D7JJVTSQ6TT5MEXIR6YAAC
XQHABMC2FOMH7SZIYVYAR5MNH2DK2AOUCX2RJKZM3PDG2H5JIXYQC
Z6ASIMORLV437IEFIUYXIRYSGUTIBMPJNIVF6XQMGRP6WV6R4YPAC
OYN2YVPAN6L3X6HZXJI6B2GYGL2W5AXRK6CVDJRWALS5OENNB5UAC
EJ7TFFOWLM5EXYX57NJZZX3NLPBLLMRX7CGJYC75DJZ5LYXOQPJAC
PNJL5TPZLQ3VXAASTLUX7462RCRPO7TV3GKOTTHDZABDQCBMXPRQC
let t = tokio::spawn(async move {
self_
.download_changes(
cloned_download_bar,
&mut hash_recv,
&mut send,
&changes_dir,
false,
)
.await?;
Ok::<_, anyhow::Error>(self_)
});
let mut waiting = 0;
let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
let mut asked = HashSet::new();
for h in to_apply {
debug!("to_apply {:?}", h);
asked.insert(*h);
hash_send.send(*h)?;
waiting += 1;
}
let u = self
.download_changes_rec(
repo,
hash_send,
recv,
send_ready,
download_bar,
waiting,
asked,
)
.await?;
let mut ws = libpijul::ApplyWorkspace::new();
let mut to_apply_inodes = HashSet::new();
while let Some(h) = recv_ready.recv().await {
debug!("to_apply: {:?}", h);
let touches_inodes = inodes.is_empty()
|| {
debug!("inodes = {:?}", inodes);
use libpijul::changestore::ChangeStore;
if let CS::Change(ref h) = h {
let changes = repo.changes.get_changes(h)?;
changes.iter().any(|c| {
c.iter().any(|c| {
let inode = c.inode();
debug!("inode = {:?}", inode);
inodes.contains(&Position {
change: inode.change.unwrap_or(*h),
pos: inode.pos,
})
})
})
} else {
false
}
}
|| { inodes.iter().any(|i| CS::Change(i.change) == h) };
if touches_inodes {
to_apply_inodes.insert(h);
} else {
continue;
}
if let Some(apply_bar) = apply_bar.clone() {
info!("Applying {:?}", h);
apply_bar.inc(1);
debug!("apply");
if let CS::Change(h) = h {
let mut channel = channel.write();
txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;
}
debug!("applied");
} else {
debug!("not applying {:?}", h)
self.download_changes_with(download_bar, &changes_dir, false, |cx| async move {
for h in to_apply {
debug!("finished");
debug!("waiting for spawned process");
*self = t.await??;
u.await??;
Ok(result)
// let mut waiting = 0;
// let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
//
// let mut asked = HashSet::new();
// for h in to_apply {
// debug!("to_apply {:?}", h);
//
// asked.insert(*h);
// hash_send.send(*h)?;
// waiting += 1;
// }
//
// let u = self
// .download_changes_rec(
// repo,
// hash_send,
// recv,
// send_ready,
// download_bar,
// waiting,
// asked,
// )
// .await?;
//
// let mut ws = libpijul::ApplyWorkspace::new();
// let mut to_apply_inodes = HashSet::new();
// while let Some(h) = recv_ready.recv().await {
// debug!("to_apply: {:?}", h);
// let touches_inodes = inodes.is_empty()
// || {
// debug!("inodes = {:?}", inodes);
// use libpijul::changestore::ChangeStore;
// if let CS::Change(ref h) = h {
// let changes = repo.changes.get_changes(h)?;
// changes.iter().any(|c| {
// c.iter().any(|c| {
// let inode = c.inode();
// debug!("inode = {:?}", inode);
// inodes.contains(&Position {
// change: inode.change.unwrap_or(*h),
// pos: inode.pos,
// })
// })
// })
// } else {
// false
// }
// }
// || { inodes.iter().any(|i| CS::Change(i.change) == h) };
//
// if touches_inodes {
// to_apply_inodes.insert(h);
// } else {
// continue;
// }
//
// if let Some(apply_bar) = apply_bar.clone() {
// info!("Applying {:?}", h);
// apply_bar.inc(1);
// debug!("apply");
// if let CS::Change(h) = h {
// let mut channel = channel.write();
// txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;
// }
// debug!("applied");
// } else {
// debug!("not applying {:?}", h)
// }
// }
//
// let mut result = Vec::with_capacity(to_apply_inodes.len());
// for h in to_apply {
// if to_apply_inodes.contains(&h) {
// result.push(*h)
// }
// }
//
// debug!("finished");
// debug!("waiting for spawned process");
// *self = t.await??;
// u.await??;
// Ok(result)
mut waiting: usize,
mut asked: HashSet<CS>,
) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
let changes_dir = repo.changes_dir.clone();
let changes = repo.changes.clone();
let t = tokio::spawn(async move {
let mut buf = PathBuf::new();
items: impl IntoIterator<Item = CS> + 'a,
) -> impl Stream<Item=anyhow::Result<CS>> + 'a {
try_stream(|sender| async move {
struct State<'a> {
cx: DownloadContext<'a>,
repo: &'a Repository,
sender: mpsc::Sender<anyhow::Result<CS>>,
futs: FuturesUnordered<LocalBoxFuture<'a, anyhow::Result<()>>>,
barriers: RefCell<HashMap<CS, watch::Receiver<()>>>,
}
let state = State {
cx,
repo,
sender,
futs: Default::default(),
barriers: RefCell::new(Default::default()),
};
fn go<'a>(state: &'a State<'a>, cs: CS) -> watch::Receiver<()> {
state.barriers.borrow_mut().entry(cs).or_insert_with(|| {
let (tx, mut rx) = watch::channel(());
rx.mark_unchanged();
let mut ready = Vec::new();
while let Some((hash, follow)) = recv_signal.recv().await {
debug!("received {:?} {:?}", hash, follow);
if let CS::Change(hash) = hash {
waiting -= 1;
if follow {
use libpijul::changestore::ChangeStore;
let mut needs_dep = false;
for dep in changes.get_dependencies(&hash)? {
let dep: libpijul::pristine::Hash = dep;
async fn go2<'a>(state: &'a State<'a>, cs: CS) -> anyhow::Result<()> {
let follow = state.cx.download(cs).await?;
if !has_dep {
needs_dep = true;
if asked.insert(CS::Change(dep)) {
progress_bar.inc(1);
send_hash.send(CS::Change(dep))?;
waiting += 1
}
}
}
if !follow {
return Ok(());
}
wait_for(state, state.repo.changes.get_dependencies(&hash)?.into_iter().map(CS::Change)).await;
Ok(())
}
async fn wait_for<'a>(state: &'a State<'a>, items: impl IntoIterator<Item=CS>) {
let mut v = Vec::new();
info!("waiting loop done");
for r in ready {
send_ready.send(r).await?;
state.futs.push(async move {
wait_for(&state, items).await;
Ok(())
}.boxed_local());
while let Some(v) = state.futs.next().await {
v?;
});
Ok(t)
})
// let t = tokio::spawn(async move {
// let mut buf = PathBuf::new();
//
// if waiting == 0 {
// return Ok(());
// }
// let mut ready = Vec::new();
// while let Some((hash, follow)) = recv_signal.recv().await {
// if let CS::Change(hash) = hash {
// waiting -= 1;
// if follow {
// use libpijul::changestore::ChangeStore;
// let mut needs_dep = false;
// for dep in changes.get_dependencies(&hash)? {
// let dep: libpijul::pristine::Hash = dep;
//
// let dep_path = fmt_filename(&mut buf, &changes_dir, &dep);
// let has_dep = std::fs::metadata(&dep_path).is_ok();
//
// if !has_dep {
// needs_dep = true;
// if asked.insert(CS::Change(dep)) {
// progress_bar.inc(1);
// send_hash.send(CS::Change(dep))?;
// waiting += 1
// }
// }
// }
//
// if !needs_dep {
// send_ready.send(CS::Change(hash)).await?;
// } else {
// ready.push(CS::Change(hash))
// }
// } else {
// send_ready.send(CS::Change(hash)).await?;
// }
// }
// if waiting == 0 {
// break;
// }
// }
// info!("waiting loop done");
// for r in ready {
// send_ready.send(r).await?;
// }
// std::mem::drop(recv_signal);
// Ok(())
// });
// Ok(t)
/// Builds a stream of `Result<T, E>` items from a fallible producer.
///
/// `op` is handed a sender and returns a future that pushes items through it.
/// That future is driven by [`stream`]. If it resolves to `Err(e)`, the error
/// is forwarded through the same channel so the consumer observes it as an
/// `Err` item of the stream.
fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>
where
    C: FnOnce(mpsc::Sender<Result<T, E>>) -> F,
    F: Future<Output = Result<(), E>>,
{
    stream(|sender| {
        // `op` gets its own handle; the original is kept to report a terminal error.
        let fut = op(sender.clone());
        async move {
            if let Err(e) = fut.await {
                // The channel is a bounded async mpsc (`mpsc::channel(1)` + `poll_recv`
                // in `stream`), whose `send` returns a future. Without `.await` that
                // future is dropped unpolled and the error is never delivered.
                // A failed send only means the receiver is gone, so it is ignored.
                let _ = sender.send(Err(e)).await;
            }
        }
    })
}
/// Turns a channel-producing future into a stream.
///
/// `op` receives the sending half of a capacity-1 channel and returns a future.
/// The returned stream polls that future and the receiving half together on
/// every `poll_next`, yielding whatever arrives on the channel. It ends once the
/// future has finished and the channel reports that it is closed and empty.
fn stream<C, F, T>(op: C) -> impl Stream<Item = T>
where
    C: FnOnce(mpsc::Sender<T>) -> F,
    F: Future<Output = ()>,
{
    /// Producer future paired with the receiving end of its channel.
    struct Driven<F, T> {
        producer: Once<F>,
        items: mpsc::Receiver<T>,
    }

    impl<F, T> Stream for Driven<F, T>
    where
        F: Future<Output = ()>,
    {
        type Item = T;

        fn poll_next(
            self: Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            // SAFETY: `Driven` is private to this function and nothing here moves
            // `producer` out of the struct, so it stays at the pinned address.
            let this = unsafe { self.get_unchecked_mut() };
            // SAFETY: see above; `producer` is only ever accessed through this pin.
            let producer = unsafe { Pin::new_unchecked(&mut this.producer) };

            // Drive the producer first so anything it sends during this poll is
            // visible to the receive below.
            let producer_done = producer.poll_next(cx).is_ready();

            match this.items.poll_recv(cx) {
                Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
                // Channel drained and closed: finish only if the producer is done too.
                Poll::Ready(None) if producer_done => Poll::Ready(None),
                _ => Poll::Pending,
            }
        }
    }

    let (tx, rx) = mpsc::channel(1);
    Driven {
        producer: stream::once(op(tx)),
        items: rx,
    }
}
/// Public handle whose only field is the [`Shared`] state behind a `Mutex`.
///
/// The lifetime `'a` is passed straight through to [`Shared`].
pub struct FetchChangesState<'a> {
/// The mutex-guarded state (a boxed task plus a per-change map).
shared: Mutex<Shared<'a>>,
}
/// State held behind the mutex in [`FetchChangesState`].
struct Shared<'a> {
/// Boxed future resolving to `Result<bool, anyhow::Error>`.
/// NOTE(review): the meaning of the `bool` is not visible here; confirm with the constructor.
task: BoxFuture<'a, Result<bool, anyhow::Error>>,
/// Per-change bookkeeping, keyed by `CS` in an ordered map.
changes: BTreeMap<CS, ChangeState>,
}
/// Bookkeeping stored for one entry of `Shared::changes`.
struct ChangeState {
/// Wakers recorded for this change.
/// NOTE(review): presumably the tasks waiting on this change to be fetched; confirm.
wakers: Vec<Waker>,
}
impl<'a> FetchChangesState<'a> {
pub fn new<T>(
remote: &'a mut RemoteRepo,
impl RemoteRepo {
pub async fn download_changes_with<C, F, R>(
&mut self,
let notifiers: RefCell<Option<HashMap<CS, watch::Sender<Option<bool>>>>> =
RefCell::new(Some(HashMap::new()));
let dc = DownloadContext {
data: Default::default(),
notifiers: ¬ifiers,
in_tx,
};
let download_task = self
.download_changes(progress_bar, &mut in_rx, &mut out_tx, dest_dir, full)
.fuse();
let control_task = op(dc).fuse();
let task = async move {
remote
.download_changes(
progress_bar,
&mut in_rx,
&mut out_tx,
dest_dir.as_ref(),
full,
)
.await
pin_mut!(download_task);
pin_mut!(control_task);
let mut out_rx = Some(out_rx);
let mut dl_err = None;
let mut control_res = None;
loop {
futures::select_biased! {
next = out_rx.as_mut().map(|v| v.recv().fuse()).unwrap_or(Fuse::terminated()) => {
let Some((cs, value)) = next else {
out_rx.take();
continue;
};
if let Some(ref notifiers) = *notifiers.borrow() {
if let Some(tx) = notifiers.get(&cs) {
let _ = tx.send(Some(value));
}
};
}
res = download_task => {
notifiers.borrow_mut().take();
dl_err = res.err();
}
res = control_task => {
control_res = Some(res);
}
complete => break,
}
shared.task.poll_unpin(cx);
todo!()
match *self.notifiers.borrow_mut() {
None => {
bail!("map is gone");
}
Some(ref mut v) => {
assert!(v.insert(cs, tx).is_none());
}
}
self.in_tx.send(cs)?;
v.insert(ChangeData { rx }).rx.clone()
}
}
};
rx.changed().await?;
let res = rx.borrow_and_update().unwrap();
Ok(res)