use libpijul::change::{Change, ChangeFile, ChangeHeader};
use libpijul::changestore::filesystem::*;
use libpijul::changestore::*;
use libpijul::pristine::{Base32, ChangeId, Hash, Merkle, Position, Vertex};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::{Mutex, MutexGuard};
use thiserror::*;
use tracing::*;
// use crate::config::Db;

/// Change store backed by a directory of change files, fronted by two LRU
/// caches of open [`ChangeFile`] handles.
///
/// Both caches sit behind an `Arc`, so clones of this struct share them.
#[derive(Clone)]
pub struct FileSystem {
    /// Open change files keyed by `(id, ChangeId)`; filled by `load`.
    pub change_cache:
        Arc<Mutex<lru_cache::LruCache<(uuid::Uuid, ChangeId), Arc<Mutex<ChangeFile>>>>>,
    /// Open change files keyed by `(id, Hash)`. `load` stores the same `Arc`
    /// under both caches, so an entry here may alias one in `change_cache`.
    pub hash_cache: Arc<Mutex<lru_cache::LruCache<(uuid::Uuid, Hash), Arc<Mutex<ChangeFile>>>>>,
    /// First component of every cache key built in this file, and the id
    /// handed to `free_used_storage` by `del_change`.
    /// NOTE(review): presumably the repository id — confirm against callers.
    pub id: uuid::Uuid,
    /// Root directory under which `filename` and `tag_filename` build paths.
    pub changes_dir: PathBuf,
    /// Database handle; `del_change` clones it into a background task to
    /// update storage accounting after a file is removed.
    pub db: crate::config::Db,
}

/// Errors returned by the [`ChangeStore`] implementation for [`FileSystem`].
///
/// All wrapping variants are `transparent`: their `Display` output is that of
/// the wrapped error.
#[derive(Debug, Error)]
pub enum Error {
    /// Underlying I/O failure (file metadata, temp file creation, directory creation, ...).
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// Failure while opening or reading a tag file.
    #[error(transparent)]
    Tag(#[from] libpijul::tag::TagError),
    /// Invalid UTF-8 encountered while decoding a string.
    #[error(transparent)]
    Utf8(#[from] std::str::Utf8Error),
    /// Failure while opening, reading, serializing or deserializing a change file.
    #[error(transparent)]
    ChangeFile(#[from] libpijul::change::ChangeError),
    /// The temporary file written by `save_change` could not be moved to its final path.
    #[error(transparent)]
    Persist(#[from] tempfile::PersistError),
    /// A change file exceeds the size limit.
    /// NOTE(review): not constructed anywhere in the visible code; the only
    /// candidate site, in `get_change`, is commented out.
    #[error("Change too large")]
    TooLarge,
}

/// Extends `changes_dir` in place with the relative location of the change
/// identified by `hash`.
///
/// The base32 form of the hash is split after its first two characters: the
/// prefix becomes a sub-directory, the remainder becomes the file stem, and
/// the extension is set to `change`.
pub fn push_filename(changes_dir: &mut PathBuf, hash: &Hash) {
    let encoded = hash.to_base32();
    let (shard, stem) = encoded.split_at(2);
    // `PathBuf::extend` pushes each component in order, exactly like two `push` calls.
    changes_dir.extend([shard, stem]);
    changes_dir.set_extension("change");
}

impl FileSystem {
    /// Returns the path of the change file for `hash` under `changes_dir`.
    pub fn filename(&self, hash: &Hash) -> PathBuf {
        let mut path = self.changes_dir.clone();
        push_filename(&mut path, hash);
        path
    }

    /// Returns the path of the tag file for the state `hash` under `changes_dir`.
    pub fn tag_filename(&self, hash: &Merkle) -> PathBuf {
        let mut path = self.changes_dir.clone();
        push_tag_filename(&mut path, hash);
        path
    }

    /// Returns `true` if metadata can be read for the change file of `hash`.
    pub fn has_change(&self, hash: &Hash) -> bool {
        std::fs::metadata(self.filename(hash)).is_ok()
    }

    /// Ensures `change` has an entry in `change_cache` and returns the locked
    /// cache so the caller can fetch that entry.
    ///
    /// On a miss, `hash` is used to resolve the change id to a hash. If
    /// `hash_cache` already holds an open file for that hash, the same handle
    /// is shared; otherwise the file is opened and registered in both caches.
    ///
    /// Lock order is `change_cache` then `hash_cache`; the `hash_cache` guard
    /// is released before returning.
    ///
    /// # Errors
    ///
    /// Returns the error from `ChangeFile::open` if the file cannot be opened.
    ///
    /// # Panics
    ///
    /// Panics if `hash(change)` returns `None`, if either cache mutex is
    /// poisoned, or if the change path is not valid UTF-8.
    fn load<'a, F: Fn(ChangeId) -> Option<Hash>>(
        &'a self,
        hash: F,
        change: ChangeId,
    ) -> Result<
        MutexGuard<'a, lru_cache::LruCache<(uuid::Uuid, ChangeId), Arc<Mutex<ChangeFile>>>>,
        libpijul::change::ChangeError,
    > {
        let mut cache = self.change_cache.lock().unwrap();
        debug!("cache size = {:?}", cache.len());
        if !cache.contains_key(&(self.id, change)) {
            let h = hash(change).unwrap();
            let mut cache_ = self.hash_cache.lock().unwrap();
            if let Some(p) = cache_.get_mut(&(self.id, h)) {
                cache.insert((self.id, change), p.clone());
            } else {
                debug!("cache does not contain {:?} {:?}", change, h);
                let path = self.filename(&h);
                let p = Arc::new(Mutex::new(libpijul::change::ChangeFile::open(
                    h,
                    &path.to_str().unwrap(),
                )?));
                cache.insert((self.id, change), p.clone());
                cache_.insert((self.id, h), p);
            }
        }
        Ok(cache)
    }

    /// Verifies `file` against `hash` with [`check_from_reader`], then drops
    /// any cached handle for that change.
    ///
    /// The entry is evicted from `hash_cache` as well as `change_cache`:
    /// `load` falls back to `hash_cache` on a `change_cache` miss, so leaving
    /// the old handle there would put the stale file straight back.
    ///
    /// # Errors
    ///
    /// Returns the verification error; the caches are left untouched in that case.
    ///
    /// # Panics
    ///
    /// Panics if a cache mutex is poisoned.
    pub async fn check(
        &self,
        file: &mut tokio::fs::File,
        hash: &Hash,
        change_id: Option<ChangeId>,
    ) -> Result<(), libpijul::change::ChangeError> {
        check_from_reader(file, hash).await?;
        // Locks are taken one at a time and never held across an `.await`.
        if let Some(change_id) = change_id {
            self.change_cache
                .lock()
                .unwrap()
                .remove(&(self.id, change_id));
        }
        self.hash_cache.lock().unwrap().remove(&(self.id, *hash));
        Ok(())
    }
}

use libpijul::change::*;
use tokio::io::SeekFrom;
use tokio::io::{AsyncReadExt, AsyncSeekExt};

/// Reads up to `len` compressed bytes from `r`, feeds them through a fresh
/// zstd decompression stream and returns the hash of the decompressed bytes.
///
/// When `decompressed` is `Some`, the decompressed bytes are also appended to
/// it. Reading stops early if `r` reaches end of file; the hash then covers
/// only what was read.
async fn hash_compressed_section<R: tokio::io::AsyncRead + Unpin>(
    r: &mut R,
    len: u64,
    mut decompressed: Option<&mut Vec<u8>>,
) -> Result<Hash, libpijul::change::ChangeError> {
    use libpijul::pristine::Hasher;
    let mut hasher = Hasher::default();
    let mut dstream = zstd_seekable::DStream::new()?;
    let mut input = [0u8; 4096];
    let mut output = [0u8; 4096];
    let mut n: u64 = 0;
    while n < len {
        // Never read past the end of the section.
        let chunk = (len - n).min(4096) as usize;
        let ib = r.read(&mut input[..chunk]).await?;
        debug!("read {:?} bytes", ib);
        if ib == 0 {
            break;
        }
        let mut ia = 0;
        while ia < ib {
            let (a, b) = dstream.decompress(&mut output[..], &input[ia..ib])?;
            debug!("decompressed {:?} bytes from compressed {:?}", a, b);
            hasher.update(&output[..a]);
            if let Some(out) = decompressed.as_deref_mut() {
                out.extend(&output[..a]);
            }
            ia += b;
        }
        n += ib as u64;
    }
    Ok(hasher.finish())
}

/// Verifies a serialized change read from `r` against its claimed `hash` and
/// returns its decoded hashed part.
///
/// Steps:
/// 1. Read and decode the offsets header; reject unknown format versions.
/// 2. Decompress the section between the header and `unhashed_off`, hash it
///    and compare the result with `hash`.
/// 3. Decode that section (converting from the `noenc` layout when the
///    version is `VERSION_NOENC`).
/// 4. Seek to `contents_off`, decompress up to `total`, hash it and compare
///    with the `contents_hash` recorded in the hashed part.
///
/// The section between `unhashed_off` and `contents_off` is not read.
///
/// # Errors
///
/// Returns `VersionMismatch`, `ChangeHashMismatch` or `ContentsHashMismatch`
/// for the corresponding failures, an I/O error of kind `InvalidData` when the
/// header offsets are inconsistent, and any I/O, zstd or bincode error
/// encountered along the way.
pub async fn check_from_reader<R: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin>(
    mut r: R,
    hash: &Hash,
) -> Result<Hashed<Hunk<Option<Hash>, Local>, Author>, libpijul::change::ChangeError> {
    use libpijul::change::*;
    let mut buf = [0; 4096];
    debug!("read");
    r.read_exact(&mut buf[..Change::OFFSETS_SIZE as usize])
        .await?;
    debug!("read done");
    let offsets: Offsets = bincode::deserialize(&buf)?;
    if offsets.version != VERSION && offsets.version != VERSION_NOENC {
        return Err(ChangeError::VersionMismatch {
            got: offsets.version,
        });
    }

    // The offsets come from the file itself, so they are validated before
    // subtracting: inconsistent values must be an error, not an overflow.
    let hashed_len = offsets
        .unhashed_off
        .checked_sub(Change::OFFSETS_SIZE)
        .ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "change header: unhashed offset lies inside the offsets header",
            )
        })?;
    let contents_len = offsets
        .total
        .checked_sub(offsets.contents_off)
        .ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "change header: contents offset lies past the total length",
            )
        })?;

    // Hashing the hashed part
    let mut out: Vec<u8> = Vec::new();
    let computed_hash = hash_compressed_section(&mut r, hashed_len, Some(&mut out)).await?;
    debug!("{:?} {:?}", computed_hash, hash);
    if &computed_hash != hash {
        return Err(ChangeError::ChangeHashMismatch {
            claimed: *hash,
            computed: computed_hash,
        });
    }

    let hashed: Hashed<Hunk<Option<Hash>, Local>, Author> = if offsets.version == VERSION {
        bincode::deserialize(&out)?
    } else {
        let h: Hashed<noenc::Hunk<Option<Hash>, Local>, noenc::Author> =
            bincode::deserialize(&out)?;
        h.into()
    };

    // Hashing the contents
    r.seek(SeekFrom::Start(offsets.contents_off)).await?;
    let computed_hash = hash_compressed_section(&mut r, contents_len, None).await?;
    debug!(
        "contents hash: {:?}, computed: {:?}",
        hashed.contents_hash, computed_hash
    );
    if computed_hash != hashed.contents_hash {
        return Err(ChangeError::ContentsHashMismatch {
            claimed: hashed.contents_hash,
            computed: computed_hash,
        });
    }
    Ok(hashed)
}

impl ChangeStore for FileSystem {
    type Error = Error;
    fn has_contents(&self, hash: Hash, change_id: Option<ChangeId>) -> bool {
        if let Some(change_id) = change_id {
            let mut cache = self.load(|_| Some(hash), change_id).unwrap();
            let mut poisoned = false;
            if let Some(c) = cache.get_mut(&(self.id, change_id)) {
                if let Ok(l) = c.lock() {
                    return l.has_contents();
                } else {
                    poisoned = true
                }
            }
            if poisoned {
                cache.remove(&(self.id, change_id));
            }
        }
        debug!("has_contents {:?} {:?}", hash, change_id);
        let path = self.filename(&hash);
        if let Ok(p) = libpijul::change::ChangeFile::open(hash, &path.to_str().unwrap()) {
            p.has_contents()
        } else {
            false
        }
    }

    fn get_header(&self, h: &Hash) -> Result<ChangeHeader, Self::Error> {
        debug!("get_header {:?}", h);
        let mut cache = self.hash_cache.lock().unwrap();
        if let Some(c) = cache.get_mut(&(self.id, *h)) {
            return Ok(c.lock().unwrap().hashed().header.clone());
        }
        let path = self.filename(h);
        let p = libpijul::change::ChangeFile::open(*h, &path.to_str().unwrap())?;
        let hdr = p.hashed().header.clone();
        cache.insert((self.id, *h), Arc::new(Mutex::new(p)));
        Ok(hdr)
    }

    fn get_tag_header(&self, h: &Merkle) -> Result<ChangeHeader, Self::Error> {
        let path = self.tag_filename(h);
        let mut p = libpijul::tag::OpenTagFile::open(&path, h)?;
        Ok(p.header()?)
    }

    fn get_contents<F: Fn(ChangeId) -> Option<Hash>>(
        &self,
        hash: F,
        key: Vertex<ChangeId>,
        buf: &mut [u8],
    ) -> Result<usize, Self::Error> {
        let key_end: u64 = key.end.0.into();
        let key_start: u64 = key.start.0.into();
        // buf.resize((key_end - key_start) as usize, 0);
        if key_end <= key_start || key.is_root() {
            return Ok(0);
        }
        let mut cache = self.load(hash, key.change)?;
        let p = cache.get_mut(&(self.id, key.change)).unwrap();
        let mut p = p.lock().unwrap();
        let n = p.read_contents(key.start.0.into(), buf)?;
        Ok(n)
    }

    fn get_contents_ext(
        &self,
        key: Vertex<Option<Hash>>,
        buf: &mut [u8],
    ) -> Result<usize, Self::Error> {
        debug!("get_contents_ext {:?}", key);
        if let Some(change) = key.change {
            // let key_end: u64 = key.end.0.into();
            // let key_start: u64 = key.start.0.into();
            // buf.resize((key_end - key_start) as usize, 0);
            if key.end <= key.start {
                return Ok(0);
            }
            {
                if let Some(p) = self.hash_cache.lock().unwrap().get_mut(&(self.id, change)) {
                    let n = p.lock().unwrap().read_contents(key.start.0.into(), buf)?;
                    return Ok(n);
                }
            }
            let path = self.filename(&change);
            let mut p = libpijul::change::ChangeFile::open(change, &path.to_str().unwrap())?;
            let n = p.read_contents(key.start.0.into(), buf)?;
            debug!("taking lock");
            self.hash_cache
                .lock()
                .unwrap()
                .insert((self.id, change), Arc::new(Mutex::new(p)));
            debug!("inserted");
            Ok(n)
        } else {
            Ok(0)
        }
    }

    fn change_deletes_position<F: Fn(ChangeId) -> Option<Hash>>(
        &self,
        hash: F,
        change: ChangeId,
        pos: Position<Option<Hash>>,
    ) -> Result<Vec<Hash>, Self::Error> {
        let mut cache = self.load(hash, change)?;
        let p = cache.get_mut(&(self.id, change)).unwrap();
        let p = p.lock().unwrap();
        let mut v = Vec::new();
        for c in p.hashed().changes.iter() {
            for c in c.iter() {
                v.extend(c.deletes_pos(pos).into_iter())
            }
        }
        Ok(v)
    }

    fn save_change<
        E: From<Self::Error> + From<libpijul::change::ChangeError>,
        F: FnOnce(&mut Change, &Hash) -> Result<(), E>,
    >(
        &self,
        p: &mut Change,
        ff: F,
    ) -> Result<Hash, E> {
        let mut f = match tempfile::NamedTempFile::new_in(&self.changes_dir) {
            Ok(f) => f,
            Err(e) => return Err(E::from(Self::Error::from(e))),
        };
        let hash = {
            let w = std::io::BufWriter::new(&mut f);
            p.serialize(w, ff)?
        };
        let file_name = self.filename(&hash);
        if let Err(e) = std::fs::create_dir_all(file_name.parent().unwrap()) {
            return Err(E::from(Self::Error::from(e)));
        }
        debug!("file_name = {:?}", file_name);
        if let Err(e) = f.persist(file_name) {
            return Err(E::from(Self::Error::from(e)));
        }
        Ok(hash)
    }

    fn del_change(&self, hash: &Hash) -> Result<bool, Self::Error> {
        let file_name = self.filename(hash);
        debug!("file_name = {:?}", file_name);
        let db = self.db.clone();
        let id = self.id.clone();
        tokio::spawn(async move {
            if let Ok(meta) = tokio::fs::metadata(&file_name).await {
                if tokio::fs::remove_file(&file_name).await.is_ok() {
                    tokio::fs::remove_dir(file_name.parent().unwrap())
                        .await
                        .unwrap_or(()); // fails silently if there are still changes with the same 2-letter prefix.

                    super::free_used_storage(&mut *db.get().await?, id, meta.len()).await?
                    /*
                                        let n = db.execute(
                                            "UPDATE users SET storage_used = GREATEST(0, storage_used - $2)
                    FROM repositories
                    WHERE repositories.id = $1 AND repositories.owner = users.id",
                                            &[&id, &(meta.len() as i64)]
                                    ).await?;
                                        */
                }
            }
            Ok::<(), crate::Error>(())
        });
        Ok(true)
    }

    fn get_change(&self, h: &Hash) -> Result<Change, Self::Error> {
        let file_name = self.filename(h);
        let file_name = file_name.to_str().unwrap();
        debug!("file_name = {:?}", file_name);
        let m = std::fs::metadata(&file_name)?;
        if m.len() >= MAX_CHANGE_SIZE {
            info!("Change {:?} {:?} too large", self.id, h);
            // return Err(Error::TooLarge)
        }
        Ok(Change::deserialize(&file_name, Some(h))?)
    }
}

// Size threshold for change files: 4 MiB (1 << 22 = 4_194_304 bytes).
// `get_change` only logs when a file reaches this size; it does not reject it.
const MAX_CHANGE_SIZE: u64 = 1 << 22;