[2/2] Pull code cleanup v1

dblsaiko
Apr 7, 2024, 1:57 PM
M32SWK6ZR3ZXXQCCGFCE4DZQIDKUGHESHECI6R2Q5WQLRWV52NTAC

Dependencies

  • [2] JGN2BVS5 Pull code cleanup v1
  • [3] HXEIH4UQ Pulling more than 100 changes at once
  • [4] 4XLHUME7 Fixing a interlocking when cloning from a particular patch
  • [5] JUYSZJSH Migrate from `pijul::progress` to `pijul_interaction::progress`
  • [6] PYTC7DPV Partial clones: including all the subtree of a directory when selecting patches
  • [7] G7HJHNFD Migrate from `pijul_interaction::progress` to `pijul_interaction`
  • [8] UDHP4ZVB Fixing SSH asynchronicity issues
  • [9] TKEVOH7H Fixing a bug when downloading changes, and making change download more efficient (more async)
  • [10] 5SLOJYHG Fixing the Git feature
  • [11] EUZFFJSO Updating Pijul with the latest changes in Libpijul
  • [12] CCLLB7OI Upgrading to Sanakirja 0.15 + version bump
  • [13] I24UEJQL Various post-fire fixes
  • [14] 76PCXGML Pushing to, and pulling from the local repository
  • [15] L2VH4BYK Downloading changelists from channels without an id (Nest discussions)
  • [16] LKIKT4FR [2/?] Clean up change file name handling
  • [17] ZDK3GNDB Tag transactions (including a massive refactoring of errors)
  • [18] ZWVYH7WP Pulling local tags
  • [19] 4HTHYIA3 Fixing HTTP download
  • [20] A3RM526Y Integrating identity malleability
  • [21] ABQDWHNG Migrate from `pijul::repository` to `pijul-repository`
  • [22] DO2Y5TY5 Tag synchronisation
  • [23] XQHABMC2 Do not block when there is no patch to pull
  • [24] Q45QHPO4 Feedback on network stuff
  • [25] MU5GSJAW Partial push and pull (WARNING: breaks the existing protocol)
  • [26] IBPVOKM5 Fixing a bug in patch download
  • [27] QWIYNMI5 Formatting + big-endian Sanakirja
  • [28] 2D7P2VKJ Change completions (where the whole progress bar story started)
  • [29] I52XSRUH Massive cleanup, and simplification
  • [30] BNPSVXIC Friendlier progress bars
  • [31] C5XGFNKI Simplify return type for remote get_id
  • [32] YN63NUZO Sanakirja 1.0
  • [33] UDGL7ER2 Clean up change file name handling
  • [34] 27RZYCM3 Pushing/pulling from/to Nest discussions again
  • [35] HKHMES6T Solving conflicts
  • [36] C3L2TLQW When downloading changes, check whether we have their dependencies and download them too
  • [37] SXEYMYF7 Fixing the bad changes in history (unfortunately, by rebooting).
  • [38] 3E2KY6Y4 Deterministic ordering of pulled patches
  • [39] RM225IDQ Exchanging tagged states over SSH
  • [40] X6YFD4WV Do not download changes if we already have them
  • [41] 3WO4H2MM Fixing async issues in downloads
  • [42] 2GQCLJZG Displaying errors returned by the server in the protocol
  • [43] K6GWUOD5 Styling progress bars

Change contents

  • replacement in pijul-remote/src/lib.rs at line 1266
    [3.2393][2.3037:3096](),[3.437][3.2392:2393](),[3.17461][3.2392:2393](),[3.2392][3.2392:2393](),[3.17386][3.2312:2330](),[3.2312][3.2312:2330](),[2.3036][3.17364:17386](),[3.17364][3.17364:17386](),[3.17334][2.2981:3036](),[2.2980][3.17305:17334](),[3.2289][3.17305:17334](),[3.17304][2.2153:2980](),[3.539][3.17273:17304](),[3.17273][3.17273:17304](),[2.2152][3.504:539](),[3.504][3.504:539](),[3.1581][2.1499:2152](),[3.1528][3.1528:1581](),[3.1191][3.7755:7756](),[3.493][3.3302:3309](),[3.3302][3.3302:3309](),[2.8136][3.3532:3533](),[3.118][3.3532:3533](),[2.8015][2.8016:8136](),[3.70881][2.8015:8015](),[3.3818][3.18846:18895](),[3.20381][2.5695:6060](),[3.373][3.18896:20381](),[2.6061][2.6062:7077](),[3.79733][2.6061:6061](),[3.373][3.79731:79733](),[3.4202][3.79731:79733](),[2.6060][3.79731:79733](),[3.20381][3.79731:79733](),[3.79731][3.79731:79733](),[3.79713][3.303:373](),[3.4145][3.79707:79713](),[3.79707][3.79707:79713](),[3.8665][3.4132:4145](),[3.4132][3.4132:4145](),[3.4046][3.8429:8665](),[3.302][3.3850:4046](),[3.3850][3.3850:4046](),[3.3850][3.192:302](),[3.2676][3.3818:3850](),[3.18895][3.3818:3850](),[3.3818][3.3818:3850](),[3.191][3.3780:3818](),[3.3780][3.3780:3818](),[3.3716][3.119:191](),[3.78715][3.3406:3716](),[3.3405][3.78682:78715](),[3.78682][3.78682:78715](),[3.118][3.3339:3405](),[3.3339][3.3339:3405](),[3.3339][3.99:118](),[3.2620][3.3309:3339](),[3.3309][3.3309:3339](),[3.18845][3.2613:2620](),[3.2613][3.2613:2620](),[3.2588][3.18826:18845](),[3.3266][3.2523:2588](),[3.2522][3.3137:3266](),[3.3137][3.3137:3266](),[3.2996][3.2358:2522](),[3.2981][3.2981:2996](),[3.78607][3.78607:78608](),[3.950][3.78599:78607](),[3.2909][3.78599:78607](),[3.78599][3.78599:78607](),[3.2837][3.2894:2909](),[3.2894][3.2894:2909](),[3.1999][3.2742:2837](),[3.8609][3.2742:2837](),[3.2894][3.2742:2837](),[3.78459][3.1845:1999](),[3.8409][3.78449:78459](),[3.8428][3.78449:78459](),[3.78449][3.78449:78459](),[3.18825][3.8395:8409](),[3.8395][3.8395:8409](),[3.8353][3.18771:18825](),[3.917][3.8264:8353](),[3.8264][3.8264:8353](),[3.8218][3.872:917](),[3.78339][3.8208:8218](),[3.871][3.78300:78339](),[3.2988][3.78300:78339](),[3.78300][3.78300:78339](),[3.77979][3.691:871](),[3.77917][3.77917:77979](),[3.2741][3.77783:77897](),[3.4954][3.77783:77897](),[3.77783][3.77783:77897](),[3.77726][3.4875:4954](),[3.2671][3.77704:77726](),[2.5694][3.77704:77726](),[3.77704][3.77704:77726](),[3.76443][2.5645:5694](),[2.5644][3.76429:76443](),[3.76429][3.76429:76443](),[3.76261][2.5586:5644](),[2.5585][3.76247:76261](),[3.76247][3.76247:76261](),[3.76077][2.3996:5585](),[3.3376][3.76076:76077](),[3.76076][3.76076:76077](),[3.288][3.2835:2902](),[3.2835][3.2835:2902](),[3.2788][3.202:288](),[3.228][3.2787:2788](),[3.75363][3.2787:2788](),[3.75038][3.2057:2107](),[3.18533][3.74981:75038](),[3.74981][3.74981:75038](),[2.3995][3.18509:18533](),[3.74955][3.18509:18533](),[3.74912][2.3956:3995](),[3.721][3.74895:74912](),[3.74895][3.74895:74912](),[3.74849][3.676:721](),[3.855][3.74830:74849](),[3.2056][3.74830:74849](),[3.74830][3.74830:74849](),[3.74770][3.1987:2056](),[3.882][3.74748:74770](),[3.2757][3.74748:74770](),[3.74748][3.74748:74770](),[3.2117][3.2736:2757](),[3.2736][3.2736:2757](),[3.1844][3.2050:2117](),[3.2050][3.2050:2117](),[3.2050][3.1789:1844](),[3.1788][3.2029:2050](),[3.8207][3.2029:2050](),[3.2736][3.2029:2050](),[3.74687][3.1717:1788](),[3.9489][3.74677:74687](),[3.74677][3.74677:74687](),[3.74612][3.9439:9489](),[3.7944][3.74515:74612](),[3.74515][3.74515:74612](),[3.18508][3.7914:7944](),[3.7914][3.7914:7944](),[3.7876][3.18458:18508](),[3.641][3.7808:7876](),[3.685][3.7808:7876](),[3.9404][3.7808:7876](),[3.74338][3.579:641](),[3.8126][3.74269:74338](),[3.74269][3.74269:74338](),[3.73321][3.8043:8126](),[3.564][3.73272:73321](),[3.1567][3.73272:73321](),[3.8042][3.73272:73321](),[3.73272][3.73272:73321](),[3.73272][3.1436:1567](),[3.73235][3.73235:73272](),[3.2028][3.73084:73215](),[3.4874][3.73084:73215](),[3.73084][3.73084:73215](),[3.73029][3.4797:4874](),[3.1960][3.73007:73029](),[3.73007][3.73007:73029](),[3.3758][3.1873:1960](),[2.3955][3.1873:1960](),[3.73007][3.1873:1960](),[3.72935][2.3954:3955](),[3.344][3.72925:72935](),[3.1872][3.72925:72935](),[2.3953][3.72925:72935](),[3.7807][3.72925:72935](),[3.72925][3.72925:72935](),[3.18457][2.3893:3953](),[3.1403][3.18439:18457](),[2.3892][3.18439:18457](),[3.18439][3.18439:18457](),[3.72023][2.3563:3892](),[2.3562][3.72022:72023](),[3.3587][3.72022:72023](),[3.72022][3.72022:72023](),[3.572][2.3510:3562](),[2.3509][3.571:572](),[3.571][3.571:572](),[3.495][2.3279:3509](),[2.3278][3.494:495](),[3.72022][3.494:495](),[3.71943][2.3197:3278](),[2.3196][3.71942:71943](),[3.71942][3.71942:71943](),[3.2510][2.3159:3196](),[3.2509][3.2509:2510](),[3.140][3.2371:2453](),[3.3446][3.2371:2453](),[2.3158][3.71042:71101](),[3.71042][3.71042:71101](),[3.71005][2.3125:3158](),[2.3124][3.70984:71005](),[3.70984][3.70984:71005](),[3.70953][2.3097:3124](),[3.1471][3.70934:70953](),[3.4796][3.70934:70953](),[3.70934][3.70934:70953](),[3.3309][3.4721:4796](),[3.70881][3.4721:4796](),[3.2228][3.70880:70881](),[2.8014][3.70880:70881](),[3.70880][3.70880:70881](),[3.1957][2.7898:8014](),[2.7897][3.1956:1957](),[3.1956][3.1956:1957](),[3.74][2.7886:7897](),[3.218][3.60:74](),[3.2312][3.60:74](),[2.7885][3.60:74](),[3.60][3.60:74](),[3.3630][2.7379:7885](),[2.7378][3.3629:3630](),[3.3629][3.3629:3630](),[3.590][2.7078:7378](),[2.7077][3.589:590](),[3.70880][3.589:590](),[3.1405][3.295:314](),[3.7756][3.3637:3665](),[3.1191][3.3637:3665](),[3.294][3.1167:1191](),[3.1167][3.1167:1191](),[3.120][3.120:294](),[3.119][3.119:120](),[2.3096][3.109:119](),[3.109][3.109:119](),[3.258][3.0:1](),[3.592][3.0:1](),[2.1498][3.0:1](),[3.2868][3.0:1](),[3.69159][3.0:1](),[3.127][2.1268:1498](),[2.1267][3.126:127](),[3.544][3.126:127]()
    Ok(())
    })
    .await?;
    }
    }
    debug!("not applying {:?}", h)
    } else {
    }
    }
    if !touches_inodes {
    touches_inodes |= inodes.iter().any(|i| CS::Change(i.change) == h);
    }
    if !touches_inodes {
    continue;
    }
    to_apply_inodes.insert(h);
    if let Some(apply_bar) = &apply_bar {
    info!("Applying {:?}", h);
    apply_bar.inc(1);
    debug!("apply");
    if let CS::Change(ref h) = h {
    let mut channel = channel.write();
    txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;
    }
    debug!("applied");
    })
    })
    if !touches_inodes {
    if let CS::Change(ref h) = h {
    let changes = repo.changes.get_changes(h)?;
    touches_inodes |= changes.iter().any(|c| {
    c.iter().any(|c| {
    let inode = c.inode();
    debug!("inode = {:?}", inode);
    inodes.contains(&Position {
    change: inode.change.unwrap_or(*h),
    pos: inode.pos,
    })
    debug!("inodes = {:?}", inodes);
    }
    if let CS::Change(hash) = item {
    rev_deps.insert(hash, Vec::new());
    }
    }
    tag: caps.name("tag").is_some(),
    }
    fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>
    where
    C: FnOnce(mpsc::Sender<Result<T, E>>) -> F,
    F: Future<Output = Result<(), E>>,
    {
    stream(|sender| {
    let fut = op(sender.clone());
    async move {
    if let Err(e) = fut.await {
    let _ = sender.send(Err(e));
    }
    }
    })
    }
    /// Compare the remote set (theirs_ge_dichotomy) with our current
    /// version of that (ours_ge_dichotomy) and return the changes in our
    /// current version that are not in the remote anymore.
    fn remote_unrecs<T: TxnTExt + ChannelTxnT>(
    txn: &T,
    current_channel: &ChannelRef<T>,
    ours_ge_dichotomy: &[(u64, CS)],
    theirs_ge_dichotomy_set: &HashSet<CS>,
    ) -> Result<Vec<(u64, CS)>, anyhow::Error> {
    let mut remote_unrecs = Vec::new();
    for (n, hash) in ours_ge_dichotomy {
    debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);
    if theirs_ge_dichotomy_set.contains(hash) {
    // If this change is still present in the remote, skip
    debug!("still present");
    continue;
    } else {
    let has_it = match hash {
    CS::Change(hash) => txn.get_revchanges(&current_channel, &hash)?.is_some(),
    CS::State(state) => {
    let ch = current_channel.read();
    if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {
    txn.is_tagged(txn.tags(&*ch), n.into())?
    } else {
    false
    }
    }
    };
    if has_it {
    remote_unrecs.push((*n, *hash))
    } else {
    // If this unrecord wasn't in our current channel, skip
    continue;
    }
    }
    }
    Ok(remote_unrecs)
    fn stream<C, F, T>(op: C) -> impl Stream<Item = T>
    where
    C: FnOnce(mpsc::Sender<T>) -> F,
    F: Future<Output = ()>,
    {
    struct Impl<F, T> {
    fut: Once<F>,
    rx: mpsc::Receiver<T>,
    }
    impl<F, T> Stream for Impl<F, T>
    where
    F: Future<Output = ()>,
    {
    type Item = T;
    fn poll_next(
    self: Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
    let (fut, mut rx) = unsafe {
    let this = self.get_unchecked_mut();
    (Pin::new_unchecked(&mut this.fut), Pin::new(&mut this.rx))
    };
    match (fut.poll_next(cx), rx.poll_recv(cx)) {
    (_, Poll::Ready(Some(v))) => Poll::Ready(Some(v)),
    (Poll::Ready(_), Poll::Ready(None)) => Poll::Ready(None),
    _ => Poll::Pending,
    }
    }
    }
    let (tx, rx) = mpsc::channel(1);
    Impl {
    fut: stream::once(op(tx)),
    rx,
    }
    }
    }
    debug!("offending line: {:?}", data);
    bail!("Protocol error")
    }
    }));
    pos: ChangePosition(
    caps.name("num")
    .unwrap()
    .as_str()
    .parse::<u64>()
    .unwrap()
    .into(),
    ),
    if let Some(caps) = PATHS_LINE.captures(data) {
    return Ok(ListLine::Position(Position {
    change: Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()).unwrap(),
    if data.starts_with("error:") {
    return Ok(ListLine::Error(data.split_at(6).1.to_string()));
    }
    });
    }
    }
    h,
    m,
    n: caps.name("num").unwrap().as_str().parse().unwrap(),
    if let Some(caps) = CHANGELIST_LINE.captures(data) {
    if let (Some(h), Some(m)) = (
    Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),
    Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),
    ) {
    return Ok(ListLine::Change {
    debug!("data = {:?}", data);
    }
    fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {
    Error(String),
    Position(Position<Hash>),
    },
    tag: bool,
    Change {
    n: u64,
    h: Hash,
    m: Merkle,
    static ref PATHS_LINE: Regex =
    Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap();
    }
    enum ListLine {
    static ref CHANGELIST_LINE: Regex = Regex::new(
    r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#
    )
    .unwrap();
    lazy_static! {
    }
    }
    Ok(())
    self.complete_changes(repo, txn, local_channel, &pullable, false)
    .await?;
    self.pull(repo, txn, local_channel, &pullable, &inodes, true)
    .await?;
    self.update_identities(repo, &remote_changes).await?;
    }
    }
    pullable.push(CS::Change(p.a.into()))
    for x in txn.iter_remote(&rem.remote, 0)? {
    let (_, p) = x?;
    let rem = remote_changes.lock();
    {
    let mut pullable = Vec::new();
    let (inodes, remote_changes) = if let Some(x) = self.update_changelist(txn, path).await? {
    x
    } else {
    bail!("Channel not found")
    };
    path: &[String],
    ) -> Result<(), anyhow::Error> {
    &mut self,
    repo: &mut Repository,
    txn: &mut T,
    local_channel: &mut ChannelRef<T>,
    pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
    Ok(())
    }
    Ok(())
    })
    .await?;
    }
    for w in waiting {
    w.await?;
    }
    self.download_changes_with(download_bar, &repo.changes_dir, true, |cx| async move {
    let mut waiting = Vec::new();
    for c in changes {
    let CS::Change(c) = c else { continue };
    let sc = c.into();
    if repo
    .changes
    .has_contents(*c, txn.get_internal(&sc)?.cloned())
    {
    debug!("has contents {:?}", c);
    continue;
    }
    if full {
    waiting.push(cx.download(CS::Change(*c))?);
    continue;
    }
    let Some(&change) = txn.get_internal(&sc)? else {
    debug!("could not find internal for {:?}", sc);
    continue;
    };
    // Check if at least one non-empty vertex from c is still alive.
    let v = libpijul::pristine::Vertex {
    change,
    start: ChangePosition(0u64.into()),
    end: ChangePosition(0u64.into()),
    };
    let channel = local_channel.read();
    let graph = txn.graph(&channel);
    for x in txn.iter_graph(graph, Some(&v))? {
    let (v, e) = x?;
    if v.change > change {
    break;
    } else if e.flag().is_alive_parent() {
    waiting.push(cx.download(CS::Change(*c))?);
    break;
    }
    }
    let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;
    let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;
    debug!("complete changes {:?}", changes);
    full: bool,
    ) -> Result<(), anyhow::Error> {
    changes: &[CS],
    local_channel: &ChannelRef<T>,
    txn: &T,
    repo: &pijul_repository::Repository,
    &mut self,
    pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(
    Ok(())
    }
    .await?;
    self.complete_changes(repo, txn, channel, &to_pull, false)
    self.update_identities(repo, &remote).await?;
    .await?;
    self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true)
    }
    bail!("State not found: {:?}", state)
    found = true;
    break;
    }
    }
    if !found {
    if p.b == state {
    to_pull.push(CS::Change(p.a.into()));
    let (n, p) = x?;
    debug!("{:?} {:?}", n, p);
    for x in txn.iter_remote(&remote.lock().remote, 0)? {
    let mut to_pull = Vec::new();
    let mut found = false;
    let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
    self.update_changelist(txn, &[]).await?;
    let id = if let Some(id) = self.get_id(txn).await? {
    id
    } else {
    return Ok(());
    };
    ) -> Result<(), anyhow::Error> {
    &mut self,
    repo: &mut Repository,
    txn: &mut T,
    channel: &mut ChannelRef<T>,
    state: Merkle,
    pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
    Ok(())
    }
    self.complete_changes(repo, txn, channel, &hashes, false)
    .await?;
    }
    Ok(())
    })
    .await?;
    }
    while let Some(cs) = stream.try_next().await? {
    if let CS::Change(hash) = cs {
    let mut channel = channel.write();
    txn.apply_change_rec_ws(&repo.changes, &mut channel, &hash, &mut ws)?;
    }
    hashes.push(cs);
    let mut ws = ApplyWorkspace::new();
    self.download_changes_with(download_bar, &repo.changes_dir, false, |cx| async move {
    let stream = download_changes_rec(&cx, repo, tag.iter().cloned().map(CS::Change));
    pin_mut!(stream);
    {
    let txn = &mut *txn;
    let hashes = &mut hashes;
    let mut hashes = Vec::new();
    let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;
    tag: &[Hash],
    ) -> Result<(), anyhow::Error> {
    channel: &ChannelRef<T>,
    txn: &mut T,
    repo: &Repository,
    &mut self,
    pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
    for item in items {
    let item = *item.borrow();
    tasks.push(make_download_job(item));
    };
    }
    // there is probably a way to model this using futures, but I
    // couldn't find a nice way to do it
    // so here, have three funny collections
    let mut pending_deps = HashMap::<Hash, HashSet<Hash>>::new();
    let mut rev_deps = HashMap::<Hash, Vec<Hash>>::new();
    let mut fetched = HashSet::new();
    let make_download_job = |cs| async move {
    match cx.download(cs) {
    Ok(v) => (cs, v.await),
    Err(e) => (cs, Err(e)),
    fn download_changes_rec<'a, I>(
    cx: &'a DownloadContext,
    repo: &'a Repository,
    items: I,
    ) -> impl Stream<Item = anyhow::Result<CS>> + 'a
    where
    I: IntoIterator + 'a,
    I::Item: Borrow<CS>,
    {
    try_stream(move |sender| async move {
    let mut tasks = FuturesUnordered::new();
    Ok(result)
    debug!("finished");
    }
    }
    let mut result = Vec::with_capacity(to_apply_inodes.len());
    for h in to_apply {
    if to_apply_inodes.contains(&h) {
    result.push(*h)
    }
    let mut ws = ApplyWorkspace::new();
    while let Some(h) = stream.try_next().await? {
    debug!("to_apply: {:?}", h);
    let mut touches_inodes = inodes.is_empty();
    [2.1267]
    [2.8137]
    let mut ws = ApplyWorkspace::new();
    while let Some(h) = stream.try_next().await? {
    debug!("to_apply: {:?}", h);
    let mut touches_inodes = inodes.is_empty();
    debug!("inodes = {:?}", inodes);
    if !touches_inodes {
    if let CS::Change(ref h) = h {
    let changes = repo.changes.get_changes(h)?;
    touches_inodes |= changes.iter().any(|c| {
    c.iter().any(|c| {
    let inode = c.inode();
    debug!("inode = {:?}", inode);
    inodes.contains(&Position {
    change: inode.change.unwrap_or(*h),
    pos: inode.pos,
    })
    })
    })
    }
    }
    if !touches_inodes {
    touches_inodes |= inodes.iter().any(|i| CS::Change(i.change) == h);
    }
    if !touches_inodes {
    continue;
    }
    to_apply_inodes.insert(h);
    if let Some(apply_bar) = &apply_bar {
    info!("Applying {:?}", h);
    apply_bar.inc(1);
    debug!("apply");
    if let CS::Change(ref h) = h {
    let mut channel = channel.write();
    txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;
    }
    debug!("applied");
    } else {
    debug!("not applying {:?}", h)
    }
    }
    Ok(())
    })
    .await?;
    }
    let mut result = Vec::with_capacity(to_apply_inodes.len());
    for h in to_apply {
    if to_apply_inodes.contains(&h) {
    result.push(*h)
    }
    }
    debug!("finished");
    Ok(result)
    }
    pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
    &mut self,
    repo: &Repository,
    txn: &mut T,
    channel: &ChannelRef<T>,
    tag: &[Hash],
    ) -> Result<(), anyhow::Error> {
    let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;
    let mut hashes = Vec::new();
    {
    let txn = &mut *txn;
    let hashes = &mut hashes;
    self.download_changes_with(download_bar, &repo.changes_dir, false, |cx| async move {
    let stream = download_changes_rec(&cx, repo, tag.iter().cloned().map(CS::Change));
    pin_mut!(stream);
    let mut ws = ApplyWorkspace::new();
    while let Some(cs) = stream.try_next().await? {
    if let CS::Change(hash) = cs {
    let mut channel = channel.write();
    txn.apply_change_rec_ws(&repo.changes, &mut channel, &hash, &mut ws)?;
    }
    hashes.push(cs);
    }
    Ok(())
    })
    .await?;
    }
    self.complete_changes(repo, txn, channel, &hashes, false)
    .await?;
    Ok(())
    }
    pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
    &mut self,
    repo: &mut Repository,
    txn: &mut T,
    channel: &mut ChannelRef<T>,
    state: Merkle,
    ) -> Result<(), anyhow::Error> {
    let id = if let Some(id) = self.get_id(txn).await? {
    id
    } else {
    return Ok(());
    };
    self.update_changelist(txn, &[]).await?;
    let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
    let mut to_pull = Vec::new();
    let mut found = false;
    for x in txn.iter_remote(&remote.lock().remote, 0)? {
    let (n, p) = x?;
    debug!("{:?} {:?}", n, p);
    to_pull.push(CS::Change(p.a.into()));
    if p.b == state {
    found = true;
    break;
    }
    }
    if !found {
    bail!("State not found: {:?}", state)
    }
    self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true)
    .await?;
    self.update_identities(repo, &remote).await?;
    self.complete_changes(repo, txn, channel, &to_pull, false)
    .await?;
    Ok(())
    }
    pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(
    &mut self,
    repo: &pijul_repository::Repository,
    txn: &T,
    local_channel: &ChannelRef<T>,
    changes: &[CS],
    full: bool,
    ) -> Result<(), anyhow::Error> {
    debug!("complete changes {:?}", changes);
    let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;
    let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;
    self.download_changes_with(download_bar, &repo.changes_dir, true, |cx| async move {
    let mut waiting = Vec::new();
    for c in changes {
    let CS::Change(c) = c else { continue };
    let sc = c.into();
    if repo
    .changes
    .has_contents(*c, txn.get_internal(&sc)?.cloned())
    {
    debug!("has contents {:?}", c);
    continue;
    }
    if full {
    waiting.push(cx.download(CS::Change(*c))?);
    continue;
    }
    let Some(&change) = txn.get_internal(&sc)? else {
    debug!("could not find internal for {:?}", sc);
    continue;
    };
    // Check if at least one non-empty vertex from c is still alive.
    let v = libpijul::pristine::Vertex {
    change,
    start: ChangePosition(0u64.into()),
    end: ChangePosition(0u64.into()),
    };
    let channel = local_channel.read();
    let graph = txn.graph(&channel);
    for x in txn.iter_graph(graph, Some(&v))? {
    let (v, e) = x?;
    if v.change > change {
    break;
    } else if e.flag().is_alive_parent() {
    waiting.push(cx.download(CS::Change(*c))?);
    break;
    }
    }
    }
    for w in waiting {
    w.await?;
    }
    Ok(())
    })
    .await?;
    Ok(())
    }
    pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
    &mut self,
    repo: &mut Repository,
    txn: &mut T,
    local_channel: &mut ChannelRef<T>,
    path: &[String],
    ) -> Result<(), anyhow::Error> {
    let (inodes, remote_changes) = if let Some(x) = self.update_changelist(txn, path).await? {
    x
    } else {
    bail!("Channel not found")
    };
    let mut pullable = Vec::new();
    {
    let rem = remote_changes.lock();
    for x in txn.iter_remote(&rem.remote, 0)? {
    let (_, p) = x?;
    pullable.push(CS::Change(p.a.into()))
    }
    }
    self.pull(repo, txn, local_channel, &pullable, &inodes, true)
    .await?;
    self.update_identities(repo, &remote_changes).await?;
    self.complete_changes(repo, txn, local_channel, &pullable, false)
    .await?;
    Ok(())
    }
    }
    lazy_static! {
    static ref CHANGELIST_LINE: Regex = Regex::new(
    r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#
    )
    .unwrap();
    static ref PATHS_LINE: Regex =
    Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap();
    }
    enum ListLine {
    Change {
    n: u64,
    h: Hash,
    m: Merkle,
    tag: bool,
    },
    Position(Position<Hash>),
    Error(String),
    }
    fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {
    debug!("data = {:?}", data);
    if let Some(caps) = CHANGELIST_LINE.captures(data) {
    if let (Some(h), Some(m)) = (
    Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),
    Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),
    ) {
    return Ok(ListLine::Change {
    n: caps.name("num").unwrap().as_str().parse().unwrap(),
    h,
    m,
    tag: caps.name("tag").is_some(),
    });
    }
    }
    if data.starts_with("error:") {
    return Ok(ListLine::Error(data.split_at(6).1.to_string()));
    }
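    if let Some(caps) = PATHS_LINE.captures(data) {
    // `from_base32` and `parse` can both fail on text the regex accepts, so
    // fall through to the protocol error below instead of panicking.
    if let (Some(change), Ok(num)) = (
    Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),
    caps.name("num").unwrap().as_str().parse::<u64>(),
    ) {
    return Ok(ListLine::Position(Position {
    change,
    pos: ChangePosition(num.into()),
    }));
    }
    }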
    debug!("offending line: {:?}", data);
    bail!("Protocol error")
    }
    /// Compare the remote set (theirs_ge_dichotomy) with our current
    /// version of that (ours_ge_dichotomy) and return the changes in our
    /// current version that are not in the remote anymore.
    fn remote_unrecs<T: TxnTExt + ChannelTxnT>(
    txn: &T,
    current_channel: &ChannelRef<T>,
    ours_ge_dichotomy: &[(u64, CS)],
    theirs_ge_dichotomy_set: &HashSet<CS>,
    ) -> Result<Vec<(u64, CS)>, anyhow::Error> {
    let mut remote_unrecs = Vec::new();
    for (n, hash) in ours_ge_dichotomy {
    debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);
    if theirs_ge_dichotomy_set.contains(hash) {
    // If this change is still present in the remote, skip
    debug!("still present");
    continue;
    } else {
    let has_it = match hash {
    CS::Change(hash) => txn.get_revchanges(&current_channel, &hash)?.is_some(),
    CS::State(state) => {
    let ch = current_channel.read();
    if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {
    txn.is_tagged(txn.tags(&*ch), n.into())?
    } else {
    false
    }
    }
    };
    if has_it {
    remote_unrecs.push((*n, *hash))
    } else {
    // If this unrecord wasn't in our current channel, skip
    continue;
    }
    }
    }
    Ok(remote_unrecs)
    }
    fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>
    where
    C: FnOnce(mpsc::Sender<Result<T, E>>) -> F,
    F: Future<Output = Result<(), E>>,
    {
    stream(|sender| {
    let fut = op(sender.clone());
    async move {
    if let Err(e) = fut.await {
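    // `send` returns a future; without `.await` it is dropped and the error never reaches the receiver.
    let _ = sender.send(Err(e)).await;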
    }
    }
    })
    }
    fn stream<C, F, T>(op: C) -> impl Stream<Item = T>
    where
    C: FnOnce(mpsc::Sender<T>) -> F,
    F: Future<Output = ()>,
    {
    struct Impl<F, T> {
    fut: Once<F>,
    rx: mpsc::Receiver<T>,
    }
    impl<F, T> Stream for Impl<F, T>
    where
    F: Future<Output = ()>,
    {
    type Item = T;
    fn poll_next(
    self: Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
    let (fut, mut rx) = unsafe {
    let this = self.get_unchecked_mut();
    (Pin::new_unchecked(&mut this.fut), Pin::new(&mut this.rx))
    };
    match (fut.poll_next(cx), rx.poll_recv(cx)) {
    (_, Poll::Ready(Some(v))) => Poll::Ready(Some(v)),
    (Poll::Ready(_), Poll::Ready(None)) => Poll::Ready(None),
    _ => Poll::Pending,
    }
    }
    }
    let (tx, rx) = mpsc::channel(1);
    Impl {
    fut: stream::once(op(tx)),
    rx,
    }
    }
    fn download_changes_rec<'a, I>(
    cx: &'a DownloadContext,
    repo: &'a Repository,
    items: I,
    ) -> impl Stream<Item = anyhow::Result<CS>> + 'a
    where
    I: IntoIterator + 'a,
    I::Item: Borrow<CS>,
    {
    try_stream(move |sender| async move {
    let mut tasks = FuturesUnordered::new();
    // there is probably a way to model this using futures, but I
    // couldn't find a nice way to do it
    // so here, have three funny collections
    let mut pending_deps = HashMap::<Hash, HashSet<Hash>>::new();
    let mut rev_deps = HashMap::<Hash, Vec<Hash>>::new();
    let mut fetched = HashSet::new();
    let make_download_job = |cs| async move {
    match cx.download(cs) {
    Ok(v) => (cs, v.await),
    Err(e) => (cs, Err(e)),
    }
    };
    for item in items {
    let item = *item.borrow();
    tasks.push(make_download_job(item));
    if let CS::Change(hash) = item {
    rev_deps.insert(hash, Vec::new());
    }
    }