use libpijul::change::{Change, ChangeFile, ChangeHeader};
use libpijul::changestore::filesystem::*;
use libpijul::changestore::*;
use libpijul::pristine::{Base32, ChangeId, Hash, Merkle, Position, Vertex};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::{Mutex, MutexGuard};
use thiserror::*;
use tracing::*;
/// On-disk change store rooted at `changes_dir`, with two LRU caches of
/// open change files: one keyed by internal `ChangeId`, one keyed by
/// external `Hash`. Cache keys also include the repository `id`, so a
/// single cache can be shared across repositories.
#[derive(Clone)]
pub struct FileSystem {
    // Open change files keyed by (repository id, internal change id).
    pub change_cache:
        Arc<Mutex<lru_cache::LruCache<(uuid::Uuid, ChangeId), Arc<Mutex<ChangeFile>>>>>,
    // Open change files keyed by (repository id, external hash).
    pub hash_cache: Arc<Mutex<lru_cache::LruCache<(uuid::Uuid, Hash), Arc<Mutex<ChangeFile>>>>>,
    // Identifier of the repository this store serves.
    pub id: uuid::Uuid,
    // Root directory containing the change files.
    pub changes_dir: PathBuf,
    // Database handle; used to account for freed storage on deletion.
    pub db: crate::config::Db,
}
/// Errors produced by this change store.
#[derive(Debug, Error)]
pub enum Error {
    /// Underlying filesystem I/O error.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// Error while reading a tag file.
    #[error(transparent)]
    Tag(#[from] libpijul::tag::TagError),
    /// Invalid UTF-8 encountered while decoding.
    #[error(transparent)]
    Utf8(#[from] std::str::Utf8Error),
    /// Error from libpijul while reading or writing a change file.
    #[error(transparent)]
    ChangeFile(#[from] libpijul::change::ChangeError),
    /// Failure to atomically persist a temporary file.
    #[error(transparent)]
    Persist(#[from] tempfile::PersistError),
    /// The change exceeds the size limit (see `MAX_CHANGE_SIZE`).
    #[error("Change too large")]
    TooLarge,
}
/// Appends the on-disk location of the change identified by `hash` to
/// `changes_dir`: a two-character base32 prefix directory, then the rest
/// of the base32 hash as the file name, with a `.change` extension.
pub fn push_filename(changes_dir: &mut PathBuf, hash: &Hash) {
    let encoded = hash.to_base32();
    // Shard by the first two base32 characters to keep directories small.
    let dir = &encoded[..2];
    let file = &encoded[2..];
    changes_dir.push(dir);
    changes_dir.push(file);
    changes_dir.set_extension("change");
}
impl FileSystem {
    /// Full on-disk path of the change file for `hash`.
    pub fn filename(&self, hash: &Hash) -> PathBuf {
        let mut path = self.changes_dir.clone();
        push_filename(&mut path, hash);
        path
    }

    /// Full on-disk path of the tag file for the state `hash`.
    pub fn tag_filename(&self, hash: &Merkle) -> PathBuf {
        let mut path = self.changes_dir.clone();
        push_tag_filename(&mut path, hash);
        path
    }

    /// Whether the change file for `hash` exists on disk.
    pub fn has_change(&self, hash: &Hash) -> bool {
        std::fs::metadata(&self.filename(hash)).is_ok()
    }

    /// Ensures the change file for `change` is present in `change_cache`,
    /// and returns the guard on that cache so the caller can look the
    /// entry up while still holding the lock.
    ///
    /// `hash` maps the internal `ChangeId` to its external `Hash`; it is
    /// expected to succeed for `change` (the result is `unwrap`ped).
    ///
    /// NOTE(review): locks are taken `change_cache` → `hash_cache` here;
    /// other code paths taking both should match this order — verify.
    fn load<'a, F: Fn(ChangeId) -> Option<Hash>>(
        &'a self,
        hash: F,
        change: ChangeId,
    ) -> Result<
        MutexGuard<'a, lru_cache::LruCache<(uuid::Uuid, ChangeId), Arc<Mutex<ChangeFile>>>>,
        libpijul::change::ChangeError,
    > {
        let mut cache = self.change_cache.lock().unwrap();
        debug!("cache size = {:?}", cache.len());
        if !cache.contains_key(&(self.id, change)) {
            let h = hash(change).unwrap();
            let mut cache_ = self.hash_cache.lock().unwrap();
            if let Some(p) = cache_.get_mut(&(self.id, h)) {
                // Already open under its hash: share the same handle.
                cache.insert((self.id, change), p.clone());
            } else {
                debug!("cache does not contain {:?} {:?}", change, h);
                let path = self.filename(&h);
                let p = Arc::new(Mutex::new(libpijul::change::ChangeFile::open(
                    h,
                    &path.to_str().unwrap(),
                )?));
                // Register the freshly opened file under both keys.
                cache.insert((self.id, change), p.clone());
                cache_.insert((self.id, h), p);
            }
        }
        Ok(cache)
    }

    /// Verifies that `file` is a well-formed change whose hash matches
    /// `hash` (see [`check_from_reader`]). On success, drops any cached
    /// handle for `change_id` so the next access re-reads the file.
    pub async fn check(
        &self,
        file: &mut tokio::fs::File,
        hash: &Hash,
        change_id: Option<ChangeId>,
    ) -> Result<(), libpijul::change::ChangeError> {
        check_from_reader(file, hash).await?;
        if let Some(change_id) = change_id {
            let mut cache = self.change_cache.lock().unwrap();
            cache.remove(&(self.id, change_id));
        }
        Ok(())
    }
}
use libpijul::change::*;
use tokio::io::SeekFrom;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
/// Checks the integrity of a change read from `r`.
///
/// Reads the fixed-size offsets header, then decompresses and hashes the
/// "hashed" section (between the header and `unhashed_off`), comparing the
/// result with `hash`. It then seeks to `contents_off` and verifies the
/// contents section against the contents hash recorded in the hashed
/// section. Returns the deserialized hashed section on success.
///
/// # Errors
///
/// Returns `ChangeError::VersionMismatch` for unknown format versions,
/// `ChangeError::ChangeHashMismatch` / `ContentsHashMismatch` when a hash
/// does not match, and propagates I/O, decompression and deserialization
/// errors.
pub async fn check_from_reader<R: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin>(
    mut r: R,
    hash: &Hash,
) -> Result<Hashed<Hunk<Option<Hash>, Local>, Author>, libpijul::change::ChangeError> {
    use libpijul::change::*;
    use libpijul::pristine::Hasher;
    let mut buf = [0; 4096];
    debug!("read");
    // The file starts with a fixed-size offsets table.
    r.read_exact(&mut buf[..Change::OFFSETS_SIZE as usize])
        .await?;
    debug!("read done");
    let offsets: Offsets = bincode::deserialize(&buf)?;
    if offsets.version != VERSION && offsets.version != VERSION_NOENC {
        return Err(ChangeError::VersionMismatch {
            got: offsets.version,
        });
    }
    // First pass: decompress and hash the "hashed" section, collecting the
    // decompressed bytes into `out` for deserialization below.
    let mut hasher = Hasher::default();
    let mut dstream = zstd_seekable::DStream::new()?;
    let mut n = 0;
    let mut input = [0; 4096];
    let len = (offsets.unhashed_off - Change::OFFSETS_SIZE) as usize;
    let mut out: Vec<u8> = Vec::new();
    while n < len {
        // Never read past the end of the section.
        let len = (len - n).min(4096);
        let mut ia = 0;
        // `read` already returns `usize`; no cast needed.
        let ib = r.read(&mut input[..len]).await?;
        debug!("read {:?} bytes", ib);
        if ib == 0 {
            // Premature EOF: the hash comparison below will catch it.
            break;
        }
        while ia < ib {
            let (a, b) = dstream.decompress(&mut buf[..], &input[ia..ib])?;
            debug!("decompressed {:?} bytes from compressed {:?}", a, b);
            hasher.update(&buf[..a]);
            out.extend(&buf[..a]);
            ia += b;
        }
        n += ib;
    }
    let computed_hash = hasher.finish();
    debug!("{:?} {:?}", computed_hash, hash);
    if &computed_hash != hash {
        // Already the function's error type: no `.into()` needed.
        return Err(ChangeError::ChangeHashMismatch {
            claimed: *hash,
            computed: computed_hash,
        });
    }
    // Deserialize the hashed section, converting from the unencrypted
    // legacy format (`VERSION_NOENC`) if needed.
    let hashed: Hashed<Hunk<Option<Hash>, Local>, Author> = if offsets.version == VERSION {
        bincode::deserialize(&out)?
    } else {
        let h: Hashed<noenc::Hunk<Option<Hash>, Local>, noenc::Author> =
            bincode::deserialize(&out)?;
        h.into()
    };
    // Second pass: decompress and hash the contents section, from
    // `contents_off` to the end of the file.
    let mut hasher = Hasher::default();
    r.seek(SeekFrom::Start(offsets.contents_off)).await?;
    let mut dstream = zstd_seekable::DStream::new()?;
    let mut n = 0;
    let mut input = [0; 4096];
    let len = offsets.total - offsets.contents_off;
    while n < len {
        let len = (len - n).min(4096);
        let mut ia = 0;
        let ib = r.read(&mut input[..len as usize]).await?;
        debug!("read {:?} bytes", ib);
        if ib == 0 {
            break;
        }
        while ia < ib {
            let (a, b) = dstream.decompress(&mut buf[..], &input[ia..ib])?;
            debug!("decompressed {:?} bytes from compressed {:?}", a, b);
            hasher.update(&buf[..a]);
            ia += b;
        }
        n += ib as u64;
    }
    let computed_hash = hasher.finish();
    debug!(
        "contents hash: {:?}, computed: {:?}",
        hashed.contents_hash, computed_hash
    );
    if computed_hash != hashed.contents_hash {
        return Err(ChangeError::ContentsHashMismatch {
            claimed: hashed.contents_hash,
            computed: computed_hash,
        });
    }
    Ok(hashed)
}
impl ChangeStore for FileSystem {
    type Error = Error;

    /// Whether the change `hash` has a non-empty contents section.
    /// Prefers the cached handle for `change_id` when available, falling
    /// back to opening the file directly.
    fn has_contents(&self, hash: Hash, change_id: Option<ChangeId>) -> bool {
        if let Some(change_id) = change_id {
            let mut cache = self.load(|_| Some(hash), change_id).unwrap();
            let mut poisoned = false;
            if let Some(c) = cache.get_mut(&(self.id, change_id)) {
                if let Ok(l) = c.lock() {
                    return l.has_contents();
                } else {
                    // A thread panicked while holding this file's lock;
                    // evict the entry below (the cache cannot be mutated
                    // here while `c` still borrows it).
                    poisoned = true
                }
            }
            if poisoned {
                cache.remove(&(self.id, change_id));
            }
        }
        debug!("has_contents {:?} {:?}", hash, change_id);
        let path = self.filename(&hash);
        if let Ok(p) = libpijul::change::ChangeFile::open(hash, &path.to_str().unwrap()) {
            p.has_contents()
        } else {
            false
        }
    }

    /// Returns the header of change `h`, opening and caching the file
    /// under its hash if it is not already cached.
    fn get_header(&self, h: &Hash) -> Result<ChangeHeader, Self::Error> {
        debug!("get_header {:?}", h);
        let mut cache = self.hash_cache.lock().unwrap();
        if let Some(c) = cache.get_mut(&(self.id, *h)) {
            return Ok(c.lock().unwrap().hashed().header.clone());
        }
        let path = self.filename(h);
        let p = libpijul::change::ChangeFile::open(*h, &path.to_str().unwrap())?;
        let hdr = p.hashed().header.clone();
        cache.insert((self.id, *h), Arc::new(Mutex::new(p)));
        Ok(hdr)
    }

    /// Returns the header of the tag file for state `h` (not cached).
    fn get_tag_header(&self, h: &Merkle) -> Result<ChangeHeader, Self::Error> {
        let path = self.tag_filename(h);
        let mut p = libpijul::tag::OpenTagFile::open(&path, h)?;
        Ok(p.header()?)
    }

    /// Reads the contents bytes addressed by `key` into `buf`, returning
    /// the number of bytes read. Empty or root vertices read nothing.
    /// `hash` maps internal change ids to hashes for cache loading.
    fn get_contents<F: Fn(ChangeId) -> Option<Hash>>(
        &self,
        hash: F,
        key: Vertex<ChangeId>,
        buf: &mut [u8],
    ) -> Result<usize, Self::Error> {
        let key_end: u64 = key.end.0.into();
        let key_start: u64 = key.start.0.into();
        if key_end <= key_start || key.is_root() {
            return Ok(0);
        }
        let mut cache = self.load(hash, key.change)?;
        let p = cache.get_mut(&(self.id, key.change)).unwrap();
        let mut p = p.lock().unwrap();
        let n = p.read_contents(key.start.0.into(), buf)?;
        Ok(n)
    }

    /// Like [`ChangeStore::get_contents`], but addressed by external hash.
    /// Returns 0 for empty vertices and for vertices with no change.
    fn get_contents_ext(
        &self,
        key: Vertex<Option<Hash>>,
        buf: &mut [u8],
    ) -> Result<usize, Self::Error> {
        debug!("get_contents_ext {:?}", key);
        if let Some(change) = key.change {
            if key.end <= key.start {
                return Ok(0);
            }
            {
                // Scope the cache lock so it is released before the slow
                // path below opens the file and re-takes the lock.
                if let Some(p) = self.hash_cache.lock().unwrap().get_mut(&(self.id, change)) {
                    let n = p.lock().unwrap().read_contents(key.start.0.into(), buf)?;
                    return Ok(n);
                }
            }
            let path = self.filename(&change);
            let mut p = libpijul::change::ChangeFile::open(change, &path.to_str().unwrap())?;
            let n = p.read_contents(key.start.0.into(), buf)?;
            debug!("taking lock");
            self.hash_cache
                .lock()
                .unwrap()
                .insert((self.id, change), Arc::new(Mutex::new(p)));
            debug!("inserted");
            Ok(n)
        } else {
            Ok(0)
        }
    }

    /// Collects the hashes of changes whose content at position `pos` is
    /// deleted by `change`, by scanning all hunks of the change.
    fn change_deletes_position<F: Fn(ChangeId) -> Option<Hash>>(
        &self,
        hash: F,
        change: ChangeId,
        pos: Position<Option<Hash>>,
    ) -> Result<Vec<Hash>, Self::Error> {
        let mut cache = self.load(hash, change)?;
        let p = cache.get_mut(&(self.id, change)).unwrap();
        let p = p.lock().unwrap();
        let mut v = Vec::new();
        for c in p.hashed().changes.iter() {
            for c in c.iter() {
                v.extend(c.deletes_pos(pos).into_iter())
            }
        }
        Ok(v)
    }

    /// Serializes `p` into a temporary file in `changes_dir`, then
    /// atomically persists it at the path derived from its hash
    /// (same filesystem, so `persist` is a rename).
    fn save_change<
        E: From<Self::Error> + From<libpijul::change::ChangeError>,
        F: FnOnce(&mut Change, &Hash) -> Result<(), E>,
    >(
        &self,
        p: &mut Change,
        ff: F,
    ) -> Result<Hash, E> {
        let mut f = match tempfile::NamedTempFile::new_in(&self.changes_dir) {
            Ok(f) => f,
            Err(e) => return Err(E::from(Self::Error::from(e))),
        };
        let hash = {
            let w = std::io::BufWriter::new(&mut f);
            p.serialize(w, ff)?
        };
        let file_name = self.filename(&hash);
        if let Err(e) = std::fs::create_dir_all(file_name.parent().unwrap()) {
            return Err(E::from(Self::Error::from(e)));
        }
        debug!("file_name = {:?}", file_name);
        if let Err(e) = f.persist(file_name) {
            return Err(E::from(Self::Error::from(e)));
        }
        Ok(hash)
    }

    /// Schedules asynchronous, best-effort deletion of the change file for
    /// `hash` (and its prefix directory if emptied), crediting the freed
    /// bytes back in the database. Returns `Ok(true)` immediately without
    /// waiting for the spawned task.
    fn del_change(&self, hash: &Hash) -> Result<bool, Self::Error> {
        let file_name = self.filename(hash);
        debug!("file_name = {:?}", file_name);
        let db = self.db.clone();
        let id = self.id.clone();
        tokio::spawn(async move {
            if let Ok(meta) = tokio::fs::metadata(&file_name).await {
                if tokio::fs::remove_file(&file_name).await.is_ok() {
                    // Removing the parent fails if it still has entries;
                    // that is fine, so the error is discarded.
                    tokio::fs::remove_dir(file_name.parent().unwrap())
                        .await
                        .unwrap_or(());
                    super::free_used_storage(&mut *db.get().await?, id, meta.len()).await?
                }
            }
            Ok::<(), crate::Error>(())
        });
        Ok(true)
    }

    /// Fully deserializes the change `h` from disk.
    fn get_change(&self, h: &Hash) -> Result<Change, Self::Error> {
        let file_name = self.filename(h);
        let file_name = file_name.to_str().unwrap();
        debug!("file_name = {:?}", file_name);
        let m = std::fs::metadata(&file_name)?;
        if m.len() >= MAX_CHANGE_SIZE {
            // NOTE(review): oversized changes are only logged and still
            // deserialized; `Error::TooLarge` exists but is never returned
            // here — confirm whether this should be a hard error.
            info!("Change {:?} {:?} too large", self.id, h);
        }
        Ok(Change::deserialize(&file_name, Some(h))?)
    }
}
/// Size threshold (4 MiB) above which `get_change` flags a change as too large.
const MAX_CHANGE_SIZE: u64 = 1 << 22;