− Ok(())
− })
− .await?;
−
− }
− }
− debug!("not applying {:?}", h)
− } else {
− }
− }
−
− if !touches_inodes {
− touches_inodes |= inodes.iter().any(|i| CS::Change(i.change) == h);
− }
−
− if !touches_inodes {
− continue;
− }
−
− to_apply_inodes.insert(h);
−
− if let Some(apply_bar) = &apply_bar {
− info!("Applying {:?}", h);
− apply_bar.inc(1);
− debug!("apply");
−
− if let CS::Change(ref h) = h {
− let mut channel = channel.write();
− txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;
− }
−
− debug!("applied");
− })
− })
−
− if !touches_inodes {
− if let CS::Change(ref h) = h {
− let changes = repo.changes.get_changes(h)?;
−
− touches_inodes |= changes.iter().any(|c| {
− c.iter().any(|c| {
− let inode = c.inode();
− debug!("inode = {:?}", inode);
− inodes.contains(&Position {
− change: inode.change.unwrap_or(*h),
− pos: inode.pos,
− })
− debug!("inodes = {:?}", inodes);
−
− }
−
−
− if let CS::Change(hash) = item {
− rev_deps.insert(hash, Vec::new());
− }
− }
−
− tag: caps.name("tag").is_some(),
− }
−
− fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>
− where
− C: FnOnce(mpsc::Sender<Result<T, E>>) -> F,
− F: Future<Output = Result<(), E>>,
− {
− stream(|sender| {
− let fut = op(sender.clone());
−
− async move {
− if let Err(e) = fut.await {
− let _ = sender.send(Err(e));
− }
− }
− })
− }
−
− /// Compare the remote set (theirs_ge_dichotomy) with our current
− /// version of that (ours_ge_dichotomy) and return the changes in our
− /// current version that are not in the remote anymore.
− fn remote_unrecs<T: TxnTExt + ChannelTxnT>(
− txn: &T,
− current_channel: &ChannelRef<T>,
− ours_ge_dichotomy: &[(u64, CS)],
− theirs_ge_dichotomy_set: &HashSet<CS>,
− ) -> Result<Vec<(u64, CS)>, anyhow::Error> {
− let mut remote_unrecs = Vec::new();
− for (n, hash) in ours_ge_dichotomy {
− debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);
− if theirs_ge_dichotomy_set.contains(hash) {
− // If this change is still present in the remote, skip
− debug!("still present");
− continue;
− } else {
− let has_it = match hash {
− CS::Change(hash) => txn.get_revchanges(¤t_channel, &hash)?.is_some(),
− CS::State(state) => {
− let ch = current_channel.read();
− if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {
− txn.is_tagged(txn.tags(&*ch), n.into())?
− } else {
− false
− }
− }
− };
− if has_it {
− remote_unrecs.push((*n, *hash))
− } else {
− // If this unrecord wasn't in our current channel, skip
− continue;
− }
− }
− }
− Ok(remote_unrecs)
−
− fn stream<C, F, T>(op: C) -> impl Stream<Item = T>
− where
− C: FnOnce(mpsc::Sender<T>) -> F,
− F: Future<Output = ()>,
− {
− struct Impl<F, T> {
− fut: Once<F>,
− rx: mpsc::Receiver<T>,
− }
−
− impl<F, T> Stream for Impl<F, T>
− where
− F: Future<Output = ()>,
− {
− type Item = T;
−
− fn poll_next(
− self: Pin<&mut Self>,
− cx: &mut std::task::Context<'_>,
− ) -> Poll<Option<Self::Item>> {
− let (fut, mut rx) = unsafe {
− let this = self.get_unchecked_mut();
− (Pin::new_unchecked(&mut this.fut), Pin::new(&mut this.rx))
− };
−
− match (fut.poll_next(cx), rx.poll_recv(cx)) {
− (_, Poll::Ready(Some(v))) => Poll::Ready(Some(v)),
− (Poll::Ready(_), Poll::Ready(None)) => Poll::Ready(None),
− _ => Poll::Pending,
− }
− }
− }
−
− let (tx, rx) = mpsc::channel(1);
−
− Impl {
− fut: stream::once(op(tx)),
− rx,
− }
− }
−
− }
− debug!("offending line: {:?}", data);
− bail!("Protocol error")
− }
− }));
− pos: ChangePosition(
− caps.name("num")
− .unwrap()
− .as_str()
− .parse::<u64>()
− .unwrap()
− .into(),
− ),
− if let Some(caps) = PATHS_LINE.captures(data) {
− return Ok(ListLine::Position(Position {
− change: Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()).unwrap(),
− if data.starts_with("error:") {
− return Ok(ListLine::Error(data.split_at(6).1.to_string()));
− }
− });
− }
− }
− h,
− m,
− n: caps.name("num").unwrap().as_str().parse().unwrap(),
− if let Some(caps) = CHANGELIST_LINE.captures(data) {
− if let (Some(h), Some(m)) = (
− Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),
− Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),
− ) {
− return Ok(ListLine::Change {
− debug!("data = {:?}", data);
− }
−
− fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {
− Error(String),
− Position(Position<Hash>),
− },
− tag: bool,
− Change {
− n: u64,
− h: Hash,
− m: Merkle,
− static ref PATHS_LINE: Regex =
− Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap();
− }
−
− enum ListLine {
− static ref CHANGELIST_LINE: Regex = Regex::new(
− r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#
− )
− .unwrap();
− lazy_static! {
−
− }
− }
− Ok(())
− self.complete_changes(repo, txn, local_channel, &pullable, false)
− .await?;
− self.pull(repo, txn, local_channel, &pullable, &inodes, true)
− .await?;
− self.update_identities(repo, &remote_changes).await?;
−
− }
− }
− pullable.push(CS::Change(p.a.into()))
− for x in txn.iter_remote(&rem.remote, 0)? {
− let (_, p) = x?;
− let rem = remote_changes.lock();
− {
− let mut pullable = Vec::new();
− let (inodes, remote_changes) = if let Some(x) = self.update_changelist(txn, path).await? {
− x
− } else {
− bail!("Channel not found")
− };
− path: &[String],
− ) -> Result<(), anyhow::Error> {
− &mut self,
− repo: &mut Repository,
− txn: &mut T,
− local_channel: &mut ChannelRef<T>,
− pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
− Ok(())
− }
−
−
− Ok(())
− })
− .await?;
−
− }
−
− for w in waiting {
− w.await?;
− }
− self.download_changes_with(download_bar, &repo.changes_dir, true, |cx| async move {
− let mut waiting = Vec::new();
−
− for c in changes {
− let CS::Change(c) = c else { continue };
−
− let sc = c.into();
−
− if repo
− .changes
− .has_contents(*c, txn.get_internal(&sc)?.cloned())
− {
− debug!("has contents {:?}", c);
− continue;
− }
−
− if full {
− waiting.push(cx.download(CS::Change(*c))?);
− continue;
− }
−
− let Some(&change) = txn.get_internal(&sc)? else {
− debug!("could not find internal for {:?}", sc);
− continue;
− };
−
− // Check if at least one non-empty vertex from c is still alive.
− let v = libpijul::pristine::Vertex {
− change,
− start: ChangePosition(0u64.into()),
− end: ChangePosition(0u64.into()),
− };
−
− let channel = local_channel.read();
− let graph = txn.graph(&channel);
−
− for x in txn.iter_graph(graph, Some(&v))? {
− let (v, e) = x?;
− if v.change > change {
− break;
− } else if e.flag().is_alive_parent() {
− waiting.push(cx.download(CS::Change(*c))?);
− break;
− }
− }
−
− let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;
− let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;
−
− debug!("complete changes {:?}", changes);
− full: bool,
− ) -> Result<(), anyhow::Error> {
− changes: &[CS],
− local_channel: &ChannelRef<T>,
− txn: &T,
− repo: &pijul_repository::Repository,
− &mut self,
− pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(
− Ok(())
− }
−
− .await?;
− self.complete_changes(repo, txn, channel, &to_pull, false)
− self.update_identities(repo, &remote).await?;
−
− .await?;
− self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true)
− }
− bail!("State not found: {:?}", state)
− found = true;
− break;
− }
− }
− if !found {
− if p.b == state {
− to_pull.push(CS::Change(p.a.into()));
− let (n, p) = x?;
− debug!("{:?} {:?}", n, p);
− for x in txn.iter_remote(&remote.lock().remote, 0)? {
− let mut to_pull = Vec::new();
− let mut found = false;
− let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
− self.update_changelist(txn, &[]).await?;
− let id = if let Some(id) = self.get_id(txn).await? {
− id
− } else {
− return Ok(());
− };
− ) -> Result<(), anyhow::Error> {
− &mut self,
− repo: &mut Repository,
− txn: &mut T,
− channel: &mut ChannelRef<T>,
− state: Merkle,
− pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
− Ok(())
− }
−
− self.complete_changes(repo, txn, channel, &hashes, false)
− .await?;
−
− }
−
− Ok(())
− })
− .await?;
− }
− while let Some(cs) = stream.try_next().await? {
− if let CS::Change(hash) = cs {
− let mut channel = channel.write();
− txn.apply_change_rec_ws(&repo.changes, &mut channel, &hash, &mut ws)?;
− }
−
− hashes.push(cs);
−
− let mut ws = ApplyWorkspace::new();
−
− self.download_changes_with(download_bar, &repo.changes_dir, false, |cx| async move {
− let stream = download_changes_rec(&cx, repo, tag.iter().cloned().map(CS::Change));
− pin_mut!(stream);
−
− {
− let txn = &mut *txn;
− let hashes = &mut hashes;
−
− let mut hashes = Vec::new();
−
− let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;
− tag: &[Hash],
− ) -> Result<(), anyhow::Error> {
− channel: &ChannelRef<T>,
− txn: &mut T,
− repo: &Repository,
− &mut self,
− pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
−
− for item in items {
− let item = *item.borrow();
− tasks.push(make_download_job(item));
−
− };
− }
− // there is probably a way to model this using futures, but I
− // couldn't find a nice way to do it
− // so here, have three funny collections
−
− let mut pending_deps = HashMap::<Hash, HashSet<Hash>>::new();
− let mut rev_deps = HashMap::<Hash, Vec<Hash>>::new();
− let mut fetched = HashSet::new();
−
− let make_download_job = |cs| async move {
− match cx.download(cs) {
− Ok(v) => (cs, v.await),
− Err(e) => (cs, Err(e)),
−
− fn download_changes_rec<'a, I>(
− cx: &'a DownloadContext,
− repo: &'a Repository,
− items: I,
− ) -> impl Stream<Item = anyhow::Result<CS>> + 'a
− where
− I: IntoIterator + 'a,
− I::Item: Borrow<CS>,
− {
− try_stream(move |sender| async move {
− let mut tasks = FuturesUnordered::new();
−
− Ok(result)
− debug!("finished");
− }
− }
− let mut result = Vec::with_capacity(to_apply_inodes.len());
− for h in to_apply {
− if to_apply_inodes.contains(&h) {
− result.push(*h)
−
− }
−
− let mut ws = ApplyWorkspace::new();
−
− while let Some(h) = stream.try_next().await? {
− debug!("to_apply: {:?}", h);
−
− let mut touches_inodes = inodes.is_empty();
−
+
+ let mut ws = ApplyWorkspace::new();
+
+ while let Some(h) = stream.try_next().await? {
+ debug!("to_apply: {:?}", h);
+
+ let mut touches_inodes = inodes.is_empty();
+
+ debug!("inodes = {:?}", inodes);
+
+ if !touches_inodes {
+ if let CS::Change(ref h) = h {
+ let changes = repo.changes.get_changes(h)?;
+
+ touches_inodes |= changes.iter().any(|c| {
+ c.iter().any(|c| {
+ let inode = c.inode();
+ debug!("inode = {:?}", inode);
+ inodes.contains(&Position {
+ change: inode.change.unwrap_or(*h),
+ pos: inode.pos,
+ })
+ })
+ })
+ }
+ }
+
+ if !touches_inodes {
+ touches_inodes |= inodes.iter().any(|i| CS::Change(i.change) == h);
+ }
+
+ if !touches_inodes {
+ continue;
+ }
+
+ to_apply_inodes.insert(h);
+
+ if let Some(apply_bar) = &apply_bar {
+ info!("Applying {:?}", h);
+ apply_bar.inc(1);
+ debug!("apply");
+
+ if let CS::Change(ref h) = h {
+ let mut channel = channel.write();
+ txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;
+ }
+
+ debug!("applied");
+ } else {
+ debug!("not applying {:?}", h)
+ }
+ }
+
+ Ok(())
+ })
+ .await?;
+ }
+
+ let mut result = Vec::with_capacity(to_apply_inodes.len());
+ for h in to_apply {
+ if to_apply_inodes.contains(&h) {
+ result.push(*h)
+ }
+ }
+
+ debug!("finished");
+ Ok(result)
+ }
+
+ pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
+ &mut self,
+ repo: &Repository,
+ txn: &mut T,
+ channel: &ChannelRef<T>,
+ tag: &[Hash],
+ ) -> Result<(), anyhow::Error> {
+ let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;
+
+ let mut hashes = Vec::new();
+
+ {
+ let txn = &mut *txn;
+ let hashes = &mut hashes;
+
+ self.download_changes_with(download_bar, &repo.changes_dir, false, |cx| async move {
+ let stream = download_changes_rec(&cx, repo, tag.iter().cloned().map(CS::Change));
+ pin_mut!(stream);
+
+ let mut ws = ApplyWorkspace::new();
+
+ while let Some(cs) = stream.try_next().await? {
+ if let CS::Change(hash) = cs {
+ let mut channel = channel.write();
+ txn.apply_change_rec_ws(&repo.changes, &mut channel, &hash, &mut ws)?;
+ }
+
+ hashes.push(cs);
+ }
+
+ Ok(())
+ })
+ .await?;
+ }
+
+ self.complete_changes(repo, txn, channel, &hashes, false)
+ .await?;
+ Ok(())
+ }
+
+ pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
+ &mut self,
+ repo: &mut Repository,
+ txn: &mut T,
+ channel: &mut ChannelRef<T>,
+ state: Merkle,
+ ) -> Result<(), anyhow::Error> {
+ let id = if let Some(id) = self.get_id(txn).await? {
+ id
+ } else {
+ return Ok(());
+ };
+ self.update_changelist(txn, &[]).await?;
+ let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
+ let mut to_pull = Vec::new();
+ let mut found = false;
+ for x in txn.iter_remote(&remote.lock().remote, 0)? {
+ let (n, p) = x?;
+ debug!("{:?} {:?}", n, p);
+ to_pull.push(CS::Change(p.a.into()));
+ if p.b == state {
+ found = true;
+ break;
+ }
+ }
+ if !found {
+ bail!("State not found: {:?}", state)
+ }
+ self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true)
+ .await?;
+ self.update_identities(repo, &remote).await?;
+
+ self.complete_changes(repo, txn, channel, &to_pull, false)
+ .await?;
+ Ok(())
+ }
+
+ pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(
+ &mut self,
+ repo: &pijul_repository::Repository,
+ txn: &T,
+ local_channel: &ChannelRef<T>,
+ changes: &[CS],
+ full: bool,
+ ) -> Result<(), anyhow::Error> {
+ debug!("complete changes {:?}", changes);
+
+ let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;
+ let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;
+
+ self.download_changes_with(download_bar, &repo.changes_dir, true, |cx| async move {
+ let mut waiting = Vec::new();
+
+ for c in changes {
+ let CS::Change(c) = c else { continue };
+
+ let sc = c.into();
+
+ if repo
+ .changes
+ .has_contents(*c, txn.get_internal(&sc)?.cloned())
+ {
+ debug!("has contents {:?}", c);
+ continue;
+ }
+
+ if full {
+ waiting.push(cx.download(CS::Change(*c))?);
+ continue;
+ }
+
+ let Some(&change) = txn.get_internal(&sc)? else {
+ debug!("could not find internal for {:?}", sc);
+ continue;
+ };
+
+ // Check if at least one non-empty vertex from c is still alive.
+ let v = libpijul::pristine::Vertex {
+ change,
+ start: ChangePosition(0u64.into()),
+ end: ChangePosition(0u64.into()),
+ };
+
+ let channel = local_channel.read();
+ let graph = txn.graph(&channel);
+
+ for x in txn.iter_graph(graph, Some(&v))? {
+ let (v, e) = x?;
+ if v.change > change {
+ break;
+ } else if e.flag().is_alive_parent() {
+ waiting.push(cx.download(CS::Change(*c))?);
+ break;
+ }
+ }
+ }
+
+ for w in waiting {
+ w.await?;
+ }
+
+ Ok(())
+ })
+ .await?;
+
+ Ok(())
+ }
+
+ pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
+ &mut self,
+ repo: &mut Repository,
+ txn: &mut T,
+ local_channel: &mut ChannelRef<T>,
+ path: &[String],
+ ) -> Result<(), anyhow::Error> {
+ let (inodes, remote_changes) = if let Some(x) = self.update_changelist(txn, path).await? {
+ x
+ } else {
+ bail!("Channel not found")
+ };
+ let mut pullable = Vec::new();
+ {
+ let rem = remote_changes.lock();
+ for x in txn.iter_remote(&rem.remote, 0)? {
+ let (_, p) = x?;
+ pullable.push(CS::Change(p.a.into()))
+ }
+ }
+ self.pull(repo, txn, local_channel, &pullable, &inodes, true)
+ .await?;
+ self.update_identities(repo, &remote_changes).await?;
+
+ self.complete_changes(repo, txn, local_channel, &pullable, false)
+ .await?;
+ Ok(())
+ }
+ }
+
+ lazy_static! {
+ static ref CHANGELIST_LINE: Regex = Regex::new(
+ r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#
+ )
+ .unwrap();
+ static ref PATHS_LINE: Regex =
+ Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap();
+ }
+
+ enum ListLine {
+ Change {
+ n: u64,
+ h: Hash,
+ m: Merkle,
+ tag: bool,
+ },
+ Position(Position<Hash>),
+ Error(String),
+ }
+
+ fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {
+ debug!("data = {:?}", data);
+ if let Some(caps) = CHANGELIST_LINE.captures(data) {
+ if let (Some(h), Some(m)) = (
+ Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),
+ Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),
+ ) {
+ return Ok(ListLine::Change {
+ n: caps.name("num").unwrap().as_str().parse().unwrap(),
+ h,
+ m,
+ tag: caps.name("tag").is_some(),
+ });
+ }
+ }
+ if data.starts_with("error:") {
+ return Ok(ListLine::Error(data.split_at(6).1.to_string()));
+ }
+ if let Some(caps) = PATHS_LINE.captures(data) {
+ return Ok(ListLine::Position(Position {
+ change: Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()).unwrap(),
+ pos: ChangePosition(
+ caps.name("num")
+ .unwrap()
+ .as_str()
+ .parse::<u64>()
+ .unwrap()
+ .into(),
+ ),
+ }));
+ }
+ debug!("offending line: {:?}", data);
+ bail!("Protocol error")
+ }
+
+ /// Compare the remote set (theirs_ge_dichotomy) with our current
+ /// version of that (ours_ge_dichotomy) and return the changes in our
+ /// current version that are not in the remote anymore.
+ fn remote_unrecs<T: TxnTExt + ChannelTxnT>(
+ txn: &T,
+ current_channel: &ChannelRef<T>,
+ ours_ge_dichotomy: &[(u64, CS)],
+ theirs_ge_dichotomy_set: &HashSet<CS>,
+ ) -> Result<Vec<(u64, CS)>, anyhow::Error> {
+ let mut remote_unrecs = Vec::new();
+ for (n, hash) in ours_ge_dichotomy {
+ debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);
+ if theirs_ge_dichotomy_set.contains(hash) {
+ // If this change is still present in the remote, skip
+ debug!("still present");
+ continue;
+ } else {
+ let has_it = match hash {
+ CS::Change(hash) => txn.get_revchanges(¤t_channel, &hash)?.is_some(),
+ CS::State(state) => {
+ let ch = current_channel.read();
+ if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {
+ txn.is_tagged(txn.tags(&*ch), n.into())?
+ } else {
+ false
+ }
+ }
+ };
+ if has_it {
+ remote_unrecs.push((*n, *hash))
+ } else {
+ // If this unrecord wasn't in our current channel, skip
+ continue;
+ }
+ }
+ }
+ Ok(remote_unrecs)
+ }
+
+ fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>
+ where
+ C: FnOnce(mpsc::Sender<Result<T, E>>) -> F,
+ F: Future<Output = Result<(), E>>,
+ {
+ stream(|sender| {
+ let fut = op(sender.clone());
+
+ async move {
+ if let Err(e) = fut.await {
+ let _ = sender.send(Err(e));
+ }
+ }
+ })
+ }
+
+ fn stream<C, F, T>(op: C) -> impl Stream<Item = T>
+ where
+ C: FnOnce(mpsc::Sender<T>) -> F,
+ F: Future<Output = ()>,
+ {
+ struct Impl<F, T> {
+ fut: Once<F>,
+ rx: mpsc::Receiver<T>,
+ }
+
+ impl<F, T> Stream for Impl<F, T>
+ where
+ F: Future<Output = ()>,
+ {
+ type Item = T;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let (fut, mut rx) = unsafe {
+ let this = self.get_unchecked_mut();
+ (Pin::new_unchecked(&mut this.fut), Pin::new(&mut this.rx))
+ };
+
+ match (fut.poll_next(cx), rx.poll_recv(cx)) {
+ (_, Poll::Ready(Some(v))) => Poll::Ready(Some(v)),
+ (Poll::Ready(_), Poll::Ready(None)) => Poll::Ready(None),
+ _ => Poll::Pending,
+ }
+ }
+ }
+
+ let (tx, rx) = mpsc::channel(1);
+
+ Impl {
+ fut: stream::once(op(tx)),
+ rx,
+ }
+ }
+
+ fn download_changes_rec<'a, I>(
+ cx: &'a DownloadContext,
+ repo: &'a Repository,
+ items: I,
+ ) -> impl Stream<Item = anyhow::Result<CS>> + 'a
+ where
+ I: IntoIterator + 'a,
+ I::Item: Borrow<CS>,
+ {
+ try_stream(move |sender| async move {
+ let mut tasks = FuturesUnordered::new();
+
+ // there is probably a way to model this using futures, but I
+ // couldn't find a nice way to do it
+ // so here, have three funny collections
+
+ let mut pending_deps = HashMap::<Hash, HashSet<Hash>>::new();
+ let mut rev_deps = HashMap::<Hash, Vec<Hash>>::new();
+ let mut fetched = HashSet::new();
+
+ let make_download_job = |cs| async move {
+ match cx.download(cs) {
+ Ok(v) => (cs, v.await),
+ Err(e) => (cs, Err(e)),
+ }
+ };
+
+ for item in items {
+ let item = *item.borrow();
+ tasks.push(make_download_job(item));
+
+ if let CS::Change(hash) = item {
+ rev_deps.insert(hash, Vec::new());
+ }
+ }
+