JL4WKA5PBKXRNAMETYO4I52QKASQ3COYHH2JKGA7W5YLIRZZH53AC
WEHUTJUKHOJIBMEK2M7ILPK532FMO7YGWTEAHOIXZP5WOOOSF3ZAC
GYXIF25T2BCTCWCQI5DQOF3F4HBWDJUDJUMZ2WLHVBIOYATJTFAQC
SQVWP4LU7AAJSEIHK5CNNUK3XBUVT3FGIJIOPTKMR53PO2P4ARNQC
I2D35LLFDA7KMKGVQWDOYZFZVHTRKOCLVRL3M3ZASSU7QQKL4K5AC
VYHHOEYHO67JNJEODX5L3CQFIV3DAXZBBIQUOMCWJDYF3VWICDNQC
HW5Q7GGYDMCQQYDBFLDAWCTNQQMIQHJ77UOSPN5BHMSD3KMO2O3AC
L4JXJHWXYNCL4QGJXNKKTOKKTAXKKXBJUUY7HFZGEUZ5A2V5H34QC
SXEYMYF7P4RZMZ46WPL4IZUTSQ2ATBWYZX7QNVMS3SGOYXYOHAGQC
CCLLB7OIFNFYJZTG3UCI7536TOCWSCSXR67VELSB466R24WLJSDAC
SNZ3OAMCPUGFYON5SZHQQQK46ZZMVMJECJYEUCMG657UVLY2PNBQC
5DVRL6MFXQOCPOZMYSKBERMRRVUTYRL2SRGRTU2MH4IEOFCDKM3QC
VQPAUKBQ2POZKL7CZFAZK5ZQKEBYL27XZYZWYUSH5AH25KK6DWKAC
VNBLGT6GAN2AHKRFKTKED7WNDDRGNULY5H343ZYV3ETSDZZKGBTAC
MU5GSJAW65PEG3BRYUKZ7O37BPHW3MOX3S5E2RFOXKGUOJEEDQ5AC
G6S6PWZEFJK7ARWBIFKDU6VYC5DCJ2YFJMWZOLLWWKU52R2QPXZAC
367UBQ6KNAKUEWG32R4QRJ6H7IE7NAZFOPTC3ZOE4Z6E44RV3ISQC
VMPAOJS2ZFOLNXALHWSVM5AFENWX6ZUACB45EJV3HXI7DQNAZPHQC
BZSC7VMYSFRXDHDDAMCDR6X67FN5VWIBOSE76BQLX7OCVOJFUA3AC
LYTVEPH3W5UHF7MAYFWBT6NVNC42HEVKJGGMFDKUDZDNDOI33YJQC
K6GWUOD55G377RVEEMMRPZ4EUAHCM2BGXNRJTE5UZJFFMJGFCEZQC
I52XSRUH5RVHQBFWVMAQPTUSPAJ4KNVID2RMI3UGCVKFLYUO6WZAC
76PCXGML77EZWTRI5E6KHLVRAFTJ2AB5YRN5EKOYNAPKTWY2KCGAC
2K7JLB4Z7BS5VFNWD4DO3MKYU7VNPA5MTVHVSDI3FQZ5ICM6XM6QC
OUWD436ATBTZJR53B6XDSI5SXRRNZV7YGRUEA5ACHZC2RUDP7G5QC
SAGSYAPXQ2T6GC3B3TNRPNFTZMS7UMME6YQGSF5MOIM66S5NKB2QC
impl Lock {
pub async fn mut_txn_lock<P: AsRef<Path>>(p: P) -> Result<Self, anyhow::Error> {
let pp = p.as_ref().join("db_lock");
Ok(Lock::MutTxn(mut_txn(&pp).await?))
}
pub async fn txn_lock<P: AsRef<Path>>(p: P) -> Result<Self, anyhow::Error> {
let pp = p.as_ref().join("db_lock");
Ok(Lock::Txn(txn(&pp).await?))
}
pub async fn commit(&mut self) -> Result<(), anyhow::Error> {
match self {
Lock::MutTxn(m) => m.commit().await,
_ => Ok(()),
}
}
}
}
pristine: libpijul::pristine::sanakirja::Pristine::new(&pristine_dir)?,
lock: if mutable {
lock::Lock::mut_txn_lock(&pristine_dir).await?
} else {
lock::Lock::txn_lock(&pristine_dir).await?
},
pristine: unsafe {
libpijul::pristine::sanakirja::Pristine::new_nolock(&pristine_dir)?
},
use anyhow::bail;
use std::io::Read;
use std::path::Path;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;
pub struct TxnLock {
_stream: UnixStream,
}
pub struct MutTxnLock {
stream: UnixStream,
}
const TXN_BEGIN: u8 = 0;
const MUT_TXN_BEGIN: u8 = 1;
const COMMIT: u8 = 2;
const ACK: u8 = 3;
pub async fn mut_txn<P: AsRef<Path>>(file: P) -> Result<MutTxnLock, anyhow::Error> {
let cmd = std::env::args().next().unwrap();
let mut process = std::process::Command::new(&cmd)
.args(&["lock", file.as_ref().to_str().unwrap()])
.stdout(std::process::Stdio::piped())
.spawn()?;
let s = process.stdout.as_mut().unwrap();
s.read(&mut [0u8])?;
let mut stream = UnixStream::connect(file).await?;
stream.writable().await?;
stream.write_all(&[MUT_TXN_BEGIN]).await?;
let mut t = [0u8];
stream.read_exact(&mut t).await?;
if t[0] == ACK {
Ok(MutTxnLock { stream })
} else {
bail!("Pristine locked")
}
}
pub async fn txn<P: AsRef<Path>>(file: P) -> Result<TxnLock, anyhow::Error> {
let cmd = std::env::args().next().unwrap();
std::process::Command::new(&cmd)
.args(&["lock", file.as_ref().to_str().unwrap()])
.spawn()?;
let mut stream = UnixStream::connect(file).await?;
stream.write_all(&[TXN_BEGIN]).await?;
let mut t = [0u8];
stream.read_exact(&mut t).await?;
if t[0] == ACK {
Ok(TxnLock { _stream: stream })
} else {
bail!("Pristine locked")
}
}
impl MutTxnLock {
pub async fn commit(&mut self) -> Result<(), anyhow::Error> {
self.stream.write_all(&[COMMIT]).await?;
Ok(())
}
}
use fs2::FileExt;
use std::fs::OpenOptions;
use std::path::Path;
pub struct TxnLock {
file: std::fs::File,
}
pub struct MutTxnLock {
file: std::fs::File,
}
pub async fn mut_txn<P: AsRef<Path>>(file: P) -> Result<MutTxnLock, anyhow::Error> {
let file = OpenOptions::new().write(true).create(true).open(file)?;
file.lock_exclusive()?;
Ok(MutTxnLock { file })
}
pub async fn txn<P: AsRef<Path>>(file: P) -> Result<TxnLock, anyhow::Error> {
let file = OpenOptions::new().write(true).create(true).open(file)?;
file.lock_shared()?;
Ok(TxnLock { file })
}
impl MutTxnLock {
pub async fn commit(&mut self) -> Result<(), anyhow::Error> {
Ok(self.file.unlock()?)
}
}
impl Drop for MutTxnLock {
fn drop(&mut self) {
self.file.unlock().unwrap_or(())
}
}
impl Drop for TxnLock {
fn drop(&mut self) {
self.file.unlock().unwrap_or(())
}
}
SubCommand::Change(change) => change.run(),
SubCommand::Channel(channel) => channel.run(),
SubCommand::Protocol(protocol) => protocol.run(),
SubCommand::Change(change) => change.run().await,
SubCommand::Channel(channel) => channel.run().await,
SubCommand::Protocol(protocol) => protocol.run().await,
SubCommand::Git(git) => git.run(),
SubCommand::Mv(mv) => mv.run(),
SubCommand::Ls(ls) => ls.run(),
SubCommand::Add(add) => add.run(),
SubCommand::Remove(remove) => remove.run(),
SubCommand::Reset(reset) => reset.run(),
SubCommand::Git(git) => git.run().await,
SubCommand::Mv(mv) => mv.run().await,
SubCommand::Ls(ls) => ls.run().await,
SubCommand::Add(add) => add.run().await,
SubCommand::Remove(remove) => remove.run().await,
SubCommand::Reset(reset) => reset.run().await,
SubCommand::Debug(debug) => debug.run(),
SubCommand::Fork(fork) => fork.run(),
SubCommand::Unrecord(unrecord) => unrecord.run(),
SubCommand::Apply(apply) => apply.run(),
SubCommand::Remote(remote) => remote.run(),
SubCommand::Debug(debug) => debug.run().await,
SubCommand::Fork(fork) => fork.run().await,
SubCommand::Unrecord(unrecord) => unrecord.run().await,
SubCommand::Apply(apply) => apply.run().await,
SubCommand::Remote(remote) => remote.run().await,
use fs2::FileExt;
use std::fs::OpenOptions;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixListener;
use tokio::sync::{watch, Mutex};
use clap::Clap;
#[derive(Clap, Debug)]
pub struct Lock {
path: PathBuf,
}
const COMMIT: u8 = 2;
const ACK: u8 = 3;
const LOCKED: u8 = 4;
impl Lock {
pub async fn run(self) -> Result<(), anyhow::Error> {
let lock = Arc::new(self.path.with_extension("lock"));
let lockfile = OpenOptions::new()
.write(true)
.create(true)
.open(&lock.as_ref())
.unwrap();
if lockfile.try_lock_exclusive().is_err() {
return Ok(());
}
let listener = UnixListener::bind(&self.path).unwrap();
println!();
let n_clients = Arc::new(Mutex::new(0usize));
let file = Arc::new(self.path.clone());
let muttxn = Arc::new(Mutex::new(()));
let (tx, active_at_last_commit) = watch::channel(0);
let tx = std::sync::Arc::new(tx);
let clock = Arc::new(Mutex::new(0usize));
let txn_counter = Arc::new(Mutex::new(0usize));
let last_commit_date = Arc::new(Mutex::new(0usize));
loop {
let x = listener.accept().await;
{
let mut f = OpenOptions::new()
.write(true)
.create(true)
.append(true)
.open("log")
.unwrap();
writeln!(f, "accepted").unwrap()
}
match x {
Ok((mut stream, _addr)) => {
*n_clients.lock().await += 1;
{
let mut f = OpenOptions::new()
.write(true)
.create(true)
.append(true)
.open("log")
.unwrap();
writeln!(f, "n = {:?}", *n_clients.lock().await).unwrap();
}
let file = file.clone();
let muttxn = muttxn.clone();
let mut active_at_last_commit = active_at_last_commit.clone();
let clock = clock.clone();
let last_commit_date = last_commit_date.clone();
let txn_counter = txn_counter.clone();
let n_clients = n_clients.clone();
let tx = tx.clone();
let lock = lock.clone();
tokio::spawn(async move {
let mut t = [0u8];
while let Ok(n) = stream.read(&mut t).await {
if n == 0 {
break;
}
if t[0] == 1 {
// muttxn
let lock = if let Ok(guard) = muttxn.try_lock() {
guard
} else {
stream.write_all(&[LOCKED]).await.unwrap_or(());
muttxn.lock().await
};
while *active_at_last_commit.borrow() > 0 {
stream.write_all(&[LOCKED]).await.unwrap_or(());
active_at_last_commit.changed().await.unwrap();
}
stream.write_all(&[ACK]).await.unwrap_or(());
if let Ok(n) = stream.read(&mut t).await {
if n == 0 {
break;
}
if t[0] == COMMIT {
// commit
let mut clock = clock.lock().await;
*clock += 1;
*last_commit_date.lock().await = *clock;
let counter = *txn_counter.lock().await;
tx.send(counter).unwrap();
}
}
std::mem::drop(lock);
} else {
// txn
*txn_counter.lock().await += 1;
let start_date = *clock.lock().await;
stream.write_all(&[ACK]).await.unwrap_or(());
let n = stream.read(&mut t).await.unwrap_or(0);
if n == 0 {
break;
}
*txn_counter.lock().await -= 1;
if start_date < *last_commit_date.lock().await {
let last = *active_at_last_commit.borrow();
tx.send(last - 1).unwrap();
}
}
}
{
let mut f = OpenOptions::new()
.write(true)
.create(true)
.append(true)
.open("log")
.unwrap();
writeln!(f, "n = {:?}", *n_clients.lock().await).unwrap();
}
if *n_clients.lock().await == 1 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
if *n_clients.lock().await == 1 {
std::fs::remove_file(file.as_ref()).unwrap_or(());
let lockfile = OpenOptions::new()
.write(true)
.create(true)
.open(lock.as_ref())
.unwrap();
lockfile.unlock().unwrap_or(());
std::process::exit(0)
}
}
*n_clients.lock().await -= 1
});
}
Err(_) => break,
}
}
std::fs::remove_file(file.as_ref()).unwrap_or(());
let lockfile = OpenOptions::new()
.write(true)
.create(true)
.open(lock.as_ref())
.unwrap();
lockfile.unlock().unwrap_or(());
std::process::exit(0)
}
}
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = if let Ok(repo) = Repository::find_root(self.repo_path.clone()) {
pub async fn run(self) -> Result<(), anyhow::Error> {
let repo = if let Ok(repo) = Repository::find_root(self.repo_path.clone()).await {
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path)?;
pub async fn run(self) -> Result<(), anyhow::Error> {
let repo = unsafe { Repository::find_root_immutable(self.repo_path).await? };
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path.clone())?;
pub async fn run(self) -> Result<(), anyhow::Error> {
let repo = unsafe { Repository::find_root_immutable(self.repo_path.clone()).await? };