use super::*;
use crate::L64;
use sanakirja_core::{btree, CowPage, MutPage};
use std::borrow::Borrow;
impl<E: Borrow<Env>, T> std::fmt::Debug for MutTxn<E, T> {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(fmt, "MutTxn {{ }}")
}
}
pub struct MutTxn<E: Borrow<Env>, T> {
pub(crate) env: E,
pub(crate) root: usize,
parent: Option<T>,
pub(crate) length: u64,
pub(crate) free: u64,
pub(crate) rc: Option<btree::Db<L64, ()>>,
pub(crate) occupied_owned_pages: Vec<(MutPage, bool)>,
pub(crate) free_owned_pages: Vec<u64>,
pub(crate) free_pages: Vec<u64>,
initial_free: Vec<u64>,
initial_allocated: Vec<u64>,
roots: Vec<u64>,
}
impl<E: Borrow<Env>, T> MutTxn<E, T> {
pub fn env_borrow(&self) -> &Env {
self.env.borrow()
}
}
impl<E: Borrow<Env>, T> Drop for MutTxn<E, T> {
fn drop(&mut self) {
if self.parent.is_none() {
let env = self.env.borrow();
unsafe {
env.mut_txn_unlock().unwrap_or(());
env.roots[self.root].rw.unlock_exclusive();
env.unlock(self.root).unwrap_or(())
}
}
}
}
pub trait Commit {
fn commit(self) -> Result<(), Error>;
}
impl<'a, E: Borrow<Env>, T> Commit for MutTxn<E, &'a mut MutTxn<E, T>> {
fn commit(mut self) -> Result<(), Error> {
let parent = self.parent.as_mut().unwrap();
parent.length = self.length;
parent.free = self.free;
parent.rc = self.rc.take();
parent
.occupied_owned_pages
.extend(self.occupied_owned_pages.drain(..));
parent.free_owned_pages.extend(self.free_owned_pages.iter());
parent.free_pages.extend(self.free_pages.iter());
parent.initial_free = std::mem::replace(&mut self.initial_free, Vec::new());
parent.initial_allocated = std::mem::replace(&mut self.initial_allocated, Vec::new());
for (u, v) in self.roots.iter().enumerate() {
if *v != 0 {
parent.roots[u] = *v
}
}
for (n, &r) in self.roots.iter().enumerate() {
if r > 0 {
if parent.roots.get(n).is_none() {
parent.roots.resize(n + 1, 0u64)
}
parent.roots[n] = r
}
}
Ok(())
}
}
impl Env {
#[cfg(feature = "mmap")]
fn mut_txn_lock(&self) -> Result<(), Error> {
self.mut_txn_lock.lock();
if let Some(ref f) = self.file {
f.lock_exclusive()?;
}
Ok(())
}
#[cfg(not(feature = "mmap"))]
fn mut_txn_lock(&self) -> Result<(), Error> {
self.mut_txn_lock.lock();
Ok(())
}
#[cfg(feature = "mmap")]
fn mut_txn_unlock(&self) -> Result<(), Error> {
unsafe {
self.mut_txn_lock.unlock();
}
if let Some(ref f) = self.file {
f.unlock()?
}
Ok(())
}
#[cfg(not(feature = "mmap"))]
fn mut_txn_unlock(&self) -> Result<(), Error> {
unsafe {
self.mut_txn_lock.unlock();
}
Ok(())
}
pub fn mut_txn_begin<E: Borrow<Self>>(env: E) -> Result<MutTxn<E, ()>, Error> {
unsafe {
let env_ = env.borrow();
env_.mut_txn_lock()?;
let maps = env_.mmaps.lock()[0].ptr;
let root = (&*(maps as *const GlobalHeader)).root as usize;
debug!("BEGIN_TXN root = {:?}", root);
env_.roots[root].rw.lock_exclusive();
env_.lock_exclusive(root)?;
let v0 = (root + env_.roots.len() - 1) % env_.roots.len();
env_.check_crc(v0)?;
let page_ptr = maps.offset((v0 * PAGE_SIZE) as isize);
let next_page_ptr = maps.offset((root * PAGE_SIZE) as isize);
if page_ptr != next_page_ptr {
std::ptr::copy_nonoverlapping(page_ptr.add(8), next_page_ptr.add(8), PAGE_SIZE - 8);
}
let header = GlobalHeader::from_le(&*(next_page_ptr as *const GlobalHeader));
debug!("n_roots = {:?}", header.n_roots);
debug!("initial free_page {:x}", header.free_db);
let mut txn = MutTxn {
env,
root,
parent: None,
rc: if header.rc_db == 0 {
None
} else {
Some(btree::Db::from_page(header.rc_db))
},
length: if header.length == 0 {
(PAGE_SIZE as u64) * (header.n_roots as u64)
} else {
header.length
},
free: header.free_db,
occupied_owned_pages: Vec::with_capacity(100),
free_owned_pages: Vec::new(),
free_pages: Vec::new(),
initial_free: Vec::new(),
initial_allocated: Vec::new(),
roots: Vec::new(),
};
if txn.free > 0 {
let free_db: btree::Db<L64, ()> = btree::Db::from_page(txn.free);
let mut init = Vec::new();
for p in btree::rev_iter(&txn, &free_db, None)? {
let (p, _) = p?;
init.push(p.as_u64());
}
txn.initial_free = init;
}
Ok(txn)
}
}
}
#[cfg(feature = "crc32")]
unsafe fn clear_dirty(p: &mut MutPage) {
p.clear_dirty(&HASHER)
}
#[cfg(not(feature = "crc32"))]
unsafe fn clear_dirty(p: &mut MutPage) {
p.clear_dirty()
}
impl<E: Borrow<Env>> Commit for MutTxn<E, ()> {
fn commit(mut self) -> Result<(), Error> {
debug!("COMMIT");
let free_db =
if self.free == 0 && self.free_owned_pages.is_empty() && self.free_pages.is_empty() {
assert!(self.initial_free.is_empty());
assert!(self.initial_allocated.is_empty());
None
} else {
let mut free_db: btree::Db<L64, ()> = if self.free == 0 {
unsafe { btree::create_db(&mut self)? }
} else {
unsafe { btree::Db::from_page(self.free) }
};
debug!("free_db = {:x}", free_db.db);
if cfg!(debug_assertions) {
for p in self.initial_free.iter() {
debug!("initial_free {:x}", p);
}
for p in self.initial_allocated.iter() {
debug!("initial_alloc {:x}", p);
}
}
let mut changed = true;
while changed {
changed = false;
while let Some(p) = self.initial_allocated.pop() {
btree::del(&mut self, &mut free_db, &L64(p.to_le()), None)?;
changed = true;
}
while !self.free_pages.is_empty() || !self.free_owned_pages.is_empty() {
while let Some(p) = self.free_pages.pop() {
let p = p & !0xfff;
btree::put(&mut self, &mut free_db, &L64(p.to_le()), &())?;
changed = true;
}
while let Some(p) = self.free_owned_pages.pop() {
let p = p & !0xfff;
btree::put(&mut self, &mut free_db, &L64(p.to_le()), &())?;
changed = true;
}
}
}
Some(free_db)
};
let mut occ = std::mem::replace(&mut self.occupied_owned_pages, Vec::new());
for (p, uses_dirty) in occ.iter_mut() {
if *uses_dirty {
if let Some(ref free_db) = free_db {
if let Some((pp, ())) =
btree::get(&self, free_db, &L64(p.0.offset.to_le()), None)?
{
if u64::from_le(pp.0) == p.0.offset {
continue;
}
}
}
unsafe {
trace!(
"commit page {:x}: {:?}",
p.0.offset,
std::slice::from_raw_parts(p.0.data, 32)
);
}
unsafe {
clear_dirty(p);
}
}
}
let env = self.env.borrow();
let mut maps = env.mmaps.lock();
for m in maps.iter_mut() {
m.flush()?
}
let globptr =
unsafe { &mut *(maps[0].ptr.add(self.root * PAGE_SIZE) as *mut GlobalHeader) };
globptr.length = self.length.to_le();
if let Some(free_db) = free_db {
debug!("COMMIT: free_db = 0x{:x}", free_db.db);
let free: u64 = free_db.db.into();
globptr.free_db = free.to_le();
}
if let Some(ref rc_db) = self.rc {
debug!("COMMIT: rc_db = 0x{:x}", rc_db.db);
let rc: u64 = rc_db.db.into();
globptr.rc_db = rc.to_le();
}
let root_dbs = unsafe {
std::slice::from_raw_parts_mut(
maps[0].ptr.add(self.root * PAGE_SIZE + GLOBAL_HEADER_SIZE) as *mut u64,
N_ROOTS,
)
};
for (&r, rr) in self.roots.iter().zip(root_dbs.iter_mut()) {
if r > 0 {
*rr = r.to_le()
}
}
unsafe {
set_crc(maps[0].ptr.add(self.root * PAGE_SIZE));
}
unsafe {
(&mut *(maps[0].ptr as *mut GlobalHeader)).root =
(self.root as u8 + 1) % (env.roots.len() as u8);
}
maps[0].flush_range(0, env.roots.len() * PAGE_SIZE)?;
debug!("commit: unlock {:?}", self.root);
unsafe { env.roots[self.root].rw.unlock_exclusive() };
env.unlock(self.root)?;
env.mut_txn_unlock()?;
debug!("/COMMIT");
Ok(())
}
}
impl<E: Borrow<Env>, T> MutTxn<E, T> {
pub fn set_root(&mut self, num: usize, value: u64) {
if self.roots.get(num).is_none() {
self.roots.resize(num + 1, 0u64);
}
self.roots[num] = value;
}
pub fn remove_root(&mut self, num: usize) {
if self.roots.get(num).is_none() {
self.roots.resize(num + 1, 0u64);
}
self.roots[num] = 0;
}
fn free_owned_page(&mut self, offset: u64) {
debug!("FREEING OWNED PAGE {:?} {:x}", offset, offset);
assert_ne!(offset, 0);
self.free_owned_pages.push(offset);
}
fn free_page(&mut self, offset: u64) {
debug!("FREEING PAGE {:?} {:x}", offset, offset);
assert_ne!(offset, 0);
self.free_pages.push(offset)
}
fn free_pages_pop(&mut self) -> Result<Option<u64>, Error> {
while let Some(p) = self.initial_free.pop() {
if self.free_for_all(p)? {
self.initial_allocated.push(p);
return Ok(Some(p));
}
}
Ok(None)
}
fn free_for_all(&self, f: u64) -> Result<bool, Error> {
let env = self.env.borrow();
for i in 1..env.roots.len() - 1 {
debug!("free_for_all {:?}", i);
let db: btree::Db<L64, ()> = unsafe {
let p = &*(env.mmaps.lock()[0]
.ptr
.add(((self.root + i) % env.roots.len()) * PAGE_SIZE)
as *const GlobalHeader);
if f >= u64::from_le(p.length) {
continue;
}
if p.free_db == 0 {
return Ok(false);
}
btree::Db::from_page(u64::from_le(p.free_db))
};
if let Some((&f_, ())) = btree::get(self, &db, &L64(f.to_le()), None)? {
if f_.as_u64() != f {
return Ok(false);
}
}
}
Ok(true)
}
}
impl<E: Borrow<Env>, T> MutTxn<E, T> {
unsafe fn alloc_page_(&mut self, dirty: bool) -> Result<MutPage, Error> {
if let Some(offset) = self.free_owned_pages.pop() {
assert_ne!(offset, 0);
debug!("free owned pop 0x{:x}", offset);
let data = unsafe { self.env.borrow().find_offset(offset)? };
let page = MutPage(CowPage { data, offset });
self.occupied_owned_pages
.push((MutPage(CowPage { data, offset }), dirty));
Ok(page)
} else {
if let Some(offset) = self.free_pages_pop()? {
assert_ne!(offset, 0);
debug!("free pages pop 0x{:x}", offset);
let data = unsafe { self.env.borrow().find_offset(offset)? };
self.occupied_owned_pages
.push((MutPage(CowPage { data, offset }), dirty));
Ok(MutPage(CowPage { data, offset }))
} else {
debug!("allocate in the free space 0x{:x}", self.length);
let offset = self.length;
self.length += PAGE_SIZE as u64;
let data = unsafe { self.env.borrow().find_offset(offset)? };
self.occupied_owned_pages
.push((MutPage(CowPage { data, offset }), dirty));
Ok(MutPage(CowPage { data, offset }))
}
}
}
}
impl<E: Borrow<Env>, T> sanakirja_core::AllocPage for MutTxn<E, T> {
unsafe fn alloc_page(&mut self) -> Result<MutPage, Error> {
self.alloc_page_(true)
}
unsafe fn alloc_page_no_dirty(&mut self) -> Result<MutPage, Error> {
self.alloc_page_(false)
}
unsafe fn alloc_contiguous(&mut self, length: u64) -> Result<MutPage, Error> {
assert_eq!(length & (PAGE_SIZE as u64 - 1), 0);
self.free_owned_pages.sort_by(|a, b| b.cmp(a));
self.initial_free.sort_by(|a, b| b.cmp(a));
let mut i = self.free_owned_pages.len();
let mut ni = 0;
let mut j = self.initial_free.len();
let mut nj = 0;
let mut result = 0u64;
let mut current = 0u64;
let mut current_p = std::ptr::null_mut();
while current + PAGE_SIZE as u64 - result < length {
let (m, ic, jc) = if i > 0 && j > 0 {
let a = self.free_owned_pages[i - 1];
let b = self.initial_free[j - 1];
if a < b || !self.free_for_all(b)? {
i -= 1;
(a, 1, 0)
} else {
j -= 1;
(b, 0, 1)
}
} else if i > 0 {
i -= 1;
(self.free_owned_pages[i], 1, 0)
} else if j > 0 {
j -= 1;
let p = self.initial_free[j];
if !self.free_for_all(p)? {
ni = 0;
nj = 0;
current = result;
current_p = unsafe { self.env.borrow().find_offset(current)? };
continue;
}
(p, 0, 1)
} else if current == result {
let offset = self.length;
let data = unsafe { self.env.borrow().find_offset(offset)? };
self.length += length;
return Ok(MutPage(CowPage { offset, data }));
} else if current + PAGE_SIZE as u64 == self.length {
self.length += length - (current + PAGE_SIZE as u64 - result);
break;
} else {
unreachable!()
};
if current > 0 && m == current + PAGE_SIZE as u64 {
let next_p = unsafe { self.env.borrow().find_offset(m)? };
if next_p as usize == current_p as usize + PAGE_SIZE {
ni += ic;
nj += jc;
} else {
result = m;
ni = ic;
nj = jc;
}
current = m;
current_p = next_p
} else {
result = m;
current = m;
current_p = unsafe { self.env.borrow().find_offset(m)? };
ni = ic;
nj = jc;
}
}
for offset in self
.free_owned_pages
.drain(i..i + ni)
.chain(self.initial_free.drain(j..j + nj))
{
let data = unsafe { self.env.borrow().find_offset(offset)? };
self.occupied_owned_pages
.push((MutPage(CowPage { data, offset }), false))
}
let data = unsafe { self.env.borrow().find_offset(result)? };
Ok(MutPage(CowPage {
data,
offset: result,
}))
}
fn incr_rc(&mut self, off: u64) -> Result<usize, Error> {
assert!(off > 0);
if let Some(mut rc_) = self.rc.take() {
let mut curs = btree::cursor::Cursor::new(self, &rc_)?;
curs.set(self, &L64(off.to_le()), None)?;
let rc = if let Some((rc, _)) = curs.current(self)? {
let rc = rc.as_u64();
if rc & !0xfff == off {
rc & 0xfff
} else {
1
}
} else {
1
};
if rc > 1 {
btree::del::del_at_cursor(self, &mut rc_, &mut curs, true)?;
}
debug!("incr rc 0x{:x} {:?}", off, rc + 1);
assert!(rc + 1 <= 0xfff);
btree::put(self, &mut rc_, &L64((off | (rc + 1)).to_le()), &())?;
self.rc = Some(rc_);
Ok(rc as usize + 1)
} else {
let mut rc = unsafe { btree::create_db(self)? };
btree::put(self, &mut rc, &L64((off | 2).to_le()), &())?;
self.rc = Some(rc);
Ok(2)
}
}
unsafe fn decr_rc(&mut self, off: u64) -> Result<usize, Error> {
let rc = self.decr_rc_(off)?;
if rc == 0 {
self.free_page(off);
}
Ok(rc)
}
unsafe fn decr_rc_owned(&mut self, off: u64) -> Result<usize, Error> {
let rc = self.decr_rc_(off)?;
if rc == 0 {
self.free_owned_page(off);
}
Ok(rc)
}
}
impl<E: Borrow<Env>, A> MutTxn<E, A> {
fn decr_rc_(&mut self, off: u64) -> Result<usize, Error> {
debug!("decr_rc 0x{:x} {:?}", off, self.rc);
if let Some(mut rc_) = self.rc.take() {
let mut curs = btree::cursor::Cursor::new(self, &rc_)?;
curs.set(self, &L64(off.to_le()), None)?;
let rc = if let Some((rc, ())) = curs.next(self)? {
let rc = rc.as_u64();
if rc & !0xfff == off {
rc
} else {
1
}
} else {
1
};
debug!("decr_rc, rc = 0x{:x}", rc);
if rc > 1 {
btree::del(self, &mut rc_, &L64(rc.to_le()), None)?;
if rc & 0xfff > 2 {
btree::put(self, &mut rc_, &L64((rc - 1).to_le()), &())?;
self.rc = Some(rc_);
} else {
self.rc = Some(rc_)
}
return Ok((rc & 0xfff) as usize - 1);
} else {
self.rc = Some(rc_)
}
}
Ok(0)
}
pub unsafe fn root_page_mut(&mut self) -> &mut [u8; 4064] {
let env = self.env.borrow();
let maps = env.mmaps.lock();
let ptr = maps[0].ptr.add(self.root * PAGE_SIZE + GLOBAL_HEADER_SIZE);
&mut *(ptr as *mut [u8; 4064])
}
pub unsafe fn root_page(&self) -> &[u8; 4064] {
let env = self.env.borrow();
let maps = env.mmaps.lock();
let ptr = maps[0].ptr.add(self.root * PAGE_SIZE + GLOBAL_HEADER_SIZE);
&*(ptr as *const [u8; 4064])
}
}
impl<E: Borrow<Env>, A> sanakirja_core::LoadPage for MutTxn<E, A> {
type Error = Error;
unsafe fn load_page(&self, off: u64) -> Result<CowPage, Self::Error> {
let data = self.env.borrow().find_offset(off)?;
Ok(CowPage { data, offset: off })
}
fn rc(&self, page: u64) -> Result<u64, Self::Error> {
if let Some(ref rc) = self.rc {
if let Some((rc, _)) = btree::get(self, rc, &L64(page.to_le()), None)? {
let rc = rc.as_u64();
if rc & !0xfff == page {
let r = rc & 0xfff;
if r >= 2 {
return Ok(r);
}
}
}
}
Ok(0)
}
}
impl<E: Borrow<Env>, T> RootPage for MutTxn<E, T> {
unsafe fn root_page(&self) -> &[u8; 4064] {
let env = self.env.borrow();
let maps = env.mmaps.lock();
let ptr = maps[0].ptr.add(self.root * PAGE_SIZE + GLOBAL_HEADER_SIZE);
&*(ptr as *const [u8; 4064])
}
}
impl<E: Borrow<Env>, T> MutTxn<E, T> {
pub fn root(&self, n: usize) -> Option<u64> {
if let Some(db) = self.roots.get(n) {
if *db == 0 {
None
} else {
Some(*db)
}
} else {
unsafe {
let env = self.env.borrow();
let db = {
let maps = env.mmaps.lock();
u64::from_le(
*(maps[0]
.ptr
.add(self.root * PAGE_SIZE + GLOBAL_HEADER_SIZE + 8 * n)
as *mut u64),
)
};
if db != 0 {
Some(db)
} else {
None
}
}
}
}
}
impl<E: Borrow<Env>, T> RootDb for MutTxn<E, T> {
fn root_db<K: Storable + ?Sized, V: Storable + ?Sized, P: btree::BTreePage<K, V>>(
&self,
n: usize,
) -> Option<btree::Db_<K, V, P>> {
if let Some(db) = self.root(n) {
Some(unsafe { btree::Db_::from_page(db) })
} else {
None
}
}
}