HTTP connections: pooling + retry on error

[?]
Dec 24, 2020, 10:49 AM
IQ4FCHPZYGTZHCQHUIRCMUI5LCHIDSJCM2AZXGRJARWLCPPLXZOQC

Dependencies

  • [2] KQTD46KV Unrecord: restore files *after* having unapplied the *entire* change
  • [3] MU5GSJAW Partial push and pull (WARNING: breaks the existing protocol)
  • [4] Q45QHPO4 Feedback on network stuff
  • [5] K6GWUOD5 Styling progress bars
  • [6] FBXYP7QM Forgot to add remote::http
  • [7] I52XSRUH Massive cleanup, and simplification
  • [8] 367UBQ6K Forwarding SSH stderr, and progress bar for push
  • [9] HXEIH4UQ Pulling more than 100 changes at once
  • [10] 5QTMRUXN Fixing a race condition between progress bars

Change contents

  • replacement in pijul/src/remote/http.rs at line 8
    [3.5100][4.779:795](),[4.1253][4.779:795]()
    use log::debug;
    [3.5100]
    [4.87]
    use log::{debug, error};
  • edit in pijul/src/remote/http.rs at line 15
    [4.207]
    [4.207]
    }
    async fn download_change(
    client: reqwest::Client,
    url: String,
    mut path: PathBuf,
    c: libpijul::pristine::Hash,
    ) -> Result<(), anyhow::Error> {
    libpijul::changestore::filesystem::push_filename(&mut path, &c);
    std::fs::create_dir_all(&path.parent().unwrap())?;
    let path_ = path.with_extension("tmp");
    let mut f = std::fs::File::create(&path_)?;
    libpijul::changestore::filesystem::pop_filename(&mut path);
    let c32 = c.to_base32();
    let url = format!("{}/{}", url, super::DOT_DIR);
    let mut delay = 1f64;
    let mut res = loop {
    let res = if let Ok(res) = client.get(&url).query(&[("change", &c32)]).send().await {
    res
    } else {
    error!("HTTP error, retrying in {} seconds", delay.round());
    tokio::time::delay_for(std::time::Duration::from_secs_f64(delay)).await;
    delay *= 2.;
    continue;
    };
    debug!("response {:?}", res);
    if res.status().is_success() {
    break res;
    } else {
    return Err((crate::Error::Http {
    status: res.status(),
    })
    .into());
    }
    };
    while let Some(chunk) = res.chunk().await? {
    debug!("writing {:?}", chunk.len());
    f.write_all(&chunk)?;
    }
    std::fs::rename(&path_, &path_.with_extension("change"))?;
    Ok(())
  • edit in pijul/src/remote/http.rs at line 57
    [4.209]
    [4.209]
    const POOL_SIZE: usize = 20;
  • replacement in pijul/src/remote/http.rs at line 65
    [4.1252][4.1252:1280]()
    path: &mut PathBuf,
    [4.1252]
    [4.1280]
    path: &PathBuf,
  • edit in pijul/src/remote/http.rs at line 74
    [4.981]
    [4.432]
    let mut pool = <[_; POOL_SIZE]>::default();
    let mut cur = 0;
  • edit in pijul/src/remote/http.rs at line 78
    [4.365][4.1339:1410](),[4.458][4.1339:1410](),[4.1410][4.576:639](),[4.576][4.576:639](),[4.639][4.366:474](),[4.474][4.694:864](),[4.694][4.694:864](),[4.864][4.1411:1585](),[4.1585][4.475:517](),[4.1585][4.953:1107](),[4.517][4.953:1107](),[4.953][4.953:1107](),[4.1107][4.1586:1612](),[4.1612][4.1144:1215](),[4.1144][4.1144:1215](),[4.1215][4.518:571](),[4.571][4.1215:1267](),[4.1215][4.1215:1267](),[4.1267][4.572:643]()
    libpijul::changestore::filesystem::push_filename(path, c);
    std::fs::create_dir_all(&path.parent().unwrap())?;
    let path_ = path.with_extension("tmp");
    let mut f = std::fs::File::create(&path_)?;
    libpijul::changestore::filesystem::pop_filename(path);
    let c32 = c.to_base32();
    let url = format!("{}/{}", self.url, super::DOT_DIR);
    let mut res = self
    .client
    .get(&url)
    .query(&[("change", c32)])
    .send()
    .await?;
    debug!("response {:?}", res);
    if !res.status().is_success() {
    return Err((crate::Error::Http {
    status: res.status(),
    })
    .into());
    }
    while let Some(chunk) = res.chunk().await? {
    debug!("writing {:?}", chunk.len());
    f.write_all(&chunk)?;
    }
    std::fs::rename(&path_, &path_.with_extension("change"))?;
  • replacement in pijul/src/remote/http.rs at line 79
    [4.1395][4.1267:1313](),[4.1267][4.1267:1313](),[4.1313][2.0:43](),[2.43][4.1396:1432](),[4.1313][4.1396:1432](),[4.1432][4.1613:1636](),[4.1313][4.1613:1636]()
    if send.send(*c).await.is_err() {
    debug!("err for {:?}", c);
    progress.abandon();
    break;
    [4.1395]
    [4.1335]
    let t = std::mem::replace(
    &mut pool[cur],
    Some(tokio::spawn(download_change(
    self.client.clone(),
    self.url.clone(),
    path.clone(),
    *c,
    ))),
    );
    if let Some(t) = t {
    t.await??;
    if send.send(*c).await.is_err() {
    debug!("err for {:?}", c);
    progress.abandon();
    break;
    }
  • edit in pijul/src/remote/http.rs at line 96
    [4.1349]
    [4.1349]
    cur = (cur + 1) % POOL_SIZE;