For some reason it didn't record this as part of the other change
M32SWK6ZR3ZXXQCCGFCE4DZQIDKUGHESHECI6R2Q5WQLRWV52NTAC
JGN2BVS5RYX6EFOG7WJGIPTSWG3KE757J5BUGCC64QWCUSXZKZPQC
MU5GSJAW65PEG3BRYUKZ7O37BPHW3MOX3S5E2RFOXKGUOJEEDQ5AC
SXEYMYF7P4RZMZ46WPL4IZUTSQ2ATBWYZX7QNVMS3SGOYXYOHAGQC
C3L2TLQWREYOM3YHL37L7PS74YGLHBEDQRSCVMYIU6HKBEPNN2SAC
DO2Y5TY5JQISUHCVNPI2FXO7WWZVJQ3LGPWF4DNADMGZRIO6PT2QC
PYTC7DPVCWKYDXXBY44BBNB4DHZ3N4OQW3EOEQ7H6Z5P5XBG2EIAC
Q45QHPO4HDTEZF2W4UDZSYYQ46BPEIWSW4GJILZR5HTJNLKXJABQC
A3RM526Y7LUXNYW4TL56YKQ5GVOK2R5D7JJVTSQ6TT5MEXIR6YAAC
IBPVOKM5MXTGB2P7LCD75MISAYUNDPEKQAUEVCXJJWLWCX2TJZBAC
UDGL7ER2R6SY2CBSPA4O4ULQ5PCUDWQBGYOWTN3MQGSS5L2EI2LQC
2GQCLJZGIXMTKDVMYIIQJDOR5EXGBZS5FKH2S4DTN25WKKBUMQQQC
YN63NUZO4LVJ7XPMURDULTXBVJKW5MVCTZ24R7Z52QMHO3HPDUVQC
RM225IDQR36MNBMN7OT2R562M4FUD6L34RY7A3WDBBETZIIDKZ5AC
X6YFD4WVMUYJCR5IYPJH6UKYVWSA7DKBRVJ6XQFXHOE2TRYUTAHAC
2D7P2VKJASU7QDQZHGCLBIT6G2V5WUFYLWTCEVVEI2EZHGM6XYRAC
L2VH4BYK3IULLGBHXMZJWKRKDQY43QEMQRTXFJCNRDE7PODLXWTAC
EUZFFJSOWV4PXDFFPDAFBHFUUMOFEU6ST7JH57YYRRR2SEOXLN6QC
76PCXGML77EZWTRI5E6KHLVRAFTJ2AB5YRN5EKOYNAPKTWY2KCGAC
ZDK3GNDBWXJ2OXFDYB72ZCEBGLBF4MKE5K3PVHDZATHJ7HJIDPRQC
JUYSZJSHULJFR4HUJF72TEKKFMBPG4ZOGAGOJ2BX6P3D4DRZAU5QC
G7HJHNFDZCGOPGVETNYK7BDDPJXHEIPGZJEJXBGBXSWPWEX3BIQQC
LKIKT4FRKVZAYITMKO23RRFZG25UFX7J6GBOOUQOD24YBRNLHLOAC
TKEVOH7HXON7SOBGXTUDHAHO2U2GPTQRNESP6ERKUQAS526OZIRAC
ABQDWHNGSBF2REQDCGXSBFAU4RUMXYAF2KHJ5O3D32M7Z3A3FEDAC
I52XSRUH5RVHQBFWVMAQPTUSPAJ4KNVID2RMI3UGCVKFLYUO6WZAC
CCLLB7OIFNFYJZTG3UCI7536TOCWSCSXR67VELSB466R24WLJSDAC
I24UEJQLCH2SOXA4UHIYWTRDCHSOPU7AFTRUOTX7HZIAV4AZKYEQC
C5XGFNKIX3RM6KOKRYTECBDDRDAE33JWIVJAJJTEFKFXQNXHEKHQC
27RZYCM3XP72CW7FYGE373YAFD6EEZOZ4YAHEKV6JE5L6Z5N3JNAC
5SLOJYHGPMZVCOE3IS7ICNMJJYX3RBT6CDG5MAV6T4CJIOW7YZ6QC
4XLHUME7YLJV6XUZBOW7PX62TCJXIWW2CPITXO5GZOULWGXRVDZAC
XQHABMC2FOMH7SZIYVYAR5MNH2DK2AOUCX2RJKZM3PDG2H5JIXYQC
QWIYNMI5SOTLRPYE4O3AG7R75JXM2TB3ZADU646PG6ACPBGSYUYAC
3E2KY6Y4SQ2UO6PLA4K3MQKHIRHSQCBAJIPJT7NQ6BIETMAQX5QAC
BNPSVXIC72C3WT33YKCH766OBLLNCS7POX6U6JXZSQQPJF2M22MQC
HXEIH4UQ6EX3MAY33JK4WQUE5GUSZ673OX57JKNFXC2N2QLTXKXAC
3WO4H2MMMQTYKCPBWYE67IRAEX7DFA2XAOMIKICA6BYDN23K6DQQC
Ok(())
})
.await?;
}
}
debug!("not applying {:?}", h)
} else {
}
}
if !touches_inodes {
touches_inodes |= inodes.iter().any(|i| CS::Change(i.change) == h);
}
if !touches_inodes {
continue;
}
to_apply_inodes.insert(h);
if let Some(apply_bar) = &apply_bar {
info!("Applying {:?}", h);
apply_bar.inc(1);
debug!("apply");
if let CS::Change(ref h) = h {
let mut channel = channel.write();
txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;
}
debug!("applied");
})
})
if !touches_inodes {
if let CS::Change(ref h) = h {
let changes = repo.changes.get_changes(h)?;
touches_inodes |= changes.iter().any(|c| {
c.iter().any(|c| {
let inode = c.inode();
debug!("inode = {:?}", inode);
inodes.contains(&Position {
change: inode.change.unwrap_or(*h),
pos: inode.pos,
})
debug!("inodes = {:?}", inodes);
}
if let CS::Change(hash) = item {
rev_deps.insert(hash, Vec::new());
}
}
tag: caps.name("tag").is_some(),
}
/// Build a fallible stream from a producer.
///
/// `op` receives the sending half of the channel created by `stream` and
/// returns a future that pushes `Result` items through it. If that future
/// itself finishes with `Err(e)`, the error is forwarded to the consumer as
/// one more `Err(e)` item.
fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>
where
    C: FnOnce(mpsc::Sender<Result<T, E>>) -> F,
    F: Future<Output = Result<(), E>>,
{
    stream(|sender| {
        let fut = op(sender.clone());
        async move {
            if let Err(e) = fut.await {
                // `send` is async on this channel: the returned future has to
                // be awaited, otherwise the error is dropped unsent. A failed
                // send only means the receiver is gone, so it is ignored.
                let _ = sender.send(Err(e)).await;
            }
        }
    })
}
/// Compare the remote set (theirs_ge_dichotomy) with our current
/// version of that (ours_ge_dichotomy) and return the changes in our
/// current version that are not in the remote anymore.
fn remote_unrecs<T: TxnTExt + ChannelTxnT>(
txn: &T,
current_channel: &ChannelRef<T>,
ours_ge_dichotomy: &[(u64, CS)],
theirs_ge_dichotomy_set: &HashSet<CS>,
) -> Result<Vec<(u64, CS)>, anyhow::Error> {
let mut remote_unrecs = Vec::new();
for (n, hash) in ours_ge_dichotomy {
debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);
if theirs_ge_dichotomy_set.contains(hash) {
// If this change is still present in the remote, skip
debug!("still present");
continue;
} else {
let has_it = match hash {
CS::Change(hash) => txn.get_revchanges(¤t_channel, &hash)?.is_some(),
CS::State(state) => {
let ch = current_channel.read();
if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {
txn.is_tagged(txn.tags(&*ch), n.into())?
} else {
false
}
}
};
if has_it {
remote_unrecs.push((*n, *hash))
} else {
// If this unrecord wasn't in our current channel, skip
continue;
}
}
}
Ok(remote_unrecs)
/// Turn a producer future into a stream of the items it sends.
///
/// `op` is handed the sending half of a capacity-1 channel and returns the
/// future that produces items. The returned stream drives that future and
/// yields whatever arrives on the receiving half; it ends once the future has
/// completed and the receiver reports `None`.
fn stream<C, F, T>(op: C) -> impl Stream<Item = T>
where
    C: FnOnce(mpsc::Sender<T>) -> F,
    F: Future<Output = ()>,
{
    /// Pairs the producer future with the receiving end of its channel.
    struct Driver<F, T> {
        fut: Once<F>,
        rx: mpsc::Receiver<T>,
    }

    impl<F, T> Stream for Driver<F, T>
    where
        F: Future<Output = ()>,
    {
        type Item = T;

        fn poll_next(
            self: Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            // SAFETY: `fut` is only ever reached through this pinned
            // projection and is never moved out of `self`.
            let this = unsafe { self.get_unchecked_mut() };
            let producer = unsafe { Pin::new_unchecked(&mut this.fut) };
            // The producer is polled before the receiver on every call.
            let producer_state = producer.poll_next(cx);
            match this.rx.poll_recv(cx) {
                Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
                Poll::Ready(None) if producer_state.is_ready() => Poll::Ready(None),
                _ => Poll::Pending,
            }
        }
    }

    let (tx, rx) = mpsc::channel(1);
    let fut = stream::once(op(tx));
    Driver { fut, rx }
}
}
debug!("offending line: {:?}", data);
bail!("Protocol error")
}
}));
pos: ChangePosition(
caps.name("num")
.unwrap()
.as_str()
.parse::<u64>()
.unwrap()
.into(),
),
if let Some(caps) = PATHS_LINE.captures(data) {
return Ok(ListLine::Position(Position {
change: Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()).unwrap(),
if data.starts_with("error:") {
return Ok(ListLine::Error(data.split_at(6).1.to_string()));
}
});
}
}
h,
m,
n: caps.name("num").unwrap().as_str().parse().unwrap(),
if let Some(caps) = CHANGELIST_LINE.captures(data) {
if let (Some(h), Some(m)) = (
Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),
Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),
) {
return Ok(ListLine::Change {
debug!("data = {:?}", data);
}
fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {
Error(String),
Position(Position<Hash>),
},
tag: bool,
Change {
n: u64,
h: Hash,
m: Merkle,
static ref PATHS_LINE: Regex =
Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap();
}
enum ListLine {
static ref CHANGELIST_LINE: Regex = Regex::new(
r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#
)
.unwrap();
lazy_static! {
}
}
Ok(())
self.complete_changes(repo, txn, local_channel, &pullable, false)
.await?;
self.pull(repo, txn, local_channel, &pullable, &inodes, true)
.await?;
self.update_identities(repo, &remote_changes).await?;
}
}
pullable.push(CS::Change(p.a.into()))
for x in txn.iter_remote(&rem.remote, 0)? {
let (_, p) = x?;
let rem = remote_changes.lock();
{
let mut pullable = Vec::new();
let (inodes, remote_changes) = if let Some(x) = self.update_changelist(txn, path).await? {
x
} else {
bail!("Channel not found")
};
path: &[String],
) -> Result<(), anyhow::Error> {
&mut self,
repo: &mut Repository,
txn: &mut T,
local_channel: &mut ChannelRef<T>,
pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
Ok(())
}
Ok(())
})
.await?;
}
for w in waiting {
w.await?;
}
self.download_changes_with(download_bar, &repo.changes_dir, true, |cx| async move {
let mut waiting = Vec::new();
for c in changes {
let CS::Change(c) = c else { continue };
let sc = c.into();
if repo
.changes
.has_contents(*c, txn.get_internal(&sc)?.cloned())
{
debug!("has contents {:?}", c);
continue;
}
if full {
waiting.push(cx.download(CS::Change(*c))?);
continue;
}
let Some(&change) = txn.get_internal(&sc)? else {
debug!("could not find internal for {:?}", sc);
continue;
};
// Check if at least one non-empty vertex from c is still alive.
let v = libpijul::pristine::Vertex {
change,
start: ChangePosition(0u64.into()),
end: ChangePosition(0u64.into()),
};
let channel = local_channel.read();
let graph = txn.graph(&channel);
for x in txn.iter_graph(graph, Some(&v))? {
let (v, e) = x?;
if v.change > change {
break;
} else if e.flag().is_alive_parent() {
waiting.push(cx.download(CS::Change(*c))?);
break;
}
}
let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;
let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;
debug!("complete changes {:?}", changes);
full: bool,
) -> Result<(), anyhow::Error> {
changes: &[CS],
local_channel: &ChannelRef<T>,
txn: &T,
repo: &pijul_repository::Repository,
&mut self,
pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(
Ok(())
}
.await?;
self.complete_changes(repo, txn, channel, &to_pull, false)
self.update_identities(repo, &remote).await?;
.await?;
self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true)
}
bail!("State not found: {:?}", state)
found = true;
break;
}
}
if !found {
if p.b == state {
to_pull.push(CS::Change(p.a.into()));
let (n, p) = x?;
debug!("{:?} {:?}", n, p);
for x in txn.iter_remote(&remote.lock().remote, 0)? {
let mut to_pull = Vec::new();
let mut found = false;
let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
self.update_changelist(txn, &[]).await?;
let id = if let Some(id) = self.get_id(txn).await? {
id
} else {
return Ok(());
};
) -> Result<(), anyhow::Error> {
&mut self,
repo: &mut Repository,
txn: &mut T,
channel: &mut ChannelRef<T>,
state: Merkle,
pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
Ok(())
}
self.complete_changes(repo, txn, channel, &hashes, false)
.await?;
}
Ok(())
})
.await?;
}
while let Some(cs) = stream.try_next().await? {
if let CS::Change(hash) = cs {
let mut channel = channel.write();
txn.apply_change_rec_ws(&repo.changes, &mut channel, &hash, &mut ws)?;
}
hashes.push(cs);
let mut ws = ApplyWorkspace::new();
self.download_changes_with(download_bar, &repo.changes_dir, false, |cx| async move {
let stream = download_changes_rec(&cx, repo, tag.iter().cloned().map(CS::Change));
pin_mut!(stream);
{
let txn = &mut *txn;
let hashes = &mut hashes;
let mut hashes = Vec::new();
let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;
tag: &[Hash],
) -> Result<(), anyhow::Error> {
channel: &ChannelRef<T>,
txn: &mut T,
repo: &Repository,
&mut self,
pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
for item in items {
let item = *item.borrow();
tasks.push(make_download_job(item));
};
}
// there is probably a way to model this using futures, but I
// couldn't find a nice way to do it
// so here, have three funny collections
let mut pending_deps = HashMap::<Hash, HashSet<Hash>>::new();
let mut rev_deps = HashMap::<Hash, Vec<Hash>>::new();
let mut fetched = HashSet::new();
let make_download_job = |cs| async move {
match cx.download(cs) {
Ok(v) => (cs, v.await),
Err(e) => (cs, Err(e)),
fn download_changes_rec<'a, I>(
cx: &'a DownloadContext,
repo: &'a Repository,
items: I,
) -> impl Stream<Item = anyhow::Result<CS>> + 'a
where
I: IntoIterator + 'a,
I::Item: Borrow<CS>,
{
try_stream(move |sender| async move {
let mut tasks = FuturesUnordered::new();
Ok(result)
debug!("finished");
}
}
let mut result = Vec::with_capacity(to_apply_inodes.len());
for h in to_apply {
if to_apply_inodes.contains(&h) {
result.push(*h)
}
let mut ws = ApplyWorkspace::new();
while let Some(h) = stream.try_next().await? {
debug!("to_apply: {:?}", h);
let mut touches_inodes = inodes.is_empty();
let mut ws = ApplyWorkspace::new();
while let Some(h) = stream.try_next().await? {
debug!("to_apply: {:?}", h);
let mut touches_inodes = inodes.is_empty();
debug!("inodes = {:?}", inodes);
if !touches_inodes {
if let CS::Change(ref h) = h {
let changes = repo.changes.get_changes(h)?;
touches_inodes |= changes.iter().any(|c| {
c.iter().any(|c| {
let inode = c.inode();
debug!("inode = {:?}", inode);
inodes.contains(&Position {
change: inode.change.unwrap_or(*h),
pos: inode.pos,
})
})
})
}
}
if !touches_inodes {
touches_inodes |= inodes.iter().any(|i| CS::Change(i.change) == h);
}
if !touches_inodes {
continue;
}
to_apply_inodes.insert(h);
if let Some(apply_bar) = &apply_bar {
info!("Applying {:?}", h);
apply_bar.inc(1);
debug!("apply");
if let CS::Change(ref h) = h {
let mut channel = channel.write();
txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;
}
debug!("applied");
} else {
debug!("not applying {:?}", h)
}
}
Ok(())
})
.await?;
}
let mut result = Vec::with_capacity(to_apply_inodes.len());
for h in to_apply {
if to_apply_inodes.contains(&h) {
result.push(*h)
}
}
debug!("finished");
Ok(result)
}
/// Fetch the changes listed in `tag` and apply them to `channel`.
///
/// Items are taken from `download_changes_rec` as they become available, and
/// every `CS::Change` among them is applied to `channel` right away. Once the
/// download is over, `complete_changes` is run (with `full = false`) on all
/// the items that were received.
pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
    &mut self,
    repo: &Repository,
    txn: &mut T,
    channel: &ChannelRef<T>,
    tag: &[Hash],
) -> Result<(), anyhow::Error> {
    let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;
    let mut received = Vec::new();
    {
        // Reborrow so that the `async move` block takes these references
        // instead of the bindings that are still needed afterwards.
        let txn = &mut *txn;
        let received = &mut received;
        self.download_changes_with(download_bar, &repo.changes_dir, false, |cx| async move {
            let incoming = download_changes_rec(&cx, repo, tag.iter().cloned().map(CS::Change));
            pin_mut!(incoming);
            let mut ws = ApplyWorkspace::new();
            while let Some(item) = incoming.try_next().await? {
                if let CS::Change(hash) = item {
                    // The write guard is released at the end of this block,
                    // before the next item is awaited.
                    let mut channel_guard = channel.write();
                    txn.apply_change_rec_ws(&repo.changes, &mut channel_guard, &hash, &mut ws)?;
                }
                received.push(item);
            }
            Ok(())
        })
        .await?;
    }
    self.complete_changes(repo, txn, channel, &received, false)
        .await
}
/// Pull the remote's changes up to, and including, the entry whose state
/// equals `state`.
///
/// Returns `Ok(())` without doing anything when `get_id` yields no id. After
/// refreshing the changelist, the remote entries are walked in order and
/// collected until `state` is met; if it never is, the call fails with
/// "State not found". The collected changes are then pulled, identities are
/// updated, and `complete_changes` is run on them with `full = false`.
pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
    &mut self,
    repo: &mut Repository,
    txn: &mut T,
    channel: &mut ChannelRef<T>,
    state: Merkle,
) -> Result<(), anyhow::Error> {
    let Some(id) = self.get_id(txn).await? else {
        return Ok(());
    };
    self.update_changelist(txn, &[]).await?;
    let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();

    let mut to_pull = Vec::new();
    let mut found = false;
    {
        // Keep the lock on the remote only for the duration of the scan.
        let remote_guard = remote.lock();
        for entry in txn.iter_remote(&remote_guard.remote, 0)? {
            let (n, p) = entry?;
            debug!("{:?} {:?}", n, p);
            to_pull.push(CS::Change(p.a.into()));
            if p.b == state {
                found = true;
                break;
            }
        }
    }
    if !found {
        bail!("State not found: {:?}", state)
    }

    self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true)
        .await?;
    self.update_identities(repo, &remote).await?;
    self.complete_changes(repo, txn, channel, &to_pull, false)
        .await
}
/// Download the contents of the given changes where they are missing.
///
/// Entries of `changes` that are not `CS::Change` are ignored, and so are
/// changes for which `repo.changes.has_contents` already answers true. For
/// the remaining ones:
///
/// * if `full` is set, the change is always downloaded;
/// * otherwise it is downloaded only if it has an internal id in `txn` and
///   the graph of `local_channel` still holds an edge flagged as an alive
///   parent for one of its vertices.
///
/// All scheduled downloads are awaited before returning.
pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(
    &mut self,
    repo: &pijul_repository::Repository,
    txn: &T,
    local_channel: &ChannelRef<T>,
    changes: &[CS],
    full: bool,
) -> Result<(), anyhow::Error> {
    debug!("complete changes {:?}", changes);
    let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;
    // Bound to a name so that the spinner lives until the end of the function.
    let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;
    self.download_changes_with(download_bar, &repo.changes_dir, true, |cx| async move {
        let mut pending = Vec::new();
        for cs in changes {
            let hash = match cs {
                CS::Change(hash) => hash,
                _ => continue,
            };
            let serialized = hash.into();
            let already_there = repo
                .changes
                .has_contents(*hash, txn.get_internal(&serialized)?.cloned());
            if already_there {
                debug!("has contents {:?}", hash);
                continue;
            }
            if full {
                pending.push(cx.download(CS::Change(*hash))?);
                continue;
            }
            let Some(&change) = txn.get_internal(&serialized)? else {
                debug!("could not find internal for {:?}", serialized);
                continue;
            };
            // Check if at least one non-empty vertex from this change is
            // still alive, scanning from the change's first position.
            let first = libpijul::pristine::Vertex {
                change,
                start: ChangePosition(0u64.into()),
                end: ChangePosition(0u64.into()),
            };
            let channel = local_channel.read();
            let graph = txn.graph(&channel);
            for item in txn.iter_graph(graph, Some(&first))? {
                let (vertex, edge) = item?;
                if vertex.change > change {
                    // Past the last vertex of this change: nothing alive.
                    break;
                }
                if edge.flag().is_alive_parent() {
                    pending.push(cx.download(CS::Change(*hash))?);
                    break;
                }
            }
        }
        for download in pending {
            download.await?;
        }
        Ok(())
    })
    .await?;
    Ok(())
}
/// Clone a remote channel into `local_channel`.
///
/// The changelist is refreshed for `path`; if that yields nothing the call
/// fails with "Channel not found". Every change recorded for the remote is
/// then pulled (restricted by the inodes returned with the changelist),
/// identities are updated, and `complete_changes` is run on the same list
/// with `full = false`.
pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
    &mut self,
    repo: &mut Repository,
    txn: &mut T,
    local_channel: &mut ChannelRef<T>,
    path: &[String],
) -> Result<(), anyhow::Error> {
    let Some((inodes, remote_changes)) = self.update_changelist(txn, path).await? else {
        bail!("Channel not found")
    };

    let mut pullable = Vec::new();
    {
        // The lock on the remote is released as soon as the list is built.
        let remote_guard = remote_changes.lock();
        for entry in txn.iter_remote(&remote_guard.remote, 0)? {
            let (_, pair) = entry?;
            pullable.push(CS::Change(pair.a.into()))
        }
    }

    self.pull(repo, txn, local_channel, &pullable, &inodes, true)
        .await?;
    self.update_identities(repo, &remote_changes).await?;
    self.complete_changes(repo, txn, local_channel, &pullable, false)
        .await
}
}
// Patterns used by `parse_line` to recognize the lines of a remote listing.
// Neither pattern is anchored, so each one is searched for anywhere in the line.
lazy_static! {
// A changelist entry: `<num>.<hash>.<merkle>`, where `num` is decimal and
// `hash`/`merkle` are alphanumeric. An optional trailing `.` is captured as
// the `tag` group.
static ref CHANGELIST_LINE: Regex = Regex::new(
r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#
)
.unwrap();
// A position entry: `<hash>.<num>`, an alphanumeric hash followed by a
// decimal number.
static ref PATHS_LINE: Regex =
Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap();
}
/// One parsed line of a remote listing, as produced by `parse_line`.
enum ListLine {
/// A line matching `CHANGELIST_LINE` whose hash and merkle both decode
/// from base32.
Change {
/// Value of the `num` capture group.
n: u64,
/// Decoded `hash` capture group.
h: Hash,
/// Decoded `merkle` capture group.
m: Merkle,
/// True when the optional trailing `.` (`tag` group) was present.
tag: bool,
},
/// A line matching `PATHS_LINE`: a change hash and a position in it.
Position(Position<Hash>),
/// A line starting with `error:`; holds the text after that prefix.
Error(String),
}
fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {
debug!("data = {:?}", data);
if let Some(caps) = CHANGELIST_LINE.captures(data) {
if let (Some(h), Some(m)) = (
Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),
Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),
) {
return Ok(ListLine::Change {
n: caps.name("num").unwrap().as_str().parse().unwrap(),
h,
m,
tag: caps.name("tag").is_some(),
});
}
}
if data.starts_with("error:") {
return Ok(ListLine::Error(data.split_at(6).1.to_string()));
}
if let Some(caps) = PATHS_LINE.captures(data) {
return Ok(ListLine::Position(Position {
change: Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()).unwrap(),
pos: ChangePosition(
caps.name("num")
.unwrap()
.as_str()
.parse::<u64>()
.unwrap()
.into(),
),
}));
}
debug!("offending line: {:?}", data);
bail!("Protocol error")
}
/// Compare the remote set (theirs_ge_dichotomy) with our current
/// version of that (ours_ge_dichotomy) and return the changes in our
/// current version that are not in the remote anymore.
///
/// An entry missing from the remote set is only reported if the current
/// channel actually has it: for a change, `get_revchanges` must find it; for
/// a state, the channel must have that state and it must be tagged.
fn remote_unrecs<T: TxnTExt + ChannelTxnT>(
    txn: &T,
    current_channel: &ChannelRef<T>,
    ours_ge_dichotomy: &[(u64, CS)],
    theirs_ge_dichotomy_set: &HashSet<CS>,
) -> Result<Vec<(u64, CS)>, anyhow::Error> {
    let mut remote_unrecs = Vec::new();
    for (n, hash) in ours_ge_dichotomy {
        debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);
        if theirs_ge_dichotomy_set.contains(hash) {
            // If this change is still present in the remote, skip
            debug!("still present");
            continue;
        }
        let has_it = match hash {
            CS::Change(hash) => txn.get_revchanges(&current_channel, &hash)?.is_some(),
            CS::State(state) => {
                let ch = current_channel.read();
                if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {
                    txn.is_tagged(txn.tags(&*ch), n.into())?
                } else {
                    false
                }
            }
        };
        // If this unrecord wasn't in our current channel, skip
        if has_it {
            remote_unrecs.push((*n, *hash))
        }
    }
    Ok(remote_unrecs)
}
/// Build a fallible stream from a producer.
///
/// `op` receives the sending half of the channel created by `stream` and
/// returns a future that pushes `Result` items through it. If that future
/// itself finishes with `Err(e)`, the error is forwarded to the consumer as
/// one more `Err(e)` item.
fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>
where
    C: FnOnce(mpsc::Sender<Result<T, E>>) -> F,
    F: Future<Output = Result<(), E>>,
{
    stream(|sender| {
        let fut = op(sender.clone());
        async move {
            if let Err(e) = fut.await {
                // `send` is async on this channel: the returned future has to
                // be awaited, otherwise the error is dropped unsent. A failed
                // send only means the receiver is gone, so it is ignored.
                let _ = sender.send(Err(e)).await;
            }
        }
    })
}
/// Turn a producer future into a stream of the items it sends.
///
/// `op` is handed the sending half of a capacity-1 channel and returns the
/// future that produces items. The returned stream drives that future and
/// yields whatever arrives on the receiving half; it ends once the future has
/// completed and the receiver reports `None`.
fn stream<C, F, T>(op: C) -> impl Stream<Item = T>
where
    C: FnOnce(mpsc::Sender<T>) -> F,
    F: Future<Output = ()>,
{
    /// Pairs the producer future with the receiving end of its channel.
    struct Driver<F, T> {
        fut: Once<F>,
        rx: mpsc::Receiver<T>,
    }

    impl<F, T> Stream for Driver<F, T>
    where
        F: Future<Output = ()>,
    {
        type Item = T;

        fn poll_next(
            self: Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            // SAFETY: `fut` is only ever reached through this pinned
            // projection and is never moved out of `self`.
            let this = unsafe { self.get_unchecked_mut() };
            let producer = unsafe { Pin::new_unchecked(&mut this.fut) };
            // The producer is polled before the receiver on every call.
            let producer_state = producer.poll_next(cx);
            match this.rx.poll_recv(cx) {
                Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
                Poll::Ready(None) if producer_state.is_ready() => Poll::Ready(None),
                _ => Poll::Pending,
            }
        }
    }

    let (tx, rx) = mpsc::channel(1);
    let fut = stream::once(op(tx));
    Driver { fut, rx }
}
fn download_changes_rec<'a, I>(
cx: &'a DownloadContext,
repo: &'a Repository,
items: I,
) -> impl Stream<Item = anyhow::Result<CS>> + 'a
where
I: IntoIterator + 'a,
I::Item: Borrow<CS>,
{
try_stream(move |sender| async move {
let mut tasks = FuturesUnordered::new();
// there is probably a way to model this using futures, but I
// couldn't find a nice way to do it
// so here, have three funny collections
let mut pending_deps = HashMap::<Hash, HashSet<Hash>>::new();
let mut rev_deps = HashMap::<Hash, Vec<Hash>>::new();
let mut fetched = HashSet::new();
let make_download_job = |cs| async move {
match cx.download(cs) {
Ok(v) => (cs, v.await),
Err(e) => (cs, Err(e)),
}
};
for item in items {
let item = *item.borrow();
tasks.push(make_download_job(item));
if let CS::Change(hash) = item {
rev_deps.insert(hash, Vec::new());
}
}