Fixing a panic in pull

[?]
Jan 4, 2021, 5:19 PM
KI2AFWOSN3PTGBGYQ7UKHFOZERZWEUWQ4AQNADG5S4QDJ53ESXFAC

Dependencies

  • [2] CCLLB7OI Upgrading to Sanakirja 0.15 + version bump
  • [3] 76PCXGML Pushing to, and pulling from the local repository
  • [4] SXEYMYF7 Fixing the bad changes in history (unfortunately, by rebooting).
  • [5] I52XSRUH Massive cleanup, and simplification
  • [6] UDHP4ZVB Fixing SSH asynchronicity issues
  • [*] FBXYP7QM Forgot to add remote::http
  • [*] MU5GSJAW Partial push and pull (WARNING: breaks the existing protocol)

Change contents

  • replacement in pijul/src/remote/mod.rs at line 259
    [3.59320][3.59320:59915](),[3.59915][2.8770:8829](),[2.8829][3.60067:60604](),[3.60067][3.60067:60604]()
    RemoteRepo::Http(ref h) => {
    debug!("get_state {:?}", h.url);
    let url = format!("{}/{}", h.url, DOT_DIR);
    let q = if let Some(mid) = mid {
    [
    ("state", format!("{}", mid)),
    ("channel", h.channel.clone()),
    ]
    } else {
    [("state", String::new()), ("channel", h.channel.clone())]
    };
    let res = h.client.get(&url).query(&q).send().await?;
    if !res.status().is_success() {
    bail!("HTTP error {:?}", res.status())
    }
    let resp = res.bytes().await?;
    let resp = std::str::from_utf8(&resp)?;
    debug!("resp = {:?}", resp);
    let mut s = resp.split(' ');
    if let (Some(n), Some(m)) = (
    s.next().and_then(|s| s.parse().ok()),
    s.next().and_then(|m| Merkle::from_base32(m.as_bytes())),
    ) {
    Ok(Some((n, m)))
    } else {
    Ok(None)
    }
    }
    [3.59320]
    [3.1926]
    RemoteRepo::Http(ref mut h) => h.get_state(mid).await,
  • replacement in pijul/src/remote/mod.rs at line 275
    [3.60807][3.60807:60825]()
    mut w: W,
    [3.60807]
    [3.60825]
    w: W,
  • replacement in pijul/src/remote/mod.rs at line 294
    [3.61877][3.61877:62677](),[3.62677][2.9111:9170](),[2.9170][3.62829:63484](),[3.62829][3.62829:63484]()
    RemoteRepo::Http(ref h) => {
    let url = h.url.clone() + "/" + DOT_DIR;
    let res = h.client.get(&url).query(&[("channel", &h.channel)]);
    let res = if let Some((ref state, ref extra)) = state {
    let mut q = vec![("archive".to_string(), state.to_base32())];
    if let Some(pre) = prefix {
    q.push(("outputPrefix".to_string(), pre));
    }
    for e in extra.iter() {
    q.push(("change".to_string(), e.to_base32()))
    }
    res.query(&q)
    } else {
    res
    };
    let res = res.send().await?;
    if !res.status().is_success() {
    bail!("HTTP error {:?}", res.status())
    }
    use futures_util::StreamExt;
    let mut stream = res.bytes_stream();
    let mut conflicts = 0;
    let mut n = 0;
    while let Some(item) = stream.next().await {
    let item = item?;
    let mut off = 0;
    while n < 8 && off < item.len() {
    conflicts = (conflicts << 8) | (item[off] as u64);
    off += 1;
    n += 1
    }
    w.write_all(&item[off..])?;
    }
    Ok(conflicts as u64)
    }
    [3.61877]
    [3.2197]
    RemoteRepo::Http(ref mut h) => h.archive(prefix, state, w).await,
  • edit in pijul/src/remote/http.rs at line 212
    [9.6503]
    [9.6503]
    }
    pub async fn get_state(
    &mut self,
    mid: Option<u64>,
    ) -> Result<Option<(u64, libpijul::Merkle)>, anyhow::Error> {
    debug!("get_state {:?}", self.url);
    let url = format!("{}/{}", self.url, super::DOT_DIR);
    let q = if let Some(mid) = mid {
    [
    ("state", format!("{}", mid)),
    ("channel", self.channel.clone()),
    ]
    } else {
    [("state", String::new()), ("channel", self.channel.clone())]
    };
    let res = self.client.get(&url).query(&q).send().compat().await?;
    if !res.status().is_success() {
    bail!("HTTP error {:?}", res.status())
    }
    let resp = res.bytes().compat().await?;
    let resp = std::str::from_utf8(&resp)?;
    debug!("resp = {:?}", resp);
    let mut s = resp.split(' ');
    if let (Some(n), Some(m)) = (
    s.next().and_then(|s| s.parse().ok()),
    s.next()
    .and_then(|m| libpijul::Merkle::from_base32(m.as_bytes())),
    ) {
    Ok(Some((n, m)))
    } else {
    Ok(None)
    }
    }
    pub async fn archive<W: std::io::Write + Send + 'static>(
    &mut self,
    prefix: Option<String>,
    state: Option<(libpijul::Merkle, &[Hash])>,
    mut w: W,
    ) -> Result<u64, anyhow::Error> {
    let url = self.url.clone() + "/" + super::DOT_DIR;
    let res = self.client.get(&url).query(&[("channel", &self.channel)]);
    let res = if let Some((ref state, ref extra)) = state {
    let mut q = vec![("archive".to_string(), state.to_base32())];
    if let Some(pre) = prefix {
    q.push(("outputPrefix".to_string(), pre));
    }
    for e in extra.iter() {
    q.push(("change".to_string(), e.to_base32()))
    }
    res.query(&q)
    } else {
    res
    };
    let res = res.send().compat().await?;
    if !res.status().is_success() {
    bail!("HTTP error {:?}", res.status())
    }
    use futures_util::StreamExt;
    let mut stream = res.bytes_stream();
    let mut conflicts = 0;
    let mut n = 0;
    while let Some(item) = stream.next().compat().await {
    let item = item?;
    let mut off = 0;
    while n < 8 && off < item.len() {
    conflicts = (conflicts << 8) | (item[off] as u64);
    off += 1;
    n += 1
    }
    w.write_all(&item[off..])?;
    }
    Ok(conflicts as u64)