Fixing SSH asynchronicity issues
[?]
Nov 23, 2020, 5:59 PM
UDHP4ZVBQZT2VBURB2MDCU2IZDNMCAFSIUKWRBDQ5BWMFKSN2LYQC
Dependencies
- [2] 76PCXGML Pushing to, and pulling from the local repository
- [3] XWETQ4DE Upgrading versions
- [4] L4JXJHWX pijul/*: reorganize imports and remove extern crate
- [5] RR65HCKO Thrussh versions
- [6] NX5I5H53 New published versions
- [7] OUWD436A Version bump
- [8] 3S4DR77Z Version updates
- [9] Q7CAYX5N Fixing Windows compilation
- [10] SXEYMYF7 Fixing the bad changes in history (unfortunately, by rebooting).
Change contents
- edit in pijul/src/remote/ssh.rs at line 3
use std::pin::Pin; - edit in pijul/src/remote/ssh.rs at line 14
use tokio::sync::Mutex; - edit in pijul/src/remote/ssh.rs at line 28
state: Arc<Mutex<State>>, - edit in pijul/src/remote/ssh.rs at line 88
let state = Arc::new(Mutex::new(State::None)); - edit in pijul/src/remote/ssh.rs at line 93
state: state.clone(), - edit in pijul/src/remote/ssh.rs at line 126
state, - edit in pijul/src/remote/ssh.rs at line 230
state: Arc<Mutex<State>>,}enum State {None,State {sender: Option<tokio::sync::oneshot::Sender<Option<(u64, Merkle)>>>,},Changes {sender: Option<tokio::sync::mpsc::Sender<Hash>>,remaining_len: usize,file: std::fs::File,path: PathBuf,hashes: Vec<libpijul::pristine::Hash>,current: usize,},Changelist {sender: tokio::sync::mpsc::Sender<Option<(u64, Hash, Merkle)>>,},Channel {sender: tokio::sync::mpsc::Sender<Vec<u8>>,},Archive {sender: Option<tokio::sync::oneshot::Sender<u64>>,len: u64,conflicts: u64,len_n: u64,w: Box<dyn Write + Send>,}, - replacement in pijul/src/remote/ssh.rs at line 263
type FutureUnit = futures::future::Ready<Result<(Self, Session), anyhow::Error>>;type FutureUnit = Pin<Box<dyn futures::future::Future<Output = Result<(Self, Session), anyhow::Error>> + Send>,>;// type FutureUnit = futures::future::Ready<Result<(Self, Session), anyhow::Error>>; - replacement in pijul/src/remote/ssh.rs at line 272
futures::future::ready(Ok((self, session)))// futures::future::ready(Ok((self, session)))Box::pin(async move { Ok((self, session)) }) - edit in pijul/src/remote/ssh.rs at line 314
}fn data(self,channel: thrussh::ChannelId,data: &[u8],session: thrussh::client::Session,) -> Self::FutureUnit {debug!("data {:?} {:?}", channel, data.len());let data = data.to_vec();Box::pin(async move {match *self.state.lock().await {State::State { ref mut sender } => {debug!("state: State");if let Some(sender) = sender.take() {// If we can't parse `data` (for example if the// remote returns the standard "-\n"), this// returns None.let mut s = std::str::from_utf8(&data).unwrap().split(' ');debug!("s = {:?}", s);if let (Some(n), Some(m)) = (s.next(), s.next()) {let n = n.parse().unwrap();sender.send(Some((n, Merkle::from_base32(m.trim().as_bytes()).unwrap()))).unwrap_or(());} else {sender.send(None).unwrap_or(());}}}State::Changes {ref mut sender,ref mut remaining_len,ref mut file,ref mut path,ref hashes,ref mut current,} => {debug!("state changes");let mut p = 0;while p < data.len() {if *remaining_len == 0 {*remaining_len = (&data[p..]).read_u64::<BigEndian>().unwrap() as usize;p += 8;debug!("remaining_len = {:?}", remaining_len);}if data.len() >= p + *remaining_len {file.write_all(&data[p..p + *remaining_len])?;// We have enough data to write the// file, write it and move to the next// file.p += *remaining_len;*remaining_len = 0;file.flush()?;let mut final_path = path.clone();final_path.set_extension("change");debug!("moving {:?} to {:?}", path, final_path);std::fs::rename(&path, &final_path)?;debug!("sending");if let Some(ref mut sender) = sender {sender.send(hashes[*current].clone()).await.unwrap();}debug!("sent");*current += 1;if *current < hashes.len() {// If we're still waiting for// another change.libpijul::changestore::filesystem::pop_filename(path);libpijul::changestore::filesystem::push_filename(path,&hashes[*current],);std::fs::create_dir_all(&path.parent().unwrap())?;path.set_extension("");debug!("creating file {:?}", path);*file = std::fs::File::create(&path)?;} else {// Else, just finish.debug!("dropping channel");*sender = None;break;}} else {// not 
enough data, we need more.file.write_all(&data[p..])?;file.flush()?;*remaining_len -= data.len() - p;debug!("need more data");break;}}debug!("finished, {:?} {:?}", p, data.len());}State::Changelist { ref mut sender } => {debug!("state changelist");if &data[..] == b"\n" {debug!("log done");sender.send(None).await.unwrap_or(())} else if let Ok(data) = std::str::from_utf8(&data) {for l in data.lines() {if !l.is_empty() {debug!("line = {:?}", l);let (n, h, m) = parse_line(l)?;sender.send(Some((n, h, m))).await.unwrap_or(());} else {sender.send(None).await.unwrap_or(());}}}}State::Channel { ref mut sender } => sender.send(data).await?,State::Archive { ref mut sender, ref mut w, ref mut len, ref mut len_n, ref mut conflicts } => {let mut off = 0;while *len_n < 16 && off < data.len() {if *len_n < 8 {*len = (*len << 8) | (data[off] as u64);} else {*conflicts = (*conflicts << 8) | (data[off] as u64);}*len_n += 1;off += 1;}if *len_n >= 16 {w.write_all(&data[off..])?;*len -= (data.len() - off) as u64;if *len == 0 {if let Some(sender) = sender.take() {sender.send(*conflicts).unwrap_or(())}}}}State::None => {debug!("None state");}}Ok((self, session))}) - edit in pijul/src/remote/ssh.rs at line 515
debug!("get_state");let (sender, receiver) = tokio::sync::oneshot::channel();*self.state.lock().await = State::State {sender: Some(sender),}; - replacement in pijul/src/remote/ssh.rs at line 530
while let Some(msg) = self.c.wait().await {match msg {thrussh::ChannelMsg::Data { data } => {// If we can't parse `data` (for example if the// remote returns the standard "-\n"), this// returns None.let mut s = std::str::from_utf8(&data)?.split(' ');debug!("s = {:?}", s);if let (Some(n), Some(m)) = (s.next(), s.next()) {let n = n.parse().unwrap();return Ok(Some((n, Merkle::from_base32(m.trim().as_bytes()).unwrap())));} else {break;}}thrussh::ChannelMsg::ExtendedData { data, ext } => {if ext == 1 {debug!("{:?}", std::str::from_utf8(&data))}}thrussh::ChannelMsg::Eof => {}thrussh::ChannelMsg::ExitStatus { exit_status } => {if exit_status != 0 {return Err((Error::RemoteExit {status: exit_status,}).into());}}msg => panic!("wrong message {:?}", msg),}}Ok(None)Ok(receiver.await?) - replacement in pijul/src/remote/ssh.rs at line 533
pub async fn archive<W: std::io::Write>(pub async fn archive<W: std::io::Write + Send + 'static>( - replacement in pijul/src/remote/ssh.rs at line 537
mut w: W,w: W, - edit in pijul/src/remote/ssh.rs at line 539
debug!("archive");let (sender, receiver) = tokio::sync::oneshot::channel();*self.state.lock().await = State::Archive {sender: Some(sender),len: 0,conflicts: 0,len_n: 0,w: Box::new(w),}; - edit in pijul/src/remote/ssh.rs at line 572
}let mut len = 0;let mut conflicts = 0;let mut len_n = 0;while let Some(msg) = self.c.wait().await {match msg {thrussh::ChannelMsg::Data { data } => {let mut off = 0;while len_n < 16 && off < data.len() {if len_n < 8 {len = (len << 8) | (data[off] as u64);} else {conflicts = (conflicts << 8) | (data[off] as u64);}len_n += 1;off += 1;}if len_n >= 16 {w.write_all(&data[off..])?;len -= (data.len() - off) as u64;if len == 0 {break;}}}thrussh::ChannelMsg::ExtendedData { data, ext } => {if ext == 1 {debug!("{:?}", std::str::from_utf8(&data))}}thrussh::ChannelMsg::Eof => {}thrussh::ChannelMsg::ExitStatus { exit_status } => {if exit_status != 0 {return Err((Error::RemoteExit {status: exit_status,}).into());}}msg => panic!("wrong message {:?}", msg),} - edit in pijul/src/remote/ssh.rs at line 573
let conflicts = receiver.await.unwrap_or(0); - edit in pijul/src/remote/ssh.rs at line 621
let (sender, mut receiver) = tokio::sync::mpsc::channel(10);*self.state.lock().await = State::Changelist {sender,}; - replacement in pijul/src/remote/ssh.rs at line 635
'msg: while let Some(msg) = self.c.wait().await {debug!("msg = {:?}", msg);match msg {thrussh::ChannelMsg::Data { data } => {if &data[..] == b"\n" {debug!("log done");break;} else if let Ok(data) = std::str::from_utf8(&data) {for l in data.lines() {if !l.is_empty() {debug!("line = {:?}", l);let (n, h, m) = parse_line(l)?;txn.put_remote(remote, n, (h, m))?;} else {break 'msg;}}}}thrussh::ChannelMsg::ExtendedData { data, ext } => {debug!("{:?} {:?}", ext, std::str::from_utf8(&data[..]));/*return Err((crate::Error::Remote {msg: std::str::from_utf8(&data[..]).unwrap().to_string()}).into())*/}thrussh::ChannelMsg::WindowAdjusted { .. } => {}thrussh::ChannelMsg::Eof => {}thrussh::ChannelMsg::ExitStatus { exit_status } => {if exit_status != 0 {return Err((Error::RemoteExit {status: exit_status,}).into());}}msg => panic!("wrong message {:?}", msg),}while let Some(Some((n, h, m))) = receiver.recv().await {txn.put_remote(remote, n, (h, m))?; - replacement in pijul/src/remote/ssh.rs at line 672
pub async fn start_change_download(pub async fn download_changes( - replacement in pijul/src/remote/ssh.rs at line 674
c: libpijul::pristine::Hash,c: &[libpijul::pristine::Hash],sender: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,changes_dir: &mut PathBuf, - replacement in pijul/src/remote/ssh.rs at line 679
self.run_protocol().await?;debug!("download_change {:?}", full);if full {self.c.data(format!("change {}\n", c.to_base32()).as_bytes()).await?;} else {self.c.data(format!("partial {}\n", c.to_base32()).as_bytes()).await?;}Ok(())}pub async fn wait_downloads(&mut self,changes_dir: &Path,hashes: &[libpijul::pristine::Hash],send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,) -> Result<(), anyhow::Error> {debug!("wait_downloads");if !self.is_running {return Ok(());}let mut remaining_len = 0;let mut current: usize = 0;let mut path = changes_dir.to_path_buf();libpijul::changestore::filesystem::push_filename(&mut path, &hashes[current]);let (sender_, mut recv) = tokio::sync::mpsc::channel(100);let mut path = changes_dir.clone();libpijul::changestore::filesystem::push_filename(&mut path, &c[0]); - replacement in pijul/src/remote/ssh.rs at line 684
let mut file = std::fs::File::create(&path)?;'outer: while let Some(msg) = self.c.wait().await {match msg {thrussh::ChannelMsg::Data { data } => {debug!("data = {:?}", &data[..]);let mut p = 0;while p < data.len() {if remaining_len == 0 {remaining_len = (&data[p..]).read_u64::<BigEndian>().unwrap() as usize;p += 8;debug!("remaining_len = {:?}", remaining_len);}if data.len() >= p + remaining_len {file.write_all(&data[p..p + remaining_len])?;// We have enough data to write the// file, write it and move to the next// file.p += remaining_len;remaining_len = 0;file.flush()?;let mut final_path = path.clone();final_path.set_extension("change");debug!("moving {:?} to {:?}", path, final_path);std::fs::rename(&path, &final_path)?;debug!("sending");send.send(hashes[current].clone()).await.unwrap();debug!("sent");current += 1;if current < hashes.len() {// If we're still waiting for// another change.libpijul::changestore::filesystem::pop_filename(&mut path);libpijul::changestore::filesystem::push_filename(&mut path,&hashes[current],);std::fs::create_dir_all(&path.parent().unwrap())?;path.set_extension("");file = std::fs::File::create(&path)?;} else {// Else, just finish.break 'outer;}} else {// not enough data, we need more.file.write_all(&data[p..])?;remaining_len -= data.len() - p;break;}}}thrussh::ChannelMsg::ExitStatus { exit_status } => {debug!("exit: {:?}", exit_status);if exit_status != 0 {error!("Remote command returned {:?}", exit_status)}self.is_running = false;return Ok(());}msg => {debug!("{:?}", msg);}let file = std::fs::File::create(&path)?;*self.state.lock().await = State::Changes {sender: Some(sender_),remaining_len: 0,path,file,hashes: c.to_vec(),current: 0,};self.run_protocol().await?;for c in c {debug!("download_change {:?} {:?}", c, full);if full {self.c.data(format!("change {}\n", c.to_base32()).as_bytes()).await?;} else {self.c.data(format!("partial {}\n", c.to_base32()).as_bytes()).await?;}}while let Some(_hash) = recv.recv().await {debug!("received hash 
{:?}", _hash);if sender.send(_hash).await.is_err() {break - replacement in pijul/src/remote/ssh.rs at line 712
debug!("done waiting for downloads");debug!("done downloading"); - edit in pijul/src/remote/ssh.rs at line 723
let (sender, mut recv) = tokio::sync::mpsc::channel(10);*self.state.lock().await = State::Channel { sender }; - edit in pijul/src/remote/ssh.rs at line 726
debug!("clone channel"); - edit in pijul/src/remote/ssh.rs at line 730
- replacement in pijul/src/remote/ssh.rs at line 734
while let Some(msg) = self.c.wait().await {match msg {thrussh::ChannelMsg::Data { data } => {debug!("data = {:?}", &data[..]);if from_dump.read(&data)? {break;}}thrussh::ChannelMsg::ExtendedData { data, ext } => {debug!("data = {:?}, ext = {:?}", &data[..], ext);}thrussh::ChannelMsg::ExitStatus { exit_status } => {if exit_status != 0 {error!("Remote command returned {:?}", exit_status)}self.is_running = false;break;}msg => {debug!("msg = {:?}", msg);}while let Some(msg) = recv.recv().await {if from_dump.read(&msg)? {break; - replacement in pijul/src/remote/ssh.rs at line 764
self.wait_downloads(&repo.changes_dir, &hashes, &mut send)self.download_changes(&hashes, &mut send, &mut repo.changes_dir, false) - edit in pijul/src/remote/mod.rs at line 0
use std::io::Write; - edit in pijul/src/remote/mod.rs at line 1
use std::sync::Arc; - edit in pijul/src/remote/mod.rs at line 17
pub mod http;use http::*; - edit in pijul/src/remote/mod.rs at line 28
pub struct Http {pub url: String,pub channel: String,pub client: reqwest::Client,pub name: String,} - replacement in pijul/src/remote/mod.rs at line 90
pristine,pristine: Arc::new(pristine), - replacement in pijul/src/remote/mod.rs at line 278
pub async fn archive<W: std::io::Write>(pub async fn archive<W: std::io::Write + Send + 'static>( - replacement in pijul/src/remote/mod.rs at line 387
pub async fn upload_changes<T: MutTxnTExt, W: libpijul::working_copy::WorkingCopy>(pub async fn upload_changes<T: MutTxnTExt>( - edit in pijul/src/remote/mod.rs at line 389
repo: &mut W, - replacement in pijul/src/remote/mod.rs at line 423
local::upload_changes(&store, repo, txn, &mut channel, changes)local::upload_changes(&store, txn, &mut channel, changes) - replacement in pijul/src/remote/mod.rs at line 430
pub async fn start_change_download(pub async fn download_changes( - replacement in pijul/src/remote/mod.rs at line 432
c: libpijul::pristine::Hash,hashes: &[libpijul::pristine::Hash],send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>, - replacement in pijul/src/remote/mod.rs at line 437
debug!("start_change_download");libpijul::changestore::filesystem::push_filename(path, &c);if std::fs::metadata(&path).is_ok() && !full {debug!("metadata {:?} ok", path);libpijul::changestore::filesystem::pop_filename(path);return Ok(false);}std::fs::create_dir_all(&path.parent().unwrap())?;debug!("download_changes"); - replacement in pijul/src/remote/mod.rs at line 439
RemoteRepo::Local(ref mut l) => l.start_change_download(c, path).await?,RemoteRepo::Ssh(ref mut s) => s.start_change_download(c, full).await?,RemoteRepo::Http(ref h) => {let mut f = std::fs::File::create(&path)?;let c32 = c.to_base32();let url = format!("{}/{}", h.url, DOT_DIR);let mut res = h.client.get(&url).query(&[("change", c32)]).send().await?;if !res.status().is_success() {return Err((crate::Error::Http {status: res.status(),}).into());}while let Some(chunk) = res.chunk().await? {f.write_all(&chunk)?;}}RemoteRepo::Local(ref mut l) => l.download_changes(hashes, send, path).await?,RemoteRepo::Ssh(ref mut s) => s.download_changes(hashes, send, path, full).await?,RemoteRepo::Http(ref mut h) => h.download_changes(hashes, send, path, full).await?, - edit in pijul/src/remote/mod.rs at line 447
}pub async fn wait_downloads(&mut self,changes_dir: &Path,hashes: &[libpijul::pristine::Hash],send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,) -> Result<(), anyhow::Error> {if hashes.is_empty() {return Ok(());}if let RemoteRepo::Ssh(ref mut s) = *self {s.wait_downloads(changes_dir, hashes, send).await?} else {for h in hashes {send.send(*h).await?}}Ok(()) - replacement in pijul/src/remote/mod.rs at line 449
pub async fn pull<T: MutTxnTExt + TxnTExt>(&mut self,pub async fn pull<'a, T: MutTxnTExt + TxnTExt>(&'a mut self, - replacement in pijul/src/remote/mod.rs at line 461
let mut self_ = std::mem::replace(self, RemoteRepo::None);let t = tokio::spawn(async move {let mut hashes = Vec::new();for h in to_download_.iter() {if self_.start_change_download(*h, &mut change_path_, false).await?{hashes.push(*h);}}debug!("hashes = {:?}", hashes);self_.wait_downloads(&change_path_, &hashes, &mut send).await?;Ok(self_)});self.download_changes(&to_download_, &mut send, &mut change_path_, false).await?; - edit in pijul/src/remote/mod.rs at line 489
let r: Result<_, anyhow::Error> = t.await?;debug!("done");*self = r?; - edit in pijul/src/remote/mod.rs at line 505
let mut hashes = Vec::new(); - replacement in pijul/src/remote/mod.rs at line 506
if self_.start_change_download(hash, &mut change_path_, false).await?{hashes.push(hash);}self_.download_changes(&[hash], &mut send_signal, &mut change_path_, false).await?; - edit in pijul/src/remote/mod.rs at line 510
debug!("hashes = {:?}", hashes);self_.wait_downloads(&change_path_, &hashes, &mut send_signal).await?; - edit in pijul/src/remote/mod.rs at line 606
let mut hashes = Vec::new(); - replacement in pijul/src/remote/mod.rs at line 608
if self_.start_change_download(h, &mut changes_dir, true).await?{debug!("push");hashes.push(h);}debug!("done");self_.download_changes(&[h], &mut send_sig, &mut changes_dir, true).await?; - edit in pijul/src/remote/mod.rs at line 612
debug!("waiting");self_.wait_downloads(&changes_dir, &hashes, &mut send_sig).await?; - edit in pijul/src/remote/mod.rs at line 687
// let pullable = self.pullable(txn, local_channel, path).await?; - replacement in pijul/src/remote/local.rs at line 0
use std::path::{Path, PathBuf};use std::path::PathBuf;use std::sync::Arc; - edit in pijul/src/remote/local.rs at line 9
#[derive(Clone)] - replacement in pijul/src/remote/local.rs at line 14
pub pristine: libpijul::pristine::sanakirja::Pristine,pub pristine: Arc<libpijul::pristine::sanakirja::Pristine>, - replacement in pijul/src/remote/local.rs at line 109
pub async fn start_change_download(pub async fn download_changes( - replacement in pijul/src/remote/local.rs at line 111
c: libpijul::pristine::Hash,path: &Path,c: &[libpijul::pristine::Hash],send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,mut path: &mut PathBuf, - replacement in pijul/src/remote/local.rs at line 115
libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);debug!("hard link {:?} {:?}", self.changes_dir, path);if std::fs::hard_link(&self.changes_dir, path).is_err() {std::fs::copy(&self.changes_dir, path)?;for c in c {libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, c);libpijul::changestore::filesystem::push_filename(&mut path, c);if std::fs::metadata(&path).is_ok() {debug!("metadata {:?} ok", path);libpijul::changestore::filesystem::pop_filename(&mut path);continue}std::fs::create_dir_all(&path.parent().unwrap())?;if std::fs::hard_link(&self.changes_dir, &path).is_err() {std::fs::copy(&self.changes_dir, &path)?;}debug!("hard link done");libpijul::changestore::filesystem::pop_filename(&mut self.changes_dir);libpijul::changestore::filesystem::pop_filename(&mut path);debug!("sent");send.send(*c).await.unwrap(); - edit in pijul/src/remote/local.rs at line 133
debug!("hard link done");libpijul::changestore::filesystem::pop_filename(&mut self.changes_dir);debug!("sent"); - replacement in pijul/src/commands/archive.rs at line 65
let mut f = std::fs::File::create(&p)?;let f = std::fs::File::create(&p)?; - replacement in pijul/src/commands/archive.rs at line 67
.archive(self.prefix, state.map(|x| (x, &extra[..])), &mut f).archive(self.prefix, state.map(|x| (x, &extra[..])), f) - replacement in pijul/Cargo.toml at line 58
thrussh = "0.29.8"thrussh = "0.29.9" - replacement in Cargo.lock at line 1977[6.1078238]→[3.1528:1547](∅→∅),[3.1547]→[6.1078257:1078322](∅→∅),[6.1078257]→[6.1078257:1078322](∅→∅),[6.1078322]→[3.1548:1626](∅→∅)
version = "0.29.6"source = "registry+https://github.com/rust-lang/crates.io-index"checksum = "907849cfee4388f2d6bb1558f1d72ef80d70b5cb8d3583fb2c06391f9c9d71b4"version = "0.29.8" - replacement in Cargo.lock at line 1995
version = "0.18.3"version = "0.18.8" - replacement in Cargo.lock at line 1997
checksum = "086807e338da6863b2f151bf898a0ba233dec6e30f626982346679709decd23d"checksum = "b1c6b62fd561e81bbbaf6620f29d9d763dcede2925b96ebf3fe20f819e949ffa"