Fixing SSH asynchronicity issues

[?]
Nov 23, 2020, 5:59 PM
UDHP4ZVBQZT2VBURB2MDCU2IZDNMCAFSIUKWRBDQ5BWMFKSN2LYQC

Dependencies

  • [2] 76PCXGML Pushing to, and pulling from the local repository
  • [3] XWETQ4DE Upgrading versions
  • [4] L4JXJHWX pijul/*: reorganize imports and remove extern crate
  • [5] RR65HCKO Thrussh versions
  • [6] SXEYMYF7 Fixing the bad changes in history (unfortunately, by rebooting).
  • [7] 3S4DR77Z Version updates
  • [8] OUWD436A Version bump
  • [9] Q7CAYX5N Fixing Windows compilation
  • [10] NX5I5H53 New published versions

Change contents

  • edit in pijul/src/remote/ssh.rs at line 3
    [6.25699]
    [6.25699]
    use std::pin::Pin;
  • edit in pijul/src/remote/ssh.rs at line 14
    [6.25788]
    [6.25788]
    use tokio::sync::Mutex;
  • edit in pijul/src/remote/ssh.rs at line 28
    [6.26002]
    [6.26002]
    state: Arc<Mutex<State>>,
  • edit in pijul/src/remote/ssh.rs at line 88
    [6.27527]
    [6.27527]
    let state = Arc::new(Mutex::new(State::None));
  • edit in pijul/src/remote/ssh.rs at line 93
    [6.27683]
    [6.27683]
    state: state.clone(),
  • edit in pijul/src/remote/ssh.rs at line 126
    [6.28790]
    [6.28790]
    state,
  • edit in pijul/src/remote/ssh.rs at line 230
    [6.32378]
    [6.32378]
    state: Arc<Mutex<State>>,
    }
    enum State {
    None,
    State {
    sender: Option<tokio::sync::oneshot::Sender<Option<(u64, Merkle)>>>,
    },
    Changes {
    sender: Option<tokio::sync::mpsc::Sender<Hash>>,
    remaining_len: usize,
    file: std::fs::File,
    path: PathBuf,
    hashes: Vec<libpijul::pristine::Hash>,
    current: usize,
    },
    Changelist {
    sender: tokio::sync::mpsc::Sender<Option<(u64, Hash, Merkle)>>,
    },
    Channel {
    sender: tokio::sync::mpsc::Sender<Vec<u8>>,
    },
    Archive {
    sender: Option<tokio::sync::oneshot::Sender<u64>>,
    len: u64,
    conflicts: u64,
    len_n: u64,
    w: Box<dyn Write + Send>,
    },
  • replacement in pijul/src/remote/ssh.rs at line 263
    [6.32510][6.32510:32596]()
    type FutureUnit = futures::future::Ready<Result<(Self, Session), anyhow::Error>>;
    [6.32510]
    [6.32596]
    type FutureUnit = Pin<
    Box<dyn futures::future::Future<Output = Result<(Self, Session), anyhow::Error>> + Send>,
    >;
    // type FutureUnit = futures::future::Ready<Result<(Self, Session), anyhow::Error>>;
  • replacement in pijul/src/remote/ssh.rs at line 272
    [6.32768][6.32768:32820]()
    futures::future::ready(Ok((self, session)))
    [6.32768]
    [6.32820]
    // futures::future::ready(Ok((self, session)))
    Box::pin(async move { Ok((self, session)) })
  • edit in pijul/src/remote/ssh.rs at line 314
    [6.34230]
    [6.34230]
    }
    fn data(
    self,
    channel: thrussh::ChannelId,
    data: &[u8],
    session: thrussh::client::Session,
    ) -> Self::FutureUnit {
    debug!("data {:?} {:?}", channel, data.len());
    let data = data.to_vec();
    Box::pin(async move {
    match *self.state.lock().await {
    State::State { ref mut sender } => {
    debug!("state: State");
    if let Some(sender) = sender.take() {
    // If we can't parse `data` (for example if the
    // remote returns the standard "-\n"), this
    // returns None.
    let mut s = std::str::from_utf8(&data).unwrap().split(' ');
    debug!("s = {:?}", s);
    if let (Some(n), Some(m)) = (s.next(), s.next()) {
    let n = n.parse().unwrap();
    sender
    .send(Some((n, Merkle::from_base32(m.trim().as_bytes()).unwrap())))
    .unwrap_or(());
    } else {
    sender.send(None).unwrap_or(());
    }
    }
    }
    State::Changes {
    ref mut sender,
    ref mut remaining_len,
    ref mut file,
    ref mut path,
    ref hashes,
    ref mut current,
    } => {
    debug!("state changes");
    let mut p = 0;
    while p < data.len() {
    if *remaining_len == 0 {
    *remaining_len = (&data[p..]).read_u64::<BigEndian>().unwrap() as usize;
    p += 8;
    debug!("remaining_len = {:?}", remaining_len);
    }
    if data.len() >= p + *remaining_len {
    file.write_all(&data[p..p + *remaining_len])?;
    // We have enough data to write the
    // file, write it and move to the next
    // file.
    p += *remaining_len;
    *remaining_len = 0;
    file.flush()?;
    let mut final_path = path.clone();
    final_path.set_extension("change");
    debug!("moving {:?} to {:?}", path, final_path);
    std::fs::rename(&path, &final_path)?;
    debug!("sending");
    if let Some(ref mut sender) = sender {
    sender.send(hashes[*current].clone()).await.unwrap();
    }
    debug!("sent");
    *current += 1;
    if *current < hashes.len() {
    // If we're still waiting for
    // another change.
    libpijul::changestore::filesystem::pop_filename(path);
    libpijul::changestore::filesystem::push_filename(
    path,
    &hashes[*current],
    );
    std::fs::create_dir_all(&path.parent().unwrap())?;
    path.set_extension("");
    debug!("creating file {:?}", path);
    *file = std::fs::File::create(&path)?;
    } else {
    // Else, just finish.
    debug!("dropping channel");
    *sender = None;
    break;
    }
    } else {
    // not enough data, we need more.
    file.write_all(&data[p..])?;
    file.flush()?;
    *remaining_len -= data.len() - p;
    debug!("need more data");
    break;
    }
    }
    debug!("finished, {:?} {:?}", p, data.len());
    }
    State::Changelist { ref mut sender } => {
    debug!("state changelist");
    if &data[..] == b"\n" {
    debug!("log done");
    sender.send(None).await.unwrap_or(())
    } else if let Ok(data) = std::str::from_utf8(&data) {
    for l in data.lines() {
    if !l.is_empty() {
    debug!("line = {:?}", l);
    let (n, h, m) = parse_line(l)?;
    sender.send(Some((n, h, m))).await.unwrap_or(());
    } else {
    sender.send(None).await.unwrap_or(());
    }
    }
    }
    }
    State::Channel { ref mut sender } => sender.send(data).await?,
    State::Archive { ref mut sender, ref mut w, ref mut len, ref mut len_n, ref mut conflicts } => {
    let mut off = 0;
    while *len_n < 16 && off < data.len() {
    if *len_n < 8 {
    *len = (*len << 8) | (data[off] as u64);
    } else {
    *conflicts = (*conflicts << 8) | (data[off] as u64);
    }
    *len_n += 1;
    off += 1;
    }
    if *len_n >= 16 {
    w.write_all(&data[off..])?;
    *len -= (data.len() - off) as u64;
    if *len == 0 {
    if let Some(sender) = sender.take() {
    sender.send(*conflicts).unwrap_or(())
    }
    }
    }
    }
    State::None => {
    debug!("None state");
    }
    }
    Ok((self, session))
    })
  • edit in pijul/src/remote/ssh.rs at line 515
    [6.36250]
    [6.36250]
    debug!("get_state");
    let (sender, receiver) = tokio::sync::oneshot::channel();
    *self.state.lock().await = State::State {
    sender: Some(sender),
    };
  • replacement in pijul/src/remote/ssh.rs at line 530
    [6.36582][6.36582:37991]()
    while let Some(msg) = self.c.wait().await {
    match msg {
    thrussh::ChannelMsg::Data { data } => {
    // If we can't parse `data` (for example if the
    // remote returns the standard "-\n"), this
    // returns None.
    let mut s = std::str::from_utf8(&data)?.split(' ');
    debug!("s = {:?}", s);
    if let (Some(n), Some(m)) = (s.next(), s.next()) {
    let n = n.parse().unwrap();
    return Ok(Some((n, Merkle::from_base32(m.trim().as_bytes()).unwrap())));
    } else {
    break;
    }
    }
    thrussh::ChannelMsg::ExtendedData { data, ext } => {
    if ext == 1 {
    debug!("{:?}", std::str::from_utf8(&data))
    }
    }
    thrussh::ChannelMsg::Eof => {}
    thrussh::ChannelMsg::ExitStatus { exit_status } => {
    if exit_status != 0 {
    return Err((Error::RemoteExit {
    status: exit_status,
    })
    .into());
    }
    }
    msg => panic!("wrong message {:?}", msg),
    }
    }
    Ok(None)
    [6.36582]
    [6.37991]
    Ok(receiver.await?)
  • replacement in pijul/src/remote/ssh.rs at line 533
    [6.37998][6.37998:38043]()
    pub async fn archive<W: std::io::Write>(
    [6.37998]
    [6.38043]
    pub async fn archive<W: std::io::Write + Send + 'static>(
  • replacement in pijul/src/remote/ssh.rs at line 537
    [6.38136][6.38136:38154]()
    mut w: W,
    [6.38136]
    [6.38154]
    w: W,
  • edit in pijul/src/remote/ssh.rs at line 539
    [6.38192]
    [6.38192]
    debug!("archive");
    let (sender, receiver) = tokio::sync::oneshot::channel();
    *self.state.lock().await = State::Archive {
    sender: Some(sender),
    len: 0,
    conflicts: 0,
    len_n: 0,
    w: Box::new(w),
    };
  • edit in pijul/src/remote/ssh.rs at line 572
    [6.39070][6.39070:40659]()
    }
    let mut len = 0;
    let mut conflicts = 0;
    let mut len_n = 0;
    while let Some(msg) = self.c.wait().await {
    match msg {
    thrussh::ChannelMsg::Data { data } => {
    let mut off = 0;
    while len_n < 16 && off < data.len() {
    if len_n < 8 {
    len = (len << 8) | (data[off] as u64);
    } else {
    conflicts = (conflicts << 8) | (data[off] as u64);
    }
    len_n += 1;
    off += 1;
    }
    if len_n >= 16 {
    w.write_all(&data[off..])?;
    len -= (data.len() - off) as u64;
    if len == 0 {
    break;
    }
    }
    }
    thrussh::ChannelMsg::ExtendedData { data, ext } => {
    if ext == 1 {
    debug!("{:?}", std::str::from_utf8(&data))
    }
    }
    thrussh::ChannelMsg::Eof => {}
    thrussh::ChannelMsg::ExitStatus { exit_status } => {
    if exit_status != 0 {
    return Err((Error::RemoteExit {
    status: exit_status,
    })
    .into());
    }
    }
    msg => panic!("wrong message {:?}", msg),
    }
  • edit in pijul/src/remote/ssh.rs at line 573
    [6.40669]
    [6.40669]
    let conflicts = receiver.await.unwrap_or(0);
  • edit in pijul/src/remote/ssh.rs at line 621
    [6.42222]
    [6.42222]
    let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
    *self.state.lock().await = State::Changelist {
    sender,
    };
  • replacement in pijul/src/remote/ssh.rs at line 635
    [6.42593][6.42593:44262]()
    'msg: while let Some(msg) = self.c.wait().await {
    debug!("msg = {:?}", msg);
    match msg {
    thrussh::ChannelMsg::Data { data } => {
    if &data[..] == b"\n" {
    debug!("log done");
    break;
    } else if let Ok(data) = std::str::from_utf8(&data) {
    for l in data.lines() {
    if !l.is_empty() {
    debug!("line = {:?}", l);
    let (n, h, m) = parse_line(l)?;
    txn.put_remote(remote, n, (h, m))?;
    } else {
    break 'msg;
    }
    }
    }
    }
    thrussh::ChannelMsg::ExtendedData { data, ext } => {
    debug!("{:?} {:?}", ext, std::str::from_utf8(&data[..]));
    /*return Err((crate::Error::Remote {
    msg: std::str::from_utf8(&data[..]).unwrap().to_string()
    }).into())*/
    }
    thrussh::ChannelMsg::WindowAdjusted { .. } => {}
    thrussh::ChannelMsg::Eof => {}
    thrussh::ChannelMsg::ExitStatus { exit_status } => {
    if exit_status != 0 {
    return Err((Error::RemoteExit {
    status: exit_status,
    })
    .into());
    }
    }
    msg => panic!("wrong message {:?}", msg),
    }
    [6.42593]
    [6.44262]
    while let Some(Some((n, h, m))) = receiver.recv().await {
    txn.put_remote(remote, n, (h, m))?;
  • replacement in pijul/src/remote/ssh.rs at line 672
    [6.45411][6.45411:45451]()
    pub async fn start_change_download(
    [6.45411]
    [6.45451]
    pub async fn download_changes(
  • replacement in pijul/src/remote/ssh.rs at line 674
    [6.45470][6.45470:45507]()
    c: libpijul::pristine::Hash,
    [6.45470]
    [6.45507]
    c: &[libpijul::pristine::Hash],
    sender: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
    changes_dir: &mut PathBuf,
  • replacement in pijul/src/remote/ssh.rs at line 679
    [6.45564][6.45564:46489]()
    self.run_protocol().await?;
    debug!("download_change {:?}", full);
    if full {
    self.c
    .data(format!("change {}\n", c.to_base32()).as_bytes())
    .await?;
    } else {
    self.c
    .data(format!("partial {}\n", c.to_base32()).as_bytes())
    .await?;
    }
    Ok(())
    }
    pub async fn wait_downloads(
    &mut self,
    changes_dir: &Path,
    hashes: &[libpijul::pristine::Hash],
    send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
    ) -> Result<(), anyhow::Error> {
    debug!("wait_downloads");
    if !self.is_running {
    return Ok(());
    }
    let mut remaining_len = 0;
    let mut current: usize = 0;
    let mut path = changes_dir.to_path_buf();
    libpijul::changestore::filesystem::push_filename(&mut path, &hashes[current]);
    [6.45564]
    [6.46489]
    let (sender_, mut recv) = tokio::sync::mpsc::channel(100);
    let mut path = changes_dir.clone();
    libpijul::changestore::filesystem::push_filename(&mut path, &c[0]);
  • replacement in pijul/src/remote/ssh.rs at line 684
    [6.46580][6.46580:49729]()
    let mut file = std::fs::File::create(&path)?;
    'outer: while let Some(msg) = self.c.wait().await {
    match msg {
    thrussh::ChannelMsg::Data { data } => {
    debug!("data = {:?}", &data[..]);
    let mut p = 0;
    while p < data.len() {
    if remaining_len == 0 {
    remaining_len = (&data[p..]).read_u64::<BigEndian>().unwrap() as usize;
    p += 8;
    debug!("remaining_len = {:?}", remaining_len);
    }
    if data.len() >= p + remaining_len {
    file.write_all(&data[p..p + remaining_len])?;
    // We have enough data to write the
    // file, write it and move to the next
    // file.
    p += remaining_len;
    remaining_len = 0;
    file.flush()?;
    let mut final_path = path.clone();
    final_path.set_extension("change");
    debug!("moving {:?} to {:?}", path, final_path);
    std::fs::rename(&path, &final_path)?;
    debug!("sending");
    send.send(hashes[current].clone()).await.unwrap();
    debug!("sent");
    current += 1;
    if current < hashes.len() {
    // If we're still waiting for
    // another change.
    libpijul::changestore::filesystem::pop_filename(&mut path);
    libpijul::changestore::filesystem::push_filename(
    &mut path,
    &hashes[current],
    );
    std::fs::create_dir_all(&path.parent().unwrap())?;
    path.set_extension("");
    file = std::fs::File::create(&path)?;
    } else {
    // Else, just finish.
    break 'outer;
    }
    } else {
    // not enough data, we need more.
    file.write_all(&data[p..])?;
    remaining_len -= data.len() - p;
    break;
    }
    }
    }
    thrussh::ChannelMsg::ExitStatus { exit_status } => {
    debug!("exit: {:?}", exit_status);
    if exit_status != 0 {
    error!("Remote command returned {:?}", exit_status)
    }
    self.is_running = false;
    return Ok(());
    }
    msg => {
    debug!("{:?}", msg);
    }
    [6.46580]
    [6.49729]
    let file = std::fs::File::create(&path)?;
    *self.state.lock().await = State::Changes {
    sender: Some(sender_),
    remaining_len: 0,
    path,
    file,
    hashes: c.to_vec(),
    current: 0,
    };
    self.run_protocol().await?;
    for c in c {
    debug!("download_change {:?} {:?}", c, full);
    if full {
    self.c
    .data(format!("change {}\n", c.to_base32()).as_bytes())
    .await?;
    } else {
    self.c
    .data(format!("partial {}\n", c.to_base32()).as_bytes())
    .await?;
    }
    }
    while let Some(_hash) = recv.recv().await {
    debug!("received hash {:?}", _hash);
    if sender.send(_hash).await.is_err() {
    break
  • replacement in pijul/src/remote/ssh.rs at line 712
    [6.49753][6.49753:49799]()
    debug!("done waiting for downloads");
    [6.49753]
    [6.49799]
    debug!("done downloading");
  • edit in pijul/src/remote/ssh.rs at line 723
    [6.50033]
    [6.50033]
    let (sender, mut recv) = tokio::sync::mpsc::channel(10);
    *self.state.lock().await = State::Channel { sender };
  • edit in pijul/src/remote/ssh.rs at line 726
    [6.50069]
    [6.50069]
    debug!("clone channel");
  • edit in pijul/src/remote/ssh.rs at line 730
    [6.50173]
    [6.50173]
  • replacement in pijul/src/remote/ssh.rs at line 734
    [6.50331][6.50331:51267]()
    while let Some(msg) = self.c.wait().await {
    match msg {
    thrussh::ChannelMsg::Data { data } => {
    debug!("data = {:?}", &data[..]);
    if from_dump.read(&data)? {
    break;
    }
    }
    thrussh::ChannelMsg::ExtendedData { data, ext } => {
    debug!("data = {:?}, ext = {:?}", &data[..], ext);
    }
    thrussh::ChannelMsg::ExitStatus { exit_status } => {
    if exit_status != 0 {
    error!("Remote command returned {:?}", exit_status)
    }
    self.is_running = false;
    break;
    }
    msg => {
    debug!("msg = {:?}", msg);
    }
    [6.50331]
    [6.51267]
    while let Some(msg) = recv.recv().await {
    if from_dump.read(&msg)? {
    break;
  • replacement in pijul/src/remote/ssh.rs at line 764
    [6.52222][6.52222:52289]()
    self.wait_downloads(&repo.changes_dir, &hashes, &mut send)
    [6.52222]
    [6.52289]
    self.download_changes(&hashes, &mut send, &mut repo.changes_dir, false)
  • edit in pijul/src/remote/mod.rs at line 0
    [6.52474][4.386:406]()
    use std::io::Write;
  • edit in pijul/src/remote/mod.rs at line 1
    [4.438]
    [4.438]
    use std::sync::Arc;
  • edit in pijul/src/remote/mod.rs at line 17
    [6.52768]
    [6.52768]
    pub mod http;
    use http::*;
  • edit in pijul/src/remote/mod.rs at line 28
    [6.52851][6.52851:52973]()
    pub struct Http {
    pub url: String,
    pub channel: String,
    pub client: reqwest::Client,
    pub name: String,
    }
  • replacement in pijul/src/remote/mod.rs at line 90
    [6.54719][6.54719:54745]()
    pristine,
    [6.54719]
    [6.54745]
    pristine: Arc::new(pristine),
  • replacement in pijul/src/remote/mod.rs at line 278
    [6.60669][6.60669:60714]()
    pub async fn archive<W: std::io::Write>(
    [6.60669]
    [6.60714]
    pub async fn archive<W: std::io::Write + Send + 'static>(
  • replacement in pijul/src/remote/mod.rs at line 387
    [6.65147][2.2309:2397]()
    pub async fn upload_changes<T: MutTxnTExt, W: libpijul::working_copy::WorkingCopy>(
    [6.65147]
    [6.65180]
    pub async fn upload_changes<T: MutTxnTExt>(
  • edit in pijul/src/remote/mod.rs at line 389
    [6.65199][2.2398:2420]()
    repo: &mut W,
  • replacement in pijul/src/remote/mod.rs at line 423
    [2.2665][2.2665:2745]()
    local::upload_changes(&store, repo, txn, &mut channel, changes)
    [2.2665]
    [2.2745]
    local::upload_changes(&store, txn, &mut channel, changes)
  • replacement in pijul/src/remote/mod.rs at line 430
    [6.66651][6.66651:66691]()
    pub async fn start_change_download(
    [6.66651]
    [6.66691]
    pub async fn download_changes(
  • replacement in pijul/src/remote/mod.rs at line 432
    [6.66710][6.66710:66747]()
    c: libpijul::pristine::Hash,
    [6.66710]
    [6.66747]
    hashes: &[libpijul::pristine::Hash],
    send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
  • replacement in pijul/src/remote/mod.rs at line 437
    [6.66834][6.66834:67210]()
    debug!("start_change_download");
    libpijul::changestore::filesystem::push_filename(path, &c);
    if std::fs::metadata(&path).is_ok() && !full {
    debug!("metadata {:?} ok", path);
    libpijul::changestore::filesystem::pop_filename(path);
    return Ok(false);
    }
    std::fs::create_dir_all(&path.parent().unwrap())?;
    [6.66834]
    [6.67210]
    debug!("download_changes");
  • replacement in pijul/src/remote/mod.rs at line 439
    [6.67232][6.67232:68044]()
    RemoteRepo::Local(ref mut l) => l.start_change_download(c, path).await?,
    RemoteRepo::Ssh(ref mut s) => s.start_change_download(c, full).await?,
    RemoteRepo::Http(ref h) => {
    let mut f = std::fs::File::create(&path)?;
    let c32 = c.to_base32();
    let url = format!("{}/{}", h.url, DOT_DIR);
    let mut res = h.client.get(&url).query(&[("change", c32)]).send().await?;
    if !res.status().is_success() {
    return Err((crate::Error::Http {
    status: res.status(),
    })
    .into());
    }
    while let Some(chunk) = res.chunk().await? {
    f.write_all(&chunk)?;
    }
    }
    [6.67232]
    [2.2760]
    RemoteRepo::Local(ref mut l) => l.download_changes(hashes, send, path).await?,
    RemoteRepo::Ssh(ref mut s) => s.download_changes(hashes, send, path, full).await?,
    RemoteRepo::Http(ref mut h) => h.download_changes(hashes, send, path, full).await?,
  • edit in pijul/src/remote/mod.rs at line 447
    [6.68182][6.68182:68729]()
    }
    pub async fn wait_downloads(
    &mut self,
    changes_dir: &Path,
    hashes: &[libpijul::pristine::Hash],
    send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
    ) -> Result<(), anyhow::Error> {
    if hashes.is_empty() {
    return Ok(());
    }
    if let RemoteRepo::Ssh(ref mut s) = *self {
    s.wait_downloads(changes_dir, hashes, send).await?
    } else {
    for h in hashes {
    send.send(*h).await?
    }
    }
    Ok(())
  • replacement in pijul/src/remote/mod.rs at line 449
    [6.68736][6.68736:68803]()
    pub async fn pull<T: MutTxnTExt + TxnTExt>(
    &mut self,
    [6.68736]
    [6.68803]
    pub async fn pull<'a, T: MutTxnTExt + TxnTExt>(
    &'a mut self,
  • replacement in pijul/src/remote/mod.rs at line 461
    [6.69159][6.69159:69754]()
    let mut self_ = std::mem::replace(self, RemoteRepo::None);
    let t = tokio::spawn(async move {
    let mut hashes = Vec::new();
    for h in to_download_.iter() {
    if self_
    .start_change_download(*h, &mut change_path_, false)
    .await?
    {
    hashes.push(*h);
    }
    }
    debug!("hashes = {:?}", hashes);
    self_
    .wait_downloads(&change_path_, &hashes, &mut send)
    .await?;
    Ok(self_)
    });
    [6.69159]
    [6.69754]
    self
    .download_changes(&to_download_, &mut send, &mut change_path_, false)
    .await?;
  • edit in pijul/src/remote/mod.rs at line 489
    [6.70763][6.70763:70859]()
    let r: Result<_, anyhow::Error> = t.await?;
    debug!("done");
    *self = r?;
  • edit in pijul/src/remote/mod.rs at line 505
    [6.71428][6.71428:71469]()
    let mut hashes = Vec::new();
  • replacement in pijul/src/remote/mod.rs at line 506
    [6.71529][6.71529:71732]()
    if self_
    .start_change_download(hash, &mut change_path_, false)
    .await?
    {
    hashes.push(hash);
    }
    [6.71529]
    [6.71732]
    self_
    .download_changes(&[hash], &mut send_signal, &mut change_path_, false)
    .await?;
  • edit in pijul/src/remote/mod.rs at line 510
    [6.71746][6.71746:71908]()
    debug!("hashes = {:?}", hashes);
    self_
    .wait_downloads(&change_path_, &hashes, &mut send_signal)
    .await?;
  • edit in pijul/src/remote/mod.rs at line 606
    [6.75405][6.75405:75446]()
    let mut hashes = Vec::new();
  • replacement in pijul/src/remote/mod.rs at line 608
    [6.75562][6.75562:75825]()
    if self_
    .start_change_download(h, &mut changes_dir, true)
    .await?
    {
    debug!("push");
    hashes.push(h);
    }
    debug!("done");
    [6.75562]
    [6.75825]
    self_
    .download_changes(&[h], &mut send_sig, &mut changes_dir, true)
    .await?;
  • edit in pijul/src/remote/mod.rs at line 612
    [6.75839][6.75839:75983]()
    debug!("waiting");
    self_
    .wait_downloads(&changes_dir, &hashes, &mut send_sig)
    .await?;
  • edit in pijul/src/remote/mod.rs at line 687
    [6.78459][6.78459:78533]()
    // let pullable = self.pullable(txn, local_channel, path).await?;
  • replacement in pijul/src/remote/local.rs at line 0
    [6.79745][4.502:534]()
    use std::path::{Path, PathBuf};
    [6.79745]
    [4.534]
    use std::path::PathBuf;
    use std::sync::Arc;
  • edit in pijul/src/remote/local.rs at line 9
    [6.79901]
    [6.79901]
    #[derive(Clone)]
  • replacement in pijul/src/remote/local.rs at line 14
    [6.80020][6.80020:80079]()
    pub pristine: libpijul::pristine::sanakirja::Pristine,
    [6.80020]
    [6.80079]
    pub pristine: Arc<libpijul::pristine::sanakirja::Pristine>,
  • replacement in pijul/src/remote/local.rs at line 109
    [6.83453][6.83453:83493]()
    pub async fn start_change_download(
    [6.83453]
    [6.83493]
    pub async fn download_changes(
  • replacement in pijul/src/remote/local.rs at line 111
    [6.83512][6.83512:83570]()
    c: libpijul::pristine::Hash,
    path: &Path,
    [6.83512]
    [6.83570]
    c: &[libpijul::pristine::Hash],
    send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
    mut path: &mut PathBuf,
  • replacement in pijul/src/remote/local.rs at line 115
    [6.83607][6.83607:83755](),[6.83755][2.3622:3741]()
    libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);
    debug!("hard link {:?} {:?}", self.changes_dir, path);
    if std::fs::hard_link(&self.changes_dir, path).is_err() {
    std::fs::copy(&self.changes_dir, path)?;
    [6.83607]
    [2.3741]
    for c in c {
    libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, c);
    libpijul::changestore::filesystem::push_filename(&mut path, c);
    if std::fs::metadata(&path).is_ok() {
    debug!("metadata {:?} ok", path);
    libpijul::changestore::filesystem::pop_filename(&mut path);
    continue
    }
    std::fs::create_dir_all(&path.parent().unwrap())?;
    if std::fs::hard_link(&self.changes_dir, &path).is_err() {
    std::fs::copy(&self.changes_dir, &path)?;
    }
    debug!("hard link done");
    libpijul::changestore::filesystem::pop_filename(&mut self.changes_dir);
    libpijul::changestore::filesystem::pop_filename(&mut path);
    debug!("sent");
    send.send(*c).await.unwrap();
  • edit in pijul/src/remote/local.rs at line 133
    [2.3751][6.83809:83947](),[6.83809][6.83809:83947]()
    debug!("hard link done");
    libpijul::changestore::filesystem::pop_filename(&mut self.changes_dir);
    debug!("sent");
  • replacement in pijul/src/commands/archive.rs at line 65
    [2.8755][2.8755:8811]()
    let mut f = std::fs::File::create(&p)?;
    [2.8755]
    [2.8811]
    let f = std::fs::File::create(&p)?;
  • replacement in pijul/src/commands/archive.rs at line 67
    [2.8834][2.8834:8916]()
    .archive(self.prefix, state.map(|x| (x, &extra[..])), &mut f)
    [2.8834]
    [2.8916]
    .archive(self.prefix, state.map(|x| (x, &extra[..])), f)
  • replacement in pijul/Cargo.toml at line 58
    [6.197741][5.0:19]()
    thrussh = "0.29.8"
    [6.197741]
    [5.19]
    thrussh = "0.29.9"
  • replacement in Cargo.lock at line 1977
    [6.1078238][3.1528:1547](),[3.1547][6.1078257:1078322](),[6.1078257][6.1078257:1078322](),[6.1078322][3.1548:1626]()
    version = "0.29.6"
    source = "registry+https://github.com/rust-lang/crates.io-index"
    checksum = "907849cfee4388f2d6bb1558f1d72ef80d70b5cb8d3583fb2c06391f9c9d71b4"
    [6.1078238]
    [6.1078400]
    version = "0.29.8"
  • replacement in Cargo.lock at line 1995
    [6.1078612][3.1648:1667]()
    version = "0.18.3"
    [6.1078612]
    [6.1078631]
    version = "0.18.8"
  • replacement in Cargo.lock at line 1997
    [6.1078696][3.1668:1746]()
    checksum = "086807e338da6863b2f151bf898a0ba233dec6e30f626982346679709decd23d"
    [6.1078696]
    [6.1078774]
    checksum = "b1c6b62fd561e81bbbaf6620f29d9d763dcede2925b96ebf3fe20f819e949ffa"