3WO4H2MMMQTYKCPBWYE67IRAEX7DFA2XAOMIKICA6BYDN23K6DQQC
WTZXEWY7IAXJAFNV7STCNQY2SNRDPHX3MKOEZ77NEJUN4MS2VYSQC
TKEVOH7HXON7SOBGXTUDHAHO2U2GPTQRNESP6ERKUQAS526OZIRAC
UDHP4ZVBQZT2VBURB2MDCU2IZDNMCAFSIUKWRBDQ5BWMFKSN2LYQC
SXEYMYF7P4RZMZ46WPL4IZUTSQ2ATBWYZX7QNVMS3SGOYXYOHAGQC
Q45QHPO4HDTEZF2W4UDZSYYQ46BPEIWSW4GJILZR5HTJNLKXJABQC
K6GWUOD55G377RVEEMMRPZ4EUAHCM2BGXNRJTE5UZJFFMJGFCEZQC
KTTKF3RWYAK2YSH2DYYW5QVG4KSNGWUBJBFHKE24OJ7LFCBF5FEAC
WLUID7NANDWTN5GOECNEKFTLZF3MUVS7K26YWLYLSGJ56G63NV4QC
X6YFD4WVMUYJCR5IYPJH6UKYVWSA7DKBRVJ6XQFXHOE2TRYUTAHAC
HXEIH4UQ6EX3MAY33JK4WQUE5GUSZ673OX57JKNFXC2N2QLTXKXAC
// If we're still waiting for
// another change.
//
// Re-target `path` at the change identified by `hashes[*current]`:
// `pop_filename` is applied to `path` first, then `push_filename` is applied
// with that hash. (Presumably this drops the previous change's file name and
// appends the next one's -- confirm against libpijul's filesystem change
// store.) The two debug lines log `path` before and after that pair of calls.
debug!("before pop: {:?}", path);
libpijul::changestore::filesystem::pop_filename(path);
libpijul::changestore::filesystem::push_filename(
path,
&hashes[*current],
);
debug!("after pop: {:?}", path);
// Make sure the directory containing `path` exists. `unwrap` panics if `path`
// has no parent component; I/O errors are propagated to the caller via `?`.
std::fs::create_dir_all(&path.parent().unwrap())?;
// Replace the path's extension with `tmp`; the log line below reports this
// path as the file being created (the creation itself is not in this excerpt).
path.set_extension("tmp");
debug!("creating file {:?}", path);
// If we're still waiting for another
// change.
//
// Start from a copy of `changes_dir`, make sure that copy's parent directory
// exists (`unwrap` panics if there is no parent; I/O errors propagate via
// `?`), and replace the copy's extension with `tmp`.
let mut path = changes_dir.clone();
std::fs::create_dir_all(&path.parent().unwrap())?;
path.set_extension("tmp");
// NOTE(review): as these lines stand, the `let` below shadows the `path`
// built above, so the extension change just made is never read afterwards --
// confirm which of the two ways of building `path` is intended.
//
// Here `path` is `changes_dir` with a `tmp` component appended, and
// `changes_dir` itself (including any missing ancestors) is created.
let path = changes_dir.join("tmp");
std::fs::create_dir_all(&changes_dir)?;
// Drain the `c` receiver: every hash it yields is recorded in the shared
// state (when that state is `State::Changes`), requested from the remote over
// `self.c`, and counted in `len`. The loop ends when `recv` yields `None`.
while let Some(hash) = c.recv().await {
    // Remember the hash so the requested changes are tracked in `hashes`.
    if let State::Changes { ref mut hashes, .. } = *self.state.lock().await {
        hashes.push(hash);
    }
    debug!("download_change {:?} {:?}", hash, full);
    // `full` selects between the `change` and `partial` request lines.
    let request = if full {
        format!("change {}\n", hash.to_base32())
    } else {
        format!("partial {}\n", hash.to_base32())
    };
    // A failed send aborts via `?` before `len` is incremented.
    self.c.data(request.as_bytes()).await?;
    len += 1;
}
while let Some(_hash) = recv.recv().await {
debug!("received hash {:?}", _hash);
if let Some(ref mut progress) = *PROGRESS.lock().await {
progress.inc(1);
}
if let Some(ref mut sender) = sender {
if sender.send(_hash).await.is_err() {
let mut dropped = false;
loop {
tokio::select! {
x = recv.recv() => {
let hash = if let Some(hash) = x {
debug!("received hash {:?}", hash);
hash
} else {
debug!("finished");
break
};
break;
if let Some(ref mut sender) = sender {
if sender.send(hash).await.is_err() {
if let Some(ref mut progress) = *PROGRESS.lock().await {
progress.abandon();
}
break;
}
}
}
x = c.recv(), if !dropped => {
let c = if let Some(c) = x {
c
} else {
debug!("other end dropped");
dropped = true;
if len == 0 {
break
} else {
continue
}
};
if let State::Changes { ref mut hashes, .. } = *self.state.lock().await {
hashes.push(c);
}
debug!("download_change {:?} {:?}", c, full);
if full {
self.c
.data(format!("change {}\n", c.to_base32()).as_bytes())
.await?;
} else {
self.c
.data(format!("partial {}\n", c.to_base32()).as_bytes())
.await?;
}
if let Some(ref mut p) = *PROGRESS.lock().await {
p.inc_length(1)
}
len += 1;