Fixing HTTP download

pmeunier
Mar 1, 2023, 3:51 PM
4HTHYIA3GLMUBUMAI7CQMZA4KE47EUXOP24XLUEYFRMOG57CJLMAC

Dependencies

  • [2] 7Z3KZV6G Retry HTTP downloads if we don't get a full patch
  • [3] H4AU6QRP New config for HTTP remotes
  • [4] IBPVOKM5 Fixing a bug in patch download
  • [5] K6GWUOD5 Styling progress bars
  • [6] 5QTMRUXN Fixing a race condition between progress bars
  • [7] WTZXEWY7 Flushing the futures pipeline when downloading over HTTP(S)
  • [8] GNMZNKB4 Cursors cleanup
  • [9] 2D7P2VKJ Change completions (where the whole progress bar story started)
  • [10] I52XSRUH Massive cleanup, and simplification
  • [11] 4XLHUME7 Fixing an interlocking when cloning from a particular patch
  • [12] IQ4FCHPZ HTTP connections: pooling + retry on error
  • [13] FBXYP7QM Forgot to add remote::http
  • [14] SXEYMYF7 Fixing the bad changes in history (unfortunately, by rebooting).
  • [15] Q45QHPO4 Feedback on network stuff
  • [16] VBMXB443 Retrying if the HTTP connection drops while reading the body
  • [17] C3L2TLQW When downloading changes, check whether we have their dependencies and download them too
  • [18] BNPSVXIC Friendlier progress bars
  • [19] HXEIH4UQ Pulling more than 100 changes at once
  • [20] TKEVOH7H Fixing a bug when downloading changes, and making change download more efficient (more async)
  • [*] X6YFD4WV Do not download changes if we already have them
  • [*] DO2Y5TY5 Tag synchronisation
  • [*] OIOMXESD Better error handling in HTTP

Change contents

  • edit in pijul/src/remote/mod.rs at line 1165
    [4.77]
    [22.231]
    let mut asked = HashSet::new();
  • edit in pijul/src/remote/mod.rs at line 1176
    [22.454]
    [4.119]
    asked.insert(*h);
  • replacement in pijul/src/remote/mod.rs at line 1183
    [4.200][4.200:285]()
    .download_changes_rec(repo, hash_send, recv, send_ready, pro_a, waiting)
    [4.200]
    [5.237]
    .download_changes_rec(repo, hash_send, recv, send_ready, pro_a, waiting, asked)
  • edit in pijul/src/remote/mod.rs at line 1253
    [5.850]
    [4.393]
    mut asked: HashSet<CS>,
  • edit in pijul/src/remote/mod.rs at line 1264
    [5.1407]
    [5.1407]
    debug!("received {:?} {:?}", hash, follow);
  • replacement in pijul/src/remote/mod.rs at line 1282
    [5.2403][5.2403:2514]()
    send_hash.send(CS::Change(dep))?;
    waiting += 1
    [5.2403]
    [5.2514]
    if asked.insert(CS::Change(dep)) {
    send_hash.send(CS::Change(dep))?;
    waiting += 1
    }
  • edit in pijul/src/remote/mod.rs at line 1303
    [5.3116]
    [5.3116]
    info!("waiting loop done");
  • edit in pijul/src/remote/mod.rs at line 1346
    [5.345]
    [5.71943]
    let mut asked = HashSet::new();
  • edit in pijul/src/remote/mod.rs at line 1350
    [23.17706]
    [5.72012]
    asked.insert(CS::Change(h));
  • replacement in pijul/src/remote/mod.rs at line 1356
    [4.593][4.593:685]()
    .download_changes_rec(repo, send_hash, recv_signal, send_ready, pro_n, waiting)
    [4.593]
    [5.3566]
    .download_changes_rec(repo, send_hash, recv_signal, send_ready, pro_n, waiting, asked)
  • edit in pijul/src/remote/http.rs at line 50
    [2.38]
    [24.213]
    debug!("waiting chunk {:?}", c);
  • edit in pijul/src/remote/http.rs at line 61
    [24.596]
    [24.596]
    debug!("waiting chunk {:?}", c);
  • edit in pijul/src/remote/http.rs at line 63
    [24.606]
    [2.39]
    debug!("done chunk {:?}", c);
  • edit in pijul/src/remote/http.rs at line 126
    [24.1304]
    [24.1304]
    debug!("renaming {:?} {:?}", c, done);
  • edit in pijul/src/remote/http.rs at line 137
    [24.1391]
    [5.856]
    debug!("download_change returning {:?}", c);
  • replacement in pijul/src/remote/http.rs at line 152
    [5.981][5.1461:1513]()
    let mut pool = <[_; POOL_SIZE]>::default();
    [5.1338]
    [5.1513]
    debug!("starting download_changes http");
    let mut pool: [Option<tokio::task::JoinHandle<Result<CS, _>>>; POOL_SIZE] =
    <[_; POOL_SIZE]>::default();
  • replacement in pijul/src/remote/http.rs at line 156
    [5.1538][5.3222:3272](),[5.3308][5.322:365](),[5.458][5.322:365](),[5.1395][5.1539:1740](),[5.1740][3.2291:2333](),[3.2333][5.1740:1774](),[5.1740][5.1740:1774](),[5.1774][5.3309:3332](),[5.3332][5.1798:1867](),[5.1798][5.1798:1867]()
    while let Some(c) = hashes.recv().await {
    debug!("downloading {:?}", c);
    let t = std::mem::replace(
    &mut pool[cur],
    Some(tokio::spawn(download_change(
    self.client.clone(),
    self.url.clone(),
    self.headers.clone(),
    path.clone(),
    c,
    ))),
    );
    if let Some(t) = t {
    [5.1538]
    [5.4977]
    loop {
    if let Some(t) = pool[cur].take() {
  • replacement in pijul/src/remote/http.rs at line 159
    [5.5034][5.867:902](),[5.1867][5.867:902](),[5.902][5.5035:5078]()
    let c = t.await??;
    debug!("sending {:?}", c);
    [5.5034]
    [5.2950]
    let c_ = t.await.unwrap().unwrap();
    debug!("sending {:?}", c_);
  • replacement in pijul/src/remote/http.rs at line 162
    [5.3019][5.1650:1707](),[5.951][5.1944:1991](),[5.1707][5.1944:1991](),[5.1944][5.1944:1991]()
    if send.send((c, true)).await.is_err() {
    debug!("err for {:?}", c);
    [5.3019]
    [5.2031]
    if send.send((c_, true)).await.is_err() {
    debug!("err for {:?}", c_);
  • replacement in pijul/src/remote/http.rs at line 166
    [5.2076][5.5139:5171]()
    debug!("sent");
    [5.2076]
    [5.1335]
    debug!("sent {:?}", c_);
    continue
    }
    let mut next = cur;
    for i in 1..POOL_SIZE {
    if pool[(cur + i) % POOL_SIZE].is_some() {
    next = (cur + i) % POOL_SIZE;
    break
    }
  • replacement in pijul/src/remote/http.rs at line 176
    [5.1349][5.2077:2118](),[5.2118][5.96:239](),[5.239][5.5172:5215](),[5.5215][5.3020:3089](),[5.3089][5.1708:1765](),[5.1765][5.288:335](),[5.288][5.288:335]()
    cur = (cur + 1) % POOL_SIZE;
    }
    for f in 0..POOL_SIZE {
    if let Some(t) = pool[(cur + f) % POOL_SIZE].take() {
    let c = t.await??;
    debug!("sending {:?}", c);
    super::PROGRESS.borrow_mut().unwrap()[pro_n].incr();
    if send.send((c, true)).await.is_err() {
    debug!("err for {:?}", c);
    [5.1349]
    [5.375]
    if next == cur {
    if let Some(c) = hashes.recv().await {
    debug!("downloading on process {:?}: {:?}", cur, c);
    pool[cur] = Some(tokio::spawn(download_change(
    self.client.clone(),
    self.url.clone(),
    self.headers.clone(),
    path.clone(),
    c,
    )));
    cur = (cur + 1) % POOL_SIZE;
    } else {
  • replacement in pijul/src/remote/http.rs at line 190
    [5.420][5.5276:5308]()
    debug!("sent");
    [5.420]
    [5.420]
    } else {
    tokio::select! {
    c = hashes.recv() => {
    if let Some(c) = c {
    debug!("downloading on process {:?}: {:?}", cur, c);
    pool[cur] = Some(tokio::spawn(download_change(
    self.client.clone(),
    self.url.clone(),
    self.headers.clone(),
    path.clone(),
    c,
    )));
    cur = (cur + 1) % POOL_SIZE;
    } else {
    break;
    }
    }
    c = pool[next].as_mut().unwrap() => {
    pool[next] = None;
    let c = c??;
    super::PROGRESS.borrow_mut().unwrap()[pro_n].incr();
    if send.send((c, true)).await.is_err() {
    debug!("err for {:?}", c);
    break;
    }
    }
    }