use crate::repository::*;
use ::replication::Update;
use libpijul::pristine::sanakirja::MutTxn;
use libpijul::{ChannelMutTxnT, MutTxnT, MutTxnTExt, TxnT, TxnTExt};
use std::pin::Pin;
use thiserror::*;
use tracing::*;
/// Replication handler state.
///
/// The handler is cloned into each future returned by `Handler::update`,
/// hence the `Clone` derive.
#[derive(Clone)]
pub struct H {
/// Repository lock table: `locks.get(&repo)` is awaited to obtain the
/// repository an update targets, and `locks.config` is read when building
/// the on-disk path of a change file.
locks: RepositoryLocks,
/// Database handle: `db.get()` is awaited to obtain the value passed to
/// `free_used_storage` (presumably a pooled connection; confirm against
/// `crate::config::Db`).
db: crate::config::Db,
}
impl H {
    /// Creates a replication handler from the repository lock table `locks`
    /// and the database handle `db`.
    pub fn new(locks: RepositoryLocks, db: crate::config::Db) -> Self {
        Self { locks, db }
    }
}
use libpijul::pristine::sanakirja::SanakirjaError;
use libpijul::pristine::{ForkError, TxnErr};
use libpijul::{ApplyError, UnrecordError};
/// Errors produced while handling a replication update.
///
/// All wrapping variants are `transparent`: their `Display` output and
/// `source()` are forwarded to the wrapped error. The `#[from]` attributes
/// let `?` convert the underlying errors automatically.
#[derive(Debug, Error)]
pub enum Error {
/// Error reported by the Sanakirja pristine backend.
#[error(transparent)]
Sk(#[from] SanakirjaError),
/// Standard I/O error.
#[error(transparent)]
Io(#[from] std::io::Error),
/// Pristine transaction error wrapping a Sanakirja error.
#[error(transparent)]
Txn(#[from] TxnErr<SanakirjaError>),
/// Failure while forking a channel.
#[error(transparent)]
Fork(#[from] ForkError<SanakirjaError>),
/// Failure while unrecording a change from a channel.
#[error(transparent)]
Unrec(#[from] UnrecordError<crate::repository::changestore::Error, MutTxn<()>>),
/// Failure while applying a change to a channel.
#[error(transparent)]
Apply(#[from] ApplyError<crate::repository::changestore::Error, MutTxn<()>>),
/// A task spawned with `tokio::task::spawn_blocking` could not be joined;
/// the underlying join error is discarded.
#[error("Join error")]
Join,
/// Database error reported by Diesel.
#[error(transparent)]
Diesel(#[from] diesel::result::Error),
}
impl ::replication::Handler for H {
type Error = Error;
type F =
Pin<Box<dyn futures::Future<Output = Result<Option<Self::Error>, Self::Error>> + Send>>;
fn update(&self, is_source: bool, update: replication::Update) -> Self::F {
let s = self.clone();
Box::pin(async move {
match update {
Update::Change { .. } => {}
Update::Apply {
repo,
channel,
hash,
} => {
let repo_ = s.locks.get(&repo).await.unwrap();
match tokio::task::spawn_blocking(move || {
let pri = repo_.pristine.blocking_write();
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
let channel_ = txn.open_or_create_channel(&channel)?;
let result =
txn.apply_change_rec(&repo_.changes, &mut *channel_.write(), &hash);
match result {
Err(libpijul::ApplyError::LocalChange(
libpijul::LocalApplyError::ChangeAlreadyOnChannel { .. },
)) => {
error!(
"ignored error, apply {:?} to {:?}:{:?} = {:?}",
hash, repo, channel, result
);
return Ok(None);
}
Err(libpijul::ApplyError::LocalChange(
libpijul::LocalApplyError::DependencyMissing { hash },
)) => {
error!(
"apply {:?} to {:?}:{:?} = {:?}",
hash, repo, channel, result
);
return Ok(Some(
libpijul::ApplyError::LocalChange(
libpijul::LocalApplyError::DependencyMissing { hash },
)
.into(),
));
}
Ok(_) => {
debug!("apply {:?} to {:?}:{:?}", hash, repo, channel);
}
Err(e) => {
error!("apply {:?} to {:?}:{:?} = {:?}", hash, repo, channel, e);
return Err(e.into());
}
}
txn.touch_channel(&mut *channel_.write(), None);
txn.commit()?;
Ok::<_, Error>(None)
})
.await
{
Ok(Ok(Some(x))) => return Ok(Some(x)),
Ok(Ok(None)) => {}
Ok(Err(e)) => return Err(e.into()),
Err(_) => return Err(Error::Join),
}
}
Update::Unrecord {
repo,
channel,
hash,
..
} => {
let repo_ = s.locks.get(&repo).await.unwrap();
match tokio::task::spawn_blocking(move || {
let pri = repo_.pristine.blocking_write();
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
let mut channel_ = txn.open_or_create_channel(&channel)?;
let result = txn.unrecord(&repo_.changes, &mut channel_, &hash, 0);
match result {
Err(libpijul::UnrecordError::ChangeNotInChannel { .. }) | Ok(_) => {
debug!("unrecord {:?} to {:?}:{:?}", hash, repo, channel);
}
Err(e) => {
error!("unrecord {:?} to {:?}:{:?} = {:?}", hash, repo, channel, e);
return Err(e.into());
}
}
let mut in_other_channels = false;
for ch in txn.channels("")? {
if txn.get_revchanges(&ch, &hash.into())?.is_some() {
in_other_channels = true;
break;
}
}
txn.touch_channel(&mut *channel_.write(), None);
txn.commit()?;
Ok::<_, Error>(in_other_channels)
})
.await
{
Ok(Ok(false)) => {
let mut p = crate::repository::nest_changes_path(&s.locks.config, repo);
libpijul::changestore::filesystem::push_filename(&mut p, &hash);
if let Ok(meta) = tokio::fs::metadata(&p).await {
std::fs::remove_file(&p).unwrap_or(());
std::fs::remove_dir(p.parent().unwrap()).unwrap_or(());
if is_source {
crate::repository::free_used_storage(
&mut *s.db.get().await.unwrap(),
repo,
meta.len(),
)
.await
.unwrap();
}
}
}
Ok(Ok(_)) => {}
Ok(Err(e)) => return Err(e.into()),
Err(_) => return Err(Error::Join),
}
}
Update::NewChannel { repo, channel } => {
let repo_ = s.locks.get(&repo).await.unwrap();
let pri = repo_.pristine.write().await;
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
txn.open_or_create_channel(&channel)?;
txn.commit()?;
}
Update::Fork { repo, channel, new } => {
let repo_ = s.locks.get(&repo).await.unwrap();
let pri = repo_.pristine.write().await;
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
let chan = txn.open_or_create_channel(&channel)?;
let new = format!("{}_{}", repo, new);
match txn.fork(&chan, &new) {
Ok(_) => txn.commit()?,
Err(ForkError::ChannelNameExists(_)) => {}
Err(e) => return Err(e.into()),
}
}
Update::Prune { repo, channel } => {
let repo_ = s.locks.get(&repo).await.unwrap();
let pri = repo_.pristine.write().await;
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
txn.drop_channel(&channel)?;
txn.commit()?;
}
Update::Rename { repo, channel, new } => {
let repo_ = s.locks.get(&repo).await.unwrap();
let pri = repo_.pristine.write().await;
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
if let Some(mut c) = txn.load_channel(&channel)? {
let new = format!("{}_{}", repo, new);
debug!("rename {:?} to {:?}", channel, new);
txn.rename_channel(&mut c, &new)?;
txn.commit()?;
} else {
debug!("not rename");
}
}
}
Ok(None)
})
}
}