SN7AGY6SLIWVKLBHQQ2MZ7VYY4BBFKROC5F3M6FVVFIGT25QNX7AC MBUNDZ3OES7FXMIYBICHRKXWEII5FZO3LFPSP5EDWQUG7KGP2NPQC FI3WFMTSNVFWOQWSIWD64UCPRIS6MW4GOADOC6RXPHBSGZBYL5PAC JL4WKA5PBKXRNAMETYO4I52QKASQ3COYHH2JKGA7W5YLIRZZH53AC 64M73LNCB2V57AWAAKHK2NNERXANMV5V3XI3TFC4XR7FLJNCDCRQC SXEYMYF7P4RZMZ46WPL4IZUTSQ2ATBWYZX7QNVMS3SGOYXYOHAGQC GYXIF25T2BCTCWCQI5DQOF3F4HBWDJUDJUMZ2WLHVBIOYATJTFAQC 2K7JLB4Z7BS5VFNWD4DO3MKYU7VNPA5MTVHVSDI3FQZ5ICM6XM6QC let tx = std::sync::Arc::new(tx);let clock = Arc::new(Mutex::new(0usize));let txn_counter = Arc::new(Mutex::new(0usize));
let (done_tx, mut done_rx) = mpsc::channel(1);let file = Arc::new(self.path);let locks = Locks {n_clients: Arc::new(Mutex::new(0usize)),muttxn: Arc::new(Mutex::new(())),tx: Arc::new(tx),active_at_last_commit,file: file.clone(),clock: Arc::new(Mutex::new(0usize)),txn_counter: Arc::new(Mutex::new(0usize)),done_tx,};
let x = listener.accept().await;match x {Ok((mut stream, _addr)) => {*n_clients.lock().await += 1;let file = file.clone();let muttxn = muttxn.clone();let mut active_at_last_commit = active_at_last_commit.clone();let clock = clock.clone();let txn_counter = txn_counter.clone();let n_clients = n_clients.clone();let tx = tx.clone();tokio::spawn(async move {let mut t = [0u8];while let Ok(n) = stream.read(&mut t).await {if n == 0 {break;}if t[0] == 1 {// muttxnlet lock = if let Ok(guard) = muttxn.try_lock() {guard} else {stream.write_all(&[LOCKED]).await.unwrap_or(());muttxn.lock().await};while *active_at_last_commit.borrow() > 0 {stream.write_all(&[LOCKED]).await.unwrap_or(());active_at_last_commit.changed().await.unwrap();}stream.write_all(&[ACK]).await.unwrap_or(());if let Ok(n) = stream.read(&mut t).await {if n == 0 {break;}if t[0] == COMMIT {// commitlet mut clock = clock.lock().await;*clock += 1;let counter = *txn_counter.lock().await;tx.send(counter).unwrap();}}std::mem::drop(lock);} else {// txn*txn_counter.lock().await += 1;let start_date = *clock.lock().await;stream.write_all(&[ACK]).await.unwrap_or(());let n = stream.read(&mut t).await.unwrap_or(0);if n == 0 {break;}*txn_counter.lock().await -= 1;if start_date < *clock.lock().await {let last = *active_at_last_commit.borrow();tx.send(last - 1).unwrap();}}}if *n_clients.lock().await == 1 {tokio::time::sleep(std::time::Duration::from_secs(1)).await;if *n_clients.lock().await == 1 {std::fs::remove_file(file.as_ref()).unwrap_or(());std::process::exit(0)}}*n_clients.lock().await -= 1});
tokio::select! {x = tokio::time::timeout(std::time::Duration::from_secs(1), listener.accept()) => {if let Ok(Ok((stream, _))) = x {accept(stream, locks.clone()).await} else {break}},_ = done_rx.recv() => {break
async fn accept(mut stream: tokio::net::UnixStream, mut locks: Locks) {*locks.n_clients.lock().await += 1;tokio::spawn(async move {let mut t = [0u8];while let Ok(n) = stream.read(&mut t).await {if n == 0 {break;}if t[0] == 1 {// muttxnlet lock = if let Ok(guard) = locks.muttxn.try_lock() {guard} else {stream.write_all(&[LOCKED]).await.unwrap_or(());locks.muttxn.lock().await};while *locks.active_at_last_commit.borrow() > 0 {stream.write_all(&[LOCKED]).await.unwrap_or(());locks.active_at_last_commit.changed().await.unwrap();}stream.write_all(&[ACK]).await.unwrap_or(());if let Ok(n) = stream.read(&mut t).await {if n == 0 {break;}if t[0] == COMMIT {// commitlet mut clock = locks.clock.lock().await;*clock += 1;let counter = *locks.txn_counter.lock().await;locks.tx.send(counter).unwrap();}}std::mem::drop(lock)} else {// txn*locks.txn_counter.lock().await += 1;let start_date = *locks.clock.lock().await;stream.write_all(&[ACK]).await.unwrap_or(());let n = stream.read(&mut t).await.unwrap_or(0);if n == 0 {break;}*locks.txn_counter.lock().await -= 1;if start_date < *locks.clock.lock().await {let last = *locks.active_at_last_commit.borrow();locks.tx.send(last - 1).unwrap();}}}if *locks.n_clients.lock().await == 1 {tokio::time::sleep(std::time::Duration::from_secs(1)).await;if *locks.n_clients.lock().await == 1 {locks.done_tx.send(()).await.unwrap();return;}}*locks.n_clients.lock().await -= 1});}