Fixing a bug when downloading changes, and making change download more efficient (more async)

[?]
Feb 27, 2021, 7:14 PM
TKEVOH7HXON7SOBGXTUDHAHO2U2GPTQRNESP6ERKUQAS526OZIRAC

Dependencies

  • [2] TZ42DX3B Properly dropping a channel
  • [3] DNQHXWRZ address clippy hard errors
  • [4] CCLLB7OI Upgrading to Sanakirja 0.15 + version bump
  • [5] WTZXEWY7 Flushing the futures pipeline when downloading over HTTP(S)
  • [6] Q45QHPO4 Feedback on network stuff
  • [7] K6GWUOD5 Styling progress bars
  • [8] HXEIH4UQ Pulling more than 100 changes at once
  • [9] 5QTMRUXN Fixing a race condition between progress bars
  • [10] YN63NUZO Sanakirja 1.0
  • [11] GHO6DWPI Refactoring iterators
  • [12] VO5OQW4W Removing anyhow in libpijul
  • [13] SZWBLWZ4 Reading ~/.ssh/config
  • [14] IQ4FCHPZ HTTP connections: pooling + retry on error
  • [15] 5XMUEZMZ pijul-clone: avoid panics on parsing remote URLs
  • [16] VBMXB443 Retrying if the HTTP connection drops while reading the body
  • [17] SXEYMYF7 Fixing the bad changes in history (unfortunately, by rebooting).
  • [18] 76PCXGML Pushing to, and pulling from the local repository
  • [19] WLUID7NA Do not block when downloading more than 100 changes over SSH
  • [20] BZSC7VMY address clippy lints
  • [21] 367UBQ6K Forwarding SSH stderr, and progress bar for push
  • [22] L4JXJHWX pijul/*: reorganize imports and remove extern crate
  • [23] UDHP4ZVB Fixing SSH asynchronicity issues
  • [24] FBXYP7QM Forgot to add remote::http
  • [25] I52XSRUH Massive cleanup, and simplification
  • [26] KTTKF3RW Locking stderr and the progress bar in SSH
  • [27] QMTANHVN Reset: only output changed files
  • [28] X6YFD4WV Do not download changes if we already have them
  • [29] LGEJSLTY Fixing output (including its uses in reset and pull)
  • [30] MU5GSJAW Partial push and pull (WARNING: breaks the existing protocol)
  • [*] 6YMDOZIB Refactoring apply

Change contents

  • replacement in pijul/src/remote/ssh.rs at line 14
    [3.80][3.251:276](),[3.6054][3.251:276](),[3.251][3.251:276]()
    use log::{debug, error};
    [3.6054]
    [3.276]
    use log::{debug, error, trace};
  • replacement in pijul/src/remote/ssh.rs at line 400
    [3.1411][3.1411:1466]()
    debug!("data {:?} {:?}", channel, data.len());
    [3.1411]
    [3.1466]
    trace!("data {:?} {:?}", channel, data.len());
  • replacement in pijul/src/remote/ssh.rs at line 430
    [3.2788][3.2788:2833]()
    debug!("state changes");
    [3.2788]
    [3.2833]
    trace!("state changes");
  • edit in pijul/src/remote/ssh.rs at line 439
    [3.3260]
    [3.3260]
    debug!("writing {:?} bytes", *remaining_len);
  • edit in pijul/src/remote/ssh.rs at line 462
    [3.4400]
    [3.4400]
    debug!("before pop: {:?}", path);
  • edit in pijul/src/remote/ssh.rs at line 468
    [3.4701]
    [3.4701]
    debug!("after pop: {:?}", path);
  • replacement in pijul/src/remote/ssh.rs at line 484
    [3.5504][3.5504:5558]()
    debug!("need more data");
    [3.5504]
    [3.5558]
    trace!("need more data");
  • replacement in pijul/src/remote/ssh.rs at line 488
    [3.5641][3.5641:5707]()
    debug!("finished, {:?} {:?}", p, data.len());
    [3.5641]
    [3.5707]
    trace!("finished, {:?} {:?}", p, data.len());
  • replacement in pijul/src/remote/ssh.rs at line 797
    [3.45470][3.8706:8746]()
    c: &[libpijul::pristine::Hash],
    [3.45470]
    [3.8746]
    c: &mut tokio::sync::mpsc::Receiver<libpijul::pristine::Hash>,
  • replacement in pijul/src/remote/ssh.rs at line 808
    [3.208][3.208:248]()
    c: &[libpijul::pristine::Hash],
    [3.208]
    [3.248]
    c: &mut tokio::sync::mpsc::Receiver<libpijul::pristine::Hash>,
  • edit in pijul/src/remote/ssh.rs at line 813
    [3.426][3.2585:2648]()
    if c.is_empty() {
    return Ok(());
    }
  • edit in pijul/src/remote/ssh.rs at line 815
    [3.8967][3.8967:9043]()
    libpijul::changestore::filesystem::push_filename(&mut path, &c[0]);
  • edit in pijul/src/remote/ssh.rs at line 818
    [3.9094]
    [3.9094]
    debug!("changes_dir = {:?}", changes_dir);
  • replacement in pijul/src/remote/ssh.rs at line 825
    [3.9247][3.9247:9279]()
    hashes: c.to_vec(),
    [3.9247]
    [3.9279]
    hashes: Vec::new(),
  • replacement in pijul/src/remote/ssh.rs at line 829
    [3.9350][3.9350:9371]()
    for c in c {
    [3.9350]
    [3.9371]
    let mut len = 0;
    while let Some(c) = c.recv().await {
    if let State::Changes { ref mut hashes, .. } = *self.state.lock().await {
    hashes.push(c);
    }
  • edit in pijul/src/remote/ssh.rs at line 845
    [3.9743]
    [3.9743]
    len += 1;
  • replacement in pijul/src/remote/ssh.rs at line 847
    [3.9753][3.29:86]()
    let progress = ProgressBar::new(c.len() as u64);
    [3.9753]
    [3.0]
    let progress = ProgressBar::new(len as u64);
  • replacement in pijul/src/remote/ssh.rs at line 876
    [3.49753][3.9928:9964]()
    debug!("done downloading");
    [3.49753]
    [3.52441]
    debug!("done downloading {:?}", changes_dir);
  • replacement in pijul/src/remote/mod.rs at line 7
    [3.393][3.516:602](),[3.439][3.516:602]()
    use libpijul::pristine::{Base32, ChannelRef, Hash, Merkle, MutTxnT, RemoteRef, TxnT};
    [3.393]
    [3.52599]
    use libpijul::pristine::{Base32, ChannelRef, GraphIter, Hash, Merkle, MutTxnT, RemoteRef, TxnT};
  • replacement in pijul/src/remote/mod.rs at line 350
    [3.66710][3.10655:10700]()
    hashes: &[libpijul::pristine::Hash],
    [3.66710]
    [3.10700]
    hashes: &mut tokio::sync::mpsc::Receiver<libpijul::pristine::Hash>,
  • edit in pijul/src/remote/mod.rs at line 363
    [3.68102][3.68102:68165]()
    libpijul::changestore::filesystem::pop_filename(path);
  • edit in pijul/src/remote/mod.rs at line 376
    [3.69053]
    [3.69053]
    let mut self_ = std::mem::replace(self, RemoteRepo::None);
    let (hash_send, mut hash_recv) = tokio::sync::mpsc::channel(100);
    let mut change_path_ = repo.path.clone();
    change_path_.push(DOT_DIR);
    change_path_.push("changes");
    let t = tokio::spawn(async move {
    self_
    .download_changes(&mut hash_recv, &mut send, &mut change_path_, false)
    .await?;
    Ok::<_, anyhow::Error>(self_)
    });
  • replacement in pijul/src/remote/mod.rs at line 390
    [3.69111][3.188:231]()
    let mut to_download_ = Vec::new();
    [3.69111]
    [3.231]
    let mut len = 0;
  • replacement in pijul/src/remote/mod.rs at line 394
    [3.402][3.402:440]()
    to_download_.push(*h)
    [3.402]
    [3.440]
    hash_send.send(*h).await?;
    len += 1
  • edit in pijul/src/remote/mod.rs at line 399
    [3.544][3.544:592]()
    let to_download = to_download_.clone();
  • edit in pijul/src/remote/mod.rs at line 400
    [3.1][3.1:293](),[3.293][3.69754:69755](),[3.11284][3.69754:69755](),[3.69754][3.69754:69755]()
    let mut self_ = std::mem::replace(self, RemoteRepo::None);
    let t = tokio::spawn(async move {
    self_
    .download_changes(&to_download_, &mut send, &mut change_path_, false)
    .await?;
    Ok::<_, anyhow::Error>(self_)
    });
  • replacement in pijul/src/remote/mod.rs at line 403
    [3.611][3.611:686]()
    let p = indicatif::ProgressBar::new(to_download.len() as u64);
    [3.611]
    [3.905]
    let p = indicatif::ProgressBar::new(len);
  • replacement in pijul/src/remote/mod.rs at line 497
    [3.71469][3.71469:71529](),[3.71529][3.11285:11427](),[3.11427][3.71732:71746](),[3.71732][3.71732:71746]()
    while let Some(hash) = recv_hash.recv().await {
    self_
    .download_changes(&[hash], &mut send_signal, &mut change_path_, false)
    .await?;
    }
    [3.71428]
    [3.71908]
    self_
    .download_changes(&mut recv_hash, &mut send_signal, &mut change_path_, false)
    .await?;
  • replacement in pijul/src/remote/mod.rs at line 560
    [3.74770][3.798:855]()
    pub async fn complete_changes<T: MutTxnT + TxnTExt>(
    [3.74770]
    [3.74830]
    pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(
  • edit in pijul/src/remote/mod.rs at line 568
    [3.75038]
    [3.75038]
    debug!("complete changes {:?}", changes);
  • replacement in pijul/src/remote/mod.rs at line 575
    [3.75446][3.75446:75562](),[3.75562][3.11428:11562](),[3.11562][3.75825:75839](),[3.75825][3.75825:75839](),[3.75983][3.75983:76064]()
    while let Some(h) = recv_hash.recv().await {
    debug!("downloading full patch: {:?}", h);
    self_
    .download_changes(&[h], &mut send_sig, &mut changes_dir, true)
    .await?;
    }
    let result: Result<_, anyhow::Error> = Ok(self_);
    result
    [3.75405]
    [3.76064]
    self_
    .download_changes(&mut recv_hash, &mut send_sig, &mut changes_dir, true)
    .await?;
    Ok::<_, anyhow::Error>(self_)
  • edit in pijul/src/remote/mod.rs at line 599
    [3.76547]
    [3.76547]
    debug!("could not find internal for {:?}", sc);
  • replacement in pijul/src/remote/mod.rs at line 609
    [3.76925][3.8315:8360](),[3.8360][3.899:942](),[3.9672][3.899:942](),[3.899][3.899:942]()
    if txn.is_alive(&channel, &v)? {
    send_hash.send(*c).await?;
    [3.76925]
    [3.77531]
    let graph = txn.graph(&channel);
    let mut it = txn.iter_graph(graph, Some(&v))?;
    while let Some(x) = txn.next_graph(&graph, &mut it) {
    let (v, e) = x?;
    if v.change > change {
    break;
    } else if e.flag().is_alive_parent() {
    send_hash.send(*c).await?;
    break;
    }
  • replacement in pijul/src/remote/local.rs at line 129
    [3.83512][3.11727:11767]()
    c: &[libpijul::pristine::Hash],
    [3.83512]
    [3.11767]
    hashes: &mut tokio::sync::mpsc::Receiver<libpijul::pristine::Hash>,
  • replacement in pijul/src/remote/local.rs at line 133
    [3.83607][3.11872:12057]()
    for c in c {
    libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, c);
    libpijul::changestore::filesystem::push_filename(&mut path, c);
    [3.83607]
    [3.12057]
    while let Some(c) = hashes.recv().await {
    libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);
    libpijul::changestore::filesystem::push_filename(&mut path, &c);
  • replacement in pijul/src/remote/local.rs at line 149
    [3.12700][3.157:191]()
    send.send(*c).await?;
    [3.12700]
    [3.3741]
    send.send(c).await?;
  • replacement in pijul/src/remote/http.rs at line 74
    [3.1135][3.1135:1180]()
    hashes: &[libpijul::pristine::Hash],
    [3.1135]
    [3.1180]
    hashes: &mut tokio::sync::mpsc::Receiver<libpijul::pristine::Hash>,
  • replacement in pijul/src/remote/http.rs at line 79
    [3.1338][3.1254:1316]()
    let progress = ProgressBar::new(hashes.len() as u64);
    [3.1338]
    [3.1282]
    let progress = ProgressBar::new(0 as u64);
  • replacement in pijul/src/remote/http.rs at line 87
    [3.981][3.432:458](),[3.1338][3.432:458](),[3.1365][3.432:458](),[3.1448][3.432:458](),[3.1538][3.432:458](),[3.432][3.432:458]()
    for c in hashes {
    [3.1538]
    [3.322]
    while let Some(c) = hashes.recv().await {
    progress.inc_length(1);
  • replacement in pijul/src/remote/http.rs at line 97
    [3.1774][3.1774:1798]()
    *c,
    [3.1774]
    [3.1798]
    c,
  • replacement in libpijul/src/pristine/sanakirja.rs at line 552
    [3.38323][3.38323:38422](),[3.38422][3.69108:69176]()
    fn iter_graph(&self, g: &Self::Graph) -> Result<Self::GraphCursor, TxnErr<Self::GraphError>> {
    Ok(::sanakirja::btree::cursor::Cursor::new(&self.txn, &g)?)
    [3.38323]
    [3.38539]
    fn iter_graph(
    &self,
    g: &Self::Graph,
    s: Option<&Vertex<ChangeId>>,
    ) -> Result<Self::GraphCursor, TxnErr<Self::GraphError>> {
    let mut c = ::sanakirja::btree::cursor::Cursor::new(&self.txn, &g)?;
    if let Some(s) = s {
    c.set(&self.txn, s, None)?;
    }
    Ok(c)
  • replacement in libpijul/src/pristine/sanakirja.rs at line 1643
    [2.363][2.363:445]()
    .map_err(|_| SanakirjaError::ChannelRc { c: name0.to_string() })?
    [2.363]
    [2.445]
    .map_err(|_| SanakirjaError::ChannelRc {
    c: name0.to_string(),
    })?
  • replacement in libpijul/src/pristine/mod.rs at line 350
    [3.51889][3.51889:51987]()
    fn iter_graph(&self, g: &Self::Graph) -> Result<Self::GraphCursor, TxnErr<Self::GraphError>>;
    [3.51889]
    [3.102575]
    fn iter_graph(
    &self,
    g: &Self::Graph,
    s: Option<&Vertex<ChangeId>>,
    ) -> Result<Self::GraphCursor, TxnErr<Self::GraphError>>;
  • replacement in libpijul/src/pristine/mod.rs at line 913
    [3.17878][3.54034:54090]()
    let mut cursor = txn.iter_graph(&channel).unwrap();
    [3.17878]
    [3.45536]
    let mut cursor = txn.iter_graph(&channel, None).unwrap();
  • replacement in libpijul/src/pristine/mod.rs at line 961
    [3.46782][3.54273:54329]()
    let mut cursor = txn.iter_graph(&channel).unwrap();
    [3.46782]
    [3.54329]
    let mut cursor = txn.iter_graph(&channel, None).unwrap();
  • edit in libpijul/src/pristine/edge.rs at line 53
    [32.9044]
    [32.9044]
  • edit in libpijul/src/pristine/edge.rs at line 55
    [32.9058]
    [32.9058]
    pub fn is_alive_parent(&self) -> bool {
    *self & (EdgeFlags::DELETED | EdgeFlags::PARENT) == EdgeFlags::PARENT
    }
    #[inline]