Making `pijul lock` robust to kill signals

[?]
Jan 16, 2021, 7:53 PM
SN7AGY6SLIWVKLBHQQ2MZ7VYY4BBFKROC5F3M6FVVFIGT25QNX7AC

Dependencies

  • [2] MBUNDZ3O Lock: using tokio processes rather than std ones
  • [3] OCBM7IFE New release: pijul-1.0.0-alpha.8
  • [4] WIORLB47 Version bump
  • [5] G6YZ7U65 Version bump
  • [6] OUWD436A Version bump
  • [7] ZQXP3HNA Version bump
  • [8] BT2ZHPY4 Version bumps
  • [9] JACZWIJ6 Version bump
  • [10] 5YDI33C4 Fixing pager on OSX
  • [11] SXEYMYF7 Fixing the bad changes in history (unfortunately, by rebooting).
  • [12] XAY4DYRR Version bump
  • [13] ZTVNGFNT Version bump
  • [14] TPEH2XNB 1.0.0-alpha.28, with Tokio 1.0
  • [15] FI3WFMTS Simplifying the locks
  • [16] 6DOXSHWG Cleanup, and version bump
  • [17] CCLLB7OI Upgrading to Sanakirja 0.15 + version bump
  • [18] JRENVH5D Reqwest 0.11
  • [19] WI5BS6BS New published versions
  • [20] 3S4DR77Z Version updates
  • [21] YX3VCEOM Version bump
  • [22] W5NU4F6J Less noisy lock
  • [23] NX5I5H53 New published versions
  • [24] 5BRU2RRW Cleanup (debugging a crash related to trees/inodes)
  • [25] SAGSYAPX Various version bumps
  • [26] B5Z4IMEU Generating Cargo.nix for pijul 1.0.0-alpha.6
  • [27] VYHHOEYH Versions and formatting
  • [28] JL4WKA5P Implement the Sanakirja concurrency model in a cross-process way
  • [29] 64M73LNC Hide `pijul lock` and avoid panics when used improperly
  • [*] GYXIF25T Proper parsing of URLs
  • [*] 2K7JLB4Z No pager on Windows

Change contents

  • replacement in pijul/src/repository/unix_lock.rs at line 31
    [3.462][2.162:203]()
    s.read_exact(&mut [0u8]).await?;
    [3.462]
    [3.491]
    if let Err(e) = s.read_exact(&mut [0u8]).await {
    process.wait().await?;
    return Err(e);
    }
  • edit in pijul/src/commands/lock.rs at line 1
    [3.7180]
    [3.7245]
    use clap::Clap;
    use fs2::FileExt;
    use std::fs::OpenOptions;
  • replacement in pijul/src/commands/lock.rs at line 8
    [3.7365][3.7365:7415]()
    use tokio::sync::{watch, Mutex};
    use clap::Clap;
    [3.7365]
    [3.7415]
    use tokio::sync::{mpsc, watch, Mutex};
  • edit in pijul/src/commands/lock.rs at line 23
    [3.877]
    [3.7905]
    }
    self.run_socket().await
    }
    async fn run_socket(self) -> Result<(), anyhow::Error> {
    let lock = self.path.with_extension("lock");
    let file = OpenOptions::new().write(true).create(true).open(&lock)?;
    if file.try_lock_exclusive().is_err() {
    return Ok(());
  • edit in pijul/src/commands/lock.rs at line 33
    [3.7915]
    [3.878]
    std::fs::remove_file(&self.path).unwrap_or(());
  • edit in pijul/src/commands/lock.rs at line 41
    [3.1066][3.7999:8149](),[3.7999][3.7999:8149]()
    let n_clients = Arc::new(Mutex::new(0usize));
    let file = Arc::new(self.path.clone());
    let muttxn = Arc::new(Mutex::new(()));
  • replacement in pijul/src/commands/lock.rs at line 42
    [3.8210][3.8210:8358]()
    let tx = std::sync::Arc::new(tx);
    let clock = Arc::new(Mutex::new(0usize));
    let txn_counter = Arc::new(Mutex::new(0usize));
    [3.8210]
    [3.8419]
    let (done_tx, mut done_rx) = mpsc::channel(1);
    let file = Arc::new(self.path);
    let locks = Locks {
    n_clients: Arc::new(Mutex::new(0usize)),
    muttxn: Arc::new(Mutex::new(())),
    tx: Arc::new(tx),
    active_at_last_commit,
    file: file.clone(),
    clock: Arc::new(Mutex::new(0usize)),
    txn_counter: Arc::new(Mutex::new(0usize)),
    done_tx,
    };
  • replacement in pijul/src/commands/lock.rs at line 55
    [3.8434][3.8434:8479](),[3.8768][3.8768:8885](),[3.9272][3.9272:9496](),[3.9565][3.9565:9720](),[3.9765][3.9765:11259](),[3.11340][3.11340:12167](),[3.12167][3.1067:1137](),[3.1137][3.12248:12456](),[3.12248][3.12248:12456](),[3.12879][3.12879:13197](),[3.349][3.13531:13718](),[3.13531][3.13531:13718]()
    let x = listener.accept().await;
    match x {
    Ok((mut stream, _addr)) => {
    *n_clients.lock().await += 1;
    let file = file.clone();
    let muttxn = muttxn.clone();
    let mut active_at_last_commit = active_at_last_commit.clone();
    let clock = clock.clone();
    let txn_counter = txn_counter.clone();
    let n_clients = n_clients.clone();
    let tx = tx.clone();
    tokio::spawn(async move {
    let mut t = [0u8];
    while let Ok(n) = stream.read(&mut t).await {
    if n == 0 {
    break;
    }
    if t[0] == 1 {
    // muttxn
    let lock = if let Ok(guard) = muttxn.try_lock() {
    guard
    } else {
    stream.write_all(&[LOCKED]).await.unwrap_or(());
    muttxn.lock().await
    };
    while *active_at_last_commit.borrow() > 0 {
    stream.write_all(&[LOCKED]).await.unwrap_or(());
    active_at_last_commit.changed().await.unwrap();
    }
    stream.write_all(&[ACK]).await.unwrap_or(());
    if let Ok(n) = stream.read(&mut t).await {
    if n == 0 {
    break;
    }
    if t[0] == COMMIT {
    // commit
    let mut clock = clock.lock().await;
    *clock += 1;
    let counter = *txn_counter.lock().await;
    tx.send(counter).unwrap();
    }
    }
    std::mem::drop(lock);
    } else {
    // txn
    *txn_counter.lock().await += 1;
    let start_date = *clock.lock().await;
    stream.write_all(&[ACK]).await.unwrap_or(());
    let n = stream.read(&mut t).await.unwrap_or(0);
    if n == 0 {
    break;
    }
    *txn_counter.lock().await -= 1;
    if start_date < *clock.lock().await {
    let last = *active_at_last_commit.borrow();
    tx.send(last - 1).unwrap();
    }
    }
    }
    if *n_clients.lock().await == 1 {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    if *n_clients.lock().await == 1 {
    std::fs::remove_file(file.as_ref()).unwrap_or(());
    std::process::exit(0)
    }
    }
    *n_clients.lock().await -= 1
    });
    [3.8434]
    [3.13718]
    tokio::select! {
    x = tokio::time::timeout(std::time::Duration::from_secs(1), listener.accept()) => {
    if let Ok(Ok((stream, _))) = x {
    accept(stream, locks.clone()).await
    } else {
    break
    }
    },
    _ = done_rx.recv() => {
    break
  • edit in pijul/src/commands/lock.rs at line 66
    [3.13736][3.13736:13769]()
    Err(_) => break,
  • replacement in pijul/src/commands/lock.rs at line 69
    [3.14042][3.14042:14072]()
    std::process::exit(0)
    [3.13852]
    [3.14072]
    Ok(())
  • edit in pijul/src/commands/lock.rs at line 71
    [3.14078]
    [3.14078]
    }
    #[derive(Clone)]
    struct Locks {
    n_clients: Arc<Mutex<usize>>,
    file: Arc<PathBuf>,
    muttxn: Arc<Mutex<()>>,
    clock: Arc<Mutex<usize>>,
    txn_counter: Arc<Mutex<usize>>,
    active_at_last_commit: watch::Receiver<usize>,
    tx: Arc<watch::Sender<usize>>,
    done_tx: mpsc::Sender<()>,
  • edit in pijul/src/commands/lock.rs at line 84
    [3.14080]
    /// Registers one newly connected client and serves it on a spawned Tokio task.
    ///
    /// The client count in `locks.n_clients` is incremented before the task is
    /// spawned. The task then reads one-byte commands from `stream` until the
    /// read fails or returns 0 bytes:
    ///
    /// * byte `1`: mutable-transaction request, serialised through `locks.muttxn`;
    /// * any other byte: (read-only) transaction request, counted in
    ///   `locks.txn_counter`.
    ///
    /// Once the loop ends, if this is the only registered client and that is
    /// still the case one second later, a message is sent on `locks.done_tx`;
    /// otherwise the client count is decremented.
    ///
    /// `LOCKED`, `ACK` and `COMMIT` are protocol bytes defined elsewhere in the
    /// file; their values are not visible in this hunk.
    async fn accept(mut stream: tokio::net::UnixStream, mut locks: Locks) {
    *locks.n_clients.lock().await += 1;
    tokio::spawn(async move {
    // One-byte buffer reused for every command and reply read from the client.
    let mut t = [0u8];
    while let Ok(n) = stream.read(&mut t).await {
    // A zero-length read means the peer closed the connection.
    if n == 0 {
    break;
    }
    if t[0] == 1 {
    // muttxn
    // Try to take the mutable-transaction mutex without waiting; if it is
    // contended, tell the client it is LOCKED before waiting for the mutex.
    let lock = if let Ok(guard) = locks.muttxn.try_lock() {
    guard
    } else {
    // Write errors are deliberately ignored here (`unwrap_or(())`).
    stream.write_all(&[LOCKED]).await.unwrap_or(());
    locks.muttxn.lock().await
    };
    // Wait until the watched value drops to 0, notifying the client with
    // LOCKED on every iteration.
    // NOTE(review): `changed().await.unwrap()` panics this task if the watch
    // sender has been dropped -- confirm that cannot happen while clients run.
    while *locks.active_at_last_commit.borrow() > 0 {
    stream.write_all(&[LOCKED]).await.unwrap_or(());
    locks.active_at_last_commit.changed().await.unwrap();
    }
    stream.write_all(&[ACK]).await.unwrap_or(());
    // Wait for the client's verdict on its mutable transaction.
    if let Ok(n) = stream.read(&mut t).await {
    if n == 0 {
    // Client went away; leaving the loop also drops the mutex guard.
    break;
    }
    if t[0] == COMMIT {
    // commit
    // Advance the commit clock and publish the current transaction count
    // on the watch channel.
    // presumably `locks.tx` is the sender paired with
    // `locks.active_at_last_commit` -- the channel creation is outside
    // this hunk; verify.
    let mut clock = locks.clock.lock().await;
    *clock += 1;
    let counter = *locks.txn_counter.lock().await;
    locks.tx.send(counter).unwrap();
    }
    }
    std::mem::drop(lock)
    } else {
    // txn
    // Count this transaction and remember the commit clock at its start.
    *locks.txn_counter.lock().await += 1;
    let start_date = *locks.clock.lock().await;
    stream.write_all(&[ACK]).await.unwrap_or(());
    // A read error is treated like a closed connection (0 bytes).
    let n = stream.read(&mut t).await.unwrap_or(0);
    if n == 0 {
    // NOTE(review): this exit path does not decrement `txn_counter`, so a
    // client that disconnects mid-transaction appears to stay counted --
    // confirm whether that is intended.
    break;
    }
    *locks.txn_counter.lock().await -= 1;
    // A commit happened while this transaction was open: publish the watched
    // value minus one.
    // NOTE(review): `last - 1` is an unchecked `usize` subtraction; confirm
    // `last` can never be 0 here.
    if start_date < *locks.clock.lock().await {
    let last = *locks.active_at_last_commit.borrow();
    locks.tx.send(last - 1).unwrap();
    }
    }
    }
    // Connection finished. If this is the only registered client, wait one
    // second and re-check before signalling on `done_tx`.
    if *locks.n_clients.lock().await == 1 {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    if *locks.n_clients.lock().await == 1 {
    // The count is left at 1 on this path; the task returns right after
    // signalling.
    locks.done_tx.send(()).await.unwrap();
    return;
    }
    }
    *locks.n_clients.lock().await -= 1
    });
    }
  • replacement in pijul/Cargo.toml at line 4
    [3.196462][3.16737:16764]()
    version = "1.0.0-alpha.33"
    [3.196462]
    [3.196488]
    version = "1.0.0-alpha.35"
  • edit in pijul/Cargo.toml at line 87
    [31.2300]
    [32.309]
    fs2 = "0.4"