Fixing async issues in downloads

[?]
Feb 27, 2021, 9:25 PM
3WO4H2MMMQTYKCPBWYE67IRAEX7DFA2XAOMIKICA6BYDN23K6DQQC

Dependencies

  • [2] WTZXEWY7 Flushing the futures pipeline when downloading over HTTP(S)
  • [3] TKEVOH7H Fixing a bug when downloading changes, and making change download more efficient (more async)
  • [4] KTTKF3RW Locking stderr and the progress bar in SSH
  • [5] K6GWUOD5 Styling progress bars
  • [6] UDHP4ZVB Fixing SSH asynchronicity issues
  • [7] BZSC7VMY address clippy lints
  • [8] Q45QHPO4 Feedback on network stuff
  • [9] 5QTMRUXN Fixing a race condition between progress bars
  • [10] SZWBLWZ4 Reading ~/.ssh/config
  • [11] WLUID7NA Do not block when downloading more than 100 changes over SSH
  • [12] SXEYMYF7 Fixing the bad changes in history (unfortunately, by rebooting).
  • [13] X6YFD4WV Do not download changes if we already have them
  • [*] HXEIH4UQ Pulling more than 100 changes at once

Change contents

  • edit in pijul/src/remote/ssh.rs at line 274
    [4.492]
    [4.492]
    final_path: PathBuf,
  • edit in pijul/src/remote/ssh.rs at line 428
    [4.2696]
    [4.2696]
    ref mut final_path,
  • replacement in pijul/src/remote/ssh.rs at line 449
    [4.3643][4.3643:3706]()
    let mut final_path = path.clone();
    [4.3643]
    [4.3706]
    libpijul::changestore::filesystem::push_filename(
    final_path,
    &hashes[*current],
    );
  • replacement in pijul/src/remote/ssh.rs at line 456
    [4.3847][4.3847:3913]()
    std::fs::rename(&path, &final_path)?;
    [4.3847]
    [4.3913]
    std::fs::create_dir_all(&final_path.parent().unwrap())?;
    let r = std::fs::rename(&path, &final_path);
    libpijul::changestore::filesystem::pop_filename(final_path);
    r?;
  • replacement in pijul/src/remote/ssh.rs at line 469
    [4.4287][4.4287:4400](),[4.4400][3.210:276](),[3.276][4.4400:4701](),[4.4400][4.4400:4701](),[4.4701][3.277:342](),[3.342][4.4701:4784](),[4.4701][4.4701:4784](),[4.4784][2.0:59](),[2.59][4.4840:4908](),[4.4840][4.4840:4908]()
    // If we're still waiting for
    // another change.
    debug!("before pop: {:?}", path);
    libpijul::changestore::filesystem::pop_filename(path);
    libpijul::changestore::filesystem::push_filename(
    path,
    &hashes[*current],
    );
    debug!("after pop: {:?}", path);
    std::fs::create_dir_all(&path.parent().unwrap())?;
    path.set_extension("tmp");
    debug!("creating file {:?}", path);
    [4.4287]
    [4.4908]
    // If we're still waiting for another
    // change.
  • replacement in pijul/src/remote/ssh.rs at line 813
    [4.8923][4.8923:8967](),[4.9043][4.46489:46548](),[4.46489][4.46489:46548](),[4.46548][2.60:95]()
    let mut path = changes_dir.clone();
    std::fs::create_dir_all(&path.parent().unwrap())?;
    path.set_extension("tmp");
    [4.8923]
    [4.9044]
    let path = changes_dir.join("tmp");
    std::fs::create_dir_all(&changes_dir)?;
  • edit in pijul/src/remote/ssh.rs at line 816
    [4.9094][3.609:661]()
    debug!("changes_dir = {:?}", changes_dir);
  • edit in pijul/src/remote/ssh.rs at line 820
    [4.9229]
    [4.9229]
    final_path: changes_dir.clone(),
  • edit in pijul/src/remote/ssh.rs at line 827
    [3.720][3.720:898](),[3.898][4.9371:9743](),[4.9371][4.9371:9743](),[4.9743][3.899:921](),[3.921][4.9743:9753](),[4.9743][4.9743:9753]()
    while let Some(c) = c.recv().await {
    if let State::Changes { ref mut hashes, .. } = *self.state.lock().await {
    hashes.push(c);
    }
    debug!("download_change {:?} {:?}", c, full);
    if full {
    self.c
    .data(format!("change {}\n", c.to_base32()).as_bytes())
    .await?;
    } else {
    self.c
    .data(format!("partial {}\n", c.to_base32()).as_bytes())
    .await?;
    }
    len += 1;
    }
  • replacement in pijul/src/remote/ssh.rs at line 833
    [4.135][4.9753:9854](),[4.166][4.9753:9854](),[4.840][4.9753:9854](),[4.9753][4.9753:9854](),[4.9854][4.841:957](),[4.957][4.427:533](),[4.165][4.427:533]()
    while let Some(_hash) = recv.recv().await {
    debug!("received hash {:?}", _hash);
    if let Some(ref mut progress) = *PROGRESS.lock().await {
    progress.inc(1);
    }
    if let Some(ref mut sender) = sender {
    if sender.send(_hash).await.is_err() {
    [4.840]
    [4.958]
    let mut dropped = false;
    loop {
    tokio::select! {
    x = recv.recv() => {
    let hash = if let Some(hash) = x {
    debug!("received hash {:?}", hash);
    hash
    } else {
    debug!("finished");
    break
    };
  • replacement in pijul/src/remote/ssh.rs at line 846
    [4.1035][4.1035:1079]()
    progress.abandon();
    [4.1035]
    [4.1079]
    progress.inc(1);
  • replacement in pijul/src/remote/ssh.rs at line 848
    [4.1101][4.573:600](),[4.573][4.573:600]()
    break;
    [4.1101]
    [4.600]
    if let Some(ref mut sender) = sender {
    if sender.send(hash).await.is_err() {
    if let Some(ref mut progress) = *PROGRESS.lock().await {
    progress.abandon();
    }
    break;
    }
    }
    }
    x = c.recv(), if !dropped => {
    let c = if let Some(c) = x {
    c
    } else {
    debug!("other end dropped");
    dropped = true;
    if len == 0 {
    break
    } else {
    continue
    }
    };
    if let State::Changes { ref mut hashes, .. } = *self.state.lock().await {
    hashes.push(c);
    }
    debug!("download_change {:?} {:?}", c, full);
    if full {
    self.c
    .data(format!("change {}\n", c.to_base32()).as_bytes())
    .await?;
    } else {
    self.c
    .data(format!("partial {}\n", c.to_base32()).as_bytes())
    .await?;
    }
    if let Some(ref mut p) = *PROGRESS.lock().await {
    p.inc_length(1)
    }
    len += 1;
  • edit in pijul/src/remote/mod.rs at line 399
    [4.544]
    [15.0]
    std::mem::drop(hash_send);