use crate::Error;
#[cfg(feature = "mmap")]
use fs4::FileExt;
use parking_lot::lock_api::{RawMutex, RawRwLock};
use parking_lot::Mutex;
use sanakirja_core::{CowPage, Storable};
use log::*;
use std::borrow::Borrow;
#[cfg(feature = "mmap")]
use std::fs::OpenOptions;
#[cfg(feature = "mmap")]
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
mod muttxn;
pub use muttxn::*;
mod global_header;
pub(crate) use global_header::*;
pub use sanakirja_core::PAGE_SIZE;
/// `PAGE_SIZE` widened to `u64`, for arithmetic on file offsets and lengths.
pub(crate) const PAGE_SIZEU64: u64 = PAGE_SIZE as u64;
/// Format version written into every root page header by `init`, and compared
/// against the stored header when an existing file is opened (a mismatch is
/// reported as `Error::VersionMismatch`).
const CURRENT_VERSION: u16 = 3;
/// One contiguous memory region backing part of an environment.
///
/// With the `mmap` feature the region is a memory map (file-backed or
/// anonymous); without it the region is a heap allocation described by
/// `layout`.
#[derive(Debug)]
pub(crate) struct Map {
/// Address of the first byte of the region.
pub(crate) ptr: *mut u8,
/// The mapping that owns the memory `ptr` points into.
#[cfg(feature = "mmap")]
mmap: memmap2::MmapMut,
/// Layout the region was allocated with; required again to deallocate it.
#[cfg(not(feature = "mmap"))]
layout: std::alloc::Layout,
/// Size of the region in bytes.
length: u64,
}
impl Map {
    /// Flushes the whole mapping through `memmap2::MmapMut::flush`.
    #[cfg(feature = "mmap")]
    fn flush(&self) -> Result<(), Error> {
        self.mmap.flush()?;
        Ok(())
    }

    /// Without the `mmap` feature there is nothing to flush; always succeeds.
    #[cfg(not(feature = "mmap"))]
    fn flush(&self) -> Result<(), Error> {
        Ok(())
    }

    /// Flushes part of the mapping; `a` and `b` are forwarded unchanged to
    /// `memmap2::MmapMut::flush_range` (offset and length, in that order).
    #[cfg(feature = "mmap")]
    fn flush_range(&self, a: usize, b: usize) -> Result<(), Error> {
        self.mmap.flush_range(a, b)?;
        Ok(())
    }

    /// Without the `mmap` feature there is nothing to flush; always succeeds.
    #[cfg(not(feature = "mmap"))]
    fn flush_range(&self, _: usize, _: usize) -> Result<(), Error> {
        Ok(())
    }
}
/// An environment: the set of memory regions holding the database pages,
/// plus the locks that coordinate transactions on it.
pub struct Env {
/// Backing file. The constructors in this file pass `Some` when the
/// environment is created from a path and `None` for `new_anon`.
#[cfg(feature = "mmap")]
file: Option<std::fs::File>,
/// Memory regions, in offset order. Region 0 contains the root pages;
/// `find_offset` appends further regions on demand.
pub(crate) mmaps: Mutex<Vec<Map>>,
/// Raw mutex, not used in this file.
/// NOTE(review): presumably serialises mutable transactions (see the
/// `muttxn` module) — confirm.
mut_txn_lock: parking_lot::RawMutex,
/// One lock record per root page.
pub(crate) roots: Vec<RootLock>,
}
// NOTE(review): `Map` holds a raw pointer, so `Env` is not automatically
// `Send`/`Sync`. These impls assert that sharing is sound; the region list is
// behind a `Mutex`, but the soundness of page access through the returned
// pointers depends on the transaction locking protocol — confirm.
unsafe impl Send for Env {}
unsafe impl Sync for Env {}
#[cfg(not(feature = "mmap"))]
impl Drop for Env {
    /// Frees every heap region still owned by the environment.
    ///
    /// Regions already released by `close` have been removed from the list,
    /// so they are not freed twice.
    fn drop(&mut self) {
        let mut regions = self.mmaps.lock();
        regions
            .drain(..)
            .for_each(|region| unsafe { std::alloc::dealloc(region.ptr, region.layout) });
    }
}
/// Locking state attached to one root page.
pub(crate) struct RootLock {
/// Lock file for this root. `Env::new` opens one per root (the database path
/// with extension `lock<n>`); the other constructors leave it `None`.
#[cfg(feature = "mmap")]
lock_file: Option<std::fs::File>,
/// In-process read/write lock; `txn_begin` takes it shared and `Txn`'s `Drop`
/// releases it.
rw: parking_lot::RawRwLock,
/// Number of live `Txn`s on this root. The transaction that raises it from 0
/// takes the shared file lock; the one that brings it back to 0 releases it.
n_txn: AtomicUsize,
}
impl Env {
/// Opens (creating it if necessary) the database file at `path` without
/// opening any lock files.
///
/// The mapped length is the larger of `length` and the current file size
/// (at least one page for a new file), rounded up to a multiple of the page
/// size. The file is initialised with `n_roots` root pages only when it did
/// not exist before the call.
///
/// # Safety
///
/// The file is mapped writable and no lock file is associated with the
/// roots, so nothing here prevents other users of the same file from
/// modifying it concurrently.
#[cfg(feature = "mmap")]
pub unsafe fn new_nolock<P: AsRef<Path>>(
    path: P,
    length: u64,
    n_roots: usize,
) -> Result<Self, Error> {
    let existing = std::fs::metadata(&path);
    let is_new = existing.is_err();
    let wanted = match existing {
        Ok(ref m) => m.len().max(length),
        Err(_) => length.max(PAGE_SIZEU64),
    };
    // Round up to the next multiple of the page size.
    let length = (wanted + PAGE_SIZEU64 - 1) & !(PAGE_SIZEU64 - 1);

    let mut options = OpenOptions::new();
    options.read(true).write(true).truncate(false).create(true);
    let file = options.open(&path)?;
    file.set_len(length)?;

    let mmap = memmap2::MmapMut::map_mut(&file)?;
    Self::new_nolock_(Some(file), length, mmap, is_new, n_roots)
}
/// Builds an `Env` around an already-created mapping.
///
/// When `initialise` is true, `n_roots` root pages are written at the start
/// of the mapping. Otherwise the existing header is read: its version must
/// equal `CURRENT_VERSION` (else `Error::VersionMismatch`), and the number of
/// roots stored in it is used instead of the `n_roots` argument.
///
/// # Panics
///
/// Panics unless `1 <= n_roots < 256` and `n_roots <= length >> 12`.
///
/// # Safety
///
/// `mmap` must be at least `length` bytes long; when `initialise` is false
/// its first bytes are reinterpreted as a `GlobalHeader`.
#[cfg(feature = "mmap")]
unsafe fn new_nolock_(
    file: Option<std::fs::File>,
    length: u64,
    mut mmap: memmap2::MmapMut,
    initialise: bool,
    n_roots: usize,
) -> Result<Self, Error> {
    assert!(n_roots >= 1);
    assert!(n_roots <= ((length >> 12) as usize));
    assert!(n_roots < 256);

    let base = mmap.as_mut_ptr();
    let root_count = if !initialise {
        let header = &*(base as *const GlobalHeader);
        if u16::from_le(header.version) != CURRENT_VERSION {
            return Err(Error::VersionMismatch);
        }
        header.n_roots as usize
    } else {
        init(base, n_roots);
        n_roots
    };

    let mut roots = Vec::with_capacity(root_count);
    roots.resize_with(root_count, || RootLock {
        rw: RawRwLock::INIT,
        n_txn: AtomicUsize::new(0),
        lock_file: None,
    });

    let first_region = Map {
        ptr: base,
        mmap,
        length,
    };
    Ok(Env {
        file,
        mmaps: Mutex::new(vec![first_region]),
        mut_txn_lock: RawMutex::INIT,
        roots,
    })
}
/// Builds a heap-backed `Env` of `length` bytes with `n_roots` root pages.
///
/// The region is allocated zero-filled, matching what an anonymous memory
/// map provides: `init` only writes the page headers, while the CRC
/// computation and the root-slot accessors read the remainder of each root
/// page, which must therefore be initialised.
///
/// # Panics
///
/// Panics unless `initialise` is true, `1 <= n_roots < 256` and
/// `n_roots <= length >> 12`. Allocation failure is reported through
/// `std::alloc::handle_alloc_error`.
///
/// # Safety
///
/// `length` must be non-zero (guaranteed by the asserts above, since
/// `n_roots >= 1` implies at least one 4096-byte page).
#[cfg(not(feature = "mmap"))]
unsafe fn new_nolock_(length: u64, initialise: bool, n_roots: usize) -> Result<Env, Error> {
    assert!(n_roots >= 1);
    assert!(n_roots <= ((length >> 12) as usize));
    assert!(n_roots < 256);
    assert!(initialise);
    let layout = std::alloc::Layout::from_size_align(length as usize, 64).unwrap();
    let map = std::alloc::alloc_zeroed(layout);
    if map.is_null() {
        // Writing through a null pointer in `init` would be undefined behaviour.
        std::alloc::handle_alloc_error(layout);
    }
    init(map, n_roots);
    let env = Env {
        mmaps: Mutex::new(vec![Map {
            ptr: map,
            layout,
            length,
        }]),
        mut_txn_lock: RawMutex::INIT,
        roots: (0..n_roots)
            .map(|_| RootLock {
                rw: RawRwLock::INIT,
                n_txn: AtomicUsize::new(0),
            })
            .collect(),
    };
    Ok(env)
}
}
/// Writes a fresh header at the start of each of the first `n_roots` pages
/// of `map`, then stamps each page with its CRC.
///
/// Every header records the current format version, `n_roots`, and a
/// `length` of `n_roots` pages; all other fields are zero.
///
/// # Safety
///
/// `map` must be valid for writes of `n_roots * PAGE_SIZE` bytes and
/// suitably aligned for `GlobalHeader`.
unsafe fn init(map: *mut u8, n_roots: usize) {
    for page_index in 0..n_roots {
        let page = map.add(page_index * PAGE_SIZE);
        let header = page as *mut GlobalHeader;
        *header = (GlobalHeader {
            version: CURRENT_VERSION,
            root: 0,
            n_roots: n_roots as u8,
            crc: 0,
            length: n_roots as u64 * PAGE_SIZE as u64,
            free_db: 0,
            rc_db: 0,
        })
        .to_le();
        set_crc(page);
    }
}
impl Env {
/// Opens (or creates) the database at `path` and one lock file per root.
///
/// Lock file `n` is the database path with its extension replaced by
/// `lock<n>`; each is opened read/write and created if missing.
///
/// # Panics
///
/// Panics if `n_roots >= 256` (further checks happen in `new_nolock`).
#[cfg(feature = "mmap")]
pub fn new<P: AsRef<Path>>(path: P, length: u64, n_roots: usize) -> Result<Env, Error> {
    assert!(n_roots < 256);
    let path = path.as_ref();
    let mut env = unsafe { Self::new_nolock(path, length, n_roots)? };

    let mut options = OpenOptions::new();
    options.read(true).write(true).truncate(false).create(true);
    for (n, root) in env.roots.iter_mut().enumerate() {
        let lock_path = path.with_extension(format!("lock{}", n));
        root.lock_file = Some(options.open(lock_path)?);
    }
    Ok(env)
}
/// Creates an environment backed by an anonymous memory map rather than a
/// file. `length` is raised to at least one page and rounded up to a
/// multiple of the page size.
#[cfg(feature = "mmap")]
pub fn new_anon(length: u64, n_roots: usize) -> Result<Env, Error> {
    let at_least_one_page = length.max(PAGE_SIZEU64);
    let length = (at_least_one_page + (PAGE_SIZEU64 - 1)) & !(PAGE_SIZEU64 - 1);
    let mmap = memmap2::MmapMut::map_anon(length as usize)?;
    unsafe { Self::new_nolock_(None, length, mmap, true, n_roots) }
}
/// Creates a heap-backed environment. `length` is raised to at least one
/// page and rounded up to a multiple of the page size.
#[cfg(not(feature = "mmap"))]
pub fn new_anon(length: u64, n_roots: usize) -> Result<Env, Error> {
    let at_least_one_page = length.max(PAGE_SIZEU64);
    let length = (at_least_one_page + (PAGE_SIZEU64 - 1)) & !(PAGE_SIZEU64 - 1);
    unsafe { Self::new_nolock_(length, true, n_roots) }
}
/// Allocates heap region number `i`, of size `length0 << i` bytes.
///
/// The region is zero-filled, like the anonymous mapping used when the
/// `mmap` feature is enabled, and a failed allocation is reported through
/// `std::alloc::handle_alloc_error` instead of handing out a null pointer.
#[cfg(not(feature = "mmap"))]
fn open_mmap(&self, i: usize, length0: u64) -> Result<Map, Error> {
    let length = length0 << i;
    let layout = std::alloc::Layout::from_size_align(length as usize, 64).unwrap();
    let map = unsafe { std::alloc::alloc_zeroed(layout) };
    if map.is_null() {
        std::alloc::handle_alloc_error(layout);
    }
    Ok(Map {
        ptr: map,
        layout,
        length,
    })
}
/// Maps region number `i`, of size `length0 << i` bytes.
///
/// For a file-backed environment the region starts at file offset
/// `(length0 << i) - length0`; the file is first grown (and allocated via
/// `fallocate`) to cover the end of the region. An anonymous environment
/// gets a fresh anonymous mapping of the same size.
#[cfg(feature = "mmap")]
fn open_mmap(&self, i: usize, length0: u64) -> Result<Map, Error> {
    let length = length0 << i;
    let offset = (length0 << i) - length0;
    let mut mmap = match self.file {
        Some(ref file) => {
            let end = offset + length;
            file.set_len(end)?;
            fallocate(file, end)?;
            unsafe {
                memmap2::MmapOptions::new()
                    .offset(offset)
                    .len(length as usize)
                    .map_mut(file)?
            }
        }
        None => memmap2::MmapMut::map_anon(length as usize)?,
    };
    Ok(Map {
        ptr: mmap.as_mut_ptr(),
        mmap,
        length,
    })
}
/// Translates a global byte offset into a pointer inside the region that
/// contains it, opening further regions when the offset lies beyond the
/// ones currently mapped.
///
/// Regions are walked in order, subtracting each region's length from
/// `offset` until it falls inside one. The region list stays locked for the
/// whole call.
///
/// # Safety
///
/// The returned pointer is only meaningful while the region it points into
/// is alive; the caller is responsible for how it is dereferenced.
unsafe fn find_offset(&self, mut offset: u64) -> Result<*mut u8, Error> {
    let mut regions = self.mmaps.lock();
    let mut index = 0;
    loop {
        if index >= regions.len() {
            let length0 = regions[0].length;
            info!(
                "find_offset, i = {:?}/{:?}, extending, offset = {:?}, length0 = {:?}",
                index,
                regions.len(),
                offset,
                length0
            );
            let region = self.open_mmap(index, length0)?;
            regions.push(region);
        }
        let region_len = regions[index].length;
        if offset < region_len {
            return Ok(regions[index].ptr.add(offset as usize));
        }
        offset -= region_len;
        index += 1;
    }
}
/// Frees every heap region of the environment and empties the region list.
///
/// # Safety
///
/// Any pointer previously obtained into these regions dangles after this
/// call.
#[cfg(not(feature = "mmap"))]
pub unsafe fn close(&mut self) {
    let regions = std::mem::take(&mut *self.mmaps.lock());
    for region in regions {
        std::alloc::dealloc(region.ptr, region.layout)
    }
}
/// Without the `crc32` feature no checksum is verified; always succeeds.
#[cfg(not(feature = "crc32"))]
fn check_crc(&self, _root: usize) -> Result<(), crate::CRCError> {
    Ok(())
}

/// Verifies the checksum of root page number `root`, located in region 0.
/// The region list stays locked while the page is hashed.
#[cfg(feature = "crc32")]
fn check_crc(&self, root: usize) -> Result<(), crate::CRCError> {
    let regions = self.mmaps.lock();
    let page = unsafe { regions[0].ptr.add(root * PAGE_SIZE) };
    unsafe { check_crc(page) }
}
}
/// Asks the filesystem to allocate `length` bytes for `file`.
///
/// An `EINVAL` failure is treated as success, so the caller proceeds with
/// the file merely extended by `set_len`.
/// NOTE(review): presumably `EINVAL` is what filesystems without
/// preallocation support return — confirm.
///
/// # Errors
///
/// Any other I/O error is returned, including errors that carry no OS error
/// code (these are propagated rather than causing a panic).
#[cfg(feature = "mmap")]
fn fallocate(file: &std::fs::File, length: u64) -> Result<(), Error> {
    match file.allocate(length) {
        Ok(()) => Ok(()),
        Err(err) if err.raw_os_error() == Some(libc::EINVAL) => Ok(()),
        Err(err) => Err(err.into()),
    }
}
#[cfg(feature = "crc32")]
use lazy_static::*;
// CRC-32 hasher in its initial state, created once; `set_crc` clones it
// for each page it hashes.
#[cfg(feature = "crc32")]
lazy_static! {
static ref HASHER: crc32fast::Hasher = crc32fast::Hasher::new();
}
/// Asking for more roots than the file has pages must panic: `1 << 15`
/// bytes is 8 pages of 4096 bytes, fewer than the 19 roots requested.
#[cfg(feature = "mmap")]
#[test]
#[should_panic]
fn nroots_test() {
    let dir = tempfile::tempdir().unwrap();
    let db_path = dir.path().join("db");
    let l0 = 1 << 15;
    Env::new(&db_path, l0, 19).unwrap();
}
/// Opening regions 0 and 1 must grow the file to `l0 + 2 * l0` bytes:
/// region 1 starts at offset `l0` and is `2 * l0` bytes long.
#[cfg(feature = "mmap")]
#[test]
fn mmap_growth_test() {
    let dir = tempfile::tempdir().unwrap();
    let db_path = dir.path().join("db");
    let l0 = 1 << 15;
    {
        let env = Env::new(&db_path, l0, 2).unwrap();
        let first = env.open_mmap(0, l0).unwrap();
        println!("{:?}", first);
        let second = env.open_mmap(1, l0).unwrap();
        println!("{:?}", second);
        first.flush().unwrap();
        second.flush().unwrap();
    }
    let expected = (l0 << 2) - l0;
    let len = std::fs::metadata(&db_path).unwrap().len();
    assert_eq!(len, expected);
}
/// Without the `crc32` feature pages carry no checksum; does nothing.
#[cfg(not(feature = "crc32"))]
fn set_crc(_ptr: *mut u8) {}

/// Computes the CRC-32 of the page starting at `ptr`, skipping its first
/// 8 bytes, and stores it little-endian in the `crc` field of the page's
/// `GlobalHeader`.
///
/// `ptr` must point to a full, writable page that starts with a
/// `GlobalHeader`.
#[cfg(feature = "crc32")]
fn set_crc(ptr: *mut u8) {
    let header = ptr as *mut GlobalHeader;
    unsafe {
        let hashed = std::slice::from_raw_parts(ptr.add(8), PAGE_SIZE - 8);
        let mut hasher = HASHER.clone();
        hasher.update(hashed);
        (*header).crc = hasher.finalize().to_le();
        debug!("SETTING CRC {:?}", (*header).crc);
    }
}
/// Transaction handle produced by `Env::txn_begin`. It holds a shared lock
/// on one root page until it is dropped.
pub struct Txn<E: Borrow<Env>> {
/// The environment, or anything that borrows as one.
pub(crate) env: E,
/// Index of the root page this transaction reads.
pub(crate) root: usize,
/// `length` field of that root page's header, read when the transaction
/// started; `load_page` rejects offsets greater than this value.
pub(crate) size: u64,
}
impl<E: Borrow<Env>> Txn<E> {
    /// Returns the environment this transaction was started on.
    pub fn env_borrow(&self) -> &Env {
        let env: &Env = self.env.borrow();
        env
    }
}
impl Env {
pub fn txn_begin<E: Borrow<Self>>(env: E) -> Result<Txn<E>, Error> {
let env_ = env.borrow();
let root = {
let cur_mut_root =
unsafe { (&*(env_.mmaps.lock()[0].ptr as *const GlobalHeader)).root as usize };
let root = (cur_mut_root + env_.roots.len() - 1) % env_.roots.len();
env_.roots[root].rw.lock_shared();
let old_n_txn = env_.roots[root].n_txn.fetch_add(1, Ordering::SeqCst);
if old_n_txn == 0 {
env_.lock_shared(root)?
}
root
};
let size = unsafe {
let next_page_ptr = env_.mmaps.lock()[0].ptr.offset((root * PAGE_SIZE) as isize);
let header = GlobalHeader::from_le(&*(next_page_ptr as *const GlobalHeader));
header.length
};
env_.check_crc(root)?;
Ok(Txn { env, root, size })
}
/// Takes a shared lock on the lock file of root `root`, if it has one.
#[cfg(feature = "mmap")]
fn lock_shared(&self, root: usize) -> Result<(), Error> {
    match self.roots[root].lock_file {
        Some(ref f) => {
            f.lock_shared()?;
            Ok(())
        }
        None => Ok(()),
    }
}

/// Without the `mmap` feature there are no lock files; always succeeds.
#[cfg(not(feature = "mmap"))]
fn lock_shared(&self, _root: usize) -> Result<(), Error> {
    Ok(())
}
/// Takes an exclusive lock on the lock file of root `root`, if it has one.
#[cfg(feature = "mmap")]
fn lock_exclusive(&self, root: usize) -> Result<(), Error> {
    match self.roots[root].lock_file {
        Some(ref f) => {
            f.lock_exclusive()?;
            Ok(())
        }
        None => Ok(()),
    }
}

/// Without the `mmap` feature there are no lock files; always succeeds.
#[cfg(not(feature = "mmap"))]
fn lock_exclusive(&self, _root: usize) -> Result<(), Error> {
    Ok(())
}
/// Releases the lock held on the lock file of root `root`, if it has one.
#[cfg(feature = "mmap")]
fn unlock(&self, root: usize) -> Result<(), Error> {
    match self.roots[root].lock_file {
        Some(ref f) => {
            f.unlock()?;
            Ok(())
        }
        None => Ok(()),
    }
}

/// Without the `mmap` feature there are no lock files; always succeeds.
#[cfg(not(feature = "mmap"))]
fn unlock(&self, _root: usize) -> Result<(), Error> {
    Ok(())
}
}
impl<E: Borrow<Env>> Drop for Txn<E> {
    /// Releases the shared in-process lock on the transaction's root, then
    /// decrements the root's transaction counter; the transaction that
    /// brings it back to zero also releases the file lock. A failure to
    /// unlock the file is ignored.
    fn drop(&mut self) {
        let env: &Env = self.env.borrow();
        let slot = &env.roots[self.root];
        unsafe { slot.rw.unlock_shared() }
        let was_last = slot.n_txn.fetch_sub(1, Ordering::SeqCst) == 1;
        if was_last {
            let _ = env.unlock(self.root);
        }
    }
}
impl<E: Borrow<Env>> sanakirja_core::LoadPage for Txn<E> {
type Error = Error;
unsafe fn load_page(&self, off: u64) -> Result<CowPage, Self::Error> {
if off > self.size {
return Err(Error::Corrupt(off));
}
unsafe {
let data = self.env.borrow().find_offset(off)?;
Ok(CowPage { data, offset: off })
}
}
fn rc(&self, _: u64) -> Result<u64, Self::Error> {
Ok(0)
}
}
/// Access to the raw bytes of a transaction's root page that follow the
/// global header.
pub trait RootPage {
/// Returns the 4064 bytes of the root page located after the header.
///
/// # Safety
///
/// NOTE(review): presumably unsafe because the bytes are exposed without
/// any synchronisation with writers of the page — confirm the intended
/// contract.
unsafe fn root_page(&self) -> &[u8; 4064];
}
impl<E: Borrow<Env>> RootPage for Txn<E> {
    /// Points into region 0 at this transaction's root page, just past the
    /// global header, and reinterprets the following bytes as a fixed-size
    /// array.
    unsafe fn root_page(&self) -> &[u8; 4064] {
        let env: &Env = self.env.borrow();
        let regions = env.mmaps.lock();
        let start = regions[0]
            .ptr
            .add(self.root * PAGE_SIZE + GLOBAL_HEADER_SIZE);
        &*start.cast::<[u8; 4064]>()
    }
}
/// Access to the databases referenced from a root page.
pub trait RootDb {
/// Returns the database stored in slot `n` of the root page, or `None`
/// when the slot holds 0.
fn root_db<K: Storable + ?Sized, V: Storable + ?Sized, P: crate::btree::BTreePage<K, V>>(
&self,
n: usize,
) -> Option<sanakirja_core::btree::Db_<K, V, P>>;
}
impl<E: Borrow<Env>> RootDb for Txn<E> {
    /// Reads slot `n` of this transaction's root page (8-byte little-endian
    /// slots following the global header) and wraps a non-zero value as a
    /// database handle; a zero slot yields `None`.
    ///
    /// # Panics
    ///
    /// Panics if `n` is not a valid slot index, i.e. if the slot would lie
    /// beyond the end of the root page. Without this check a safe call could
    /// read outside the page.
    fn root_db<K: Storable + ?Sized, V: Storable + ?Sized, P: crate::btree::BTreePage<K, V>>(
        &self,
        n: usize,
    ) -> Option<sanakirja_core::btree::Db_<K, V, P>> {
        assert!(n < (PAGE_SIZE - GLOBAL_HEADER_SIZE) / 8);
        unsafe {
            let env: &Env = self.env.borrow();
            let db = {
                let maps = env.mmaps.lock();
                *(maps[0]
                    .ptr
                    .add(self.root * PAGE_SIZE + GLOBAL_HEADER_SIZE + 8 * n)
                    as *mut u64)
            };
            if db != 0 {
                Some(sanakirja_core::btree::Db_::from_page(u64::from_le(db)))
            } else {
                None
            }
        }
    }
}
impl<E: Borrow<Env>> Txn<E> {
    /// Returns the raw value of slot `n` of this transaction's root page
    /// (8-byte little-endian slots following the global header).
    ///
    /// # Panics
    ///
    /// Panics if `n` is not a valid slot index. The bound is strict: slot
    /// number `(4096 - GLOBAL_HEADER_SIZE) / 8` would start exactly at the
    /// end of the page and read the first bytes of whatever follows it.
    pub fn root(&self, n: usize) -> u64 {
        assert!(n < (4096 - GLOBAL_HEADER_SIZE) / 8);
        unsafe {
            let env: &Env = self.env.borrow();
            let maps = env.mmaps.lock();
            u64::from_le(
                *(maps[0]
                    .ptr
                    .add(self.root * PAGE_SIZE + GLOBAL_HEADER_SIZE + 8 * n)
                    as *mut u64),
            )
        }
    }
}
/// Recomputes the CRC-32 of the page at `p`, skipping its first 8 bytes,
/// and compares it with the little-endian `crc` field stored in the page's
/// `GlobalHeader`.
///
/// # Safety
///
/// `p` must point to a full, readable page that starts with a
/// `GlobalHeader`.
#[cfg(feature = "crc32")]
unsafe fn check_crc(p: *const u8) -> Result<(), crate::CRCError> {
    let stored = u32::from_le((*(p as *const GlobalHeader)).crc);
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(std::slice::from_raw_parts(p.add(8), PAGE_SIZE - 8));
    let computed = hasher.finalize();
    debug!("CHECKING CRC {:?} {:?}", computed, stored);
    if computed != stored {
        return Err(crate::CRCError {});
    }
    Ok(())
}
// NOTE(review): the statements below sit at the top level of the file, outside
// any function, which is not valid Rust. They use `file` and `length`, the
// parameter names of `fallocate`, so this is presumably that function's body
// displaced by a bad merge or extraction — confirm, then move it into the
// function or delete it.
// As written, an allocation failure whose OS error code is `EINVAL` is turned
// into success; `raw_os_error().unwrap()` panics for errors without an OS code.
if let Err(err) = file.allocate(length) {
let code = err.raw_os_error().unwrap();
if code == libc::EINVAL {
Ok(())
} else {
Err(err.into())
}
} else {
Ok(())
}