Pull code cleanup v1

dblsaiko
Apr 7, 2024, 1:57 PM
JGN2BVS5RYX6EFOG7WJGIPTSWG3KE757J5BUGCC64QWCUSXZKZPQC

Dependencies

  • [2] JLVRAJA5 Using iter_graph in pijul
  • [3] 3E2KY6Y4 Deterministic ordering of pulled patches
  • [4] QWIYNMI5 Formatting + big-endian Sanakirja
  • [5] 5Z2Y7VGV Migrate `pijul::identity::Complete::prove` to `pijul::remote::prove`
  • [6] G7HJHNFD Migrate from `pijul_interaction::progress` to `pijul_interaction`
  • [7] LKIKT4FR [2/?] Clean up change file name handling
  • [8] IBPVOKM5 Fixing a bug in patch download
  • [9] Q45QHPO4 Feedback on network stuff
  • [10] JUYSZJSH Migrate from `pijul::progress` to `pijul_interaction::progress`
  • [11] CCLLB7OI Upgrading to Sanakirja 0.15 + version bump
  • [12] 5XMUEZMZ pijul-clone: avoid panics on parsing remote URLs
  • [13] 3WO4H2MM Fixing async issues in downloads
  • [14] A6JQQNNJ More appropriate debug messages
  • [15] 2D7P2VKJ Change completions (where the whole progress bar story started)
  • [16] UDGL7ER2 Clean up change file name handling
  • [17] L4JXJHWX pijul/*: reorganize imports and remove extern crate
  • [18] I52XSRUH Massive cleanup, and simplification
  • [19] TKEVOH7H Fixing a bug when downloading changes, and making change download more efficient (more async)
  • [20] BNPSVXIC Friendlier progress bars
  • [21] SXEYMYF7 Fixing the bad changes in history (unfortunately, by rebooting).
  • [22] YN63NUZO Sanakirja 1.0
  • [23] UDHP4ZVB Fixing SSH asynchronicity issues
  • [24] A3RM526Y Integrating identity malleability
  • [25] IVLLXQ5Z Improved push/pull reporting
  • [26] X6YFD4WV Do not download changes if we already have them
  • [27] GNMZNKB4 Cursors cleanup
  • [28] ZWVYH7WP Pulling local tags
  • [29] 4XLHUME7 Fixing a interlocking when cloning from a particular patch
  • [30] XQHABMC2 Do not block when there is no patch to pull
  • [31] MU5GSJAW Partial push and pull (WARNING: breaks the existing protocol)
  • [32] C3L2TLQW When downloading changes, check whether we have their dependencies and download them too
  • [33] 4RV7T4SR Migrate from `pijul::config` to `pijul-config`
  • [34] ABQDWHNG Migrate from `pijul::repository` to `pijul-repository`
  • [35] 5QTMRUXN Fixing a race condition between progress bars
  • [36] ZDK3GNDB Tag transactions (including a massive refactoring of errors)
  • [37] DO2Y5TY5 Tag synchronisation
  • [38] HXEIH4UQ Pulling more than 100 changes at once
  • [39] TPEH2XNB 1.0.0-alpha.28, with Tokio 1.0
  • [40] 4HTHYIA3 Fixing HTTP download
  • [41] 5SLOJYHG Fixing the Git feature
  • [42] Z6ASIMOR Avoid increasing the download progress bar length if we need the same dependency more than once
  • [43] HKHMES6T Solving conflicts
  • [44] PYTC7DPV Partial clones: including all the subtree of a directory when selecting patches
  • [45] OGJFEWHU Fixing missing dependencies on partial clones
  • [46] EUZFFJSO Updating Pijul with the latest changes in Libpijul
  • [47] I24UEJQL Various post-fire fixes

Change contents

  • edit in pijul-remote/src/lib.rs at line 1
    [8.52474]
    [8.853]
    use std::borrow::Borrow;
    use std::collections::hash_map::Entry;
  • edit in pijul-remote/src/lib.rs at line 4
    [8.884]
    [5.0]
    use std::future::Future;
  • replacement in pijul-remote/src/lib.rs at line 6
    [5.20][8.406:438](),[8.884][8.406:438](),[8.406][8.406:438]()
    use std::path::{Path, PathBuf};
    [5.20]
    [8.10335]
    use std::path::Path;
    use std::pin::Pin;
  • edit in pijul-remote/src/lib.rs at line 9
    [8.10355]
    [8.438]
    use std::task::Poll;
  • edit in pijul-remote/src/lib.rs at line 13
    [8.245]
    [8.363]
    use futures::{pin_mut, Stream};
    use futures_util::stream::{FuturesUnordered, Once};
    use futures_util::{stream, StreamExt, TryStreamExt};
  • replacement in pijul-remote/src/lib.rs at line 17
    [8.393][8.2672:2711](),[8.2711][8.2711:2764]()
    use libpijul::changestore::filesystem;
    use libpijul::changestore::filesystem::fmt_filename;
    [8.393]
    [8.0]
    use crate::context::DownloadContext;
    use libpijul::changestore::{filesystem, ChangeStore};
  • replacement in pijul-remote/src/lib.rs at line 24
    [8.137][8.52599:52622](),[8.602][8.52599:52622](),[8.1128][8.52599:52622](),[8.52599][8.52599:52622]()
    use libpijul::DOT_DIR;
    [8.137]
    [8.138]
    use libpijul::pristine::{ChangePosition, Position};
    use libpijul::{ApplyWorkspace, HashMap, DOT_DIR};
  • edit in pijul-remote/src/lib.rs at line 28
    [8.573]
    [8.456]
    use regex::Regex;
    use tokio::sync::mpsc;
  • edit in pijul-remote/src/lib.rs at line 44
    [8.344]
    [6.37]
    mod context;
  • replacement in pijul-remote/src/lib.rs at line 1177
    [8.66651][8.10619:10654]()
    pub async fn download_changes(
    [8.66651]
    [8.66691]
    async fn download_changes(
  • replacement in pijul-remote/src/lib.rs at line 1180
    [8.1040][8.15968:16031](),[8.398][8.15968:16031](),[8.16031][8.198:256]()
    hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
    send: &mut tokio::sync::mpsc::Sender<(CS, bool)>,
    [8.1040]
    [8.3414]
    mut hashes: mpsc::UnboundedReceiver<CS>,
    mut send: mpsc::Sender<(CS, bool)>,
  • replacement in pijul-remote/src/lib.rs at line 1188
    [8.1087][8.1087:1163]()
    l.download_changes(progress_bar, hashes, send, path).await?
    [8.1087]
    [8.1163]
    l.download_changes(progress_bar, &mut hashes, &mut send, path)
    .await?
  • replacement in pijul-remote/src/lib.rs at line 1192
    [8.1851][8.1178:1253]()
    s.download_changes(progress_bar, hashes, send, path, full)
    [8.1851]
    [8.1253]
    s.download_changes(progress_bar, &mut hashes, &mut send, path, full)
  • replacement in pijul-remote/src/lib.rs at line 1196
    [8.2009][8.1282:1357]()
    h.download_changes(progress_bar, hashes, send, path, full)
    [8.2009]
    [8.1357]
    h.download_changes(progress_bar, &mut hashes, &mut send, path, full)
  • edit in pijul-remote/src/lib.rs at line 1247
    [8.1155][8.2674:2675](),[8.2674][8.2674:2675](),[8.2675][8.0:64]()
    let (mut send, recv) = tokio::sync::mpsc::channel(100);
  • edit in pijul-remote/src/lib.rs at line 1256
    [8.1207][8.1207:1274](),[8.1274][8.2676:2757](),[8.1472][8.1638:1694]()
    let mut self_ = std::mem::replace(self, RemoteRepo::None);
    let (hash_send, mut hash_recv) = tokio::sync::mpsc::unbounded_channel();
    let cloned_download_bar = download_bar.clone();
  • edit in pijul-remote/src/lib.rs at line 1257
    [7.52][8.1472:1532](),[8.1694][8.1472:1532](),[8.2806][8.1472:1532](),[8.3492][8.1472:1532](),[8.1472][8.1472:1532](),[8.1532][8.1695:1838](),[8.1838][7.53:87](),[7.87][8.1877:1922](),[8.3531][8.1877:1922](),[8.1877][8.1877:1922](),[8.1250][8.1619:1644](),[8.1922][8.1619:1644](),[8.3024][8.1619:1644](),[8.1619][8.1619:1644]()
    let t = tokio::spawn(async move {
    self_
    .download_changes(
    cloned_download_bar,
    &mut hash_recv,
    &mut send,
    &changes_dir,
    false,
    )
    .await?;
  • replacement in pijul-remote/src/lib.rs at line 1258
    [8.1924][8.1644:1698](),[8.1644][8.1644:1698]()
    Ok::<_, anyhow::Error>(self_)
    });
    [8.1924]
    [8.69053]
    let mut to_apply_inodes = HashSet::new();
  • replacement in pijul-remote/src/lib.rs at line 1260
    [8.3095][8.65:94](),[8.94][8.0:76]()
    let mut waiting = 0;
    let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
    [8.69054]
    [8.76]
    {
    let to_apply_inodes = &mut to_apply_inodes;
  • replacement in pijul-remote/src/lib.rs at line 1263
    [8.77][8.0:40](),[8.40][8.231:259](),[8.77][8.231:259](),[8.94][8.231:259](),[8.1724][8.231:259](),[8.3095][8.231:259](),[8.231][8.231:259](),[8.529][8.534:544](),[8.534][8.534:544]()
    let mut asked = HashSet::new();
    for h in to_apply {
    }
    [8.77]
    [8.126]
    self.download_changes_with(download_bar, &changes_dir, false, |cx| async move {
    let stream = download_changes_rec(&cx, repo, to_apply);
    pin_mut!(stream);
  • replacement in pijul-remote/src/lib.rs at line 1267
    [8.127][8.179:200](),[8.200][8.1925:2151](),[8.164][8.237:258](),[8.285][8.237:258](),[8.2151][8.237:258](),[8.237][8.237:258]()
    let u = self
    .download_changes_rec(
    repo,
    hash_send,
    recv,
    send_ready,
    download_bar,
    waiting,
    asked,
    )
    .await?;
    [8.127]
    [8.0]
    let mut ws = ApplyWorkspace::new();
    while let Some(h) = stream.try_next().await? {
    debug!("to_apply: {:?}", h);
    let mut touches_inodes = inodes.is_empty();
  • edit in pijul-remote/src/lib.rs at line 1274
    [8.69755][8.69755:69809](),[8.69809][3.0:50](),[3.50][8.286:340](),[8.1454][8.286:340](),[8.311][8.3174:3215](),[8.340][8.3174:3215](),[8.621][8.3174:3215](),[8.1456][8.1456:1528]()
    let mut ws = libpijul::ApplyWorkspace::new();
    let mut to_apply_inodes = HashSet::new();
    while let Some(h) = recv_ready.recv().await {
    debug!("to_apply: {:?}", h);
    let touches_inodes = inodes.is_empty()
    || {
  • replacement in pijul-remote/src/lib.rs at line 1275
    [8.1581][8.1581:1641](),[8.1641][8.312:363](),[8.363][8.16600:16882](),[8.16600][8.16600:16882](),[8.16882][8.320:504]()
    use libpijul::changestore::ChangeStore;
    if let CS::Change(ref h) = h {
    let changes = repo.changes.get_changes(h)?;
    changes.iter().any(|c| {
    c.iter().any(|c| {
    let inode = c.inode();
    debug!("inode = {:?}", inode);
    inodes.contains(&Position {
    change: inode.change.unwrap_or(*h),
    pos: inode.pos,
    [8.1581]
    [8.504]
    if !touches_inodes {
    if let CS::Change(ref h) = h {
    let changes = repo.changes.get_changes(h)?;
    touches_inodes |= changes.iter().any(|c| {
    c.iter().any(|c| {
    let inode = c.inode();
    debug!("inode = {:?}", inode);
    inodes.contains(&Position {
    change: inode.change.unwrap_or(*h),
    pos: inode.pos,
    })
  • replacement in pijul-remote/src/lib.rs at line 1290
    [8.17304][8.2262:2289](),[8.2262][8.2262:2289]()
    })
    [8.17304]
    [8.17305]
    }
    }
    if !touches_inodes {
    touches_inodes |= inodes.iter().any(|i| CS::Change(i.change) == h);
    }
    if !touches_inodes {
    continue;
    }
    to_apply_inodes.insert(h);
    if let Some(apply_bar) = &apply_bar {
    info!("Applying {:?}", h);
    apply_bar.inc(1);
    debug!("apply");
    if let CS::Change(ref h) = h {
    let mut channel = channel.write();
    txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;
    }
    debug!("applied");
  • replacement in pijul-remote/src/lib.rs at line 1315
    [8.17334][8.17334:17364]()
    false
    [8.17334]
    [8.17364]
    debug!("not applying {:?}", h)
  • edit in pijul-remote/src/lib.rs at line 1318
    [8.2330][8.364:437]()
    || { inodes.iter().any(|i| CS::Change(i.change) == h) };
  • replacement in pijul-remote/src/lib.rs at line 1319
    [8.2393][8.2393:2425](),[8.2425][3.51:94](),[3.94][8.2467:2528](),[8.479][8.2467:2528](),[8.2467][8.2467:2528](),[8.2528][8.2528:2529](),[8.2529][6.144:201](),[6.201][8.972:1015](),[8.1292][8.972:1015](),[8.2209][8.972:1015](),[8.3500][8.972:1015](),[8.972][8.972:1015](),[8.1015][8.2210:2244](),[8.221][8.3567:3600](),[8.1355][8.3567:3600](),[8.2244][8.3567:3600](),[8.3567][8.3567:3600](),[8.3600][8.17462:17560](),[8.17560][8.480:568](),[8.568][8.17643:17661](),[8.17643][8.17643:17661](),[8.139][8.3601:3636](),[8.17661][8.3601:3636](),[8.7732][8.3601:3636](),[8.3636][8.70594:70662](),[8.7732][8.70594:70662](),[8.70594][8.70594:70662](),[8.70662][3.95:109]()
    if touches_inodes {
    to_apply_inodes.insert(h);
    } else {
    continue;
    }
    if let Some(apply_bar) = apply_bar.clone() {
    info!("Applying {:?}", h);
    apply_bar.inc(1);
    debug!("apply");
    if let CS::Change(h) = h {
    let mut channel = channel.write();
    txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;
    }
    debug!("applied");
    } else {
    debug!("not applying {:?}", h)
    }
    [8.2393]
    [3.109]
    Ok(())
    })
    .await?;
  • edit in pijul-remote/src/lib.rs at line 1332
    [8.70716][8.700:747](),[8.747][8.294:321](),[8.70716][8.294:321](),[8.321][8.569:588]()
    debug!("waiting for spawned process");
    *self = t.await??;
    u.await??;
  • edit in pijul-remote/src/lib.rs at line 1333
    [3.314][8.70874:70880](),[8.2558][8.70874:70880](),[8.70874][8.70874:70880](),[8.2661][8.2661:2662](),[8.2662][8.2662:3005](),[8.3005][8.3005:3023](),[8.3023][8.3023:3084](),[8.3084][8.3084:3116](),[8.3116][8.480:520](),[8.520][8.3116:3188](),[8.3116][8.3116:3188](),[8.3188][8.3188:3202](),[8.3202][8.3202:3243](),[8.3243][8.3243:3262](),[8.3262][8.3262:3274](),[8.3274][8.479:493]()
    }
    if !needs_dep {
    send_ready.send(CS::Change(hash)).await?;
    } else {
    ready.push(CS::Change(hash))
    }
    } else {
    send_ready.send(CS::Change(hash)).await?;
    }
    }
    if waiting == 0 {
    break;
    }
    }
    info!("waiting loop done");
    for r in ready {
    send_ready.send(r).await?;
    }
    std::mem::drop(recv_signal);
    Ok(())
    });
    Ok(t)
  • replacement in pijul-remote/src/lib.rs at line 1337
    [8.70953][8.70953:70984]()
    repo: &mut Repository,
    [8.70953]
    [8.70984]
    repo: &Repository,
  • replacement in pijul-remote/src/lib.rs at line 1339
    [8.71005][8.71005:71042]()
    channel: &mut ChannelRef<T>,
    [8.71005]
    [8.71042]
    channel: &ChannelRef<T>,
  • edit in pijul-remote/src/lib.rs at line 1342
    [8.71183][8.3692:3773](),[8.3773][8.3310:3388](),[8.3388][8.71319:71386](),[8.71319][8.71319:71386](),[8.71386][7.88:140]()
    let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();
    let (mut send_signal, recv_signal) = tokio::sync::mpsc::channel(100);
    let mut self_ = std::mem::replace(self, RemoteRepo::None);
    let changes_dir = repo.changes_dir.clone();
  • edit in pijul-remote/src/lib.rs at line 1343
    [8.2453][8.2453:2509]()
    let cloned_download_bar = download_bar.clone();
  • replacement in pijul-remote/src/lib.rs at line 1344
    [8.1734][8.71386:71428](),[8.2510][8.71386:71428](),[8.3446][8.71386:71428](),[8.3830][8.71386:71428](),[8.71386][8.71386:71428](),[8.71428][8.1849:1867](),[8.1867][8.3831:3866](),[8.3866][8.2511:2552](),[8.1762][8.3910:3984](),[8.2552][8.3910:3984](),[8.3910][8.3910:3984](),[8.3984][7.141:175](),[7.175][8.4023:4068](),[8.4023][8.4023:4068](),[8.4068][8.1961:1986](),[8.1961][8.1961:1986](),[8.1986][8.71908:71942](),[8.71908][8.71908:71942]()
    let t = tokio::spawn(async move {
    self_
    .download_changes(
    cloned_download_bar,
    &mut recv_hash,
    &mut send_signal,
    &changes_dir,
    false,
    )
    .await?;
    Ok(self_)
    });
    [8.2510]
    [8.71942]
    let mut hashes = Vec::new();
  • replacement in pijul-remote/src/lib.rs at line 1346
    [8.71943][8.316:345](),[8.345][8.521:561](),[8.345][8.71943:71974](),[8.561][8.71943:71974](),[8.71943][8.71943:71974](),[8.71974][8.346:372](),[8.372][8.17662:17706](),[8.71974][8.17662:17706](),[8.17706][8.562:603](),[8.603][8.72012:72022](),[8.4101][8.72012:72022](),[8.17706][8.72012:72022](),[8.72012][8.72012:72022]()
    let mut waiting = 0;
    let mut asked = HashSet::new();
    for &h in tag.iter() {
    waiting += 1;
    send_hash.send(CS::Change(h))?;
    asked.insert(CS::Change(h));
    }
    [8.71943]
    [8.494]
    {
    let txn = &mut *txn;
    let hashes = &mut hashes;
  • replacement in pijul-remote/src/lib.rs at line 1350
    [8.495][8.495:571]()
    let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
    [8.495]
    [8.571]
    self.download_changes_with(download_bar, &repo.changes_dir, false, |cx| async move {
    let stream = download_changes_rec(&cx, repo, tag.iter().cloned().map(CS::Change));
    pin_mut!(stream);
  • replacement in pijul-remote/src/lib.rs at line 1354
    [8.572][8.572:593](),[8.593][8.2553:2786](),[4.445][8.3566:3587](),[8.685][8.3566:3587](),[8.703][8.3566:3587](),[8.2786][8.3566:3587](),[8.3566][8.3566:3587]()
    let u = self
    .download_changes_rec(
    repo,
    send_hash,
    recv_signal,
    send_ready,
    download_bar,
    waiting,
    asked,
    )
    .await?;
    [8.572]
    [8.72022]
    let mut ws = ApplyWorkspace::new();
  • replacement in pijul-remote/src/lib.rs at line 1356
    [8.72079][8.72079:72116](),[8.72752][8.72752:72806](),[8.72806][8.140:150](),[8.150][8.530:578](),[8.578][8.3588:3702](),[8.3702][8.1312:1403](),[8.18352][8.1312:1403]()
    let mut hashes = Vec::new();
    let mut ws = libpijul::ApplyWorkspace::new();
    {
    let mut channel_ = channel.write();
    while let Some(hash) = recv_ready.recv().await {
    if let CS::Change(ref hash) = hash {
    txn.apply_change_rec_ws(&repo.changes, &mut channel_, hash, &mut ws)?;
    [8.72023]
    [8.18439]
    while let Some(cs) = stream.try_next().await? {
    if let CS::Change(hash) = cs {
    let mut channel = channel.write();
    txn.apply_change_rec_ws(&repo.changes, &mut channel, &hash, &mut ws)?;
    }
    hashes.push(cs);
  • replacement in pijul-remote/src/lib.rs at line 1364
    [8.18457][8.3703:3738](),[8.3738][8.330:344](),[8.18457][8.330:344](),[8.330][8.330:344]()
    hashes.push(hash);
    }
    [8.18457]
    [8.72925]
    Ok(())
    })
    .await?;
  • replacement in pijul-remote/src/lib.rs at line 1369
    [8.72935][8.72935:73007](),[8.73007][8.3739:3758]()
    let r: Result<_, anyhow::Error> = t.await?;
    *self = r?;
    u.await??;
    [8.72935]
    [8.1873]
  • replacement in pijul-remote/src/lib.rs at line 1416
    [8.74912][8.74912:74955]()
    local_channel: &mut ChannelRef<T>,
    [8.74912]
    [8.18509]
    local_channel: &ChannelRef<T>,
  • edit in pijul-remote/src/lib.rs at line 1421
    [8.2107][8.75038:75086](),[8.75038][8.75038:75086](),[8.75086][8.4141:4222](),[8.917][8.75164:75307](),[8.4222][8.75164:75307](),[8.75164][8.75164:75307](),[8.75307][7.176:228]()
    use libpijul::changestore::ChangeStore;
    let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();
    let (mut send_sig, mut recv_sig) = tokio::sync::mpsc::channel(100);
    let mut self_ = std::mem::replace(self, RemoteRepo::None);
    let changes_dir = repo.changes_dir.clone();
  • edit in pijul-remote/src/lib.rs at line 1424
    [8.2902][8.2902:3077](),[8.3077][6.289:327](),[6.327][8.3112:3191](),[8.3112][8.3112:3191](),[8.3191][7.229:267](),[7.267][8.3233:3376](),[8.3233][8.3233:3376]()
    let t: tokio::task::JoinHandle<Result<RemoteRepo, anyhow::Error>> =
    tokio::spawn(async move {
    self_
    .download_changes(
    download_bar,
    &mut recv_hash,
    &mut send_sig,
    &changes_dir,
    true,
    )
    .await?;
    Ok::<_, anyhow::Error>(self_)
    });
  • replacement in pijul-remote/src/lib.rs at line 1425
    [8.76077][8.76077:76104](),[8.76104][8.530:600](),[8.600][8.7945:8102](),[8.18666][8.7945:8102](),[8.76104][8.7945:8102](),[8.8102][8.76173:76247](),[8.9560][8.76173:76247](),[8.76173][8.76173:76247]()
    for c in changes {
    let c = if let CS::Change(c) = c { c } else { continue };
    let sc = c.into();
    if repo
    .changes
    .has_contents(*c, txn.get_internal(&sc)?.cloned())
    {
    debug!("has contents {:?}", c);
    continue;
    [8.76077]
    [8.76247]
    self.download_changes_with(download_bar, &repo.changes_dir, true, |cx| async move {
    let mut waiting = Vec::new();
    for c in changes {
    let CS::Change(c) = c else { continue };
    let sc = c.into();
    if repo
    .changes
    .has_contents(*c, txn.get_internal(&sc)?.cloned())
    {
    debug!("has contents {:?}", c);
    continue;
    }
    if full {
    waiting.push(cx.download(CS::Change(*c))?);
    continue;
    }
    let Some(&change) = txn.get_internal(&sc)? else {
    debug!("could not find internal for {:?}", sc);
    continue;
    };
    // Check if at least one non-empty vertex from c is still alive.
    let v = libpijul::pristine::Vertex {
    change,
    start: ChangePosition(0u64.into()),
    end: ChangePosition(0u64.into()),
    };
    let channel = local_channel.read();
    let graph = txn.graph(&channel);
    for x in txn.iter_graph(graph, Some(&v))? {
    let (v, e) = x?;
    if v.change > change {
    break;
    } else if e.flag().is_alive_parent() {
    waiting.push(cx.download(CS::Change(*c))?);
    break;
    }
    }
  • replacement in pijul-remote/src/lib.rs at line 1471
    [8.76261][8.76261:76328](),[8.2574][8.76371:76429](),[8.4550][8.76371:76429](),[8.76371][8.76371:76429]()
    if full {
    debug!("sending send_hash");
    debug!("sent");
    continue;
    [8.76261]
    [8.76429]
    for w in waiting {
    w.await?;
  • replacement in pijul-remote/src/lib.rs at line 1475
    [8.76443][8.8103:8171](),[8.8171][8.76508:76547](),[8.9627][8.76508:76547](),[8.76508][8.76508:76547](),[8.76547][8.2283:2347](),[8.2347][8.76547:76738](),[8.76547][8.76547:76738](),[8.76738][8.8172:8314](),[8.8314][8.76860:76875](),[8.76860][8.76860:76875](),[8.76875][8.642:690](),[8.690][8.2348:2393](),[8.735][8.2348:2393](),[8.76925][8.2348:2393](),[8.2393][2.0:56](),[2.56][8.2518:2672](),[8.2518][8.2518:2672](),[8.2672][8.18717:18770](),[8.2645][8.2719:2746](),[8.4592][8.2719:2746](),[8.2719][8.2719:2746](),[8.2746][8.2746:2764](),[8.942][8.77531:77545](),[8.2764][8.77531:77545](),[8.77531][8.77531:77545](),[8.77545][8.77545:77704]()
    let change = if let Some(&i) = txn.get_internal(&sc)? {
    i
    } else {
    debug!("could not find internal for {:?}", sc);
    continue;
    };
    // Check if at least one non-empty vertex from c is still alive.
    let v = libpijul::pristine::Vertex {
    change,
    start: libpijul::pristine::ChangePosition(0u64.into()),
    end: libpijul::pristine::ChangePosition(0u64.into()),
    };
    let channel = local_channel.read();
    let graph = txn.graph(&channel);
    for x in txn.iter_graph(graph, Some(&v))? {
    let (v, e) = x?;
    if v.change > change {
    break;
    } else if e.flag().is_alive_parent() {
    send_hash.send(CS::Change(*c))?;
    break;
    }
    }
    }
    debug!("dropping send_hash");
    std::mem::drop(send_hash);
    while recv_sig.recv().await.is_some() {}
    *self = t.await??;
    [8.76443]
    [8.77704]
    Ok(())
    })
    .await?;
  • edit in pijul-remote/src/lib.rs at line 1513
    [8.78608][8.2910:2980](),[8.2980][8.2980:2981]()
    use libpijul::pristine::{ChangePosition, Position};
    use regex::Regex;
  • edit in pijul-remote/src/lib.rs at line 1605
    [8.20381]
    [8.79731]
    }
    /// Builds a stream of `Result<T, E>` items from a fallible producer.
    ///
    /// `op` receives a clone of the channel sender created by [`stream`] and
    /// returns the producer future. Items the producer sends show up on the
    /// returned stream. If the producer future itself resolves to `Err(e)`,
    /// that error is sent through the same channel so the consumer observes it
    /// as an `Err` item instead of the stream just ending.
    fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>
    where
        C: FnOnce(mpsc::Sender<Result<T, E>>) -> F,
        F: Future<Output = Result<(), E>>,
    {
        stream(|sender| {
            let fut = op(sender.clone());
            async move {
                if let Err(e) = fut.await {
                    // `Sender::send` is async: without `.await` the send future
                    // is dropped unpolled and the error would never reach the
                    // consumer. A failed send is ignored on purpose; tokio
                    // reports that only when the receiving half is closed.
                    let _ = sender.send(Err(e)).await;
                }
            }
        })
  • edit in pijul-remote/src/lib.rs at line 1622
    [8.76328][8.18667:18716]()
    send_hash.send(CS::Change(*c))?;
  • resolve order conflict in pijul-remote/src/lib.rs at line 1622
    [8.79733]
  • edit in pijul-remote/src/lib.rs at line 1622
    [0.6061]
    [8.589]
    /// Turns a channel-based producer into a `Stream`.
    ///
    /// `op` is handed the sending half of a capacity-1 channel and returns the
    /// producer future. The returned stream polls that future and yields every
    /// value received on the channel. It finishes once the producer is done and
    /// the receiver reports `None`.
    fn stream<C, F, T>(op: C) -> impl Stream<Item = T>
    where
        C: FnOnce(mpsc::Sender<T>) -> F,
        F: Future<Output = ()>,
    {
        /// Pairs the producer future with the receiving half of its channel.
        struct Driven<F, T> {
            producer: Once<F>,
            inbox: mpsc::Receiver<T>,
        }

        impl<F, T> Stream for Driven<F, T>
        where
            F: Future<Output = ()>,
        {
            type Item = T;

            fn poll_next(
                self: Pin<&mut Self>,
                cx: &mut std::task::Context<'_>,
            ) -> Poll<Option<Self::Item>> {
                // SAFETY: the reference is only used to reach the two fields
                // below; nothing is moved out of `self` through it.
                let this = unsafe { self.get_unchecked_mut() };
                // SAFETY: `producer` is only ever accessed through this pinned
                // projection and is never moved after the struct is pinned.
                let producer = unsafe { Pin::new_unchecked(&mut this.producer) };

                // Poll the producer first so it can make progress (and enqueue
                // a value) before the receiver is checked.
                let producer_state = producer.poll_next(cx);
                let inbox_state = this.inbox.poll_recv(cx);

                match inbox_state {
                    Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
                    Poll::Ready(None) if producer_state.is_ready() => Poll::Ready(None),
                    _ => Poll::Pending,
                }
            }
        }

        let (outbox, inbox) = mpsc::channel(1);
        let producer = stream::once(op(outbox));
        Driven { producer, inbox }
    }
  • replacement in pijul-remote/src/lib.rs at line 1664
    [8.590][8.590:625](),[8.625][8.625:644](),[8.644][8.644:800](),[8.800][8.341:392](),[8.392][8.2245:2280](),[8.2280][8.822:850](),[8.822][8.822:850](),[8.850][8.165:197](),[8.197][8.393:478](),[8.850][8.393:478](),[8.478][8.3534:3586](),[8.3586][8.1209:1295](),[8.1209][8.1209:1295](),[8.1295][8.3587:3629]()
    async fn download_changes_rec(
    &mut self,
    repo: &mut Repository,
    send_hash: tokio::sync::mpsc::UnboundedSender<CS>,
    mut recv_signal: tokio::sync::mpsc::Receiver<(CS, bool)>,
    send_ready: tokio::sync::mpsc::Sender<CS>,
    progress_bar: ProgressBar,
    mut waiting: usize,
    mut asked: HashSet<CS>,
    ) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
    let changes_dir = repo.changes_dir.clone();
    let changes = repo.changes.clone();
    let t = tokio::spawn(async move {
    let mut buf = PathBuf::new();
    [8.590]
    [8.3629]
    fn download_changes_rec<'a, I>(
    cx: &'a DownloadContext,
    repo: &'a Repository,
    items: I,
    ) -> impl Stream<Item = anyhow::Result<CS>> + 'a
    where
    I: IntoIterator + 'a,
    I::Item: Borrow<CS>,
    {
    try_stream(move |sender| async move {
    let mut tasks = FuturesUnordered::new();
  • replacement in pijul-remote/src/lib.rs at line 1676
    [8.3630][8.0:30](),[8.1295][8.0:30](),[8.30][8.2281:2312]()
    if waiting == 0 {
    return Ok(());
    [8.3630]
    [8.60]
    // there is probably a way to model this using futures, but I
    // couldn't find a nice way to do it
    // so here, have three funny collections
    let mut pending_deps = HashMap::<Hash, HashSet<Hash>>::new();
    let mut rev_deps = HashMap::<Hash, Vec<Hash>>::new();
    let mut fetched = HashSet::new();
    let make_download_job = |cs| async move {
    match cx.download(cs) {
    Ok(v) => (cs, v.await),
    Err(e) => (cs, Err(e)),
  • replacement in pijul-remote/src/lib.rs at line 1689
    [8.74][8.1295:1407](),[8.1295][8.1295:1407](),[8.1407][8.198:258](),[8.258][8.1407:1522](),[8.1407][8.1407:1522](),[8.1702][8.1702:1956]()
    let mut ready = Vec::new();
    while let Some((hash, follow)) = recv_signal.recv().await {
    debug!("received {:?} {:?}", hash, follow);
    if let CS::Change(hash) = hash {
    waiting -= 1;
    if follow {
    use libpijul::changestore::ChangeStore;
    let mut needs_dep = false;
    for dep in changes.get_dependencies(&hash)? {
    let dep: libpijul::pristine::Hash = dep;
    [8.74]
    [8.1956]
    };
  • replacement in pijul-remote/src/lib.rs at line 1691
    [8.1957][8.3631:3718](),[8.3718][8.2056:2136](),[8.2056][8.2056:2136]()
    let dep_path = fmt_filename(&mut buf, &changes_dir, &dep);
    let has_dep = std::fs::metadata(&dep_path).is_ok();
    [8.1957]
    [8.70880]
    for item in items {
    let item = *item.borrow();
    tasks.push(make_download_job(item));
  • edit in pijul-remote/src/lib.rs at line 1695
    [8.70881][8.2229:2321](),[8.2403][8.259:326](),[8.326][8.2313:2370](),[8.86][8.326:479](),[8.2370][8.326:479](),[8.326][8.326:479](),[8.479][8.2514:2570](),[8.2514][8.2514:2570](),[8.259][8.78:118]()
    if !has_dep {
    needs_dep = true;
    if asked.insert(CS::Change(dep)) {
    progress_bar.inc(1);
    send_hash.send(CS::Change(dep))?;
    waiting += 1
    }
    }
    }
    debug!("to_apply {:?}", h);
  • resolve order conflict in pijul-remote/src/lib.rs at line 1695
    [8.70881]
  • edit in pijul-remote/src/lib.rs at line 1695
    [0.8015]
    [8.3532]
    if let CS::Change(hash) = item {
    rev_deps.insert(hash, Vec::new());
    }
    }
  • replacement in pijul-remote/src/lib.rs at line 1700
    [8.3533][8.41:71](),[8.454][8.41:71](),[8.71][8.119:178](),[8.454][8.119:178]()
    asked.insert(*h);
    hash_send.send(*h)?;
    waiting += 1;
    [8.3533]
    while let Some((cs, res)) = tasks.next().await {
    debug!("{:?} finished downloading (result: {:?})", cs, &res);
    let follow = res?;
    let CS::Change(hash) = cs else {
    debug!("it is not a change, we're done");
    sender.send(Ok(cs)).await?;
    continue;
    };
    fetched.insert(hash);
    // first, populate our dependencies
    if follow {
    info!("{:?}", hash);
    let pending_deps = match pending_deps.entry(hash) {
    Entry::Occupied(_) => unreachable!(),
    Entry::Vacant(v) => v.insert(Default::default()),
    };
    for dep in repo.changes.get_dependencies(&hash)? {
    if !fetched.contains(&dep) {
    pending_deps.insert(dep);
    }
    rev_deps.entry(dep).or_insert_with(|| {
    tasks.push(make_download_job(CS::Change(dep)));
    Default::default()
    });
    }
    }
    // then, send completed changes (including parents of this) to
    // our caller
    let mut to_check = vec![hash];
    while let Some(hash) = to_check.pop() {
    if let Some(pending_deps) = pending_deps.get(&hash) {
    if !pending_deps.is_empty() {
    continue;
    }
    }
    // this change has no pending dependencies => it is complete
    sender.send(Ok(CS::Change(hash))).await?;
    // other changes may be complete if this was the last
    // missing dependency, propagate
    for dep in rev_deps.get(&hash).into_iter().flatten() {
    if let Some(p) = pending_deps.get_mut(dep) {
    assert!(p.remove(dep));
    to_check.push(*dep);
    }
    }
    }
    }
    Ok(())
    })
    }