use crate::config::Config;
use crate::permissions::*;
use diesel::sql_types::Bool;
use diesel::{
    ExpressionMethods, OptionalExtension, QueryDsl,
};
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use libpijul::{Base32, MutTxnT};
use lru_cache::LruCache;
use serde_derive::*;
use std::fs::{create_dir_all, metadata};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tracing::*;
use uuid::Uuid;

pub mod changestore;
pub mod router;
pub mod admin;

#[derive(Debug, Deserialize)]
pub struct TreePath {
    pub owner: String,
    pub repo: String,
}

#[derive(Serialize)]
pub struct RepositoryPath<'a> {
    pub owner: &'a str,
    pub repo: &'a str,
    pub path: &'a [PathElement],
    pub n_followers: u64,
    pub user_is_follower: bool,
}

#[derive(Serialize)]
pub struct Description<'a> {
    pub descr: &'a str,
    pub repo_id: uuid::Uuid,
}

#[derive(Clone, Debug)]
pub struct RepositoryId {
    pub owner_id: Uuid,
    pub repo_id: Uuid,
    pub fork_origin: Option<Uuid>,
}

pub fn nest_path(config: &Config, fork_origin: Uuid) -> PathBuf {
    let mut p = config.repositories_path.clone();
    p.push(&format!("{}", fork_origin));
    p
}

pub fn nest_pristine_path(config: &Config, fork_origin: Uuid) -> PathBuf {
    let mut p = nest_path(config, fork_origin);
    p.push(".pijul");
    p.push("pristine");
    p
}

pub fn nest_changes_path(config: &Config, fork_origin: Uuid) -> PathBuf {
    let mut p = nest_path(config, fork_origin);
    p.push(".pijul");
    p.push("changes");
    p
}

impl RepositoryId {
    pub fn origin_id(&self) -> Uuid {
        self.fork_origin.unwrap_or(self.repo_id)
    }

    pub fn nest_changes_path(&self, config: &Config) -> PathBuf {
        nest_changes_path(config, self.origin_id())
    }
}

type Locks = Mutex<LruCache<Uuid, Arc<Repo>>>;

pub struct Repo {
    pub pristine: RwLock<libpijul::pristine::sanakirja::Pristine>,
    pub changes: changestore::FileSystem,
}

#[derive(Clone)]
pub struct RepositoryLocks {
    pub config: Arc<Config>,
    locks: Pin<Arc<Locks>>,
}

impl std::fmt::Debug for RepositoryLocks {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
        write!(fmt, "RepositoryLocks {{}}")
    }
}

impl RepositoryLocks {
    pub fn new(config: Arc<Config>) -> Self {
        let cache_size = config.repository_cache_size;
        RepositoryLocks {
            config,
            locks: Arc::pin(Mutex::new(LruCache::new(cache_size))),
        }
    }

    pub async fn remove(&self, repo_id: &uuid::Uuid) -> Result<(), crate::Error> {
        let mut locks = self.locks.lock().await;
        drop(locks.remove(repo_id));
        Ok(())
    }

    pub async fn get(&self, repo_id: &Uuid) -> Result<Arc<Repo>, crate::Error> {
        debug!("get: taking lock {:?}", repo_id);
        let mut locks = self.locks.lock().await;
        unsafe {
            info!(
                "repositorylocks size: {:?} {:?}",
                locks.len(),
                heapsize::heap_size_of::<LruCache<_, _>>(&*locks)
            );
        }
        debug!("get: lock taken");
        {
            if let Some(repo_lock) = locks.get_mut(repo_id).map(|x| x.clone()) {
                debug!("get: found");
                std::mem::drop(locks);
                return Ok(repo_lock);
            }
        }
        debug!("get: not found, inserting");
        let path = nest_pristine_path(&self.config, *repo_id);
        debug!("get: open_or_create {:?}", path);
        let repo = unsafe {
            // Ok to do here because we have a lock.
            self.open_or_create_repository(&path)?
        };
        debug!("get: open_or_create done");
        let path = nest_changes_path(&self.config, *repo_id);
        debug!("path = {:?}", path);
        create_dir_all(&path)?;
        let repo = Arc::new(Repo {
            pristine: RwLock::new(repo),
            changes: changestore::FileSystem {
                change_cache: self.config.change_cache.clone(),
                hash_cache: self.config.hash_cache.clone(),
                changes_dir: path,
                id: *repo_id,
                db: self.config.db.clone(),
            },
        });
        locks.insert(*repo_id, repo.clone());
        std::mem::drop(locks);
        debug!("get: done");
        Ok(repo)
    }

    // `branch` comes with its prefix
    unsafe fn open_or_create_repository(
        &self,
        path: &Path,
    ) -> Result<libpijul::pristine::sanakirja::Pristine, crate::Error> {
        if metadata(&path).is_err() {
            create_dir_all(&path)?;
            let repo = libpijul::pristine::sanakirja::Pristine::new_nolock(&path.join("db"))?;
            repo.mut_txn_begin()?.commit()?;
            Ok(repo)
        } else {
            Ok(libpijul::pristine::sanakirja::Pristine::new_nolock(
                &path.join("db"),
            )?)
        }
    }
}

#[derive(Debug)]
pub enum ChannelSpec {
    Channel(String),
    CI(String),
    Discussion(i32),
}

impl ChannelSpec {
    pub fn channel(c: std::borrow::Cow<str>) -> Self {
        if c.ends_with("/ci") {
            ChannelSpec::CI(c.split_at(c.len() - 3).0.to_string())
        } else {
            ChannelSpec::Channel(c.into_owned())
        }
    }
}

pub fn channel_spec(repo_id: &RepositoryId, channel: &str) -> String {
    if channel.ends_with("/ci") {
        format!(
            "{}_{}",
            repo_id.repo_id,
            channel.split_at(channel.len() - 3).0
        )
    } else {
        format!("{}_{}", repo_id.repo_id, channel)
    }
}

pub fn channel_spec_id(repo_id: uuid::Uuid, channel: &str) -> String {
    if channel.ends_with("/ci") {
        format!("{}_{}", repo_id, channel.split_at(channel.len() - 3).0)
    } else {
        format!("{}_{}", repo_id, channel)
    }
}

pub async fn repository_id(
    db: &mut AsyncPgConnection,
    owner: &str,
    name: &str,
    user: Option<uuid::Uuid>,
    required: Perm,
) -> Result<(Uuid, Perm), crate::Error> {
    use crate::db::repositories::dsl as r;
    use crate::db::users::dsl as u;
    if let Some((rid, perms, suspended, is_public)) = r::repositories
        .inner_join(u::users)
        .filter(u::login.eq(owner))
        .filter(u::is_active)
        .filter(r::name.eq(name))
        .filter(r::is_active)
        .select((
            r::id,
            crate::permissions!(user.unwrap_or(uuid::Uuid::nil()), r::id),
            u::suspended,
            is_public(),
        ))
        .get_result::<(uuid::Uuid, i64, bool, bool)>(db)
        .await.optional()?
    {
        let got = if suspended {
            Perm::empty()
        } else if is_public {
            Perm::from_bits(perms).unwrap() | Perm::READ
        } else {
            Perm::from_bits(perms).unwrap()
        };
        if got.contains(required) {
            Ok((rid, got))
        } else {
            Err(crate::Error::Permissions {
                required, got
            })
        }
    } else {
        Err(crate::Error::RepositoryNotFound)
    }
}

pub fn is_public() -> diesel::expression::SqlLiteral<Bool> {
    diesel::dsl::sql::<Bool>("EXISTS (SELECT 1 FROM permissions WHERE permissions.user_id = '00000000-0000-0000-0000-000000000000' AND permissions.repo_id = repositories.id)")
}

pub async fn repository(
    db: &mut AsyncPgConnection,
    owner: &str,
    name: &str,
    user: Uuid,
    required: Perm,
) -> Result<Repository, crate::Error> {
    use crate::db::repositories::dsl as r;
    use crate::db::users::dsl as u;

    if let Some((
        repo_id,
        perms,
        suspended,
        is_public,
    )) = r::repositories
        .inner_join(u::users)
        .filter(u::login.eq(owner))
        .filter(u::is_active)
        .filter(r::name.eq(name))
        .filter(r::is_active)
        .select((
            r::id,
            crate::permissions!(user, r::id),
            u::suspended,
            is_public(),
        ))
        .get_result::<(
            uuid::Uuid,
            i64,
            bool,
            bool,
        )>(db)
        .await
        .optional()?
    {
        let got = if suspended {
            Perm::empty()
        } else if is_public {
            Perm::from_bits(perms).unwrap() | Perm::READ
        } else {
            Perm::from_bits(perms).unwrap()
        };
        if !got.contains(required) {
            Err(crate::Error::Permissions { required, got })
        } else {
            Ok(Repository {
                id: repo_id,
                permissions: got,
            })
        }
    } else {
        Err(crate::Error::RepositoryNotFound)
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct RepoPath {
    pub owner: String,
    pub repo: String,
    pub channel: Option<String>,
}

#[derive(Debug, Deserialize)]
pub struct RepoPath_ {
    owner: String,
    repo: String,
}

impl<S> axum::extract::FromRequestParts<S> for RepoPath
where
    S: Send + Sync,
{
    type Rejection =
        <axum::extract::Path<RepoPath_> as axum::extract::FromRequestParts<S>>::Rejection;

    async fn from_request_parts(
        parts: &mut http::request::Parts,
        state: &S,
    ) -> Result<Self, Self::Rejection> {
        use axum::extract::Path;
        let Path(r) = Path::<RepoPath_>::from_request_parts(parts, state).await?;
        debug!("path r {:?}", r);
        let mut it = r.repo.split(':');
        Ok(RepoPath {
            owner: r.owner,
            repo: it.next().unwrap().to_string(),
            channel: it.next().map(|x| x.to_string()),
        })
    }
}

impl Repo {
    pub async fn channels(&self) -> Vec<String> {
        use libpijul::TxnT;
        let mut c = Vec::new();
        let pristine = self.pristine.read().await;
        let txn = pristine.txn_begin().unwrap();
        let mut at_least_one = false;
        for chan in txn.channels("").unwrap() {
            at_least_one = true;
            let name = {
                let s = chan.read();
                debug!("channels: {:?}", s.name.as_str());
                let (_, s) = s.name.as_str().split_at(37);
                s.to_string()
            };
            c.push(name)
        }
        c.sort();
        if !at_least_one {
            c.push("main".to_string())
        }
        c
    }
}

pub async fn free_used_storage(
    db: &mut diesel_async::AsyncPgConnection,
    repo: uuid::Uuid,
    size: u64,
) -> Result<(), diesel::result::Error> {
    use crate::db::repositories::dsl as repositories;
    use crate::db::users::dsl as users;
    use diesel::sql_types::BigInt;
    let n = diesel::update(users::users)
        .filter(
            users::id.eq_any(
                repositories::repositories
                    .filter(repositories::id.eq(repo))
                    .select(repositories::owner),
            ),
        )
        .set(
            users::storage_used.eq(diesel::dsl::sql("GREATEST(0, storage_used - ")
                .bind::<BigInt, _>(size as i64)
                .sql(")")),
        )
        .execute(db)
        .await?;

    debug!("del_change updated {:?}", n);
    Ok(())
}

#[derive(Debug)]
pub struct Repository {
    pub id: uuid::Uuid,
    pub permissions: Perm,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Identity {
    pub public_key: libpijul::key::PublicKey,
    pub login: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub origin: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub email: Option<String>,
    pub last_modified: u64,
}

#[derive(Debug, Serialize)]
pub struct PathElement {
    pub basename: String,
    pub pos: String,
    pub meta: libpijul::pristine::InodeMetadata,
}

fn current_path<
    T: libpijul::GraphTxnT + libpijul::ChannelTxnT,
    C: libpijul::changestore::ChangeStore,
>(
    txn: &T,
    channel: &T::Channel,
    changes: &C,
    pos: libpijul::pristine::Position<libpijul::ChangeId>,
) -> Result<Vec<PathElement>, crate::Error> {
    let mut current_path = Vec::new();
    // Deciding whether this is a file or a directory.
    {
        let mut pos = pos;
        while !pos.is_root() {
            if let Some(x) = libpijul::fs::iter_basenames(txn, changes, txn.graph(channel), pos)
                .map_err(|_| crate::Error::Txn)?
                .next()
            {
                let (grandparent, meta, basename) = x.map_err(|_| crate::Error::Txn)?;
                current_path.push(PathElement {
                    basename: basename,
                    pos: pos.to_base32(),
                    meta,
                });
                pos = grandparent
            } else {
                // non-null root.
                break;
            }
        }
    }
    current_path.reverse();
    Ok(current_path)
}