Making `pijul lock` robust to kill signals
[?]
Jan 16, 2021, 7:53 PM
SN7AGY6SLIWVKLBHQQ2MZ7VYY4BBFKROC5F3M6FVVFIGT25QNX7AC
Dependencies
- [2] MBUNDZ3O Lock: using tokio processes rather than std ones
- [3] WI5BS6BS New published versions
- [4] XAY4DYRR Version bump
- [5] 5BRU2RRW Cleanup (debugging a crash related to trees/inodes)
- [6] 6DOXSHWG Cleanup, and version bump
- [7] CCLLB7OI Upgrading to Sanakirja 0.15 + version bump
- [8] FI3WFMTS Simplifying the locks
- [9] YX3VCEOM Version bump
- [10] TPEH2XNB 1.0.0-alpha.28, with Tokio 1.0
- [11] BT2ZHPY4 Version bumps
- [12] JRENVH5D Reqwest 0.11
- [13] WIORLB47 Version bump
- [14] SXEYMYF7 Fixing the bad changes in history (unfortunately, by rebooting).
- [15] ZQXP3HNA Version bump
- [16] ZTVNGFNT Version bump
- [17] JACZWIJ6 Version bump
- [18] G6YZ7U65 Version bump
- [19] 5YDI33C4 Fixing pager on OSX
- [20] SAGSYAPX Various version bumps
- [21] W5NU4F6J Less noisy lock
- [22] JL4WKA5P Implement the Sanakirja concurrency model in a cross-process way
- [23] 3S4DR77Z Version updates
- [24] 64M73LNC Hide `pijul lock` and avoid panics when used improperly
- [25] B5Z4IMEU Generating Cargo.nix for pijul 1.0.0-alpha.6
- [26] VYHHOEYH Versions and formatting
- [27] OCBM7IFE New release: pijul-1.0.0-alpha.8
- [28] OUWD436A Version bump
- [29] NX5I5H53 New published versions
- [*] GYXIF25T Proper parsing of URLs
- [*] 2K7JLB4Z No pager on Windows
Change contents
- replacement in pijul/src/repository/unix_lock.rs at line 31
s.read_exact(&mut [0u8]).await?;
if let Err(e) = s.read_exact(&mut [0u8]).await {process.wait().await?;return Err(e);}
- edit in pijul/src/commands/lock.rs at line 1
use clap::Clap;use fs2::FileExt;use std::fs::OpenOptions;
- replacement in pijul/src/commands/lock.rs at line 8
use tokio::sync::{watch, Mutex};use clap::Clap;use tokio::sync::{mpsc, watch, Mutex};
- edit in pijul/src/commands/lock.rs at line 23
}self.run_socket().await}async fn run_socket(self) -> Result<(), anyhow::Error> {let lock = self.path.with_extension("lock");let file = OpenOptions::new().write(true).create(true).open(&lock)?;if file.try_lock_exclusive().is_err() {return Ok(());
- edit in pijul/src/commands/lock.rs at line 33
std::fs::remove_file(&self.path).unwrap_or(());
- edit in pijul/src/commands/lock.rs at line 41
let n_clients = Arc::new(Mutex::new(0usize));let file = Arc::new(self.path.clone());let muttxn = Arc::new(Mutex::new(()));
- replacement in pijul/src/commands/lock.rs at line 42
let tx = std::sync::Arc::new(tx);let clock = Arc::new(Mutex::new(0usize));let txn_counter = Arc::new(Mutex::new(0usize));let (done_tx, mut done_rx) = mpsc::channel(1);let file = Arc::new(self.path);let locks = Locks {n_clients: Arc::new(Mutex::new(0usize)),muttxn: Arc::new(Mutex::new(())),tx: Arc::new(tx),active_at_last_commit,file: file.clone(),clock: Arc::new(Mutex::new(0usize)),txn_counter: Arc::new(Mutex::new(0usize)),done_tx,}; - replacement in pijul/src/commands/lock.rs at line 55[3.8434]→[3.8434:8479](∅→∅),[3.8768]→[3.8768:8885](∅→∅),[3.9272]→[3.9272:9496](∅→∅),[3.9565]→[3.9565:9720](∅→∅),[3.9765]→[3.9765:11259](∅→∅),[3.11340]→[3.11340:12167](∅→∅),[3.12167]→[3.1067:1137](∅→∅),[3.1137]→[3.12248:12456](∅→∅),[3.12248]→[3.12248:12456](∅→∅),[3.12879]→[3.12879:13197](∅→∅),[3.349]→[3.13531:13718](∅→∅),[3.13531]→[3.13531:13718](∅→∅)
let x = listener.accept().await;match x {Ok((mut stream, _addr)) => {*n_clients.lock().await += 1;let file = file.clone();let muttxn = muttxn.clone();let mut active_at_last_commit = active_at_last_commit.clone();let clock = clock.clone();let txn_counter = txn_counter.clone();let n_clients = n_clients.clone();let tx = tx.clone();tokio::spawn(async move {let mut t = [0u8];while let Ok(n) = stream.read(&mut t).await {if n == 0 {break;}if t[0] == 1 {// muttxnlet lock = if let Ok(guard) = muttxn.try_lock() {guard} else {stream.write_all(&[LOCKED]).await.unwrap_or(());muttxn.lock().await};while *active_at_last_commit.borrow() > 0 {stream.write_all(&[LOCKED]).await.unwrap_or(());active_at_last_commit.changed().await.unwrap();}stream.write_all(&[ACK]).await.unwrap_or(());if let Ok(n) = stream.read(&mut t).await {if n == 0 {break;}if t[0] == COMMIT {// commitlet mut clock = clock.lock().await;*clock += 1;let counter = *txn_counter.lock().await;tx.send(counter).unwrap();}}std::mem::drop(lock);} else {// txn*txn_counter.lock().await += 1;let start_date = *clock.lock().await;stream.write_all(&[ACK]).await.unwrap_or(());let n = stream.read(&mut t).await.unwrap_or(0);if n == 0 {break;}*txn_counter.lock().await -= 1;if start_date < *clock.lock().await {let last = *active_at_last_commit.borrow();tx.send(last - 1).unwrap();}}}if *n_clients.lock().await == 1 {tokio::time::sleep(std::time::Duration::from_secs(1)).await;if *n_clients.lock().await == 1 {std::fs::remove_file(file.as_ref()).unwrap_or(());std::process::exit(0)}}*n_clients.lock().await -= 1});tokio::select! {x = tokio::time::timeout(std::time::Duration::from_secs(1), listener.accept()) => {if let Ok(Ok((stream, _))) = x {accept(stream, locks.clone()).await} else {break}},_ = done_rx.recv() => {break - edit in pijul/src/commands/lock.rs at line 66
Err(_) => break,
- replacement in pijul/src/commands/lock.rs at line 69
std::process::exit(0)
Ok(())
- edit in pijul/src/commands/lock.rs at line 71
}#[derive(Clone)]struct Locks {n_clients: Arc<Mutex<usize>>,file: Arc<PathBuf>,muttxn: Arc<Mutex<()>>,clock: Arc<Mutex<usize>>,txn_counter: Arc<Mutex<usize>>,active_at_last_commit: watch::Receiver<usize>,tx: Arc<watch::Sender<usize>>,done_tx: mpsc::Sender<()>, - edit in pijul/src/commands/lock.rs at line 84[3.14080]
async fn accept(mut stream: tokio::net::UnixStream, mut locks: Locks) {*locks.n_clients.lock().await += 1;tokio::spawn(async move {let mut t = [0u8];while let Ok(n) = stream.read(&mut t).await {if n == 0 {break;}if t[0] == 1 {// muttxnlet lock = if let Ok(guard) = locks.muttxn.try_lock() {guard} else {stream.write_all(&[LOCKED]).await.unwrap_or(());locks.muttxn.lock().await};while *locks.active_at_last_commit.borrow() > 0 {stream.write_all(&[LOCKED]).await.unwrap_or(());locks.active_at_last_commit.changed().await.unwrap();}stream.write_all(&[ACK]).await.unwrap_or(());if let Ok(n) = stream.read(&mut t).await {if n == 0 {break;}if t[0] == COMMIT {// commitlet mut clock = locks.clock.lock().await;*clock += 1;let counter = *locks.txn_counter.lock().await;locks.tx.send(counter).unwrap();}}std::mem::drop(lock)} else {// txn*locks.txn_counter.lock().await += 1;let start_date = *locks.clock.lock().await;stream.write_all(&[ACK]).await.unwrap_or(());let n = stream.read(&mut t).await.unwrap_or(0);if n == 0 {break;}*locks.txn_counter.lock().await -= 1;if start_date < *locks.clock.lock().await {let last = *locks.active_at_last_commit.borrow();locks.tx.send(last - 1).unwrap();}}}if *locks.n_clients.lock().await == 1 {tokio::time::sleep(std::time::Duration::from_secs(1)).await;if *locks.n_clients.lock().await == 1 {locks.done_tx.send(()).await.unwrap();return;}}*locks.n_clients.lock().await -= 1});} - replacement in pijul/Cargo.toml at line 4
version = "1.0.0-alpha.33"
version = "1.0.0-alpha.35"
- edit in pijul/Cargo.toml at line 87[31.2300][32.309]
fs2 = "0.4"