//! This crate defines tools to implement datastructures that can live
//! in main memory or on-disk, meaning that their natural habitat is
//! memory-mapped files, but if that environment is threatened, they
//! might seek refuge in lower-level environments.
//!
//! One core building block of this library is the notion of virtual
//! memory pages, which are allocated and freed by an
//! externally-provided allocator (see how the `sanakirja` crate does
//! this). The particular implementation used here is meant to allow a
//! transactional system with readers reading the structures
//! concurrently with one writer at a time.
//!
//! At the moment, only B trees are implemented, as well as the
//! following general traits:
//!
//! - [`LoadPage`] is a trait used to get a pointer to a page. In the
//! most basic version, this may just return a pointer to the file,
//! offset by the requested offset. In more sophisticated versions,
//! this can be used to encrypt and compress pages.
//! - [`AllocPage`] allocates and frees pages, because as
//! datastructures need to be persisted on disk, we can't rely on
//! Rust's memory management to do it for us. Users of this crate
//! don't have to worry about this though.
//!
//! Moreover, two other traits can be used to store things on pages:
//! [`Storable`] is a simple trait that all `Sized + Ord` types
//! without references can readily implement (the [`direct_repr!`]
//! macro does that). For types containing references to pages
//! allocated in the database, the comparison function can be
//! customised. Moreover, these types must supply an iterator over
//! these references, in order for reference-counting to work properly
//! when the datastructures referencing these types are forked.
//!
//! Dynamically-sized types, or types that need to be represented in a
//! dynamically-sized way, can use the [`UnsizedStorable`] format.
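//!
//! For example, a fixed-sized `repr(C)` type without references can be
//! made storable with the [`direct_repr!`] macro (a sketch; `MyKey` is
//! a hypothetical type, not part of this crate):
//!
//! ```ignore
//! #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
//! #[repr(C)]
//! struct MyKey {
//!     a: u64,
//!     b: u64,
//! }
//! direct_repr!(MyKey);
//! ```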
use async_trait::async_trait;
pub mod btree;
/// There's a hard-coded assumption that pages have 4K bytes. This is
/// true for normal memory pages on almost all platforms.
pub const PAGE_SIZE: usize = 4096;
/// Types that can be stored on disk. This trait may be used in
/// conjunction with `Sized` in order to determine the on-disk size,
/// or with [`UnsizedStorable`] when special arrangements are needed.
#[async_trait(?Send)]
pub trait Storable: Send + Sync + core::fmt::Debug {
/// This is required for B trees, not necessarily for other
/// structures. The default implementation panics.
fn compare(&self, _b: &Self) -> core::cmp::Ordering {
unimplemented!()
}
/// An iterator over the offsets of the pages referenced by this
/// value (see [`Storable::PageReferences`]).
fn page_references(&self) -> Self::PageReferences;
/// Decrement the reference count of every page referenced by this
/// value, freeing those pages if their count drops to 0.
async fn drop<T: AllocPage>(&self, txn: &mut T) -> Result<(), T::Error> {
for p in self.page_references() {
txn.decr_rc(p).await?;
}
Ok(())
}
/// An iterator over the offsets of the pages referenced by this
/// value. Only values from this crate can generate non-empty
/// iterators, but combined values (like tuples) must chain the
/// iterators returned by method `page_references`.
type PageReferences: Iterator<Item = u64> + Send;
}
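// As an illustration (not part of this crate's API), a value holding a
// reference to another page of the database can implement [`Storable`]
// by hand, yielding that page's offset from `page_references` so that
// reference counting keeps working when the containing structure is
// forked. A minimal sketch, assuming a hypothetical `PageRef` wrapper:
//
// ```ignore
// #[derive(Debug)]
// #[repr(C)]
// struct PageRef(u64);
// impl Storable for PageRef {
//     type PageReferences = core::iter::Once<u64>;
//     fn page_references(&self) -> Self::PageReferences {
//         core::iter::once(self.0)
//     }
//     fn compare(&self, b: &Self) -> core::cmp::Ordering {
//         self.0.cmp(&b.0)
//     }
// }
// ```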
/// A macro to implement [`Storable`] on "plain" types,
/// i.e. fixed-sized types that are `repr(C)` and don't hold
/// references.
#[macro_export]
macro_rules! direct_repr {
($t: ty) => {
impl Storable for $t {
type PageReferences = core::iter::Empty<u64>;
fn page_references(&self) -> Self::PageReferences {
core::iter::empty()
}
fn compare(&self, b: &Self) -> core::cmp::Ordering {
self.cmp(b)
}
}
impl UnsizedStorable for $t {
const ALIGN: usize = core::mem::align_of::<$t>();
/// If `Self::SIZE.is_some()`, this must return the same
/// value. The default implementation is `Self::SIZE.unwrap()`.
fn size(&self) -> usize {
core::mem::size_of::<Self>()
}
/// Read the size from an on-page entry.
unsafe fn onpage_size(_: *const u8) -> usize {
core::mem::size_of::<Self>()
}
/// Write to a page. Must not overwrite the allocated size, but
/// this isn't checked (which is why it's unsafe).
unsafe fn write_to_page(&self, p: *mut u8) {
core::ptr::copy_nonoverlapping(self, p as *mut Self, 1)
}
unsafe fn from_raw_ptr<'a>(p: *const u8) -> &'a Self {
&*(p as *const Self)
}
}
};
}
direct_repr!(());
direct_repr!(u8);
direct_repr!(i8);
direct_repr!(u16);
direct_repr!(i16);
direct_repr!(u32);
direct_repr!(i32);
direct_repr!(u64);
direct_repr!(i64);
direct_repr!([u8; 16]);
#[cfg(feature = "std")]
extern crate std;
#[cfg(feature = "std")]
direct_repr!(std::net::Ipv4Addr);
#[cfg(feature = "std")]
direct_repr!(std::net::Ipv6Addr);
#[cfg(feature = "std")]
direct_repr!(std::net::IpAddr);
#[cfg(feature = "std")]
direct_repr!(std::net::SocketAddr);
#[cfg(feature = "std")]
direct_repr!(std::time::SystemTime);
#[cfg(feature = "std")]
direct_repr!(std::time::Duration);
#[cfg(feature = "uuid")]
direct_repr!(uuid::Uuid);
#[cfg(feature = "ed25519")]
direct_repr!(ed25519_zebra::VerificationKeyBytes);
/// Types that can be stored on disk.
pub trait UnsizedStorable: Storable {
const ALIGN: usize;
/// If `Self::SIZE.is_some()`, this must return the same
/// value. The default implementation is `Self::SIZE.unwrap()`.
fn size(&self) -> usize;
/// Read the size from an on-page entry. If `Self::SIZE.is_some()`
/// this must be the same value.
unsafe fn onpage_size(_: *const u8) -> usize;
/// Write to a page. Must not overwrite the allocated size, but
/// this isn't checked (which is why it's unsafe).
unsafe fn write_to_page(&self, p: *mut u8);
unsafe fn from_raw_ptr<'a>(p: *const u8) -> &'a Self;
}
impl Storable for [u8] {
type PageReferences = core::iter::Empty<u64>;
fn page_references(&self) -> Self::PageReferences {
core::iter::empty()
}
fn compare(&self, b: &Self) -> core::cmp::Ordering {
self.cmp(b)
}
}
impl UnsizedStorable for [u8] {
const ALIGN: usize = 2;
fn size(&self) -> usize {
2 + self.len()
}
unsafe fn from_raw_ptr<'a>(p: *const u8) -> &'a Self {
let len = u16::from_le(*(p as *const u16));
assert_ne!(len, 0);
assert_eq!(len & 0xf000, 0);
core::slice::from_raw_parts(p.add(2), len as usize)
}
unsafe fn onpage_size(p: *const u8) -> usize {
let len = u16::from_le(*(p as *const u16));
2 + len as usize
}
unsafe fn write_to_page(&self, p: *mut u8) {
assert!(self.len() <= 510);
*(p as *mut u16) = (self.len() as u16).to_le();
core::ptr::copy_nonoverlapping(self.as_ptr(), p.add(2), self.len())
}
}
unsafe fn read<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
k: *const u8,
) -> (*const u8, *const u8) {
let s = K::onpage_size(k);
let v = k.add(s);
let al = v.align_offset(V::ALIGN);
let v = v.add(al);
(k, v)
}
unsafe fn entry_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
k: *const u8,
) -> usize {
assert_eq!(k.align_offset(K::ALIGN), 0);
let ks = K::onpage_size(k);
// Next multiple of `V::ALIGN`, assuming `V::ALIGN` is a power of 2.
let v_off = (ks + V::ALIGN - 1) & !(V::ALIGN - 1);
let v_ptr = k.add(v_off);
let vs = V::onpage_size(v_ptr);
let ka = K::ALIGN.max(V::ALIGN);
let size = v_off + vs;
(size + ka - 1) & !(ka - 1)
}
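// A worked example of the align-up formula `(x + a - 1) & !(a - 1)`
// used above (valid when `a` is a power of 2): with a key occupying 5
// bytes and `V::ALIGN == 8`, the value starts at offset
// (5 + 7) & !7 == 8; if the value then occupies 3 bytes and
// `K::ALIGN.max(V::ALIGN) == 8`, the whole entry is rounded up to
// (11 + 7) & !7 == 16 bytes.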
/// Representation of a mutable or shared page. This is an owned page
/// (like `Vec` in Rust's std), but we do not know whether we can
/// mutate it or not.
///
/// The least-significant bit of the first byte of each page is 1 if
/// and only if the page was allocated by the current transaction (and
/// hence isn't visible to any other transaction, meaning we can write
/// on it).
#[derive(Debug)]
#[repr(C)]
pub struct CowPage {
pub data: *mut u8,
pub offset: u64,
}
/// Representation of a borrowed, or immutable page, like a slice in
/// Rust.
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct Page<'a> {
pub data: &'a [u8; PAGE_SIZE],
pub offset: u64,
}
impl CowPage {
/// Borrows the page.
pub fn as_page(&self) -> Page {
Page {
data: unsafe { &*(self.data as *const [u8; PAGE_SIZE]) },
offset: self.offset,
}
}
#[cfg(feature = "crc32")]
pub fn crc(&self, hasher: &crc32fast::Hasher) -> u32 {
let mut hasher = hasher.clone();
hasher.reset();
// Hash the beginning and the end of the page (i.e. remove
// the CRC).
unsafe {
// Remove the dirty bit.
let x = [(*self.data) & 0xfe];
hasher.update(&x[..]);
hasher.update(core::slice::from_raw_parts(self.data.add(1), 3));
hasher.update(core::slice::from_raw_parts(
self.data.add(8),
PAGE_SIZE - 8,
));
}
hasher.finalize()
}
#[cfg(feature = "crc32")]
pub fn crc_check(&self, hasher: &crc32fast::Hasher) -> bool {
let crc = unsafe { u32::from_le(*(self.data as *const u32).add(1)) };
self.crc(hasher) == crc
}
}
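// A sketch of how the CRC helpers above could be used (assuming the
// `crc32` feature is enabled and `page` is any `CowPage`; the checksum
// lives in bytes 4..8 of the page, in little-endian):
//
// ```ignore
// let hasher = crc32fast::Hasher::new();
// let crc = page.crc(&hasher);
// // Once that CRC has been written to bytes 4..8 of the page,
// // verification is simply:
// assert!(page.crc_check(&hasher));
// ```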
/// An owned page on which we can write. This is just a wrapper around
/// `CowPage` to avoid checking the "dirty" bit at runtime.
#[derive(Debug)]
pub struct MutPage(pub CowPage);
#[cfg(not(feature = "crc32"))]
pub fn clear_dirty(data: &mut [u8]) {
data[0] &= 0xfe
}
#[cfg(feature = "crc32")]
pub fn clear_dirty(data: &mut [u8], hasher: &crc32fast::Hasher) {
data[0] &= 0xfe;
// Recompute the CRC of the page (skipping the CRC field itself,
// i.e. bytes 4..8) and store it in little-endian at bytes 4..8,
// where `crc_check` reads it.
let mut hasher = hasher.clone();
hasher.reset();
hasher.update(&data[..4]);
hasher.update(&data[8..PAGE_SIZE]);
data[4..8].copy_from_slice(&hasher.finalize().to_le_bytes());
}
impl MutPage {
#[cfg(not(feature = "crc32"))]
pub fn clear_dirty(&mut self) {
unsafe { *self.0.data &= 0xfe }
}
#[cfg(feature = "crc32")]
pub fn clear_dirty(&mut self, hasher: &crc32fast::Hasher) {
unsafe {
*self.0.data &= 0xfe;
// Store the new CRC in little-endian at bytes 4..8, where
// `crc_check` reads it.
let crc = (self.0.data as *mut u32).add(1);
*crc = self.0.crc(hasher).to_le()
}
}
}
unsafe impl Sync for CowPage {}
unsafe impl Send for CowPage {}
impl CowPage {
/// Checks the dirty bit of a page.
pub fn is_dirty(&self) -> bool {
unsafe { (*self.data) & 1 != 0 }
}
}
/// Trait for loading a page.
#[async_trait(?Send)]
pub trait LoadPage: Send + Sync {
type Error;
/// Loading a page.
async fn load_page(&self, off: u64) -> Result<CowPage, Self::Error>;
/// Reference-counting. Since reference-counts are designed to be
/// storable into B trees by external allocators, pages referenced
/// once aren't stored, and hence are indistinguishable from pages
/// that are never referenced. The default implementation returns
/// 0.
///
/// This has the extra benefit of requiring less disk space, and
/// isn't more unsafe than storing the reference count, since we
/// aren't supposed to hold a reference to a page with "logical
/// RC" 0, so storing "1" for that page would be redundant anyway.
async fn rc(&self, _off: u64) -> Result<u64, Self::Error> {
Ok(0)
}
}
/// Trait for allocating and freeing pages.
#[async_trait(?Send)]
pub trait AllocPage: LoadPage {
/// Allocate a new page.
async fn alloc_page(&mut self) -> Result<MutPage, Self::Error>;
/// Increment the page's reference count.
async fn incr_rc(&mut self, off: u64) -> Result<usize, Self::Error>;
/// Decrement the page's reference count, assuming the page was
/// first allocated by another transaction. If the RC reaches 0,
/// free the page. Must return the new RC (0 if freed).
async fn decr_rc(&mut self, off: u64) -> Result<usize, Self::Error>;
/// Same as [`Self::decr_rc`], but for pages allocated by the current
/// transaction. This is an important distinction, as pages
/// allocated by the current transaction can be reused immediately
/// after being freed.
async fn decr_rc_owned(&mut self, off: u64) -> Result<usize, Self::Error>;
}
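// A minimal sketch of an in-memory transaction implementing
// [`LoadPage`] (illustration only: `Txn` is a hypothetical type, and
// allocation, free lists and reference counting are all elided):
//
// ```ignore
// struct Txn {
//     pages: Vec<Box<[u8; PAGE_SIZE]>>,
// }
//
// #[async_trait(?Send)]
// impl LoadPage for Txn {
//     type Error = core::convert::Infallible;
//     async fn load_page(&self, off: u64) -> Result<CowPage, Self::Error> {
//         // Page `off` starts at index `off / PAGE_SIZE` in our vector.
//         let data = self.pages[off as usize / PAGE_SIZE].as_ptr() as *mut u8;
//         Ok(CowPage { data, offset: off })
//     }
// }
// ```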
//! Insertions into a B tree.
use super::cursor::*;
use super::*;
/// The representation of the update to a page.
#[derive(Debug)]
pub struct Ok {
/// The new page, possibly the same as the previous version.
pub page: MutPage,
/// The freed page, or 0 if no page was freed.
pub freed: u64,
}
/// The result of an insertion, i.e. either an update or a split.
#[derive(Debug)]
pub enum Put<'a, K: ?Sized, V: ?Sized> {
Ok(Ok),
Split {
split_key: &'a K,
split_value: &'a V,
left: MutPage,
right: MutPage,
freed: u64,
},
}
/// Insert an entry into a database, returning `false` if and only if
/// the exact same entry (key *and* value) was already in the
/// database.
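///
/// A sketch of a call site (assuming a transaction type implementing
/// [`AllocPage`] in `txn`, and a `Db_<u64, u64, _>` in `db`):
///
/// ```ignore
/// let inserted = put(&mut txn, &mut db, &1u64, &42u64).await?;
/// assert!(inserted);
/// // Inserting the exact same (key, value) pair again returns `false`.
/// assert!(!put(&mut txn, &mut db, &1u64, &42u64).await?);
/// ```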
pub async fn put<
T: AllocPage,
K: Storable + ?Sized + Sync,
V: Storable + ?Sized + Sync,
P: BTreeMutPage<K, V>,
>(
txn: &mut T,
db: &mut Db_<K, V, P>,
key: &K,
value: &V,
) -> Result<bool, T::Error> {
// Set the cursor to the first element larger than or equal to
// `(key, value)`. We will insert at that position (where "insert"
// is meant in the sense of Rust's `Vec::insert`).
let mut cursor = Cursor::new(txn, db).await?;
if cursor.set(txn, key, Some(value)).await?.is_some() {
// If `(key, value)` already exists in the tree, return.
return Ok(false);
}
// Else, we are at a leaf, since cursors that don't find an
// element go all the way to the leaves.
//
// We put on the leaf page, resulting in a `Put`, i.e. either
// `Put::Ok` or `Put::Split`.
let p = cursor.len(); // Save the position of the leaf cursor.
let is_owned = p < cursor.first_rc_len();
// Insert the key and value at the leaf, i.e. pop the top level of
// the stack (the leaf) and insert in there.
let cur = cursor.pop().unwrap();
let put = P::put(
txn,
cur.page,
is_owned,
false,
&cur.cursor,
key,
value,
None,
0,
0,
).await?;
// In both cases (`Ok` and `Split`), we need to walk up the tree
// and update each node.
// Moreover, since the middle elements of the splits may be on
// pages that must be freed at the end of this call, we collect
// them in the `free` array below, and free them only at the end
// of this function.
//
// If we didn't hold on to these pages, they could be reallocated
// in subsequent splits, and the middle element could be
// lost. This is important when the middle element climbs up more
// than one level (i.e. the middle element of the split of a leaf
// is also the middle element of the split of the leaf's parent,
// etc).
let mut free = [0; N_CURSORS];
db.db = put_cascade(txn, &mut cursor, put, &mut free).await?.0.offset;
for f in &free[..p] {
if *f & 1 != 0 {
txn.decr_rc_owned((*f) ^ 1).await?;
} else if *f > 0 {
txn.decr_rc(*f).await?;
}
}
Ok(true)
}
async fn put_cascade<
'a,
T: AllocPage,
K: Storable + ?Sized,
V: Storable + ?Sized,
P: BTreeMutPage<K, V>,
>(
txn: &mut T,
cursor: &mut Cursor<K, V, P>,
mut put: Put<'a, K, V>,
free: &mut [u64; N_CURSORS],
) -> Result<MutPage, T::Error> {
loop {
match put {
Put::Split {
split_key,
split_value,
left,
right,
freed,
} => {
// Here, we are copying all children of the freed
// page, except possibly the last freed page (which is
// a child of the current node, if we aren't at a
// leaf). This includes the split key/value, also
// incremented in the following call:
incr_descendants::<T, K, V, P>(txn, cursor, free, freed).await?;
// Then, insert the split key/value in the relevant page:
let is_owned = cursor.len() < cursor.first_rc_len();
if let Some(cur) = cursor.pop() {
// In this case, there's a page above the page
// that just split (since we can pop the stack),
// so the page that just split isn't the root (but
// `cur` might be).
put = P::put(
txn,
cur.page,
is_owned,
false,
&cur.cursor,
split_key,
split_value,
None,
left.0.offset,
right.0.offset,
).await?;
} else {
// No page above the split, so the root has just
// split. Insert the split key/value into a new
// page above the entire tree.
let mut p = txn.alloc_page().await?;
let cursor = P::cursor_first(&p.0);
P::init(&mut p);
if let Put::Ok(p) = P::put(
txn,
p.0,
true,
false,
&cursor,
split_key,
split_value,
None,
left.0.offset,
right.0.offset,
).await? {
// Return the new root.
return Ok(p.page);
} else {
unreachable!()
}
}
}
Put::Ok(Ok { page, freed }) => {
// Same as above: increment the relevant reference
// counters.
incr_descendants::<T, K, V, P>(txn, cursor, free, freed).await?;
// And update the left child of the current cursor,
// since the main invariant of cursors is that we're
// always visiting the left child (if we're visiting
// the last child of a page, the cursor is set to the
// position strictly after the entries).
let is_owned = cursor.len() < cursor.first_rc_len();
if let Some(curs) = cursor.pop() {
// If we aren't at the root, just update the child.
put = Put::Ok(P::update_left_child(
txn,
curs.page,
is_owned,
&curs.cursor,
page.0.offset,
).await?)
} else {
// If we are at the root, `page` is the updated root.
return Ok(page);
}
}
}
}
}
/// This function does all the memory management for `put`.
async fn incr_descendants<
T: AllocPage + LoadPage,
K: Storable + ?Sized,
V: Storable + ?Sized,
P: BTreePage<K, V>,
>(
txn: &mut T,
cursor: &mut Cursor<K, V, P>,
free: &mut [u64; N_CURSORS],
mut freed: u64,
) -> Result<(), T::Error> {
// The freed page is on the page below.
if cursor.len() < cursor.first_rc_len() {
// If a page has split or was immutable (allocated by a
// previous transaction) and has been updated, we need to free
// the old page.
free[cursor.len()] = freed;
} else {
// This means that the new version of the page (either split
// or not) contains references to all the children of the
// freed page, except the last freed page (because the new
// version of that page isn't shared).
let cur = cursor.cur();
// Start a cursor at the leftmost position and increase the
// leftmost child page's RC (if we aren't at a leaf, and if
// that child isn't the last freed page).
let mut c = P::cursor_first(&cur.page);
let left = P::left_child(cur.page.as_page(), &c);
if left != 0 {
if left != (freed & !1) {
txn.incr_rc(left).await?;
} else if cursor.len() == cursor.first_rc_len() {
freed = 0
}
}
// Then, for each entry of the page, increase the RCs of
// references contained in the keys and values, and increase
// the RC of the right child.
while let Some((k, v, r)) = P::next(cur.page.as_page(), &mut c) {
for o in k.page_references().chain(v.page_references()) {
txn.incr_rc(o).await?;
}
if r != 0 {
if r != (freed & !1) {
txn.incr_rc(r).await?;
} else if cursor.len() == cursor.first_rc_len() {
freed = 0
}
}
}
// Else, the "freed" page is shared with another tree, and
// hence we just need to decrement its RC.
if freed > 0 && cursor.len() == cursor.first_rc_len() {
free[cursor.len()] = freed;
}
}
Ok(())
}
//! Implementation of B tree pages for unsized types, or types with a
//! dynamically-sized representation (for example enums with widely
//! different sizes).
//!
//! This module follows the same organisation as the sized
//! implementation, and contains types shared between the two
//! implementations.
//!
//! The types that can be used with this implementation must implement
//! the [`UnsizedStorable`] trait, which essentially replaces the
//! [`core::mem`] functions for determining the size and alignment of
//! values.
//!
//! One key difference is the implementation of leaves (internal nodes
//! have the same format): indeed, in this implementation, leaves have
//! almost the same format as internal nodes, except that their
//! offsets are written on the page as little-endian-encoded [`u16`]
//! (with only the 12 LSBs used, i.e. 4 bits unused).
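//!
//! A sketch of the offset decoding performed by the cursor code below
//! (illustration only; `page` is the raw page and `n` the entry index):
//!
//! ```ignore
//! // Leaf: the n-th offset is a little-endian u16 whose 4 most
//! // significant bits are always 0.
//! let off = u16::from_le_bytes([page[HDR + 2 * n], page[HDR + 2 * n + 1]]) as usize;
//! // Internal node: the n-th offset is a little-endian u64 whose 12
//! // least-significant bits are the entry's byte offset in the page,
//! // and whose other bits are the offset of its right child page.
//! let word = u64::from_le_bytes(page[HDR + 8 * n..HDR + 8 * n + 8].try_into().unwrap());
//! let (entry_off, right_child) = ((word & 0xfff) as usize, word & !0xfff);
//! ```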
use super::*;
use crate::btree::del::*;
use crate::btree::put::*;
use core::cmp::Ordering;
// The header is the same as for the sized implementation, so we share
// it here.
pub(super) mod header;
// Like in the sized implementation, we have the same three submodules.
mod alloc;
// This is a common module with the sized implementation.
pub(super) mod cursor;
mod put;
pub(super) mod rebalance;
use alloc::*;
use cursor::*;
use header::*;
use rebalance::*;
#[derive(Debug)]
pub struct Page<K: ?Sized, V: ?Sized> {
k: core::marker::PhantomData<K>,
v: core::marker::PhantomData<V>,
}
// Many of these functions are the same as in the Sized implementations.
#[async_trait(?Send)]
impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> super::BTreePage<K, V>
for Page<K, V>
{
fn is_empty(c: &Self::Cursor) -> bool {
c.cur >= c.total as isize
}
fn is_init(c: &Self::Cursor) -> bool {
c.cur < 0
}
type Cursor = PageCursor;
fn cursor_before(p: &crate::CowPage) -> Self::Cursor {
PageCursor::new(p, -1)
}
fn cursor_after(p: &crate::CowPage) -> Self::Cursor {
PageCursor::after(p)
}
// Split a cursor, returning two cursors `(a, b)` where b is the
// same as `c`, and `a` is a cursor running from the first element
// of the page to `c` (excluding `c`).
fn split_at(c: &Self::Cursor) -> (Self::Cursor, Self::Cursor) {
(
PageCursor {
cur: 0,
total: c.cur.max(0) as usize,
is_leaf: c.is_leaf,
},
*c,
)
}
fn move_next(c: &mut Self::Cursor) -> bool {
if c.cur < c.total as isize {
c.cur += 1;
true
} else {
false
}
}
fn move_prev(c: &mut Self::Cursor) -> bool {
if c.cur > 0 {
c.cur -= 1;
true
} else {
c.cur = -1;
false
}
}
// This function is the same as the sized implementation for
// internal nodes, since the only difference between leaves and
// internal nodes in this implementation is the size of offsets (2
// bytes for leaves, 8 bytes for internal nodes).
fn current<'a>(
page: crate::Page<'a>,
c: &Self::Cursor,
) -> Option<(&'a K, &'a V, u64)> {
if c.cur < 0 || c.cur >= c.total as isize {
None
} else if c.is_leaf {
unsafe {
let off =
u16::from_le(*(page.data.as_ptr().add(HDR + c.cur as usize * 2) as *const u16));
let (k, v) = read::<K, V>(page.data.as_ptr().add(off as usize));
Some((
K::from_raw_ptr(k as *const u8),
V::from_raw_ptr(v as *const u8),
0,
))
}
} else {
unsafe {
let off =
u64::from_le(*(page.data.as_ptr().add(HDR + c.cur as usize * 8) as *const u64));
let (k, v) = read::<K, V>(page.data.as_ptr().add((off & 0xfff) as usize));
Some((
K::from_raw_ptr(k as *const u8),
V::from_raw_ptr(v as *const u8),
off & !0xfff,
))
}
}
}
// These methods are the same as in the sized implementation.
fn left_child(page: crate::Page, c: &Self::Cursor) -> u64 {
assert!(c.cur >= 0);
if c.is_leaf {
0
} else {
assert!(c.cur >= 0 && HDR as isize + c.cur * 8 - 8 <= 4088);
let off =
unsafe { *(page.data.as_ptr().offset(HDR as isize + c.cur * 8 - 8) as *const u64) };
u64::from_le(off) & !0xfff
}
}
fn right_child(page: crate::Page, c: &Self::Cursor) -> u64 {
assert!(c.cur < c.total as isize);
if c.is_leaf {
0
} else {
assert!(c.cur < c.total as isize && HDR as isize + c.cur * 8 - 8 <= 4088);
let off = unsafe { *(page.data.as_ptr().offset(HDR as isize + c.cur * 8) as *const u64) };
u64::from_le(off) & !0xfff
}
}
async fn set_cursor<'a, 'b, T: LoadPage>(
_txn: &'a T,
page: crate::Page<'b>,
c: &mut PageCursor,
k0: &K,
v0: Option<&V>,
) -> Result<(&'a K, &'a V, u64), usize> {
unsafe {
// `lookup` has the same semantic as
// `core::slice::binary_search`, i.e. it returns either
// `Ok(n)`, where `n` is the position where `(k0, v0)` was
// found, or `Err(n)` where `n` is the position where
// `(k0, v0)` can be inserted to preserve the order.
match lookup(page, c, k0, v0).await {
Ok(n) => {
c.cur = n as isize;
if c.is_leaf {
let off =
u16::from_le(*(page.data.as_ptr().add(HDR + n * 2) as *const u16));
let (k, v) = read::<K, V>(page.data.as_ptr().add(off as usize));
Ok((K::from_raw_ptr(k), V::from_raw_ptr(v), 0))
} else {
let off =
u64::from_le(*(page.data.as_ptr().add(HDR + n * 8) as *const u64));
let (k, v) =
read::<K, V>(page.data.as_ptr().add(off as usize & 0xfff));
Ok((
K::from_raw_ptr(k),
V::from_raw_ptr(v),
off & !0xfff,
))
}
}
Err(n) => {
c.cur = n as isize;
Err(n)
}
}
}
}
}
// There is quite some duplicated code in the following functions,
// because we're forming a slice of offsets, and then using the core
// library's `binary_search_by` method on slices.
async unsafe fn lookup<'a, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
page: crate::Page<'a>,
c: &mut PageCursor,
k0: &K,
v0: Option<&V>,
) -> Result<usize, usize> {
let hdr = header(page);
c.total = hdr.n() as usize;
c.is_leaf = hdr.is_leaf();
if c.is_leaf {
lookup_leaf(page, k0, v0, hdr)
} else {
lookup_internal(page, k0, v0, hdr)
}
}
unsafe fn lookup_internal<'a, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
page: crate::Page<'a>,
k0: &K,
v0: Option<&V>,
hdr: &header::Header,
) -> Result<usize, usize> {
let s = core::slice::from_raw_parts(
page.data.as_ptr().add(HDR) as *const u64,
hdr.n() as usize,
);
if let Some(v0) = v0 {
s.binary_search_by(|&off| {
let off = u64::from_le(off) & 0xfff;
let (k, v) = read::<K, V>(page.data.as_ptr().offset(off as isize & 0xfff));
let k = K::from_raw_ptr(k);
match k.compare(k0) {
Ordering::Equal => {
let v = V::from_raw_ptr(v);
v.compare(v0)
}
cmp => cmp,
}
})
} else {
match s.binary_search_by(|&off| {
let off = u64::from_le(off) & 0xfff;
let (k, _) = read::<K, V>(page.data.as_ptr().offset(off as isize & 0xfff));
let k = K::from_raw_ptr(k);
k.compare(k0)
}) {
Err(i) => Err(i),
Ok(mut i) => {
// Rewind if there are multiple matching keys.
while i > 0 {
let off = u64::from_le(s[i-1]) & 0xfff;
let (k, _) = read::<K, V>(page.data.as_ptr().offset(off as isize));
let k = K::from_raw_ptr(k);
if let Ordering::Equal = k.compare(k0) {
i -= 1
} else {
break
}
}
Ok(i)
}
}
}
}
unsafe fn lookup_leaf<'a, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
page: crate::Page<'a>,
k0: &K,
v0: Option<&V>,
hdr: &header::Header,
) -> Result<usize, usize> {
let s = core::slice::from_raw_parts(
page.data.as_ptr().add(HDR) as *const u16,
hdr.n() as usize,
);
if let Some(v0) = v0 {
s.binary_search_by(|&off| {
let off = u16::from_le(off);
let (k, v) = read::<K, V>(page.data.as_ptr().offset(off as isize));
let k = K::from_raw_ptr(k as *const u8);
match k.compare(k0) {
Ordering::Equal => {
let v = V::from_raw_ptr(v as *const u8);
v.compare(v0)
}
cmp => cmp,
}
})
} else {
match s.binary_search_by(|&off| {
let off = u16::from_le(off);
let (k, _) = read::<K, V>(page.data.as_ptr().offset(off as isize));
let k = K::from_raw_ptr(k);
k.compare(k0)
}) {
Err(e) => Err(e),
Ok(mut i) => {
// Rewind if there are multiple matching keys.
while i > 0 {
let off = u16::from_le(s[i-1]);
let (k, _) = read::<K, V>(page.data.as_ptr().offset(off as isize));
let k = K::from_raw_ptr(k);
if let Ordering::Equal = k.compare(k0){
i -= 1
} else {
break
}
}
Ok(i)
}
}
}
}
#[async_trait(?Send)]
impl<
K: UnsizedStorable + ?Sized + core::fmt::Debug,
V: UnsizedStorable + ?Sized + core::fmt::Debug,
> super::BTreeMutPage<K, V> for Page<K, V>
{
// The init function is straightforward.
fn init(page: &mut MutPage) {
let h = header_mut(page);
h.init();
}
// Deletions in internal nodes of a B tree need to replace the
// deleted value with a value from a leaf.
//
// In this implementation of pages, we never actually wipe any
// data from pages: we only ever append data, or clone the pages
// to compact them. Therefore, raw pointers to leaves are
// always valid for as long as the page isn't freed, which can
// only happen at the very last step of an insertion or deletion.
type Saved = (*const K, *const V);
fn save_deleted_leaf_entry(k: &K, v: &V) -> Self::Saved {
(k as *const K, v as *const V)
}
unsafe fn from_saved<'a>(s: &Self::Saved) -> (&'a K, &'a V) {
(&*s.0, &*s.1)
}
// As in the sized implementation, `put` is split into its own submodule.
async fn put<'a, T: AllocPage>(
txn: &mut T,
page: CowPage,
mutable: bool,
replace: bool,
c: &PageCursor,
k0: &'a K,
v0: &'a V,
k1v1: Option<(&'a K, &'a V)>,
l: u64,
r: u64,
) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
debug_assert!(c.cur >= 0);
debug_assert!(k1v1.is_none() || replace);
if r == 0 {
put::put::<_, _, _, Leaf>(
txn,
page,
mutable,
replace,
c.cur as usize,
k0,
v0,
k1v1,
0,
0,
).await
} else {
put::put::<_, _, _, Internal>(
txn,
page,
mutable,
replace,
c.cur as usize,
k0,
v0,
k1v1,
l,
r,
).await
}
}
unsafe fn put_mut(
page: &mut MutPage,
c: &mut Self::Cursor,
k0: &K,
v0: &V,
r: u64,
) {
let mut n = c.cur;
if r == 0 {
Leaf::alloc_write(page, k0, v0, 0, r, &mut n);
} else {
Internal::alloc_write(page, k0, v0, 0, r, &mut n);
}
c.total += 1;
}
unsafe fn set_left_child(
page: &mut MutPage,
c: &Self::Cursor,
l: u64
) {
let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur - 1);
*off = (l | (u64::from_le(*off) & 0xfff)).to_le();
}
// Update the left child of the entry pointed to by cursor `c`.
async fn update_left_child<T: AllocPage>(
txn: &mut T,
page: CowPage,
mutable: bool,
c: &Self::Cursor,
l: u64,
) -> Result<crate::btree::put::Ok, T::Error> {
assert!(!c.is_leaf);
let freed;
let page = if mutable && page.is_dirty() {
// If the page is dirty (allocated by this transaction)
// and isn't shared, just make it mutable.
freed = 0;
MutPage(page)
} else {
// Else, clone the page:
let mut new = txn.alloc_page().await?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
// Copy the left child
let l = header(page.as_page()).left_page() & !0xfff;
let hdr = header_mut(&mut new);
hdr.set_left_page(l);
// Copy all the entries
let s = Internal::offset_slice(page.as_page());
clone::<K, V, Internal>(page.as_page(), &mut new, s, &mut 0);
// Mark the old version for freeing.
freed = page.offset | if page.is_dirty() { 1 } else { 0 };
new
};
// Finally, update the left child of the current cursor
// position. We know that all valid positions of a cursor except
// the leftmost one (-1) have a left child.
assert!(c.cur >= 0);
unsafe {
let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur as isize - 1);
*off = (l | (u64::from_le(*off) & 0xfff)).to_le();
}
Ok(Ok { page, freed })
}
// Here is how deletions work: if the page is dirty and mutable,
// we "unlink" the value by moving the end of the offset array to
// the left by one offset (2 bytes in leaves, 8 bytes in internal
// nodes).
async fn del<T: AllocPage>(
txn: &mut T,
page: crate::CowPage,
mutable: bool,
c: &PageCursor,
l: u64,
) -> Result<(MutPage, u64), T::Error> {
// Check that the cursor is at a valid position for a deletion.
debug_assert!(c.cur >= 0 && c.cur < c.total as isize);
if mutable && page.is_dirty() {
let p = page.data;
let mut page = MutPage(page);
let hdr = header_mut(&mut page);
let n = hdr.n();
if c.is_leaf {
unsafe {
let ptr = p.add(HDR + c.cur as usize * 2) as *mut u16;
// Get the offset in the page of the key/value data.
let off = u16::from_le(*ptr);
assert_eq!(off & 0xf000, 0);
// Erase the offset by shifting the last (`n -
// c.cur - 1`) offsets. The reasoning against
// "off-by-one errors" is that if the cursor is
// positioned on the first element (`c.cur == 0`),
// there are `n-1` elements to shift.
core::ptr::copy(ptr.offset(1), ptr, n as usize - c.cur as usize - 1);
// Remove the size of the actual key/value, plus
// 2 bytes (the offset).
hdr.decr(2 + entry_size::<K, V>(p.add(off as usize)));
}
} else {
unsafe {
let ptr = p.add(HDR + c.cur as usize * 8) as *mut u64;
// Offset to the key/value data.
let off = u64::from_le(*ptr);
// Shift the offsets like in the leaf case above.
core::ptr::copy(ptr.offset(1), ptr, n as usize - c.cur as usize - 1);
if l > 0 {
// In an internal node, we may have to update
// the left child of the current
// position. After the move, the current
// offset is at `ptr`, so we need to find the
// offset one step to the left.
let p = ptr.offset(-1);
*p = (l | (u64::from_le(*p) & 0xfff)).to_le();
}
// Remove the size of the key/value, plus 8 bytes
// (the offset).
hdr.decr(8 + entry_size::<K, V>(p.add((off & 0xfff) as usize)));
}
};
hdr.set_n(n - 1);
// Return the page, and 0 for "nothing was freed".
Ok((page, 0))
} else {
// If the page cannot be mutated, we allocate a new page and clone.
let mut new = txn.alloc_page().await?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
if c.is_leaf {
// In a leaf, this is just a matter of getting the
// offset slice, removing the current position and
// cloning.
let s = Leaf::offset_slice(page.as_page());
let (s0, s1) = s.split_at(c.cur as usize);
let (_, s1) = s1.split_at(1);
let mut n = 0;
clone::<K, V, Leaf>(page.as_page(), &mut new, s0, &mut n);
clone::<K, V, Leaf>(page.as_page(), &mut new, s1, &mut n);
} else {
// In an internal node, things are a bit trickier,
// since we might need to update the left child.
//
// First, clone the leftmost child of the page.
let hdr = header(page.as_page());
let left = hdr.left_page() & !0xfff;
unsafe {
*(new.0.data.add(HDR) as *mut u64).offset(-1) = left.to_le();
}
// Then, clone the first half of the page.
let s = Internal::offset_slice(page.as_page());
let (s0, s1) = s.split_at(c.cur as usize);
let (_, s1) = s1.split_at(1);
let mut n = 0;
clone::<K, V, Internal>(page.as_page(), &mut new, s0, &mut n);
// Update the left child, which was written by the
// call to `clone` as the right child of the last
// entry in `s0`.
if l > 0 {
unsafe {
let off = (new.0.data.add(HDR) as *mut u64).offset(n - 1);
*off = (l | (u64::from_le(*off) & 0xfff)).to_le();
}
}
// Then, clone the second half of the page.
clone::<K, V, Internal>(page.as_page(), &mut new, s1, &mut n);
}
// Return the new page, and the offset of the freed page.
Ok((new, page.offset))
}
}
// Decide what to do with the concatenation of two neighbouring
// pages, with a middle element.
//
// This is highly similar to the sized case.
async fn merge_or_rebalance<'a, T: AllocPage>(
txn: &mut T,
m: Concat<'a, K, V, Self>,
) -> Result<Op<'a, T, K, V>, T::Error> {
// First evaluate the size of the middle element on a
// page. Contrarily to the sized case, the offsets are
// aligned, so the header is always the same size (no
// padding).
let mid_size = if m.modified.c0.is_leaf {
2 + alloc_size::<K, V>(m.mid.0, m.mid.1)
} else {
8 + alloc_size::<K, V>(m.mid.0, m.mid.1)
};
// Evaluate the size of the modified page of the concatenation
// (which includes the header).
let mod_size = size::<K, V>(&m.modified);
// Add the "occupied" size (which excludes the header).
let occupied = {
let hdr = header(m.other.as_page());
(hdr.left_page() & 0xfff) as usize
};
if mod_size + mid_size + occupied <= PAGE_SIZE {
// If the concatenation fits on a page, merge.
return if m.modified.c0.is_leaf {
merge::<_, _, _, _, Leaf>(txn, m).await
} else {
merge::<_, _, _, _, Internal>(txn, m).await
};
}
// If we can't merge, evaluate the size of the first binding
// on the other page, to see if we can rebalance.
let first_other = PageCursor::new(&m.other, 0);
let first_other_size = current_size::<K, V>(m.other.as_page(), &first_other);
// If the modified page is at least half-full, or if removing
// the first entry on the other page would make that other
// page less than half-full, don't rebalance. See the Sized
// implementation to see cases where this happens.
if mod_size >= PAGE_SIZE / 2 || HDR + occupied - first_other_size < PAGE_SIZE / 2 {
if let Some((k, v)) = m.modified.ins {
return Ok(Op::Put(Self::put(
txn,
m.modified.page,
m.modified.mutable,
m.modified.skip_first,
&m.modified.c1,
k,
v,
m.modified.ins2,
m.modified.l,
m.modified.r,
).await?));
} else if m.modified.skip_first {
debug_assert!(m.modified.ins2.is_none());
let (page, freed) = Self::del(
txn,
m.modified.page,
m.modified.mutable,
&m.modified.c1,
m.modified.l,
).await?;
return Ok(Op::Put(Put::Ok(Ok { page, freed })));
} else {
let mut c1 = m.modified.c1.clone();
let mut l = m.modified.l;
if l == 0 {
Self::move_next(&mut c1);
l = m.modified.r;
}
return Ok(Op::Put(Put::Ok(Self::update_left_child(
txn,
m.modified.page,
m.modified.mutable,
&c1,
l,
).await?)));
}
}
// Finally, if we're here, we can rebalance. There are four
// (relatively explicit) cases, see the `rebalance` submodule
// to see how this is done.
if m.mod_is_left {
if m.modified.c0.is_leaf {
rebalance_left::<_, _, _, Leaf>(txn, m).await
} else {
rebalance_left::<_, _, _, Internal>(txn, m).await
}
} else {
rebalance_right::<_, _, _, Self>(txn, m).await
}
}
}
/// Size of a modified page (including the header).
fn size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
m: &ModifiedPage<K, V, Page<K, V>>,
) -> usize {
let mut total = {
let hdr = header(m.page.as_page());
(hdr.left_page() & 0xfff) as usize
};
total += HDR;
// Extra size for the offsets.
let extra = if m.c1.is_leaf { 2 } else { 8 };
if let Some((k, v)) = m.ins {
total += extra + alloc_size(k, v) as usize;
if let Some((k, v)) = m.ins2 {
total += extra + alloc_size(k, v) as usize;
}
}
if m.skip_first {
total -= current_size::<K, V>(m.page.as_page(), &m.c1) as usize;
}
total
}
// Size of a pair of type `(K, V)`. This is computed in the same way
// as a struct with fields of type `K` and `V` in C.
fn alloc_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(k: &K, v: &V) -> usize {
let s = ((k.size() + V::ALIGN - 1) & !(V::ALIGN - 1)) + v.size();
let al = K::ALIGN.max(V::ALIGN);
(s + al - 1) & !(al - 1)
}
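// For instance, with `k.size() == 10`, `v.size() == 4`, `K::ALIGN == 2`
// and `V::ALIGN == 8`: the value starts at (10 + 7) & !7 == 16, so
// `s == 20`, and the pair is rounded up to the larger alignment:
// (20 + 7) & !7 == 24 bytes.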
// Total size of the entry for that cursor position, including the
// offset size.
fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
page: crate::Page,
c: &PageCursor,
) -> usize {
if c.is_leaf {
Leaf::current_size::<K, V>(page, c.cur)
} else {
Internal::current_size::<K, V>(page, c.cur)
}
}
pub(super) trait AllocWrite<K: ?Sized, V: ?Sized> {
fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize);
fn set_left_child(new: &mut MutPage, n: isize, l: u64);
}
/// Perform the modifications on a page, by copying it onto page `new`.
fn modify<K: core::fmt::Debug + ?Sized, V: core::fmt::Debug + ?Sized, P: BTreeMutPage<K, V>, L: AllocWrite<K, V>>(
new: &mut MutPage,
m: &mut ModifiedPage<K, V, P>,
n: &mut isize,
) {
let mut l = P::left_child(m.page.as_page(), &m.c0);
while let Some((k, v, r)) = P::next(m.page.as_page(), &mut m.c0) {
L::alloc_write(new, k, v, l, r, n);
l = 0;
}
let mut rr = m.r;
if let Some((k, v)) = m.ins {
if let Some((k2, v2)) = m.ins2 {
L::alloc_write(new, k, v, l, m.l, n);
L::alloc_write(new, k2, v2, 0, m.r, n);
} else if m.l > 0 {
L::alloc_write(new, k, v, m.l, m.r, n);
} else {
L::alloc_write(new, k, v, l, m.r, n);
}
l = 0;
rr = 0;
} else if m.l > 0 {
l = m.l
}
let mut c1 = m.c1.clone();
if m.skip_first {
P::move_next(&mut c1);
}
// Here's a confusing thing: if the first element of `c1` is the
// last element of a page, we may be updating its right child (in
// which case m.r > 0) rather than its left child like for all
// other elements.
//
// This case only ever happens for this function when we're
// updating the last child of a page p, and then merging p with
// its right neighbour.
while let Some((k, v, r)) = P::next(m.page.as_page(), &mut c1) {
if rr > 0 {
L::alloc_write(new, k, v, l, rr, n);
rr = 0;
} else {
L::alloc_write(new, k, v, l, r, n);
}
l = 0;
}
if l != 0 {
// The while loop above didn't run, i.e. the insertion
// happened at the end of the page. In this case, we haven't
// had a chance to update the left page, so do it now.
L::set_left_child(new, *n, l)
}
}
/// The very unsurprising `merge` function
pub(super) async fn merge<
'a,
T: AllocPage + LoadPage,
K: ?Sized + core::fmt::Debug,
V: ?Sized + core::fmt::Debug,
P: BTreeMutPage<K, V>,
L: AllocWrite<K, V>,
>(
txn: &mut T,
mut m: Concat<'a, K, V, P>,
) -> Result<Op<'a, T, K, V>, T::Error> {
// Here, we first allocate a page, then clone both pages onto it,
// in a different order depending on whether the modified page is
// the left or the right child.
//
// Note that in the case that this merge happens immediately after
// a put that reallocated the two sides of the merge in order to
// split (not all splits do that), we could be slightly more
// efficient, but with considerably more code.
let mut new = txn.alloc_page().await?;
P::init(&mut new);
let mut n = 0;
if m.mod_is_left {
modify::<_, _, _, L>(&mut new, &mut m.modified, &mut n);
let mut rc = P::cursor_first(&m.other);
let l = P::left_child(m.other.as_page(), &rc);
L::alloc_write(&mut new, m.mid.0, m.mid.1, 0, l, &mut n);
while let Some((k, v, r)) = P::next(m.other.as_page(), &mut rc) {
L::alloc_write(&mut new, k, v, 0, r, &mut n);
}
} else {
let mut rc = P::cursor_first(&m.other);
let mut l = P::left_child(m.other.as_page(), &rc);
while let Some((k, v, r)) = P::next(m.other.as_page(), &mut rc) {
L::alloc_write(&mut new, k, v, l, r, &mut n);
l = 0;
}
L::alloc_write(&mut new, m.mid.0, m.mid.1, 0, 0, &mut n);
modify::<_, _, _, L>(&mut new, &mut m.modified, &mut n);
}
let b0 = if m.modified.page.is_dirty() { 1 } else { 0 };
let b1 = if m.other.is_dirty() { 1 } else { 0 };
Ok(Op::Merged {
page: new,
freed: [m.modified.page.offset | b0, m.other.offset | b1],
marker: core::marker::PhantomData,
})
}
fn clone<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized, L: Alloc>(
page: crate::Page,
new: &mut MutPage,
s: Offsets<L::Offset>,
n: &mut isize,
) {
for off in s.0.iter() {
let (r, off): (u64, usize) = (*off).into();
unsafe {
let ptr = page.data.as_ptr().add(off);
let size = entry_size::<K, V>(ptr);
// Reserve the space on the page
let hdr = header_mut(new);
let data = hdr.data() as u16;
let off_new = data - size as u16;
hdr.set_data(off_new);
hdr.set_n(hdr.n() + 1);
if hdr.is_leaf() {
hdr.incr(2 + size);
let ptr = new.0.data.offset(HDR as isize + *n * 2) as *mut u16;
*ptr = (off_new as u16).to_le();
} else {
hdr.incr(8 + size);
// Set the offset to this new entry in the offset
// array, along with the right child page.
let ptr = new.0.data.offset(HDR as isize + *n * 8) as *mut u64;
*ptr = (r | off_new as u64).to_le();
}
core::ptr::copy_nonoverlapping(ptr, new.0.data.offset(off_new as isize), size);
}
*n += 1;
}
}
use super::cursor::*;
use super::*;
// The implementation here is mostly the same as in the sized case,
// except for leaves, which makes it hard to refactor.
pub(crate) async fn rebalance_left<
'a,
T: AllocPage,
K: UnsizedStorable + ?Sized + core::fmt::Debug,
V: UnsizedStorable + ?Sized + core::fmt::Debug,
L: Alloc,
>(
txn: &mut T,
m: Concat<'a, K, V, Page<K, V>>,
) -> Result<Op<'a, T, K, V>, T::Error> {
assert!(m.mod_is_left);
// First element of the right page. We'll rotate it to the left
// page, i.e. return a middle element, and append the current
// middle element to the left page.
let rc = super::cursor::PageCursor::new(&m.other, 0);
let rl = <Page<K, V>>::left_child(m.other.as_page(), &rc);
let (k, v, r) = <Page<K, V>>::current(m.other.as_page(), &rc).unwrap();
let mut freed_ = [0, 0];
// First, perform the modification on the modified page, which we
// know is the left page, and return the resulting mutable page
// (the modified page may or may not be mutable before we do this).
let new_left = if let Some((k, v)) = m.modified.ins {
let is_dirty = m.modified.page.is_dirty();
let new_left = if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(
txn,
m.modified.page,
m.modified.mutable,
m.modified.skip_first,
&m.modified.c1,
k,
v,
m.modified.ins2,
m.modified.l,
m.modified.r,
).await? {
if freed > 0 {
let b = if is_dirty { 1 } else { 0 };
freed_[0] = freed | b;
}
page
} else {
unreachable!()
};
// Append the middle element of the concatenation at the end
// of the left page. We know the left page to be mutable by
// now, and we also know there's enough space to do this.
let lc = PageCursor::after(&new_left.0);
if let Put::Ok(Ok { freed, page }) = <Page<K, V>>::put(
txn, new_left.0, true, false, &lc, m.mid.0, m.mid.1, None, 0, rl,
).await? {
if freed > 0 {
let b = if is_dirty { 1 } else { 0 };
freed_[0] = freed | b;
}
page
} else {
unreachable!()
}
} else if m.modified.skip_first {
let is_dirty = m.modified.page.is_dirty();
let (page, freed) = <Page<K, V>>::del(
txn,
m.modified.page,
m.modified.mutable,
&m.modified.c1,
m.modified.l,
).await?;
if freed > 0 {
let b = if is_dirty { 1 } else { 0 };
freed_[0] = freed | b;
}
// Append the middle element of the concatenation at the end
// of the left page. We know the left page to be mutable by
// now, and we also know there's enough space to do this.
let lc = PageCursor::after(&page.0);
if let Put::Ok(Ok { freed, page }) =
<Page<K, V>>::put(txn, page.0, true, false, &lc, m.mid.0, m.mid.1, None, 0, rl).await?
{
if freed > 0 {
let b = if is_dirty { 1 } else { 0 };
freed_[0] = freed | b;
}
page
} else {
unreachable!()
}
} else {
// Append the middle element of the concatenation at the end
// of the left page. We know the left page to be mutable by
// now, and we also know there's enough space to do this.
let is_dirty = m.modified.page.is_dirty();
let lc = PageCursor::after(&m.modified.page);
if let Put::Ok(Ok { freed, page }) = <Page<K, V>>::put(
txn,
m.modified.page,
m.modified.mutable,
false,
&lc,
m.mid.0,
m.mid.1,
None,
0,
rl,
).await? {
if m.modified.l > 0 {
assert_eq!(m.modified.r, 0);
unsafe {
let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur - 1);
*off = (m.modified.l | (u64::from_le(*off) & 0xfff)).to_le();
}
} else if m.modified.r > 0 {
unsafe {
let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur);
*off = (m.modified.r | (u64::from_le(*off) & 0xfff)).to_le();
}
}
if freed > 0 {
let b = if is_dirty { 1 } else { 0 };
freed_[0] = freed | b;
}
page
} else {
unreachable!()
}
};
// We extend the pointer's lifetime here, because we know the
// current deletion (we only rebalance during deletions) won't
// touch this page anymore after this.
let k = unsafe { core::mem::transmute(k) };
let v = unsafe { core::mem::transmute(v) };
// If this frees the old "other" page, add it to the "freed"
// array.
let is_dirty = m.other.is_dirty();
let (new_right, freed) = <Page<K, V>>::del(txn, m.other, m.other_is_mutable, &rc, r).await?;
if freed > 0 {
freed_[1] = if is_dirty { freed | 1 } else { freed }
}
Ok(Op::Rebalanced {
l: new_left.0.offset,
r: new_right.0.offset,
k,
v,
freed: freed_,
})
}
// Surprisingly, the `rebalance_right` function is simpler,
// since:
//
// - if we call it to rebalance two internal nodes, we're in the easy
// case of rebalance_left.
//
// - Else, the middle element is the last one on the left page, and
// isn't erased by leaf deletions, because these just move entries to
// the left.
//
// This implementation is shared with the sized one.
pub(crate) async fn rebalance_right<
'a,
T: AllocPage,
K: ?Sized,
V: ?Sized,
P: BTreeMutPage<K, V, Cursor = super::PageCursor>,
>(
txn: &mut T,
m: Concat<'a, K, V, P>,
) -> Result<Op<'a, T, K, V>, T::Error> {
assert!(!m.mod_is_left);
// Take the last element of the left page.
let lc = P::cursor_last(&m.other);
let (k0, v0, r0) = P::current(m.other.as_page(), &lc).unwrap();
let mut freed_ = [0, 0];
// Perform the modification on the modified page.
let new_right = if let Some((k, v)) = m.modified.ins {
let is_dirty = m.modified.page.is_dirty();
let new_right = if let Put::Ok(Ok { page, freed }) = P::put(
txn,
m.modified.page,
m.modified.mutable,
m.modified.skip_first,
&m.modified.c1,
k,
v,
m.modified.ins2,
m.modified.l,
m.modified.r,
).await? {
if freed > 0 {
freed_[0] = if is_dirty { freed | 1 } else { freed };
}
page
} else {
unreachable!()
};
// Add the middle element of the concatenation as the first
// element of the right page. We know the right page is
// mutable, since we just modified it (hence the
// `debug_assert_eq!(freed, 0)` below).
// First element of the right page (after potential
// modification by `put` above).
let rc = P::cursor_first(&new_right.0);
let rl = P::left_child(new_right.0.as_page(), &rc);
if let Put::Ok(Ok { freed, page }) = P::put(
txn,
new_right.0,
true,
false,
&rc,
m.mid.0,
m.mid.1,
None,
r0,
rl,
).await? {
debug_assert_eq!(freed, 0);
page
} else {
unreachable!()
}
} else if m.modified.skip_first {
let is_dirty = m.modified.page.is_dirty();
let (page, freed) = P::del(
txn,
m.modified.page,
m.modified.mutable,
&m.modified.c1,
m.modified.l,
).await?;
if freed > 0 {
freed_[0] = if is_dirty { freed | 1 } else { freed };
}
// Add the middle element of the concatenation as the first
// element of the right page. We know the right page is
// mutable, since we just modified it. Moreover, if it is
// compacted by the `put` below, we know that the `del` didn't
// free anything, hence we can reuse slot 0 of `freed_`.
// First element of the right page (after potential
// modification by `del` above).
let rc = P::cursor_first(&page.0);
let rl = P::left_child(page.0.as_page(), &rc);
if let Put::Ok(Ok { freed, page }) = P::put(
txn, page.0, true, false, &rc, m.mid.0, m.mid.1, None, r0, rl,
).await? {
if freed > 0 {
freed_[0] = if is_dirty { freed | 1 } else { freed };
}
page
} else {
unreachable!()
}
} else {
let is_dirty = m.modified.page.is_dirty();
let rc = P::cursor_first(&m.modified.page);
let rl = P::left_child(m.modified.page.as_page(), &rc);
if let Put::Ok(Ok { freed, page }) = P::put(
txn,
m.modified.page,
m.modified.mutable,
false,
&rc,
m.mid.0,
m.mid.1,
None,
r0,
rl,
).await? {
// Update the left and right offsets. We know that at
// least one of them is 0, but it can be either one (or
// both), depending on what happened on the page below.
//
// Since we inserted an entry at the beginning of the
// page, we need to add 1 to the index given by
// `m.modified.c1.cur`.
if m.modified.l > 0 {
assert_eq!(m.modified.r, 0);
unsafe {
let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur);
*off = (m.modified.l | (u64::from_le(*off) & 0xfff)).to_le();
}
} else if m.modified.r > 0 {
unsafe {
let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur + 1);
*off = (m.modified.r | (u64::from_le(*off) & 0xfff)).to_le();
}
}
if freed > 0 {
freed_[0] = if is_dirty { freed | 1 } else { freed };
}
page
} else {
unreachable!()
}
};
// As explained in the general comment on this function, this
// entry isn't erased by the deletion in `m.other` below, so we
// can safely extend its lifetime.
let k = unsafe { core::mem::transmute(k0) };
let v = unsafe { core::mem::transmute(v0) };
let is_dirty = m.other.is_dirty();
let (new_left, freed) = P::del(txn, m.other, m.other_is_mutable, &lc, 0).await?;
if freed > 0 {
freed_[1] = if is_dirty { freed | 1 } else { freed }
}
Ok(Op::Rebalanced {
l: new_left.0.offset,
r: new_right.0.offset,
k,
v,
freed: freed_,
})
}
use super::header::{header, header_mut, HDR};
use super::*;
pub(super) async fn put<
'a,
T: AllocPage,
K: UnsizedStorable + ?Sized + core::fmt::Debug,
V: UnsizedStorable + ?Sized + core::fmt::Debug,
L: Alloc + AllocWrite<K, V>,
>(
txn: &mut T,
page: CowPage,
mutable: bool,
replace: bool,
u: usize,
k0: &'a K,
v0: &'a V,
k1v1: Option<(&'a K, &'a V)>,
l: u64,
r: u64,
) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
// Size of the new insertions, not counting the offsets.
let size = alloc_size(k0, v0)
+ if let Some((k1, v1)) = k1v1 {
alloc_size(k1, v1)
} else {
0
};
// Size occupied by the data of these insertions (not counting
// the offsets), if we need to compact the page.
let size_replaced = if replace {
// We're always in an internal node if we replace.
let cur_size = Internal::current_size::<K, V>(page.as_page(), u as isize);
// Since `size` isn't counting the size of the 64-bit offset,
// and `cur_size` is, we need to add 8.
8 + size as isize - cur_size as isize
} else {
size as isize
};
// Number of extra offsets.
let n_ins = {
let n_ins = if k1v1.is_some() { 2 } else { 1 };
if replace {
n_ins - 1
} else {
n_ins
}
};
let hdr = header(page.as_page());
if mutable && page.is_dirty() && L::can_alloc(header(page.as_page()), size, n_ins) {
// First, if the page is dirty and mutable, mark it mutable.
let mut page = MutPage(page);
let mut n = u as isize;
// If we replace, we first need to "unlink" the previous
// value, by erasing its offset.
if replace {
let p = page.0.data;
let hdr = header_mut(&mut page);
// In this case, we know we're not in a leaf, so offsets
// are of size 8.
assert!(!hdr.is_leaf());
unsafe {
let ptr = p.add(HDR + n as usize * 8) as *mut u64;
let off = (u64::from_le(*ptr) & 0xfff) as usize;
let kv_ptr = p.add(off);
let size = entry_size::<K, V>(kv_ptr);
core::ptr::copy(ptr.offset(1), ptr, hdr.n() as usize - n as usize - 1);
// Decreasing these figures here, we'll increase them
// again in the calls to `alloc_write` below.
hdr.decr(8 + size);
hdr.set_n(hdr.n() - 1);
}
}
// Do the actual insertions.
if let Some((k1, v1)) = k1v1 {
L::alloc_write(&mut page, k0, v0, 0, 0, &mut n);
L::alloc_write(&mut page, k1, v1, l, r, &mut n);
} else {
L::alloc_write(&mut page, k0, v0, l, r, &mut n);
}
// Return: no page was freed.
Ok(Put::Ok(Ok { page, freed: 0 }))
} else if L::can_compact(hdr, size_replaced, n_ins) {
// Allocate a new page, split the offsets at the position of
// the insertion, and clone each side of the split onto the new
// page, with the insertion between them.
let mut new = txn.alloc_page().await?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
// Clone the leftmost child.
L::set_right_child(&mut new, -1, hdr.left_page() & !0xfff);
// Split the offsets.
let s = L::offset_slice(page.as_page());
let (s0, s1) = s.split_at(u as usize);
// Drop the first element of `s1` if we're replacing it.
let s1 = if replace { s1.split_at(1).1 } else { s1 };
// Finally, clone and insert.
let mut n = 0;
clone::<K, V, L>(page.as_page(), &mut new, s0, &mut n);
if let Some((k1, v1)) = k1v1 {
L::alloc_write(&mut new, k0, v0, 0, l, &mut n);
L::alloc_write(&mut new, k1, v1, 0, r, &mut n);
} else {
L::alloc_write(&mut new, k0, v0, l, r, &mut n);
}
clone::<K, V, L>(page.as_page(), &mut new, s1, &mut n);
// And return, freeing the old version of the page.
Ok(Put::Ok(Ok {
page: new,
freed: page.offset | if page.is_dirty() { 1 } else { 0 },
}))
} else {
split_unsized::<_, _, _, L>(txn, page, replace, u, k0, v0, k1v1, l, r).await
}
}
// Surprisingly, the following function is simpler than in the sized
// case: we can't be as efficient here anyway, since we have to go
// through all the elements linearly to get their sizes.
async fn split_unsized<
'a,
T: AllocPage,
K: UnsizedStorable + ?Sized + core::fmt::Debug,
V: UnsizedStorable + ?Sized + core::fmt::Debug,
L: Alloc + AllocWrite<K, V>,
>(
txn: &mut T,
page: CowPage,
replace: bool,
u: usize,
k0: &'a K,
v0: &'a V,
k1v1: Option<(&'a K, &'a V)>,
l0: u64,
r0: u64,
) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
let hdr = header(page.as_page());
let mut left = txn.alloc_page().await?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut left);
let mut right = txn.alloc_page().await?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut right);
// Clone the leftmost child. If we're inserting at position 0,
// this leftmost child must be (and will be) replaced.
let left_child = hdr.left_page() & !0xfff;
L::set_right_child(&mut left, -1, left_child);
// Pointer to the split key and value.
let mut split = None;
// We start filling the left page, and we will change to filling
// the right page when the left one is at least half-full.
let mut current_page = &mut left;
// There are two synchronised counters here: `hdr.n()` and
// `n`. The reason is that operations on `hdr.n()` are somewhat
// cumbersome, as they involve converting to/from its
// little-endian encoding.
let mut n = 0;
let s = L::offset_slice(page.as_page());
let mut total = HDR;
for (uu, off) in s.0.iter().enumerate() {
// If the current position of the cursor is the insertion,
// (i.e. `uu == u`), insert.
if uu == u {
// The following is tricky and makes assumptions on the
// rest of the code. If we are inserting two elements
// (i.e. if `k1v1.is_some()`), this means we're replacing
// one on the page.
header_mut(current_page).set_n(n as u16);
if let Some((k1, v1)) = k1v1 {
// As explained in the documentation for `put` in the
// definition of `BTreeMutPage`, `l0` and `r0` are the
// left and right children of k1v1, since this means
// the right child of a deleted entry has split, and
// we must replace the deleted entry with `(k0, v0)`.
L::alloc_write(current_page, k0, v0, 0, l0, &mut n);
total += alloc_size(k0, v0) + L::OFFSET_SIZE;
L::alloc_write(current_page, k1, v1, 0, r0, &mut n);
total += alloc_size(k1, v1) + L::OFFSET_SIZE;
// Replacing the next element:
debug_assert!(replace);
continue;
} else {
L::alloc_write(current_page, k0, v0, l0, r0, &mut n);
total += alloc_size(k0, v0) + L::OFFSET_SIZE;
if replace {
continue;
}
}
}
// Then, i.e. either if we're not at the position of an
// insertion, or else if we're not replacing the current
// position, just copy the current entry to the appropriate
// page (left or right).
let (r, off): (u64, usize) = (*off).into();
assert_ne!(off, 0);
unsafe {
let ptr = page.data.add(off);
let size = entry_size::<K, V>(ptr);
// If the left side of the split is at least half-full, we
// know that we can do all the insertions we need on the
// right-hand side (even if there are two of them, and the
// replaced value is of size 0), so we switch.
if split.is_none() && total >= PAGE_SIZE / 2 {
// Don't write the next entry onto any page, keep it
// as the split entry.
let (k, v) = read::<K, V>(ptr);
split = Some((K::from_raw_ptr(k), V::from_raw_ptr(v)));
// Set the entry count for the current page, before
// switching and resetting the counter.
header_mut(current_page).set_n(n as u16);
// And set the leftmost child of the right page to
// `r`.
L::set_right_child(&mut right, -1, r);
// Then, switch page and reset the counter.
current_page = &mut right;
n = 0;
} else {
// Else, we're simply allocating a new entry on the
// current page.
let off_new = {
let hdr = header_mut(current_page);
let data = hdr.data();
assert!(
HDR + hdr.n() as usize * L::OFFSET_SIZE + L::OFFSET_SIZE + size
< data as usize
);
// Move the data pointer `size` bytes to the left.
hdr.set_data(data - size as u16);
// And increase the number of entries, and
// occupied size of the page.
hdr.incr(size + L::OFFSET_SIZE);
data - size as u16
};
// Copy the entry.
core::ptr::copy_nonoverlapping(
ptr,
current_page.0.data.offset(off_new as isize),
size,
);
// And set the offset.
L::set_offset(current_page, n, r | (off_new as u64));
n += 1;
}
total += size + L::OFFSET_SIZE;
}
}
header_mut(current_page).set_n(n as u16);
// Finally, it is possible that we haven't had a chance to do the
// insertions yet: this is the case iff we're inserting at the end
// of all the entries, so handle this now.
if u == s.0.len() {
if let Some((k1, v1)) = k1v1 {
L::alloc_write(current_page, k0, v0, 0, l0, &mut n);
L::alloc_write(current_page, k1, v1, 0, r0, &mut n);
} else {
L::alloc_write(current_page, k0, v0, l0, r0, &mut n);
}
}
let (split_key, split_value) = split.unwrap();
Ok(Put::Split {
split_key,
split_value,
left,
right,
freed: if page.is_dirty() {
page.offset | 1
} else {
page.offset
},
})
}
use crate::PAGE_SIZE;
#[derive(Debug)]
#[repr(C)]
pub struct Header {
data: u16,
n: u16,
crc: u32,
left_page: u64,
}
impl Header {
pub(crate) fn init(&mut self) {
self.data = (((PAGE_SIZE as u16) << 3) | 1).to_le();
self.n = 0;
self.crc = 0;
self.left_page = 0;
}
pub(crate) fn n(&self) -> u16 {
u16::from_le(self.n)
}
pub(crate) fn set_n(&mut self, n: u16) {
self.n = n.to_le();
}
pub fn left_page(&self) -> u64 {
u64::from_le(self.left_page)
}
pub fn set_left_page(&mut self, l: u64) {
self.left_page = l.to_le()
}
pub fn data(&self) -> u16 {
u16::from_le(self.data) >> 3
}
pub fn set_data(&mut self, d: u16) {
let dirty = u16::from_le(self.data) & 7;
self.data = ((d << 3) | dirty).to_le()
}
pub fn decr(&mut self, s: usize) {
self.left_page = (self.left_page() - s as u64).to_le();
}
pub fn set_occupied(&mut self, size: u64) {
self.left_page = ((self.left_page() & !0xfff) | size).to_le()
}
pub fn incr(&mut self, s: usize) {
self.left_page = (self.left_page() + s as u64).to_le();
}
pub fn is_leaf(&self) -> bool {
u64::from_le(self.left_page) <= 0xfff
}
}
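// Example of the `data` encoding (illustrative numbers): after
// `init`, the field holds `(4096 << 3) | 1 = 0x8001` (as
// little-endian bytes), i.e. offset 4096 with the dirty bit set;
// `data()` recovers 4096, and `set_data(4080)` stores
// `(4080 << 3) | 1 = 0x7f81`, preserving the dirty bit.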
pub const HDR: usize = core::mem::size_of::<Header>();
pub(crate) fn header<'a>(page: crate::Page<'a>) -> &'a Header {
unsafe { &*(page.data.as_ptr() as *const Header) }
}
pub fn header_mut(page: &mut crate::MutPage) -> &mut Header {
unsafe { &mut *(page.0.data as *mut Header) }
}
use super::{header, Header};
#[derive(Debug, Clone, Copy)]
pub struct PageCursor {
pub cur: isize,
pub total: usize,
pub is_leaf: bool,
}
impl PageCursor {
/// Initialise a cursor on page `p` at position `cur`, checking
/// that `cur` is a valid position.
pub fn new(p: &crate::CowPage, cur: isize) -> Self {
let hdr = unsafe { &*(p.data as *const Header) };
assert!(cur >= -1 && cur < hdr.n() as isize);
PageCursor {
cur,
total: hdr.n() as usize,
is_leaf: hdr.is_leaf(),
}
}
/// Initialise a cursor set after the last entry of page `p`.
pub fn after(p: &crate::CowPage) -> Self {
let hdr = unsafe { &*(p.data as *const Header) };
let total = hdr.n();
PageCursor {
cur: total as isize,
total: total as usize,
is_leaf: hdr.is_leaf(),
}
}
/// Initialise a cursor set on the last entry of page `p`.
pub fn last(p: crate::Page) -> Self {
let hdr = header(p);
let total = hdr.n();
PageCursor {
cur: (total - 1) as isize,
total: total as usize,
is_leaf: hdr.is_leaf(),
}
}
}
use super::header::{header, header_mut, Header, HDR};
use super::*;
/// We'd love to share this trait with the sized implementation, but
/// unfortunately, the type parameters of almost all methods are
/// different.
pub(crate) trait Alloc {
const OFFSET_SIZE: usize;
/// Size (including the offset size) of the entry at position
/// `cur`.
fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
page: crate::Page,
cur: isize,
) -> usize;
/// Can we allocate an entry of `size` bytes, where `size` doesn't
/// include the offset bytes?
fn can_alloc(hdr: &Header, size: usize, n: usize) -> bool {
(HDR as usize) + (hdr.n() as usize) * Self::OFFSET_SIZE + n * Self::OFFSET_SIZE + size
<= hdr.data() as usize
}
/// If we cloned the page, could we allocate an entry of `size`
/// bytes, where `size` doesn't include the offset bytes?
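/// (For example, in an internal node with 3,000 occupied bytes,
/// adding one 8-byte offset and a 100-byte entry gives
/// 16 + 3000 + 8 + 100 = 3124 <= 4096, so a clone would fit.)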
fn can_compact(hdr: &Header, size: isize, n: usize) -> bool {
(HDR as isize)
+ ((hdr.left_page() & 0xfff) as isize)
+ (n * Self::OFFSET_SIZE) as isize
+ size
<= PAGE_SIZE as isize
}
/// Set the right child of the `n`th entry to `r`. If `n == -1`,
/// this method can be used to set the leftmost child of a page.
fn set_right_child(_new: &mut MutPage, _n: isize, _r: u64) {}
/// Set the n^th offset on the page to `r`, which encodes a page
/// offset in its 52 MSBs, and an offset on the page in its 12 LSBs.
fn set_offset(new: &mut MutPage, n: isize, r: u64);
/// The type of offsets, u64 in internal nodes, u16 in leaves.
type Offset: Into<(u64, usize)> + Copy + core::fmt::Debug;
fn offset_slice<'a>(page: crate::Page<'a>) -> Offsets<'a, Self::Offset>;
}
#[derive(Debug, Clone, Copy)]
pub struct Offsets<'a, A>(pub &'a [A]);
impl<'a, A: Copy> Offsets<'a, A> {
pub fn split_at(self, mid: usize) -> (Self, Self) {
if mid < self.0.len() {
let (a, b) = self.0.split_at(mid);
(Offsets(a), Offsets(b))
} else {
debug_assert_eq!(mid, self.0.len());
(self, Offsets(&[][..]))
}
}
}
pub struct Leaf {}
pub struct Internal {}
impl Alloc for Leaf {
const OFFSET_SIZE: usize = 2;
// Note: the size returned by this function includes the offset.
fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
page: crate::Page,
cur: isize,
) -> usize {
// Find the offset of the current position, get its size.
unsafe {
debug_assert!(cur >= 0);
let off = *(page.data.as_ptr().add(HDR + 2 * cur as usize) as *const u16);
Self::OFFSET_SIZE
+ entry_size::<K, V>(page.data.as_ptr().add(u16::from_le(off) as usize))
}
}
fn set_offset(new: &mut MutPage, n: isize, off: u64) {
unsafe {
let ptr = new.0.data.offset(HDR as isize + n * 2) as *mut u16;
*ptr = (off as u16).to_le();
}
}
type Offset = LeafOffset;
fn offset_slice<'a>(page: crate::Page<'a>) -> Offsets<'a, Self::Offset> {
let hdr_n = header(page).n();
unsafe {
Offsets(core::slice::from_raw_parts(
page.data.as_ptr().add(HDR) as *const LeafOffset,
hdr_n as usize,
))
}
}
}
impl Alloc for Internal {
const OFFSET_SIZE: usize = 8;
fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
page: crate::Page,
cur: isize,
) -> usize {
unsafe {
8 + entry_size::<K, V>(page.data.as_ptr().add(
(u64::from_le(*(page.data.as_ptr().add(HDR) as *const u64).offset(cur)) & 0xfff)
as usize,
))
}
}
/// Set the right-hand child in the offset array, preserving the
/// current offset.
fn set_right_child(page: &mut MutPage, n: isize, r: u64) {
unsafe {
debug_assert_eq!(r & 0xfff, 0);
let ptr = page.0.data.offset(HDR as isize + n * 8) as *mut u64;
let off = u64::from_le(*ptr) & 0xfff;
*ptr = (r | off as u64).to_le();
}
}
fn set_offset(new: &mut MutPage, n: isize, off: u64) {
unsafe {
let ptr = new.0.data.offset(HDR as isize + n * 8) as *mut u64;
*ptr = off.to_le();
}
}
type Offset = InternalOffset;
fn offset_slice<'a>(page: crate::Page<'a>) -> Offsets<'a, Self::Offset> {
let hdr_n = header(page).n() as usize;
unsafe {
Offsets(core::slice::from_raw_parts(
page.data.as_ptr().add(HDR) as *const InternalOffset,
hdr_n,
))
}
}
}
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct LeafOffset(u16);
impl Into<(u64, usize)> for LeafOffset {
fn into(self) -> (u64, usize) {
(0, self.0 as usize)
}
}
impl Into<usize> for LeafOffset {
fn into(self) -> usize {
self.0 as usize
}
}
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct InternalOffset(u64);
impl Into<(u64, usize)> for InternalOffset {
fn into(self) -> (u64, usize) {
(self.0 & !0xfff, (self.0 & 0xfff) as usize)
}
}
impl Into<usize> for InternalOffset {
fn into(self) -> usize {
self.0 as usize
}
}
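// Example of the internal encoding: the value 0x3028 decodes to
// `(0x3000, 0x28)`, i.e. the child page is at file offset 0x3000
// and the entry at byte 0x28 of the current page. Leaf offsets
// carry no child, hence the 0 in the first component above.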
impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> AllocWrite<K, V> for Leaf {
fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {
alloc_write::<K, V, Self>(new, k0, v0, l, r, n)
}
fn set_left_child(_new: &mut MutPage, _n: isize, l: u64) {
debug_assert_eq!(l, 0);
}
}
impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> AllocWrite<K, V> for Internal {
fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {
alloc_write::<K, V, Self>(new, k0, v0, l, r, n)
}
fn set_left_child(new: &mut MutPage, n: isize, l: u64) {
if l > 0 {
Internal::set_right_child(new, n - 1, l);
}
}
}
// Allocate a new entry and copy (k0, v0) into the new slot.
fn alloc_write<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized, L: Alloc>(
new: &mut MutPage,
k0: &K,
v0: &V,
l: u64,
r: u64,
n: &mut isize,
) {
let size = alloc_size(k0, v0);
let off_new = alloc_insert::<K, V, L>(new, n, size, r);
unsafe {
let new_ptr = new.0.data.add(off_new);
k0.write_to_page(new_ptr);
let ks = k0.size();
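// Round the key size up to the value's alignment so that `v0`
// ends up correctly aligned: e.g. a 5-byte key with
// `V::ALIGN == 8` puts the value at byte 8 of the entry
// (illustrative numbers).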
let v_ptr = new_ptr.add((ks + V::ALIGN - 1) & !(V::ALIGN - 1));
v0.write_to_page(v_ptr);
}
if l > 0 {
L::set_right_child(new, *n - 1, l);
}
*n += 1;
}
/// Reserve space on the page for `size` bytes of data (i.e. not
/// counting the offset), set the right child of the new position
/// to `r`, add the offset to the new data to the offset array,
/// and return the new offset.
fn alloc_insert<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized, L: Alloc>(
new: &mut MutPage,
n: &mut isize,
size: usize,
r: u64,
) -> usize {
let hdr = header_mut(new);
let data = hdr.data();
// If we're here, the following assertion has been checked by the
// `can_alloc` method.
debug_assert!(HDR + (hdr.n() as usize + 1) * L::OFFSET_SIZE + size <= data as usize);
hdr.set_data(data - size as u16);
hdr.set_n(hdr.n() + 1);
hdr.incr(L::OFFSET_SIZE + size);
let off_new = data - size as u16;
let hdr_n = hdr.n();
// Making space for the new offset
unsafe {
core::ptr::copy(
new.0.data.add(HDR + (*n as usize) * L::OFFSET_SIZE),
new.0
.data
.add(HDR + (*n as usize) * L::OFFSET_SIZE + L::OFFSET_SIZE),
(hdr_n as usize - (*n as usize)) * L::OFFSET_SIZE,
);
}
L::set_offset(new, *n, r | (off_new as u64));
off_new as usize
}
//! Implementation of B tree pages for Sized types, i.e. types whose
//! representation has a size known at compile time (and the same as
//! [`core::mem::size_of()`]).
//!
//! The details of the implementation are as follows:
//!
//! - The page starts with a 16-byte header of the following form
//! (where all the fields are encoded in Little-Endian):
//!
//! ```
//! #[repr(C)]
//! pub struct Header {
//! /// Offset to the first entry in the page, shifted 3 bits
//! /// to the left to allow for the dirty bit (plus two
//! /// extra bits, zero for now), as explained in the
//! /// documentation of CowPage, at the root of this
//! /// crate. This is 4096 for empty pages, and it is unused
//! /// for leaves. Moreover, this field can't be increased:
//! /// when it reaches the bottom, the page must be cloned.
//! data: u16,
//! /// Number of entries on the page.
//! n: u16,
//! /// CRC (if used).
//! crc: u32,
//! /// The 52 most significant bits are the left child of
//! /// this page (0 for leaves), while the 12 LSBs represent
//! /// the space this page would take when cloned from scratch,
//! /// minus the header. The reason for this is that entries
//! /// in internal nodes aren't really removed when deleted,
//! /// they're only "unlinked" from the array of offsets (see
//! /// below). Therefore, we must have a way to tell when a
//! /// page can be "compacted" to reclaim space.
//! left_page: u64,
//! }
//! ```
//! - For leaves, the rest of the page has the same representation as
//! an array of length `n`, of elements of type `Tuple<K, V>`:
//! ```
//! #[repr(C)]
//! struct Tuple<K, V> {
//! k: K,
//! v: V,
//! }
//! ```
//! If the alignment of that structure is more than 16 bytes, we
//! need to add some padding between the header and that array.
//! - For internal nodes, the rest of the page starts with an array of
//! length `n` of Little-Endian-encoded [`u64`], where the 12 least
//! significant bits of each [`u64`] are an offset to a `Tuple<K, V>` in
//! the page, and the 52 other bits are an offset in the file to the
//! right child of the entry.
//!
//! Moreover, the offset represented by the 12 LSBs is after (or at)
//! `header.data`.
//! We say we can "allocate" in the page if the `data` of the header
//! is greater than or equal to the position of the last "offset",
//! plus the size we want to allocate (note that since we allocate
//! from the end of the page, `data` is always a multiple of the
//! alignment of `Tuple<K, V>`).
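//!
//! For instance (an illustrative computation, not specific to any
//! particular `K` and `V`): with `Tuple<u64, u64>`, the alignment is
//! 8 and the 16-byte header is already aligned, so on a leaf, entry
//! `i` starts at byte `16 + 16 * i` of the page:
//! ```
//! let hdr = 16; // size of the header
//! let al = 8; // alignment of Tuple<u64, u64>
//! let padded = (hdr + al - 1) & !(al - 1);
//! assert_eq!(padded, 16);
//! assert_eq!(padded + 16 * 2, 48); // start of the third entry
//! ```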
use super::*;
use crate::btree::del::*;
use crate::btree::put::*;
use core::cmp::Ordering;
mod alloc; // The "alloc" trait, to provide common methods for leaves and internal nodes.
mod put; // Inserting a new value onto a page (possibly splitting the page).
mod rebalance; // Rebalance two sibling pages to the left or to the right.
use super::page_unsized::{cursor::PageCursor, header};
use alloc::*;
use header::*;
use rebalance::*;
/// This struct is used to allocate a pair `(K, V)` on the stack, and
/// copy it from a C pointer from a page.
///
/// This is also used to form slices of pairs in order to use
/// functions from the core library.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
#[repr(C)]
struct Tuple<K, V> {
k: K,
v: V,
}
/// Empty type implementing `BTreePage` and `BTreeMutPage`.
#[derive(Debug)]
pub struct Page<K, V> {
k: core::marker::PhantomData<K>,
v: core::marker::PhantomData<V>,
}
#[async_trait(?Send)]
impl<K: Storable, V: Storable> super::BTreePage<K, V> for Page<K, V> {
// Cursors are quite straightforward. One non-trivial thing is
// that they represent both a position on a page and the interval
// from that position to the end of the page, as is apparent in
// their `split_at` method.
//
// Another thing to note is that -1 and `c.total` are valid
// positions for a cursor `c`. The reason for this is that
// position `-1` has a right child (which is the first element's
// left child).
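// For example, a page with three entries admits cursor positions
// -1, 0, 1, 2 and 3: position -1 is "before the first entry" (it
// only has a right child), and position 3 is "past the last
// entry", as produced by `PageCursor::after`.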
type Cursor = PageCursor;
fn is_empty(c: &Self::Cursor) -> bool {
c.cur >= c.total as isize
}
fn is_init(c: &Self::Cursor) -> bool {
c.cur < 0
}
fn cursor_before(p: &crate::CowPage) -> Self::Cursor {
PageCursor::new(p, -1)
}
fn cursor_after(p: &crate::CowPage) -> Self::Cursor {
PageCursor::after(p)
}
fn split_at(c: &Self::Cursor) -> (Self::Cursor, Self::Cursor) {
(
PageCursor {
cur: 0,
total: c.cur.max(0) as usize,
is_leaf: c.is_leaf,
},
*c,
)
}
fn move_next(c: &mut Self::Cursor) -> bool {
if c.cur >= c.total as isize {
return false;
}
c.cur += 1;
true
}
fn move_prev(c: &mut Self::Cursor) -> bool {
if c.cur < 0 {
return false;
}
c.cur -= 1;
true
}
fn current<'a>(
page: crate::Page<'a>,
c: &Self::Cursor,
) -> Option<(&'a K, &'a V, u64)> {
// First, there's no current entry if the cursor is outside
// the range of entries.
if c.cur < 0 || c.cur >= c.total as isize {
None
} else if c.is_leaf {
// Else, if this is a leaf, the elements are packed
// together in a contiguous array.
//
// This means that the header may be followed by padding
// (in order to align the entries). These are constants
// known at compile-time, so `al` and `hdr` are optimised
// away by the compiler.
let al = core::mem::align_of::<Tuple<K, V>>();
// The following is a way to compute the first multiple of
// `al` after `HDR`, assuming `al` is a power of 2 (which
// is always the case since we get it from `align_of`).
let hdr = (HDR + al - 1) & !(al - 1);
// The position of the `Tuple<K, V>` we're looking for is
// `f * cur` bytes after the padded header (because
// `size_of` includes alignment padding).
let f = core::mem::size_of::<Tuple<K, V>>();
let kv = unsafe {
&*(page.data.as_ptr().add(hdr + c.cur as usize * f) as *const Tuple<K, V>)
};
Some((&kv.k, &kv.v, 0))
} else {
// Internal nodes have an extra level of indirection: we
// first need to find `off`, the offset in the page, in
// the initial array of offsets. Since these offsets are
// `u64`, and the header is of size 16 bytes, the array is
// already aligned.
unsafe {
let off =
u64::from_le(*(page.data.as_ptr().add(HDR + 8 * c.cur as usize) as *const u64));
// Check that we aren't reading outside of the page
// (for example because of a malformed offset).
assert!((off as usize & 0xfff) + core::mem::size_of::<Tuple<K, V>>() <= 4096);
// Once we have the offset, cast its 12 LSBs to a
// position in the page, and read the `Tuple<K, V>` at
// that position.
let kv = &*(page.data.as_ptr().add((off as usize) & 0xfff) as *const Tuple<K, V>);
Some((&kv.k, &kv.v, off & !0xfff))
}
}
}
// The left and right child methods aren't really surprising. One
// thing to note is that cursors are always in positions between
// `-1` and `c.total` (bounds included), so we only have to check
// one side of the bound in the assertions.
//
// We also check, before entering the `unsafe` sections, that
// we're only reading data that is on a page.
fn left_child(page: crate::Page, c: &Self::Cursor) -> u64 {
if c.is_leaf {
0
} else {
assert!(c.cur >= 0 && HDR as isize + c.cur * 8 - 8 <= 4088);
let off =
unsafe { *(page.data.as_ptr().offset((HDR as isize + c.cur * 8) - 8) as *const u64) };
u64::from_le(off) & !0xfff
}
}
fn right_child(page: crate::Page, c: &Self::Cursor) -> u64 {
if c.is_leaf {
0
} else {
assert!(c.cur < c.total as isize && HDR as isize + c.cur * 8 <= 4088);
let off =
unsafe { *(page.data.as_ptr().offset(HDR as isize + c.cur * 8) as *const u64) };
u64::from_le(off) & !0xfff
}
}
async fn set_cursor<'a, 'b, T: LoadPage>(
_txn: &'a T,
page: crate::Page<'b>,
c: &mut PageCursor,
k0: &K,
v0: Option<&V>,
) -> Result<(&'a K, &'a V, u64), usize> {
unsafe {
// `lookup` has the same semantic as
// `core::slice::binary_search`, i.e. it returns either
// `Ok(n)`, where `n` is the position where `(k0, v0)` was
// found, or `Err(n)` where `n` is the position where
// `(k0, v0)` can be inserted to preserve the order.
match lookup(page, c, k0, v0).await {
Ok(n) => {
c.cur = n as isize;
// Just read the tuple and return it.
if c.is_leaf {
let f = core::mem::size_of::<Tuple<K, V>>();
let al = core::mem::align_of::<Tuple<K, V>>();
let hdr_size = (HDR + al - 1) & !(al - 1);
let tup =
&*(page.data.as_ptr().add(hdr_size + f * n) as *const Tuple<K, V>);
Ok((&tup.k, &tup.v, 0))
} else {
let off =
u64::from_le(*(page.data.as_ptr().add(HDR + n * 8) as *const u64));
let tup =
&*(page.data.as_ptr().add(off as usize & 0xfff) as *const Tuple<K, V>);
Ok((&tup.k, &tup.v, off & !0xfff))
}
}
Err(n) => {
c.cur = n as isize;
Err(n)
}
}
}
}
}
#[async_trait(?Send)]
impl<K: Storable + core::fmt::Debug, V: Storable + core::fmt::Debug> super::BTreeMutPage<K, V>
for Page<K, V>
{
// Once again, this is quite straightforward.
fn init(page: &mut MutPage) {
let h = header_mut(page);
h.init();
}
// When deleting from internal nodes, we take a replacement from
// one of the leaves (in our current implementation, the leftmost
// entry of the right subtree). This method copies an entry from
// the leaf onto the program stack, which is necessary since
// deletions in leaves overwrite entries.
//
// Another design choice would have been to do the same as for the
// unsized implementation, but in this case this would have meant
// copying the saved value to the end of the leaf, potentially
// preventing merges, and not even saving a memory copy.
type Saved = (K, V);
fn save_deleted_leaf_entry(k: &K, v: &V) -> Self::Saved {
unsafe {
let mut k0 = core::mem::MaybeUninit::uninit();
let mut v0 = core::mem::MaybeUninit::uninit();
core::ptr::copy_nonoverlapping(k, k0.as_mut_ptr(), 1);
core::ptr::copy_nonoverlapping(v, v0.as_mut_ptr(), 1);
(k0.assume_init(), v0.assume_init())
}
}
unsafe fn from_saved<'a>(s: &Self::Saved) -> (&'a K, &'a V) {
(core::mem::transmute(&s.0), core::mem::transmute(&s.1))
}
// `put` inserts one or two entries onto a node (internal or
// leaf). This is implemented in the `put` module.
async fn put<'a, T: AllocPage>(
txn: &mut T,
page: CowPage,
mutable: bool,
replace: bool,
c: &PageCursor,
k0: &'a K,
v0: &'a V,
k1v1: Option<(&'a K, &'a V)>,
l: u64,
r: u64,
) -> Result<super::put::Put<'a, K, V>, T::Error> {
assert!(c.cur >= 0);
// In the sized case, deletions can never cause a split, so we
// never have to insert two elements at the same position.
assert!(k1v1.is_none());
if r == 0 {
put::put::<_, _, _, Leaf>(txn, page, mutable, replace, c.cur as usize, k0, v0, 0, 0).await
} else {
put::put::<_, _, _, Internal>(txn, page, mutable, replace, c.cur as usize, k0, v0, l, r).await
}
}
unsafe fn put_mut(page: &mut MutPage, c: &mut Self::Cursor, k0: &K, v0: &V, r: u64) {
use super::page_unsized::AllocWrite;
let mut n = c.cur;
if r == 0 {
Leaf::alloc_write(page, k0, v0, 0, r, &mut n);
} else {
Internal::alloc_write(page, k0, v0, 0, r, &mut n);
}
c.total += 1;
}
unsafe fn set_left_child(page: &mut MutPage, c: &Self::Cursor, l: u64) {
let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur - 1);
*off = (l | (u64::from_le(*off) & 0xfff)).to_le();
}
// This function updates an internal node, setting the left child
// of the cursor to `l`.
async fn update_left_child<T: AllocPage>(
txn: &mut T,
page: CowPage,
mutable: bool,
c: &Self::Cursor,
l: u64,
) -> Result<crate::btree::put::Ok, T::Error> {
assert!(!c.is_leaf && c.cur >= 0 && (c.cur as usize) <= c.total);
let freed;
let page = if mutable && page.is_dirty() {
// If the page is mutable (dirty), just convert it to a
// mutable page, and update.
freed = 0;
MutPage(page)
} else {
// Else, clone the page.
let mut new = txn.alloc_page().await?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
// First clone the left child of the page.
let l = header(page.as_page()).left_page() & !0xfff;
let hdr = header_mut(&mut new);
hdr.set_left_page(l);
// And then the rest of the page.
let s = Internal::offset_slice::<T, K, V>(page.as_page());
clone::<K, V, Internal>(page.as_page(), &mut new, s, &mut 0);
// Mark the former version of the page as free.
freed = page.offset | if page.is_dirty() { 1 } else { 0 };
new
};
// Finally, update the left child of the cursor.
unsafe {
let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur - 1);
*off = (l | (u64::from_le(*off) & 0xfff)).to_le();
}
Ok(Ok { page, freed })
}
async fn del<T: AllocPage>(
txn: &mut T,
page: crate::CowPage,
mutable: bool,
c: &PageCursor,
l: u64,
) -> Result<(MutPage, u64), T::Error> {
assert!(c.cur >= 0 && (c.cur as usize) < c.total);
if mutable && page.is_dirty() {
// In the mutable case, we just need to move some memory
// around.
let p = page.data;
let mut page = MutPage(page);
let hdr = header_mut(&mut page);
let f = core::mem::size_of::<Tuple<K, V>>();
if c.is_leaf {
// In leaves, we need to move the n - c - 1 elements
// that are strictly after the cursor, by `f` (the
// size of an entry).
//
// Here's the reasoning to avoid off-by-one errors: if
// `c` is 0 (i.e. we're deleting the first element on
// the page), we remove one entry, so there are n - 1
// remaining entries.
let n = hdr.n() as usize;
let hdr_size = {
// As usual, header + padding
let al = core::mem::align_of::<Tuple<K, V>>();
(HDR + al - 1) & !(al - 1)
};
let off = hdr_size + c.cur as usize * f;
unsafe {
core::ptr::copy(p.add(off + f), p.add(off), f * (n - c.cur as usize - 1));
}
// Removing `f` bytes from the page.
hdr.decr(f);
} else {
// Internal nodes are easier to deal with, as we just
// have to move the offsets.
unsafe {
let ptr = p.add(HDR + c.cur as usize * 8) as *mut u64;
core::ptr::copy(ptr.offset(1), ptr, hdr.n() as usize - c.cur as usize - 1);
}
// Removing `f` bytes from the page (the tuple), plus
// one 8-bytes offset.
hdr.decr(f + 8);
// Updating the left page if necessary.
if l > 0 {
unsafe {
let off = (p.add(HDR) as *mut u64).offset(c.cur as isize - 1);
*off = (l | (u64::from_le(*off) & 0xfff)).to_le();
}
}
}
hdr.set_n(hdr.n() - 1);
// Returning the mutable page itself, and 0 (for "no page freed")
Ok((page, 0))
} else {
// Immutable pages need to be cloned. The strategy is the
// same in both cases: get an "offset slice", split it at
// the cursor, remove the first entry of the second half,
// and clone.
let mut new = txn.alloc_page().await?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
if c.is_leaf {
let s = Leaf::offset_slice::<T, K, V>(page.as_page());
let (s0, s1) = s.split_at(c.cur as usize);
let (_, s1) = s1.split_at(1);
let mut n = 0;
clone::<K, V, Leaf>(page.as_page(), &mut new, s0, &mut n);
clone::<K, V, Leaf>(page.as_page(), &mut new, s1, &mut n);
} else {
// Internal nodes are a bit trickier, since the left child
// is not counted in the "offset slice", so we need to
// clone it separately. Also, the `l` argument to this
// function might be non-zero in this case.
let s = Internal::offset_slice::<T, K, V>(page.as_page());
let (s0, s1) = s.split_at(c.cur as usize);
let (_, s1) = s1.split_at(1);
// First, clone the left child of the page.
let hdr = header(page.as_page());
let left = hdr.left_page() & !0xfff;
unsafe { *(new.0.data.add(HDR - 8) as *mut u64) = left.to_le() };
// Then, clone the entries strictly before the cursor
// (i.e. clone `s0`).
let mut n = 0;
clone::<K, V, Internal>(page.as_page(), &mut new, s0, &mut n);
// If we need to update the left child of the deleted
// item, do it.
if l > 0 {
unsafe {
let off = new.0.data.offset(HDR as isize + (n - 1) * 8) as *mut u64;
*off = (l | (u64::from_le(*off) & 0xfff)).to_le();
}
}
// Finally, clone the right half of the page (`s1`).
clone::<K, V, Internal>(page.as_page(), &mut new, s1, &mut n);
}
Ok((new, page.offset))
}
}
// Decide what to do with the concatenation of two neighbouring
// pages, with a middle element.
async fn merge_or_rebalance<'a, T: AllocPage>(
txn: &mut T,
m: Concat<'a, K, V, Self>,
) -> Result<Op<'a, T, K, V>, T::Error> {
// First evaluate the size of the middle element on a page.
let (hdr_size, mid_size) = if m.modified.c0.is_leaf {
let al = core::mem::align_of::<Tuple<K, V>>();
(
(HDR + al - 1) & !(al - 1),
core::mem::size_of::<Tuple<K, V>>(),
)
} else {
(HDR, 8 + core::mem::size_of::<Tuple<K, V>>())
};
// Evaluate the size of the modified half of the concatenation
// (which includes the header).
let mod_size = size::<K, V>(&m.modified);
// Add the "occupied" size (which excludes the header).
let occupied = {
let hdr = header(m.other.as_page());
(hdr.left_page() & 0xfff) as usize
};
// One surprising observation here is that adding the sizes
// works. This is surprising because of alignment and
// padding. It works because we can split the sizes into an
// offset part (always 8 bytes) and a data part (of a constant
// alignment), and thus we never need any padding anywhere on
// the page.
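// (For instance, with a modified half of 1,800 bytes including
// its header, a 24-byte middle entry and 2,200 occupied bytes on
// the other page, 1800 + 24 + 2200 = 4024 <= 4096, so the two
// halves are merged below.)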
if mod_size + mid_size + occupied <= PAGE_SIZE {
// If the concatenation fits on a page, merge.
return if m.modified.c0.is_leaf {
super::page_unsized::merge::<_, _, _, _, Leaf>(txn, m).await
} else {
super::page_unsized::merge::<_, _, _, _, Internal>(txn, m).await
};
}
// If the modified page is large enough, or if the other page
// has only just barely the minimum number of elements to be
// at least half-full, return.
//
// The situation where the other page isn't full enough might
// happen for example if elements occupy exactly 1/5th of a
// page + 1 byte, and the modified page has 2 of them after
// the deletion, while the other page has 3.
if mod_size >= PAGE_SIZE / 2 || hdr_size + occupied - mid_size < PAGE_SIZE / 2 {
if let Some((k, v)) = m.modified.ins {
// Perform the required modification and return.
return Ok(Op::Put(Self::put(
txn,
m.modified.page,
m.modified.mutable,
m.modified.skip_first,
&m.modified.c1,
k,
v,
m.modified.ins2,
m.modified.l,
m.modified.r,
).await?));
} else if m.modified.skip_first {
// `ins2` is only ever used when the page immediately
// below a deletion inside an internal node has split,
// and we need to replace the deleted value, *and*
// insert the middle entry of the split.
debug_assert!(m.modified.ins2.is_none());
let (page, freed) = Self::del(
txn,
m.modified.page,
m.modified.mutable,
&m.modified.c1,
m.modified.l,
).await?;
return Ok(Op::Put(Put::Ok(Ok { page, freed })));
} else {
let mut c1 = m.modified.c1.clone();
let mut l = m.modified.l;
if l == 0 {
Self::move_next(&mut c1);
l = m.modified.r;
}
return Ok(Op::Put(Put::Ok(Self::update_left_child(
txn,
m.modified.page,
m.modified.mutable,
&c1,
l,
).await?)));
}
}
// Finally, if we're here, we can rebalance. There are four
// (relatively explicit) cases, see the `rebalance` submodule
// to see how this is done.
if m.mod_is_left {
if m.modified.c0.is_leaf {
rebalance_left::<_, _, _, Leaf>(txn, m).await
} else {
rebalance_left::<_, _, _, Internal>(txn, m).await
}
} else {
super::page_unsized::rebalance::rebalance_right::<_, _, _, Self>(txn, m).await
}
}
}
/// Size of a modified page (including the header).
fn size<K: Storable, V: Storable>(m: &ModifiedPage<K, V, Page<K, V>>) -> usize {
let mut total = {
let hdr = header(m.page.as_page());
(hdr.left_page() & 0xfff) as usize
};
if m.c1.is_leaf {
let al = core::mem::align_of::<Tuple<K, V>>();
total += (HDR + al - 1) & !(al - 1);
} else {
total += HDR
};
// Extra size for the offsets.
let extra = if m.c1.is_leaf { 0 } else { 8 };
if m.ins.is_some() {
total += extra + core::mem::size_of::<Tuple<K, V>>();
if m.ins2.is_some() {
total += extra + core::mem::size_of::<Tuple<K, V>>()
}
}
// If we skip the first entry of `m.c1`, remove its size.
if m.skip_first {
total -= extra + core::mem::size_of::<Tuple<K, V>>()
}
total
}
/// Linear search, leaf version. This is relatively straightforward.
fn leaf_linear_search<K: Storable, V: Storable>(
k0: &K,
v0: Option<&V>,
s: &[Tuple<K, V>],
) -> Result<usize, usize> {
let mut n = 0;
for sm in s.iter() {
match sm.k.compare(k0) {
Ordering::Less => n += 1,
Ordering::Greater => return Err(n),
Ordering::Equal => {
if let Some(v0) = v0 {
match sm.v.compare(v0) {
Ordering::Less => n += 1,
Ordering::Greater => return Err(n),
Ordering::Equal => return Ok(n),
}
} else {
return Ok(n);
}
}
}
}
Err(n)
}
/// An equivalent of `Ord::cmp` on `Tuple<K, V>` but using
/// `Storable::compare` instead of `Ord::cmp` on `K` and `V`.
fn cmp<K: Storable, V: Storable>(
k0: &K,
v0: Option<&V>,
p: &[u8; 4096],
off: u64,
) -> Ordering {
let off = u64::from_le(off) & 0xfff;
if off as usize + core::mem::size_of::<Tuple<K, V>>() > PAGE_SIZE {
panic!(
"off = {:?}, size = {:?}",
off,
core::mem::size_of::<Tuple<K, V>>()
);
}
let tup = unsafe { &*(p.as_ptr().offset(off as isize & 0xfff) as *const Tuple<K, V>) };
match tup.k.compare(k0) {
Ordering::Equal => {
if let Some(v0) = v0 {
tup.v.compare(v0)
} else {
Ordering::Equal
}
}
o => o,
}
}
/// Linear search for internal nodes. Does what it says.
unsafe fn internal_search<K: Storable, V: Storable>(
k0: &K,
v0: Option<&V>,
s: &[u64],
p: &[u8; 4096],
) -> Result<usize, usize> {
for (n, off) in s.iter().enumerate() {
match cmp(k0, v0, p, *off) {
Ordering::Less => {}
Ordering::Greater => return Err(n),
Ordering::Equal => return Ok(n),
}
}
Err(s.len())
}
/// Lookup just forms slices of offsets (for internal nodes) or values
/// (for leaves), and runs a linear search on them.
async unsafe fn lookup<'a, K: Storable, V: Storable>(
page: crate::Page<'a>,
c: &mut PageCursor,
k0: &K,
v0: Option<&V>,
) -> Result<usize, usize> {
let hdr = header(page);
c.total = hdr.n() as usize;
c.is_leaf = hdr.is_leaf();
if c.is_leaf {
let al = core::mem::align_of::<Tuple<K, V>>();
let hdr_size = (HDR + al - 1) & !(al - 1);
let s = core::slice::from_raw_parts(
page.data.as_ptr().add(hdr_size) as *const Tuple<K, V>,
hdr.n() as usize,
);
leaf_linear_search(k0, v0, s)
} else {
let s = core::slice::from_raw_parts(
page.data.as_ptr().add(HDR) as *const u64,
hdr.n() as usize,
);
internal_search(k0, v0, s, page.data)
}
}
/// Clone a slice of offsets onto a page. This essentially does what
/// it says. Note that the leftmost child of a page is not included in
/// the offset slices, so we don't have to handle it here.
///
/// This should really be in the `Alloc` trait, but Rust doesn't have
/// associated type constructors, so we can't have an associated type
/// that's sometimes a slice and sometimes a "Range".
fn clone<K: Storable, V: Storable, L: Alloc>(
page: crate::Page,
new: &mut MutPage,
s: Offsets,
n: &mut isize,
) {
match s {
Offsets::Slice(s) => {
// We know we're in an internal node here.
let size = core::mem::size_of::<Tuple<K, V>>();
for off in s.iter() {
let off = u64::from_le(*off);
let r = off & !0xfff;
let off = off & 0xfff;
unsafe {
let ptr = page.data.as_ptr().add(off as usize);
// Reserve the space on the page
let hdr = header_mut(new);
let data = hdr.data() as u16;
let off_new = data - size as u16;
hdr.set_data(off_new);
// Copy the entry from the original page to its
// position on the new page.
core::ptr::copy_nonoverlapping(ptr, new.0.data.add(off_new as usize), size);
// Set the offset to this new entry in the offset
// array, along with the right child page.
let ptr = new.0.data.offset(HDR as isize + *n * 8) as *mut u64;
*ptr = (r | off_new as u64).to_le();
}
*n += 1;
}
let hdr = header_mut(new);
hdr.set_n(hdr.n() + s.len() as u16);
hdr.incr((8 + size) * s.len());
}
Offsets::Range(r) => {
let size = core::mem::size_of::<Tuple<K, V>>();
let a = core::mem::align_of::<Tuple<K, V>>();
let header_size = (HDR + a - 1) & !(a - 1);
let len = r.len();
for off in r {
unsafe {
let ptr = page.data.as_ptr().add(header_size + off * size);
let new_ptr = new.0.data.add(header_size + (*n as usize) * size);
core::ptr::copy_nonoverlapping(ptr, new_ptr, size);
}
*n += 1;
}
// On leaves, we do have to update everything manually,
// because we're simply copying stuff.
let hdr = header_mut(new);
hdr.set_n(hdr.n() + len as u16);
hdr.incr(size * len);
}
}
}
use super::*;
use core::mem::MaybeUninit;
pub(crate) async fn rebalance_left<
'a,
T: AllocPage,
K: Storable + core::fmt::Debug,
V: Storable + core::fmt::Debug,
L: Alloc,
>(
txn: &mut T,
m: Concat<'a, K, V, Page<K, V>>,
) -> Result<Op<'a, T, K, V>, T::Error> {
assert!(m.mod_is_left);
// First element of the right page. We'll rotate it to the left
// page, i.e. return a middle element, and append the current
// middle element to the left page.
let rc = super::PageCursor::new(&m.other, 0);
let rl = <Page<K, V>>::left_child(m.other.as_page(), &rc);
let (k, v, r) = <Page<K, V>>::current(m.other.as_page(), &rc).unwrap();
let mut freed_ = [0, 0];
// First, perform the modification on the modified page, which we
// know is the left page, and return the resulting mutable page
// (the modified page may or may not be mutable before we do this).
let new_left = if let Some((k, v)) = m.modified.ins {
let is_dirty = m.modified.page.is_dirty();
let page = if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(
txn,
m.modified.page,
m.modified.mutable,
m.modified.skip_first,
&m.modified.c1,
k,
v,
m.modified.ins2,
m.modified.l,
m.modified.r,
).await? {
if freed > 0 {
let b = if is_dirty { 1 } else { 0 };
freed_[0] = freed | b;
}
page
} else {
unreachable!()
};
// Append the middle element of the concatenation at the end
// of the left page. We know the left page to be mutable by
// now, and we also know there's enough space to do this.
let lc = PageCursor::after(&page.0);
if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(
txn, page.0, true, false, &lc, m.mid.0, m.mid.1, None, 0, rl,
).await? {
if freed > 0 {
let b = if is_dirty { 1 } else { 0 };
// index 0 is ok here: if a page is freed, it means we
// just compacted a page, which implies that the call
// to `put` above didn't free.
freed_[0] = freed | b
}
page
} else {
unreachable!()
}
} else if m.modified.skip_first {
// Looking at module `super::del`, the only way we can be in
// this case is if the modification is just a deletion of the
// entry at `m.modified.c1`, with nothing to insert.
let is_dirty = m.modified.page.is_dirty();
let (page, freed) = <Page<K, V>>::del(
txn,
m.modified.page,
m.modified.mutable,
&m.modified.c1,
m.modified.l,
).await?;
if freed > 0 {
let b = if is_dirty { 1 } else { 0 };
freed_[0] = freed | b
}
// Append the middle element of the concatenation at the end
// of the left page. We know the left page to be mutable by
// now, and we also know there's enough space to do this.
let lc = PageCursor::after(&page.0);
if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(
txn, page.0, true, false, &lc, m.mid.0, m.mid.1, None, 0, rl,
).await? {
if freed > 0 {
let b = if is_dirty { 1 } else { 0 };
// index 0 is ok here: if we freed a page in the call
// to `del` above, it is mutable and we can allocate
// on it. Else, we just compacted a page, but that
// also means `del` above didn't free.
freed_[0] = freed | b
}
page
} else {
unreachable!()
}
} else {
let is_dirty = m.modified.page.is_dirty();
let lc = PageCursor::after(&m.modified.page);
if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(
txn, m.modified.page, m.modified.mutable, false, &lc, m.mid.0, m.mid.1, None, 0, rl,
).await? {
if m.modified.l > 0 {
assert_eq!(m.modified.r, 0);
unsafe {
let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur - 1);
*off = (m.modified.l | (u64::from_le(*off) & 0xfff)).to_le();
}
} else if m.modified.r > 0 {
unsafe {
let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur);
*off = (m.modified.r | (u64::from_le(*off) & 0xfff)).to_le();
}
}
if freed > 0 {
let b = if is_dirty { 1 } else { 0 };
freed_[0] = freed | b
}
page
} else {
unreachable!()
}
};
let f = core::mem::size_of::<Tuple<K, V>>();
let (new_right, k, v) = if r == 0 && m.other_is_mutable && m.other.is_dirty() {
// Since `r == 0`, we know this is a leaf. Therefore, we need
// to rewrite the deleted element to the end of the page, so
// that the pointer to the new middle entry is valid when this
// function returns.
let data = m.other.data;
let mut other = MutPage(m.other);
let right_hdr = header_mut(&mut other);
// Remove the space for one entry.
let n = right_hdr.n() as usize;
right_hdr.decr(f);
right_hdr.set_n((n - 1) as u16);
let al = core::mem::align_of::<Tuple<K, V>>();
let hdr_size = (HDR + al - 1) & !(al - 1);
let mut t: MaybeUninit<Tuple<K, V>> = MaybeUninit::uninit();
unsafe {
// Save the leftmost element (we can't directly copy it to
// the end, because the page may be full).
core::ptr::copy_nonoverlapping(
data.add(hdr_size) as *mut Tuple<K, V>,
t.as_mut_ptr(),
1,
);
// Move the n - 1 last elements one entry to the left.
core::ptr::copy(
data.add(hdr_size + f) as *const Tuple<K, V>,
data.add(hdr_size) as *mut Tuple<K, V>,
n - 1,
);
// Copy the last element to the end of the page.
let last = data.add(hdr_size + f * (n - 1)) as *mut Tuple<K, V>;
core::ptr::copy_nonoverlapping(t.as_ptr(), last, 1);
(other, &(&*last).k, &(&*last).v)
}
} else {
// If this isn't a leaf, or isn't mutable, use the standard
// deletion function, because we know this will leave the
// pointer to the current entry valid.
// We extend the pointer's lifetime here, because we know the
// current deletion (we only rebalance during deletions) won't
// touch this page anymore after this.
let k = unsafe { core::mem::transmute(k) };
let v = unsafe { core::mem::transmute(v) };
// If this frees the old "other" page, add it to the "freed"
// array.
let is_dirty = m.other.is_dirty();
let (page, freed) = <Page<K, V>>::del(txn, m.other, m.other_is_mutable, &rc, r).await?;
if freed > 0 {
freed_[1] = if is_dirty { freed | 1 } else { freed };
}
(page, k, v)
};
Ok(Op::Rebalanced {
l: new_left.0.offset,
r: new_right.0.offset,
k,
v,
freed: freed_,
})
}
use super::*;
pub(super) async fn put<
'a,
T: AllocPage,
K: Storable + core::fmt::Debug,
V: Storable + core::fmt::Debug,
L: Alloc + super::super::page_unsized::AllocWrite<K, V>,
>(
txn: &mut T,
page: CowPage,
mutable: bool,
replace: bool,
u: usize,
k0: &'a K,
v0: &'a V,
l: u64,
r: u64,
) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
let hdr = header(page.as_page());
let is_dirty = page.is_dirty();
if mutable && is_dirty && L::can_alloc::<K, V>(hdr) {
// If the page is mutable (i.e. not shared with another tree)
// and dirty, here's what we do:
let mut page = MutPage(page);
let p = page.0.data;
let hdr = header_mut(&mut page);
let mut n = u as isize;
if replace {
// If we're replacing a value, this can't be in a leaf,
// hence the only thing that needs to be done is erasing
// the offset in the offset array. This is a bit naive,
// since we're moving that array back and forth.
assert!(!hdr.is_leaf());
unsafe {
let ptr = p.add(HDR + n as usize * 8) as *mut u64;
core::ptr::copy(ptr.offset(1), ptr, hdr.n() as usize - n as usize - 1);
hdr.decr(8 + core::mem::size_of::<Tuple<K, V>>());
hdr.set_n(hdr.n() - 1);
}
}
// Do the actual insertions.
L::alloc_write(&mut page, k0, v0, l, r, &mut n);
// No page was freed, return the page.
Ok(Put::Ok(Ok { page, freed: 0 }))
} else if replace || L::can_compact::<K, V>(hdr) {
// Here, we could allocate if we cloned, so let's clone:
let mut new = txn.alloc_page().await?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
L::set_right_child(&mut new, -1, hdr.left_page() & !0xfff);
// Take the offsets and split it at the cursor position.
let s = L::offset_slice::<T, K, V>(page.as_page());
let (s0, s1) = s.split_at(u as usize);
// If we're replacing, remove the offset that needs to go.
let s1 = if replace { s1.split_at(1).1 } else { s1 };
// And then clone the page, inserting the new elements between
// the two halves of the split offsets.
let mut n = 0;
clone::<K, V, L>(page.as_page(), &mut new, s0, &mut n);
L::alloc_write(&mut new, k0, v0, l, r, &mut n);
clone::<K, V, L>(page.as_page(), &mut new, s1, &mut n);
Ok(Put::Ok(Ok {
page: new,
freed: if is_dirty {
page.offset | 1
} else {
page.offset
},
}))
} else {
// Else, split the page.
split::<_, _, _, L>(txn, page, mutable, u, k0, v0, l, r).await
}
}
async fn split<
'a,
T: AllocPage,
K: Storable + core::fmt::Debug,
V: Storable + core::fmt::Debug,
L: Alloc + super::super::page_unsized::AllocWrite<K, V>,
>(
txn: &mut T,
page: CowPage,
mutable: bool,
u: usize,
k0: &'a K,
v0: &'a V,
l: u64,
r: u64,
) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
let mut left;
let hdr = header(page.as_page());
let page_is_dirty = if page.is_dirty() { 1 } else { 0 };
let mut n = hdr.n();
let k = n / 2;
n += 1;
let s = L::offset_slice::<T, K, V>(page.as_page());
let (s0, s1) = s.split_at(k as usize);
// Find the split entry (i.e. the entry going up in the B
// tree). Then, mid_child is the right child of the split
// key/value, and `s1` is the offsets of the right side of the
// split.
let (mut split_key, mut split_value, mid_child, s1) = if u == k as usize {
// The inserted element is exactly in the middle.
(k0, v0, r, s1)
} else {
// Else, the split key is the first element of `s1`: set the
// new, updated `s1`.
let (s1a, s1b) = s1.split_at(1);
let (k, v, r) = L::kv(page.as_page(), L::first::<K, V>(&s1a));
(k, v, r, s1b)
};
let mut freed = 0;
if u >= k as usize {
// If we are here, u >= k, i.e. the insertion is in the
// right-hand side of the split, or is the split entry itself.
let mut right = txn.alloc_page().await?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut right);
if u > k as usize {
// If we're inserting in the right-hand side, clone onto the
// newly allocated `right` page:
let mut n = 0;
// Split the offsets to make space for the insertion, and
// insert the new entry in the middle.
//
// Off-by-one error? Nope: since `k` is the split entry,
// the right page starts at index `k + 1`, hence if `u ==
// k + 1`, `kk` must be 0.
let kk = u as usize - k as usize - 1;
let (s1a, s1b) = s1.split_at(kk);
L::set_right_child(&mut right, -1, mid_child);
clone::<K, V, L>(page.as_page(), &mut right, s1a, &mut n);
L::alloc_write(&mut right, k0, v0, l, r, &mut n);
clone::<K, V, L>(page.as_page(), &mut right, s1b, &mut n);
} else {
// Else, just clone the page, we'll take care of returning
// the split entry afterwards.
clone::<K, V, L>(page.as_page(), &mut right, s1, &mut 0);
}
if mutable && page.is_dirty() {
// (k0, v0) is to be inserted on the right-hand side of
// the split, hence we don't have to clone the left-hand
// side, we can just truncate it.
left = MutPage(page);
let hdr = header_mut(&mut left);
hdr.set_n(k);
hdr.decr((n - 1 - k) as usize * core::mem::size_of::<Tuple<K, V>>());
} else {
// Else, we need to clone the first `k` entries,
// i.e. `s0`, onto a new left page.
left = txn.alloc_page().await?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut left);
let mut n = 0;
L::set_right_child(&mut left, -1, hdr.left_page() & !0xfff);
clone::<K, V, L>(page.as_page(), &mut left, s0, &mut n);
freed = page.offset | page_is_dirty
}
// Finally, if `u` is the middle element, its left and right
// children become the leftmost child of the right page, and
// the rightmost child of the left page, respectively.
if u == k as usize {
// Insertion in the middle:
// - `l` becomes the right child of the last element on `left`.
L::set_right_child(&mut left, k as isize - 1, l);
// - `r` (i.e. `mid_child`) becomes the left child of `right`.
L::set_right_child(&mut right, -1, mid_child);
}
Ok(Put::Split {
split_key,
split_value,
left,
right,
freed,
})
} else {
// Else, the insertion is in the new left page. We first clone
// the left page, inserting (k0, v0) at its position:
left = txn.alloc_page().await?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut left);
// Clone the leftmost page
let ll = header(page.as_page()).left_page() & !0xfff;
L::set_right_child(&mut left, -1, ll);
// Clone the two sides, with an entry in between.
let (s0a, s0b) = s0.split_at(u as usize);
let mut n = 0;
clone::<K, V, L>(page.as_page(), &mut left, s0a, &mut n);
L::alloc_write(&mut left, k0, v0, l, r, &mut n);
clone::<K, V, L>(page.as_page(), &mut left, s0b, &mut n);
let mut right;
let freed;
// Then, for the right page:
if mutable && page.is_dirty() {
// If we can mutate the original page, remove the first
// `k + 1` entries (the last of them, at position `k`, is the
// split entry) and return a pointer to the split key.
right = MutPage(page);
if let Some((k, v)) = L::truncate_left::<K, V>(&mut right, k as usize + 1) {
unsafe {
split_key = &*k;
split_value = &*v;
}
}
freed = 0;
} else {
// Else, clone the right page.
right = txn.alloc_page().await?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut right);
// The left child of the right page is `mid_child`,
// i.e. the right child of the split entry.
L::set_right_child(&mut right, -1, mid_child);
// Clone the rest of the page.
let mut n = 0;
clone::<K, V, L>(page.as_page(), &mut right, s1, &mut n);
freed = page.offset | page_is_dirty
}
Ok(Put::Split {
split_key,
split_value,
left,
right,
freed,
})
}
}
use super::*;
/// This trait contains methods common to leaves and internal
/// nodes. These methods are meant to be just a few lines long each,
/// and avoid duplicating code in higher-level functions where just a
/// few constants change between internal nodes and leaves.
pub(crate) trait Alloc {
/// Is there enough room to add one entry into this page (without cloning)?
fn can_alloc<K: Storable, V: Storable>(hdr: &Header) -> bool;
/// If we clone the page, will there be enough room for one entry?
fn can_compact<K: Storable, V: Storable>(hdr: &Header) -> bool;
/// Remove the leftmost `n` elements from the page. On leaves,
/// this moves the last truncated element to the end of the page
/// and returns a pointer to that element (this function is only
/// used in `crate::btree::put`, and the pointer becomes invalid
/// at the end of that call).
///
/// Returning the last deleted element isn't required for internal
/// nodes, since the entries are still valid there.
fn truncate_left<K: Storable, V: Storable>(
page: &mut MutPage,
n: usize,
) -> Option<(*const K, *const V)>;
/// Allocate a new entry (size inferred), and return the offset on
/// the page where the new entry can be copied.
fn alloc_insert<K: Storable, V: Storable>(new: &mut MutPage, n: &mut isize, r: u64) -> usize;
/// Set the right child of the `n`th entry to `r`. If `n == -1`,
/// this method can be used to set the leftmost child of a page.
fn set_right_child(new: &mut MutPage, n: isize, r: u64);
/// "Slices" of offsets, which aren't really slices in the case of
/// leaves, but just intervals or "ranges".
fn offset_slice<'a, T: LoadPage, K: Storable, V: Storable>(
page: crate::Page<'a>,
) -> Offsets<'a>;
/// First element of a slice offset. For the sake of code
/// simplicity, we return a `u64` even for leaves. In internal
/// nodes, the 52 MSBs encode a child page, and the 12 LSBs encode
/// an offset in the page.
fn first<'a, K, V>(off: &Offsets<'a>) -> u64;
/// The tuple and right child at offset `off`.
fn kv<'a, K: Storable, V: Storable>(page: crate::Page, off: u64) -> (&'a K, &'a V, u64);
}
/// The type of offsets.
#[derive(Debug, Clone)]
pub enum Offsets<'a> {
/// Slices of child pages and offset-in-page for internal nodes,
Slice(&'a [u64]),
/// Ranges for leaves.
Range(core::ops::Range<usize>),
}
impl<'a> Offsets<'a> {
/// Splitting an offset slice, with the same semantics as
/// `split_at` for slices, except if `mid` is equal to the length,
/// in which case we return `(self, &[])`.
pub fn split_at(&self, mid: usize) -> (Self, Self) {
match self {
Offsets::Slice(s) if mid < s.len() => {
let (a, b) = s.split_at(mid);
(Offsets::Slice(a), Offsets::Slice(b))
}
Offsets::Slice(s) => {
debug_assert_eq!(mid, s.len());
(Offsets::Slice(s), Offsets::Slice(&[][..]))
}
Offsets::Range(r) => (
Offsets::Range(r.start..r.start + mid),
Offsets::Range(r.start + mid..r.end),
),
}
}
}
pub struct Leaf {}
pub struct Internal {}
impl Alloc for Leaf {
// Checking if there's enough room is just bookkeeping.
fn can_alloc<K: Storable, V: Storable>(hdr: &Header) -> bool {
let f = core::mem::size_of::<Tuple<K, V>>();
let al = core::mem::align_of::<Tuple<K, V>>();
let header_size = (HDR + al - 1) & !(al - 1);
header_size + (hdr.n() as usize + 1) * f <= PAGE_SIZE
}
/// There's no possible compaction on leaves, it's the same as allocating.
fn can_compact<K: Storable, V: Storable>(hdr: &Header) -> bool {
Self::can_alloc::<K, V>(hdr)
}
fn set_right_child(_: &mut MutPage, _: isize, _: u64) {}
fn offset_slice<'a, T: LoadPage, K: Storable, V: Storable>(
page: crate::Page<'a>,
) -> Offsets<'a> {
let hdr = header(page);
Offsets::Range(0..(hdr.n() as usize))
}
/// This returns an offset on the page, so we need to compute that.
fn first<'a, K, V>(off: &Offsets<'a>) -> u64 {
match off {
Offsets::Slice(_) => unreachable!(),
Offsets::Range(r) => {
let size = core::mem::size_of::<Tuple<K, V>>();
let al = core::mem::align_of::<Tuple<K, V>>();
let hdr_size = (HDR + al - 1) & !(al - 1);
(hdr_size + r.start * size) as u64
}
}
}
fn kv<'a, K: Storable, V: Storable>(page: crate::Page, off: u64) -> (&'a K, &'a V, u64) {
unsafe {
let tup = &*(page.data.as_ptr().add(off as usize) as *const Tuple<K, V>);
(&tup.k, &tup.v, 0)
}
}
/// Here, we just move the `hdr_n - *n` elements after position `*n`
/// one slot to the right, and increase the bookkeeping values
/// (occupied bytes and number of elements).
fn alloc_insert<K: Storable, V: Storable>(new: &mut MutPage, n: &mut isize, _: u64) -> usize {
let f = core::mem::size_of::<Tuple<K, V>>();
let al = core::mem::align_of::<Tuple<K, V>>();
let hdr_size = (HDR + al - 1) & !(al - 1);
let hdr_n = header_mut(new).n();
unsafe {
core::ptr::copy(
new.0.data.add(hdr_size + (*n as usize) * f),
new.0.data.add(hdr_size + (*n as usize) * f + f),
(hdr_n as usize - (*n as usize)) * f,
);
}
let hdr = header_mut(new);
hdr.set_n(hdr.n() + 1);
hdr.incr(f);
hdr_size + (*n as usize) * f
}
fn truncate_left<K: Storable, V: Storable>(
page: &mut MutPage,
n: usize,
) -> Option<(*const K, *const V)> {
let f = core::mem::size_of::<Tuple<K, V>>();
let al = core::mem::align_of::<Tuple<K, V>>();
let hdr_size = (HDR + al - 1) & !(al - 1);
let hdr = header_mut(page);
let hdr_n = hdr.n();
hdr.set_n(hdr_n - n as u16);
hdr.decr(n * f);
// Now, this is a bit trickier. This performs a rotation by 1
// without all the rotation machinery from the Rust core
// library.
//
// Indeed, since we're in a leaf, we are effectively erasing
// the split entry. There is a choice to be made here, between
// passing the entry by value or by reference. Because splits
// might cascade with the same middle entry, passing it by
// value may be significantly worse if the tree is deep, and
// is never significantly better (we'll end up copying this
// entry twice anyway).
unsafe {
let mut swap: core::mem::MaybeUninit<Tuple<K, V>> = core::mem::MaybeUninit::uninit();
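// Three-step rotation: (1) save the last truncated element into
// `swap`, (2) shift the kept elements to the start of the entry
// area, (3) write the saved element right after them.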
core::ptr::copy(
page.0.data.add(hdr_size + (n - 1) * f),
swap.as_mut_ptr() as *mut u8,
f,
);
core::ptr::copy(
page.0.data.add(hdr_size + n * f),
page.0.data.add(hdr_size),
(hdr_n as usize - n) * f,
);
core::ptr::copy(
swap.as_ptr() as *mut u8,
page.0.data.add(hdr_size + (hdr_n as usize - n) * f),
f,
);
let tup =
&*(page.0.data.add(hdr_size + (hdr_n as usize - n) * f) as *const Tuple<K, V>);
Some((&tup.k, &tup.v))
}
}
}
impl Alloc for Internal {
fn can_alloc<K: Storable, V: Storable>(hdr: &Header) -> bool {
(HDR as usize) + (hdr.n() as usize + 1) * 8 + core::mem::size_of::<Tuple<K, V>>()
< hdr.data() as usize
}
fn can_compact<K: Storable, V: Storable>(hdr: &Header) -> bool {
(HDR as usize)
+ ((hdr.left_page() & 0xfff) as usize)
+ 8
+ core::mem::size_of::<Tuple<K, V>>()
<= PAGE_SIZE
}
/// Set the right-hand child in the offset array, preserving the
/// current offset.
fn set_right_child(page: &mut MutPage, n: isize, r: u64) {
unsafe {
debug_assert_eq!(r & 0xfff, 0);
let ptr = page.0.data.offset(HDR as isize + n * 8) as *mut u64;
let off = u64::from_le(*ptr) & 0xfff;
*ptr = (r | off as u64).to_le();
}
}
fn offset_slice<'a, T, K: Storable, V: Storable>(page: crate::Page<'a>) -> Offsets<'a> {
let hdr = header(page);
unsafe {
Offsets::Slice(core::slice::from_raw_parts(
page.data.as_ptr().add(HDR) as *const u64,
hdr.n() as usize,
))
}
}
fn first<'a, K, V>(off: &Offsets<'a>) -> u64 {
match off {
Offsets::Slice(s) => s[0],
Offsets::Range(_) => unreachable!(),
}
}
fn kv<'a, K: Storable, V: Storable>(page: crate::Page, off: u64) -> (&'a K, &'a V, u64) {
unsafe {
let tup = &*(page.data.as_ptr().add((off & 0xfff) as usize) as *const Tuple<K, V>);
(&tup.k, &tup.v, off & !0xfff)
}
}
// Much simpler than in leaves, because we just need to move the
// offsets. There's a little bit of bookkeeping involved though.
fn truncate_left<K: Storable, V: Storable>(
page: &mut MutPage,
n: usize,
) -> Option<(*const K, *const V)> {
let hdr_n = header_mut(page).n();
unsafe {
// We want to copy the left child of the first entry that
// will be kept alive as the left child of the page.
core::ptr::copy(
page.0.data.add(HDR + (n - 1) * 8),
page.0.data.add(HDR - 8),
// We're removing n entries, so we need to copy the
// remaining `hdr_n - n`, plus the leftmost child
// (hence the + 1).
(hdr_n as usize - n + 1) * 8,
);
}
// Remaining occupied size on the page (minus the header).
let size = (8 + core::mem::size_of::<Tuple<K, V>>()) * (hdr_n as usize - n);
let hdr = header_mut(page);
hdr.set_n(hdr_n - n as u16);
// Because the leftmost child has been copied from an entry
// containing an offset, its 12 LSBs are still encoding an
// offset on the page, whereas it should encode the occupied
// bytes on the page. Reset it.
hdr.set_occupied(size as u64);
None
}
fn alloc_insert<K: Storable, V: Storable>(new: &mut MutPage, n: &mut isize, r: u64) -> usize {
let hdr = header_mut(new);
// Move the `data` field one entry to the left.
let data = hdr.data() as u16;
let off_new = data - core::mem::size_of::<Tuple<K, V>>() as u16;
hdr.set_data(off_new);
// Increment the number of entries, add the newly occupied size.
hdr.set_n(hdr.n() + 1);
hdr.incr(8 + core::mem::size_of::<Tuple<K, V>>());
let hdr_n = hdr.n();
unsafe {
// Make space in the offset array by moving the last
// `hdr_n - *n` elements.
core::ptr::copy(
new.0.data.add(HDR + (*n as usize) * 8),
new.0.data.add(HDR + (*n as usize) * 8 + 8),
(hdr_n as usize - (*n as usize)) * 8,
);
// Add the offset.
let ptr = new.0.data.offset(HDR as isize + *n * 8) as *mut u64;
*ptr = (r | off_new as u64).to_le();
}
off_new as usize
}
}
impl<K: Storable, V: Storable> crate::btree::page_unsized::AllocWrite<K, V> for Internal {
fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {
alloc_write::<K, V, Self>(new, k0, v0, l, r, n)
}
fn set_left_child(new: &mut MutPage, n: isize, l: u64) {
if l > 0 {
Internal::set_right_child(new, n - 1, l);
}
}
}
impl<K: Storable, V: Storable> crate::btree::page_unsized::AllocWrite<K, V> for Leaf {
fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {
alloc_write::<K, V, Self>(new, k0, v0, l, r, n)
}
fn set_left_child(_new: &mut MutPage, _n: isize, l: u64) {
debug_assert_eq!(l, 0);
}
}
/// Simply allocate an entry in the page, copy it, and set its right
/// and left children.
fn alloc_write<K: Storable, V: Storable, L: Alloc>(
new: &mut MutPage,
k0: &K,
v0: &V,
l: u64,
r: u64,
n: &mut isize,
) {
let off_new = L::alloc_insert::<K, V>(new, n, r);
unsafe {
let new_ptr = &mut *(new.0.data.add(off_new as usize) as *mut Tuple<K, V>);
core::ptr::copy_nonoverlapping(k0, &mut new_ptr.k, 1);
core::ptr::copy_nonoverlapping(v0, &mut new_ptr.v, 1);
}
if l > 0 {
L::set_right_child(new, *n - 1, l);
}
*n += 1;
}
//! An implementation of B trees. The core operations on B trees
//! (lookup, iterate, put and del) are generic in the actual
//! implementation of nodes, via the [`BTreePage`] and
//! [`BTreeMutPage`] traits. This allows for simpler code in the
//! high-level functions, as well as specialised, high-performance
//! implementations for the nodes.
//!
//! Two different implementations are supplied: one in [`page`] for
//! types with a size known at compile-time, which yields denser
//! leaves, and hence shallower trees (if the values are really using
//! the space). The other one, in [`page_unsized`], is for
//! dynamically-sized types, or types whose representation is dynamic,
//! for example enums that are `Sized` in Rust, but whose variants vary
//! widely in size.
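//!
//! A rough usage sketch (not a doc-test; it assumes a transaction
//! bound to `txn` whose type implements [`AllocPage`] and
//! [`LoadPage`], and that `u64` implements [`Storable`]):
//!
//! ```ignore
//! // Create a database mapping u64 keys to u64 values.
//! let mut db: Db<u64, u64> = create_db(&mut txn).await?;
//! // Look up the first entry with a key >= 7, regardless of value.
//! if let Some((k, v)) = get(&txn, &db, &7, None).await? {
//!     // ...
//! }
//! // Delete the first entry with key 7, if any.
//! let deleted: bool = del(&mut txn, &mut db, &7, None).await?;
//! // Iterate over all remaining entries, in key order.
//! let mut it = iter(&txn, &db, None).await?;
//! while let Some((k, v)) = it.next().await? {
//!     // ...
//! }
//! ```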
use crate::*;
#[doc(hidden)]
pub mod cursor;
pub use cursor::{iter, rev_iter, Cursor, Iter, RevIter};
pub mod del;
pub use del::{del, del_no_decr};
pub mod put;
pub use put::put;
pub mod page;
pub mod page_unsized;
#[async_trait(?Send)]
pub trait BTreePage<K: ?Sized, V: ?Sized>: Sized {
type Cursor: Send + Sync + Clone + Copy + core::fmt::Debug;
/// Whether this cursor is at the end of the page.
fn is_empty(c: &Self::Cursor) -> bool;
/// Whether this cursor is strictly before the first element.
fn is_init(c: &Self::Cursor) -> bool;
/// Returns a cursor set before the first element of the page
/// (i.e. set to -1).
fn cursor_before(p: &CowPage) -> Self::Cursor;
/// Returns a cursor set to the first element of the page
/// (i.e. 0). If the page is empty, the returned cursor might be
/// empty.
fn cursor_first(p: &CowPage) -> Self::Cursor {
let mut c = Self::cursor_before(p);
Self::move_next(&mut c);
c
}
/// Returns a cursor set after the last element of the page
/// (i.e. to element n)
fn cursor_after(p: &CowPage) -> Self::Cursor;
/// Returns a cursor set to the last element of the page. If the
/// page is empty, this is the same as `cursor_before`.
fn cursor_last(p: &CowPage) -> Self::Cursor {
let mut c = Self::cursor_after(p);
Self::move_prev(&mut c);
c
}
/// Return the element currently pointed to by the cursor (if the
/// cursor is not before or after the elements of the page), and
/// move the cursor to the next element.
fn next<'b>(
p: Page<'b>,
c: &mut Self::Cursor,
) -> Option<(&'b K, &'b V, u64)> {
if let Some((k, v, r)) = Self::current(p, c) {
Self::move_next(c);
Some((k, v, r))
} else {
None
}
}
/// Move the cursor to the previous element, and return that
/// element. Note that this is not the symmetric of `next`.
fn prev<'b>(
p: Page<'b>,
c: &mut Self::Cursor,
) -> Option<(&'b K, &'b V, u64)> {
if Self::move_prev(c) {
Self::current(p, c)
} else {
None
}
}
/// Move the cursor to the next position. Returns whether the
/// cursor was actually moved (i.e. `true` if and only if the
/// cursor isn't already after the last element).
fn move_next(c: &mut Self::Cursor) -> bool;
/// Move the cursor to the previous position. Returns whether the
/// cursor was actually moved (i.e. `true` if and only if the
/// cursor isn't strictly before the page).
fn move_prev(c: &mut Self::Cursor) -> bool;
/// Returns the current element, if the cursor is pointing at one.
fn current<'a>(
p: Page<'a>,
c: &Self::Cursor,
) -> Option<(&'a K, &'a V, u64)>;
/// Returns the left child of the entry pointed to by the cursor.
fn left_child(p: Page, c: &Self::Cursor) -> u64;
/// Returns the right child of the entry pointed to by the cursor.
fn right_child(p: Page, c: &Self::Cursor) -> u64;
/// Sets the cursor to the last element less than or equal to `k0`
/// if `v0.is_none()`, and to `(k0, v0)` if `v0.is_some()`. If a
/// match is found (on `k0` only if `v0.is_none()`, on `(k0, v0)`
/// else), return the match along with the right child.
///
/// Else (in the `Err` case of the `Result`), return the position
/// at which `(k0, v0)` can be inserted.
async fn set_cursor<'a, 'b, T: LoadPage>(
txn: &'a T,
page: Page<'b>,
c: &mut Self::Cursor,
k0: &K,
v0: Option<&V>,
) -> Result<(&'a K, &'a V, u64), usize>;
/// Splits the cursor into two cursors: the elements strictly
/// before the current position, and the elements greater than or
/// equal the current element.
fn split_at(c: &Self::Cursor) -> (Self::Cursor, Self::Cursor);
}
pub struct PageIterator<'a, T: LoadPage, K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {
cursor: P::Cursor,
_txn: &'a T,
page: Page<'a>,
}
impl<'a, T: LoadPage, K: ?Sized + 'a, V: ?Sized + 'a, P: BTreePage<K, V>> Iterator
for PageIterator<'a, T, K, V, P>
{
type Item = (&'a K, &'a V, u64);
fn next(&mut self) -> Option<Self::Item> {
P::next(self.page, &mut self.cursor)
}
}
#[async_trait(?Send)]
pub trait BTreeMutPage<K: ?Sized, V: ?Sized>: BTreePage<K, V> + core::fmt::Debug {
/// Initialise a page.
fn init(page: &mut MutPage);
/// Add an entry to the page, possibly splitting the page in the
/// process.
///
/// Makes the assumption that `k1v1.is_some()` implies
/// `replace`. When `k1v1.is_some()`, we insert both `(k0, v0)`
/// (as a replacement), followed by `(k1, v1)`. This is only ever
/// used when deleting, and when the right child of a deleted
/// entry (in an internal node) has split while we were looking
/// for a replacement for the deleted entry.
///
/// Since we only look for replacements in the right child, the
/// left child of the insertion isn't modified, in which case `l`
/// and `r` are interpreted as the left and right child of the new
/// entry, `k1v1`.
async fn put<'a, T: AllocPage>(
txn: &mut T,
page: CowPage,
mutable: bool,
replace: bool,
c: &Self::Cursor,
k0: &'a K,
v0: &'a V,
k1v1: Option<(&'a K, &'a V)>,
l: u64,
r: u64,
) -> Result<crate::btree::put::Put<'a, K, V>, T::Error>;
/// Add an entry to `page`, at position `c`. Does not check
/// whether there is enough space to do so. This method is mostly
/// useful for cloning pages.
#[allow(unused_variables)]
unsafe fn put_mut(
page: &mut MutPage,
c: &mut Self::Cursor,
k0: &K,
v0: &V,
r: u64,
);
#[allow(unused_variables)]
unsafe fn set_left_child(
page: &mut MutPage,
c: &Self::Cursor,
l: u64
);
/// Update the left child of the position pointed to by the
/// cursor.
async fn update_left_child<T: AllocPage>(
txn: &mut T,
page: CowPage,
mutable: bool,
c: &Self::Cursor,
r: u64,
) -> Result<crate::btree::put::Ok, T::Error>;
type Saved;
/// Save a leaf entry as a replacement, when we delete at an
/// internal node. This can be a pointer to the leaf if the leaf
/// isn't mutated, or the actual value, or something else.
fn save_deleted_leaf_entry(k: &K, v: &V) -> Self::Saved;
/// Recover the saved value.
unsafe fn from_saved<'a>(s: &Self::Saved) -> (&'a K, &'a V);
/// Delete an entry on the page, returning the resulting page along
/// with the offset of the freed page (or 0 if no page was freed,
/// i.e. if `page` is mutable).
async fn del<T: AllocPage>(
txn: &mut T,
page: CowPage,
mutable: bool,
c: &Self::Cursor,
l: u64,
) -> Result<(MutPage, u64), T::Error>;
/// Merge, rebalance or update a concatenation.
async fn merge_or_rebalance<'a, T: AllocPage>(
txn: &mut T,
m: del::Concat<'a, K, V, Self>,
) -> Result<del::Op<'a, T, K, V>, T::Error>;
}
/// A database, which is essentially just a page offset along with markers.
#[derive(Debug)]
pub struct Db_<K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {
pub db: u64,
pub k: core::marker::PhantomData<K>,
pub v: core::marker::PhantomData<V>,
pub p: core::marker::PhantomData<P>,
}
unsafe impl<K, V, P: BTreePage<K, V>> Sync for Db_<K, V, P>{}
/// A database of sized values.
pub type Db<K, V> = Db_<K, V, page::Page<K, V>>;
#[async_trait(?Send)]
impl<K:Storable, V:Storable, P: BTreePage<K, V> + Send + core::fmt::Debug> Storable for Db_<K, V, P> {
type PageReferences = core::iter::Once<u64>;
fn page_references(&self) -> Self::PageReferences {
core::iter::once(self.db)
}
async fn drop<T: AllocPage>(&self, t: &mut T) -> Result<(), T::Error> {
drop_(t, self).await
}
}
/// A database of unsized values.
pub type UDb<K, V> = Db_<K, V, page_unsized::Page<K, V>>;
impl<K: ?Sized, V: ?Sized, P: BTreePage<K, V>> Db_<K, V, P> {
/// Load a database from a page offset.
pub fn from_page(db: u64) -> Self {
Db_ {
db,
k: core::marker::PhantomData,
v: core::marker::PhantomData,
p: core::marker::PhantomData,
}
}
}
/// Create a database with an arbitrary page implementation.
pub async fn create_db_<T: AllocPage, K: ?Sized, V: ?Sized, P: BTreeMutPage<K, V>>(
txn: &mut T,
) -> Result<Db_<K, V, P>, T::Error> {
let mut page = txn.alloc_page().await?;
P::init(&mut page);
Ok(Db_ {
db: page.0.offset,
k: core::marker::PhantomData,
v: core::marker::PhantomData,
p: core::marker::PhantomData,
})
}
/// Create a database for sized keys and values.
pub async fn create_db<T: AllocPage, K: Storable, V: Storable>(
txn: &mut T,
) -> Result<Db_<K, V, page::Page<K, V>>, T::Error> {
create_db_(txn).await
}
/// Fork a database.
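///
/// Forking doesn't copy any page: it only increments the reference
/// count of the root page, and pages are cloned lazily when either
/// copy is later modified (copy-on-write). A sketch:
///
/// ```ignore
/// let db2 = fork_db(&mut txn, &db).await?;
/// // `db` and `db2` now share all their pages; modifying one of
/// // them only clones the pages along the modified path.
/// ```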
pub async fn fork_db<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(
txn: &mut T,
db: &Db_<K, V, P>,
) -> Result<Db_<K, V, P>, T::Error> {
txn.incr_rc(db.db).await?;
Ok(Db_ {
db: db.db,
k: core::marker::PhantomData,
v: core::marker::PhantomData,
p: core::marker::PhantomData,
})
}
/// Get the first entry of a database greater than or equal to `k` (or
/// to `(k, v)` if `v.is_some()`), and return `true` if this entry is
/// shared with another structure (i.e. the RC of at least one page
/// along a path to the entry is at least 2).
pub async fn get_shared<'a, T: LoadPage, K: 'a + Storable + ?Sized, V: 'a + Storable + ?Sized, P: BTreePage<K, V>>(
txn: &'a T,
db: &Db_<K, V, P>,
k: &K,
v: Option<&V>,
) -> Result<Option<(&'a K, &'a V, bool)>, T::Error> {
// Set the "cursor stack" by setting a skip list cursor in
// each page from the root to the appropriate leaf.
let mut last_match = None;
let mut page = txn.load_page(db.db).await?;
let mut is_shared = txn.rc(db.db).await? >= 2;
// This is a for loop, to allow the compiler to unroll (maybe).
for _ in 0..cursor::N_CURSORS {
let mut cursor = P::cursor_before(&page);
if let Ok((kk, vv, _)) = P::set_cursor(txn, page.as_page(), &mut cursor, k, v).await {
if v.is_some() {
return Ok(Some((kk, vv, is_shared)));
}
last_match = Some((kk, vv, is_shared));
} else if let Some((k, v, _)) = P::current(page.as_page(), &cursor) {
// Here, Rust says that `k` and `v` have the same lifetime
// as `page`, but we actually know that they're alive for
// as long as `txn` doesn't change, so we can safely
// extend their lifetimes:
unsafe { last_match = Some((core::mem::transmute(k), core::mem::transmute(v), is_shared)) }
}
// The cursor is set to the first element greater than or
// equal to the (k, v) we're looking for, so we need to
// explore the left subtree.
let next_page = P::left_child(page.as_page(), &cursor);
if next_page > 0 {
page = txn.load_page(next_page).await?;
is_shared |= txn.rc(next_page).await? >= 2;
} else {
break;
}
}
Ok(last_match)
}
/// Get the first entry of a database greater than or equal to `k` (or
/// to `(k, v)` if `v.is_some()`). Unlike [`get_shared`], this does not
/// report whether the entry is shared with another structure.
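///
/// A sketch of the two lookup modes (assuming `u64` implements
/// [`Storable`]):
///
/// ```ignore
/// // First entry with key >= 7, regardless of its value:
/// let e = get(&txn, &db, &7, None).await?;
/// // First entry >= (7, 42), useful when a key has several values:
/// let e = get(&txn, &db, &7, Some(&42)).await?;
/// ```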
pub async fn get<'a, T: LoadPage, K: 'a + Storable + ?Sized, V: 'a + Storable + ?Sized, P: BTreePage<K, V>>(
txn: &'a T,
db: &Db_<K, V, P>,
k: &K,
v: Option<&V>,
) -> Result<Option<(&'a K, &'a V)>, T::Error> {
// Unfortunately, since `get_shared` above also needs the RC of
// each page along the path, the two functions can't easily share
// code, so this is mostly a copy of `get_shared` without the RC
// checks.
// Set the "cursor stack" by setting a skip list cursor in
// each page from the root to the appropriate leaf.
let mut last_match = None;
let mut page = txn.load_page(db.db).await?;
// This is a for loop, to allow the compiler to unroll (maybe).
for _ in 0..cursor::N_CURSORS {
let mut cursor = P::cursor_before(&page);
if let Ok((kk, vv, _)) = P::set_cursor(txn, page.as_page(), &mut cursor, k, v).await {
if v.is_some() {
return Ok(Some((kk, vv)));
}
last_match = Some((kk, vv));
} else if let Some((k, v, _)) = P::current(page.as_page(), &cursor) {
// Here, Rust says that `k` and `v` have the same lifetime
// as `page`, but we actually know that they're alive for
// as long as `txn` doesn't change, so we can safely
// extend their lifetimes:
unsafe { last_match = Some((core::mem::transmute(k), core::mem::transmute(v))) }
}
// The cursor is set to the first element greater than or
// equal to the (k, v) we're looking for, so we need to
// explore the left subtree.
let next_page = P::left_child(page.as_page(), &cursor);
if next_page > 0 {
page = txn.load_page(next_page).await?;
} else {
break;
}
}
Ok(last_match)
}
/// Drop a database recursively, dropping all referenced keys and
/// values that aren't shared with other databases.
pub async fn drop<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreePage<K, V>>(
txn: &mut T,
db: Db_<K, V, P>,
) -> Result<(), T::Error> {
drop_(txn, &db).await
}
use core::mem::MaybeUninit;
struct PageCursor_<K:?Sized, V:?Sized, P: BTreePage<K, V>>(MaybeUninit<cursor::PageCursor<K, V, P>>);
unsafe impl<K: ?Sized, V:?Sized, P: BTreePage<K, V>> Send for PageCursor_<K, V, P> {}
async fn drop_<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreePage<K, V>>(
txn: &mut T,
db: &Db_<K, V, P>,
) -> Result<(), T::Error> {
let mut stack: [PageCursor_<K, V, P>; cursor::N_CURSORS] = [
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
PageCursor_(MaybeUninit::uninit()),
];
let mut ptr = 0;
// Push the root page of `db` onto the stack.
let page = txn.load_page(db.db).await?;
stack[0] = PageCursor_(MaybeUninit::new(cursor::PageCursor {
cursor: P::cursor_before(&page),
page,
}));
// Then perform a DFS:
loop {
// Look at the top element of the stack.
let cur_off = unsafe { (&*stack[ptr].0.as_ptr()).page.offset };
// If it needs to be dropped (i.e. if its RC is <= 1), iterate
// its cursor and drop all its referenced pages.
let rc = txn.rc(cur_off).await?;
if rc <= 1 {
// if there's a current element in the cursor (i.e. we
// aren't before or after the elements), decrease its RC.
if let Some((k, v, _)) = {
let cur = unsafe { &mut *stack[ptr].0.as_mut_ptr() };
P::current(cur.page.as_page(), &cur.cursor)
} {
k.drop(txn).await?;
v.drop(txn).await?;
}
// In all cases, move next and push the left child onto
// the stack. This works because pushed cursors are
// initially set to before the page's elements.
let r = {
let cur = unsafe { &mut *stack[ptr].0.as_mut_ptr() };
if P::move_next(&mut cur.cursor) {
Some(P::left_child(cur.page.as_page(), &cur.cursor))
} else {
None
}
};
if let Some(r) = r {
if r > 0 {
ptr += 1;
let page = txn.load_page(r).await?;
stack[ptr] = PageCursor_(MaybeUninit::new(cursor::PageCursor {
cursor: P::cursor_before(&page),
page,
}))
}
continue;
}
}
// Here, either rc > 1 (in which case the only thing we need
// to do in this iteration is to decrement the RC), or else
// `P::move_next` returned `false`, meaning that the cursor is
// after the last element (in which case we are done with this
// page, and also need to decrement its RC, in order to free
// it).
let (is_dirty, offset) = {
let cur = unsafe { &mut *stack[ptr].0.as_mut_ptr() };
(cur.page.is_dirty(), cur.page.offset)
};
if is_dirty {
txn.decr_rc_owned(offset).await?;
} else {
txn.decr_rc(offset).await?;
}
// If this was the bottom element of the stack, stop, else, pop.
if ptr == 0 {
break;
} else {
ptr -= 1;
}
}
Ok(())
}
//! Deletions from a B tree.
use super::cursor::*;
use super::put::{Ok, Put};
use super::*;
use crate::LoadPage;
/// Represents the result of a merge or rebalance, without touching
/// the parent of the merge/rebalance.
#[derive(Debug)]
pub enum Op<'a, T, K: ?Sized, V: ?Sized> {
Merged {
// New merged page.
page: MutPage,
// Pages freed by the merge (0 meaning "no page").
freed: [u64; 2],
marker: core::marker::PhantomData<T>,
},
Rebalanced {
// New middle key.
k: &'a K,
// New middle value.
v: &'a V,
// New left page.
l: u64,
// New right page.
r: u64,
// Pages freed by the rebalance (0 meaning "no page").
freed: [u64; 2],
},
Put(crate::btree::put::Put<'a, K, V>),
}
/// Represents a page with modifications from a merge.
#[derive(Debug)]
pub struct ModifiedPage<'a, K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {
pub page: CowPage,
// Whether the page is mutable with another tree.
pub mutable: bool,
// Start copying c0 (keep `page`'s left child).
pub c0: P::Cursor,
// If > 0, replace the right child of c0's last element with `l`.
pub l: u64,
// If > 0, replace the left child of c1's last element with `r`.
pub r: u64,
// Possibly insert a new binding.
pub ins: Option<(&'a K, &'a V)>,
// If a rebalance causes a split, we might need to insert an entry
// after the replacement.
pub ins2: Option<(&'a K, &'a V)>,
// The first element of c1 is to be deleted, the others must be copied.
pub c1: P::Cursor,
// Whether to skip `c1`'s first element.
pub skip_first: bool,
// Whether the page we just modified is `self.l`.
pub mod_is_left: bool,
}
/// Represents the concatenation of a modified page and one of its
/// siblings (left or right).
#[derive(Debug)]
pub struct Concat<'a, K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {
/// Modified page.
pub modified: ModifiedPage<'a, K, V, P>,
/// Middle element.
pub mid: (&'a K, &'a V),
/// Sibling of the modified page.
pub other: CowPage,
/// Is the modified field on the left or on the right of the
/// concatenation?
pub mod_is_left: bool,
/// Is the other page owned by this tree? If not, it means `other`
/// is shared with another tree, and hence we need to increase the
/// reference count of entries coming from `other`.
pub other_is_mutable: bool,
}
/// If `value` is `None`, delete the first entry for `key` from the
/// database, if present. Else, delete the entry for `(key, value)`,
/// if present.
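///
/// A sketch of the two deletion modes (assuming `u64` implements
/// [`Storable`]):
///
/// ```ignore
/// // Delete the first entry with key 7, whatever its value:
/// let deleted = del(&mut txn, &mut db, &7, None).await?;
/// // Delete exactly the binding (7, 42), if present:
/// let deleted = del(&mut txn, &mut db, &7, Some(&42)).await?;
/// ```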
pub async fn del<
T: AllocPage + LoadPage,
K: Storable + ?Sized + PartialEq + Sync,
V: Storable + ?Sized + PartialEq + Sync,
P: BTreeMutPage<K, V>,
>(
txn: &mut T,
db: &mut Db_<K, V, P>,
key: &K,
value: Option<&V>,
) -> Result<bool, T::Error> {
let mut cursor = Cursor::new(txn, db).await?;
// If the key and value weren't found, return.
match (cursor.set(txn, key, value).await?, value) {
(Some((k, v)), Some(value)) if k == key && v == value => {}
(Some((k, _)), None) if k == key => {}
_ => return Ok(false),
}
// Else, the cursor is set on `(key, value)` if `value.is_some()`
// or on the first entry with key `key` else.
//
// We delete that position, which might be in a leaf or in an
// internal node.
del_at_cursor(txn, db, &mut cursor, true).await
}
/// Same as [`del`], except that the reference counts of pages
/// referenced by the deleted entry are not decreased (hence the
/// name). If `value` is `None`, delete the first entry for `key`
/// from the database, if present. Else, delete the entry for
/// `(key, value)`, if present.
pub async fn del_no_decr<
T: AllocPage + LoadPage,
K: Storable + ?Sized + PartialEq + Sync,
V: Storable + ?Sized + PartialEq + Sync,
P: BTreeMutPage<K, V>,
>(
txn: &mut T,
db: &mut Db_<K, V, P>,
key: &K,
value: Option<&V>,
) -> Result<bool, T::Error> {
let mut cursor = Cursor::new(txn, db).await?;
// If the key and value weren't found, return.
match (cursor.set(txn, key, value).await?, value) {
(Some((k, v)), Some(value)) if k == key && v == value => {}
(Some((k, _)), None) if k == key => {}
_ => return Ok(false),
}
// Else, the cursor is set on `(key, value)` if `value.is_some()`
// or on the first entry with key `key` else.
//
// We delete that position, which might be in a leaf or in an
// internal node.
del_at_cursor(txn, db, &mut cursor, false).await
}
/// Delete the entry pointed to by `cursor`. The cursor can't be used
/// after this.
#[doc(hidden)]
pub async fn del_at_cursor<
T: AllocPage + LoadPage,
K: Storable + ?Sized + PartialEq + Sync,
V: Storable + ?Sized + Sync,
P: BTreeMutPage<K, V>,
>(
txn: &mut T,
db: &mut Db_<K, V, P>,
cursor: &mut Cursor<K, V, P>,
decr_del_rc: bool,
) -> Result<bool, T::Error> {
let p0 = cursor.len();
// If p0 < cursor.first_rc_len(), the page containing the deleted
// element is not shared with another table. Therefore, the RC of
// the pages referenced by the deleted element needs to be
// decreased.
//
// Else, that RC doesn't need to be changed, since it's still
// referenced by the same pages as before, just not by the pages
// that are cloned by this deletion.
//
// Note in particular that if p0 == cursor.first_rc_len(), then we
// don't need to touch the RC, since we're cloning this page, but
// without including a reference to the deleted entry.
if decr_del_rc && p0 < cursor.first_rc_len() {
let (delk, delv, _) = {
let cur = cursor.cur();
P::current(cur.page.as_page(), &cur.cursor).unwrap()
};
delk.drop(txn).await?;
delv.drop(txn).await?;
}
// Find the leftmost page in the right subtree of the deleted
// element, in order to find a replacement.
let cur = cursor.cur();
let right_subtree = P::right_child(cur.page.as_page(), &cur.cursor);
cursor.set_first_leaf(txn, right_subtree).await?;
let leaf_is_shared = cursor.len() >= cursor.first_rc_len();
// In the leaf, mark the replacement for deletion, and keep a
// reference to it in k0v0 if the deletion happens in an internal
// node (k0v0 is `Option::None` else).
let (mut last_op, k0v0) = leaf_delete(txn, cursor, p0).await?;
// If the leaf is shared with another tree, then since we're
// cloning the leaf, update the reference counts of all the
// references contained in the leaf.
if leaf_is_shared {
modify_rc(txn, &last_op).await?;
}
let mut free = [[0, 0]; N_CURSORS];
// Then, climb up the stack, performing the operations lazily. At
// each step, we are one level above the page that we plan to
// modify, since `last_op` is the result of popping the
// stack.
//
// We iterate up to the root. The last iteration builds a modified
// page for the root, but doesn't actually execute it.
while cursor.len() > 0 {
// Prepare a plan for merging the current modified page (which
// is stored in `last_op`) with one of its neighbours.
let concat = concat(txn, cursor, p0, &k0v0, last_op).await?;
let mil = concat.mod_is_left;
let incr_page = if !concat.other_is_mutable {
Some(CowPage {
offset: concat.other.offset,
data: concat.other.data,
})
} else {
None
};
let incr_mid = if cursor.len() >= cursor.first_rc_len() {
Some(concat.mid)
} else {
None
};
// Execute the modification plan, resulting in one of four
// different outcomes (described in the big match in
// `handle_merge`). This mutates or clones the current
// modified page, returning an `Op` describing what happened
// (between update, merge, rebalance and split).
let merge = P::merge_or_rebalance(txn, concat).await?;
// Prepare a description (`last_op`) of the next page
// modification, without mutating nor cloning that page.
last_op = handle_merge(txn, cursor, p0, &k0v0, incr_page, incr_mid, mil, merge, &mut free).await?;
// If the modified page is shared with another tree, we will
// clone it in the next round. Therefore, update the reference
// counts of all the references in the modified page.
//
// Since `handle_merge` pops the stack, the modified page is
// at level `cursor.len() + 1`.
if cursor.len() + 1 >= cursor.first_rc_len() {
modify_rc(txn, &last_op).await?;
}
}
// The last modification plan was on the root, and still needs to
// be executed.
let root_is_shared = cursor.first_rc_len() == 1;
update_root(txn, db, last_op, k0v0, root_is_shared, &mut free).await?;
// Finally, free all the pages marked as free during the deletion,
// now that we don't need to read them anymore.
for p in free.iter() {
if p[0] & 1 == 1 {
txn.decr_rc_owned(p[0] ^ 1).await?;
} else if p[0] > 0 {
txn.decr_rc(p[0]).await?;
}
if p[1] & 1 == 1 {
txn.decr_rc_owned(p[1] ^ 1).await?;
} else if p[1] > 0 {
txn.decr_rc(p[1]).await?;
}
}
Ok(true)
}
/// Preparing a modified page for the leaf.
async fn leaf_delete<
'a,
T: AllocPage + LoadPage,
K: Storable + ?Sized + 'a,
V: Storable + ?Sized + 'a,
P: BTreeMutPage<K, V> + 'a,
>(
txn: &mut T,
cursor: &mut Cursor<K, V, P>,
p0: usize,
) -> Result<(ModifiedPage<'a, K, V, P>, Option<P::Saved>), T::Error> {
let is_rc = cursor.len() >= cursor.first_rc_len();
let del_at_internal = p0 < cursor.len();
let curs0 = cursor.pop().unwrap();
let (c0, c1) = P::split_at(&curs0.cursor);
let mut deleted = None;
if del_at_internal {
// In this case, we need to save the replacement, and if this
// leaf is shared with another tree, we also need to increment
// the replacement's references, since we are copying it.
let (k, v, _) = P::current(curs0.page.as_page(), &c1).unwrap();
if is_rc {
for o in k.page_references().chain(v.page_references()) {
txn.incr_rc(o).await?;
}
}
deleted = Some(P::save_deleted_leaf_entry(k, v))
}
Ok((
ModifiedPage {
page: curs0.page,
mutable: !is_rc,
c0,
c1,
skip_first: true,
l: 0,
r: 0,
ins: None,
ins2: None,
// The following (`mod_is_left`) is meaningless in this
// context, and isn't actually used: indeed, the only
// place where this field is used is when modifying the
// root, when `ins.is_some()`.
mod_is_left: true,
},
deleted,
))
}
/// From a cursor at level `p = cursor.len()`, form the
/// concatenation of `last_op` (which is a modified page at level
/// `p + 1`) and its left or right sibling, depending on the case.
async fn concat<
'a,
T: AllocPage + LoadPage,
K: Storable + ?Sized,
V: Storable + ?Sized,
P: BTreeMutPage<K, V>,
>(
txn: &mut T,
cursor: &mut Cursor<K, V, P>,
p0: usize,
k0v0: &Option<P::Saved>,
last_op: ModifiedPage<'a, K, V, P>,
) -> Result<Concat<'a, K, V, P>, T::Error> {
let p = cursor.len();
let rc_level = cursor.first_rc_len();
let curs = cursor.current_mut();
if p == p0 {
// Here, p == p0, meaning that we're deleting at an internal
// node. If that's the case, k0v0 is `Some`, hence we can
// safely unwrap the replacement.
let (k0, v0) = unsafe { P::from_saved(k0v0.as_ref().unwrap()) };
// Since we've picked the leftmost entry of the right child of
// the deleted entry, the other page to consider in the
// concatenation is the left child of the deleted entry.
let other = txn.load_page(P::left_child(curs.page.as_page(), &curs.cursor)).await?;
// Then, if the page at level `p` or one of its ancestors is
// pointed to at least twice (i.e. if `p >= rc_level`), or
// alternatively if `other` is itself pointed to at least twice,
// `other` is immutable, meaning that we can't delete from it
// when rebalancing.
let other_is_mutable = (p < rc_level) && txn.rc(other.offset).await? < 2;
Ok(Concat {
modified: last_op,
mid: (k0, v0),
other,
mod_is_left: false,
other_is_mutable,
})
} else {
// Here, `p` is not a leaf (but `last_op` might be), and does
// not contain the deleted entry.
// In this case, the visited page at level `p+1` is always on
// the left-hand side of the cursor at level `p` (this is an
// invariant of cursors). However, if the cursor at level `p`
// is past the end of the page, we need to move one step back
// to find a middle element for the concatenation, in which
// case the visited page becomes the right-hand side of the
// cursor.
let ((k, v, r), mod_is_left) =
if let Some(x) = P::current(curs.page.as_page(), &curs.cursor) {
// Not the last element of the page.
(x, true)
} else {
// Last element of the page.
let (k, v, _) = P::prev(curs.page.as_page(), &mut curs.cursor).unwrap();
let l = P::left_child(curs.page.as_page(), &curs.cursor);
((k, v, l), false)
};
let other = txn.load_page(r).await?;
let other_is_mutable = (p < rc_level) && txn.rc(other.offset).await? < 2;
Ok(Concat {
modified: last_op,
// The middle element comes from the page above this
// concatenation, and hence has the same lifetime as that
// page. However, our choice of lifetimes can't express
// this, but we also know that we are not going to mutate
// the parent before rebalancing or merging the
// concatenation, and hence this pointer will be alive
// during these operations (i.e. during the merge or
// rebalance).
mid: unsafe { (core::mem::transmute(k), core::mem::transmute(v)) },
other,
mod_is_left,
other_is_mutable,
})
}
}
/// Prepare a modified version of the page at the current level `p =
/// cursor.len()`.
async fn handle_merge<
'a,
T: AllocPage + LoadPage,
K: Storable + ?Sized + PartialEq,
V: Storable + ?Sized,
P: BTreeMutPage<K, V>,
>(
txn: &mut T,
cursor: &mut Cursor<K, V, P>,
p0: usize,
k0v0: &Option<P::Saved>,
incr_other: Option<CowPage>,
incr_mid: Option<(&K, &V)>,
mod_is_left: bool, // The modified page in the `merge` is the left one.
merge: Op<'a, T, K, V>,
free: &mut [[u64; 2]; N_CURSORS],
) -> Result<ModifiedPage<'a, K, V, P>, T::Error> {
let mutable = cursor.len() < cursor.first_rc_len();
let mut last_op = {
// Beware, a stack pop happens here, all subsequent references
// to the pointer must be updated.
let curs = cursor.pop().unwrap();
let (c0, c1) = P::split_at(&curs.cursor);
ModifiedPage {
page: curs.page,
mutable,
c0,
l: 0,
r: 0,
ins: None,
ins2: None,
c1,
skip_first: true,
mod_is_left,
}
};
// For merges and rebalances, take care of the reference counts of
// pages and key/values.
match merge {
Op::Merged {.. } | Op::Rebalanced { .. } => {
// Increase the RC of the "other page's" descendants. In
// the case of a rebalance, this has the effect of
// increasing the RC of the new middle entry if that entry
// comes from a shared page, which is what we want.
if let Some(other) = incr_other {
let mut curs = P::cursor_first(&other);
let left = P::left_child(other.as_page(), &curs);
if left > 0 {
txn.incr_rc(left).await?;
}
while let Some((k0, v0, r)) = P::next(other.as_page(), &mut curs) {
for o in k0.page_references().chain(v0.page_references()) {
txn.incr_rc(o).await?;
}
if r != 0 {
txn.incr_rc(r).await?;
}
}
}
// If the middle element comes from a shared page,
// increment its references.
if let Some((ref k, ref v)) = incr_mid {
for o in k.page_references().chain(v.page_references()) {
txn.incr_rc(o).await?;
}
}
}
_ => {}
}
// Here, we match on `merge`, the operation that just happened at
// level `cursor.len() + 1`, and build the modification plan
// for the page at the current level (i.e. at level
// `cursor.len()`).
let freed = match merge {
Op::Merged {
page,
freed,
marker: _,
} => {
// If we're deleting at an internal node, the
// replacement has already been included in the merged
// page.
last_op.l = page.0.offset;
freed
}
Op::Rebalanced {
k,
v,
l,
r,
freed,
} => {
// If we're deleting at an internal node, the
// replacement is already in pages `l` or `r`.
last_op.l = l;
last_op.r = r;
last_op.ins = Some((k, v));
freed
}
Op::Put(Put::Ok(Ok { page, freed })) => {
// No merge, split or rebalance has happened. If we're
// deleting at an internal node (i.e. if `cursor.len() + 1 ==
// p0`, since the stack was popped above), we need to mark
// the replacement here.
if cursor.len() + 1 == p0 {
// We have already incremented the references of the
// replacement at the leaf.
if let Some(k0v0) = k0v0 {
last_op.ins = Some(unsafe { P::from_saved(k0v0) });
}
last_op.r = page.0.offset;
} else {
last_op.skip_first = false;
if mod_is_left {
last_op.l = page.0.offset;
} else {
last_op.r = page.0.offset;
}
}
[freed, 0]
}
Op::Put(Put::Split {
left,
right,
split_key,
split_value,
freed,
}) => {
// This case only happens if `(K, V)` is not `Sized`, when
// either (1) a rebalance replaces a key/value pair with a
// larger one or (2) another split has happened in a page
// below.
let split_key_is_k0 = if cursor.len() == p0 {
// In this case, ins2 comes after ins, since the
// replacement is in the right child of the deleted
// key.
if let Some(k0v0) = k0v0 {
last_op.ins = Some(unsafe { P::from_saved(k0v0) });
}
last_op.ins2 = Some((split_key, split_value));
false
} else {
last_op.ins = Some((split_key, split_value));
last_op.skip_first = false;
// If the page modified in the last step is the one on
// the right of the current entry, move right one step
// before inserting the split key/value.
//
// (remember that we popped the stack upon entering
// this function).
//
// We need to do this because the split key/value is
// inserted immediately *before* the current cursor
// position, which is incorrect if the split key/value
// comes from the right-hand side of the current
// cursor position.
//
// This can happen in exactly two situations:
// - when the element we are deleting is the one we are
// skipping here.
// - when we are deleting in the rightmost child of a
// page.
if cursor.len() > 0 && !mod_is_left {
P::move_next(&mut last_op.c1);
}
// If the split key is the replacement element, we have
// already increased its counter when we deleted it from
// its original position at the bottom of the tree.
//
// This can happen if we replaced an element and that
// replacement caused a split with the replacement as the
// middle element.
if let Some(k0v0) = k0v0 {
assert!(cursor.len() + 1 < p0);
let (k0, _) = unsafe { P::from_saved(k0v0) };
core::ptr::eq(k0, split_key)
} else {
false
}
};
// The `l` and `r` fields are relative to `ins2` if
// `ins2.is_some()` or to `ins` else.
last_op.l = left.0.offset;
last_op.r = right.0.offset;
if cursor.len() + 2 >= cursor.first_rc_len() && !split_key_is_k0 {
for o in split_key
.page_references()
.chain(split_value.page_references())
{
txn.incr_rc(o).await?;
}
}
[freed, 0]
}
};
// Free the page(s) at level `cursor.len() + 1` if it isn't
// shared with another tree, or if it is the first level shared
// with another tree.
if cursor.len() + 1 < cursor.first_rc_len() {
free[cursor.len() + 1] = freed;
}
Ok(last_op)
}
// This function modifies the reference counts of references of the
// modified page, which is the page *about to be* modified.
//
// This function is always called when `m` is an internal node.
async fn modify_rc<
'a,
T: AllocPage + LoadPage,
K: Storable + ?Sized,
V: Storable + ?Sized,
P: BTreePage<K, V>,
>(
txn: &mut T,
m: &ModifiedPage<'a, K, V, P>,
) -> Result<(), T::Error> {
let mut c0 = m.c0.clone();
let mut c1 = m.c1.clone();
let mut left = P::left_child(m.page.as_page(), &c0);
while let Some((k, v, r)) = P::next(m.page.as_page(), &mut c0) {
for o in k.page_references().chain(v.page_references()) {
txn.incr_rc(o).await?;
}
if left != 0 {
txn.incr_rc(left).await?;
}
left = r;
}
// The insertions are taken care of in `handle_merge` above,
// so we can directly move to the `c1` part of the
// modification.
if m.skip_first {
P::move_next(&mut c1);
}
// If we are not updating the left child of c1's first
// element, increment that left child.
if m.l == 0 {
if left != 0 {
txn.incr_rc(left).await?;
}
}
// If we are not updating the right child of the first
// element of `c1`, increment that right child's RC.
if let Some((k, v, r)) = P::next(m.page.as_page(), &mut c1) {
for o in k.page_references().chain(v.page_references()) {
txn.incr_rc(o).await?;
}
// If `m.skip_first`, we have already skipped the entry above,
// so this `r` has nothing to do with any update.
//
// Else, if we aren't skipping, but also aren't updating the
// right child of the current entry, also increase the RC.
if (m.skip_first || m.r == 0) && r != 0 {
txn.incr_rc(r).await?;
}
}
// Finally, increment the RCs of all other elements in `c1`.
while let Some((k, v, r)) = P::next(m.page.as_page(), &mut c1) {
for o in k.page_references().chain(v.page_references()) {
txn.incr_rc(o).await?;
}
if r != 0 {
txn.incr_rc(r).await?;
}
}
Ok(())
}
async fn update_root<
'a,
T: AllocPage + LoadPage,
K: Storable + ?Sized,
V: Storable + ?Sized,
P: BTreeMutPage<K, V>,
>(
txn: &mut T,
db: &mut Db_<K, V, P>,
last_op: ModifiedPage<'a, K, V, P>,
k0v0: Option<P::Saved>,
is_rc: bool,
free: &mut [[u64; 2]; N_CURSORS],
) -> Result<(), T::Error> {
if let Some(d) = single_child(&last_op) {
// If the root had only one element, and the last operation
// was a merge, this decreases the depth of the tree.
//
// We don't do this if the table is empty (i.e. if `d == 0`),
// in order to be consistent with put and drop.
if d > 0 {
if last_op.page.is_dirty() {
txn.decr_rc_owned(last_op.page.offset).await?;
} else {
txn.decr_rc(last_op.page.offset).await?;
}
db.db = d;
} else {
// The page becomes empty.
let (page, freed) = P::del(txn, last_op.page, last_op.mutable, &last_op.c1, d).await?;
free[0][0] = freed;
db.db = page.0.offset
}
return Ok(());
}
// Else, the last operation `last_op` was relative to the root,
// but it hasn't been applied yet. We apply it now:
match modify::<_, K, V, P>(txn, last_op).await? {
Put::Ok(Ok { page, freed }) => {
// Here, the root was simply updated (i.e. some elements
// might have been deleted/inserted/updated), so we just
// need to update the Db.
free[0][0] = freed;
db.db = page.0.offset
}
Put::Split {
split_key: k,
split_value: v,
left: MutPage(CowPage { offset: l, .. }),
right: MutPage(CowPage { offset: r, .. }),
freed,
} => {
// Else, the root has split, and we need to allocate a new
// page to hold the split key/value and the two sides of
// the split.
free[0][0] = freed;
let mut page = txn.alloc_page().await?;
P::init(&mut page);
let mut c = P::cursor_before(&page.0);
P::move_next(&mut c);
let page = if let Put::Ok(p) = P::put(txn, page.0, true, false, &c, k, v, None, l, r).await? {
p.page
} else {
unreachable!()
};
let split_key_is_k0 = if let Some(ref k0v0) = k0v0 {
let (k0, _) = unsafe { P::from_saved(k0v0) };
core::ptr::eq(k0, k)
} else {
false
};
// If the root is shared with another tree, and the split
// key is different from the replacement, increment the RCs
// of the references contained in the split key/value.
//
// The replacement need not be incremented again, since it
// was already incremented when we took it from the page in
// `leaf_delete`.
if is_rc && !split_key_is_k0 {
for o in k.page_references().chain(v.page_references()) {
txn.incr_rc(o).await?;
}
}
// Finally, update the database.
db.db = page.0.offset
}
}
Ok(())
}
fn single_child<K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(
m: &ModifiedPage<K, V, P>,
) -> Option<u64> {
let mut c1 = m.c1.clone();
if m.skip_first {
P::move_next(&mut c1);
}
if P::is_empty(&m.c0) && m.ins.is_none() && P::is_empty(&c1) {
Some(m.l)
} else {
None
}
}
/// Apply the modifications to a page. This is exclusively used for
/// the root page, because other modifications are applied during
/// merge/rebalance attempts.
async fn modify<'a, T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(
txn: &mut T,
m: ModifiedPage<'a, K, V, P>,
) -> Result<super::put::Put<'a, K, V>, T::Error> {
if let Some((k, v)) = m.ins {
// The following call might actually replace the first element
// of `m.c1` if `m.skip_first`.
let mut c1 = m.c1.clone();
if !m.skip_first && !m.mod_is_left {
// This means that the page below just split, since we
// have to insert an extra entry on the root page.
//
// However, the extra entry is to be inserted (by
// `P::put`) *before* `c1`'s first element, which is
// incorrect since the page that split is the right child
// of `c1`'s first element. Therefore, we need to move
// `c1` one notch to the right.
assert!(m.ins2.is_none());
P::move_next(&mut c1);
}
P::put(
txn,
m.page,
m.mutable,
m.skip_first,
&c1,
k,
v,
m.ins2,
m.l,
m.r,
).await
} else {
// If there's no insertion to do, we might still need to
// delete elements, or update children.
if m.skip_first {
let (page, freed) = P::del(txn, m.page, m.mutable, &m.c1, m.l).await?;
Ok(Put::Ok(Ok { freed, page }))
} else if m.l > 0 {
// If the left page of the current entry has changed,
// update it.
Ok(Put::Ok(P::update_left_child(
txn, m.page, m.mutable, &m.c1, m.l,
).await?))
} else {
// If there's no insertion, and `m.l == 0`, we need to
// update the right child of the current entry. The
// following moves one step to the right and updates the
// left child:
let mut c1 = m.c1.clone();
P::move_next(&mut c1);
Ok(Put::Ok(P::update_left_child(
txn, m.page, m.mutable, &c1, m.r,
).await?))
}
}
}
use super::*;
use crate::{CowPage, LoadPage};
use core::mem::MaybeUninit;
#[derive(Debug)]
pub(super) struct PageCursor<K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {
pub page: CowPage,
pub cursor: P::Cursor,
}
unsafe impl<K, V, P: BTreePage<K, V>> Send for PageCursor<K, V, P>{}
unsafe impl<K, V, P: BTreePage<K, V>> Sync for PageCursor<K, V, P>{}
// This is 1 + the maximal depth of a tree.
//
// Since pages are of size 2^12, there are at most 2^52 addressable
// pages (potentially less depending on the platform). Since each page
// of a B tree below the root has at least 2 elements (because each
// page is at least half-full, and elements are at most 1/4th of a
// page), the arity is at least 3, except for the root. Since 3^33 is
// the smallest power of 3 larger than 2^52, the maximum depth is 33.
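//
// As a quick check on that arithmetic: 3^32 ≈ 1.85e15 < 2^52 ≈
// 4.5e15 <= 3^33 ≈ 5.56e15, so 33 levels always suffice.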
pub(crate) const N_CURSORS: usize = 33;
#[derive(Debug)]
/// A position in a B tree.
pub struct Cursor<K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {
/// Invariant: the first `len` items are initialised.
stack: [core::mem::MaybeUninit<PageCursor<K, V, P>>; N_CURSORS],
/// The length of the stack at the position of the first page
/// shared with another tree. This definition is a little bit
/// convoluted, but allows for efficient comparisons between
/// `first_rc_len` and `len` to check whether a page is shared
/// with another tree.
first_rc_len: usize,
/// Number of initialised items on the stack.
len: usize,
}
unsafe impl<K, V, P: BTreePage<K, V>> Send for Cursor<K, V, P>{}
unsafe impl<K, V, P: BTreePage<K, V>> Sync for Cursor<K, V, P>{}
impl<'a, K: 'a + ?Sized + Storable, V: 'a + ?Sized + Storable, P: BTreePage<K, V>> Cursor<K, V, P> {
/// Create a new cursor, initialised to the first entry of the B tree.
pub async fn new<T: LoadPage>(txn: &T, db: &Db_<K, V, P>) -> Result<Self, T::Error> {
// Looking forward to getting array initialisation stabilised :)
let mut stack = [
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
core::mem::MaybeUninit::uninit(),
];
let page = txn.load_page(db.db).await?;
stack[0] = MaybeUninit::new(PageCursor {
cursor: P::cursor_before(&page),
page,
});
Ok(Cursor {
stack,
first_rc_len: N_CURSORS,
len: 1,
})
}
pub(super) fn push(&mut self, p: PageCursor<K, V, P>) {
self.stack[self.len] = MaybeUninit::new(p);
self.len += 1;
}
pub(super) fn cur(&self) -> &PageCursor<K, V, P> {
assert!(self.len > 0);
unsafe { &*self.stack[self.len - 1].as_ptr() }
}
pub(super) fn len(&self) -> usize {
self.len
}
pub(super) fn first_rc_len(&self) -> usize {
self.first_rc_len
}
pub(super) fn pop(&mut self) -> Option<PageCursor<K, V, P>> {
if self.len > 0 {
self.len -= 1;
let result = core::mem::replace(&mut self.stack[self.len], MaybeUninit::uninit());
Some(unsafe { result.assume_init() })
} else {
None
}
}
pub(super) fn current_mut(&mut self) -> &mut PageCursor<K, V, P> {
assert!(self.len > 0);
unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() }
}
/// Push the leftmost path starting at page `left_page` onto the
/// stack.
pub(super) async fn set_first_leaf<T: LoadPage>(
&mut self,
txn: &T,
mut left_page: u64,
) -> Result<(), T::Error> {
while left_page > 0 {
if self.first_rc_len >= N_CURSORS && txn.rc(left_page).await? >= 2 {
self.first_rc_len = self.len + 1
}
let page = txn.load_page(left_page).await?;
let curs = P::cursor_first(&page);
left_page = P::left_child(page.as_page(), &curs);
self.push(PageCursor { page, cursor: curs });
}
Ok(())
}
/// Reset the cursor to the first element of the database.
pub fn reset<T: LoadPage>(&mut self) {
self.len = 1;
let init = unsafe { &mut *self.stack[0].as_mut_ptr() };
init.cursor = P::cursor_before(&init.page);
}
// An invariant of cursors, fundamental to understanding the
// `next` and `prev` functions below, is that the lower levels (in
// the tree, upper levels on the stack) are the left children of
// the cursor's position on a page.
/// Set the cursor to the first position larger than or equal to
/// the specified key and value.
pub async fn set<T: LoadPage>(
&mut self,
txn: &'a T,
k: &K,
v: Option<&V>,
) -> Result<Option<(&'a K, &'a V)>, T::Error> {
// Set the "cursor stack" by setting a cursor in each page
// on a path from the root to the appropriate leaf.
// Start from the bottom of the stack, which is also the root
// of the tree.
self.len = 1;
self.first_rc_len = N_CURSORS;
let init = unsafe { &mut *self.stack[0].as_mut_ptr() };
init.cursor = P::cursor_before(&init.page);
let mut last_matching_page = N_CURSORS;
let mut last_match = None;
while self.len < N_CURSORS {
let current = unsafe { &mut *self.stack.get_unchecked_mut(self.len - 1).as_mut_ptr() };
if self.first_rc_len >= N_CURSORS && txn.rc(current.page.offset).await? >= 2 {
self.first_rc_len = self.len
}
let ref mut cursor = current.cursor;
if let Ok((kk, vv, _)) = P::set_cursor(txn, current.page.as_page(), cursor, k, v).await {
if v.is_some() {
return Ok(Some((kk, vv)));
}
last_match = Some((kk, vv));
last_matching_page = self.len
}
let next_page = P::left_child(current.page.as_page(), cursor);
if next_page > 0 {
let page = txn.load_page(next_page).await?;
self.push(PageCursor {
cursor: P::cursor_before(&page),
page,
});
} else {
break;
}
}
if last_matching_page < N_CURSORS {
self.len = last_matching_page;
Ok(last_match)
} else {
Ok(None)
}
}
/// Set the cursor after the last element, so that [`Self::next`]
/// returns `None`, and [`Self::prev`] returns the last element.
pub async fn set_last< T: LoadPage>(
&mut self,
txn: &'a T,
) -> Result<(), T::Error> {
self.len = 1;
self.first_rc_len = N_CURSORS;
let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };
current.cursor = P::cursor_after(&current.page);
loop {
let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };
if self.first_rc_len >= N_CURSORS && txn.rc(current.page.offset).await? >= 2 {
self.first_rc_len = self.len
}
let l = P::left_child(current.page.as_page(), &current.cursor);
if l > 0 {
let page = txn.load_page(l).await?;
self.push(PageCursor {
cursor: P::cursor_after(&page),
page,
})
} else {
break;
}
}
Ok(())
}
/// Return the current position of the cursor.
pub fn current<T: LoadPage>(
&mut self,
_txn: &'a T,
) -> Result<Option<(&'a K, &'a V)>, T::Error> {
loop {
let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };
if P::is_init(&current.cursor) {
// The cursor hasn't been set.
return Ok(None)
} else if let Some((k, v, _)) = P::current(current.page.as_page(), &current.cursor)
{
unsafe {
return Ok(Some((core::mem::transmute(k), core::mem::transmute(v))));
}
} else if self.len > 1 {
self.len -= 1
} else {
// We're past the last element at the root.
return Ok(None);
}
}
}
/// Return the current position of the cursor, and move the cursor
/// to the next position.
pub async fn next< T: LoadPage>(
&mut self,
txn: &'a T,
) -> Result<Option<(&'a K, &'a V)>, T::Error> {
loop {
let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };
if P::is_empty(&current.cursor) {
if self.len > 1 {
if self.first_rc_len == self.len {
self.first_rc_len = N_CURSORS
}
self.len -= 1
} else {
return Ok(None);
}
} else {
let (cur_entry, r) = if let Some((k, v, r)) = P::current(current.page.as_page(), &current.cursor) {
(Some((k, v)), r)
} else {
(None, P::right_child(current.page.as_page(), &current.cursor))
};
P::move_next(&mut current.cursor);
if r != 0 {
let page = txn.load_page(r).await?;
self.push(PageCursor {
cursor: P::cursor_before(&page),
page,
});
if self.first_rc_len >= N_CURSORS && txn.rc(r).await? >= 2 {
self.first_rc_len = self.len
}
}
if let Some((k, v)) = cur_entry {
unsafe {
return Ok(Some((core::mem::transmute(k), core::mem::transmute(v))));
}
}
}
}
}
/// Move the cursor to the previous entry, and return the entry
/// that was current before the move. If the cursor is initially
/// after all the entries, this moves it back by two steps.
pub async fn prev< T: LoadPage>(
&mut self,
txn: &'a T,
) -> Result<Option<(&'a K, &'a V)>, T::Error> {
loop {
let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };
if P::is_init(&current.cursor) {
if self.len > 1 {
if self.first_rc_len == self.len {
self.first_rc_len = N_CURSORS
}
self.len -= 1;
let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };
P::move_prev(&mut current.cursor);
} else {
return Ok(None);
}
} else {
let cur_entry = P::current(current.page.as_page(), &current.cursor);
let left = P::left_child(current.page.as_page(), &current.cursor);
if left != 0 {
let page = txn.load_page(left).await?;
self.push(PageCursor {
cursor: P::cursor_after(&page),
page,
});
if self.first_rc_len >= N_CURSORS && txn.rc(left).await? >= 2 {
self.first_rc_len = self.len
}
} else {
// We are at a leaf.
P::move_prev(&mut current.cursor);
}
if let Some((k, v, _)) = cur_entry {
unsafe {
return Ok(Some((core::mem::transmute(k), core::mem::transmute(v))));
}
}
}
}
}
}
pub struct Iter<'a, T: LoadPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreePage<K, V>> {
txn: &'a T,
cursor: Cursor<K, V, P>,
}
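/// Iterate over a database in increasing order, starting from the
/// first entry greater than or equal to `origin` if `origin` is
/// `Some`, or from the beginning of the database else.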
pub async fn iter<'a, T, K, V, P>(
txn: &'a T,
db: &super::Db_<K, V, P>,
origin: Option<(&K, Option<&V>)>,
) -> Result<Iter<'a, T, K, V, P>, T::Error>
where
T: LoadPage,
K: Storable + ?Sized,
V: Storable + ?Sized,
P: BTreePage<K, V>,
{
let mut cursor = Cursor::new(txn, db).await?;
if let Some((k, v)) = origin {
cursor.set(txn, k, v).await?;
}
Ok(Iter { cursor, txn })
}
impl<
'a,
T: LoadPage,
K: Storable + ?Sized + 'a,
V: Storable + ?Sized + 'a,
P: BTreePage<K, V> + 'a,
> Iter<'a, T, K, V, P>
{
pub async fn next(&mut self) -> Result<Option<(&'a K, &'a V)>, T::Error>{
self.cursor.next(self.txn).await
}
}
pub struct RevIter<'a, T: LoadPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreePage<K, V>>
{
txn: &'a T,
cursor: Cursor<K, V, P>,
}
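/// Iterate over a database in decreasing order, starting from the
/// first entry greater than or equal to `origin` if `origin` is
/// `Some`, or from the last entry of the database else.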
pub async fn rev_iter<'a, T, K, V, P>(
txn: &'a T,
db: &super::Db_<K, V, P>,
origin: Option<(&K, Option<&V>)>,
) -> Result<RevIter<'a, T, K, V, P>, T::Error>
where
T: LoadPage,
K: Storable + ?Sized,
V: Storable + ?Sized,
P: BTreePage<K, V>,
{
let mut cursor = Cursor::new(txn, db).await?;
if let Some((k, v)) = origin {
cursor.set(txn, k, v).await?;
} else {
cursor.set_last(txn).await?;
}
Ok(RevIter { cursor, txn })
}
impl<
'a,
T: LoadPage,
K: Storable + ?Sized + 'a,
V: Storable + ?Sized + 'a,
P: BTreePage<K, V> + 'a,
> RevIter<'a, T, K, V, P>
{
pub async fn next(&mut self) -> Result<Option<(&'a K, &'a V)>, T::Error>{
self.cursor.prev(self.txn).await
}
}
[package]
name = "sanakirja-core-async"
version = "0.0.1"
edition = "2018"
description = "Copy-on-write datastructures, storable on disk (or elsewhere) with a stable format."
authors = ["Pierre-Étienne Meunier", "Florent Becker"]
license = "MIT/Apache-2.0"
documentation = "https://docs.rs/sanakirja-core-async"
repository = "https://nest.pijul.com/pijul/sanakirja"
include = [
"Cargo.toml",
"src/lib.rs",
"src",
"src/btree",
"src/btree/page_cursor.rs",
"src/btree/page_unsized",
"src/btree/page_unsized/alloc.rs",
"src/btree/page_unsized/rebalance.rs",
"src/btree/page_unsized/header.rs",
"src/btree/page_unsized/put.rs",
"src/btree/page_unsized/cursor.rs",
"src/btree/put.rs",
"src/btree/page",
"src/btree/page/alloc.rs",
"src/btree/page/rebalance.rs",
"src/btree/page/put.rs",
"src/btree/del.rs",
"src/btree/mod.rs",
"src/btree/page_unsized.rs",
"src/btree/cursor.rs",
"src/btree/page.rs"
]
[features]
crc32 = [ "crc32fast" ]
std = []
ed25519 = [ "ed25519-zebra", "ed25519-zebra/serde" ]
[dependencies]
crc32fast = { version = "1.2", optional = true, default-features = false }
uuid = { version = "0.8", optional = true }
ed25519-zebra = { version = "2.2", optional = true }
async-trait = "0.1"