#![no_std]
pub mod btree;
/// Trait for values stored inside database pages.
pub trait Representable<T> {
/// An iterator over the offsets to pages contained in this
/// value. Only values from this crate can generate non-empty
/// iterators, but combined values (like tuples) must chain the
/// iterators returned by method `page_offsets`.
type PageOffsets: Iterator<Item = u64>;
    /// The type this value is compared as, given a transaction.
    type Ord: Ord;
    /// Borrow the comparable representation of this value.
    fn ord(&self, txn: &T) -> &Self::Ord;
    /// An iterator over the offsets of the pages referenced by this
    /// value (empty if the value references no other page).
    fn page_offsets(&self) -> Self::PageOffsets;
    /// Alignment of this type inside a page.
    const ALIGN: usize;
    /// Size of values of this type, if known statically, else `None`.
    const SIZE: Option<usize>;
    /// Size of this particular value inside a page.
    fn size(&self) -> usize;
}
macro_rules! direct_repr {
($t: ty) => {
impl<T> Representable<T> for $t {
type PageOffsets = core::iter::Empty<u64>;
fn page_offsets(&self) -> Self::PageOffsets {
core::iter::empty()
}
const ALIGN: usize = core::mem::align_of::<Self>();
const SIZE: Option<usize> = Some(core::mem::size_of::<Self>());
type Ord = Self;
fn ord(&self, _: &T) -> &Self::Ord {
self
}
fn size(&self) -> usize {
core::mem::size_of::<Self>()
}
}
};
}
direct_repr!(());
direct_repr!(u8);
direct_repr!(i8);
direct_repr!(u16);
direct_repr!(i16);
direct_repr!(u64);
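// A minimal sketch (not part of the crate) of `Representable` for a
// combined, fixed-size value, illustrating the requirement above that
// combined values chain the `page_offsets` iterators of their
// components. Both components here are plain integers, so both
// iterators are empty, but they are still chained.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
#[repr(C)]
pub struct U64Pair(pub u64, pub u64);

impl<T> Representable<T> for U64Pair {
    type PageOffsets = core::iter::Chain<core::iter::Empty<u64>, core::iter::Empty<u64>>;
    fn page_offsets(&self) -> Self::PageOffsets {
        core::iter::empty().chain(core::iter::empty())
    }
    const ALIGN: usize = core::mem::align_of::<Self>();
    const SIZE: Option<usize> = Some(core::mem::size_of::<Self>());
    type Ord = Self;
    fn ord(&self, _: &T) -> &Self::Ord {
        self
    }
    fn size(&self) -> usize {
        core::mem::size_of::<Self>()
    }
}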
unsafe fn read<T, K: Representable<T>, V: Representable<T>>(p: *mut u8) -> (*mut K, *mut V) {
    let k = p as *mut K;
    let s = K::size(&*k);
    // Advance by the key's size in *bytes* (not in units of K), then
    // align the pointer up for the value.
    let v = p.add(s);
    let al = v.align_offset(V::ALIGN);
    let v = v.add(al);
    (k, v as *mut V)
}
fn alloc_size<T, K: Representable<T>, V: Representable<T>>(k: &K, v: &V) -> usize {
let s = ((k.size() + V::ALIGN - 1) & !(V::ALIGN - 1)) + v.size();
let al = K::ALIGN.max(V::ALIGN);
(s + al - 1) & !(al - 1)
}
unsafe fn entry_size<T, K: Representable<T>, V: Representable<T>>(k: *mut u8) -> usize {
let ks = (&*(k as *const K)).size();
// next multiple of va, assuming va is a power of 2.
let va = V::ALIGN;
let v_off = (ks + va - 1) & !(va - 1);
let v_ptr = k.add(v_off);
let vs = (&*(v_ptr as *const V)).size();
let ka = K::ALIGN.max(V::ALIGN);
let size = v_off + vs;
(size + ka - 1) & !(ka - 1)
}
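// Worked example of the align-up idiom used in `alloc_size` and
// `entry_size`: for a power of two `a`, `(x + a - 1) & !(a - 1)`
// rounds `x` up to the next multiple of `a`. Illustrative test only
// (the crate is `no_std`; running it assumes a hosted test harness).
#[test]
fn align_up_examples() {
    fn align_up(x: usize, a: usize) -> usize {
        (x + a - 1) & !(a - 1)
    }
    assert_eq!(align_up(10, 8), 16);
    assert_eq!(align_up(16, 8), 16);
    assert_eq!(align_up(1, 4), 4);
}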
#[derive(Debug, Clone, Copy)]
pub struct Page {
pub data: *mut u8,
pub offset: u64,
}
#[derive(Debug, Clone, Copy)]
pub struct MutPage(pub Page);
impl MutPage {
pub fn clear_dirty(&mut self) {
unsafe {
            // The dirty flag is bit 0 of the header's first byte (the `n` field).
            let d = *self.0.data;
            *self.0.data = d & !1
}
}
}
/// Trait for loading a page and a root. Base trait to implement
/// "storage engines" under Sanakirja.
pub trait LoadPage {
type Error;
    /// Load the page at offset `off`.
fn load_page(&self, off: u64) -> Result<Page, Self::Error>;
    /// Reference count of the page at offset `off`.
fn rc(&self, off: u64) -> Result<u64, Self::Error>;
}
/// Trait for allocating new pages and managing their reference
/// counts. Together with `LoadPage`, this is the base trait to
/// implement "storage engines" under Sanakirja.
pub trait AllocPage: LoadPage {
fn alloc_page(&mut self) -> Result<MutPage, Self::Error>;
fn incr_rc(&mut self, off: u64) -> Result<(), Self::Error>;
fn decr_rc(&mut self, off: u64) -> Result<(), Self::Error>;
fn decr_rc_owned(&mut self, off: u64) -> Result<(), Self::Error>;
}
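// A toy in-memory "storage engine" sketch for the two traits above,
// for illustration only: it assumes a hosted (`std`) test build, never
// reuses freed pages, and keeps no real reference counts. Real engines
// back pages with a memory-mapped file and persist the RC table (see
// the `sanakirja` crate's `MutTxn`).
#[cfg(test)]
extern crate std;

#[cfg(test)]
pub struct MemTxn {
    pages: std::vec::Vec<*mut u8>,
}

#[cfg(test)]
impl LoadPage for MemTxn {
    type Error = ();
    fn load_page(&self, off: u64) -> Result<Page, Self::Error> {
        // Page offsets are nonzero multiples of 4096: page `i` lives
        // at offset `(i + 1) * 4096`.
        let i = (off as usize / 4096) - 1;
        Ok(Page {
            data: self.pages[i],
            offset: off,
        })
    }
    fn rc(&self, _off: u64) -> Result<u64, Self::Error> {
        Ok(1) // pretend every page has exactly one reference
    }
}

#[cfg(test)]
impl AllocPage for MemTxn {
    fn alloc_page(&mut self) -> Result<MutPage, Self::Error> {
        // Allocate a fresh, zeroed, 8-byte-aligned 4096-byte buffer
        // (leaked on purpose: this sketch never frees).
        let data = std::boxed::Box::into_raw(std::boxed::Box::new([0u64; 512])) as *mut u8;
        self.pages.push(data);
        let offset = self.pages.len() as u64 * 4096;
        Ok(MutPage(Page { data, offset }))
    }
    fn incr_rc(&mut self, _off: u64) -> Result<(), Self::Error> {
        Ok(())
    }
    fn decr_rc(&mut self, _off: u64) -> Result<(), Self::Error> {
        Ok(())
    }
    fn decr_rc_owned(&mut self, _off: u64) -> Result<(), Self::Error> {
        Ok(())
    }
}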
// Copyright 2015 Pierre-Étienne Meunier and Florent Becker. See the
// COPYRIGHT file at the top-level directory of this distribution and
// at http://pijul.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use super::cursor::*;
use super::*;
use log::*;
/// Insert a binding into a database, returning `false` if and only
/// if the exact same binding (key *and* value) was already present.
pub fn put<T: AllocPage, K: Representable<T>, V: Representable<T>, P: BTreeMutPage<T, K, V>>(
txn: &mut T,
db: &mut Db<T, K, V, P>,
key: &K,
value: &V,
) -> Result<bool, T::Error> {
let mut cursor = Cursor::new(db);
if cursor.set(txn, Some((key, Some(value))))?.is_some() {
return Ok(false);
}
let ref cur = cursor.current();
let put = P::put(
txn,
cur.page,
cursor.pointer <= cursor.first_rc_level,
cur.cursor.as_ref().unwrap(),
key,
value,
0,
0,
)?;
let mut free = [0; N_CURSORS];
db.db = put_cascade(txn, &mut cursor, put, &mut free)?.0;
for f in free.iter() {
if *f & 1 != 0 {
txn.decr_rc_owned((*f) ^ 1)?;
} else if *f > 0 {
txn.decr_rc(*f)?;
}
}
Ok(true)
}
fn put_cascade<T: AllocPage, K: Representable<T>, V: Representable<T>, P: BTreeMutPage<T, K, V>>(
txn: &mut T,
cursor: &mut Cursor<T, K, V, P>,
mut put: Put<K, V>,
free: &mut [u64; N_CURSORS],
) -> Result<MutPage, T::Error> {
loop {
match put {
Put::Split {
split_key,
split_value,
left,
right,
freed,
} => {
cursor.pointer -= 1;
incr_descendants::<T, K, V, P>(txn, cursor, free, freed)?;
if cursor.pointer == 0 {
debug!("Split root: {:?} {:?}", left.0.offset, right.0.offset);
// Splitting the root.
let p = txn.alloc_page()?;
P::init(p);
P::put(
txn,
p.0,
true,
&P::first_cursor(&p.0),
split_key,
split_value,
left.0.offset,
right.0.offset,
)?;
return Ok(p);
} else {
debug!("put cascade");
let cur = cursor.current();
put = P::put(
txn,
cur.page,
cursor.pointer < cursor.first_rc_level,
cur.cursor.as_ref().unwrap(),
split_key,
split_value,
left.0.offset,
right.0.offset,
)?
}
}
Put::Ok(Ok { page, freed }) if cursor.pointer > 1 => {
debug!("put update");
cursor.pointer -= 1;
incr_descendants::<T, K, V, P>(txn, cursor, free, freed)?;
// update
let ref curs = cursor.current();
put = Put::Ok(P::update_left_child(
txn,
curs.page,
cursor.pointer < cursor.first_rc_level,
curs.cursor.as_ref().unwrap(),
page.0.offset,
)?)
}
Put::Ok(Ok { page, freed }) => {
incr_descendants::<T, K, V, P>(txn, cursor, free, freed)?;
return Ok(page);
}
}
}
}
fn incr_descendants<
T: AllocPage + LoadPage,
K: Representable<T>,
V: Representable<T>,
P: BTreePage<T, K, V>,
>(
txn: &mut T,
cursor: &mut Cursor<T, K, V, P>,
free: &mut [u64; N_CURSORS],
freed: u64,
) -> Result<(), T::Error> {
if cursor.pointer < cursor.first_rc_level {
// This also includes the case where cursor.pointer == 0.
free[cursor.pointer] = freed;
} else {
if cursor.pointer == cursor.first_rc_level {
debug_assert_ne!(freed, 0);
if freed & 1 != 0 {
txn.decr_rc_owned(freed ^ 1)?;
} else {
                txn.decr_rc(freed)?;
}
}
let cur = cursor.current();
let mut c = P::first_cursor(&cur.page);
let left = P::left_child(&cur.page, &c);
if left != (freed & !1) {
txn.incr_rc(left)?;
}
while let Some((k, v, r)) = P::next(&cur.page, &mut c) {
for o in k.page_offsets().chain(v.page_offsets()) {
txn.incr_rc(o)?;
}
if r != freed {
txn.incr_rc(r)?;
}
}
}
Ok(())
}
use super::*;
use log::*;
const PAGE_SIZE: usize = 4096;
#[derive(Debug)]
pub struct Page<K, V> {
kv: core::marker::PhantomData<(K, V)>,
}
#[derive(PartialEq, Eq, PartialOrd, Ord)]
#[repr(C)]
struct Tuple<K, V> {
k: K,
v: V,
}
#[repr(C)]
struct Header {
n: u16,
data: u16,
crc: u32,
left_page: u64,
}
impl Header {
fn init(&mut self) {
self.n = 1; // dirty page
self.data = 4096_u16.to_le();
self.crc = 0;
self.left_page = 0;
}
fn n(&self) -> u16 {
u16::from_le(self.n) >> 4
}
fn set_n(&mut self, n: u16) {
let dirty = u16::from_le(self.n) & 1;
self.n = ((n << 4) | dirty).to_le()
}
fn is_dirty(&self) -> bool {
u16::from_le(self.n) & 1 != 0
}
fn left_page(&self) -> u64 {
u64::from_le(self.left_page)
}
fn decr(&mut self, s: usize) {
self.left_page = (self.left_page() - s as u64).to_le();
}
fn is_leaf(&self) -> bool {
u64::from_le(self.left_page) <= 0xfff
}
}
const HDR: usize = core::mem::size_of::<Header>();
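// Illustration of the header packing above (a sketch matching the
// accessors, adding nothing new): `n` stores the entry count shifted
// left by 4 with the dirty flag in bit 0, and `left_page` packs the
// left child's page offset in its high 52 bits with the number of
// occupied bytes in the low 12 bits, so a zero high part means the
// page is a leaf.
#[test]
fn header_packing() {
    let mut h = Header {
        n: 0,
        data: 0,
        crc: 0,
        left_page: 0,
    };
    h.init();
    assert!(h.is_dirty());
    assert!(h.is_leaf());
    h.set_n(3);
    assert_eq!(h.n(), 3);
    // `set_n` preserves the dirty flag.
    assert!(h.is_dirty());
}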
impl<T: AllocPage, K: Representable<T>, V: Representable<T>> super::BTreeMutPage<T, K, V>
for Page<K, V>
{
fn init(page: MutPage) {
unsafe { (&mut *(page.0.data as *mut Header)).init() }
}
fn clean(page: MutPage) {
unsafe {
let hdr = &mut *header_mut(page);
hdr.n = (u16::from_le(hdr.n) & 0xfff).to_le()
}
}
fn size(m: &ModifiedPage<T, K, V, Self>) -> usize {
let mut occupied = unsafe {
let hdr = &*header(&m.page);
(hdr.left_page() & 0xfff) as usize
};
if fixed_size::<T, K, V>().is_some() && m.c1.is_leaf {
let al = K::ALIGN.max(V::ALIGN);
            occupied += (HDR + al - 1) & !(al - 1);
} else {
occupied += HDR
};
occupied -= Self::current_size(&m.page, &m.c1) as usize;
if let Some((k, v, _)) = m.ins {
occupied += unsafe { crate::alloc_size(&*k, &*v) as usize };
if m.c1.is_leaf {
if fixed_size::<T, K, V>().is_none() {
occupied += 2
}
} else {
occupied += 8
}
}
occupied
}
fn put<'a>(
txn: &mut T,
page: crate::Page,
mutable: bool,
c: &Cursor,
k0: &'a K,
v0: &'a V,
l: u64,
r: u64,
) -> Result<Put<'a, K, V>, T::Error> {
unsafe {
if r == 0 {
debug!("leaf put");
put::<_, _, _, Leaf>(txn, page, mutable, c.cur, k0, v0, 0, 0)
} else {
debug!("internal put");
put::<_, _, _, Internal>(txn, page, mutable, c.cur, k0, v0, l, r)
}
}
}
fn update_left_child(
txn: &mut T,
page: crate::Page,
mutable: bool,
c: &Self::Cursor,
r: u64,
) -> Result<Ok, T::Error> {
assert!(!c.is_leaf);
let freed;
let page = if mutable && Self::is_dirty(page) {
freed = 0;
MutPage(page)
} else {
unsafe {
let new = txn.alloc_page()?;
<Page<K, V> as BTreeMutPage<T, K, V>>::init(new);
let s = Internal::offset_slice::<T, K, V>(page);
let hdr = &mut *header_mut(new);
let mut n = 0;
clone::<T, K, V, Internal>(hdr, page, new, s, &mut n);
let b = if Self::is_dirty(page) { 1 } else { 0 };
freed = page.offset | b;
new
}
};
assert!(c.cur - 1 < c.total);
unsafe {
let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur as isize - 1);
*off = (r | (u64::from_le(*off) & 0xfff)).to_le();
}
Ok(Ok { page, freed })
}
fn del(txn: &mut T, page: crate::Page, c: &Cursor, l: u64) -> Result<MutPage, T::Error> {
assert!(c.cur < c.total);
if Self::is_dirty(page) {
let page = MutPage(page);
unsafe {
let hdr = header_mut(page);
if c.is_leaf {
let n = (&*hdr).n() as usize;
if let Some(f) = fixed_size::<T, K, V>() {
let al = K::ALIGN.max(V::ALIGN);
let hdr_size = (HDR + al - 1) & !(al - 1);
let off = c.cur * f;
let kv_ptr = page.0.data.add(hdr_size + off);
core::ptr::copy(kv_ptr.add(f), kv_ptr, f * (n - c.cur - 1));
(&mut *hdr).decr(f);
} else {
let ptr = page.0.data.add(HDR + c.cur * 2) as *mut u16;
let kv_ptr = page.0.data.add((*ptr) as usize);
let size = entry_size::<T, K, V>(kv_ptr);
                        core::ptr::copy(ptr.offset(1), ptr, n - c.cur - 1);
(&mut *hdr).decr(size);
}
} else {
let ptr = page.0.data.add(HDR + c.cur * 8) as *mut u64;
let off = (u64::from_le(*ptr) & 0xfff) as usize;
let kv_ptr = page.0.data.add(off);
let size = entry_size::<T, K, V>(kv_ptr);
                    core::ptr::copy(ptr.offset(1), ptr, (&*hdr).n() as usize - c.cur - 1);
(&mut *hdr).decr(size);
};
if l > 0 {
assert!(!c.is_leaf);
// Updating the left page if necessary.
let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur as isize - 1);
*off = (l | (u64::from_le(*off) & 0xfff)).to_le();
}
(&mut *hdr).set_n(u16::from_le((&*hdr).n) - 1);
Ok(page)
}
} else {
unsafe {
let new = txn.alloc_page()?;
<Page<K, V> as BTreeMutPage<T, K, V>>::init(new);
if c.is_leaf {
let s = Leaf::offset_slice::<T, K, V>(page);
let (s0, s1) = s.split_at(c.cur);
let hdr = &mut *header_mut(new);
let mut n = 0;
clone::<T, K, V, Leaf>(hdr, page, new, s0, &mut n);
clone::<T, K, V, Leaf>(hdr, page, new, s1, &mut n);
} else {
let s = Internal::offset_slice::<T, K, V>(page);
let (s0, s1) = s.split_at(c.cur);
let hdr = &mut *header_mut(new);
let mut n = 0;
clone::<T, K, V, Internal>(hdr, page, new, s0, &mut n);
                    // Set the right child of the entry before the deletion
                    // point in the *new* page, preserving the entry's
                    // in-page offset in the low 12 bits.
                    if l > 0 {
                        let off = (new.0.data.add(HDR) as *mut u64).offset(n - 1);
                        *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
                    }
clone::<T, K, V, Internal>(hdr, page, new, s1, &mut n);
}
Ok(new)
}
}
}
fn merge_or_rebalance<'a>(
txn: &mut T,
m: &mut Concat<T, K, V, Self>,
) -> Result<Op<'a, T, K, V>, T::Error> {
let mut hdr_size = HDR;
let mid_size = if m.modified.c0.is_leaf {
if let Some(f) = fixed_size::<T, K, V>() {
let al = K::ALIGN.max(V::ALIGN);
hdr_size = (HDR + al - 1) & !(al - 1);
f
} else {
2 + unsafe { alloc_size::<T, K, V>(&*m.mid.0, &*m.mid.1) }
}
} else {
8 + unsafe { alloc_size::<T, K, V>(&*m.mid.0, &*m.mid.1) }
};
let left_size = Self::size(&m.modified);
let occupied = unsafe {
let hdr = &*header(&m.other);
(hdr.left_page() & 0xfff) as usize
};
let size = left_size + mid_size + occupied - hdr_size;
if size <= PAGE_SIZE {
// merge
let new = txn.alloc_page()?;
<Page<K, V> as BTreeMutPage<T, K, V>>::init(new);
unsafe {
if m.modified.c0.is_leaf {
merge::<_, _, _, Leaf>(new, m)
} else {
merge::<_, _, _, Internal>(new, m)
}
}
let b0 = if Self::is_dirty(m.modified.page) {
1
} else {
0
};
let b1 = if Self::is_dirty(m.other) { 1 } else { 0 };
return Ok(Op::Merged {
page: new,
freed: [m.modified.page.offset | b0, m.other.offset | b1],
marker: core::marker::PhantomData,
});
}
let rc = <Page<K, V>>::first_cursor(&m.other);
let first_size = <Page<K, V>>::current_size(&m.other, &rc);
// If we can't rebalance, return.
if left_size >= PAGE_SIZE / 2 || occupied - first_size < PAGE_SIZE / 2 {
unsafe {
return Ok(Op::Put(if let Some((k, v, r)) = m.modified.ins {
Self::replace(
txn,
m.modified.page,
m.modified.mutable,
&m.modified.c1,
&*k,
&*v,
m.modified.l,
r,
)?
} else {
Put::Ok(Ok {
page: Self::del(txn, m.modified.page, &m.modified.c1, m.modified.l)?,
freed: 0,
})
}));
}
}
unsafe {
if m.modified.c0.is_leaf {
rebalance::<_, _, _, Leaf>(txn, m)
} else {
rebalance::<_, _, _, Internal>(txn, m)
}
}
}
}
impl<T: LoadPage, K: Representable<T>, V: Representable<T>> super::BTreePage<T, K, V>
for Page<K, V>
{
    fn is_dirty(page: crate::Page) -> bool {
        // The dirty flag is bit 0 of `n`, as in `Header::is_dirty`.
        unsafe { (&*header(&page)).is_dirty() }
    }
fn is_empty(_: &crate::Page, c: &Self::Cursor) -> bool {
c.cur >= c.total
}
type Cursor = Cursor;
fn first_cursor(p: &crate::Page) -> Self::Cursor {
unsafe {
let hdr = &*header(p);
Cursor {
cur: 0,
total: hdr.n() as usize,
is_leaf: hdr.is_leaf(),
}
}
}
fn last_cursor(p: &crate::Page) -> Self::Cursor {
unsafe {
let hdr = &*header(p);
let total = hdr.n() as usize;
Cursor {
cur: total - 1,
total,
is_leaf: hdr.is_leaf(),
}
}
}
unsafe fn unchecked_current(page: &crate::Page, c: &Self::Cursor) -> (*mut K, *mut V, u64) {
if c.is_leaf {
let off = if let Some(f) = fixed_size::<T, K, V>() {
let al = K::ALIGN.max(V::ALIGN);
let hdr = (HDR + al - 1) & !(al - 1);
hdr + c.cur * f
} else {
u16::from_le(*(page.data.add(HDR + c.cur * 2) as *const u16)) as usize
};
let (k, v) = read::<T, K, V>(page.data.add(off as usize));
(k, v, 0)
} else {
let off = u64::from_le(*(page.data.add(HDR) as *const u64).add(c.cur));
let (k, v) = read::<T, K, V>(page.data.add((off & 0xfff) as usize));
(k, v, off & !0xfff)
}
}
unsafe fn unchecked_current_ptr(page: &crate::Page, c: &Self::Cursor) -> *mut u8 {
page.data.add(if c.is_leaf {
if let Some(f) = fixed_size::<T, K, V>() {
let al = K::ALIGN.max(V::ALIGN);
let hdr = (HDR + al - 1) & !(al - 1);
hdr + c.cur * f
} else {
u16::from_le(*(page.data.add(HDR + c.cur * 2) as *const u16)) as usize
}
} else {
(u64::from_le(*(page.data.add(HDR + c.cur * 8) as *const u64)) & 0xfff) as usize
})
}
fn current_size(page: &crate::Page, c: &Self::Cursor) -> usize {
unsafe {
if c.is_leaf {
if let Some(f) = fixed_size::<T, K, V>() {
f
} else {
2 + entry_size::<T, K, V>(
page.data
.add(u16::from_le(*(page.data.add(HDR) as *const u16).add(c.cur))
as usize),
)
}
} else {
8 + entry_size::<T, K, V>(page.data.add(
(u64::from_le(*(page.data.add(HDR) as *const u64).add(c.cur)) & 0xfff) as usize,
))
}
}
}
fn move_next(_page: &crate::Page, c: &mut Self::Cursor) -> bool {
if c.cur < c.total {
c.cur += 1;
true
} else {
false
}
}
fn left_child(page: &crate::Page, c: &Self::Cursor) -> u64 {
if c.is_leaf {
0
} else {
let off = unsafe { *(page.data.add((HDR + c.cur * 8) - 8) as *const u64) };
u64::from_le(off) & !0xfff
}
}
fn right_child(page: &crate::Page, c: &Self::Cursor) -> u64 {
if c.is_leaf {
0
} else {
let off = unsafe { *(page.data.add(HDR + c.cur * 8) as *const u64) };
u64::from_le(off) & !0xfff
}
}
fn set_cursor<'a>(
txn: &T,
page: &crate::Page,
c: &mut Cursor,
k0: &K,
v0: Option<&V>,
) -> Result<(&'a mut K, &'a mut V, u64), usize> {
unsafe {
            let hdr = &*header(&page);
            c.total = hdr.n() as usize;
            c.is_leaf = hdr.is_leaf();
            let result;
let lookup = if c.is_leaf {
if fixed_size::<T, K, V>().is_some() {
let al = K::ALIGN.max(V::ALIGN);
let hdr_size = (HDR + al - 1) & !(al - 1);
let s = core::slice::from_raw_parts(
page.data.add(hdr_size) as *const Tuple<K, V>,
hdr.n() as usize,
);
s.binary_search_by(|tup| {
if let Some(v0) = v0 {
(tup.k.ord(txn), tup.v.ord(txn)).cmp(&(k0.ord(txn), v0.ord(txn)))
} else {
(tup.k.ord(txn)).cmp(k0.ord(txn))
}
})
} else {
let s = core::slice::from_raw_parts(
page.data.add(HDR) as *const u16,
hdr.n() as usize,
);
s.binary_search_by(|&off| {
let off = u16::from_le(off);
let (k, v) = read::<T, K, V>(page.data.offset(off as isize));
if let Some(v0) = v0 {
((&*k).ord(txn), (&*v).ord(txn)).cmp(&(k0.ord(txn), v0.ord(txn)))
} else {
((&*k).ord(txn)).cmp(k0.ord(txn))
}
})
}
} else {
let s =
core::slice::from_raw_parts(page.data.add(HDR) as *const u64, hdr.n() as usize);
s.binary_search_by(|&off| {
let off = u64::from_le(off) & 0xfff;
let (k, v) = read::<T, K, V>(page.data.offset(off as isize & 0xfff));
if let Some(v0) = v0 {
((&*k).ord(txn), (&*v).ord(txn)).cmp(&(k0.ord(txn), v0.ord(txn)))
} else {
((&*k).ord(txn)).cmp(k0.ord(txn))
}
})
};
c.cur = match lookup {
Ok(n) => {
result = if c.is_leaf {
let off = if let Some(f) = fixed_size::<T, K, V>() {
let al = K::ALIGN.max(V::ALIGN);
let hdr_size = (HDR + al - 1) & !(al - 1);
(0, (hdr_size + f * n) as u16)
} else {
let off = u16::from_le(*(page.data.add(HDR + n * 2) as *const u16));
(0, off)
};
Ok(Leaf::kv(*page, off))
} else {
let off = u64::from_le(*(page.data.add(HDR + n * 8) as *const u64));
Ok(Internal::kv(*page, (off & !0xfff, (off & 0xfff) as u16)))
};
n
}
Err(n) => {
result = Err(n);
n
}
};
result
}
}
fn split_at(_: &crate::Page, c: &Self::Cursor) -> (Self::Cursor, Self::Cursor) {
(
Cursor {
cur: 0,
total: c.cur,
is_leaf: c.is_leaf,
},
*c,
)
}
}
fn fixed_size<T, K: Representable<T>, V: Representable<T>>() -> Option<usize> {
if let (Some(ks), Some(vs)) = (K::SIZE, V::SIZE) {
let s = ((ks + V::ALIGN - 1) & !(V::ALIGN - 1)) + vs;
let al = K::ALIGN.max(V::ALIGN);
Some((s + al - 1) & !(al - 1))
} else {
None
}
}
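// Worked example for `fixed_size` (illustrative test): with K = u64
// and V = u64, the value needs no padding after the 8-byte key and the
// entry is 16 bytes; with V = u16, the 10 significant bytes are padded
// up to the key's 8-byte alignment, also giving 16.
#[test]
fn fixed_size_examples() {
    assert_eq!(fixed_size::<(), u64, u64>(), Some(16));
    assert_eq!(fixed_size::<(), u64, u16>(), Some(16));
}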
#[derive(Debug, Clone, Copy)]
pub struct Cursor {
cur: usize,
total: usize,
is_leaf: bool,
}
unsafe fn header(page: &crate::Page) -> *const Header {
page.data as *const Header
}
unsafe fn header_mut(page: crate::MutPage) -> *mut Header {
page.0.data as *mut Header
}
trait Alloc {
unsafe fn can_alloc<T, K: Representable<T>, V: Representable<T>>(
hdr: &Header,
size: usize,
) -> bool;
unsafe fn can_compact<T, K: Representable<T>, V: Representable<T>>(
hdr: &Header,
size: usize,
) -> bool;
unsafe fn alloc(hdr: &mut Header, size: usize, align: usize) -> u16;
unsafe fn alloc_insert<T, K: Representable<T>, V: Representable<T>>(
hdr: &mut Header,
new: &MutPage,
n: &mut isize,
size: usize,
r: u64,
) -> usize;
unsafe fn set_offset(new: MutPage, n: isize, r: u64, off: u16);
unsafe fn set_right_child(new: MutPage, n: isize, r: u64);
type Offset: Into<(u64, u16)> + Copy + core::fmt::Debug;
unsafe fn offset_slice<'a, T, K: Representable<T>, V: Representable<T>>(
page: crate::Page,
) -> Offsets<'a, Self::Offset>;
unsafe fn kv<'a, T, K: Representable<T>, V: Representable<T>>(
page: crate::Page,
off: (u64, u16),
) -> (&'a mut K, &'a mut V, u64);
}
#[derive(Debug, Clone)]
enum Offsets<'a, A> {
Slice(&'a [A]),
Range(core::ops::Range<usize>),
}
impl<'a, A: Into<(u64, u16)> + Copy> Offsets<'a, A> {
fn split_at(&self, mid: usize) -> (Self, Self) {
match self {
Offsets::Slice(s) => {
let (a, b) = s.split_at(mid);
(Offsets::Slice(a), Offsets::Slice(b))
}
Offsets::Range(r) => (
Offsets::Range(r.start..r.start + mid),
Offsets::Range(r.start + mid..r.end),
),
}
}
fn len(&self) -> usize {
match self {
Offsets::Slice(s) => s.len(),
Offsets::Range(r) => r.end - r.start,
}
}
fn first<T, K: Representable<T>, V: Representable<T>>(&self) -> (u64, u16) {
match self {
Offsets::Slice(s) => s[0].into(),
Offsets::Range(r) => {
let size = fixed_size::<T, K, V>().unwrap();
let al = K::ALIGN.max(V::ALIGN);
let hdr_size = (HDR + al - 1) & !(al - 1);
(0, (hdr_size + r.start * size) as u16)
}
}
}
}
struct Leaf {}
struct Internal {}
impl Alloc for Leaf {
unsafe fn can_alloc<T, K: Representable<T>, V: Representable<T>>(
hdr: &Header,
size: usize,
) -> bool {
if let Some(f) = fixed_size::<T, K, V>() {
let al = K::ALIGN.max(V::ALIGN);
let header_size = (HDR + al - 1) & !(al - 1);
header_size + (hdr.n() as usize) * f + size < u16::from_le(hdr.data) as usize
} else {
HDR + (hdr.n() as usize) * 2 + 2 + size < u16::from_le(hdr.data) as usize
}
}
unsafe fn can_compact<T, K: Representable<T>, V: Representable<T>>(
hdr: &Header,
size: usize,
) -> bool {
if fixed_size::<T, K, V>().is_some() {
let al = K::ALIGN.max(V::ALIGN);
let header_size = (HDR + al - 1) & !(al - 1);
header_size + ((hdr.left_page() & 0xfff) as usize) + size
< u16::from_le(hdr.data) as usize
} else {
HDR + ((hdr.left_page() & 0xfff) as usize) + 2 + size < 4096
}
}
unsafe fn alloc(hdr: &mut Header, size: usize, align: usize) -> u16 {
let mut data = u16::from_le(hdr.data) - size as u16;
data -= data % (align as u16);
hdr.data = data.to_le();
hdr.set_n(hdr.n() + 1);
hdr.left_page = (hdr.left_page() + size as u64).to_le();
data
}
unsafe fn alloc_insert<T, K: Representable<T>, V: Representable<T>>(
hdr: &mut Header,
new: &MutPage,
n: &mut isize,
size: usize,
_: u64,
) -> usize {
if let Some(f) = fixed_size::<T, K, V>() {
let al = K::ALIGN.max(V::ALIGN);
let hdr_size = (HDR + al - 1) & !(al - 1);
debug!("{:?} {:?} {:?}", new, hdr.n(), *n);
core::ptr::copy(
new.0.data.add(hdr_size + (*n as usize) * f),
new.0.data.add(hdr_size + (*n as usize) * f + f),
(hdr.n() as usize - (*n as usize)) * f,
);
hdr.set_n(hdr.n() + 1);
hdr.left_page = (hdr.left_page() + f as u64).to_le();
hdr_size + (*n as usize) * f
} else {
let off_new = Self::alloc(&mut *hdr, size, K::ALIGN);
core::ptr::copy(
new.0.data.add(HDR + (*n as usize) * 2),
new.0.data.add(HDR + (*n as usize) * 2 + 2),
(hdr.n() as usize - (*n as usize)) * 2,
);
Self::set_offset(*new, *n, 0, off_new);
off_new as usize
}
}
unsafe fn set_offset(new: MutPage, n: isize, _: u64, off: u16) {
let ptr = new.0.data.offset(HDR as isize + n * 2) as *mut u16;
*ptr = off.to_le();
}
unsafe fn set_right_child(_: MutPage, _: isize, _: u64) {}
type Offset = LeafOffset;
unsafe fn offset_slice<'a, T, K: Representable<T>, V: Representable<T>>(
page: crate::Page,
) -> Offsets<'a, Self::Offset> {
let hdr = &*header(&page);
if fixed_size::<T, K, V>().is_some() {
Offsets::Range(0..(hdr.n() as usize))
} else {
Offsets::Slice(core::slice::from_raw_parts(
page.data.add(HDR) as *const LeafOffset,
hdr.n() as usize,
))
}
}
unsafe fn kv<'a, T, K: Representable<T>, V: Representable<T>>(
page: crate::Page,
(_, off): (u64, u16),
) -> (&'a mut K, &'a mut V, u64) {
let (k, v) = read::<T, K, V>(page.data.add(off as usize));
(&mut *k, &mut *v, 0)
}
}
impl Alloc for Internal {
unsafe fn can_alloc<T, K: Representable<T>, V: Representable<T>>(
hdr: &Header,
size: usize,
) -> bool {
(HDR as usize) + (hdr.n() as usize) * 8 + 8 + size < u16::from_le(hdr.data) as usize
}
unsafe fn can_compact<T, K: Representable<T>, V: Representable<T>>(
hdr: &Header,
size: usize,
) -> bool {
(HDR as usize) + ((hdr.left_page() & 0xfff) as usize) + 8 + size < 4096
}
unsafe fn alloc(hdr: &mut Header, size: usize, align: usize) -> u16 {
let mut data = u16::from_le(hdr.data) - size as u16;
data -= data % (align as u16);
hdr.data = data.to_le();
hdr.set_n(hdr.n() + 1);
hdr.left_page = (hdr.left_page() + size as u64).to_le();
data
}
unsafe fn alloc_insert<T, K: Representable<T>, V: Representable<T>>(
hdr: &mut Header,
new: &MutPage,
n: &mut isize,
size: usize,
r: u64,
) -> usize {
let off_new = Self::alloc(&mut *hdr, size, K::ALIGN.max(V::ALIGN));
core::ptr::copy(
new.0.data.add(HDR + (*n as usize) * 8),
new.0.data.add(HDR + (*n as usize) * 8 + 8),
(hdr.n() as usize - (*n as usize)) * 8,
);
Self::set_offset(*new, *n, r, off_new);
off_new as usize
}
unsafe fn set_offset(new: MutPage, n: isize, r: u64, off: u16) {
let ptr = new.0.data.offset(HDR as isize + n * 8) as *mut u64;
*ptr = (r | off as u64).to_le();
}
unsafe fn set_right_child(page: MutPage, n: isize, r: u64) {
let ptr = page.0.data.offset(HDR as isize + n * 8) as *mut u64;
let off = u64::from_le(*ptr) & 0xfff;
*ptr = (r | off as u64).to_le();
}
type Offset = InternalOffset;
unsafe fn offset_slice<'a, T, K: Representable<T>, V: Representable<T>>(
page: crate::Page,
) -> Offsets<'a, Self::Offset> {
let hdr = &*header(&page);
Offsets::Slice(core::slice::from_raw_parts(
page.data.add(HDR) as *const InternalOffset,
hdr.n() as usize,
))
}
unsafe fn kv<'a, T, K: Representable<T>, V: Representable<T>>(
page: crate::Page,
(r, off): (u64, u16),
) -> (&'a mut K, &'a mut V, u64) {
let (k, v) = read::<T, K, V>(page.data.add(off as usize));
(&mut *k, &mut *v, r)
}
}
unsafe fn modify<T: LoadPage, K: Representable<T>, V: Representable<T>, L: Alloc>(
new: MutPage,
m: &mut ModifiedPage<T, K, V, Page<K, V>>,
n: &mut isize,
) {
let hdr = &mut *header_mut(new);
let mut l = <Page<K, V>>::left_child(&m.page, &m.c0);
while let Some((k, v, r)) = <Page<K, V>>::next(&m.page, &mut m.c0) {
alloc::<_, _, _, L>(hdr, new, k, v, l, r, n);
l = 0;
}
if let Some((k, v, r)) = m.ins {
alloc::<_, _, _, L>(hdr, new, &*k, &*v, m.l, r, n);
} else {
l = m.l
}
let mut is_first = m.skip_first;
while let Some((k, v, r)) = <Page<K, V>>::next(&m.page, &mut m.c1) {
if is_first {
is_first = false;
continue;
}
alloc::<_, _, _, L>(hdr, new, k, v, l, r, n);
l = 0;
}
}
unsafe fn merge<T: LoadPage, K: Representable<T>, V: Representable<T>, L: Alloc>(
new: MutPage,
m: &mut Concat<T, K, V, Page<K, V>>,
) {
let hdr = &mut *header_mut(new);
let mut n = 0;
if m.mod_is_left {
modify::<_, _, _, L>(new, m.modified, &mut n);
let mut rc = <Page<K, V>>::first_cursor(&m.other);
let l = <Page<K, V>>::left_child(&m.other, &rc);
alloc::<_, _, _, L>(hdr, new, &*m.mid.0, &*m.mid.1, 0, l, &mut n);
while let Some((k, v, r)) = <Page<K, V>>::next(&m.other, &mut rc) {
alloc::<_, _, _, L>(hdr, new, k, v, 0, r, &mut n);
}
} else {
let mut rc = <Page<K, V>>::first_cursor(&m.other);
let mut l = <Page<K, V>>::left_child(&m.other, &rc);
while let Some((k, v, r)) = <Page<K, V>>::next(&m.other, &mut rc) {
alloc::<_, _, _, L>(hdr, new, k, v, l, r, &mut n);
l = 0;
}
alloc::<_, _, _, L>(hdr, new, &*m.mid.0, &*m.mid.1, 0, 0, &mut n);
modify::<_, _, _, L>(new, m.modified, &mut n);
}
}
unsafe fn rebalance<'a, T: AllocPage, K: Representable<T>, V: Representable<T>, L: Alloc>(
txn: &mut T,
m: &mut Concat<T, K, V, Page<K, V>>,
) -> Result<Op<'a, T, K, V>, T::Error> {
let rc = <Page<K, V>>::first_cursor(&m.other);
let rl = <Page<K, V>>::left_child(&m.other, &rc);
let (k, v, r) = <Page<K, V>>::unchecked_current(&m.other, &rc);
let hdr = &*header(&m.modified.page);
let mut freed = [0, 0];
let new_left = if hdr.is_dirty() {
let page = MutPage(m.modified.page);
let hdr = &mut *header_mut(page);
let mut n = m.modified.c1.total as isize;
alloc::<T, K, V, L>(hdr, page, &*m.mid.0, &*m.mid.1, 0, rl, &mut n);
page
} else {
let new = txn.alloc_page()?;
<Page<K, V> as BTreeMutPage<T, K, V>>::init(new);
let s = L::offset_slice::<T, K, V>(m.modified.page);
let hdr = &mut *header_mut(new);
let mut n = 0;
clone::<T, K, V, L>(hdr, m.modified.page, new, s, &mut n);
alloc::<T, K, V, L>(hdr, new, &*m.mid.0, &*m.mid.1, 0, rl, &mut n);
        // The modified page is not dirty in this branch (tested above),
        // so its dirty bit in `freed` is 0.
        freed[0] = m.modified.page.offset;
new
};
let new_right = <Page<K, V>>::del(txn, m.other, &rc, r)?;
if new_right.0.offset != m.other.offset {
let hdr = &*header(&m.other);
let b = if hdr.is_dirty() { 1 } else { 0 };
freed[1] = m.other.offset | b
}
Ok(Op::Rebalanced {
l: new_left.0.offset,
r: new_right.0.offset,
k,
v,
freed,
})
}
#[derive(Debug, Clone, Copy)]
#[repr(C)]
struct LeafOffset(u16);
impl Into<(u64, u16)> for LeafOffset {
fn into(self) -> (u64, u16) {
(0, self.0)
}
}
impl Into<usize> for LeafOffset {
fn into(self) -> usize {
self.0 as usize
}
}
#[derive(Debug, Clone, Copy)]
#[repr(C)]
struct InternalOffset(u64);
impl Into<(u64, u16)> for InternalOffset {
fn into(self) -> (u64, u16) {
(self.0 & !0xfff, (self.0 & 0xfff) as u16)
}
}
impl Into<usize> for InternalOffset {
fn into(self) -> usize {
self.0 as usize
}
}
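// Illustration of the encoding used by `InternalOffset`: one 64-bit
// word packs a child page's offset (a multiple of 4096) in the high
// 52 bits and the entry's in-page offset in the low 12 bits.
#[test]
fn internal_offset_packing() {
    let child_page = 3 * 4096u64;
    let in_page = 0x120u16;
    let word = InternalOffset(child_page | in_page as u64);
    let (r, off): (u64, u16) = word.into();
    assert_eq!(r, child_page);
    assert_eq!(off, in_page);
}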
unsafe fn clone<T, K: Representable<T>, V: Representable<T>, L: Alloc>(
hdr: &mut Header,
page: crate::Page,
new: MutPage,
s: Offsets<L::Offset>,
n: &mut isize,
) {
match s {
Offsets::Slice(s) => {
for off in s.iter() {
let (r, off): (u64, u16) = (*off).into();
let off = u16::from_le(off);
let ptr = page.data.add(off as usize);
let size = entry_size::<T, K, V>(ptr);
let off_new = L::alloc(&mut *hdr, size, K::ALIGN);
core::ptr::copy_nonoverlapping(ptr, new.0.data.offset(off_new as isize), size);
L::set_offset(new, *n, r, off_new);
*n += 1;
}
}
Offsets::Range(r) => {
let size = fixed_size::<T, K, V>().unwrap();
let a = K::ALIGN.max(V::ALIGN);
let header_size = (HDR + a - 1) & !(a - 1);
for off in r {
let ptr = page.data.add(header_size + off * size);
let new_ptr = new.0.data.add(header_size + (*n as usize) * size);
core::ptr::copy_nonoverlapping(ptr, new_ptr, size);
hdr.set_n(hdr.n() + 1);
hdr.left_page = (hdr.left_page() + size as u64).to_le();
*n += 1;
}
}
}
}
unsafe fn alloc<T, K: Representable<T>, V: Representable<T>, L: Alloc>(
hdr: &mut Header,
new: MutPage,
k0: &K,
v0: &V,
l: u64,
r: u64,
n: &mut isize,
) {
let size = alloc_size(k0, v0);
let off_new = L::alloc_insert::<T, K, V>(hdr, &new, n, size, r);
let new_ptr = new.0.data.add(off_new as usize);
core::ptr::copy_nonoverlapping(k0, new_ptr as *mut K, 1);
let ks = k0.size();
let v_ptr = new_ptr.add((ks + V::ALIGN - 1) & !(V::ALIGN - 1));
core::ptr::copy_nonoverlapping(v0, v_ptr as *mut V, 1);
if l > 0 {
L::set_right_child(new, *n - 1, l);
}
*n += 1;
}
unsafe fn put<'a, T: AllocPage, K: Representable<T>, V: Representable<T>, L: Alloc>(
txn: &mut T,
page: crate::Page,
mutable: bool,
u: usize,
k0: &'a K,
v0: &'a V,
l: u64,
r: u64,
) -> Result<Put<'a, K, V>, T::Error> {
let size = alloc_size(k0, v0);
let hdr = &*header(&page);
if mutable && hdr.is_dirty() && L::can_alloc::<T, K, V>(&*header(&page), size) {
let page = MutPage(page);
let hdr = &mut *header_mut(page);
let mut n = u as isize;
alloc::<_, _, _, L>(hdr, page, k0, v0, l, r, &mut n);
Ok(Put::Ok(Ok { page, freed: 0 }))
} else if L::can_compact::<T, K, V>(hdr, size) {
let new = txn.alloc_page()?;
<Page<K, V> as BTreeMutPage<T, K, V>>::init(new);
let s = L::offset_slice::<T, K, V>(page);
let (s0, s1) = s.split_at(u as usize);
let hdr = &mut *header_mut(new);
let mut n = 0;
clone::<T, K, V, L>(hdr, page, new, s0, &mut n);
alloc::<T, K, V, L>(hdr, new, k0, v0, l, r, &mut n);
clone::<T, K, V, L>(hdr, page, new, s1, &mut n);
        // Test the *old* page's dirty bit: `hdr` was rebound above to the
        // new page's header, which is always dirty after `init`.
        let b0 = if (&*header(&page)).is_dirty() { 1 } else { 0 };
return Ok(Put::Ok(Ok {
page: new,
freed: page.offset | b0,
}));
} else {
if let Some(s) = fixed_size::<T, K, V>() {
return split::<_, _, _, L>(txn, page, mutable, s, u, k0, v0, l, r);
} else {
return split_unsized::<_, _, _, L>(txn, page, mutable, u, k0, v0, l, r);
}
}
}
unsafe fn split<'a, T: AllocPage, K: Representable<T>, V: Representable<T>, L: Alloc>(
txn: &mut T,
page: crate::Page,
mutable: bool,
size: usize,
u: usize,
k0: &'a K,
v0: &'a V,
l: u64,
r: u64,
) -> Result<Put<'a, K, V>, T::Error> {
let left;
let right = txn.alloc_page()?;
<Page<K, V> as BTreeMutPage<T, K, V>>::init(right);
let hdr = &*header(&page);
let page_is_dirty = if hdr.is_dirty() { 1 } else { 0 };
let n = hdr.n() + 1;
let s = L::offset_slice::<T, K, V>(page);
let k = n / 2;
debug!("s = {:?}", s);
let (s0, s1) = s.split_at(k as usize);
debug!("s0 = {:?} s1 = {:?}", s0, s1);
let (split_key, split_value, mid_child, s1) = if u == k as usize {
// The inserted element is exactly in the middle.
(k0, v0, r, s1)
} else {
let (s1a, s1b) = s1.split_at(1);
let (k, v, r) = L::kv(page, s1a.first::<T, K, V>());
(&*k, &*v, r, s1b)
};
let mut freed = 0;
if mutable && hdr.is_dirty() && u >= k as usize {
// (k0, v0) is to be inserted on the right-hand side of
// the split, hence we don't have to clone the left-hand
// side, we can just truncate it.
let hdr = &mut *header_mut(MutPage(page));
left = MutPage(page);
hdr.set_n(k);
hdr.decr((n - 1 - k) as usize * size);
} else {
left = txn.alloc_page()?;
<Page<K, V> as BTreeMutPage<T, K, V>>::init(left);
if u < k as usize {
let mut n = 0;
let hdr = &mut *header_mut(left);
            // The insertion is in the left half: split `s0` at `u`
            // (splitting `s1` at `u - k` would underflow, since `u < k`).
            let (s0a, s0b) = s0.split_at(u as usize);
            clone::<T, K, V, L>(hdr, page, left, s0a, &mut n);
            alloc::<T, K, V, L>(hdr, left, k0, v0, l, r, &mut n);
            clone::<T, K, V, L>(hdr, page, left, s0b, &mut n);
let hdr = &mut *header_mut(right);
let mut n = 0;
L::set_right_child(right, -1, mid_child);
clone::<T, K, V, L>(hdr, page, right, s1, &mut n);
return Ok(Put::Split {
split_key,
split_value,
left,
right,
freed: page.offset | page_is_dirty,
});
} else {
let hdr = &mut *header_mut(left);
let mut n = 0;
clone::<T, K, V, L>(hdr, page, left, s0, &mut n);
freed = page.offset | page_is_dirty
}
}
    // If we are here, u >= k, i.e. the insertion is in the right-hand
    // side of the split.
let hdr = &mut *header_mut(right);
let mut n = 0;
let kk = u as usize - k as usize;
let (s1a, s1b) = if kk < s1.len() {
debug!("s1 = {:?}, kk = {:?}", s1, kk);
s1.split_at(kk)
} else {
(s1, Offsets::Slice(&[][..]))
};
debug!("s1a = {:?} s1b = {:?}", s1a, s1b);
L::set_right_child(right, -1, mid_child);
debug!("clone {:?} {:?}", n, s1a);
clone::<T, K, V, L>(hdr, page, right, s1a, &mut n);
debug!("alloc {:?}", n);
alloc::<T, K, V, L>(hdr, right, k0, v0, l, r, &mut n);
debug!("clone 2 {:?}", n);
clone::<T, K, V, L>(hdr, page, right, s1b, &mut n);
Ok(Put::Split {
split_key,
split_value,
left,
right,
freed,
})
}
unsafe fn split_unsized<'a, T: AllocPage, K: Representable<T>, V: Representable<T>, L: Alloc>(
_txn: &mut T,
_page: crate::Page,
_mutable: bool,
_u: usize,
_k0: &'a K,
_v0: &'a V,
_l: u64,
_r: u64,
) -> Result<Put<'a, K, V>, T::Error> {
unimplemented!()
}
use crate::*;
pub mod cursor;
mod del;
pub use del::*;
mod put;
pub use put::*;
pub mod page;
#[derive(Debug)]
pub struct Ok {
page: MutPage,
freed: u64,
}
#[derive(Debug)]
pub enum Put<'a, K, V> {
Ok(Ok),
Split {
split_key: &'a K,
split_value: &'a V,
left: MutPage,
right: MutPage,
freed: u64,
},
}
pub trait BTreePage<T: LoadPage, K: Representable<T>, V: Representable<T>> {
type Cursor: Clone + Copy + core::fmt::Debug;
fn is_dirty(page: Page) -> bool;
fn first_cursor(p: &Page) -> Self::Cursor;
fn last_cursor(p: &Page) -> Self::Cursor;
fn next<'b>(p: &Page, c: &mut Self::Cursor) -> Option<(&'b mut K, &'b mut V, u64)> {
unsafe {
if let Some((k, v, r)) = Self::current(p, c) {
Self::move_next(p, c);
Some((&mut *k, &mut *v, r))
} else {
None
}
}
}
fn move_next<'b>(p: &Page, c: &mut Self::Cursor) -> bool;
unsafe fn unchecked_current(p: &Page, c: &Self::Cursor) -> (*mut K, *mut V, u64);
unsafe fn current(p: &Page, c: &Self::Cursor) -> Option<(*mut K, *mut V, u64)> {
if Self::is_empty(p, c) {
None
} else {
Some(Self::unchecked_current(p, c))
}
}
unsafe fn unchecked_current_ptr(p: &Page, c: &Self::Cursor) -> *mut u8;
fn current_size(p: &Page, c: &Self::Cursor) -> usize;
fn left_child(p: &Page, c: &Self::Cursor) -> u64;
fn right_child(p: &Page, c: &Self::Cursor) -> u64;
fn set_cursor<'a>(
txn: &T,
page: &Page,
c: &mut Self::Cursor,
k0: &K,
v0: Option<&V>,
) -> Result<(&'a mut K, &'a mut V, u64), usize>;
fn split_at(p: &Page, c: &Self::Cursor) -> (Self::Cursor, Self::Cursor);
fn is_empty(p: &Page, c: &Self::Cursor) -> bool;
}
pub struct PageIterator<
'a,
T: LoadPage,
K: Representable<T>,
V: Representable<T>,
P: BTreePage<T, K, V>,
> {
cursor: P::Cursor,
page: &'a Page,
}
impl<
'a,
T: LoadPage,
K: Representable<T> + 'a,
V: Representable<T> + 'a,
P: BTreePage<T, K, V>,
> Iterator for PageIterator<'a, T, K, V, P>
{
type Item = (&'a mut K, &'a mut V, u64);
fn next(&mut self) -> Option<Self::Item> {
P::next(self.page, &mut self.cursor)
}
}
pub trait BTreeMutPage<T: AllocPage, K: Representable<T>, V: Representable<T>>:
BTreePage<T, K, V> + Sized
{
fn init(page: MutPage);
fn clean(page: MutPage);
fn put<'a>(
txn: &mut T,
page: Page,
mutable: bool,
c: &Self::Cursor,
k0: &'a K,
v0: &'a V,
l: u64,
r: u64,
) -> Result<Put<'a, K, V>, T::Error>;
fn update_left_child(
txn: &mut T,
page: Page,
mutable: bool,
c: &Self::Cursor,
r: u64,
) -> Result<Ok, T::Error>;
fn del(txn: &mut T, page: Page, c: &Self::Cursor, l: u64) -> Result<MutPage, T::Error>;
fn replace<'a>(
txn: &mut T,
page: Page,
mutable: bool,
c: &Self::Cursor,
k0: &'a K,
v0: &'a V,
l: u64,
r: u64,
) -> Result<Put<'a, K, V>, T::Error> {
// TODO: optimise this for page::Page<K, V>, since this moves
// all the offsets twice in this case.
let new = Self::del(txn, page, c, l)?;
Self::put(txn, new.0, mutable, c, k0, v0, l, r)
}
fn merge_or_rebalance<'a>(
txn: &mut T,
m: &mut Concat<T, K, V, Self>,
) -> Result<Op<'a, T, K, V>, T::Error>;
fn modify<'a>(
txn: &mut T,
m: &mut ModifiedPage<T, K, V, Self>,
) -> Result<Put<'a, K, V>, T::Error> {
unsafe {
if let Some((k, v, r)) = m.ins {
if m.skip_first {
Self::replace(txn, m.page, m.mutable, &m.c1, &*k, &*v, m.l, r)
} else {
Self::put(txn, m.page, m.mutable, &m.c1, &*k, &*v, m.l, r)
}
} else {
let page = Self::del(txn, m.page, &m.c1, m.l)?;
Ok(Put::Ok(Ok {
page,
freed: if page.0.offset != m.page.offset {
m.page.offset
} else {
0
},
}))
}
}
}
fn size(m: &ModifiedPage<T, K, V, Self>) -> usize;
}
pub enum Op<'a, T, K: Representable<T>, V: Representable<T>> {
Merged {
page: MutPage,
freed: [u64; 2],
marker: core::marker::PhantomData<T>,
},
Rebalanced {
k: *mut K,
v: *mut V,
l: u64,
r: u64,
freed: [u64; 2],
},
Put(Put<'a, K, V>),
}
#[derive(Debug, Clone, Copy)]
pub struct ModifiedPage<
T: LoadPage,
K: Representable<T>,
V: Representable<T>,
P: BTreePage<T, K, V>,
> {
pub page: Page,
/// Whether the page can be written to (useful for RC).
pub mutable: bool,
// Start copying c0 (keep `page`'s left child).
pub c0: P::Cursor,
// Replace the right child of c0's last element with `l`.
pub l: u64,
// Possibly insert a new binding.
pub ins: Option<(*const K, *const V, u64)>,
// The first element of c1 is to be deleted, the others must be copied.
pub c1: P::Cursor,
// Whether to skip `c1`'s first element.
pub skip_first: bool,
}
impl<T: LoadPage, K: Representable<T>, V: Representable<T>, P: BTreePage<T, K, V>>
ModifiedPage<T, K, V, P>
{
pub fn single_child(&self) -> Option<u64> {
let mut c1 = self.c1.clone();
if self.skip_first {
P::move_next(&self.page, &mut c1);
}
if P::is_empty(&self.page, &self.c0) && self.ins.is_none() && P::is_empty(&self.page, &c1) {
Some(self.l)
} else {
None
}
}
}
#[derive(Debug)]
pub struct Concat<'a, T: LoadPage, K: Representable<T>, V: Representable<T>, P: BTreePage<T, K, V>>
{
pub mid: (*mut K, *mut V),
pub modified: &'a mut ModifiedPage<T, K, V, P>,
pub other: Page,
// Is the modified field on the left or on the right of the
// concatenation?
pub mod_is_left: bool,
}
#[derive(Debug)]
pub struct Db<T: LoadPage, K: Representable<T>, V: Representable<T>, P: BTreePage<T, K, V>> {
pub db: Page,
pub marker: core::marker::PhantomData<(T, K, V, P)>,
}
pub fn create_db<
T: AllocPage,
K: Representable<T>,
V: Representable<T>,
P: BTreeMutPage<T, K, V>,
>(
txn: &mut T,
) -> Result<Db<T, K, V, P>, T::Error> {
let page = txn.alloc_page()?;
P::init(page);
Ok(Db {
db: page.0,
marker: core::marker::PhantomData,
})
}
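// Hedged usage sketch (commented out): with a transaction type `Txn`
// implementing `AllocPage` — for instance the `MutTxn` type of the
// `sanakirja` crate used in the benchmark below — a database of
// fixed-size bindings is created and used like this:
//
//     let mut db = create_db::<Txn, u64, u64, page::Page<u64, u64>>(&mut txn)?;
//     put(&mut txn, &mut db, &1, &2)?;       // Ok(true): new binding
//     put(&mut txn, &mut db, &1, &2)?;       // Ok(false): binding already present
//     del(&mut txn, &mut db, &1, Some(&2))?; // delete that exact binding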
// Copyright 2015 Pierre-Étienne Meunier and Florent Becker. See the
// COPYRIGHT file at the top-level directory of this distribution and
// at http://pijul.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use super::cursor::*;
use super::*;
use crate::LoadPage;
use core::mem::MaybeUninit;
/// If `value` is `None`, delete one of the bindings associated with
/// `key` from the database (without any specific preference).
/// Otherwise, delete the specified binding if present.
pub fn del<
T: AllocPage + LoadPage,
K: Representable<T>,
V: Representable<T>,
P: BTreeMutPage<T, K, V> + core::fmt::Debug,
>(
txn: &mut T,
db: &mut Db<T, K, V, P>,
key: &K,
value: Option<&V>,
) -> Result<bool, T::Error> {
let mut cursor = Cursor::new(db);
let found = cursor.set(txn, Some((key, value)))?;
if found.is_none() {
return Ok(false);
}
del_at_cursor(txn, db, &mut cursor)
}
pub fn del_at_cursor<
T: AllocPage + LoadPage,
K: Representable<T>,
V: Representable<T>,
P: BTreeMutPage<T, K, V> + core::fmt::Debug,
>(
txn: &mut T,
db: &mut Db<T, K, V, P>,
cursor: &mut Cursor<T, K, V, P>,
) -> Result<bool, T::Error> {
let p0 = cursor.pointer;
// Find the leftmost page in the right subtree, that is where
// the "replacement" is.
find_min(txn, cursor)?;
// Mark the replacement for deletion in the leaf. This is all
// lazy, so we don't do anything for now, in order to avoid
// duplicate work in case the page can be merged or
// rebalanced.
let ref mut curs0 = unsafe { cursor.stack[cursor.pointer].assume_init() };
let (c0, c1) = P::split_at(&curs0.page, curs0.cursor.as_ref().unwrap());
let mut last_op = ModifiedPage {
page: curs0.page,
mutable: cursor.pointer < cursor.first_rc_level,
c0,
l: 0,
ins: None,
c1,
skip_first: true,
};
if cursor.pointer == cursor.first_rc_level {
txn.decr_rc(curs0.page.offset)?;
}
if cursor.pointer >= cursor.first_rc_level {
let mut c0 = c0.clone();
let mut c1 = c1.clone();
P::move_next(&curs0.page, &mut c1);
while let Some((k, v, _)) =
P::next(&curs0.page, &mut c0).or_else(|| P::next(&curs0.page, &mut c1))
{
for o in k.page_offsets().chain(v.page_offsets()) {
txn.incr_rc(o)?;
}
}
}
if p0 < cursor.pointer && cursor.pointer >= cursor.first_rc_level {
// increase the RC of the replacement.
unsafe {
let (k, v, _) = P::unchecked_current(&curs0.page, &c0);
for o in (&*k).page_offsets().chain((&*v).page_offsets()) {
txn.incr_rc(o)?;
}
}
}
let mut free = [[0, 0]; N_CURSORS];
// Then, climb up the stack, and perform the lazy operations.
cursor.pointer -= 1;
while cursor.pointer > 0 {
// Compute the need for merge/rebalancing
let ref curs = cursor.current();
let c = curs.cursor.as_ref().unwrap();
let mut concat = concat(txn, &cursor, &curs0, p0, &mut last_op)?;
let (c0, c1) = P::split_at(&curs.page, c);
match P::merge_or_rebalance(txn, &mut concat)? {
Op::Merged {
page,
freed,
marker: _,
} => {
last_op = ModifiedPage {
page: curs.page,
mutable: cursor.pointer < cursor.first_rc_level,
c0,
l: page.0.offset,
ins: None,
c1,
skip_first: true,
};
if cursor.pointer < cursor.first_rc_level {
free[cursor.pointer] = freed;
} else {
if cursor.pointer == cursor.first_rc_level {
txn.decr_rc(curs.page.offset)?;
}
modify_rc(txn, &last_op)?;
}
}
            Op::Rebalanced { k, v, l, r, freed } => {
last_op = ModifiedPage {
page: curs.page,
mutable: cursor.pointer < cursor.first_rc_level,
c0,
l,
ins: Some((k, v, r)),
c1,
skip_first: true,
};
if cursor.pointer < cursor.first_rc_level {
free[cursor.pointer] = freed;
} else {
if cursor.pointer == cursor.first_rc_level {
txn.decr_rc(curs.page.offset)?;
}
modify_rc(txn, &last_op)?;
}
}
Op::Put(Put::Ok(Ok { page, freed })) => {
last_op = ModifiedPage {
page: curs.page,
mutable: cursor.pointer < cursor.first_rc_level,
c0,
l: page.0.offset,
ins: None,
c1,
skip_first: false,
};
if cursor.pointer < cursor.first_rc_level {
free[cursor.pointer][0] = freed;
} else {
if cursor.pointer == cursor.first_rc_level {
txn.decr_rc(curs.page.offset)?;
}
modify_rc(txn, &last_op)?;
}
}
Op::Put(Put::Split {
left,
right,
split_key,
split_value,
freed,
}) => {
last_op = ModifiedPage {
page: curs.page,
mutable: cursor.pointer < cursor.first_rc_level,
c0,
l: left.0.offset,
ins: Some((split_key, split_value, right.0.offset)),
c1,
skip_first: false,
};
if cursor.pointer < cursor.first_rc_level {
free[cursor.pointer][0] = freed;
} else {
if cursor.pointer == cursor.first_rc_level {
txn.decr_rc(curs.page.offset)?;
}
modify_rc(txn, &last_op)?;
}
if cursor.pointer + 1 >= cursor.first_rc_level {
for o in split_key.page_offsets().chain(split_value.page_offsets()) {
txn.incr_rc(o)?;
}
}
}
}
cursor.pointer -= 1;
}
// The root was merged or rebalanced.
if let Some(d) = last_op.single_child() {
db.db = txn.load_page(d)?
} else {
match P::modify(txn, &mut last_op)? {
Put::Ok(Ok { page, freed }) => {
free[0][0] = freed;
db.db = page.0
}
Put::Split {
split_key,
split_value,
left,
right,
freed,
} => {
free[0][0] = freed;
let page = txn.alloc_page()?;
P::init(page);
P::put(
txn,
page.0,
true,
&P::first_cursor(&page.0),
split_key,
split_value,
left.0.offset,
right.0.offset,
)?;
if cursor.first_rc_level <= 1 {
for o in split_key.page_offsets().chain(split_value.page_offsets()) {
txn.incr_rc(o)?;
}
}
db.db = page.0
}
}
}
    for p in free.iter() {
        if p[0] & 1 == 1 {
            txn.decr_rc_owned(p[0] ^ 1)?
        } else if p[0] > 0 {
            txn.decr_rc(p[0])?
        }
        if p[1] & 1 == 1 {
            txn.decr_rc_owned(p[1] ^ 1)?
        } else if p[1] > 0 {
            txn.decr_rc(p[1])?
        }
    }
Ok(true)
}
fn find_min<
T: AllocPage + LoadPage,
K: Representable<T>,
V: Representable<T>,
P: BTreeMutPage<T, K, V> + core::fmt::Debug,
>(
txn: &mut T,
cursor: &mut Cursor<T, K, V, P>,
) -> Result<(), T::Error> {
let cur = cursor.current();
let mut left_page = P::right_child(&cur.page, cur.cursor.as_ref().unwrap());
while left_page > 0 {
cursor.pointer += 1;
let page = txn.load_page(left_page)?;
let curs = P::first_cursor(&page);
left_page = P::left_child(&page, &curs);
cursor.stack[cursor.pointer] = MaybeUninit::new(PageCursor {
page,
cursor: Some(curs),
});
}
Ok(())
}
fn concat<
'a,
T: AllocPage + LoadPage,
K: Representable<T>,
V: Representable<T>,
P: BTreeMutPage<T, K, V> + core::fmt::Debug,
>(
txn: &mut T,
cursor: &Cursor<T, K, V, P>,
curs0: &PageCursor<T, K, V, P>,
p0: usize,
last_op: &'a mut ModifiedPage<T, K, V, P>,
) -> Result<Concat<'a, T, K, V, P>, T::Error> {
let ref curs = cursor.current();
let c = curs.cursor.as_ref().unwrap();
if cursor.pointer == p0 {
let other = txn.load_page(P::left_child(&curs.page, c))?;
let (k, v, _) =
unsafe { P::unchecked_current(&curs0.page, curs0.cursor.as_ref().unwrap()) };
Ok(Concat {
modified: last_op,
mid: (k, v),
other,
mod_is_left: false,
})
} else {
let (k, v, r) = unsafe { P::unchecked_current(&curs.page, c) };
let other = txn.load_page(r)?;
Ok(Concat {
modified: last_op,
mid: (k, v),
other,
mod_is_left: true,
})
}
}
fn modify_rc<
T: AllocPage + LoadPage,
K: Representable<T>,
V: Representable<T>,
P: BTreePage<T, K, V>,
>(
txn: &mut T,
m: &ModifiedPage<T, K, V, P>,
) -> Result<(), T::Error> {
let mut c0 = m.c0.clone();
let mut c1 = m.c1.clone();
let mut left = P::left_child(&m.page, &c0);
while let Some((k, v, r)) = P::next(&m.page, &mut c0) {
for o in k.page_offsets().chain(v.page_offsets()) {
txn.incr_rc(o)?;
}
txn.incr_rc(left)?;
left = r;
}
if m.skip_first {
P::move_next(&m.page, &mut c1);
} else {
txn.incr_rc(left)?;
}
while let Some((k, v, r)) = P::next(&m.page, &mut c1) {
for o in k.page_offsets().chain(v.page_offsets()) {
txn.incr_rc(o)?;
}
txn.incr_rc(r)?;
}
Ok(())
}
use super::*;
use crate::{LoadPage, Page};
use core::mem::MaybeUninit;
use log::*;
#[doc(hidden)]
#[derive(Debug)]
pub struct PageCursor<T: LoadPage, K: Representable<T>, V: Representable<T>, P: BTreePage<T, K, V>>
{
pub page: Page,
pub cursor: Option<P::Cursor>,
}
impl<T: LoadPage, K: Representable<T>, V: Representable<T>, P: BTreePage<T, K, V>> Clone
for PageCursor<T, K, V, P>
{
fn clone(&self) -> Self {
PageCursor {
page: self.page,
cursor: self.cursor.clone(),
}
}
}
impl<T: LoadPage, K: Representable<T>, V: Representable<T>, P: BTreePage<T, K, V>> Copy
for PageCursor<T, K, V, P>
{
}
// This is 1 + the maximal depth of a tree. Cursors start at 1 to
// allow for splitting the root on an insertion.
//
// Since pages are of size 2^12, there are at most 2^52 addressable
// pages (potentially less depending on the platform). Since each page
// of a B tree below the root has at least 4 elements, the arity is at
// least 5, except for the root. Since 5^23 is the smallest power of 5
// larger than 2^52, the maximum depth is 24, and we add the extra
// room to allow for splitting.
pub(crate) const N_CURSORS: usize = 25;
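// Sanity check of the arithmetic above (illustrative test):
// 5^22 < 2^52 <= 5^23.
#[test]
fn depth_bound() {
    assert!(5u128.pow(22) < 1u128 << 52);
    assert!(5u128.pow(23) >= 1u128 << 52);
}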
#[derive(Debug, Clone)]
/// A position in a database (mostly for internal use).
pub struct Cursor<T: LoadPage, K: Representable<T>, V: Representable<T>, P: BTreePage<T, K, V>> {
pub stack: [core::mem::MaybeUninit<PageCursor<T, K, V, P>>; N_CURSORS],
pub first_rc_level: usize,
pub pointer: usize,
}
impl<'a, T: LoadPage, K: Representable<T>, V: Representable<T>, P: BTreePage<T, K, V>>
Cursor<T, K, V, P>
{
pub fn new(db: &Db<T, K, V, P>) -> Self {
let mut stack = [core::mem::MaybeUninit::uninit(); N_CURSORS];
stack[1] = MaybeUninit::new(PageCursor {
page: db.db,
cursor: None,
});
Cursor {
stack,
first_rc_level: N_CURSORS,
pointer: 1,
}
}
}
impl<'a, T: LoadPage, K: Representable<T>, V: Representable<T>, P: BTreePage<T, K, V>>
Cursor<T, K, V, P>
{
pub fn current(&self) -> &PageCursor<T, K, V, P> {
unsafe { &*self.stack[self.pointer].as_ptr() }
}
pub fn set(
&mut self,
txn: &'a T,
k: Option<(&K, Option<&V>)>,
) -> Result<Option<(&'a mut K, &'a mut V)>, T::Error> {
// Set the "cursor stack" by setting a skip list cursor in
// each page from the root to the appropriate leaf.
let mut last_matching_page = 0;
let mut last_match = None;
loop {
let current = unsafe { &mut *self.stack[self.pointer].as_mut_ptr() };
let page = current.page;
if self.first_rc_level >= N_CURSORS && txn.rc(page.offset)? >= 2 {
self.first_rc_level = self.pointer
}
if current.cursor.is_none() {
current.cursor = Some(P::first_cursor(&page));
}
let cursor = current.cursor.as_mut().unwrap();
if let Some((k, v)) = k {
if let Ok((kk, vv, _)) = P::set_cursor(txn, &page, cursor, k, v) {
if v.is_some() {
return Ok(Some((kk, vv)));
}
last_match = Some((kk, vv));
last_matching_page = self.pointer
}
}
let next_page = P::left_child(&page, cursor);
debug!("next = {:?}", next_page);
if next_page > 0 {
self.pointer += 1;
self.stack[self.pointer] = MaybeUninit::new(PageCursor {
page: txn.load_page(next_page)?,
cursor: None,
});
} else {
break;
}
}
if last_matching_page > 0 {
self.pointer = last_matching_page;
Ok(last_match)
} else {
Ok(None)
}
}
pub fn set_last(&mut self, txn: &'a T) -> Result<Option<(&'a K, &'a V)>, T::Error> {
// Set the "cursor stack" by setting a skip list cursor in
// each page from the root to the appropriate leaf.
let mut last_match;
loop {
let current = unsafe { &mut *self.stack[self.pointer].as_mut_ptr() };
if self.first_rc_level >= N_CURSORS && txn.rc(current.page.offset)? >= 2 {
self.first_rc_level = self.pointer
}
if current.cursor.is_none() {
                current.cursor = Some(P::last_cursor(&current.page));
}
let cursor = current.cursor.as_mut().unwrap();
            let (k, v, r) = unsafe { P::unchecked_current(&current.page, cursor) };
last_match = Some((k, v));
if r > 0 {
self.pointer += 1;
self.stack[self.pointer] = MaybeUninit::new(PageCursor {
page: txn.load_page(r)?,
cursor: None,
})
} else {
break;
}
}
Ok(last_match.map(|(k, v)| unsafe { (&*k, &*v) }))
}
pub fn next<L: LoadPage>(
&mut self,
txn: &'a mut L,
) -> Result<Option<(&mut K, &mut V)>, L::Error> {
loop {
if self.pointer == 0 {
return Ok(None);
} else {
let current = unsafe { &mut *self.stack[self.pointer].as_mut_ptr() };
if let Some(ref mut c) = current.cursor {
// We're inside the page, and have already
// processed the left child of the current page
// cursor.
                    if let Some((k, v, r)) = P::next(&current.page, c) {
if r > 0 {
self.pointer += 1;
self.stack[self.pointer] = MaybeUninit::new(PageCursor {
page: txn.load_page(r)?,
cursor: None,
})
}
return Ok(Some((&mut *k, &mut *v)));
} else if self.pointer > 1 {
self.pointer -= 1
} else {
return Ok(None);
}
} else {
                    current.cursor = Some(P::first_cursor(&current.page));
// First element of a page (not a binding).
let cursor = current.cursor.as_ref().unwrap();
                    let left = P::left_child(&current.page, cursor);
debug!("left = {:?}", left);
// Then visit the right child (if any), i.e. push.
if left != 0 {
self.pointer += 1;
self.stack[self.pointer] = MaybeUninit::new(PageCursor {
page: txn.load_page(left)?,
cursor: None,
})
}
}
}
}
}
}
[package]
name = "sanakirja-core"
version = "0.1.0"
authors = ["pe"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
log = "*"
use sanakirja_core::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
pub fn parallel_benchmark() {
let mut stack: [[u64; 512]; N] = [TESTPAGE; N];
let stack: [SafePage; N] = [
SafePage::from_buf(&mut stack[0], 4096),
SafePage::from_buf(&mut stack[1], 4096 * 2),
SafePage::from_buf(&mut stack[2], 4096 * 3),
SafePage::from_buf(&mut stack[3], 4096 * 4),
/*SafePage::from_buf(&mut stack[4], 4096 * 5),
SafePage::from_buf(&mut stack[5], 4096 * 6),
SafePage::from_buf(&mut stack[6], 4096 * 7),
SafePage::from_buf(&mut stack[7], 4096 * 8),
SafePage::from_buf(&mut stack[8], 4096 * 9),
SafePage::from_buf(&mut stack[9], 4096 * 10),
SafePage::from_buf(&mut stack[10], 4096 * 11),
SafePage::from_buf(&mut stack[11], 4096 * 12),*/
];
let pointer_clone = Arc::new(AtomicUsize::new(0));
let result = Arc::new(AtomicUsize::new(0));
[
start_thread(pointer_clone.clone(), result.clone(), stack.clone()),
start_thread(pointer_clone.clone(), result.clone(), stack.clone()),
start_thread(pointer_clone.clone(), result.clone(), stack.clone()),
start_thread(pointer_clone.clone(), result.clone(), stack.clone()),
];
    // Wait until the workers have finished any previous round.
    while result.load(Ordering::SeqCst) != 0 {}
let mut total = std::time::Duration::from_secs(0);
let mut total_seq = std::time::Duration::from_secs(0);
for _ in 0..100_000 {
let time = std::time::SystemTime::now();
        // Publish the result counter before the work counter, so a fast
        // worker cannot decrement `result` below zero.
        result.store(4, Ordering::Release);
        pointer_clone.store(4, Ordering::Release);
while result.load(Ordering::Acquire) > 0 {
std::hint::spin_loop();
}
total += time.elapsed().unwrap();
let time = std::time::SystemTime::now();
for &s in stack.iter() {
let s: MutPage = s.into();
let header_size = 16;
let size = 8;
for off in 0..500 {
unsafe {
let ptr = (TESTPAGE.as_ptr() as *const u8).add(header_size + off * size);
let new_ptr = s.0.data.add(header_size + off * size);
std::ptr::copy_nonoverlapping(ptr, new_ptr, size);
}
}
}
total_seq += time.elapsed().unwrap();
}
println!("{:?} {:?}", total, total_seq);
}
#[derive(Debug, Clone, Copy)]
struct SafePage {
data: *mut u8,
offset: u64,
}
impl From<MutPage> for SafePage {
fn from(m: MutPage) -> Self {
SafePage {
data: m.0.data,
offset: m.0.offset,
}
}
}
impl From<SafePage> for MutPage {
fn from(m: SafePage) -> Self {
MutPage(Page {
data: m.data,
offset: m.offset,
})
}
}
impl SafePage {
fn from_buf(b: &mut [u64], offset: u64) -> Self {
SafePage {
data: b.as_mut_ptr() as *mut u8,
offset,
}
}
}
unsafe impl Sync for SafePage {}
unsafe impl Send for SafePage {}
fn start_thread(
n: Arc<AtomicUsize>,
result: Arc<AtomicUsize>,
stack: [SafePage; N],
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || loop {
let mut current = 0;
while current == 0 {
current = n.load(Ordering::Acquire);
std::hint::spin_loop();
}
while current > 0 {
            // `compare_and_swap` is deprecated; `compare_exchange` is the
            // same CAS with explicit success/failure orderings.
            if n
                .compare_exchange(current, current - 1, Ordering::Acquire, Ordering::Acquire)
                .is_err()
            {
                current -= 1;
                continue;
            }
            let cur = current - 1;
let s: MutPage = stack[cur].into();
let header_size = 16;
let size = 8;
for off in 0..500 {
unsafe {
let ptr = (TESTPAGE.as_ptr() as *const u8).add(header_size + off * size);
let new_ptr = s.0.data.add(header_size + off * size);
std::ptr::copy_nonoverlapping(ptr, new_ptr, size);
}
}
result.fetch_sub(1, Ordering::Release);
current = cur;
}
})
}
const N: usize = 4;
const TESTPAGE: [u64; 512] = [
268435955, 3992, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21,
22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45,
46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69,
70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93,
94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113,
114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132,
133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151,
152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170,
171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189,
190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208,
209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227,
228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246,
247, 248, 249, 250, 251, 252, 253, 254, 256, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266,
267, 268, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 283, 284, 285,
286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 303, 304,
305, 306, 307, 308, 309, 310, 311, 312, 313, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323,
324, 325, 326, 327, 328, 329, 330, 331, 332, 333, 334, 335, 336, 337, 338, 339, 340, 341, 342,
343, 344, 345, 346, 347, 348, 349, 350, 351, 352, 353, 354, 355, 356, 357, 358, 359, 360, 361,
362, 363, 364, 365, 366, 367, 368, 369, 370, 371, 372, 373, 374, 375, 376, 377, 378, 379, 380,
381, 382, 383, 384, 385, 386, 387, 388, 389, 390, 391, 392, 393, 394, 395, 396, 397, 398, 399,
400, 401, 402, 403, 404, 405, 406, 407, 408, 409, 410, 411, 412, 413, 414, 415, 416, 417, 418,
419, 420, 421, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 432, 433, 434, 435, 436, 437,
438, 439, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450, 451, 452, 453, 454, 455, 456,
457, 458, 459, 460, 461, 462, 463, 464, 465, 466, 467, 468, 469, 470, 471, 472, 473, 474, 475,
476, 477, 478, 479, 480, 481, 482, 483, 484, 485, 486, 487, 488, 489, 490, 491, 492, 493, 494,
495, 496, 497, 498, 499, 499, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
use lmdb_rs::*;
use log::*;
use sanakirja::environment::*;
use sanakirja_core::btree::*;
// mod parallel_benchmark;
type B = sanakirja_core::btree::page::Page<u64, u64>;
fn main() {
env_logger::init();
main::main();
// main::parallel_benchmark();
}
mod main {
use super::*;
pub fn main() {
let env = Env::new_anon(409600000).unwrap();
let mut txn = Env::mut_txn_begin(&env).unwrap();
let mut db = create_db::<MutTxn<&Env<Exclusive>, ()>, u64, u64, B>(&mut txn).unwrap();
let n = 1_000_000u64;
let now = std::time::SystemTime::now();
for i in 0..n {
debug!("put {:?}", i);
put(&mut txn, &mut db, &(i * i), &(i * i * i)).unwrap();
}
println!("moi: {:?}", now.elapsed());
/*
let mut btree = std::collections::BTreeMap::new();
let now = std::time::SystemTime::now();
for i in 0..n {
debug!("put {:?}", i);
btree.insert(i*i, i*i*i);
}
println!("std: {:?}", now.elapsed());
std::mem::forget(txn);
{
let env = EnvBuilder::new()
.map_size(1 << 27)
.open("test-lmdb", 0o777)
.unwrap();
let db_handle = env.get_default_db(DbFlags::empty()).unwrap();
let txn = env.new_transaction().unwrap();
{
let db = txn.bind(&db_handle); // get a database bound to this transaction
let now = std::time::SystemTime::now();
for i in 0..n {
db.set(&(i * i), &(i * i * i)).unwrap();
}
println!("lmdb: {:?}", now.elapsed());
}
}
*/
}
}
use thiserror::*;
pub mod environment;
/// Errors that can occur while transacting.
#[derive(Debug, Error)]
pub enum Error {
/// IO errors, from the `std::io` module.
#[error(transparent)]
IO(#[from] std::io::Error),
/// Lock poisoning error.
#[error("Lock poisoning")]
Poison,
/// Version mismatch
#[error("Version mismatch")]
VersionMismatch,
/// CRC check failed
#[error(transparent)]
CRC(#[from] CRCError),
}
/// A CRC check failed
#[derive(Debug, Error)]
#[error("CRC check failed")]
pub struct CRCError {}
use super::*;
use log::*;
use sanakirja_core::{btree, MutPage, Page};
use std::borrow::Borrow;
/// A mutable transaction.
pub struct MutTxn<E: Borrow<Env<Exclusive>>, T> {
env: E,
parent: T,
length: u64,
free: Page,
rc: Option<btree::Db<Self, u64, (), btree::page::Page<u64, ()>>>,
/// Pages that were allocated by this transaction, and have not
/// been freed since.
occupied_owned_pages: Vec<MutPage>,
/// Offsets of pages that were allocated by this transaction, and
/// then freed.
free_owned_pages: Vec<u64>,
/// Offsets of old pages freed by this transaction. These were
/// *not* allocated by this transaction.
free_pages: Vec<u64>,
roots: [u64; 508],
/// Index of the root slot locked exclusively by this transaction,
/// released when the transaction is dropped.
root: usize,
}
impl<E: Borrow<Env<Exclusive>>, T> Drop for MutTxn<E, T> {
fn drop(&mut self) {
let env = self.env.borrow();
unsafe {
let root = env.root.lock();
env.roots[*root].unlock_exclusive()
}
}
}
/// Transactions that can be committed.
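///
/// A minimal sketch of the intended flow (error handling elided;
/// `create_db` and `put` are from `sanakirja_core::btree`, and `B`
/// is a btree page type for the key and value):
///
/// ```ignore
/// let env = Env::new_anon(1 << 20)?;
/// let mut txn = Env::mut_txn_begin(&env)?;
/// let mut db = create_db::<_, u64, u64, B>(&mut txn)?;
/// put(&mut txn, &mut db, &1, &2)?;
/// txn.commit()?;
/// ```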
pub trait Commit {
/// Commit the transaction.
fn commit(self) -> Result<(), Error>;
}
impl<'a, E: Borrow<Env<Exclusive>>, T> Commit for MutTxn<E, &'a mut MutTxn<E, T>> {
fn commit(self) -> Result<(), Error> {
self.parent.length = self.length;
self.parent.free = self.free;
self.parent
.occupied_owned_pages
.extend(self.occupied_owned_pages.iter().cloned());
self.parent
.free_owned_pages
.extend(self.free_owned_pages.iter());
self.parent.free_pages.extend(self.free_pages.iter());
for (u, v) in self.roots.iter().enumerate() {
if *v != 0 {
self.parent.roots[u] = *v
}
}
Ok(())
}
}
impl Env<Exclusive> {
/// Start a mutable transaction. Mutable transactions that go out
/// of scope are automatically aborted.
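///
/// The root slot after the current one is locked exclusively for
/// the whole lifetime of the transaction, so at most one mutable
/// transaction can be active at a time.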
pub fn mut_txn_begin<E: Borrow<Self>>(env: E) -> Result<MutTxn<E, ()>, Error> {
unsafe {
let (header, free, rc, root) = {
let env_ = env.borrow();
let maps = env_.mmaps.lock()[0].ptr;
let v = env_.root.lock();
let n = env_.roots.len();
// Lock the slot this transaction will commit to; remember its
// index so that `Drop` can release exactly this slot.
let root = (*v + 1) % n;
env_.roots[root].lock_exclusive();
debug!("{:?} {:?}", v, env_.roots.len());
// Copy the roots of the last transaction onto this one.
let page_ptr = maps.offset((*v * PAGE_SIZE) as isize);
let next_page_ptr = maps.offset((((*v + 1) % n) * PAGE_SIZE) as isize);
std::ptr::copy_nonoverlapping(page_ptr, next_page_ptr, PAGE_SIZE);
let header = GlobalHeader::from_le(&*(page_ptr as *const GlobalHeader));
let free = Page {
data: env_.find_offset(header.free_db),
offset: header.free_db,
};
let rc = if header.rc_db == 0 {
None
} else {
Some(btree::Db {
db: Page {
offset: header.rc_db,
data: env_.find_offset(header.rc_db),
},
marker: std::marker::PhantomData,
})
};
(header, free, rc, root)
};
Ok(MutTxn {
env,
parent: (),
root,
rc,
length: if header.length == 0 {
(PAGE_SIZE as u64) * (header.n_roots as u64)
} else {
header.length
},
free,
occupied_owned_pages: Vec::with_capacity(100),
free_owned_pages: Vec::new(),
free_pages: Vec::new(),
roots: [0; 508],
})
}
}
}
impl<E: Borrow<Env<Exclusive>>> Commit for MutTxn<E, ()> {
fn commit(mut self) -> Result<(), Error> {
unsafe {
let mut free_db: btree::Db<Self, u64, (), btree::page::Page<u64, ()>> = btree::Db {
db: std::mem::replace(
&mut self.free,
Page {
data: std::ptr::null_mut(),
offset: 0,
},
),
marker: std::marker::PhantomData,
};
while !self.free_owned_pages.is_empty() || !self.free_pages.is_empty() {
let mut free_owned_pages =
std::mem::replace(&mut self.free_owned_pages, Vec::new());
let mut free_pages = std::mem::replace(&mut self.free_pages, Vec::new());
for p in free_owned_pages.drain(..).chain(free_pages.drain(..)) {
btree::put(&mut self, &mut free_db, &p, &())?;
}
}
for p in self.occupied_owned_pages.iter_mut() {
p.clear_dirty();
}
let env = self.env.borrow();
let mut maps = env.mmaps.lock();
// Lock `env.root` once for the rest of commit; locking this
// non-reentrant mutex a second time below would deadlock.
let mut root = env.root.lock();
let globptr = maps[0].ptr.offset((PAGE_SIZE * *root) as isize) as *mut GlobalHeader;
(&mut *globptr).length = self.length.to_le();
(&mut *globptr).free_db = free_db.db.offset.to_le();
// Moving the root page, both on page 0, and on the environment.
let globptr = maps[0].ptr as *mut GlobalHeader;
(&mut *globptr).root = ((&mut *globptr).root + 1) % (&mut *globptr).n_roots;
*root = (*root + 1) % env.roots.len();
for m in maps.iter_mut() {
m.flush()?
}
*env.first_unused_page.lock() = self.length;
Ok(())
}
}
}
impl<E: Borrow<Env<Exclusive>>, T> MutTxn<E, T> {
pub fn set_root(&mut self, num: usize, value: u64) {
self.roots[num] = value;
}
fn free_owned_page(&mut self, offset: u64) {
self.free_owned_pages.push(offset);
}
fn free_page(&mut self, offset: u64) {
self.free_pages.push(offset)
}
/// Pop a free page from the list of free pages.
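/// The page with the largest offset is taken, by positioning a
/// cursor on the last binding of the free-page btree and deleting
/// that binding.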
fn free_pages_pop(&mut self) -> Result<Option<u64>, crate::Error> {
if self.free.offset == 0 {
return Ok(None);
}
let mut db: btree::Db<Self, u64, (), btree::page::Page<u64, ()>> = btree::Db {
db: self.free,
marker: std::marker::PhantomData,
};
let mut curs = btree::cursor::Cursor::new(&db);
let f = if let Some((f, ())) = curs.set_last(self)? {
*f
} else {
return Ok(None);
};
btree::del_at_cursor(self, &mut db, &mut curs)?;
self.free = db.db;
Ok(Some(f))
}
}
impl<E: Borrow<Env<Exclusive>>, T> sanakirja_core::AllocPage for MutTxn<E, T> {
/// Allocate a single page.
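/// Pages are taken, in order of preference: from the pages
/// allocated and then freed by this same transaction, then from
/// the global free list, and finally from the free space at the
/// end of the file.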
fn alloc_page(&mut self) -> Result<MutPage, Error> {
// If we have allocated and freed a page in this transaction, use it first.
if let Some(page) = self.free_owned_pages.pop() {
let page = MutPage(Page {
data: unsafe { self.env.borrow().find_offset(page) },
offset: page,
});
self.occupied_owned_pages.push(page);
Ok(page)
} else {
// Else, if there are free pages, take one.
if let Some(page) = self.free_pages_pop()? {
let page = MutPage(Page {
data: unsafe { self.env.borrow().find_offset(page) },
offset: page,
});
self.occupied_owned_pages.push(page);
Ok(page)
} else {
// Else, allocate in the free space.
debug!("allocate in the free space {:?}", self.length);
let last = self.length;
self.length += PAGE_SIZE as u64;
let page = MutPage(Page {
data: unsafe { self.env.borrow().find_offset(last) },
offset: last,
});
self.occupied_owned_pages.push(page);
Ok(page)
}
}
}
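// Reference counts are kept in a btree of `u64` keys that pack a page
// offset and a count: pages are 4096-byte aligned, so the low 12 bits
// of an offset are free to hold the count. For example, a page at
// offset 0x3000 with count 3 is stored as the key 0x3003; `k & !0xfff`
// recovers the offset and `k & 0xfff` the count. Pages with a count of
// at most 1 are absent from the table.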
fn decr_rc(&mut self, off: u64) -> Result<(), Error> {
if let Some(mut rc_) = self.rc.take() {
let mut curs = btree::cursor::Cursor::new(&rc_);
curs.set(self, Some((&off, None)))?;
if let Some((rc, ())) = curs.next(self)? {
let off_ = *rc & !0xfff;
if off_ == off {
let r = *rc & 0xfff;
if r > 2 {
*rc = off_ | (r - 1);
} else {
btree::del(self, &mut rc_, rc, None)?;
}
self.rc = Some(rc_);
return Ok(());
}
}
}
self.free_page(off);
Ok(())
}
fn decr_rc_owned(&mut self, off: u64) -> Result<(), Error> {
let mut rc_del = 0;
if let Some(mut rc_) = self.rc.take() {
let mut curs = btree::cursor::Cursor::new(&rc_);
if let Some((rc, _)) = curs.set(self, Some((&off, None)))? {
let off_ = *rc & !0xfff;
if off_ == off {
let r = *rc & 0xfff;
if r > 2 {
*rc = off_ | (r - 1);
self.rc = Some(rc_);
return Ok(());
} else {
rc_del = *rc;
}
}
}
if rc_del != 0 {
btree::del(self, &mut rc_, &rc_del, None)?;
}
self.rc = Some(rc_);
}
self.free_owned_page(off);
Ok(())
}
fn incr_rc(&mut self, off: u64) -> Result<(), Error> {
if let Some(mut rc_) = self.rc.take() {
let mut curs = btree::cursor::Cursor::new(&rc_);
if let Some((rc, _)) = curs.set(self, Some((&off, None)))? {
let off_ = *rc & !0xfff;
if off_ == off {
let r = *rc & 0xfff;
*rc = off_ | (r + 1);
} else {
btree::put(self, &mut rc_, &(off | 2), &())?;
}
} else {
btree::put(self, &mut rc_, &(off | 2), &())?;
}
self.rc = Some(rc_)
}
Ok(())
}
}
impl<E: Borrow<Env<Exclusive>>, A> sanakirja_core::LoadPage for MutTxn<E, A> {
type Error = crate::Error;
fn load_page(&self, off: u64) -> Result<Page, Self::Error> {
unsafe {
let data = self.env.borrow().find_offset(off);
Ok(Page { data, offset: off })
}
}
fn rc(&self, page: u64) -> Result<u64, Self::Error> {
if let Some(ref rc) = self.rc {
let mut curs: btree::cursor::Cursor<Self, u64, (), btree::page::Page<u64, ()>> =
btree::cursor::Cursor::new(rc);
if let Some((rc, _)) = curs.set(self, Some((&page, None)))? {
if *rc & !0xfff == page {
let r = *rc & 0xfff;
if r >= 2 {
return Ok(r);
}
}
}
}
Ok(0)
}
}
use crate::Error;
use parking_lot::lock_api::RawRwLock;
use parking_lot::Mutex;
use std::borrow::Borrow;
use std::path::Path;
#[cfg(feature = "mmap")]
use std::path::PathBuf;
#[cfg(feature = "mmap")]
use std::fs::OpenOptions;
// File locking (`lock_shared`, `lock_exclusive`, `unlock`, …) comes
// from the `fs2` extension trait on `std::fs::File`.
#[cfg(feature = "mmap")]
use fs2::FileExt;
use sanakirja_core::Page;
mod muttxn;
pub use muttxn::*;
mod global_header;
use global_header::*;
pub(crate) const PAGE_SIZE: usize = 4096;
const CURRENT_VERSION: u16 = 3;
#[derive(Debug)]
pub(crate) struct Map {
pub(crate) ptr: *mut u8,
#[cfg(feature = "mmap")]
file: Option<std::fs::File>,
#[cfg(feature = "mmap")]
mmap: memmap::MmapMut,
#[cfg(not(feature = "mmap"))]
layout: std::alloc::Layout,
length: u64,
}
impl Map {
#[cfg(feature = "mmap")]
fn flush(&self) -> Result<(), Error> {
Ok(self.mmap.flush()?)
}
#[cfg(not(feature = "mmap"))]
fn flush(&self) -> Result<(), Error> {
Ok(())
}
}
/// Represents an exclusive lock taken on the environment.
pub struct Exclusive(std::fs::File);
#[cfg(feature = "mmap")]
impl Drop for Exclusive {
fn drop(&mut self) {
self.0.unlock().unwrap_or(());
}
}
/// Represents a shared lock taken on the environment.
pub struct Shared(std::fs::File);
#[cfg(feature = "mmap")]
impl Drop for Shared {
fn drop(&mut self) {
self.0.unlock().unwrap_or(());
}
}
// Lock order: first take thread locks, then process locks.
// Why are there two synchronization mechanisms?
// Because we would need to upgrade the read lock into a write lock,
// and there is no real way to do this with standard mechanisms.
// So, we take a mutex to make sure no other mutable transaction can start,
// and then at the time of writing, we also take the RwLock.
/// Environment, required to start any transactions. Thread-safe, but
/// opening the same database several times in the same process is
/// not supported on all platforms.
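///
/// A sketch of the two ways of opening an on-disk environment (with
/// the `mmap` feature; the path, which must be a directory, is
/// hypothetical):
///
/// ```ignore
/// // Exclusive lock, required for mutable transactions:
/// let env = Env::new("/path/to/db", 1 << 20)?;
/// // Shared lock, for read-only access from several processes:
/// let env = Env::new_shared("/path/to/db", 1 << 20)?;
/// ```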
pub struct Env<T> {
#[cfg(feature = "mmap")]
path: Option<PathBuf>,
/// It is undefined behavior to have a file mmapped more than once.
#[cfg(feature = "mmap")]
lock_file: Option<T>,
#[cfg(not(feature = "mmap"))]
lock_file: std::marker::PhantomData<T>,
pub(crate) mmaps: Mutex<Vec<Map>>,
first_unused_page: Mutex<u64>,
roots: Vec<parking_lot::RawRwLock>,
root: Mutex<usize>,
}
unsafe impl<T> Send for Env<T> {}
unsafe impl<T> Sync for Env<T> {}
#[cfg(feature = "mmap")]
impl<T> Drop for Env<T> {
fn drop(&mut self) {
for map in self.mmaps.lock().drain(..) {
drop(map.mmap);
drop(map.file);
}
}
}
#[cfg(not(feature = "mmap"))]
impl<T> Drop for Env<T> {
fn drop(&mut self) {
let mut mmaps = self.mmaps.lock();
for map in mmaps.drain(..) {
unsafe { std::alloc::dealloc(map.ptr, map.layout) }
}
}
}
/// An immutable transaction.
pub struct Txn<L, E: Borrow<Env<L>>> {
env: E,
lock: std::marker::PhantomData<L>,
root: usize,
}
impl<T> Env<T> {
/// Size, in bytes, of the first file (`db0`) of the database, if it
/// exists.
pub fn file_size<P: AsRef<Path>>(path: P) -> Result<u64, Error> {
// The maps are named `db0`, `db1`, …; `db` itself never exists.
let db_path = path.as_ref().join("db0");
Ok(std::fs::metadata(&db_path)?.len())
}
/// Same as [`new`](Self::new), but does not take a lock on the file
/// system.
///
/// This method is provided because waiting for a lock on the file
/// system may block the whole process, whereas this method never
/// waits.
///
/// However, the database is very likely to get corrupted if opened
/// more than once at the same time, even within the same process.
///
/// Therefore, do not use this method without another locking
/// mechanism in place to avoid that situation.
#[cfg(feature = "mmap")]
pub unsafe fn new_nolock<P: AsRef<Path>>(path: P, length: u64) -> Result<Self, Error> {
let mut db_path = path.as_ref().join("db0");
let db_exists = std::fs::metadata(&db_path).is_ok();
let length = if let Ok(meta) = std::fs::metadata(&db_path) {
std::cmp::max(meta.len(), length)
} else {
std::cmp::max(length, 4096)
};
let length = length.next_power_of_two();
let file = OpenOptions::new()
.read(true)
.write(true)
.truncate(false)
.create(true)
.open(&db_path)?;
file.set_len(length)?;
let mmap = memmap::MmapMut::map_mut(&file)?;
let mut env = Self::new_nolock_mmap(Some(file), length, mmap, !db_exists)?;
db_path.pop();
env.path = Some(db_path);
Ok(env)
}
#[cfg(feature = "mmap")]
unsafe fn new_nolock_mmap(
file: Option<std::fs::File>,
length: u64,
mut mmap: memmap::MmapMut,
initialise: bool,
) -> Result<Self, Error> {
let map = mmap.as_mut_ptr();
let n_roots = 2;
if initialise {
// Write an initial header on each of the root pages.
for i in 0..n_roots {
*(map.offset((i * PAGE_SIZE) as isize) as *mut GlobalHeader) = (GlobalHeader {
version: CURRENT_VERSION,
root: 0,
n_roots: n_roots as u8,
crc: 0,
length: n_roots as u64 * PAGE_SIZE as u64,
free_db: 0,
rc_db: 0,
})
.to_le();
}
}
let glob = GlobalHeader::from_le(&*(map as *const GlobalHeader));
let mut roots = Vec::with_capacity(glob.n_roots as usize);
for _ in 0..glob.n_roots {
roots.push(<parking_lot::RawRwLock as parking_lot::lock_api::RawRwLock>::INIT)
}
let env = Env {
path: None,
mmaps: Mutex::new(vec![Map {
ptr: map,
mmap,
file,
length,
}]),
lock_file: None,
first_unused_page: Mutex::new(2),
roots,
root: Mutex::new(glob.root as usize),
};
Ok(env)
}
#[cfg(not(feature = "mmap"))]
unsafe fn new_nolock_mmap(length: u64, initialise: bool) -> Result<Env<Exclusive>, Error> {
assert!(initialise);
let layout = std::alloc::Layout::from_size_align(length as usize, 64).unwrap();
// Zero the allocation, matching the zero-filled pages of a fresh
// file-backed mmap.
let map = std::alloc::alloc_zeroed(layout);
let n_roots = 2;
for i in 0..n_roots {
*(map.offset((i * PAGE_SIZE) as isize) as *mut GlobalHeader) = (GlobalHeader {
version: CURRENT_VERSION,
root: 0,
n_roots: n_roots as u8,
crc: 0,
length: n_roots as u64 * PAGE_SIZE as u64,
free_db: 0,
rc_db: 0,
})
.to_le();
}
let glob = GlobalHeader::from_le(&*(map as *const GlobalHeader));
let mut roots = Vec::with_capacity(glob.n_roots as usize);
for _ in 0..glob.n_roots {
roots.push(<parking_lot::RawRwLock as parking_lot::lock_api::RawRwLock>::INIT)
}
let env = Env {
mmaps: Mutex::new(vec![Map {
ptr: map,
layout,
length,
}]),
lock_file: std::marker::PhantomData,
first_unused_page: Mutex::new(2),
roots,
root: Mutex::new(glob.root as usize),
};
Ok(env)
}
}
impl Env<Shared> {
/// Initialize an environment. `length` must be a strictly
/// positive multiple of 4096. A shared lock is taken on the lock
/// file, so other processes may open the same environment with
/// `new_shared`, but not with the exclusive [`Env::new`].
#[cfg(feature = "mmap")]
pub fn new_shared<P: AsRef<Path>>(path: P, length: u64) -> Result<Env<Shared>, Error> {
let lock_file = OpenOptions::new()
.read(true)
.write(true)
.truncate(false)
.create(true)
.open(path.as_ref().join("db").with_extension("lock"))?;
lock_file.lock_shared()?;
let mut env = unsafe { Self::new_nolock(path, length)? };
env.lock_file = Some(Shared(lock_file));
Ok(env)
}
/// Initialize an environment. `length` must be a strictly
/// positive multiple of 4096. Returns an error if the database is
/// locked exclusively by another process or thread.
#[cfg(feature = "mmap")]
pub fn try_new_shared<P: AsRef<Path>>(path: P, length: u64) -> Result<Env<Shared>, Error> {
let lock_file = OpenOptions::new()
.read(true)
.write(true)
.truncate(false)
.create(true)
.open(path.as_ref().join("db").with_extension("lock"))?;
lock_file.try_lock_shared()?;
let mut env = unsafe { Self::new_nolock(path, length)? };
env.lock_file = Some(Shared(lock_file));
Ok(env)
}
}
impl Env<Exclusive> {
/// Initialize an environment. `length` must be a strictly
/// positive multiple of 4096. The same file can only be open in
/// one process or thread at the same time, and this is enforced
/// by a locked file.
#[cfg(feature = "mmap")]
pub fn new<P: AsRef<Path>>(path: P, length: u64) -> Result<Env<Exclusive>, Error> {
let lock_file = OpenOptions::new()
.read(true)
.write(true)
.truncate(false)
.create(true)
.open(path.as_ref().join("db").with_extension("lock"))?;
lock_file.lock_exclusive()?;
let mut env = unsafe { Self::new_nolock(path, length)? };
env.lock_file = Some(Exclusive(lock_file));
Ok(env)
}
/// Initialize an environment. `length` must be a strictly
/// positive multiple of 4096. Returns an error if the database is
/// locked by another process or thread.
#[cfg(feature = "mmap")]
pub fn try_new<P: AsRef<Path>>(path: P, length: u64) -> Result<Env<Exclusive>, Error> {
let lock_file = OpenOptions::new()
.read(true)
.write(true)
.truncate(false)
.create(true)
.open(path.as_ref().join("db").with_extension("lock"))?;
lock_file.try_lock_exclusive()?;
let mut env = unsafe { Self::new_nolock(path, length)? };
env.lock_file = Some(Exclusive(lock_file));
Ok(env)
}
/// Create a new anonymous database, backed by memory. The length
/// is the total size in bytes of the database.
#[cfg(feature = "mmap")]
pub fn new_anon(length: u64) -> Result<Env<Exclusive>, Error> {
let length = std::cmp::max(length, 4096).next_power_of_two();
let mmap = memmap::MmapMut::map_anon(length as usize)?;
unsafe { Self::new_nolock_mmap(None, length, mmap, true) }
}
/// Create a new anonymous database, backed by memory. The length
/// is the total size in bytes of the database.
#[cfg(not(feature = "mmap"))]
pub fn new_anon(length: u64) -> Result<Env<Exclusive>, Error> {
let length = std::cmp::max(length, 4096).next_power_of_two();
unsafe { Self::new_nolock_mmap(length, true) }
}
}
impl<L> Env<L> {
#[cfg(not(feature = "mmap"))]
fn open_mmap(&self, i: usize, length0: u64) -> Result<Map, Error> {
let length = length0 << i;
let layout = std::alloc::Layout::from_size_align(length as usize, 64).unwrap();
// Zero the allocation, matching the zero-filled pages of a fresh
// file-backed mmap.
let map = unsafe { std::alloc::alloc_zeroed(layout) };
Ok(Map {
ptr: map,
layout,
length,
})
}
#[cfg(feature = "mmap")]
fn open_mmap(&self, i: usize, length0: u64) -> Result<Map, Error> {
let length = length0 << i;
if let Some(ref path) = self.path {
let mut db_path = path.join(&format!("db{}", i));
let file = OpenOptions::new()
.read(true)
.write(true)
.truncate(false)
.create(true)
.open(&db_path)?;
file.set_len(length)?;
db_path.pop();
let mut mmap = unsafe { memmap::MmapMut::map_mut(&file)? };
Ok(Map {
ptr: mmap.as_mut_ptr(),
mmap,
file: Some(file),
length,
})
} else {
let mut mmap = memmap::MmapMut::map_anon(length as usize)?;
Ok(Map {
ptr: mmap.as_mut_ptr(),
mmap,
file: None,
length,
})
}
}
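// Map `i` has length `length0 << i`, so the mapped address space
// doubles with each segment. For example, with `length0 = 4096`, the
// global offset 10_000 falls in map 1 (which covers offsets
// 4096..12288), at byte 5904 within that map.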
unsafe fn find_offset(&self, mut offset: u64) -> *mut u8 {
let mut i = 0;
let mut mmaps = self.mmaps.lock();
loop {
if i >= mmaps.len() {
let length0 = mmaps[0].length;
mmaps.push(self.open_mmap(i, length0).unwrap());
}
if offset < mmaps[i].length {
return mmaps[i].ptr.offset(offset as isize);
}
offset -= mmaps[i].length;
i += 1
}
}
/// Close this repository, releasing the locks. It is undefined
/// behaviour to use the environment afterwards. This method can
/// be used for instance to release the locks before allocating a
/// new environment (note that `std::mem::replace` followed by
/// `Drop::drop` of the previous value would not release the locks
/// in the correct order).
///
/// The safe alternative to this method is to use an `Option<Env>`
/// instead of an `Env`.
#[cfg(feature = "mmap")]
pub unsafe fn close(&mut self) {
for m in self.mmaps.lock().drain(..) {
drop(m.mmap);
}
if let Some(lock_file) = self.lock_file.take() {
drop(lock_file)
}
}
/// Close this repository.
///
/// The safe alternative to this method is to use an `Option<Env>`
/// instead of an `Env`.
#[cfg(not(feature = "mmap"))]
pub unsafe fn close(&mut self) {
let mut mmaps = self.mmaps.lock();
for m in mmaps.drain(..) {
std::alloc::dealloc(m.ptr, m.layout)
}
}
}
impl<L> Env<L> {
/// Start a read-only transaction.
pub fn txn_begin<E: Borrow<Self>>(env: E) -> Result<Txn<L, E>, Error> {
let root = {
let env_ = env.borrow();
let root = env_.root.lock();
let root = (*root + env_.roots.len() - 1) % env_.roots.len();
env_.roots[root].lock_shared();
root
};
Ok(Txn {
env,
lock: std::marker::PhantomData,
root,
})
}
}
impl<L, E: Borrow<Env<L>>> Drop for Txn<L, E> {
fn drop(&mut self) {
let env = self.env.borrow();
unsafe { env.roots[self.root].unlock_shared() }
}
}
impl<L, E: Borrow<Env<L>>> sanakirja_core::LoadPage for Txn<L, E> {
type Error = crate::Error;
/// Load the page at offset `off`, finding the appropriate map
/// segment and checking the page's CRC if that feature is enabled.
fn load_page(&self, off: u64) -> Result<Page, Self::Error> {
unsafe {
let data = self.env.borrow().find_offset(off);
check_crc(data)?;
Ok(Page { data, offset: off })
}
}
fn rc(&self, _: u64) -> Result<u64, Self::Error> {
Ok(0)
}
}
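// With the `crc32` feature, the CRC of a page is stored little-endian
// in its first four bytes, and is computed over the remaining
// `PAGE_SIZE - 4` bytes.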
#[cfg(feature = "crc32")]
unsafe fn check_crc(p: *const u8) -> Result<(), crate::CRCError> {
let crc = u32::from_le(*(p as *const u32));
let mut h = crc32fast::Hasher::new();
let data = std::slice::from_raw_parts(p.offset(4), PAGE_SIZE - 4);
h.update(data);
let crc_ = h.finalize();
if crc_ == crc {
Ok(())
} else {
Err(CRCError {})
}
}
#[cfg(not(feature = "crc32"))]
unsafe fn check_crc(_: *const u8) -> Result<(), crate::CRCError> {
Ok(())
}
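// On-disk header of a root page. With `#[repr(C)]`, the layout is:
// bytes 0..2 version, byte 2 root, byte 3 n_roots, bytes 4..8 crc,
// 8..16 length, 16..24 free_db, 24..32 rc_db; multi-byte fields are
// stored little-endian.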
#[derive(Debug)]
#[repr(C)]
pub struct GlobalHeader {
/// Version of Sanakirja
pub version: u16,
/// Which page is currently the root page? (only valid for page 0)
pub root: u8,
pub n_roots: u8,
/// CRC of this page
pub crc: u32,
/// First free page at the end of the file (only valid for page 0)
pub length: u64,
/// Offset of the free list
pub free_db: u64,
/// Offset of the RC db,
pub rc_db: u64,
}
impl GlobalHeader {
pub fn from_le(&self) -> Self {
GlobalHeader {
version: u16::from_le(self.version),
root: self.root,
n_roots: self.n_roots,
crc: u32::from_le(self.crc),
free_db: u64::from_le(self.free_db),
length: u64::from_le(self.length),
rc_db: u64::from_le(self.rc_db),
}
}
pub fn to_le(&self) -> Self {
GlobalHeader {
version: self.version.to_le(),
root: self.root,
n_roots: self.n_roots,
crc: self.crc.to_le(),
free_db: self.free_db.to_le(),
length: self.length.to_le(),
rc_db: self.rc_db.to_le(),
}
}
}
[package]
name = "sanakirja"
version = "0.1.0"
authors = [ "Pierre-Étienne Meunier" ]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
parking_lot = "*"
thiserror = "*"
log = { version = "*", features = [ "release_max_level_off" ] }
env_logger = "*"
lmdb-rs = "*"
sanakirja-core = { path = "../sanakirja-core", version = "*" }
# Used by the code gated on `cfg(feature = "mmap")` and
# `cfg(feature = "crc32")`.
memmap = { version = "*", optional = true }
fs2 = { version = "*", optional = true }
crc32fast = { version = "*", optional = true }

[features]
default = [ "mmap" ]
mmap = [ "memmap", "fs2" ]
crc32 = [ "crc32fast" ]
[workspace]
members = [ "sanakirja-core", "sanakirja" ]