use super::*;
use crate::btree::del::*;
use crate::btree::put::*;
use core::cmp::Ordering;
pub(super) mod header;
mod alloc;
pub(super) mod cursor;
mod put;
pub(super) mod rebalance;
use alloc::*;
use cursor::*;
use header::*;
use rebalance::*;
/// Marker type naming the page implementation provided by this module.
///
/// The struct stores no data: `K` and `V` appear only through `PhantomData`,
/// and the trait implementations in this file expose associated functions
/// that receive the page to operate on as an explicit argument.
#[derive(Debug)]
pub struct Page<K: ?Sized, V: ?Sized> {
// Zero-sized marker binding the key type `K` to this implementation.
k: core::marker::PhantomData<K>,
// Zero-sized marker binding the value type `V` to this implementation.
v: core::marker::PhantomData<V>,
}
#[async_trait]
impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> super::BTreePage<K, V>
for Page<K, V>
{
fn is_empty(c: &Self::Cursor) -> bool {
c.cur >= c.total as isize
}
fn is_init(c: &Self::Cursor) -> bool {
c.cur < 0
}
type Cursor = PageCursor;
fn cursor_before(p: &crate::CowPage) -> Self::Cursor {
PageCursor::new(p, -1)
}
fn cursor_after(p: &crate::CowPage) -> Self::Cursor {
PageCursor::after(p)
}
fn split_at(c: &Self::Cursor) -> (Self::Cursor, Self::Cursor) {
(
PageCursor {
cur: 0,
total: c.cur.max(0) as usize,
is_leaf: c.is_leaf,
},
*c,
)
}
fn move_next(c: &mut Self::Cursor) -> bool {
if c.cur < c.total as isize {
c.cur += 1;
true
} else {
false
}
}
fn move_prev(c: &mut Self::Cursor) -> bool {
if c.cur > 0 {
c.cur -= 1;
true
} else {
c.cur = -1;
false
}
}
async fn current<'a, T: LoadPage>(
txn: &T,
page: crate::Page<'a>,
c: &Self::Cursor,
) -> Option<(&'a K, &'a V, u64)> {
if c.cur < 0 || c.cur >= c.total as isize {
None
} else if c.is_leaf {
unsafe {
let off =
u16::from_le(*(page.data.as_ptr().add(HDR + c.cur as usize * 2) as *const u16));
let (k, v) = {
let (k, v) = read::<K, V>(page.data.as_ptr().add(off as usize));
(Const(k as *const u8), Const(v as *const u8))
};
Some((
K::from_raw_ptr(txn, k).await,
V::from_raw_ptr(txn, v).await,
0,
))
}
} else {
unsafe {
let off =
u64::from_le(*(page.data.as_ptr().add(HDR + c.cur as usize * 8) as *const u64));
let (k, v) = {
let (k, v) = read::<K, V>(page.data.as_ptr().add((off & 0xfff) as usize));
(Const(k as *const u8), Const(v as *const u8))
};
Some((
K::from_raw_ptr(txn, k).await,
V::from_raw_ptr(txn, v).await,
off & !0xfff,
))
}
}
}
fn left_child(page: crate::Page, c: &Self::Cursor) -> u64 {
assert!(c.cur >= 0);
if c.is_leaf {
0
} else {
assert!(c.cur >= 0 && HDR as isize + c.cur * 8 - 8 <= 4088);
let off =
unsafe { *(page.data.as_ptr().offset(HDR as isize + c.cur * 8 - 8) as *const u64) };
u64::from_le(off) & !0xfff
}
}
fn right_child(page: crate::Page, c: &Self::Cursor) -> u64 {
assert!(c.cur < c.total as isize);
if c.is_leaf {
0
} else {
assert!(c.cur < c.total as isize && HDR as isize + c.cur * 8 - 8 <= 4088);
let off =
unsafe { *(page.data.as_ptr().offset(HDR as isize + c.cur * 8) as *const u64) };
u64::from_le(off) & !0xfff
}
}
async fn set_cursor<'a, 'b, T: LoadPage>(
txn: &T,
page: crate::Page<'b>,
c: &mut PageCursor,
k0: &K,
v0: Option<&V>,
) -> Result<(&'a K, &'a V, u64), usize> {
unsafe {
match lookup(txn, page, c, k0, v0).await {
Ok(n) => {
c.cur = n as isize;
if c.is_leaf {
let off =
u16::from_le(*(page.data.as_ptr().add(HDR + n * 2) as *const u16));
let (k, v) = {
let (k, v) = read::<K, V>(page.data.as_ptr().add(off as usize));
let k = Const(k as *const u8);
let v = Const(v as *const u8);
(k, v)
};
Ok((
K::from_raw_ptr(txn, k).await,
V::from_raw_ptr(txn, v).await,
0,
))
} else {
let off =
u64::from_le(*(page.data.as_ptr().add(HDR + n * 8) as *const u64));
let (k, v) = {
let (k, v) = read::<K, V>(page.data.as_ptr().add(off as usize & 0xfff));
(Const(k as *const u8), Const(v as *const u8))
};
Ok((
K::from_raw_ptr(txn, k).await,
V::from_raw_ptr(txn, v).await,
off & !0xfff,
))
}
}
Err(n) => {
c.cur = n as isize;
Err(n)
}
}
}
}
}
/// Refreshes `c.total` and `c.is_leaf` from the page header, then searches
/// the page for `k0` (and `v0`, when provided) with the routine matching the
/// page kind.
///
/// Returns whatever the kind-specific search returns: `Ok(index)` on a match,
/// `Err(index)` otherwise.
///
/// # Safety
/// The page contents are read through raw pointers by the kind-specific
/// search; the caller must pass a page whose header and slots are valid.
async unsafe fn lookup<'a, K, V, T>(
    txn: &T,
    page: crate::Page<'a>,
    c: &mut PageCursor,
    k0: &K,
    v0: Option<&V>,
) -> Result<usize, usize>
where
    K: UnsizedStorable + ?Sized,
    V: UnsizedStorable + ?Sized,
    T: LoadPage,
{
    let page_header = header(page);
    c.total = page_header.n() as usize;
    c.is_leaf = page_header.is_leaf();
    match c.is_leaf {
        true => lookup_leaf(txn, page, k0, v0, page_header).await,
        false => lookup_internal(txn, page, k0, v0, page_header).await,
    }
}
/// Binary search over a sorted slice using an asynchronous comparator.
///
/// `f` receives an element and resolves to the ordering of that element
/// relative to the target. Returns `Ok(index)` of a matching element, or
/// `Err(index)` with the position where the target could be inserted to keep
/// the slice sorted.
async fn binary_search_by<'a, T, F, O>(s: &'a [T], mut f: F) -> Result<usize, usize>
where
    F: FnMut(&'a T) -> O,
    O: core::future::Future<Output = Ordering>,
{
    let (mut lo, mut hi) = (0, s.len());
    while lo < hi {
        let probe = lo + (hi - lo) / 2;
        // SAFETY: `lo <= probe < hi <= s.len()`, so `probe` is in bounds.
        let candidate = unsafe { s.get_unchecked(probe) };
        match f(candidate).await {
            Ordering::Less => lo = probe + 1,
            Ordering::Greater => hi = probe,
            Ordering::Equal => return Ok(probe),
        }
    }
    Err(lo)
}
/// Searches an internal page for `k0`, and for the pair `(k0, v0)` when a
/// value is given.
///
/// Slots are little-endian `u64`s whose low 12 bits give the entry offset in
/// the page. With a value, the result is that of a binary search on
/// `(key, value)`. Without one, a key match is walked back to the first slot
/// whose key compares equal to `k0`.
///
/// # Safety
/// `hdr.n()` slots starting at `HDR` and every entry offset they contain are
/// dereferenced without checks; they must describe valid entries of `page`.
async unsafe fn lookup_internal<'a, K, V, T>(
    txn: &T,
    page: crate::Page<'a>,
    k0: &K,
    v0: Option<&V>,
    hdr: &header::Header,
) -> Result<usize, usize>
where
    K: UnsizedStorable + ?Sized,
    V: UnsizedStorable + ?Sized,
    T: LoadPage,
{
    let slots =
        core::slice::from_raw_parts(page.data.as_ptr().add(HDR) as *const u64, hdr.n() as usize);
    match v0 {
        Some(v0) => {
            binary_search_by(slots, |&slot| async move {
                let entry = (u64::from_le(slot) & 0xfff) as usize;
                // Only the `Const` wrappers leave this scope, so no bare raw
                // pointer is alive across the awaits below.
                let (key, val) = {
                    let (key, val) = read::<K, V>(page.data.as_ptr().add(entry));
                    (Const(key), Const(val))
                };
                let key = K::from_raw_ptr(txn, key).await;
                match key.compare(txn, k0).await {
                    Ordering::Equal => {
                        let val = V::from_raw_ptr(txn, val).await;
                        val.compare(txn, v0).await
                    }
                    other => other,
                }
            })
            .await
        }
        None => {
            let hit = binary_search_by(slots, |&slot| async move {
                let entry = (u64::from_le(slot) & 0xfff) as usize;
                let key = Const(read::<K, V>(page.data.as_ptr().add(entry)).0);
                let key = K::from_raw_ptr(txn, key).await;
                key.compare(txn, k0).await
            })
            .await;
            // A miss is returned as is; a hit is moved back over preceding
            // slots holding an equal key.
            let mut first = hit?;
            while first > 0 {
                let entry = (u64::from_le(slots[first - 1]) & 0xfff) as usize;
                let prev = Const(read::<K, V>(page.data.as_ptr().add(entry)).0);
                let prev = K::from_raw_ptr(txn, prev).await;
                if prev.compare(txn, k0).await != Ordering::Equal {
                    break;
                }
                first -= 1;
            }
            Ok(first)
        }
    }
}
/// Searches a leaf page for `k0`, and for the pair `(k0, v0)` when a value
/// is given.
///
/// Slots are little-endian `u16` entry offsets. With a value, the result is
/// that of a binary search on `(key, value)`. Without one, a key match is
/// walked back to the first slot whose key compares equal to `k0`.
///
/// # Safety
/// `hdr.n()` slots starting at `HDR` and every entry offset they contain are
/// dereferenced without checks; they must describe valid entries of `page`.
async unsafe fn lookup_leaf<'a, K, V, T>(
    txn: &T,
    page: crate::Page<'a>,
    k0: &K,
    v0: Option<&V>,
    hdr: &header::Header,
) -> Result<usize, usize>
where
    K: UnsizedStorable + ?Sized,
    V: UnsizedStorable + ?Sized,
    T: LoadPage,
{
    let slots =
        core::slice::from_raw_parts(page.data.as_ptr().add(HDR) as *const u16, hdr.n() as usize);
    match v0 {
        Some(v0) => {
            binary_search_by(slots, |&slot| async move {
                let entry = u16::from_le(slot) as usize;
                // Only the `Const` wrappers leave this scope, so no bare raw
                // pointer is alive across the awaits below.
                let (key, val) = {
                    let (key, val) = read::<K, V>(page.data.as_ptr().add(entry));
                    (Const(key as *const u8), Const(val as *const u8))
                };
                let key = K::from_raw_ptr(txn, key).await;
                match key.compare(txn, k0).await {
                    Ordering::Equal => {
                        let val = V::from_raw_ptr(txn, val).await;
                        val.compare(txn, v0).await
                    }
                    other => other,
                }
            })
            .await
        }
        None => {
            let hit = binary_search_by(slots, |&slot| async move {
                let entry = u16::from_le(slot) as usize;
                let key = Const(read::<K, V>(page.data.as_ptr().add(entry)).0);
                let key = K::from_raw_ptr(txn, key).await;
                key.compare(txn, k0).await
            })
            .await;
            // A miss is returned as is; a hit is moved back over preceding
            // slots holding an equal key.
            let mut first = hit?;
            while first > 0 {
                let entry = u16::from_le(slots[first - 1]) as usize;
                let prev = Const(read::<K, V>(page.data.as_ptr().add(entry)).0);
                let prev = K::from_raw_ptr(txn, prev).await;
                if prev.compare(txn, k0).await != Ordering::Equal {
                    break;
                }
                first -= 1;
            }
            Ok(first)
        }
    }
}
// Mutating page operations (insert, delete, child-pointer updates, and
// merge/rebalance decisions) for pages with unsized keys and values.
//
// Internal-page slots are little-endian `u64`s: the low 12 bits are the entry
// offset inside the page, the remaining bits a child page address. Every
// child-pointer update below therefore rewrites a slot as
// `new_child | (old_slot & 0xfff)`.
#[async_trait]
impl<
K: UnsizedStorable + ?Sized + core::fmt::Debug,
V: UnsizedStorable + ?Sized + core::fmt::Debug,
> super::BTreeMutPage<K, V> for Page<K, V>
{
/// Initialises the header of a page.
fn init(page: &mut MutPage) {
let h = header_mut(page);
h.init();
}
/// A deleted leaf entry is remembered as raw pointers to its key and value.
type Saved = (*const K, *const V);
/// Records the addresses of `k` and `v`; nothing is copied.
fn save_deleted_leaf_entry(k: &K, v: &V) -> Self::Saved {
(k as *const K, v as *const V)
}
/// Turns saved pointers back into references.
///
/// # Safety
/// Both pointers are dereferenced, so the memory they were taken from must
/// still be valid for the chosen lifetime `'a`.
unsafe fn from_saved<'a>(s: &Self::Saved) -> (&'a K, &'a V) {
(&*s.0, &*s.1)
}
/// Inserts `(k0, v0)` (and optionally `k1v1`) at the cursor position,
/// delegating to the leaf or internal variant of `put::put`.
///
/// The variant is chosen from `r` alone: `r == 0` selects the leaf variant,
/// which is called with both child pointers set to 0; any other value
/// selects the internal variant with children `l` and `r`.
async fn put<'a, T: AllocPage>(
txn: &mut T,
page: CowPage,
mutable: bool,
replace: bool,
c: &PageCursor,
k0: &'a K,
v0: &'a V,
k1v1: Option<(&'a K, &'a V)>,
l: u64,
r: u64,
) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
// The cursor must be on or after the first entry, and a second entry may
// only be supplied together with `replace`.
debug_assert!(c.cur >= 0);
debug_assert!(k1v1.is_none() || replace);
if r == 0 {
put::put::<_, _, _, Leaf>(
txn,
page,
mutable,
replace,
c.cur as usize,
k0,
v0,
k1v1,
0,
0,
)
.await
} else {
put::put::<_, _, _, Internal>(
txn,
page,
mutable,
replace,
c.cur as usize,
k0,
v0,
k1v1,
l,
r,
)
.await
}
}
/// Writes `(k0, v0)` directly into `page` at the cursor position, with left
/// child 0 and right child `r`, then grows the cursor's `total` by one.
///
/// As in `put`, `r == 0` selects the leaf writer.
///
/// # Safety
/// NOTE(review): no capacity check is visible here; presumably the caller
/// guarantees that the entry fits in `page` — confirm against callers.
async unsafe fn put_mut<T: AllocPage>(
txn: &mut T,
page: &mut MutPage,
c: &mut Self::Cursor,
k0: &K,
v0: &V,
r: u64,
) {
// `alloc_write` takes the position by mutable reference; a local copy is
// used, so the cursor position itself is left unchanged.
let mut n = c.cur;
if r == 0 {
Leaf::alloc_write(txn, page, k0, v0, 0, r, &mut n).await;
} else {
Internal::alloc_write(txn, page, k0, v0, 0, r, &mut n).await;
}
c.total += 1;
}
/// Overwrites the child pointer stored in the slot just before the cursor
/// (slot `cur - 1`) with `l`, keeping the slot's low 12 bits.
///
/// # Safety
/// Writes through a raw pointer into the page without bounds checks.
unsafe fn set_left_child(page: &mut MutPage, c: &Self::Cursor, l: u64) {
let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur - 1);
*off = (l | (u64::from_le(*off) & 0xfff)).to_le();
}
/// Sets the child pointer in slot `cur - 1` to `l`, copying the page first
/// when it cannot be modified in place.
///
/// Returns the page that was written together with `freed`: 0 when the
/// page was modified in place, otherwise the old page's offset with its
/// lowest bit set when the old page was dirty.
///
/// # Panics
/// Panics if the cursor belongs to a leaf page or has a negative position.
async fn update_left_child<T: AllocPage>(
txn: &mut T,
page: CowPage,
mutable: bool,
c: &Self::Cursor,
l: u64,
) -> Result<crate::btree::put::Ok, T::Error> {
assert!(!c.is_leaf);
let freed;
let page = if mutable && page.is_dirty() {
// In-place update: nothing is freed.
freed = 0;
MutPage(page)
} else {
// Copy path: allocate a new page, carry over the left page pointer
// (without its low 12 bits) and clone every entry.
let mut new = unsafe { txn.alloc_page().await? };
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
// This `l` shadows the parameter only inside this block.
let l = header(page.as_page()).left_page() & !0xfff;
let hdr = header_mut(&mut new);
hdr.set_left_page(l);
let s = Internal::offset_slice(page.as_page());
clone::<K, V, Internal>(page.as_page(), &mut new, s, &mut 0);
freed = page.offset | if page.is_dirty() { 1 } else { 0 };
new
};
assert!(c.cur >= 0);
unsafe {
// Here `l` is the parameter again: the new left child.
let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur as isize - 1);
*off = (l | (u64::from_le(*off) & 0xfff)).to_le();
}
Ok(Ok { page, freed })
}
/// Deletes the entry under the cursor.
///
/// On internal pages, a non-zero `l` additionally replaces the child pointer
/// of the slot preceding the deleted entry.
///
/// Returns the resulting page and the offset of the page to free: 0 when
/// the deletion was done in place, the old page's offset otherwise.
async fn del<T: AllocPage>(
txn: &mut T,
page: crate::CowPage,
mutable: bool,
c: &PageCursor,
l: u64,
) -> Result<(MutPage, u64), T::Error> {
debug_assert!(c.cur >= 0 && c.cur < c.total as isize);
if mutable && page.is_dirty() {
// In-place deletion: drop the slot by shifting the following slots
// down by one, then update the header accounting.
let p = page.data;
let mut page = MutPage(page);
let hdr = header_mut(&mut page);
let n = hdr.n();
if c.is_leaf {
unsafe {
let ptr = p.add(HDR + c.cur as usize * 2) as *mut u16;
let off = u16::from_le(*ptr);
// A valid in-page offset fits in 12 bits.
assert_eq!(off & 0xf000, 0);
// Overlapping move of the `n - cur - 1` slots after the cursor.
core::ptr::copy(ptr.offset(1), ptr, n as usize - c.cur as usize - 1);
// Account for the removed 2-byte slot plus the entry itself.
hdr.decr(2 + entry_size::<K, V>(p.add(off as usize)));
}
} else {
unsafe {
let ptr = p.add(HDR + c.cur as usize * 8) as *mut u64;
let off = u64::from_le(*ptr);
core::ptr::copy(ptr.offset(1), ptr, n as usize - c.cur as usize - 1);
if l > 0 {
// `p` is shadowed only inside this block: it points at the slot
// preceding the deleted one, whose child pointer becomes `l`.
let p = ptr.offset(-1);
*p = (l | (u64::from_le(*p) & 0xfff)).to_le();
}
// Account for the removed 8-byte slot plus the entry itself; `p` is
// the page base pointer again here.
hdr.decr(8 + entry_size::<K, V>(p.add((off & 0xfff) as usize)));
}
};
hdr.set_n(n - 1);
Ok((page, 0))
} else {
// Copy path: build a new page from every entry except the one under
// the cursor.
let mut new = unsafe { txn.alloc_page().await? };
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
if c.is_leaf {
let s = Leaf::offset_slice(page.as_page());
// `s0`: slots before the cursor; `s1`: slots after the deleted one.
let (s0, s1) = s.split_at(c.cur as usize);
let (_, s1) = s1.split_at(1);
let mut n = 0;
clone::<K, V, Leaf>(page.as_page(), &mut new, s0, &mut n);
clone::<K, V, Leaf>(page.as_page(), &mut new, s1, &mut n);
} else {
// Carry over the left page pointer (without its low 12 bits) into
// the 8 bytes that precede the slot array of the new page.
let hdr = header(page.as_page());
let left = hdr.left_page() & !0xfff;
unsafe {
*(new.0.data.add(HDR) as *mut u64).offset(-1) = left.to_le();
}
let s = Internal::offset_slice(page.as_page());
let (s0, s1) = s.split_at(c.cur as usize);
let (_, s1) = s1.split_at(1);
let mut n = 0;
clone::<K, V, Internal>(page.as_page(), &mut new, s0, &mut n);
if l > 0 {
// Replace the child pointer of the last slot cloned so far
// (slot `n - 1`, or the 8 bytes before the slot array when
// `n == 0`).
unsafe {
let off = (new.0.data.add(HDR) as *mut u64).offset(n - 1);
*off = (l | (u64::from_le(*off) & 0xfff)).to_le();
}
}
clone::<K, V, Internal>(page.as_page(), &mut new, s1, &mut n);
}
Ok((new, page.offset))
}
}
/// Decides between merging the modified page with its sibling, applying
/// the pending modification alone, or rebalancing entries between the two
/// pages.
async fn merge_or_rebalance<'a, T: AllocPage>(
txn: &mut T,
m: Concat<'a, K, V, Self>,
) -> Result<Op<'a, T, K, V>, T::Error> {
// Size of the middle entry once stored: entry bytes plus one slot
// (2 bytes on leaf pages, 8 on internal pages).
let mid_size = if m.modified.c0.is_leaf {
2 + alloc_size::<K, V>(m.mid.0, m.mid.1)
} else {
8 + alloc_size::<K, V>(m.mid.0, m.mid.1)
};
// Size the modified page would have after the pending modification.
let mod_size = size::<K, V>(&m.modified);
// Low 12 bits of the sibling's `left_page` header field, used here as
// the sibling's occupied byte count.
let occupied = {
let hdr = header(m.other.as_page());
(hdr.left_page() & 0xfff) as usize
};
// Everything fits in one page: merge.
if mod_size + mid_size + occupied <= PAGE_SIZE {
return if m.modified.c0.is_leaf {
merge::<_, _, _, _, Leaf>(txn, m).await
} else {
merge::<_, _, _, _, Internal>(txn, m).await
};
}
let first_other = PageCursor::new(&m.other, 0);
let first_other_size = current_size::<K, V>(m.other.as_page(), &first_other);
// No rebalancing when the modified page is at least half full, or when
// the sibling without its first entry would fall below half a page:
// only the pending modification is applied.
if mod_size >= PAGE_SIZE / 2 || HDR + occupied - first_other_size < PAGE_SIZE / 2 {
if let Some((k, v)) = m.modified.ins {
// Pending insertion (possibly replacing the first entry).
return Ok(Op::Put(
Self::put(
txn,
m.modified.page,
m.modified.mutable,
m.modified.skip_first,
&m.modified.c1,
k,
v,
m.modified.ins2,
m.modified.l,
m.modified.r,
)
.await?,
));
} else if m.modified.skip_first {
// Pending deletion of the entry at `c1`.
debug_assert!(m.modified.ins2.is_none());
let (page, freed) = Self::del(
txn,
m.modified.page,
m.modified.mutable,
&m.modified.c1,
m.modified.l,
)
.await?;
return Ok(Op::Put(Put::Ok(Ok { page, freed })));
} else {
// Only a child pointer changed: when `l` is 0 the new child is
// `r`, which is the left child of the next position.
let mut c1 = m.modified.c1.clone();
let mut l = m.modified.l;
if l == 0 {
Self::move_next(&mut c1);
l = m.modified.r;
}
return Ok(Op::Put(Put::Ok(
Self::update_left_child(txn, m.modified.page, m.modified.mutable, &c1, l)
.await?,
)));
}
}
// Otherwise move entries between the two pages.
if m.mod_is_left {
if m.modified.c0.is_leaf {
rebalance_left::<_, _, _, Leaf>(txn, m).await
} else {
rebalance_left::<_, _, _, Internal>(txn, m).await
}
} else {
rebalance_right::<_, _, _, Self>(txn, m).await
}
}
}
/// Computes the number of bytes the page described by `m` would use once its
/// pending modification is applied.
///
/// The starting point is `HDR` plus the low 12 bits of the page's
/// `left_page` header field. Each pending insertion adds its entry size plus
/// one slot (2 bytes on leaf pages, 8 otherwise); `ins2` is only counted when
/// `ins` is present. When `skip_first` is set, the size of the entry at `c1`
/// is subtracted.
fn size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    m: &ModifiedPage<K, V, Page<K, V>>,
) -> usize {
    let slot = if m.c1.is_leaf { 2 } else { 8 };
    let occupied = (header(m.page.as_page()).left_page() & 0xfff) as usize;

    let mut inserted = 0;
    if let Some((k, v)) = m.ins {
        inserted += slot + alloc_size(k, v);
        if let Some((k2, v2)) = m.ins2 {
            inserted += slot + alloc_size(k2, v2);
        }
    }

    let removed = if m.skip_first {
        current_size::<K, V>(m.page.as_page(), &m.c1)
    } else {
        0
    };

    occupied + HDR + inserted - removed
}
/// Returns the number of bytes needed to store `k` followed by `v`.
///
/// The key size is first rounded up to a multiple of `V::ALIGN`, the value
/// size is added, and the sum is rounded up to a multiple of the larger of
/// the two alignments. The rounding uses bit masks, so it relies on the
/// alignments being powers of two.
fn alloc_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(k: &K, v: &V) -> usize {
    let value_mask = V::ALIGN - 1;
    let value_start = (k.size() + value_mask) & !value_mask;
    let unpadded = value_start + v.size();
    let entry_mask = K::ALIGN.max(V::ALIGN) - 1;
    (unpadded + entry_mask) & !entry_mask
}
/// Returns the size of the entry at the cursor position, as computed by the
/// leaf or internal layout depending on the kind recorded in the cursor.
fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    page: crate::Page,
    c: &PageCursor,
) -> usize {
    match c.is_leaf {
        true => Leaf::current_size::<K, V>(page, c.cur),
        false => Internal::current_size::<K, V>(page, c.cur),
    }
}
// Layout-specific writing of entries into a mutable page. In this file the
// trait is used through the `Leaf` and `Internal` types and as the `L`
// parameter of `modify` and `merge`.
#[async_trait]
pub(super) trait AllocWrite<K: ?Sized, V: ?Sized> {
/// Writes the entry `(k0, v0)` into `new` at position `*n`, with `l` and
/// `r` as its left and right child pages (callers pass 0 for "no child").
///
/// NOTE(review): the callers in this file never advance `n` themselves
/// between consecutive writes, so implementations presumably increment
/// `*n` — confirm against the implementations.
async fn alloc_write<T: AllocPage>(
txn: &mut T,
new: &mut MutPage,
k0: &K,
v0: &V,
l: u64,
r: u64,
n: &mut isize,
);
/// Sets the left child of position `n` in `new` to `l`.
fn set_left_child(new: &mut MutPage, n: isize, l: u64);
}
/// Replays the modified page `m` into `new`, starting at position `*n`.
///
/// The entries reachable from `m.c0` are written first, then the pending
/// insertion(s) `m.ins` / `m.ins2`, then the entries reachable from `m.c1`
/// (skipping the first one when `m.skip_first` is set). The child pointers
/// `m.l` and `m.r` are threaded through so that they end up next to the
/// edit point; a left child still pending at the end is applied with
/// `L::set_left_child`.
async fn modify<'a, K, V, P, L, T>(
    txn: &mut T,
    new: &mut MutPage,
    m: &mut ModifiedPage<'a, K, V, P>,
    n: &mut isize,
) where
    K: core::fmt::Debug + ?Sized,
    V: core::fmt::Debug + ?Sized,
    P: BTreeMutPage<K, V>,
    L: AllocWrite<K, V>,
    T: AllocPage,
{
    // Left child to attach to the next entry written; consumed by the first
    // write that follows.
    let mut pending_left = P::left_child(m.page.as_page(), &m.c0);
    while let Some((k, v, r)) = P::next(txn, m.page.as_page(), &mut m.c0).await {
        L::alloc_write(txn, new, k, v, pending_left, r, n).await;
        pending_left = 0;
    }

    // Right child to attach to the first entry after the edit point, unless
    // an insertion takes it.
    let mut pending_right = m.r;
    if let Some((k, v)) = m.ins {
        if let Some((k2, v2)) = m.ins2 {
            L::alloc_write(txn, new, k, v, pending_left, m.l, n).await;
            L::alloc_write(txn, new, k2, v2, 0, m.r, n).await;
        } else {
            let left = if m.l > 0 { m.l } else { pending_left };
            L::alloc_write(txn, new, k, v, left, m.r, n).await;
        }
        pending_left = 0;
        pending_right = 0;
    } else if m.l > 0 {
        pending_left = m.l;
    }

    let mut tail = m.c1.clone();
    if m.skip_first {
        P::move_next(&mut tail);
    }
    while let Some((k, v, r)) = P::next(txn, m.page.as_page(), &mut tail).await {
        let right = if pending_right > 0 {
            core::mem::replace(&mut pending_right, 0)
        } else {
            r
        };
        L::alloc_write(txn, new, k, v, pending_left, right, n).await;
        pending_left = 0;
    }

    if pending_left != 0 {
        L::set_left_child(new, *n, pending_left)
    }
}
pub(super) async fn merge<
'a,
T: AllocPage + LoadPage,
K: ?Sized + core::fmt::Debug,
V: ?Sized + core::fmt::Debug,
P: BTreeMutPage<K, V>,
L: AllocWrite<K, V>,
>(
txn: &mut T,
mut m: Concat<'a, K, V, P>,
) -> Result<Op<'a, T, K, V>, T::Error> {
let mut new = unsafe { txn.alloc_page().await? };
P::init(&mut new);
let mut n = 0;
if m.mod_is_left {
modify::<_, _, _, L, _>(txn, &mut new, &mut m.modified, &mut n).await;
let mut rc = P::cursor_first(&m.other);
let l = P::left_child(m.other.as_page(), &rc);
L::alloc_write(txn, &mut new, m.mid.0, m.mid.1, 0, l, &mut n).await;
while let Some((k, v, r)) = P::next(txn, m.other.as_page(), &mut rc).await {
L::alloc_write(txn, &mut new, k, v, 0, r, &mut n).await;
}
} else {
let mut rc = P::cursor_first(&m.other);
let mut l = P::left_child(m.other.as_page(), &rc);
while let Some((k, v, r)) = P::next(txn, m.other.as_page(), &mut rc).await {
L::alloc_write(txn, &mut new, k, v, l, r, &mut n).await;
l = 0;
}
L::alloc_write(txn, &mut new, m.mid.0, m.mid.1, 0, 0, &mut n).await;
modify::<_, _, _, L, _>(txn, &mut new, &mut m.modified, &mut n).await;
}
let b0 = if m.modified.page.is_dirty() { 1 } else { 0 };
let b1 = if m.other.is_dirty() { 1 } else { 0 };
Ok(Op::Merged {
page: new,
freed: [m.modified.page.offset | b0, m.other.offset | b1],
marker: core::marker::PhantomData,
})
}
/// Copies the entries referenced by the slots `s` from `page` into `new`,
/// writing their slots at positions `*n`, `*n + 1`, … and advancing `*n`
/// once per entry.
///
/// For each entry, space is reserved by lowering the header's `data` offset
/// by the entry size, the entry count and the header's size accounting are
/// updated, the slot is written (a `u16` offset on leaf pages, a `u64`
/// combining child page and offset otherwise), and the entry bytes are
/// copied. The slot width is chosen from the header of `new`.
fn clone<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized, L: Alloc>(
    page: crate::Page,
    new: &mut MutPage,
    s: Offsets<L::Offset>,
    n: &mut isize,
) {
    for slot in s.0.iter() {
        let (child, src_off): (u64, usize) = (*slot).into();
        unsafe {
            let src = page.data.as_ptr().add(src_off);
            let len = entry_size::<K, V>(src);

            let hdr = header_mut(new);
            let dst_off = hdr.data() as u16 - len as u16;
            hdr.set_data(dst_off);
            hdr.set_n(hdr.n() + 1);

            if hdr.is_leaf() {
                hdr.incr(2 + len);
                let leaf_slot = new.0.data.offset(HDR as isize + *n * 2) as *mut u16;
                *leaf_slot = dst_off.to_le();
            } else {
                hdr.incr(8 + len);
                let internal_slot = new.0.data.offset(HDR as isize + *n * 8) as *mut u64;
                *internal_slot = (child | dst_off as u64).to_le();
            }

            core::ptr::copy_nonoverlapping(src, new.0.data.offset(dst_off as isize), len);
        }
        *n += 1;
    }
}