Pull code cleanup v1
Dependencies
- [2]
JLVRAJA5Using iter_graph in pijul - [3]
3E2KY6Y4Deterministic ordering of pulled patches - [4]
QWIYNMI5Formatting + big-endian Sanakirja - [5]
5Z2Y7VGVMigrate `pijul::identity::Complete::prove` to `pijul::remote::prove` - [6]
G7HJHNFDMigrate from `pijul_interaction::progress` to `pijul_interaction` - [7]
LKIKT4FR[2/?] Clean up change file name handling - [8]
IBPVOKM5Fixing a bug in patch download - [9]
Q45QHPO4Feedback on network stuff - [10]
JUYSZJSHMigrate from `pijul::progress` to `pijul_interaction::progress` - [11]
CCLLB7OIUpgrading to Sanakirja 0.15 + version bump - [12]
5XMUEZMZpijul-clone: avoid panics on parsing remote URLs - [13]
3WO4H2MMFixing async issues in downloads - [14]
A6JQQNNJMore appropriate debug messages - [15]
2D7P2VKJChange completions (where the whole progress bar story started) - [16]
UDGL7ER2Clean up change file name handling - [17]
L4JXJHWXpijul/*: reorganize imports and remove extern crate - [18]
I52XSRUHMassive cleanup, and simplification - [19]
TKEVOH7HFixing a bug when downloading changes, and making change download more efficient (more async) - [20]
BNPSVXICFriendlier progress bars - [21]
SXEYMYF7Fixing the bad changes in history (unfortunately, by rebooting). - [22]
YN63NUZOSanakirja 1.0 - [23]
UDHP4ZVBFixing SSH asynchronicity issues - [24]
A3RM526YIntegrating identity malleability - [25]
IVLLXQ5ZImproved push/pull reporting - [26]
X6YFD4WVDo not download changes if we already have them - [27]
GNMZNKB4Cursors cleanup - [28]
ZWVYH7WPPulling local tags - [29]
4XLHUME7Fixing a interlocking when cloning from a particular patch - [30]
XQHABMC2Do not block when there is no patch to pull - [31]
MU5GSJAWPartial push and pull (WARNING: breaks the existing protocol) - [32]
C3L2TLQWWhen downloading changes, check whether we have their dependencies and download them too - [33]
4RV7T4SRMigrate from `pijul::config` to `pijul-config` - [34]
ABQDWHNGMigrate from `pijul::repository` to `pijul-repository` - [35]
5QTMRUXNFixing a race condition between progress bars - [36]
ZDK3GNDBTag transactions (including a massive refactoring of errors) - [37]
DO2Y5TY5Tag synchronisation - [38]
HXEIH4UQPulling more than 100 changes at once - [39]
TPEH2XNB1.0.0-alpha.28, with Tokio 1.0 - [40]
4HTHYIA3Fixing HTTP download - [41]
5SLOJYHGFixing the Git feature - [42]
Z6ASIMORAvoid increasing the download progress bar length if we need the same dependency more than once - [43]
HKHMES6TSolving conflicts - [44]
PYTC7DPVPartial clones: including all the subtree of a directory when selecting patches - [45]
OGJFEWHUFixing missing dependencies on partial clones - [46]
EUZFFJSOUpdating Pijul with the latest changes in Libpijul - [47]
I24UEJQLVarious post-fire fixes
Change contents
- edit in pijul-remote/src/lib.rs at line 1
use std::borrow::Borrow;use std::collections::hash_map::Entry; - edit in pijul-remote/src/lib.rs at line 4
use std::future::Future; - replacement in pijul-remote/src/lib.rs at line 6
use std::path::{Path, PathBuf};use std::path::Path;use std::pin::Pin; - edit in pijul-remote/src/lib.rs at line 9
use std::task::Poll; - edit in pijul-remote/src/lib.rs at line 13
use futures::{pin_mut, Stream};use futures_util::stream::{FuturesUnordered, Once};use futures_util::{stream, StreamExt, TryStreamExt}; - replacement in pijul-remote/src/lib.rs at line 17
use libpijul::changestore::filesystem;use libpijul::changestore::filesystem::fmt_filename;use crate::context::DownloadContext;use libpijul::changestore::{filesystem, ChangeStore}; - replacement in pijul-remote/src/lib.rs at line 24[8.137]→[8.52599:52622](∅→∅),[8.602]→[8.52599:52622](∅→∅),[8.1128]→[8.52599:52622](∅→∅),[8.52599]→[8.52599:52622](∅→∅)
use libpijul::DOT_DIR;use libpijul::pristine::{ChangePosition, Position};use libpijul::{ApplyWorkspace, HashMap, DOT_DIR}; - edit in pijul-remote/src/lib.rs at line 28
use regex::Regex;use tokio::sync::mpsc; - edit in pijul-remote/src/lib.rs at line 44
mod context; - replacement in pijul-remote/src/lib.rs at line 1177
pub async fn download_changes(async fn download_changes( - replacement in pijul-remote/src/lib.rs at line 1180
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,send: &mut tokio::sync::mpsc::Sender<(CS, bool)>,mut hashes: mpsc::UnboundedReceiver<CS>,mut send: mpsc::Sender<(CS, bool)>, - replacement in pijul-remote/src/lib.rs at line 1188
l.download_changes(progress_bar, hashes, send, path).await?l.download_changes(progress_bar, &mut hashes, &mut send, path).await? - replacement in pijul-remote/src/lib.rs at line 1192
s.download_changes(progress_bar, hashes, send, path, full)s.download_changes(progress_bar, &mut hashes, &mut send, path, full) - replacement in pijul-remote/src/lib.rs at line 1196
h.download_changes(progress_bar, hashes, send, path, full)h.download_changes(progress_bar, &mut hashes, &mut send, path, full) - edit in pijul-remote/src/lib.rs at line 1247
let (mut send, recv) = tokio::sync::mpsc::channel(100); - edit in pijul-remote/src/lib.rs at line 1256
let mut self_ = std::mem::replace(self, RemoteRepo::None);let (hash_send, mut hash_recv) = tokio::sync::mpsc::unbounded_channel();let cloned_download_bar = download_bar.clone(); - edit in pijul-remote/src/lib.rs at line 1257[7.52]→[8.1472:1532](∅→∅),[8.1694]→[8.1472:1532](∅→∅),[8.2806]→[8.1472:1532](∅→∅),[8.3492]→[8.1472:1532](∅→∅),[8.1472]→[8.1472:1532](∅→∅),[8.1532]→[8.1695:1838](∅→∅),[8.1838]→[7.53:87](∅→∅),[7.87]→[8.1877:1922](∅→∅),[8.3531]→[8.1877:1922](∅→∅),[8.1877]→[8.1877:1922](∅→∅),[8.1250]→[8.1619:1644](∅→∅),[8.1922]→[8.1619:1644](∅→∅),[8.3024]→[8.1619:1644](∅→∅),[8.1619]→[8.1619:1644](∅→∅)
let t = tokio::spawn(async move {self_.download_changes(cloned_download_bar,&mut hash_recv,&mut send,&changes_dir,false,).await?; - replacement in pijul-remote/src/lib.rs at line 1258
Ok::<_, anyhow::Error>(self_)});let mut to_apply_inodes = HashSet::new(); - replacement in pijul-remote/src/lib.rs at line 1260
let mut waiting = 0;let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);{let to_apply_inodes = &mut to_apply_inodes; - replacement in pijul-remote/src/lib.rs at line 1263[8.77]→[8.0:40](∅→∅),[8.40]→[8.231:259](∅→∅),[8.77]→[8.231:259](∅→∅),[8.94]→[8.231:259](∅→∅),[8.1724]→[8.231:259](∅→∅),[8.3095]→[8.231:259](∅→∅),[8.231]→[8.231:259](∅→∅),[8.529]→[8.534:544](∅→∅),[8.534]→[8.534:544](∅→∅)
let mut asked = HashSet::new();for h in to_apply {}self.download_changes_with(download_bar, &changes_dir, false, |cx| async move {let stream = download_changes_rec(&cx, repo, to_apply);pin_mut!(stream); - replacement in pijul-remote/src/lib.rs at line 1267[8.127]→[8.179:200](∅→∅),[8.200]→[8.1925:2151](∅→∅),[8.164]→[8.237:258](∅→∅),[8.285]→[8.237:258](∅→∅),[8.2151]→[8.237:258](∅→∅),[8.237]→[8.237:258](∅→∅)
let u = self.download_changes_rec(repo,hash_send,recv,send_ready,download_bar,waiting,asked,).await?;let mut ws = ApplyWorkspace::new();while let Some(h) = stream.try_next().await? {debug!("to_apply: {:?}", h);let mut touches_inodes = inodes.is_empty(); - edit in pijul-remote/src/lib.rs at line 1274[8.69755]→[8.69755:69809](∅→∅),[8.69809]→[3.0:50](∅→∅),[3.50]→[8.286:340](∅→∅),[8.1454]→[8.286:340](∅→∅),[8.311]→[8.3174:3215](∅→∅),[8.340]→[8.3174:3215](∅→∅),[8.621]→[8.3174:3215](∅→∅),[8.1456]→[8.1456:1528](∅→∅)
let mut ws = libpijul::ApplyWorkspace::new();let mut to_apply_inodes = HashSet::new();while let Some(h) = recv_ready.recv().await {debug!("to_apply: {:?}", h);let touches_inodes = inodes.is_empty()|| { - replacement in pijul-remote/src/lib.rs at line 1275[8.1581]→[8.1581:1641](∅→∅),[8.1641]→[8.312:363](∅→∅),[8.363]→[8.16600:16882](∅→∅),[8.16600]→[8.16600:16882](∅→∅),[8.16882]→[8.320:504](∅→∅)
use libpijul::changestore::ChangeStore;if let CS::Change(ref h) = h {let changes = repo.changes.get_changes(h)?;changes.iter().any(|c| {c.iter().any(|c| {let inode = c.inode();debug!("inode = {:?}", inode);inodes.contains(&Position {change: inode.change.unwrap_or(*h),pos: inode.pos,if !touches_inodes {if let CS::Change(ref h) = h {let changes = repo.changes.get_changes(h)?;touches_inodes |= changes.iter().any(|c| {c.iter().any(|c| {let inode = c.inode();debug!("inode = {:?}", inode);inodes.contains(&Position {change: inode.change.unwrap_or(*h),pos: inode.pos,}) - replacement in pijul-remote/src/lib.rs at line 1290
})}}if !touches_inodes {touches_inodes |= inodes.iter().any(|i| CS::Change(i.change) == h);}if !touches_inodes {continue;}to_apply_inodes.insert(h);if let Some(apply_bar) = &apply_bar {info!("Applying {:?}", h);apply_bar.inc(1);debug!("apply");if let CS::Change(ref h) = h {let mut channel = channel.write();txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;}debug!("applied"); - replacement in pijul-remote/src/lib.rs at line 1315
falsedebug!("not applying {:?}", h) - edit in pijul-remote/src/lib.rs at line 1318
|| { inodes.iter().any(|i| CS::Change(i.change) == h) }; - replacement in pijul-remote/src/lib.rs at line 1319[8.2393]→[8.2393:2425](∅→∅),[8.2425]→[3.51:94](∅→∅),[3.94]→[8.2467:2528](∅→∅),[8.479]→[8.2467:2528](∅→∅),[8.2467]→[8.2467:2528](∅→∅),[8.2528]→[8.2528:2529](∅→∅),[8.2529]→[6.144:201](∅→∅),[6.201]→[8.972:1015](∅→∅),[8.1292]→[8.972:1015](∅→∅),[8.2209]→[8.972:1015](∅→∅),[8.3500]→[8.972:1015](∅→∅),[8.972]→[8.972:1015](∅→∅),[8.1015]→[8.2210:2244](∅→∅),[8.221]→[8.3567:3600](∅→∅),[8.1355]→[8.3567:3600](∅→∅),[8.2244]→[8.3567:3600](∅→∅),[8.3567]→[8.3567:3600](∅→∅),[8.3600]→[8.17462:17560](∅→∅),[8.17560]→[8.480:568](∅→∅),[8.568]→[8.17643:17661](∅→∅),[8.17643]→[8.17643:17661](∅→∅),[8.139]→[8.3601:3636](∅→∅),[8.17661]→[8.3601:3636](∅→∅),[8.7732]→[8.3601:3636](∅→∅),[8.3636]→[8.70594:70662](∅→∅),[8.7732]→[8.70594:70662](∅→∅),[8.70594]→[8.70594:70662](∅→∅),[8.70662]→[3.95:109](∅→∅)
if touches_inodes {to_apply_inodes.insert(h);} else {continue;}if let Some(apply_bar) = apply_bar.clone() {info!("Applying {:?}", h);apply_bar.inc(1);debug!("apply");if let CS::Change(h) = h {let mut channel = channel.write();txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;}debug!("applied");} else {debug!("not applying {:?}", h)}Ok(())}).await?; - edit in pijul-remote/src/lib.rs at line 1332[8.70716]→[8.700:747](∅→∅),[8.747]→[8.294:321](∅→∅),[8.70716]→[8.294:321](∅→∅),[8.321]→[8.569:588](∅→∅)
debug!("waiting for spawned process");*self = t.await??;u.await??; - edit in pijul-remote/src/lib.rs at line 1333[3.314]→[8.70874:70880](∅→∅),[8.2558]→[8.70874:70880](∅→∅),[8.70874]→[8.70874:70880](∅→∅),[8.2661]→[8.2661:2662](∅→∅),[8.2662]→[8.2662:3005](∅→∅),[8.3005]→[8.3005:3023](∅→∅),[8.3023]→[8.3023:3084](∅→∅),[8.3084]→[8.3084:3116](∅→∅),[8.3116]→[8.480:520](∅→∅),[8.520]→[8.3116:3188](∅→∅),[8.3116]→[8.3116:3188](∅→∅),[8.3188]→[8.3188:3202](∅→∅),[8.3202]→[8.3202:3243](∅→∅),[8.3243]→[8.3243:3262](∅→∅),[8.3262]→[8.3262:3274](∅→∅),[8.3274]→[8.479:493](∅→∅)
}if !needs_dep {send_ready.send(CS::Change(hash)).await?;} else {ready.push(CS::Change(hash))}} else {send_ready.send(CS::Change(hash)).await?;}}if waiting == 0 {break;}}info!("waiting loop done");for r in ready {send_ready.send(r).await?;}std::mem::drop(recv_signal);Ok(())});Ok(t) - replacement in pijul-remote/src/lib.rs at line 1337
repo: &mut Repository,repo: &Repository, - replacement in pijul-remote/src/lib.rs at line 1339
channel: &mut ChannelRef<T>,channel: &ChannelRef<T>, - edit in pijul-remote/src/lib.rs at line 1342[8.71183]→[8.3692:3773](∅→∅),[8.3773]→[8.3310:3388](∅→∅),[8.3388]→[8.71319:71386](∅→∅),[8.71319]→[8.71319:71386](∅→∅),[8.71386]→[7.88:140](∅→∅)
let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();let (mut send_signal, recv_signal) = tokio::sync::mpsc::channel(100);let mut self_ = std::mem::replace(self, RemoteRepo::None);let changes_dir = repo.changes_dir.clone(); - edit in pijul-remote/src/lib.rs at line 1343
let cloned_download_bar = download_bar.clone(); - replacement in pijul-remote/src/lib.rs at line 1344[8.1734]→[8.71386:71428](∅→∅),[8.2510]→[8.71386:71428](∅→∅),[8.3446]→[8.71386:71428](∅→∅),[8.3830]→[8.71386:71428](∅→∅),[8.71386]→[8.71386:71428](∅→∅),[8.71428]→[8.1849:1867](∅→∅),[8.1867]→[8.3831:3866](∅→∅),[8.3866]→[8.2511:2552](∅→∅),[8.1762]→[8.3910:3984](∅→∅),[8.2552]→[8.3910:3984](∅→∅),[8.3910]→[8.3910:3984](∅→∅),[8.3984]→[7.141:175](∅→∅),[7.175]→[8.4023:4068](∅→∅),[8.4023]→[8.4023:4068](∅→∅),[8.4068]→[8.1961:1986](∅→∅),[8.1961]→[8.1961:1986](∅→∅),[8.1986]→[8.71908:71942](∅→∅),[8.71908]→[8.71908:71942](∅→∅)
let t = tokio::spawn(async move {self_.download_changes(cloned_download_bar,&mut recv_hash,&mut send_signal,&changes_dir,false,).await?;Ok(self_)});let mut hashes = Vec::new(); - replacement in pijul-remote/src/lib.rs at line 1346[8.71943]→[8.316:345](∅→∅),[8.345]→[8.521:561](∅→∅),[8.345]→[8.71943:71974](∅→∅),[8.561]→[8.71943:71974](∅→∅),[8.71943]→[8.71943:71974](∅→∅),[8.71974]→[8.346:372](∅→∅),[8.372]→[8.17662:17706](∅→∅),[8.71974]→[8.17662:17706](∅→∅),[8.17706]→[8.562:603](∅→∅),[8.603]→[8.72012:72022](∅→∅),[8.4101]→[8.72012:72022](∅→∅),[8.17706]→[8.72012:72022](∅→∅),[8.72012]→[8.72012:72022](∅→∅)
let mut waiting = 0;let mut asked = HashSet::new();for &h in tag.iter() {waiting += 1;send_hash.send(CS::Change(h))?;asked.insert(CS::Change(h));}{let txn = &mut *txn;let hashes = &mut hashes; - replacement in pijul-remote/src/lib.rs at line 1350
let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);self.download_changes_with(download_bar, &repo.changes_dir, false, |cx| async move {let stream = download_changes_rec(&cx, repo, tag.iter().cloned().map(CS::Change));pin_mut!(stream); - replacement in pijul-remote/src/lib.rs at line 1354[8.572]→[8.572:593](∅→∅),[8.593]→[8.2553:2786](∅→∅),[4.445]→[8.3566:3587](∅→∅),[8.685]→[8.3566:3587](∅→∅),[8.703]→[8.3566:3587](∅→∅),[8.2786]→[8.3566:3587](∅→∅),[8.3566]→[8.3566:3587](∅→∅)
let u = self.download_changes_rec(repo,send_hash,recv_signal,send_ready,download_bar,waiting,asked,).await?;let mut ws = ApplyWorkspace::new(); - replacement in pijul-remote/src/lib.rs at line 1356[8.72079]→[8.72079:72116](∅→∅),[8.72752]→[8.72752:72806](∅→∅),[8.72806]→[8.140:150](∅→∅),[8.150]→[8.530:578](∅→∅),[8.578]→[8.3588:3702](∅→∅),[8.3702]→[8.1312:1403](∅→∅),[8.18352]→[8.1312:1403](∅→∅)
let mut hashes = Vec::new();let mut ws = libpijul::ApplyWorkspace::new();{let mut channel_ = channel.write();while let Some(hash) = recv_ready.recv().await {if let CS::Change(ref hash) = hash {txn.apply_change_rec_ws(&repo.changes, &mut channel_, hash, &mut ws)?;while let Some(cs) = stream.try_next().await? {if let CS::Change(hash) = cs {let mut channel = channel.write();txn.apply_change_rec_ws(&repo.changes, &mut channel, &hash, &mut ws)?;}hashes.push(cs); - replacement in pijul-remote/src/lib.rs at line 1364[8.18457]→[8.3703:3738](∅→∅),[8.3738]→[8.330:344](∅→∅),[8.18457]→[8.330:344](∅→∅),[8.330]→[8.330:344](∅→∅)
hashes.push(hash);}Ok(())}).await?; - replacement in pijul-remote/src/lib.rs at line 1369
let r: Result<_, anyhow::Error> = t.await?;*self = r?;u.await??; - replacement in pijul-remote/src/lib.rs at line 1416
local_channel: &mut ChannelRef<T>,local_channel: &ChannelRef<T>, - edit in pijul-remote/src/lib.rs at line 1421[8.2107]→[8.75038:75086](∅→∅),[8.75038]→[8.75038:75086](∅→∅),[8.75086]→[8.4141:4222](∅→∅),[8.917]→[8.75164:75307](∅→∅),[8.4222]→[8.75164:75307](∅→∅),[8.75164]→[8.75164:75307](∅→∅),[8.75307]→[7.176:228](∅→∅)
use libpijul::changestore::ChangeStore;let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();let (mut send_sig, mut recv_sig) = tokio::sync::mpsc::channel(100);let mut self_ = std::mem::replace(self, RemoteRepo::None);let changes_dir = repo.changes_dir.clone(); - edit in pijul-remote/src/lib.rs at line 1424[8.2902]→[8.2902:3077](∅→∅),[8.3077]→[6.289:327](∅→∅),[6.327]→[8.3112:3191](∅→∅),[8.3112]→[8.3112:3191](∅→∅),[8.3191]→[7.229:267](∅→∅),[7.267]→[8.3233:3376](∅→∅),[8.3233]→[8.3233:3376](∅→∅)
let t: tokio::task::JoinHandle<Result<RemoteRepo, anyhow::Error>> =tokio::spawn(async move {self_.download_changes(download_bar,&mut recv_hash,&mut send_sig,&changes_dir,true,).await?;Ok::<_, anyhow::Error>(self_)}); - replacement in pijul-remote/src/lib.rs at line 1425[8.76077]→[8.76077:76104](∅→∅),[8.76104]→[8.530:600](∅→∅),[8.600]→[8.7945:8102](∅→∅),[8.18666]→[8.7945:8102](∅→∅),[8.76104]→[8.7945:8102](∅→∅),[8.8102]→[8.76173:76247](∅→∅),[8.9560]→[8.76173:76247](∅→∅),[8.76173]→[8.76173:76247](∅→∅)
for c in changes {let c = if let CS::Change(c) = c { c } else { continue };let sc = c.into();if repo.changes.has_contents(*c, txn.get_internal(&sc)?.cloned()){debug!("has contents {:?}", c);continue;self.download_changes_with(download_bar, &repo.changes_dir, true, |cx| async move {let mut waiting = Vec::new();for c in changes {let CS::Change(c) = c else { continue };let sc = c.into();if repo.changes.has_contents(*c, txn.get_internal(&sc)?.cloned()){debug!("has contents {:?}", c);continue;}if full {waiting.push(cx.download(CS::Change(*c))?);continue;}let Some(&change) = txn.get_internal(&sc)? else {debug!("could not find internal for {:?}", sc);continue;};// Check if at least one non-empty vertex from c is still alive.let v = libpijul::pristine::Vertex {change,start: ChangePosition(0u64.into()),end: ChangePosition(0u64.into()),};let channel = local_channel.read();let graph = txn.graph(&channel);for x in txn.iter_graph(graph, Some(&v))? {let (v, e) = x?;if v.change > change {break;} else if e.flag().is_alive_parent() {waiting.push(cx.download(CS::Change(*c))?);break;}} - replacement in pijul-remote/src/lib.rs at line 1471[8.76261]→[8.76261:76328](∅→∅),[8.2574]→[8.76371:76429](∅→∅),[8.4550]→[8.76371:76429](∅→∅),[8.76371]→[8.76371:76429](∅→∅)
if full {debug!("sending send_hash");debug!("sent");continue;for w in waiting {w.await?; - replacement in pijul-remote/src/lib.rs at line 1475[8.76443]→[8.8103:8171](∅→∅),[8.8171]→[8.76508:76547](∅→∅),[8.9627]→[8.76508:76547](∅→∅),[8.76508]→[8.76508:76547](∅→∅),[8.76547]→[8.2283:2347](∅→∅),[8.2347]→[8.76547:76738](∅→∅),[8.76547]→[8.76547:76738](∅→∅),[8.76738]→[8.8172:8314](∅→∅),[8.8314]→[8.76860:76875](∅→∅),[8.76860]→[8.76860:76875](∅→∅),[8.76875]→[8.642:690](∅→∅),[8.690]→[8.2348:2393](∅→∅),[8.735]→[8.2348:2393](∅→∅),[8.76925]→[8.2348:2393](∅→∅),[8.2393]→[2.0:56](∅→∅),[2.56]→[8.2518:2672](∅→∅),[8.2518]→[8.2518:2672](∅→∅),[8.2672]→[8.18717:18770](∅→∅),[8.2645]→[8.2719:2746](∅→∅),[8.4592]→[8.2719:2746](∅→∅),[8.2719]→[8.2719:2746](∅→∅),[8.2746]→[8.2746:2764](∅→∅),[8.942]→[8.77531:77545](∅→∅),[8.2764]→[8.77531:77545](∅→∅),[8.77531]→[8.77531:77545](∅→∅),[8.77545]→[8.77545:77704](∅→∅)
let change = if let Some(&i) = txn.get_internal(&sc)? {i} else {debug!("could not find internal for {:?}", sc);continue;};// Check if at least one non-empty vertex from c is still alive.let v = libpijul::pristine::Vertex {change,start: libpijul::pristine::ChangePosition(0u64.into()),end: libpijul::pristine::ChangePosition(0u64.into()),};let channel = local_channel.read();let graph = txn.graph(&channel);for x in txn.iter_graph(graph, Some(&v))? {let (v, e) = x?;if v.change > change {break;} else if e.flag().is_alive_parent() {send_hash.send(CS::Change(*c))?;break;}}}debug!("dropping send_hash");std::mem::drop(send_hash);while recv_sig.recv().await.is_some() {}*self = t.await??;Ok(())}).await?; - edit in pijul-remote/src/lib.rs at line 1513
use libpijul::pristine::{ChangePosition, Position};use regex::Regex; - edit in pijul-remote/src/lib.rs at line 1605
}fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>whereC: FnOnce(mpsc::Sender<Result<T, E>>) -> F,F: Future<Output = Result<(), E>>,{stream(|sender| {let fut = op(sender.clone());async move {if let Err(e) = fut.await {let _ = sender.send(Err(e));}}}) - edit in pijul-remote/src/lib.rs at line 1622
send_hash.send(CS::Change(*c))?; - resolve order conflict in pijul-remote/src/lib.rs at line 1622[8.79733]
- edit in pijul-remote/src/lib.rs at line 1622
fn stream<C, F, T>(op: C) -> impl Stream<Item = T>whereC: FnOnce(mpsc::Sender<T>) -> F,F: Future<Output = ()>,{struct Impl<F, T> {fut: Once<F>,rx: mpsc::Receiver<T>,}impl<F, T> Stream for Impl<F, T>whereF: Future<Output = ()>,{type Item = T;fn poll_next(self: Pin<&mut Self>,cx: &mut std::task::Context<'_>,) -> Poll<Option<Self::Item>> {let (fut, mut rx) = unsafe {let this = self.get_unchecked_mut();(Pin::new_unchecked(&mut this.fut), Pin::new(&mut this.rx))};match (fut.poll_next(cx), rx.poll_recv(cx)) {(_, Poll::Ready(Some(v))) => Poll::Ready(Some(v)),(Poll::Ready(_), Poll::Ready(None)) => Poll::Ready(None),_ => Poll::Pending,}}}let (tx, rx) = mpsc::channel(1);Impl {fut: stream::once(op(tx)),rx,}} - replacement in pijul-remote/src/lib.rs at line 1664[8.590]→[8.590:625](∅→∅),[8.625]→[8.625:644](∅→∅),[8.644]→[8.644:800](∅→∅),[8.800]→[8.341:392](∅→∅),[8.392]→[8.2245:2280](∅→∅),[8.2280]→[8.822:850](∅→∅),[8.822]→[8.822:850](∅→∅),[8.850]→[8.165:197](∅→∅),[8.197]→[8.393:478](∅→∅),[8.850]→[8.393:478](∅→∅),[8.478]→[8.3534:3586](∅→∅),[8.3586]→[8.1209:1295](∅→∅),[8.1209]→[8.1209:1295](∅→∅),[8.1295]→[8.3587:3629](∅→∅)
async fn download_changes_rec(&mut self,repo: &mut Repository,send_hash: tokio::sync::mpsc::UnboundedSender<CS>,mut recv_signal: tokio::sync::mpsc::Receiver<(CS, bool)>,send_ready: tokio::sync::mpsc::Sender<CS>,progress_bar: ProgressBar,mut waiting: usize,mut asked: HashSet<CS>,) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {let changes_dir = repo.changes_dir.clone();let changes = repo.changes.clone();let t = tokio::spawn(async move {let mut buf = PathBuf::new();fn download_changes_rec<'a, I>(cx: &'a DownloadContext,repo: &'a Repository,items: I,) -> impl Stream<Item = anyhow::Result<CS>> + 'awhereI: IntoIterator + 'a,I::Item: Borrow<CS>,{try_stream(move |sender| async move {let mut tasks = FuturesUnordered::new(); - replacement in pijul-remote/src/lib.rs at line 1676
if waiting == 0 {return Ok(());// there is probably a way to model this using futures, but I// couldn't find a nice way to do it// so here, have three funny collectionslet mut pending_deps = HashMap::<Hash, HashSet<Hash>>::new();let mut rev_deps = HashMap::<Hash, Vec<Hash>>::new();let mut fetched = HashSet::new();let make_download_job = |cs| async move {match cx.download(cs) {Ok(v) => (cs, v.await),Err(e) => (cs, Err(e)), - replacement in pijul-remote/src/lib.rs at line 1689[8.74]→[8.1295:1407](∅→∅),[8.1295]→[8.1295:1407](∅→∅),[8.1407]→[8.198:258](∅→∅),[8.258]→[8.1407:1522](∅→∅),[8.1407]→[8.1407:1522](∅→∅),[8.1702]→[8.1702:1956](∅→∅)
let mut ready = Vec::new();while let Some((hash, follow)) = recv_signal.recv().await {debug!("received {:?} {:?}", hash, follow);if let CS::Change(hash) = hash {waiting -= 1;if follow {use libpijul::changestore::ChangeStore;let mut needs_dep = false;for dep in changes.get_dependencies(&hash)? {let dep: libpijul::pristine::Hash = dep;}; - replacement in pijul-remote/src/lib.rs at line 1691
let dep_path = fmt_filename(&mut buf, &changes_dir, &dep);let has_dep = std::fs::metadata(&dep_path).is_ok();for item in items {let item = *item.borrow();tasks.push(make_download_job(item)); - edit in pijul-remote/src/lib.rs at line 1695[8.70881]→[8.2229:2321](∅→∅),[8.2403]→[8.259:326](∅→∅),[8.326]→[8.2313:2370](∅→∅),[8.86]→[8.326:479](∅→∅),[8.2370]→[8.326:479](∅→∅),[8.326]→[8.326:479](∅→∅),[8.479]→[8.2514:2570](∅→∅),[8.2514]→[8.2514:2570](∅→∅),[8.259]→[8.78:118](∅→∅)
if !has_dep {needs_dep = true;if asked.insert(CS::Change(dep)) {progress_bar.inc(1);send_hash.send(CS::Change(dep))?;waiting += 1}}}debug!("to_apply {:?}", h); - resolve order conflict in pijul-remote/src/lib.rs at line 1695[8.70881]
- edit in pijul-remote/src/lib.rs at line 1695
if let CS::Change(hash) = item {rev_deps.insert(hash, Vec::new());}} - replacement in pijul-remote/src/lib.rs at line 1700
asked.insert(*h);hash_send.send(*h)?;waiting += 1;[8.3533]while let Some((cs, res)) = tasks.next().await {debug!("{:?} finished downloading (result: {:?})", cs, &res);let follow = res?;let CS::Change(hash) = cs else {debug!("it is not a change, we're done");sender.send(Ok(cs)).await?;continue;};fetched.insert(hash);// first, populate our dependenciesif follow {info!("{:?}", hash);let pending_deps = match pending_deps.entry(hash) {Entry::Occupied(_) => unreachable!(),Entry::Vacant(v) => v.insert(Default::default()),};for dep in repo.changes.get_dependencies(&hash)? {if !fetched.contains(&dep) {pending_deps.insert(dep);}rev_deps.entry(dep).or_insert_with(|| {tasks.push(make_download_job(CS::Change(dep)));Default::default()});}}// then, send completed changes (including parents of this) to// our callerlet mut to_check = vec![hash];while let Some(hash) = to_check.pop() {if let Some(pending_deps) = pending_deps.get(&hash) {if !pending_deps.is_empty() {continue;}}// this change has no pending dependencies => it is completesender.send(Ok(CS::Change(hash))).await?;// other changes may be complete if this was the last// missing dependency, propagatefor dep in rev_deps.get(&hash).into_iter().flatten() {if let Some(p) = pending_deps.get_mut(dep) {assert!(p.remove(dep));to_check.push(*dep);}}}}Ok(())})}