WIP

dblsaiko
Apr 5, 2024, 6:41 PM
ZLDJRMVBOGTLW6BAS3IOVYXR4BLUZ2YWGWIIPA3NFUG4RRTOGZKQC

Dependencies

  • [2] 3E2KY6Y4 Deterministic ordering of pulled patches
  • [3] 5Z2Y7VGV Migrate `pijul::identity::Complete::prove` to `pijul::remote::prove`
  • [4] RIZ4IP76 Solving more conflicts
  • [5] LKIKT4FR [2/?] Clean up change file name handling
  • [6] 6WIH3U43 WIP
  • [7] GNMZNKB4 Cursors cleanup
  • [8] 5SLOJYHG Fixing the Git feature
  • [9] ISCWVXO6 Progress bar for push
  • [10] PYTC7DPV Partial clones: including all the subtree of a directory when selecting patches
  • [11] UDGL7ER2 Clean up change file name handling
  • [12] I52XSRUH Massive cleanup, and simplification
  • [13] SXEYMYF7 Fixing the bad changes in history (unfortunately, by rebooting).
  • [14] ZDK3GNDB Tag transactions (including a massive refactoring of errors)
  • [15] G7HJHNFD Migrate from `pijul_interaction::progress` to `pijul_interaction`
  • [16] K6GWUOD5 Styling progress bars
  • [17] 4RV7T4SR Migrate from `pijul::config` to `pijul-config`
  • [18] X6YFD4WV Do not download changes if we already have them
  • [19] YN63NUZO Sanakirja 1.0
  • [20] OYN2YVPA Create `pijul_remote` crate
  • [21] C3L2TLQW When downloading changes, check whether we have their dependencies and download them too
  • [22] 5XMUEZMZ pijul-clone: avoid panics on parsing remote URLs
  • [23] HXEIH4UQ Pulling more than 100 changes at once
  • [24] DVBSW7SI Bump dependencies with minor-level changes
  • [25] 2D7P2VKJ Change completions (where the whole progress bar story started)
  • [26] EUZFFJSO Updating Pijul with the latest changes in Libpijul
  • [27] TKEVOH7H Fixing a bug when downloading changes, and making change download more efficient (more async)
  • [28] ZWVYH7WP Pulling local tags
  • [29] Z6ASIMOR Avoid increasing the download progress bar length if we need the same dependency more than once
  • [30] PNJL5TPZ Version bump
  • [31] OGJFEWHU Fixing missing dependencies on partial clones
  • [32] 5QTMRUXN Fixing a race condition between progress bars
  • [33] JUYSZJSH Migrate from `pijul::progress` to `pijul_interaction::progress`
  • [34] RVAH6PXA Getting libpijul to compile to WASM32
  • [35] EJ7TFFOW Re-adding Cargo.lock
  • [36] IVLLXQ5Z Improved push/pull reporting
  • [37] 3WO4H2MM Fixing async issues in downloads
  • [38] A3RM526Y Integrating identity malleability
  • [39] DO2Y5TY5 Tag synchronisation
  • [40] XQHABMC2 Do not block when there is no patch to pull
  • [41] MU5GSJAW Partial push and pull (WARNING: breaks the existing protocol)
  • [42] 4HTHYIA3 Fixing HTTP download
  • [43] QWIYNMI5 Formatting + big-endian Sanakirja
  • [44] WCA7X6W6 Create `pijul-repository` crate
  • [45] A6JQQNNJ More appropriate debug messages
  • [46] UDHP4ZVB Fixing SSH asynchronicity issues
  • [47] Q45QHPO4 Feedback on network stuff
  • [48] L4JXJHWX pijul/*: reorganize imports and remove extern crate
  • [49] IBPVOKM5 Fixing a bug in patch download
  • [50] HKHMES6T Solving conflicts
  • [51] BNPSVXIC Friendlier progress bars

Change contents

  • edit in pijul-remote/src/lib.rs at line 1
    [7.52474]
    [7.853]
    use std::cell::RefCell;
  • edit in pijul-remote/src/lib.rs at line 3
    [7.884]
    [3.0]
    use std::future::Future;
  • edit in pijul-remote/src/lib.rs at line 13
    [6.1902]
    [7.363]
    use futures_util::future::{FusedFuture, LocalBoxFuture};
    use futures_util::stream::{FuturesUnordered, Once};
    use futures_util::{FutureExt, stream, StreamExt};
    use futures_util::task::LocalSpawnExt;
  • replacement in pijul-remote/src/lib.rs at line 18
    [7.393][7.2672:2711]()
    use libpijul::changestore::filesystem;
    [7.393]
    [7.2711]
    use libpijul::changestore::{ChangeStore, filesystem};
  • replacement in pijul-remote/src/lib.rs at line 24
    [7.137][7.52599:52622](),[7.602][7.52599:52622](),[7.1128][7.52599:52622](),[7.52599][7.52599:52622]()
    use libpijul::DOT_DIR;
    [7.137]
    [7.138]
    use libpijul::{DOT_DIR, HashMap};
  • replacement in pijul-remote/src/lib.rs at line 41
    [7.344][6.1903:1913]()
    mod pipe;
    [7.344]
    [6.1913]
    mod context;
  • replacement in pijul-remote/src/lib.rs at line 1172
    [7.66586][6.1915:1920]()
    [7.66586]
    [7.66587]
  • replacement in pijul-remote/src/lib.rs at line 1174
    [7.66651][7.10619:10654]()
    pub async fn download_changes(
    [7.66651]
    [7.66691]
    async fn download_changes(
  • edit in pijul-remote/src/lib.rs at line 1251
    [7.1155][7.2674:2675](),[7.2674][7.2674:2675](),[7.2675][7.0:64]()
    let (mut send, recv) = tokio::sync::mpsc::channel(100);
  • edit in pijul-remote/src/lib.rs at line 1252
    [7.1207][7.1207:1274](),[7.1274][7.2676:2757](),[7.1472][7.1638:1694]()
    let mut self_ = std::mem::replace(self, RemoteRepo::None);
    let (hash_send, mut hash_recv) = tokio::sync::mpsc::unbounded_channel();
    let cloned_download_bar = download_bar.clone();
  • replacement in pijul-remote/src/lib.rs at line 1253
    [5.52][7.1472:1532](),[7.1694][7.1472:1532](),[7.2806][7.1472:1532](),[7.3492][7.1472:1532](),[7.1472][7.1472:1532](),[7.1532][7.1695:1838](),[7.1838][5.53:87](),[5.87][7.1877:1922](),[7.3531][7.1877:1922](),[7.1877][7.1877:1922](),[7.1250][7.1619:1644](),[7.1922][7.1619:1644](),[7.3024][7.1619:1644](),[7.1619][7.1619:1644](),[7.1644][7.1923:1924](),[7.1924][7.1644:1698](),[7.1644][7.1644:1698](),[7.1698][7.69053:69054](),[7.69053][7.69053:69054](),[7.3095][7.65:94](),[7.94][7.0:77](),[7.77][7.0:40](),[7.40][7.231:259](),[7.77][7.231:259](),[7.94][7.231:259](),[7.1724][7.231:259](),[7.3095][7.231:259](),[7.231][7.231:259](),[7.259][7.78:118](),[7.118][7.3532:3533](),[7.3533][7.41:71](),[7.454][7.41:71](),[7.71][7.119:178](),[7.454][7.119:178](),[7.529][7.534:544](),[7.534][7.534:544](),[7.544][7.126:127](),[7.127][7.179:200](),[7.200][7.1925:2151](),[7.164][7.237:258](),[7.285][7.237:258](),[7.2151][7.237:258](),[7.237][7.237:258](),[7.258][7.0:1](),[7.592][7.0:1](),[7.2868][7.0:1](),[7.69159][7.0:1](),[7.69755][7.69755:69809](),[7.69809][2.0:50](),[2.50][7.286:340](),[7.1454][7.286:340](),[7.311][7.3174:3215](),[7.340][7.3174:3215](),[7.621][7.3174:3215](),[7.1456][7.1456:1641](),[7.1641][7.312:363](),[7.363][7.16600:16882](),[7.16600][7.16600:16882](),[7.16882][7.320:539](),[7.539][7.17273:17304](),[7.17273][7.17273:17304](),[7.17304][7.2262:2289](),[7.2262][7.2262:2289](),[7.2289][7.17305:17386](),[7.17386][7.2312:2330](),[7.2312][7.2312:2330](),[7.2330][7.364:437](),[7.437][7.2392:2425](),[7.17461][7.2392:2425](),[7.2392][7.2392:2425](),[7.2425][2.51:94](),[2.94][7.2467:2529](),[7.479][7.2467:2529](),[7.2467][7.2467:2529](),[7.2529][7.144:201](),[7.201][7.972:1015](),[7.1292][7.972:1015](),[7.2209][7.972:1015](),[7.3500][7.972:1015](),[7.972][7.972:1015](),[7.1015][7.2210:2244](),[7.221][7.3567:3600](),[7.1355][7.3567:3600](),[7.2244][7.3567:3600](),[7.3567][7.3567:3600](),[7.3600][7.17462:17560](),[7.17560][7.480:568](),[7.568][7.17643:17661](),[7.17643][7.17643:17661](),[7.139][7.3601:3636](),[7.17661][7.3601:3636](),[7.7732][7.3601:3636](),[7.3636][7.70594:70662](),[7.7732][7.70594:70662](),[7.70594][7.70594:70662]()
    let t = tokio::spawn(async move {
    self_
    .download_changes(
    cloned_download_bar,
    &mut hash_recv,
    &mut send,
    &changes_dir,
    false,
    )
    .await?;
    Ok::<_, anyhow::Error>(self_)
    });
    let mut waiting = 0;
    let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
    let mut asked = HashSet::new();
    for h in to_apply {
    debug!("to_apply {:?}", h);
    asked.insert(*h);
    hash_send.send(*h)?;
    waiting += 1;
    }
    let u = self
    .download_changes_rec(
    repo,
    hash_send,
    recv,
    send_ready,
    download_bar,
    waiting,
    asked,
    )
    .await?;
    let mut ws = libpijul::ApplyWorkspace::new();
    let mut to_apply_inodes = HashSet::new();
    while let Some(h) = recv_ready.recv().await {
    debug!("to_apply: {:?}", h);
    let touches_inodes = inodes.is_empty()
    || {
    debug!("inodes = {:?}", inodes);
    use libpijul::changestore::ChangeStore;
    if let CS::Change(ref h) = h {
    let changes = repo.changes.get_changes(h)?;
    changes.iter().any(|c| {
    c.iter().any(|c| {
    let inode = c.inode();
    debug!("inode = {:?}", inode);
    inodes.contains(&Position {
    change: inode.change.unwrap_or(*h),
    pos: inode.pos,
    })
    })
    })
    } else {
    false
    }
    }
    || { inodes.iter().any(|i| CS::Change(i.change) == h) };
    if touches_inodes {
    to_apply_inodes.insert(h);
    } else {
    continue;
    }
    if let Some(apply_bar) = apply_bar.clone() {
    info!("Applying {:?}", h);
    apply_bar.inc(1);
    debug!("apply");
    if let CS::Change(h) = h {
    let mut channel = channel.write();
    txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;
    }
    debug!("applied");
    } else {
    debug!("not applying {:?}", h)
    [5.52]
    [2.95]
    self.download_changes_with(download_bar, &changes_dir, false, |cx| async move {
    for h in to_apply {
  • replacement in pijul-remote/src/lib.rs at line 1260
    [2.109][2.109:119]()
    }
    [2.109]
    [2.119]
    Ok(())
    }).await?;
  • replacement in pijul-remote/src/lib.rs at line 1264
    [2.120][2.120:294](),[2.294][7.1167:1191](),[7.1167][7.1167:1191]()
    let mut result = Vec::with_capacity(to_apply_inodes.len());
    for h in to_apply {
    if to_apply_inodes.contains(&h) {
    result.push(*h)
    }
    }
    [2.120]
    [7.7755]
    todo!()
  • replacement in pijul-remote/src/lib.rs at line 1266
    [7.7756][7.3637:3665](),[7.1191][7.3637:3665](),[7.70716][7.700:747](),[7.747][7.294:321](),[7.70716][7.294:321](),[7.321][7.569:588](),[7.1405][2.295:314]()
    debug!("finished");
    debug!("waiting for spawned process");
    *self = t.await??;
    u.await??;
    Ok(result)
    [7.7756]
    [7.70874]
    // let mut waiting = 0;
    // let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
    //
    // let mut asked = HashSet::new();
    // for h in to_apply {
    // debug!("to_apply {:?}", h);
    //
    // asked.insert(*h);
    // hash_send.send(*h)?;
    // waiting += 1;
    // }
    //
    // let u = self
    // .download_changes_rec(
    // repo,
    // hash_send,
    // recv,
    // send_ready,
    // download_bar,
    // waiting,
    // asked,
    // )
    // .await?;
    //
    // let mut ws = libpijul::ApplyWorkspace::new();
    // let mut to_apply_inodes = HashSet::new();
    // while let Some(h) = recv_ready.recv().await {
    // debug!("to_apply: {:?}", h);
    // let touches_inodes = inodes.is_empty()
    // || {
    // debug!("inodes = {:?}", inodes);
    // use libpijul::changestore::ChangeStore;
    // if let CS::Change(ref h) = h {
    // let changes = repo.changes.get_changes(h)?;
    // changes.iter().any(|c| {
    // c.iter().any(|c| {
    // let inode = c.inode();
    // debug!("inode = {:?}", inode);
    // inodes.contains(&Position {
    // change: inode.change.unwrap_or(*h),
    // pos: inode.pos,
    // })
    // })
    // })
    // } else {
    // false
    // }
    // }
    // || { inodes.iter().any(|i| CS::Change(i.change) == h) };
    //
    // if touches_inodes {
    // to_apply_inodes.insert(h);
    // } else {
    // continue;
    // }
    //
    // if let Some(apply_bar) = apply_bar.clone() {
    // info!("Applying {:?}", h);
    // apply_bar.inc(1);
    // debug!("apply");
    // if let CS::Change(h) = h {
    // let mut channel = channel.write();
    // txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;
    // }
    // debug!("applied");
    // } else {
    // debug!("not applying {:?}", h)
    // }
    // }
    //
    // let mut result = Vec::with_capacity(to_apply_inodes.len());
    // for h in to_apply {
    // if to_apply_inodes.contains(&h) {
    // result.push(*h)
    // }
    // }
    //
    // debug!("finished");
    // debug!("waiting for spawned process");
    // *self = t.await??;
    // u.await??;
    // Ok(result)
  • replacement in pijul-remote/src/lib.rs at line 1350
    [7.590][7.590:625]()
    async fn download_changes_rec(
    [7.590]
    [7.625]
    async fn download_changes_rec<'a>(
  • replacement in pijul-remote/src/lib.rs at line 1352
    [7.644][7.644:800](),[7.800][7.341:392]()
    repo: &mut Repository,
    send_hash: tokio::sync::mpsc::UnboundedSender<CS>,
    mut recv_signal: tokio::sync::mpsc::Receiver<(CS, bool)>,
    send_ready: tokio::sync::mpsc::Sender<CS>,
    [7.644]
    [7.2245]
    cx: DownloadContext<'a>,
    repo: &'a Repository,
  • replacement in pijul-remote/src/lib.rs at line 1355
    [7.2280][7.822:850](),[7.822][7.822:850](),[7.850][7.165:197](),[7.197][7.393:478](),[7.850][7.393:478](),[7.478][7.3534:3586](),[7.3586][7.1209:1295](),[7.1209][7.1209:1295](),[7.1295][7.3587:3629]()
    mut waiting: usize,
    mut asked: HashSet<CS>,
    ) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
    let changes_dir = repo.changes_dir.clone();
    let changes = repo.changes.clone();
    let t = tokio::spawn(async move {
    let mut buf = PathBuf::new();
    [7.2280]
    [7.3629]
    items: impl IntoIterator<Item = CS> + 'a,
    ) -> impl Stream<Item=anyhow::Result<CS>> + 'a {
    try_stream(|sender| async move {
    struct State<'a> {
    cx: DownloadContext<'a>,
    repo: &'a Repository,
    sender: mpsc::Sender<anyhow::Result<CS>>,
    futs: FuturesUnordered<LocalBoxFuture<'a, anyhow::Result<()>>>,
    barriers: RefCell<HashMap<CS, watch::Receiver<()>>>,
    }
    let state = State {
    cx,
    repo,
    sender,
    futs: Default::default(),
    barriers: RefCell::new(Default::default()),
    };
    fn go<'a>(state: &'a State<'a>, cs: CS) -> watch::Receiver<()> {
    state.barriers.borrow_mut().entry(cs).or_insert_with(|| {
    let (tx, mut rx) = watch::channel(());
    rx.mark_unchanged();
  • replacement in pijul-remote/src/lib.rs at line 1379
    [7.3630][7.0:30](),[7.1295][7.0:30](),[7.30][7.2281:2312]()
    if waiting == 0 {
    return Ok(());
    [7.3630]
    [7.60]
    state.futs.push(async move {
    let _tx = tx;
    go2(state, cs).await?;
    state.sender.send(Ok(cs)).await?;
    Ok(())
    }.boxed_local());
    rx
    }).clone()
  • replacement in pijul-remote/src/lib.rs at line 1389
    [7.74][7.1295:1407](),[7.1295][7.1295:1407](),[7.1407][7.198:258](),[7.258][7.1407:1522](),[7.1407][7.1407:1522](),[7.1702][7.1702:1956]()
    let mut ready = Vec::new();
    while let Some((hash, follow)) = recv_signal.recv().await {
    debug!("received {:?} {:?}", hash, follow);
    if let CS::Change(hash) = hash {
    waiting -= 1;
    if follow {
    use libpijul::changestore::ChangeStore;
    let mut needs_dep = false;
    for dep in changes.get_dependencies(&hash)? {
    let dep: libpijul::pristine::Hash = dep;
    [7.74]
    [7.1956]
    async fn go2<'a>(state: &'a State<'a>, cs: CS) -> anyhow::Result<()> {
    let follow = state.cx.download(cs).await?;
  • replacement in pijul-remote/src/lib.rs at line 1393
    [7.1957][7.3631:3718](),[7.3718][7.2056:2136](),[7.2056][7.2056:2136]()
    let dep_path = fmt_filename(&mut buf, &changes_dir, &dep);
    let has_dep = std::fs::metadata(&dep_path).is_ok();
    [7.1957]
    [7.70880]
    let CS::Change(hash) = cs else {
    return Ok(());
    };
  • replacement in pijul-remote/src/lib.rs at line 1397
    [7.70881][7.2229:2321](),[7.2403][7.259:326](),[7.326][7.2313:2370](),[7.86][7.326:479](),[7.2370][7.326:479](),[7.326][7.326:479](),[7.479][7.2514:2570](),[7.2514][7.2514:2570]()
    if !has_dep {
    needs_dep = true;
    if asked.insert(CS::Change(dep)) {
    progress_bar.inc(1);
    send_hash.send(CS::Change(dep))?;
    waiting += 1
    }
    }
    }
    [7.70881]
    [7.2661]
    if !follow {
    return Ok(());
    }
    wait_for(state, state.repo.changes.get_dependencies(&hash)?.into_iter().map(CS::Change)).await;
    Ok(())
    }
    async fn wait_for<'a>(state: &'a State<'a>, items: impl IntoIterator<Item=CS>) {
    let mut v = Vec::new();
  • replacement in pijul-remote/src/lib.rs at line 1409
    [7.2662][7.2662:3005]()
    if !needs_dep {
    send_ready.send(CS::Change(hash)).await?;
    } else {
    ready.push(CS::Change(hash))
    }
    } else {
    send_ready.send(CS::Change(hash)).await?;
    }
    [7.2662]
    [7.3005]
    for item in items {
    v.push(go(state, item));
  • replacement in pijul-remote/src/lib.rs at line 1412
    [7.3023][7.3023:3084]()
    if waiting == 0 {
    break;
    [7.3023]
    [7.3084]
    for mut r in v {
    let _ = r.changed().await;
  • replacement in pijul-remote/src/lib.rs at line 1417
    [7.3116][7.480:520](),[7.520][7.3116:3188](),[7.3116][7.3116:3188]()
    info!("waiting loop done");
    for r in ready {
    send_ready.send(r).await?;
    [7.3116]
    [7.3188]
    state.futs.push(async move {
    wait_for(&state, items).await;
    Ok(())
    }.boxed_local());
    while let Some(v) = state.futs.next().await {
    v?;
  • replacement in pijul-remote/src/lib.rs at line 1426
    [7.3202][7.3202:3243]()
    std::mem::drop(recv_signal);
    [7.3202]
    [7.3243]
  • replacement in pijul-remote/src/lib.rs at line 1428
    [7.3262][7.3262:3274](),[7.3274][7.479:493]()
    });
    Ok(t)
    [7.3262]
    [7.3302]
    })
    // let t = tokio::spawn(async move {
    // let mut buf = PathBuf::new();
    //
    // if waiting == 0 {
    // return Ok(());
    // }
    // let mut ready = Vec::new();
    // while let Some((hash, follow)) = recv_signal.recv().await {
    // if let CS::Change(hash) = hash {
    // waiting -= 1;
    // if follow {
    // use libpijul::changestore::ChangeStore;
    // let mut needs_dep = false;
    // for dep in changes.get_dependencies(&hash)? {
    // let dep: libpijul::pristine::Hash = dep;
    //
    // let dep_path = fmt_filename(&mut buf, &changes_dir, &dep);
    // let has_dep = std::fs::metadata(&dep_path).is_ok();
    //
    // if !has_dep {
    // needs_dep = true;
    // if asked.insert(CS::Change(dep)) {
    // progress_bar.inc(1);
    // send_hash.send(CS::Change(dep))?;
    // waiting += 1
    // }
    // }
    // }
    //
    // if !needs_dep {
    // send_ready.send(CS::Change(hash)).await?;
    // } else {
    // ready.push(CS::Change(hash))
    // }
    // } else {
    // send_ready.send(CS::Change(hash)).await?;
    // }
    // }
    // if waiting == 0 {
    // break;
    // }
    // }
    // info!("waiting loop done");
    // for r in ready {
    // send_ready.send(r).await?;
    // }
    // std::mem::drop(recv_signal);
    // Ok(())
    // });
    // Ok(t)
  • edit in pijul-remote/src/lib.rs at line 1696
    [7.2980]
    [7.2980]
    use tokio::sync::{mpsc, watch};
    use tokio::task::LocalSet;
    use crate::context::DownloadContext;
  • edit in pijul-remote/src/lib.rs at line 1793
    [7.79733]
    fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>
    where
    C: FnOnce(mpsc::Sender<Result<T, E>>) -> F,
    F: Future<Output = Result<(), E>>,
    {
    stream(|sender| {
    let fut = op(sender.clone());
    async move {
    if let Err(e) = fut.await {
    let _ = sender.send(Err(e)).await;
    }
    }
    })
    }
    fn stream<C, F, T>(op: C) -> impl Stream<Item = T>
    where
    C: FnOnce(mpsc::Sender<T>) -> F,
    F: Future<Output = ()>
    {
    struct Impl<F, T> {
    fut: Once<F>,
    rx: mpsc::Receiver<T>,
    }
    impl<F, T> Stream for Impl<F, T>
    where
    F: Future<Output=()>,
    {
    type Item = T;
    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
    let (fut, mut rx) = unsafe {
    let this = self.get_unchecked_mut();
    (Pin::new_unchecked(&mut this.fut), Pin::new(&mut this.rx))
    };
    match (fut.poll_next(cx), rx.poll_recv(cx)) {
    (_, Poll::Ready(Some(v))) => Poll::Ready(Some(v)),
    (Poll::Ready(_), Poll::Ready(None)) => Poll::Ready(None),
    _ => Poll::Pending,
    }
    }
    }
    let (tx, rx) = mpsc::channel(1);
    Impl {
    fut: stream::once(op(tx)),
    rx,
    }
    }
  • file move: pipe.rs (----------)context.rs (----------)
    [7.41]
    [6.32]
  • replacement in pijul-remote/src/context.rs at line 1
    [6.32][6.33:65]()
    use std::collections::BTreeMap;
    [6.32]
    [6.65]
    use anyhow::bail;
    use futures::pin_mut;
    use futures_util::future::Fuse;
    use futures_util::FutureExt;
    use std::cell::RefCell;
    use std::collections::hash_map::Entry;
  • replacement in pijul-remote/src/context.rs at line 8
    [6.90][6.90:202]()
    use std::path::{Path, PathBuf};
    use std::pin::Pin;
    use std::sync::Mutex;
    use std::task::{Context, Poll, Waker};
    [6.90]
    [6.202]
    use std::path::Path;
  • replacement in pijul-remote/src/context.rs at line 10
    [6.203][6.203:269]()
    use futures_util::future::BoxFuture;
    use futures_util::FutureExt;
    [6.203]
    [6.269]
    use libpijul::HashMap;
  • replacement in pijul-remote/src/context.rs at line 12
    [6.305][6.305:328]()
    use tokio::sync::mpsc;
    [6.305]
    [6.328]
    use tokio::sync::{mpsc, watch};
  • replacement in pijul-remote/src/context.rs at line 16
    [6.359][6.359:681]()
    pub struct FetchChangesState<'a> {
    shared: Mutex<Shared<'a>>,
    }
    struct Shared<'a> {
    task: BoxFuture<'a, Result<bool, anyhow::Error>>,
    changes: BTreeMap<CS, ChangeState>,
    }
    struct ChangeState {
    wakers: Vec<Waker>,
    }
    impl<'a> FetchChangesState<'a> {
    pub fn new<T>(
    remote: &'a mut RemoteRepo,
    [6.359]
    [6.681]
    impl RemoteRepo {
    pub async fn download_changes_with<C, F, R>(
    &mut self,
  • replacement in pijul-remote/src/context.rs at line 20
    [6.716][6.716:737]()
    dest_dir: T,
    [6.716]
    [6.737]
    dest_dir: &Path,
  • replacement in pijul-remote/src/context.rs at line 22
    [6.757][6.757:771]()
    ) -> Self
    [6.757]
    [6.771]
    op: C,
    ) -> anyhow::Result<R>
  • replacement in pijul-remote/src/context.rs at line 25
    [6.781][6.781:817]()
    T: AsRef<Path> + Send + 'a,
    [6.781]
    [6.817]
    C: FnOnce(DownloadContext) -> F,
    F: Future<Output = anyhow::Result<R>>,
  • edit in pijul-remote/src/context.rs at line 30
    [6.938]
    [6.938]
    let notifiers: RefCell<Option<HashMap<CS, watch::Sender<Option<bool>>>>> =
    RefCell::new(Some(HashMap::new()));
    let dc = DownloadContext {
    data: Default::default(),
    notifiers: &notifiers,
    in_tx,
    };
    let download_task = self
    .download_changes(progress_bar, &mut in_rx, &mut out_tx, dest_dir, full)
    .fuse();
    let control_task = op(dc).fuse();
  • replacement in pijul-remote/src/context.rs at line 45
    [6.939][6.939:1230]()
    let task = async move {
    remote
    .download_changes(
    progress_bar,
    &mut in_rx,
    &mut out_tx,
    dest_dir.as_ref(),
    full,
    )
    .await
    [6.939]
    [6.1230]
    pin_mut!(download_task);
    pin_mut!(control_task);
    let mut out_rx = Some(out_rx);
    let mut dl_err = None;
    let mut control_res = None;
    loop {
    futures::select_biased! {
    next = out_rx.as_mut().map(|v| v.recv().fuse()).unwrap_or(Fuse::terminated()) => {
    let Some((cs, value)) = next else {
    out_rx.take();
    continue;
    };
    if let Some(ref notifiers) = *notifiers.borrow() {
    if let Some(tx) = notifiers.get(&cs) {
    let _ = tx.send(Some(value));
    }
    };
    }
    res = download_task => {
    notifiers.borrow_mut().take();
    dl_err = res.err();
    }
    res = control_task => {
    control_res = Some(res);
    }
    complete => break,
    }
  • edit in pijul-remote/src/context.rs at line 76
    [6.1240][6.1240:1258]()
    .boxed();
  • replacement in pijul-remote/src/context.rs at line 77
    [6.1259][6.1259:1410]()
    FetchChangesState {
    shared: Mutex::new(Shared {
    task,
    changes: Default::default(),
    }),
    [6.1259]
    [6.1410]
    let control_res = control_res.unwrap();
    match dl_err {
    None => control_res,
    Some(ce) => control_res.map_err(|e| e.context(ce)),
  • edit in pijul-remote/src/context.rs at line 84
    [6.1426]
    [6.1426]
    }
  • replacement in pijul-remote/src/context.rs at line 86
    [6.1427][6.1427:1499]()
    pub fn fetch(&mut self, cs: CS) -> FetchJob {
    todo!()
    }
    [6.1427]
    [6.1499]
    pub struct DownloadContext<'a> {
    data: RefCell<HashMap<CS, ChangeData>>,
    notifiers: &'a RefCell<Option<HashMap<CS, watch::Sender<Option<bool>>>>>,
    in_tx: mpsc::UnboundedSender<CS>,
  • replacement in pijul-remote/src/context.rs at line 92
    [6.1502][6.1502:1563]()
    pub struct FetchJob<'a> {
    shared: &'a Mutex<Shared<'a>>,
    [6.1502]
    [6.1563]
    struct ChangeData {
    rx: watch::Receiver<Option<bool>>,
  • replacement in pijul-remote/src/context.rs at line 96
    [6.1566][6.1566:1625]()
    impl<'a> Future for FetchJob<'a> {
    type Output = bool;
    [6.1566]
    [6.1625]
    impl<'a> DownloadContext<'a> {
    pub async fn download(&self, cs: CS) -> anyhow::Result<bool> {
    let mut rx = {
    let mut lock = self.data.borrow_mut();
  • replacement in pijul-remote/src/context.rs at line 101
    [6.1626][6.1626:1760]()
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let mut shared = self.shared.lock().unwrap();
    [6.1626]
    [6.1760]
    match lock.entry(cs) {
    Entry::Occupied(v) => v.get().rx.clone(),
    Entry::Vacant(v) => {
    let (tx, rx) = watch::channel(None);
  • replacement in pijul-remote/src/context.rs at line 106
    [6.1761][6.1761:1822]()
    shared.task.poll_unpin(cx);
    todo!()
    [6.1761]
    [6.1822]
    match *self.notifiers.borrow_mut() {
    None => {
    bail!("map is gone");
    }
    Some(ref mut v) => {
    assert!(v.insert(cs, tx).is_none());
    }
    }
    self.in_tx.send(cs)?;
    v.insert(ChangeData { rx }).rx.clone()
    }
    }
    };
    rx.changed().await?;
    let res = rx.borrow_and_update().unwrap();
    Ok(res)
  • replacement in Cargo.lock at line 2533
    [7.48913][7.11891:11910]()
    version = "0.2.13"
    [7.48913]
    [4.11212]
    version = "0.2.14"
  • replacement in Cargo.lock at line 2535
    [7.48996][7.11911:11989]()
    checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
    [7.48996]
    [4.11213]
    checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
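
Note on the intended ordering in `download_changes_rec`: the reworked function appears to request each change at most once (the old code tracked this with the `asked` set) and to report a change as ready only after its dependencies. The following is a minimal synchronous sketch of that ordering, not the code from this change. `ChangeId` and `ready_order` are hypothetical names used for illustration, the dependency map stands in for `get_dependencies`, and the actual download step is reduced to marking a change as asked.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for a change identifier (the real code uses `CS`).
type ChangeId = u32;

/// Walks the dependency graph from `roots` and returns the changes in the
/// order they become ready, plus the number of distinct changes requested.
/// A change is pushed to the ready list only after all of its dependencies.
fn ready_order(
    deps: &HashMap<ChangeId, Vec<ChangeId>>,
    roots: &[ChangeId],
) -> (Vec<ChangeId>, usize) {
    fn visit(
        id: ChangeId,
        deps: &HashMap<ChangeId, Vec<ChangeId>>,
        asked: &mut HashSet<ChangeId>,
        ready: &mut Vec<ChangeId>,
    ) {
        // `insert` returns false if the change was already requested,
        // so a shared dependency is only handled once.
        if !asked.insert(id) {
            return;
        }
        if let Some(ds) = deps.get(&id) {
            for &d in ds {
                visit(d, deps, asked, ready);
            }
        }
        // All dependencies have been handled at this point.
        ready.push(id);
    }

    let mut asked = HashSet::new();
    let mut ready = Vec::new();
    for &r in roots {
        visit(r, deps, &mut asked, &mut ready);
    }
    let requested = asked.len();
    (ready, requested)
}
```

In the asynchronous version in this change, the recursion is replaced by futures pushed onto a `FuturesUnordered`, and the "already asked" check by the `barriers` map of `watch` receivers, so independent downloads can overlap.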