SN7AGY6SLIWVKLBHQQ2MZ7VYY4BBFKROC5F3M6FVVFIGT25QNX7AC
MBUNDZ3OES7FXMIYBICHRKXWEII5FZO3LFPSP5EDWQUG7KGP2NPQC
FI3WFMTSNVFWOQWSIWD64UCPRIS6MW4GOADOC6RXPHBSGZBYL5PAC
JL4WKA5PBKXRNAMETYO4I52QKASQ3COYHH2JKGA7W5YLIRZZH53AC
64M73LNCB2V57AWAAKHK2NNERXANMV5V3XI3TFC4XR7FLJNCDCRQC
SXEYMYF7P4RZMZ46WPL4IZUTSQ2ATBWYZX7QNVMS3SGOYXYOHAGQC
GYXIF25T2BCTCWCQI5DQOF3F4HBWDJUDJUMZ2WLHVBIOYATJTFAQC
2K7JLB4Z7BS5VFNWD4DO3MKYU7VNPA5MTVHVSDI3FQZ5ICM6XM6QC
let tx = std::sync::Arc::new(tx);
let clock = Arc::new(Mutex::new(0usize));
let txn_counter = Arc::new(Mutex::new(0usize));
let (done_tx, mut done_rx) = mpsc::channel(1);
let file = Arc::new(self.path);
let locks = Locks {
n_clients: Arc::new(Mutex::new(0usize)),
muttxn: Arc::new(Mutex::new(())),
tx: Arc::new(tx),
active_at_last_commit,
file: file.clone(),
clock: Arc::new(Mutex::new(0usize)),
txn_counter: Arc::new(Mutex::new(0usize)),
done_tx,
};
let x = listener.accept().await;
match x {
Ok((mut stream, _addr)) => {
*n_clients.lock().await += 1;
let file = file.clone();
let muttxn = muttxn.clone();
let mut active_at_last_commit = active_at_last_commit.clone();
let clock = clock.clone();
let txn_counter = txn_counter.clone();
let n_clients = n_clients.clone();
let tx = tx.clone();
tokio::spawn(async move {
let mut t = [0u8];
while let Ok(n) = stream.read(&mut t).await {
if n == 0 {
break;
}
if t[0] == 1 {
// muttxn
let lock = if let Ok(guard) = muttxn.try_lock() {
guard
} else {
stream.write_all(&[LOCKED]).await.unwrap_or(());
muttxn.lock().await
};
while *active_at_last_commit.borrow() > 0 {
stream.write_all(&[LOCKED]).await.unwrap_or(());
active_at_last_commit.changed().await.unwrap();
}
stream.write_all(&[ACK]).await.unwrap_or(());
if let Ok(n) = stream.read(&mut t).await {
if n == 0 {
break;
}
if t[0] == COMMIT {
// commit
let mut clock = clock.lock().await;
*clock += 1;
let counter = *txn_counter.lock().await;
tx.send(counter).unwrap();
}
}
std::mem::drop(lock);
} else {
// txn
*txn_counter.lock().await += 1;
let start_date = *clock.lock().await;
stream.write_all(&[ACK]).await.unwrap_or(());
let n = stream.read(&mut t).await.unwrap_or(0);
if n == 0 {
break;
}
*txn_counter.lock().await -= 1;
if start_date < *clock.lock().await {
let last = *active_at_last_commit.borrow();
tx.send(last - 1).unwrap();
}
}
}
if *n_clients.lock().await == 1 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
if *n_clients.lock().await == 1 {
std::fs::remove_file(file.as_ref()).unwrap_or(());
std::process::exit(0)
}
}
*n_clients.lock().await -= 1
});
tokio::select! {
x = tokio::time::timeout(std::time::Duration::from_secs(1), listener.accept()) => {
if let Ok(Ok((stream, _))) = x {
accept(stream, locks.clone()).await
} else {
break
}
},
_ = done_rx.recv() => {
break
/// Registers a newly connected client and spawns the task that serves it.
///
/// The client count in `locks.n_clients` is incremented before the task is
/// spawned. The task then reads one-byte requests from `stream` until the
/// peer disconnects or a read fails:
///
/// * byte `1` requests a mutable transaction: the task takes `locks.muttxn`,
///   waits until `locks.active_at_last_commit` reads `0`, replies `ACK`, and
///   then reads one more byte; if that byte is `COMMIT` the clock is advanced
///   and the current number of open read transactions is published on
///   `locks.tx`;
/// * any other byte opens a read transaction: it is counted in
///   `locks.txn_counter`, acknowledged with `ACK`, and closed by the next
///   byte received (or by a disconnect).
///
/// `LOCKED` is written to the client whenever it has to wait. Write errors are
/// ignored on purpose; a dead peer is detected by the next read.
///
/// When the task finishes and it still appears to be the only client after a
/// one-second grace period, it signals `locks.done_tx` instead of
/// decrementing the client count.
async fn accept(mut stream: tokio::net::UnixStream, mut locks: Locks) {
    *locks.n_clients.lock().await += 1;
    tokio::spawn(async move {
        let mut t = [0u8];
        while let Ok(n) = stream.read(&mut t).await {
            if n == 0 {
                break;
            }
            if t[0] == 1 {
                // Mutable transaction: only one holder of `muttxn` at a time.
                // Tell the client it is queued only if the lock is contended.
                let lock = if let Ok(guard) = locks.muttxn.try_lock() {
                    guard
                } else {
                    stream.write_all(&[LOCKED]).await.unwrap_or(());
                    locks.muttxn.lock().await
                };
                // Wait for every read transaction that was open at the last
                // commit to finish before letting this writer proceed.
                while *locks.active_at_last_commit.borrow() > 0 {
                    stream.write_all(&[LOCKED]).await.unwrap_or(());
                    // NOTE(review): this can only fail if the sending half is
                    // dropped; `locks.tx` looks like that half and is owned by
                    // this task — confirm against the `Locks` definition.
                    locks.active_at_last_commit.changed().await.unwrap();
                }
                stream.write_all(&[ACK]).await.unwrap_or(());
                if let Ok(n) = stream.read(&mut t).await {
                    if n == 0 {
                        // `lock` is released when it goes out of scope here.
                        break;
                    }
                    if t[0] == COMMIT {
                        // Advance the clock and publish the number of open
                        // read transactions while holding the clock mutex, so
                        // readers observe both changes as one step.
                        let mut clock = locks.clock.lock().await;
                        *clock += 1;
                        let counter = *locks.txn_counter.lock().await;
                        locks.tx.send(counter).unwrap();
                    }
                }
                std::mem::drop(lock)
            } else {
                // Read transaction. Register it and record its start date
                // under the clock mutex (same lock order as the commit path:
                // clock, then txn_counter). Otherwise a commit could count
                // this transaction and then hand it the post-commit clock,
                // in which case it would never be un-counted.
                let start_date = {
                    let clock = locks.clock.lock().await;
                    *locks.txn_counter.lock().await += 1;
                    *clock
                };
                stream.write_all(&[ACK]).await.unwrap_or(());
                // Any byte ends the transaction; 0 bytes or a read error means
                // the client went away, which ends it as well.
                let n = stream.read(&mut t).await.unwrap_or(0);
                {
                    // Un-register under the clock mutex for the same reason as
                    // above. This also serialises the read-modify-write on the
                    // published counter against commits and other readers.
                    let clock = locks.clock.lock().await;
                    *locks.txn_counter.lock().await -= 1;
                    if start_date < *clock {
                        // A commit happened while this transaction was open,
                        // so it was included in the published count.
                        let last = *locks.active_at_last_commit.borrow();
                        locks.tx.send(last.saturating_sub(1)).unwrap();
                    }
                }
                if n == 0 {
                    break;
                }
            }
        }
        // Last client standing: give new clients one second to show up, then
        // ask the accept loop to shut down.
        if *locks.n_clients.lock().await == 1 {
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            if *locks.n_clients.lock().await == 1 {
                // The receiving side may already be gone; nothing left to
                // notify in that case, so the error is ignored.
                locks.done_tx.send(()).await.unwrap_or(());
                return;
            }
        }
        *locks.n_clients.lock().await -= 1
    });
}