use crate::repository::*;
use ::replication::Update;
use libpijul::pristine::sanakirja::MutTxn;
use libpijul::{ChannelMutTxnT, MutTxnT, MutTxnTExt, TxnT, TxnTExt};
use std::pin::Pin;
use thiserror::*;
use tracing::*;

/// State shared by the replication handler; the `::replication::Handler`
/// implementation for this type lives further down in this file.
///
/// The handler clones itself into every future it returns, hence the
/// `Clone` derive.
#[derive(Clone)]
pub struct H {
    /// Repository lock table: used to look a repository up (`locks.get`) and
    /// to reach the configuration (`locks.config`) when computing the path of
    /// a change file.
    locks: RepositoryLocks,
    /// Database handle; `db.get()` is awaited to obtain the value passed to
    /// `free_used_storage` (presumably a pooled connection — verify against
    /// `crate::config::Db`).
    db: crate::config::Db,
}

impl H {
    /// Builds a replication handler from the repository lock table and the
    /// database handle it will use when processing updates.
    pub fn new(locks: RepositoryLocks, db: crate::config::Db) -> Self {
        Self { locks, db }
    }
}

use libpijul::pristine::sanakirja::SanakirjaError;
use libpijul::pristine::{ForkError, TxnErr};
use libpijul::{ApplyError, UnrecordError};

/// Errors reported by the replication handler.
///
/// Every variant except `Join` transparently wraps an underlying error and
/// can be produced from it with `?` / `.into()` thanks to `#[from]`.
#[derive(Debug, Error)]
pub enum Error {
    /// A `SanakirjaError` coming from the pristine storage layer.
    #[error(transparent)]
    Sk(#[from] SanakirjaError),
    /// A standard I/O error.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// A transaction error wrapping a `SanakirjaError`.
    #[error(transparent)]
    Txn(#[from] TxnErr<SanakirjaError>),
    /// Failure while forking a channel (other than "name already exists",
    /// which the handler ignores).
    #[error(transparent)]
    Fork(#[from] ForkError<SanakirjaError>),
    /// Failure while unrecording a change from a channel.
    #[error(transparent)]
    Unrec(#[from] UnrecordError<crate::repository::changestore::Error, MutTxn<()>>),
    /// Failure while applying a change to a channel.
    #[error(transparent)]
    Apply(#[from] ApplyError<crate::repository::changestore::Error, MutTxn<()>>),
    /// The blocking task spawned with `tokio::task::spawn_blocking` could not
    /// be joined; the underlying join error is discarded.
    #[error("Join error")]
    Join,
    /// A Diesel database error.
    // NOTE(review): not constructed by the handler code in this file's visible
    // part; presumably intended for the storage-accounting database calls.
    #[error(transparent)]
    Diesel(#[from] diesel::result::Error),
}

/// Applies replicated updates (apply, unrecord, channel creation, fork,
/// prune, rename) to the local pristine and change store.
///
/// Channels are stored under the name `"{repo}_{channel}"`.
impl ::replication::Handler for H {
    type Error = Error;
    /// Boxed `Send` future returned by [`update`](Self::update).
    ///
    /// It resolves to `Ok(None)` when the update was processed, to
    /// `Ok(Some(e))` when applying a change failed because a dependency is
    /// missing, and to `Err(e)` for every other failure.
    type F =
        Pin<Box<dyn futures::Future<Output = Result<Option<Self::Error>, Self::Error>> + Send>>;

    /// Processes one replication update.
    ///
    /// * `is_source` — only consulted after an unrecord deleted a change file:
    ///   when true, the freed bytes are reported through `free_used_storage`.
    /// * `update` — the update to process.
    ///
    /// # Panics
    ///
    /// The returned future panics if `locks.get` does not yield the
    /// repository, or if `db.get()` / `free_used_storage` fail (all three are
    /// unwrapped).
    fn update(&self, is_source: bool, update: replication::Update) -> Self::F {
        let s = self.clone();
        Box::pin(async move {
            match update {
                // Nothing is done here for `Change` updates.
                Update::Change { .. } => {}
                Update::Apply {
                    repo,
                    channel,
                    hash,
                } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    // The transaction is synchronous, so it runs on the
                    // blocking thread pool with a blocking write lock.
                    match tokio::task::spawn_blocking(move || {
                        let pri = repo_.pristine.blocking_write();
                        let mut txn = pri.mut_txn_begin()?;
                        let channel = format!("{}_{}", repo, channel);
                        let channel_ = txn.open_or_create_channel(&channel)?;
                        let result =
                            txn.apply_change_rec(&repo_.changes, &mut *channel_.write(), &hash);
                        match result {
                            // Already applied: log and report success without
                            // committing.
                            Err(libpijul::ApplyError::LocalChange(
                                libpijul::LocalApplyError::ChangeAlreadyOnChannel { .. },
                            )) => {
                                error!(
                                    "ignored error, apply {:?} to {:?}:{:?} = {:?}",
                                    hash, repo, channel, result
                                );
                                return Ok(None);
                            }
                            // Missing dependency: handed back as `Ok(Some(_))`
                            // rather than `Err`. The binding is named
                            // `missing` so that the log line still shows the
                            // hash of the change being applied.
                            Err(libpijul::ApplyError::LocalChange(
                                libpijul::LocalApplyError::DependencyMissing { hash: missing },
                            )) => {
                                error!(
                                    "apply {:?} to {:?}:{:?} = {:?}",
                                    hash, repo, channel, result
                                );
                                return Ok(Some(
                                    libpijul::ApplyError::LocalChange(
                                        libpijul::LocalApplyError::DependencyMissing {
                                            hash: missing,
                                        },
                                    )
                                    .into(),
                                ));
                            }
                            Ok(_) => {
                                debug!("apply {:?} to {:?}:{:?}", hash, repo, channel);
                            }
                            Err(e) => {
                                error!("apply {:?} to {:?}:{:?} = {:?}", hash, repo, channel, e);
                                return Err(e.into());
                            }
                        }
                        txn.touch_channel(&mut *channel_.write(), None);
                        txn.commit()?;
                        Ok::<_, Error>(None)
                    })
                    .await
                    {
                        Ok(Ok(Some(x))) => return Ok(Some(x)),
                        Ok(Ok(None)) => {}
                        Ok(Err(e)) => return Err(e),
                        Err(_) => return Err(Error::Join),
                    }
                }
                Update::Unrecord {
                    repo,
                    channel,
                    hash,
                    ..
                } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    match tokio::task::spawn_blocking(move || {
                        let pri = repo_.pristine.blocking_write();
                        let mut txn = pri.mut_txn_begin()?;
                        let channel = format!("{}_{}", repo, channel);
                        let mut channel_ = txn.open_or_create_channel(&channel)?;
                        let result = txn.unrecord(&repo_.changes, &mut channel_, &hash, 0);
                        match result {
                            // A change that is not in the channel is treated
                            // like a successful unrecord.
                            Err(libpijul::UnrecordError::ChangeNotInChannel { .. }) | Ok(_) => {
                                debug!("unrecord {:?} to {:?}:{:?}", hash, repo, channel);
                            }
                            Err(e) => {
                                error!("unrecord {:?} to {:?}:{:?} = {:?}", hash, repo, channel, e);
                                return Err(e.into());
                            }
                        }

                        // Check whether any channel in this pristine still
                        // references the change; the closure's result decides
                        // whether the change file is deleted afterwards.
                        let mut in_other_channels = false;
                        for ch in txn.channels("")? {
                            if txn.get_revchanges(&ch, &hash.into())?.is_some() {
                                in_other_channels = true;
                                break;
                            }
                        }
                        txn.touch_channel(&mut *channel_.write(), None);
                        txn.commit()?;
                        Ok::<_, Error>(in_other_channels)
                    })
                    .await
                    {
                        // No channel references the change any more: delete
                        // its file from the change store.
                        Ok(Ok(false)) => {
                            let mut p = crate::repository::nest_changes_path(&s.locks.config, repo);
                            libpijul::changestore::filesystem::push_filename(&mut p, &hash);
                            if let Ok(meta) = tokio::fs::metadata(&p).await {
                                // Best-effort cleanup through tokio's async
                                // filesystem API so the runtime worker is not
                                // blocked. Failures are deliberately ignored:
                                // `remove_dir` in particular is expected to
                                // fail while the directory still holds other
                                // change files.
                                let _ = tokio::fs::remove_file(&p).await;
                                if let Some(dir) = p.parent() {
                                    let _ = tokio::fs::remove_dir(dir).await;
                                }
                                if is_source {
                                    crate::repository::free_used_storage(
                                        &mut *s.db.get().await.unwrap(),
                                        repo,
                                        meta.len(),
                                    )
                                    .await
                                    .unwrap();
                                }
                            }
                        }
                        Ok(Ok(true)) => {}
                        Ok(Err(e)) => return Err(e),
                        Err(_) => return Err(Error::Join),
                    }
                }
                // NOTE(review): the remaining arms run their transaction
                // directly on the async task (async write lock, no
                // `spawn_blocking`), unlike `Apply` and `Unrecord`.
                Update::NewChannel { repo, channel } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    let pri = repo_.pristine.write().await;
                    let mut txn = pri.mut_txn_begin()?;
                    let channel = format!("{}_{}", repo, channel);
                    txn.open_or_create_channel(&channel)?;
                    txn.commit()?;
                }
                Update::Fork { repo, channel, new } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    let pri = repo_.pristine.write().await;
                    let mut txn = pri.mut_txn_begin()?;
                    let channel = format!("{}_{}", repo, channel);
                    let chan = txn.open_or_create_channel(&channel)?;
                    let new = format!("{}_{}", repo, new);
                    match txn.fork(&chan, &new) {
                        Ok(_) => txn.commit()?,
                        // The target already exists: nothing is committed and
                        // the update counts as processed.
                        Err(ForkError::ChannelNameExists(_)) => {}
                        Err(e) => return Err(e.into()),
                    }
                }
                Update::Prune { repo, channel } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    let pri = repo_.pristine.write().await;
                    let mut txn = pri.mut_txn_begin()?;
                    let channel = format!("{}_{}", repo, channel);
                    txn.drop_channel(&channel)?;
                    txn.commit()?;
                }
                Update::Rename { repo, channel, new } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    let pri = repo_.pristine.write().await;
                    let mut txn = pri.mut_txn_begin()?;
                    let channel = format!("{}_{}", repo, channel);
                    // Renaming a channel that does not exist is a no-op.
                    if let Some(mut c) = txn.load_channel(&channel)? {
                        let new = format!("{}_{}", repo, new);
                        debug!("rename {:?} to {:?}", channel, new);
                        txn.rename_channel(&mut c, &new)?;
                        txn.commit()?;
                    } else {
                        debug!("not rename");
                    }
                }
            }
            Ok(None)
        })
    }
}