JGN2BVS5RYX6EFOG7WJGIPTSWG3KE757J5BUGCC64QWCUSXZKZPQC
JLVRAJA5Z5DNBKHZDLT6QTKJ4LLJ547S4DH7X4A3XTB6234O5XDQC
3E2KY6Y4SQ2UO6PLA4K3MQKHIRHSQCBAJIPJT7NQ6BIETMAQX5QAC
QWIYNMI5SOTLRPYE4O3AG7R75JXM2TB3ZADU646PG6ACPBGSYUYAC
5Z2Y7VGVHJ7A6UPSEAPFVMIB5J7YHSI6WWHUUUYVLDJ56XSNCXKQC
G7HJHNFDZCGOPGVETNYK7BDDPJXHEIPGZJEJXBGBXSWPWEX3BIQQC
LKIKT4FRKVZAYITMKO23RRFZG25UFX7J6GBOOUQOD24YBRNLHLOAC
SXEYMYF7P4RZMZ46WPL4IZUTSQ2ATBWYZX7QNVMS3SGOYXYOHAGQC
MU5GSJAW65PEG3BRYUKZ7O37BPHW3MOX3S5E2RFOXKGUOJEEDQ5AC
L4JXJHWXYNCL4QGJXNKKTOKKTAXKKXBJUUY7HFZGEUZ5A2V5H34QC
UDHP4ZVBQZT2VBURB2MDCU2IZDNMCAFSIUKWRBDQ5BWMFKSN2LYQC
4RV7T4SRYIQLBW3EZFWBO5G65QAVG2GHPI4KMDDZERCX65KQWSPQC
5QTMRUXNE2XNJCMLN6MQN24UEZ55EFC3LIR4PO6OPNTT5KEL7WXQC
UDGL7ER2R6SY2CBSPA4O4ULQ5PCUDWQBGYOWTN3MQGSS5L2EI2LQC
IVLLXQ5ZWZDKHO4TNQG3TPXN34H6Y2WXPAGSO4PWCYNSKUZWOEJQC
I52XSRUH5RVHQBFWVMAQPTUSPAJ4KNVID2RMI3UGCVKFLYUO6WZAC
TKEVOH7HXON7SOBGXTUDHAHO2U2GPTQRNESP6ERKUQAS526OZIRAC
Q45QHPO4HDTEZF2W4UDZSYYQ46BPEIWSW4GJILZR5HTJNLKXJABQC
2D7P2VKJASU7QDQZHGCLBIT6G2V5WUFYLWTCEVVEI2EZHGM6XYRAC
JUYSZJSHULJFR4HUJF72TEKKFMBPG4ZOGAGOJ2BX6P3D4DRZAU5QC
DO2Y5TY5JQISUHCVNPI2FXO7WWZVJQ3LGPWF4DNADMGZRIO6PT2QC
4XLHUME7YLJV6XUZBOW7PX62TCJXIWW2CPITXO5GZOULWGXRVDZAC
BNPSVXIC72C3WT33YKCH766OBLLNCS7POX6U6JXZSQQPJF2M22MQC
C3L2TLQWREYOM3YHL37L7PS74YGLHBEDQRSCVMYIU6HKBEPNN2SAC
IBPVOKM5MXTGB2P7LCD75MISAYUNDPEKQAUEVCXJJWLWCX2TJZBAC
4HTHYIA3GLMUBUMAI7CQMZA4KE47EUXOP24XLUEYFRMOG57CJLMAC
X6YFD4WVMUYJCR5IYPJH6UKYVWSA7DKBRVJ6XQFXHOE2TRYUTAHAC
ZWVYH7WPYOGDKWODFSAJ6R5U64DON2AVVJ2XZJKHAOMLJEFTYF3QC
HXEIH4UQ6EX3MAY33JK4WQUE5GUSZ673OX57JKNFXC2N2QLTXKXAC
PYTC7DPVCWKYDXXBY44BBNB4DHZ3N4OQW3EOEQ7H6Z5P5XBG2EIAC
GNMZNKB46GTPTWBR452FITHPBCMYPSDLV5VZQSY7BX6OJHWTWTZAC
5SLOJYHGPMZVCOE3IS7ICNMJJYX3RBT6CDG5MAV6T4CJIOW7YZ6QC
YN63NUZO4LVJ7XPMURDULTXBVJKW5MVCTZ24R7Z52QMHO3HPDUVQC
EUZFFJSOWV4PXDFFPDAFBHFUUMOFEU6ST7JH57YYRRR2SEOXLN6QC
TPEH2XNBS5RO4IEVKENVF6P65AH7IX64KK2JAYMSJT3J5GXO67EAC
CCLLB7OIFNFYJZTG3UCI7536TOCWSCSXR67VELSB466R24WLJSDAC
I24UEJQLCH2SOXA4UHIYWTRDCHSOPU7AFTRUOTX7HZIAV4AZKYEQC
XQHABMC2FOMH7SZIYVYAR5MNH2DK2AOUCX2RJKZM3PDG2H5JIXYQC
Z6ASIMORLV437IEFIUYXIRYSGUTIBMPJNIVF6XQMGRP6WV6R4YPAC
let t = tokio::spawn(async move {
self_
.download_changes(
cloned_download_bar,
&mut hash_recv,
&mut send,
&changes_dir,
false,
)
.await?;
let mut asked = HashSet::new();
for h in to_apply {
}
self.download_changes_with(download_bar, &changes_dir, false, |cx| async move {
let stream = download_changes_rec(&cx, repo, to_apply);
pin_mut!(stream);
let u = self
.download_changes_rec(
repo,
hash_send,
recv,
send_ready,
download_bar,
waiting,
asked,
)
.await?;
let mut ws = ApplyWorkspace::new();
while let Some(h) = stream.try_next().await? {
debug!("to_apply: {:?}", h);
let mut touches_inodes = inodes.is_empty();
let mut ws = libpijul::ApplyWorkspace::new();
let mut to_apply_inodes = HashSet::new();
while let Some(h) = recv_ready.recv().await {
debug!("to_apply: {:?}", h);
let touches_inodes = inodes.is_empty()
|| {
use libpijul::changestore::ChangeStore;
if let CS::Change(ref h) = h {
let changes = repo.changes.get_changes(h)?;
changes.iter().any(|c| {
c.iter().any(|c| {
let inode = c.inode();
debug!("inode = {:?}", inode);
inodes.contains(&Position {
change: inode.change.unwrap_or(*h),
pos: inode.pos,
if !touches_inodes {
if let CS::Change(ref h) = h {
let changes = repo.changes.get_changes(h)?;
touches_inodes |= changes.iter().any(|c| {
c.iter().any(|c| {
let inode = c.inode();
debug!("inode = {:?}", inode);
inodes.contains(&Position {
change: inode.change.unwrap_or(*h),
pos: inode.pos,
})
})
}
}
if !touches_inodes {
touches_inodes |= inodes.iter().any(|i| CS::Change(i.change) == h);
}
if !touches_inodes {
continue;
}
to_apply_inodes.insert(h);
if let Some(apply_bar) = &apply_bar {
info!("Applying {:?}", h);
apply_bar.inc(1);
debug!("apply");
if let CS::Change(ref h) = h {
let mut channel = channel.write();
txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;
}
debug!("applied");
if touches_inodes {
to_apply_inodes.insert(h);
} else {
continue;
}
if let Some(apply_bar) = apply_bar.clone() {
info!("Applying {:?}", h);
apply_bar.inc(1);
debug!("apply");
if let CS::Change(h) = h {
let mut channel = channel.write();
txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;
}
debug!("applied");
} else {
debug!("not applying {:?}", h)
}
Ok(())
})
.await?;
}
if !needs_dep {
send_ready.send(CS::Change(hash)).await?;
} else {
ready.push(CS::Change(hash))
}
} else {
send_ready.send(CS::Change(hash)).await?;
}
}
if waiting == 0 {
break;
}
}
info!("waiting loop done");
for r in ready {
send_ready.send(r).await?;
}
std::mem::drop(recv_signal);
Ok(())
});
Ok(t)
let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();
let (mut send_signal, recv_signal) = tokio::sync::mpsc::channel(100);
let mut self_ = std::mem::replace(self, RemoteRepo::None);
let changes_dir = repo.changes_dir.clone();
let t = tokio::spawn(async move {
self_
.download_changes(
cloned_download_bar,
&mut recv_hash,
&mut send_signal,
&changes_dir,
false,
)
.await?;
Ok(self_)
});
let mut hashes = Vec::new();
let mut waiting = 0;
let mut asked = HashSet::new();
for &h in tag.iter() {
waiting += 1;
send_hash.send(CS::Change(h))?;
asked.insert(CS::Change(h));
}
{
let txn = &mut *txn;
let hashes = &mut hashes;
let u = self
.download_changes_rec(
repo,
send_hash,
recv_signal,
send_ready,
download_bar,
waiting,
asked,
)
.await?;
let mut ws = ApplyWorkspace::new();
let mut hashes = Vec::new();
let mut ws = libpijul::ApplyWorkspace::new();
{
let mut channel_ = channel.write();
while let Some(hash) = recv_ready.recv().await {
if let CS::Change(ref hash) = hash {
txn.apply_change_rec_ws(&repo.changes, &mut channel_, hash, &mut ws)?;
while let Some(cs) = stream.try_next().await? {
if let CS::Change(hash) = cs {
let mut channel = channel.write();
txn.apply_change_rec_ws(&repo.changes, &mut channel, &hash, &mut ws)?;
}
hashes.push(cs);
use libpijul::changestore::ChangeStore;
let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();
let (mut send_sig, mut recv_sig) = tokio::sync::mpsc::channel(100);
let mut self_ = std::mem::replace(self, RemoteRepo::None);
let changes_dir = repo.changes_dir.clone();
let t: tokio::task::JoinHandle<Result<RemoteRepo, anyhow::Error>> =
tokio::spawn(async move {
self_
.download_changes(
download_bar,
&mut recv_hash,
&mut send_sig,
&changes_dir,
true,
)
.await?;
Ok::<_, anyhow::Error>(self_)
});
for c in changes {
let c = if let CS::Change(c) = c { c } else { continue };
let sc = c.into();
if repo
.changes
.has_contents(*c, txn.get_internal(&sc)?.cloned())
{
debug!("has contents {:?}", c);
continue;
self.download_changes_with(download_bar, &repo.changes_dir, true, |cx| async move {
let mut waiting = Vec::new();
for c in changes {
let CS::Change(c) = c else { continue };
let sc = c.into();
if repo
.changes
.has_contents(*c, txn.get_internal(&sc)?.cloned())
{
debug!("has contents {:?}", c);
continue;
}
if full {
waiting.push(cx.download(CS::Change(*c))?);
continue;
}
let Some(&change) = txn.get_internal(&sc)? else {
debug!("could not find internal for {:?}", sc);
continue;
};
// Check if at least one non-empty vertex from c is still alive.
let v = libpijul::pristine::Vertex {
change,
start: ChangePosition(0u64.into()),
end: ChangePosition(0u64.into()),
};
let channel = local_channel.read();
let graph = txn.graph(&channel);
for x in txn.iter_graph(graph, Some(&v))? {
let (v, e) = x?;
if v.change > change {
break;
} else if e.flag().is_alive_parent() {
waiting.push(cx.download(CS::Change(*c))?);
break;
}
}
let change = if let Some(&i) = txn.get_internal(&sc)? {
i
} else {
debug!("could not find internal for {:?}", sc);
continue;
};
// Check if at least one non-empty vertex from c is still alive.
let v = libpijul::pristine::Vertex {
change,
start: libpijul::pristine::ChangePosition(0u64.into()),
end: libpijul::pristine::ChangePosition(0u64.into()),
};
let channel = local_channel.read();
let graph = txn.graph(&channel);
for x in txn.iter_graph(graph, Some(&v))? {
let (v, e) = x?;
if v.change > change {
break;
} else if e.flag().is_alive_parent() {
send_hash.send(CS::Change(*c))?;
break;
}
}
}
debug!("dropping send_hash");
std::mem::drop(send_hash);
while recv_sig.recv().await.is_some() {}
*self = t.await??;
Ok(())
})
.await?;
fn stream<C, F, T>(op: C) -> impl Stream<Item = T>
where
C: FnOnce(mpsc::Sender<T>) -> F,
F: Future<Output = ()>,
{
struct Impl<F, T> {
fut: Once<F>,
rx: mpsc::Receiver<T>,
}
impl<F, T> Stream for Impl<F, T>
where
F: Future<Output = ()>,
{
type Item = T;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let (fut, mut rx) = unsafe {
let this = self.get_unchecked_mut();
(Pin::new_unchecked(&mut this.fut), Pin::new(&mut this.rx))
};
match (fut.poll_next(cx), rx.poll_recv(cx)) {
(_, Poll::Ready(Some(v))) => Poll::Ready(Some(v)),
(Poll::Ready(_), Poll::Ready(None)) => Poll::Ready(None),
_ => Poll::Pending,
}
}
}
let (tx, rx) = mpsc::channel(1);
Impl {
fut: stream::once(op(tx)),
rx,
}
}
async fn download_changes_rec(
&mut self,
repo: &mut Repository,
send_hash: tokio::sync::mpsc::UnboundedSender<CS>,
mut recv_signal: tokio::sync::mpsc::Receiver<(CS, bool)>,
send_ready: tokio::sync::mpsc::Sender<CS>,
progress_bar: ProgressBar,
mut waiting: usize,
mut asked: HashSet<CS>,
) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
let changes_dir = repo.changes_dir.clone();
let changes = repo.changes.clone();
let t = tokio::spawn(async move {
let mut buf = PathBuf::new();
fn download_changes_rec<'a, I>(
cx: &'a DownloadContext,
repo: &'a Repository,
items: I,
) -> impl Stream<Item = anyhow::Result<CS>> + 'a
where
I: IntoIterator + 'a,
I::Item: Borrow<CS>,
{
try_stream(move |sender| async move {
let mut tasks = FuturesUnordered::new();
if waiting == 0 {
return Ok(());
// there is probably a way to model this using futures, but I
// couldn't find a nice way to do it
// so here, have three funny collections
let mut pending_deps = HashMap::<Hash, HashSet<Hash>>::new();
let mut rev_deps = HashMap::<Hash, Vec<Hash>>::new();
let mut fetched = HashSet::new();
let make_download_job = |cs| async move {
match cx.download(cs) {
Ok(v) => (cs, v.await),
Err(e) => (cs, Err(e)),
let mut ready = Vec::new();
while let Some((hash, follow)) = recv_signal.recv().await {
debug!("received {:?} {:?}", hash, follow);
if let CS::Change(hash) = hash {
waiting -= 1;
if follow {
use libpijul::changestore::ChangeStore;
let mut needs_dep = false;
for dep in changes.get_dependencies(&hash)? {
let dep: libpijul::pristine::Hash = dep;
};
let dep_path = fmt_filename(&mut buf, &changes_dir, &dep);
let has_dep = std::fs::metadata(&dep_path).is_ok();
for item in items {
let item = *item.borrow();
tasks.push(make_download_job(item));
if !has_dep {
needs_dep = true;
if asked.insert(CS::Change(dep)) {
progress_bar.inc(1);
send_hash.send(CS::Change(dep))?;
waiting += 1
}
}
}
debug!("to_apply {:?}", h);
if let CS::Change(hash) = item {
rev_deps.insert(hash, Vec::new());
}
}
asked.insert(*h);
hash_send.send(*h)?;
waiting += 1;
while let Some((cs, res)) = tasks.next().await {
debug!("{:?} finished downloading (result: {:?})", cs, &res);
let follow = res?;
let CS::Change(hash) = cs else {
debug!("it is not a change, we're done");
sender.send(Ok(cs)).await?;
continue;
};
fetched.insert(hash);
// first, populate our dependencies
if follow {
info!("{:?}", hash);
let pending_deps = match pending_deps.entry(hash) {
Entry::Occupied(_) => unreachable!(),
Entry::Vacant(v) => v.insert(Default::default()),
};
for dep in repo.changes.get_dependencies(&hash)? {
if !fetched.contains(&dep) {
pending_deps.insert(dep);
}
rev_deps.entry(dep).or_insert_with(|| {
tasks.push(make_download_job(CS::Change(dep)));
Default::default()
});
}
}
// then, send completed changes (including parents of this) to
// our caller
let mut to_check = vec![hash];
while let Some(hash) = to_check.pop() {
if let Some(pending_deps) = pending_deps.get(&hash) {
if !pending_deps.is_empty() {
continue;
}
}
// this change has no pending dependencies => it is complete
sender.send(Ok(CS::Change(hash))).await?;
// other changes may be complete if this was the last
// missing dependency, propagate
for dep in rev_deps.get(&hash).into_iter().flatten() {
if let Some(p) = pending_deps.get_mut(dep) {
assert!(p.remove(dep));
to_check.push(*dep);
}
}
}
}
Ok(())
})
}