let t = tokio::spawn(async move {self_.download_changes(cloned_download_bar,&mut hash_recv,&mut send,&changes_dir,false,).await?;
let mut asked = HashSet::new();for h in to_apply {}
self.download_changes_with(download_bar, &changes_dir, false, |cx| async move {let stream = download_changes_rec(&cx, repo, to_apply);pin_mut!(stream);
let u = self.download_changes_rec(repo,hash_send,recv,send_ready,download_bar,waiting,asked,).await?;
let mut ws = ApplyWorkspace::new();while let Some(h) = stream.try_next().await? {debug!("to_apply: {:?}", h);let mut touches_inodes = inodes.is_empty();
let mut ws = libpijul::ApplyWorkspace::new();let mut to_apply_inodes = HashSet::new();while let Some(h) = recv_ready.recv().await {debug!("to_apply: {:?}", h);let touches_inodes = inodes.is_empty()|| {
use libpijul::changestore::ChangeStore;if let CS::Change(ref h) = h {let changes = repo.changes.get_changes(h)?;changes.iter().any(|c| {c.iter().any(|c| {let inode = c.inode();debug!("inode = {:?}", inode);inodes.contains(&Position {change: inode.change.unwrap_or(*h),pos: inode.pos,
if !touches_inodes {if let CS::Change(ref h) = h {let changes = repo.changes.get_changes(h)?;touches_inodes |= changes.iter().any(|c| {c.iter().any(|c| {let inode = c.inode();debug!("inode = {:?}", inode);inodes.contains(&Position {change: inode.change.unwrap_or(*h),pos: inode.pos,})
})
}}if !touches_inodes {touches_inodes |= inodes.iter().any(|i| CS::Change(i.change) == h);}if !touches_inodes {continue;}to_apply_inodes.insert(h);if let Some(apply_bar) = &apply_bar {info!("Applying {:?}", h);apply_bar.inc(1);debug!("apply");if let CS::Change(ref h) = h {let mut channel = channel.write();txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;}debug!("applied");
if touches_inodes {to_apply_inodes.insert(h);} else {continue;}if let Some(apply_bar) = apply_bar.clone() {info!("Applying {:?}", h);apply_bar.inc(1);debug!("apply");if let CS::Change(h) = h {let mut channel = channel.write();txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;}debug!("applied");} else {debug!("not applying {:?}", h)}
Ok(())}).await?;
}if !needs_dep {send_ready.send(CS::Change(hash)).await?;} else {ready.push(CS::Change(hash))}} else {send_ready.send(CS::Change(hash)).await?;}}if waiting == 0 {break;}}info!("waiting loop done");for r in ready {send_ready.send(r).await?;}std::mem::drop(recv_signal);Ok(())});Ok(t)
let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();let (mut send_signal, recv_signal) = tokio::sync::mpsc::channel(100);let mut self_ = std::mem::replace(self, RemoteRepo::None);let changes_dir = repo.changes_dir.clone();
let t = tokio::spawn(async move {self_.download_changes(cloned_download_bar,&mut recv_hash,&mut send_signal,&changes_dir,false,).await?;Ok(self_)});
let mut hashes = Vec::new();
let mut waiting = 0;let mut asked = HashSet::new();for &h in tag.iter() {waiting += 1;send_hash.send(CS::Change(h))?;asked.insert(CS::Change(h));}
{let txn = &mut *txn;let hashes = &mut hashes;
let u = self.download_changes_rec(repo,send_hash,recv_signal,send_ready,download_bar,waiting,asked,).await?;
let mut ws = ApplyWorkspace::new();
let mut hashes = Vec::new();let mut ws = libpijul::ApplyWorkspace::new();{let mut channel_ = channel.write();while let Some(hash) = recv_ready.recv().await {if let CS::Change(ref hash) = hash {txn.apply_change_rec_ws(&repo.changes, &mut channel_, hash, &mut ws)?;
while let Some(cs) = stream.try_next().await? {if let CS::Change(hash) = cs {let mut channel = channel.write();txn.apply_change_rec_ws(&repo.changes, &mut channel, &hash, &mut ws)?;}hashes.push(cs);
use libpijul::changestore::ChangeStore;let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();let (mut send_sig, mut recv_sig) = tokio::sync::mpsc::channel(100);let mut self_ = std::mem::replace(self, RemoteRepo::None);let changes_dir = repo.changes_dir.clone();
let t: tokio::task::JoinHandle<Result<RemoteRepo, anyhow::Error>> =tokio::spawn(async move {self_.download_changes(download_bar,&mut recv_hash,&mut send_sig,&changes_dir,true,).await?;Ok::<_, anyhow::Error>(self_)});
for c in changes {let c = if let CS::Change(c) = c { c } else { continue };let sc = c.into();if repo.changes.has_contents(*c, txn.get_internal(&sc)?.cloned()){debug!("has contents {:?}", c);continue;
self.download_changes_with(download_bar, &repo.changes_dir, true, |cx| async move {let mut waiting = Vec::new();for c in changes {let CS::Change(c) = c else { continue };let sc = c.into();if repo.changes.has_contents(*c, txn.get_internal(&sc)?.cloned()){debug!("has contents {:?}", c);continue;}if full {waiting.push(cx.download(CS::Change(*c))?);continue;}let Some(&change) = txn.get_internal(&sc)? else {debug!("could not find internal for {:?}", sc);continue;};// Check if at least one non-empty vertex from c is still alive.let v = libpijul::pristine::Vertex {change,start: ChangePosition(0u64.into()),end: ChangePosition(0u64.into()),};let channel = local_channel.read();let graph = txn.graph(&channel);for x in txn.iter_graph(graph, Some(&v))? {let (v, e) = x?;if v.change > change {break;} else if e.flag().is_alive_parent() {waiting.push(cx.download(CS::Change(*c))?);break;}}
let change = if let Some(&i) = txn.get_internal(&sc)? {i} else {debug!("could not find internal for {:?}", sc);continue;};// Check if at least one non-empty vertex from c is still alive.let v = libpijul::pristine::Vertex {change,start: libpijul::pristine::ChangePosition(0u64.into()),end: libpijul::pristine::ChangePosition(0u64.into()),};let channel = local_channel.read();let graph = txn.graph(&channel);for x in txn.iter_graph(graph, Some(&v))? {let (v, e) = x?;if v.change > change {break;} else if e.flag().is_alive_parent() {send_hash.send(CS::Change(*c))?;break;}}}debug!("dropping send_hash");std::mem::drop(send_hash);while recv_sig.recv().await.is_some() {}*self = t.await??;
Ok(())}).await?;
fn stream<C, F, T>(op: C) -> impl Stream<Item = T>whereC: FnOnce(mpsc::Sender<T>) -> F,F: Future<Output = ()>,{struct Impl<F, T> {fut: Once<F>,rx: mpsc::Receiver<T>,}impl<F, T> Stream for Impl<F, T>whereF: Future<Output = ()>,{type Item = T;fn poll_next(self: Pin<&mut Self>,cx: &mut std::task::Context<'_>,) -> Poll<Option<Self::Item>> {let (fut, mut rx) = unsafe {let this = self.get_unchecked_mut();(Pin::new_unchecked(&mut this.fut), Pin::new(&mut this.rx))};match (fut.poll_next(cx), rx.poll_recv(cx)) {(_, Poll::Ready(Some(v))) => Poll::Ready(Some(v)),(Poll::Ready(_), Poll::Ready(None)) => Poll::Ready(None),_ => Poll::Pending,}}}let (tx, rx) = mpsc::channel(1);Impl {fut: stream::once(op(tx)),rx,}}
async fn download_changes_rec(&mut self,repo: &mut Repository,send_hash: tokio::sync::mpsc::UnboundedSender<CS>,mut recv_signal: tokio::sync::mpsc::Receiver<(CS, bool)>,send_ready: tokio::sync::mpsc::Sender<CS>,progress_bar: ProgressBar,mut waiting: usize,mut asked: HashSet<CS>,) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {let changes_dir = repo.changes_dir.clone();let changes = repo.changes.clone();let t = tokio::spawn(async move {let mut buf = PathBuf::new();
fn download_changes_rec<'a, I>(cx: &'a DownloadContext,repo: &'a Repository,items: I,) -> impl Stream<Item = anyhow::Result<CS>> + 'awhereI: IntoIterator + 'a,I::Item: Borrow<CS>,{try_stream(move |sender| async move {let mut tasks = FuturesUnordered::new();
if waiting == 0 {return Ok(());
// there is probably a way to model this using futures, but I// couldn't find a nice way to do it// so here, have three funny collectionslet mut pending_deps = HashMap::<Hash, HashSet<Hash>>::new();let mut rev_deps = HashMap::<Hash, Vec<Hash>>::new();let mut fetched = HashSet::new();let make_download_job = |cs| async move {match cx.download(cs) {Ok(v) => (cs, v.await),Err(e) => (cs, Err(e)),
let mut ready = Vec::new();while let Some((hash, follow)) = recv_signal.recv().await {debug!("received {:?} {:?}", hash, follow);if let CS::Change(hash) = hash {waiting -= 1;if follow {use libpijul::changestore::ChangeStore;let mut needs_dep = false;for dep in changes.get_dependencies(&hash)? {let dep: libpijul::pristine::Hash = dep;
};
let dep_path = fmt_filename(&mut buf, &changes_dir, &dep);let has_dep = std::fs::metadata(&dep_path).is_ok();
for item in items {let item = *item.borrow();tasks.push(make_download_job(item));
if !has_dep {needs_dep = true;if asked.insert(CS::Change(dep)) {progress_bar.inc(1);send_hash.send(CS::Change(dep))?;waiting += 1}}}debug!("to_apply {:?}", h);
if let CS::Change(hash) = item {rev_deps.insert(hash, Vec::new());}}
asked.insert(*h);hash_send.send(*h)?;waiting += 1;
while let Some((cs, res)) = tasks.next().await {debug!("{:?} finished downloading (result: {:?})", cs, &res);let follow = res?;let CS::Change(hash) = cs else {debug!("it is not a change, we're done");sender.send(Ok(cs)).await?;continue;};fetched.insert(hash);// first, populate our dependenciesif follow {info!("{:?}", hash);let pending_deps = match pending_deps.entry(hash) {Entry::Occupied(_) => unreachable!(),Entry::Vacant(v) => v.insert(Default::default()),};for dep in repo.changes.get_dependencies(&hash)? {if !fetched.contains(&dep) {pending_deps.insert(dep);}rev_deps.entry(dep).or_insert_with(|| {tasks.push(make_download_job(CS::Change(dep)));Default::default()});}}// then, send completed changes (including parents of this) to// our callerlet mut to_check = vec![hash];while let Some(hash) = to_check.pop() {if let Some(pending_deps) = pending_deps.get(&hash) {if !pending_deps.is_empty() {continue;}}// this change has no pending dependencies => it is completesender.send(Ok(CS::Change(hash))).await?;// other changes may be complete if this was the last// missing dependency, propagatefor dep in rev_deps.get(&hash).into_iter().flatten() {if let Some(p) = pending_deps.get_mut(dep) {assert!(p.remove(dep));to_check.push(*dep);}}}}Ok(())})}