//! This crate defines tools to implement datastructures that can live
//! in main memory or on-disk, meaning that their natural habitat is
//! memory-mapped files, but if that environment is threatened, they
//! might seek refuge in lower-level environments.
//!
//! One core building block of this library is the notion of virtual
//! memory pages, which are allocated and freed by an
//! externally-provided allocator (see how the `sanakirja` crate does
//! this). The particular implementation used here is meant to allow a
//! transactional system with readers reading the structures
//! concurrently with one writer at a time.
//!
//! At the moment, only B trees are implemented, as well as the
//! following general traits:
//!
//! - [`LoadPage`] is a trait used to get a pointer to a page. In the
//!   most basic version, this may just return a pointer to the file,
//!   offset by the requested offset. In more sophisticated versions,
//!   this can be used to encrypt and compress pages.
//! - [`AllocPage`] allocates and frees pages, because as
//!   datastructures need to be persisted on disk, we can't rely on
//!   Rust's memory management to do it for us. Users of this crate
//!   don't have to worry about this though.
//!
//! Moreover, two other traits can be used to store things on pages:
//! [`Storable`] is a simple trait that all `Sized + Ord` types
//! without references can readily implement (the [`direct_repr!`]
//! macro does that). For types containing references to pages
//! allocated in the database, the comparison function can be
//! customised. Moreover, these types must supply an iterator over
//! these references, in order for reference-counting to work properly
//! when the datastructures referencing these types are forked.
//!
//! Dynamically-sized types, or types that need to be represented in a
//! dynamically-sized way, can use the [`UnsizedStorable`] format.

use async_trait::async_trait;

pub mod btree;

/// There's a hard-coded assumption that pages have 4K bytes. This is
/// true for normal memory pages on almost all platforms.
pub const PAGE_SIZE: usize = 4096;

/// Types that can be stored on disk. This trait may be used in
/// conjunction with `Sized` in order to determine the on-disk size,
/// or with [`UnsizedStorable`] when special arrangements are needed.
#[async_trait(?Send)]
pub trait Storable: Send + Sync + core::fmt::Debug {
    /// This is required for B trees, not necessarily for other
    /// structures. The default implementation panics.
    fn compare(&self, _b: &Self) -> core::cmp::Ordering {
        unimplemented!()
    }

    /// The offsets to the pages referenced by this value, as an
    /// iterator (see [`Self::PageReferences`]); empty for plain types.
    fn page_references(&self) -> Self::PageReferences;

    /// Drop this value, by decrementing the reference count of every
    /// page it references.
    async fn drop<T: AllocPage>(&self, txn: &mut T) -> Result<(), T::Error> {
        for p in self.page_references() {
            txn.decr_rc(p).await?;
        }
        Ok(())
    }

    /// An iterator over the offsets to pages contained in this
    /// value. Only values from this crate can generate non-empty
    /// iterators, but combined values (like tuples) must chain the
    /// iterators returned by method `page_offsets`.
    type PageReferences: Iterator<Item = u64> + Send;
}

/// A macro to implement [`Storable`] on "plain" types,
/// i.e. fixed-sized types that are `repr(C)` and don't hold
/// references.
#[macro_export]
macro_rules! direct_repr {
    ($t: ty) => {
        impl Storable for $t {
            type PageReferences = core::iter::Empty<u64>;
            fn page_references(&self) -> Self::PageReferences {
                core::iter::empty()
            }
            fn compare(&self, b: &Self) -> core::cmp::Ordering {
                self.cmp(b)
            }
        }
        impl UnsizedStorable for $t {
            const ALIGN: usize = core::mem::align_of::<$t>();

            /// If `Self::SIZE.is_some()`, this must return the same
            /// value. The default implementation is `Self::SIZE.unwrap()`.
            fn size(&self) -> usize {
                core::mem::size_of::<Self>()
            }

            /// Read the size from an on-page entry.
            unsafe fn onpage_size(_: *const u8) -> usize {
                core::mem::size_of::<Self>()
            }

            /// Write to a page. Must not overwrite the allocated size,
            /// but this isn't checked (which is why it's unsafe).
            unsafe fn write_to_page(&self, p: *mut u8) {
                core::ptr::copy_nonoverlapping(self, p as *mut Self, 1)
            }

            unsafe fn from_raw_ptr<'a>(p: *const u8) -> &'a Self {
                &*(p as *const Self)
            }
        }
    };
}

direct_repr!(());
direct_repr!(u8);
direct_repr!(i8);
direct_repr!(u16);
direct_repr!(i16);
direct_repr!(u32);
direct_repr!(i32);
direct_repr!(u64);
direct_repr!(i64);
direct_repr!([u8; 16]);

#[cfg(feature = "std")]
extern crate std;
#[cfg(feature = "std")]
direct_repr!(std::net::Ipv4Addr);
#[cfg(feature = "std")]
direct_repr!(std::net::Ipv6Addr);
#[cfg(feature = "std")]
direct_repr!(std::net::IpAddr);
#[cfg(feature = "std")]
direct_repr!(std::net::SocketAddr);
#[cfg(feature = "std")]
direct_repr!(std::time::SystemTime);
#[cfg(feature = "std")]
direct_repr!(std::time::Duration);
#[cfg(feature = "uuid")]
direct_repr!(uuid::Uuid);
#[cfg(feature = "ed25519")]
direct_repr!(ed25519_zebra::VerificationKeyBytes);

/// Types that can be stored on disk.
pub trait UnsizedStorable: Storable {
    const ALIGN: usize;

    /// If `Self::SIZE.is_some()`, this must return the same
    /// value. The default implementation is `Self::SIZE.unwrap()`.
    fn size(&self) -> usize;

    /// Read the size from an on-page entry. If `Self::SIZE.is_some()`
    /// this must be the same value.
    unsafe fn onpage_size(_: *const u8) -> usize;

    /// Write to a page. Must not overwrite the allocated size, but
    /// this isn't checked (which is why it's unsafe).
    unsafe fn write_to_page(&self, p: *mut u8);

    unsafe fn from_raw_ptr<'a>(p: *const u8) -> &'a Self;
}

impl Storable for [u8] {
    type PageReferences = core::iter::Empty<u64>;
    fn page_references(&self) -> Self::PageReferences {
        core::iter::empty()
    }
    fn compare(&self, b: &Self) -> core::cmp::Ordering {
        self.cmp(b)
    }
}

impl UnsizedStorable for [u8] {
    const ALIGN: usize = 2;
    fn size(&self) -> usize {
        2 + self.len()
    }
    unsafe fn from_raw_ptr<'a>(p: *const u8) -> &'a Self {
        let len = u16::from_le(*(p as *const u16));
        assert_ne!(len, 0);
        assert_eq!(len & 0xf000, 0);
        core::slice::from_raw_parts(p.add(2), len as usize)
    }
    unsafe fn onpage_size(p: *const u8) -> usize {
        let len = u16::from_le(*(p as *const u16));
        2 + len as usize
    }
    unsafe fn write_to_page(&self, p: *mut u8) {
        assert!(self.len() <= 510);
        *(p as *mut u16) = (self.len() as u16).to_le();
        core::ptr::copy_nonoverlapping(self.as_ptr(), p.add(2), self.len())
    }
}

unsafe fn read<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    k: *const u8,
) -> (*const u8, *const u8) {
    let s = K::onpage_size(k);
    let v = k.add(s);
    let al = v.align_offset(V::ALIGN);
    let v = v.add(al);
    (k, v)
}

unsafe fn entry_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    k: *const u8,
) -> usize {
    assert_eq!(k.align_offset(K::ALIGN), 0);
    let ks = K::onpage_size(k);
    // Next multiple of `V::ALIGN`, assuming `V::ALIGN` is a power of 2.
    let v_off = (ks + V::ALIGN - 1) & !(V::ALIGN - 1);
    let v_ptr = k.add(v_off);
    let vs = V::onpage_size(v_ptr);
    let ka = K::ALIGN.max(V::ALIGN);
    let size = v_off + vs;
    (size + ka - 1) & !(ka - 1)
}

/// Representation of a mutable or shared page. This is an owned page
/// (like `Vec` in Rust's std), but we do not know whether we can
/// mutate it or not.
///
/// The least-significant bit of the first byte of each page is 1 if
/// and only if the page was allocated by the current transaction (and
/// hence isn't visible to any other transaction, meaning we can write
/// on it).
#[derive(Debug)]
#[repr(C)]
pub struct CowPage {
    pub data: *mut u8,
    pub offset: u64,
}

/// Representation of a borrowed, or immutable page, like a slice in
/// Rust.
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct Page<'a> {
    pub data: &'a [u8; PAGE_SIZE],
    pub offset: u64,
}

impl CowPage {
    /// Borrows the page.
    pub fn as_page(&self) -> Page {
        Page {
            data: unsafe { &*(self.data as *const [u8; PAGE_SIZE]) },
            offset: self.offset,
        }
    }

    #[cfg(feature = "crc32")]
    pub fn crc(&self, hasher: &crc32fast::Hasher) -> u32 {
        let mut hasher = hasher.clone();
        hasher.reset();
        // Hash the beginning and the end of the page, i.e. skip the
        // CRC itself (bytes 4 to 7).
        unsafe {
            // Mask out the dirty bit.
            let x = [(*self.data) & 0xfe];
            hasher.update(&x[..]);
            hasher.update(core::slice::from_raw_parts(self.data.add(1), 3));
            hasher.update(core::slice::from_raw_parts(self.data.add(8), PAGE_SIZE - 8));
        }
        hasher.finalize()
    }

    #[cfg(feature = "crc32")]
    pub fn crc_check(&self, hasher: &crc32fast::Hasher) -> bool {
        let crc = unsafe { u32::from_le(*(self.data as *const u32).add(1)) };
        self.crc(hasher) == crc
    }
}
/// An owned page on which we can write. This is just a wrapper
/// around `CowPage` to avoid checking the "dirty" bit at runtime.
#[derive(Debug)]
pub struct MutPage(pub CowPage);

#[cfg(not(feature = "crc32"))]
pub fn clear_dirty(data: &mut [u8]) {
    data[0] &= 0xfe
}

#[cfg(feature = "crc32")]
pub fn clear_dirty(data: &mut [u8], hasher: &crc32fast::Hasher) {
    data[0] &= 0xfe;
    // Recompute the CRC over the whole page.
    let page = CowPage {
        data: data.as_mut_ptr(),
        offset: 0,
    };
    unsafe {
        let crc = (data.as_mut_ptr() as *mut u32).add(1);
        *crc = page.crc(hasher)
    }
}

impl MutPage {
    #[cfg(not(feature = "crc32"))]
    pub fn clear_dirty(&mut self) {
        unsafe { *self.0.data &= 0xfe }
    }

    #[cfg(feature = "crc32")]
    pub fn clear_dirty(&mut self, hasher: &crc32fast::Hasher) {
        unsafe {
            *self.0.data &= 0xfe;
            let crc = (self.0.data as *mut u32).add(1);
            *crc = self.0.crc(hasher)
        }
    }
}

unsafe impl Sync for CowPage {}
unsafe impl Send for CowPage {}

impl CowPage {
    /// Checks the dirty bit of a page.
    pub fn is_dirty(&self) -> bool {
        unsafe { (*self.data) & 1 != 0 }
    }
}

/// Trait for loading a page.
#[async_trait(?Send)]
pub trait LoadPage: Send + Sync {
    type Error;

    /// Loading a page.
    async fn load_page(&self, off: u64) -> Result<CowPage, Self::Error>;

    /// Reference-counting. Since reference-counts are designed to be
    /// storable into B trees by external allocators, pages referenced
    /// once aren't stored, and hence are indistinguishable from pages
    /// that are never referenced. The default implementation returns 0.
    ///
    /// This has the extra benefit of requiring less disk space, and
    /// isn't more unsafe than storing the reference count, since we
    /// aren't supposed to hold a reference to a page with "logical
    /// RC" 0, so storing "1" for that page would be redundant anyway.
    async fn rc(&self, _off: u64) -> Result<u64, Self::Error> {
        Ok(0)
    }
}

/// Trait for allocating and freeing pages.
#[async_trait(?Send)]
pub trait AllocPage: LoadPage {
    /// Allocate a new page.
    async fn alloc_page(&mut self) -> Result<MutPage, Self::Error>;
    /// Increment the page's reference count.
    async fn incr_rc(&mut self, off: u64) -> Result<usize, Self::Error>;
    /// Decrement the page's reference count, assuming the page was
    /// first allocated by another transaction. If the RC reaches 0,
    /// free the page. Must return the new RC (0 if freed).
    async fn decr_rc(&mut self, off: u64) -> Result<usize, Self::Error>;
    /// Same as [`Self::decr_rc`], but for pages allocated by the
    /// current transaction. This is an important distinction, as
    /// pages allocated by the current transaction can be reused
    /// immediately after being freed.
    async fn decr_rc_owned(&mut self, off: u64) -> Result<usize, Self::Error>;
}
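// A minimal sketch of a composite `Storable` type (not part of this
// crate; `MyPair` and the meaning of its fields are hypothetical).
// Plain fields contribute no page references, while fields that point
// to pages allocated in the database must be reported, so that forks
// can adjust reference counts; types with several such fields would
// `chain` the iterators, as the crate documentation describes.
#[cfg(test)]
mod storable_example {
    use super::*;

    #[derive(Debug)]
    #[repr(C)]
    struct MyPair {
        a: u64, // plain data: contributes no page references.
        b: u64, // assumed here to be the offset of a database page.
    }

    impl Storable for MyPair {
        type PageReferences = core::iter::Once<u64>;
        fn page_references(&self) -> Self::PageReferences {
            core::iter::once(self.b)
        }
        fn compare(&self, x: &Self) -> core::cmp::Ordering {
            (self.a, self.b).cmp(&(x.a, x.b))
        }
    }
}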
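// A deliberately naive, in-memory allocator sketch, to illustrate the
// `LoadPage`/`AllocPage` traits. `MemTxn` is hypothetical and not part
// of this crate: pages are `Vec`-backed, reference counts are not
// stored (`rc` keeps its default of 0), and freed pages are never
// reused. A real allocator (like the one in the `sanakirja` crate)
// would also guarantee 4096-byte page alignment, which
// `Box<[u8; PAGE_SIZE]>` does not.
#[cfg(test)]
mod alloc_example {
    #![allow(dead_code)]
    use super::*;
    use async_trait::async_trait;

    struct MemTxn {
        pages: Vec<Box<[u8; PAGE_SIZE]>>,
    }

    impl MemTxn {
        fn new() -> Self {
            // Offset 0 means "no page" throughout the B tree code, so
            // keep a placeholder page at offset 0.
            MemTxn {
                pages: vec![Box::new([0; PAGE_SIZE])],
            }
        }
    }

    #[async_trait(?Send)]
    impl LoadPage for MemTxn {
        type Error = core::convert::Infallible;
        async fn load_page(&self, off: u64) -> Result<CowPage, Self::Error> {
            let p = &self.pages[off as usize / PAGE_SIZE];
            Ok(CowPage {
                data: p.as_ptr() as *mut u8,
                offset: off,
            })
        }
    }

    #[async_trait(?Send)]
    impl AllocPage for MemTxn {
        async fn alloc_page(&mut self) -> Result<MutPage, Self::Error> {
            let off = (self.pages.len() * PAGE_SIZE) as u64;
            self.pages.push(Box::new([0; PAGE_SIZE]));
            let p = self.pages.last_mut().unwrap();
            p[0] = 1; // Set the dirty bit: allocated by this "transaction".
            Ok(MutPage(CowPage {
                data: p.as_mut_ptr(),
                offset: off,
            }))
        }
        async fn incr_rc(&mut self, _off: u64) -> Result<usize, Self::Error> {
            Ok(0)
        }
        async fn decr_rc(&mut self, _off: u64) -> Result<usize, Self::Error> {
            Ok(0)
        }
        async fn decr_rc_owned(&mut self, _off: u64) -> Result<usize, Self::Error> {
            Ok(0)
        }
    }
}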
//! Insertions into a B tree.
use super::cursor::*;
use super::*;

/// The representation of the update to a page.
#[derive(Debug)]
pub struct Ok {
    /// The new page, possibly the same as the previous version.
    pub page: MutPage,
    /// The freed page, or 0 if no page was freed.
    pub freed: u64,
}

/// The result of an insertion, i.e. either an update or a split.
#[derive(Debug)]
pub enum Put<'a, K: ?Sized, V: ?Sized> {
    Ok(Ok),
    Split {
        split_key: &'a K,
        split_value: &'a V,
        left: MutPage,
        right: MutPage,
        freed: u64,
    },
}

/// Insert an entry into a database, returning `false` if and only if
/// the exact same entry (key *and* value) was already in the
/// database.
pub async fn put<
    T: AllocPage,
    K: Storable + ?Sized + Sync,
    V: Storable + ?Sized + Sync,
    P: BTreeMutPage<K, V>,
>(
    txn: &mut T,
    db: &mut Db_<K, V, P>,
    key: &K,
    value: &V,
) -> Result<bool, T::Error> {
    // Set the cursor to the first element larger than or equal to
    // `(key, value)`. We will insert at that position (where "insert"
    // is meant in the sense of Rust's `Vec::insert`).
    let mut cursor = Cursor::new(txn, db).await?;
    if cursor.set(txn, key, Some(value)).await?.is_some() {
        // If `(key, value)` already exists in the tree, return.
        return Ok(false);
    }
    // Else, we are at a leaf, since cursors that don't find an
    // element go all the way to the leaves.
    //
    // We put on the leaf page, resulting in a `Put`, i.e. either
    // `Put::Ok` or `Put::Split`.
    let p = cursor.len(); // Save the position of the leaf cursor.
    let is_owned = p < cursor.first_rc_len();

    // Insert the key and value at the leaf, i.e. pop the top level of
    // the stack (the leaf) and insert in there.
    let cur = cursor.pop().unwrap();
    let put = P::put(
        txn,
        cur.page,
        is_owned,
        false,
        &cur.cursor,
        key,
        value,
        None,
        0,
        0,
    )
    .await?;

    // In both cases (`Ok` and `Split`), we need to walk up the tree
    // and update each node.
    //
    // Moreover, since the middle elements of the splits may be on
    // pages that must be freed at the end of this call, we collect
    // them in the `free` array below, and free them only at the end
    // of this function.
    //
    // If we didn't hold on to these pages, they could be reallocated
    // in subsequent splits, and the middle element could be
    // lost. This is important when the middle element climbs up more
    // than one level (i.e. the middle element of the split of a leaf
    // is also the middle element of the split of the leaf's parent,
    // etc).
    let mut free = [0; N_CURSORS];
    db.db = put_cascade(txn, &mut cursor, put, &mut free).await?.0.offset;
    for f in &free[..p] {
        if *f & 1 != 0 {
            txn.decr_rc_owned((*f) ^ 1).await?;
        } else if *f > 0 {
            txn.decr_rc(*f).await?;
        }
    }
    Ok(true)
}
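// The `free` array in `put` packs an "owned by this transaction" flag
// into bit 0 of each freed page offset; pages are 4096-byte aligned,
// so bit 0 is otherwise always 0. A standalone sketch of the decoding
// performed by the loop above:
#[allow(dead_code)]
async fn free_one<T: AllocPage>(txn: &mut T, f: u64) -> Result<(), T::Error> {
    if f & 1 != 0 {
        // Allocated by the current transaction: may be reused
        // immediately after being freed.
        txn.decr_rc_owned(f ^ 1).await?;
    } else if f > 0 {
        txn.decr_rc(f).await?;
    }
    Ok(())
}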
async fn put_cascade<
    'a,
    T: AllocPage,
    K: Storable + ?Sized,
    V: Storable + ?Sized,
    P: BTreeMutPage<K, V>,
>(
    txn: &mut T,
    cursor: &mut Cursor<K, V, P>,
    mut put: Put<'a, K, V>,
    free: &mut [u64; N_CURSORS],
) -> Result<MutPage, T::Error> {
    loop {
        match put {
            Put::Split {
                split_key,
                split_value,
                left,
                right,
                freed,
            } => {
                // Here, we are copying all children of the freed
                // page, except possibly the last freed page (which is
                // a child of the current node, if we aren't at a
                // leaf). This includes the split key/value, also
                // incremented in the following call:
                incr_descendants::<T, K, V, P>(txn, cursor, free, freed).await?;
                // Then, insert the split key/value in the relevant page:
                let is_owned = cursor.len() < cursor.first_rc_len();
                if let Some(cur) = cursor.pop() {
                    // In this case, there's a page above the page
                    // that just split (since we can pop the stack),
                    // so the page that just split isn't the root (but
                    // `cur` might be).
                    put = P::put(
                        txn,
                        cur.page,
                        is_owned,
                        false,
                        &cur.cursor,
                        split_key,
                        split_value,
                        None,
                        left.0.offset,
                        right.0.offset,
                    )
                    .await?;
                } else {
                    // No page above the split, so the root has just
                    // split. Insert the split key/value into a new
                    // page above the entire tree.
                    let mut p = txn.alloc_page().await?;
                    let cursor = P::cursor_first(&p.0);
                    P::init(&mut p);
                    if let Put::Ok(p) = P::put(
                        txn,
                        p.0,
                        true,
                        false,
                        &cursor,
                        split_key,
                        split_value,
                        None,
                        left.0.offset,
                        right.0.offset,
                    )
                    .await?
                    {
                        // Return the new root.
                        return Ok(p.page);
                    } else {
                        unreachable!()
                    }
                }
            }
            Put::Ok(Ok { page, freed }) => {
                // Same as above: increment the relevant reference
                // counters.
                incr_descendants::<T, K, V, P>(txn, cursor, free, freed).await?;
                // And update the left child of the current cursor,
                // since the main invariant of cursors is that we're
                // always visiting the left child (if we're visiting
                // the last child of a page, the cursor is set to the
                // position strictly after the entries).
                let is_owned = cursor.len() < cursor.first_rc_len();
                if let Some(curs) = cursor.pop() {
                    // If we aren't at the root, just update the child.
                    put = Put::Ok(
                        P::update_left_child(txn, curs.page, is_owned, &curs.cursor, page.0.offset)
                            .await?,
                    )
                } else {
                    // If we are at the root, `page` is the updated root.
                    return Ok(page);
                }
            }
        }
    }
}

/// This function does all the memory management for `put`.
async fn incr_descendants<
    T: AllocPage + LoadPage,
    K: Storable + ?Sized,
    V: Storable + ?Sized,
    P: BTreePage<K, V>,
>(
    txn: &mut T,
    cursor: &mut Cursor<K, V, P>,
    free: &mut [u64; N_CURSORS],
    mut freed: u64,
) -> Result<(), T::Error> {
    // The freed page is on the page below.
    if cursor.len() < cursor.first_rc_len() {
        // If a page has split or was immutable (allocated by a
        // previous transaction) and has been updated, we need to free
        // the old page.
        free[cursor.len()] = freed;
    } else {
        // This means that the new version of the page (either split
        // or not) contains references to all the children of the
        // freed page, except the last freed page (because the new
        // version of that page isn't shared).
        let cur = cursor.cur();
        // Start a cursor at the leftmost position and increase the
        // leftmost child page's RC (if we aren't at a leaf, and if
        // that child isn't the last freed page).
        let mut c = P::cursor_first(&cur.page);
        let left = P::left_child(cur.page.as_page(), &c);
        if left != 0 {
            if left != (freed & !1) {
                txn.incr_rc(left).await?;
            } else if cursor.len() == cursor.first_rc_len() {
                freed = 0
            }
        }
        // Then, for each entry of the page, increase the RCs of
        // references contained in the keys and values, and increase
        // the RC of the right child.
        while let Some((k, v, r)) = P::next(cur.page.as_page(), &mut c) {
            for o in k.page_references().chain(v.page_references()) {
                txn.incr_rc(o).await?;
            }
            if r != 0 {
                if r != (freed & !1) {
                    txn.incr_rc(r).await?;
                } else if cursor.len() == cursor.first_rc_len() {
                    freed = 0
                }
            }
        }
        // If the "freed" page is still marked as freed at this point,
        // it is shared with another tree, and hence we just need to
        // decrement its RC.
        if freed > 0 && cursor.len() == cursor.first_rc_len() {
            free[cursor.len()] = freed;
        }
    }
    Ok(())
}
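// A hedged usage sketch of `put`: inserting a `(u64, u64)` entry into
// a database. The transaction type `T` is any `AllocPage`
// implementation (for instance an allocator like the one in the
// `sanakirja` crate), and `P` is one of this crate's page
// implementations; nothing here is specific to either.
#[allow(dead_code)]
async fn put_example<T: AllocPage, P: BTreeMutPage<u64, u64>>(
    txn: &mut T,
    db: &mut Db_<u64, u64, P>,
) -> Result<(), T::Error> {
    // Returns `false` iff the exact (key, value) pair was already there.
    let inserted = put(txn, db, &1u64, &42u64).await?;
    assert!(inserted);
    Ok(())
}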
//! Implementation of B tree pages for Unsized types, or types with a
//! dynamically-sized representation (for example enums with widely
//! different sizes).
//!
//! This module follows the same organisation as the sized
//! implementation, and contains types shared between the two
//! implementations.
//!
//! The types that can be used with this implementation must implement
//! the [`UnsizedStorable`] trait, which essentially replaces the
//! [`core::mem`] functions for determining the size and alignment of
//! values.
//!
//! One key difference is the implementation of leaves (internal nodes
//! have the same format): indeed, in this implementation, leaves have
//! almost the same format as internal nodes, except that their
//! offsets are written on the page as little-endian-encoded [`u16`]
//! (with only the 12 LSBs used, i.e. 4 bits unused).
use super::*;
use crate::btree::del::*;
use crate::btree::put::*;
use core::cmp::Ordering;

// The header is the same as for the sized implementation, so we share
// it here.
pub(super) mod header;
// Like in the sized implementation, we have the same three submodules.
mod alloc;
// This is a common module with the sized implementation.
pub(super) mod cursor;
mod put;
pub(super) mod rebalance;
use alloc::*;
use cursor::*;
use header::*;
use rebalance::*;

#[derive(Debug)]
pub struct Page<K: ?Sized, V: ?Sized> {
    k: core::marker::PhantomData<K>,
    v: core::marker::PhantomData<V>,
}

// Many of these functions are the same as in the Sized implementations.
#[async_trait(?Send)]
impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> super::BTreePage<K, V>
    for Page<K, V>
{
    fn is_empty(c: &Self::Cursor) -> bool {
        c.cur >= c.total as isize
    }

    fn is_init(c: &Self::Cursor) -> bool {
        c.cur < 0
    }

    type Cursor = PageCursor;

    fn cursor_before(p: &crate::CowPage) -> Self::Cursor {
        PageCursor::new(p, -1)
    }

    fn cursor_after(p: &crate::CowPage) -> Self::Cursor {
        PageCursor::after(p)
    }

    // Split a cursor, returning two cursors `(a, b)` where `b` is the
    // same as `c`, and `a` is a cursor running from the first element
    // of the page to `c` (excluding `c`).
    fn split_at(c: &Self::Cursor) -> (Self::Cursor, Self::Cursor) {
        (
            PageCursor {
                cur: 0,
                total: c.cur.max(0) as usize,
                is_leaf: c.is_leaf,
            },
            *c,
        )
    }

    fn move_next(c: &mut Self::Cursor) -> bool {
        if c.cur < c.total as isize {
            c.cur += 1;
            true
        } else {
            false
        }
    }

    fn move_prev(c: &mut Self::Cursor) -> bool {
        if c.cur > 0 {
            c.cur -= 1;
            true
        } else {
            c.cur = -1;
            false
        }
    }

    // This function is the same as the sized implementation for
    // internal nodes, since the only difference between leaves and
    // internal nodes in this implementation is the size of offsets (2
    // bytes for leaves, 8 bytes for internal nodes).
    fn current<'a>(page: crate::Page<'a>, c: &Self::Cursor) -> Option<(&'a K, &'a V, u64)> {
        if c.cur < 0 || c.cur >= c.total as isize {
            None
        } else if c.is_leaf {
            unsafe {
                let off =
                    u16::from_le(*(page.data.as_ptr().add(HDR + c.cur as usize * 2) as *const u16));
                let (k, v) = read::<K, V>(page.data.as_ptr().add(off as usize));
                Some((
                    K::from_raw_ptr(k as *const u8),
                    V::from_raw_ptr(v as *const u8),
                    0,
                ))
            }
        } else {
            unsafe {
                let off =
                    u64::from_le(*(page.data.as_ptr().add(HDR + c.cur as usize * 8) as *const u64));
                let (k, v) = read::<K, V>(page.data.as_ptr().add((off & 0xfff) as usize));
                Some((
                    K::from_raw_ptr(k as *const u8),
                    V::from_raw_ptr(v as *const u8),
                    off & !0xfff,
                ))
            }
        }
    }

    // These methods are the same as in the sized implementation.
    fn left_child(page: crate::Page, c: &Self::Cursor) -> u64 {
        assert!(c.cur >= 0);
        if c.is_leaf {
            0
        } else {
            assert!(HDR as isize + c.cur * 8 - 8 <= 4088);
            let off = unsafe {
                *(page.data.as_ptr().offset(HDR as isize + c.cur * 8 - 8) as *const u64)
            };
            u64::from_le(off) & !0xfff
        }
    }

    fn right_child(page: crate::Page, c: &Self::Cursor) -> u64 {
        assert!(c.cur < c.total as isize);
        if c.is_leaf {
            0
        } else {
            assert!(HDR as isize + c.cur * 8 - 8 <= 4088);
            let off =
                unsafe { *(page.data.as_ptr().offset(HDR as isize + c.cur * 8) as *const u64) };
            u64::from_le(off) & !0xfff
        }
    }

    async fn set_cursor<'a, 'b, T: LoadPage>(
        _txn: &'a T,
        page: crate::Page<'b>,
        c: &mut PageCursor,
        k0: &K,
        v0: Option<&V>,
    ) -> Result<(&'a K, &'a V, u64), usize> {
        unsafe {
            // `lookup` has the same semantics as
            // `core::slice::binary_search`, i.e. it returns either
            // `Ok(n)`, where `n` is the position where `(k0, v0)` was
            // found, or `Err(n)` where `n` is the position where
            // `(k0, v0)` can be inserted to preserve the order.
            match lookup(page, c, k0, v0).await {
                Ok(n) => {
                    c.cur = n as isize;
                    if c.is_leaf {
                        let off =
                            u16::from_le(*(page.data.as_ptr().add(HDR + n * 2) as *const u16));
                        let (k, v) = read::<K, V>(page.data.as_ptr().add(off as usize));
                        Ok((K::from_raw_ptr(k), V::from_raw_ptr(v), 0))
                    } else {
                        let off =
                            u64::from_le(*(page.data.as_ptr().add(HDR + n * 8) as *const u64));
                        let (k, v) = read::<K, V>(page.data.as_ptr().add(off as usize & 0xfff));
                        Ok((K::from_raw_ptr(k), V::from_raw_ptr(v), off & !0xfff))
                    }
                }
                Err(n) => {
                    c.cur = n as isize;
                    Err(n)
                }
            }
        }
    }
}

// There is quite some duplicated code in the following functions,
// because we're forming a slice of offsets, and then using the core
// library's `binary_search_by` method on slices.
async unsafe fn lookup<'a, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    page: crate::Page<'a>,
    c: &mut PageCursor,
    k0: &K,
    v0: Option<&V>,
) -> Result<usize, usize> {
    let hdr = header(page);
    c.total = hdr.n() as usize;
    c.is_leaf = hdr.is_leaf();
    if c.is_leaf {
        lookup_leaf(page, k0, v0, hdr)
    } else {
        lookup_internal(page, k0, v0, hdr)
    }
}

unsafe fn lookup_internal<'a, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    page: crate::Page<'a>,
    k0: &K,
    v0: Option<&V>,
    hdr: &header::Header,
) -> Result<usize, usize> {
    let s = core::slice::from_raw_parts(
        page.data.as_ptr().add(HDR) as *const u64,
        hdr.n() as usize,
    );
    if let Some(v0) = v0 {
        s.binary_search_by(|&off| {
            let off = u64::from_le(off) & 0xfff;
            let (k, v) = read::<K, V>(page.data.as_ptr().offset(off as isize));
            let k = K::from_raw_ptr(k);
            match k.compare(k0) {
                Ordering::Equal => {
                    let v = V::from_raw_ptr(v);
                    v.compare(v0)
                }
                cmp => cmp,
            }
        })
    } else {
        match s.binary_search_by(|&off| {
            let off = u64::from_le(off) & 0xfff;
            let (k, _) = read::<K, V>(page.data.as_ptr().offset(off as isize));
            let k = K::from_raw_ptr(k);
            k.compare(k0)
        }) {
            Err(i) => Err(i),
            Ok(mut i) => {
                // Rewind if there are multiple matching keys.
                while i > 0 {
                    let off = u64::from_le(s[i - 1]) & 0xfff;
                    let (k, _) = read::<K, V>(page.data.as_ptr().offset(off as isize));
                    let k = K::from_raw_ptr(k);
                    if let Ordering::Equal = k.compare(k0) {
                        i -= 1
                    } else {
                        break;
                    }
                }
                Ok(i)
            }
        }
    }
}
unsafe fn lookup_leaf<'a, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    page: crate::Page<'a>,
    k0: &K,
    v0: Option<&V>,
    hdr: &header::Header,
) -> Result<usize, usize> {
    let s = core::slice::from_raw_parts(
        page.data.as_ptr().add(HDR) as *const u16,
        hdr.n() as usize,
    );
    if let Some(v0) = v0 {
        s.binary_search_by(|&off| {
            let off = u16::from_le(off);
            let (k, v) = read::<K, V>(page.data.as_ptr().offset(off as isize));
            let k = K::from_raw_ptr(k as *const u8);
            match k.compare(k0) {
                Ordering::Equal => {
                    let v = V::from_raw_ptr(v as *const u8);
                    v.compare(v0)
                }
                cmp => cmp,
            }
        })
    } else {
        match s.binary_search_by(|&off| {
            let off = u16::from_le(off);
            let (k, _) = read::<K, V>(page.data.as_ptr().offset(off as isize));
            let k = K::from_raw_ptr(k);
            k.compare(k0)
        }) {
            Err(e) => Err(e),
            Ok(mut i) => {
                // Rewind if there are multiple matching keys.
                while i > 0 {
                    let off = u16::from_le(s[i - 1]);
                    let (k, _) = read::<K, V>(page.data.as_ptr().offset(off as isize));
                    let k = K::from_raw_ptr(k);
                    if let Ordering::Equal = k.compare(k0) {
                        i -= 1
                    } else {
                        break;
                    }
                }
                Ok(i)
            }
        }
    }
}
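// The "rewind" loops above compensate for `binary_search_by` landing
// on an arbitrary one of several equal keys. The same idea on a plain
// slice, as a self-contained sketch:
#[test]
fn rewind_to_first_match() {
    let s = [1, 3, 3, 3, 7];
    let mut i = s.binary_search(&3).unwrap();
    while i > 0 && s[i - 1] == 3 {
        i -= 1;
    }
    assert_eq!(i, 1);
}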
#[async_trait(?Send)]
impl<
        K: UnsizedStorable + ?Sized + core::fmt::Debug,
        V: UnsizedStorable + ?Sized + core::fmt::Debug,
    > super::BTreeMutPage<K, V> for Page<K, V>
{
    // The init function is straightforward.
    fn init(page: &mut MutPage) {
        let h = header_mut(page);
        h.init();
    }

    // Deletions in internal nodes of a B tree need to replace the
    // deleted value with a value from a leaf.
    //
    // In this implementation of pages, we never actually wipe any
    // data from pages: we only append data, or clone the pages to
    // compact them. Therefore, raw pointers to leaves are always
    // valid for as long as the page isn't freed, which can only
    // happen at the very last step of an insertion or deletion.
    type Saved = (*const K, *const V);

    fn save_deleted_leaf_entry(k: &K, v: &V) -> Self::Saved {
        (k as *const K, v as *const V)
    }

    unsafe fn from_saved<'a>(s: &Self::Saved) -> (&'a K, &'a V) {
        (&*s.0, &*s.1)
    }

    // As in the sized implementation, `put` is split into its own submodule.
    async fn put<'a, T: AllocPage>(
        txn: &mut T,
        page: CowPage,
        mutable: bool,
        replace: bool,
        c: &PageCursor,
        k0: &'a K,
        v0: &'a V,
        k1v1: Option<(&'a K, &'a V)>,
        l: u64,
        r: u64,
    ) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
        debug_assert!(c.cur >= 0);
        debug_assert!(k1v1.is_none() || replace);
        if r == 0 {
            put::put::<_, _, _, Leaf>(
                txn, page, mutable, replace, c.cur as usize, k0, v0, k1v1, 0, 0,
            )
            .await
        } else {
            put::put::<_, _, _, Internal>(
                txn, page, mutable, replace, c.cur as usize, k0, v0, k1v1, l, r,
            )
            .await
        }
    }

    unsafe fn put_mut(page: &mut MutPage, c: &mut Self::Cursor, k0: &K, v0: &V, r: u64) {
        let mut n = c.cur;
        if r == 0 {
            Leaf::alloc_write(page, k0, v0, 0, r, &mut n);
        } else {
            Internal::alloc_write(page, k0, v0, 0, r, &mut n);
        }
        c.total += 1;
    }

    unsafe fn set_left_child(page: &mut MutPage, c: &Self::Cursor, l: u64) {
        let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur - 1);
        *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
    }

    // Update the left child of the entry pointed to by cursor `c`.
    async fn update_left_child<T: AllocPage>(
        txn: &mut T,
        page: CowPage,
        mutable: bool,
        c: &Self::Cursor,
        l: u64,
    ) -> Result<crate::btree::put::Ok, T::Error> {
        assert!(!c.is_leaf);
        let freed;
        let page = if mutable && page.is_dirty() {
            // If the page is dirty (allocated by this transaction)
            // and isn't shared, just make it mutable.
            freed = 0;
            MutPage(page)
        } else {
            // Else, clone the page:
            let mut new = txn.alloc_page().await?;
            <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
            // Copy the left child.
            let l = header(page.as_page()).left_page() & !0xfff;
            let hdr = header_mut(&mut new);
            hdr.set_left_page(l);
            // Copy all the entries.
            let s = Internal::offset_slice(page.as_page());
            clone::<K, V, Internal>(page.as_page(), &mut new, s, &mut 0);
            // Mark the old version for freeing.
            freed = page.offset | if page.is_dirty() { 1 } else { 0 };
            new
        };
        // Finally, update the left child at the cursor. We know that
        // all valid positions of a cursor except the leftmost one
        // (-1) have a left child.
        assert!(c.cur >= 0);
        unsafe {
            let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur as isize - 1);
            *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
        }
        Ok(Ok { page, freed })
    }
    // Here is how deletions work: if the page is dirty and mutable,
    // we "unlink" the value by moving the end of the offset array to
    // the left by one offset (2 bytes in leaves, 8 bytes in internal
    // nodes).
    async fn del<T: AllocPage>(
        txn: &mut T,
        page: crate::CowPage,
        mutable: bool,
        c: &PageCursor,
        l: u64,
    ) -> Result<(MutPage, u64), T::Error> {
        // Check that the cursor is at a valid position for a deletion.
        debug_assert!(c.cur >= 0 && c.cur < c.total as isize);
        if mutable && page.is_dirty() {
            let p = page.data;
            let mut page = MutPage(page);
            let hdr = header_mut(&mut page);
            let n = hdr.n();
            if c.is_leaf {
                unsafe {
                    let ptr = p.add(HDR + c.cur as usize * 2) as *mut u16;
                    // Get the offset in the page of the key/value data.
                    let off = u16::from_le(*ptr);
                    assert_eq!(off & 0xf000, 0);
                    // Erase the offset by shifting the last
                    // (`n - c.cur - 1`) offsets. The reasoning against
                    // "off-by-one errors" is that if the cursor is
                    // positioned on the first element (`c.cur == 0`),
                    // there are `n - 1` elements to shift.
                    core::ptr::copy(ptr.offset(1), ptr, n as usize - c.cur as usize - 1);
                    // Remove the size of the actual key/value, plus 2
                    // bytes (the offset).
                    hdr.decr(2 + entry_size::<K, V>(p.add(off as usize)));
                }
            } else {
                unsafe {
                    let ptr = p.add(HDR + c.cur as usize * 8) as *mut u64;
                    // Offset to the key/value data.
                    let off = u64::from_le(*ptr);
                    // Shift the offsets like in the leaf case above.
                    core::ptr::copy(ptr.offset(1), ptr, n as usize - c.cur as usize - 1);
                    if l > 0 {
                        // In an internal node, we may have to update
                        // the left child of the current position.
                        // After the move, the current offset is at
                        // `ptr`, so we need to find the offset one
                        // step to the left.
                        let p = ptr.offset(-1);
                        *p = (l | (u64::from_le(*p) & 0xfff)).to_le();
                    }
                    // Remove the size of the key/value, plus 8 bytes
                    // (the offset).
                    hdr.decr(8 + entry_size::<K, V>(p.add((off & 0xfff) as usize)));
                }
            };
            hdr.set_n(n - 1);
            // Return the page, and 0 for "nothing was freed".
            Ok((page, 0))
        } else {
            // If the page cannot be mutated, we allocate a new page and clone.
            let mut new = txn.alloc_page().await?;
            <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
            if c.is_leaf {
                // In a leaf, this is just a matter of getting the
                // offset slice, removing the current position and
                // cloning.
                let s = Leaf::offset_slice(page.as_page());
                let (s0, s1) = s.split_at(c.cur as usize);
                let (_, s1) = s1.split_at(1);
                let mut n = 0;
                clone::<K, V, Leaf>(page.as_page(), &mut new, s0, &mut n);
                clone::<K, V, Leaf>(page.as_page(), &mut new, s1, &mut n);
            } else {
                // In an internal node, things are a bit trickier,
                // since we might need to update the left child.
                //
                // First, clone the leftmost child of the page.
                let hdr = header(page.as_page());
                let left = hdr.left_page() & !0xfff;
                unsafe {
                    *(new.0.data.add(HDR) as *mut u64).offset(-1) = left.to_le();
                }
                // Then, clone the first half of the page.
                let s = Internal::offset_slice(page.as_page());
                let (s0, s1) = s.split_at(c.cur as usize);
                let (_, s1) = s1.split_at(1);
                let mut n = 0;
                clone::<K, V, Internal>(page.as_page(), &mut new, s0, &mut n);
                // Update the left child, which was written by the
                // call to `clone` as the right child of the last
                // entry in `s0`.
                if l > 0 {
                    unsafe {
                        let off = (new.0.data.add(HDR) as *mut u64).offset(n - 1);
                        *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
                    }
                }
                // Then, clone the second half of the page.
                clone::<K, V, Internal>(page.as_page(), &mut new, s1, &mut n);
            }
            // Return the new page, and the offset of the freed page.
            Ok((new, page.offset))
        }
    }

    // Decide what to do with the concatenation of two neighbouring
    // pages, with a middle element.
    //
    // This is highly similar to the sized case.
    async fn merge_or_rebalance<'a, T: AllocPage>(
        txn: &mut T,
        m: Concat<'a, K, V, Self>,
    ) -> Result<Op<'a, T, K, V>, T::Error> {
        // First evaluate the size of the middle element on a page.
        // Contrarily to the sized case, the offsets are aligned, so
        // the header is always the same size (no padding).
        let mid_size = if m.modified.c0.is_leaf {
            2 + alloc_size::<K, V>(m.mid.0, m.mid.1)
        } else {
            8 + alloc_size::<K, V>(m.mid.0, m.mid.1)
        };
        // Evaluate the size of the modified page of the concatenation
        // (which includes the header).
        let mod_size = size::<K, V>(&m.modified);
        // Add the "occupied" size (which excludes the header).
        let occupied = {
            let hdr = header(m.other.as_page());
            (hdr.left_page() & 0xfff) as usize
        };
        if mod_size + mid_size + occupied <= PAGE_SIZE {
            // If the concatenation fits on a page, merge.
            return if m.modified.c0.is_leaf {
                merge::<_, _, _, _, Leaf>(txn, m).await
            } else {
                merge::<_, _, _, _, Internal>(txn, m).await
            };
        }
        // If we can't merge, evaluate the size of the first binding
        // on the other page, to see if we can rebalance.
        let first_other = PageCursor::new(&m.other, 0);
        let first_other_size = current_size::<K, V>(m.other.as_page(), &first_other);

        // If the modified page is at least half-full, or if removing
        // the first entry on the other page would make that other
        // page less than half-full, don't rebalance. See the Sized
        // implementation to see cases where this happens.
        if mod_size >= PAGE_SIZE / 2 || HDR + occupied - first_other_size < PAGE_SIZE / 2 {
            if let Some((k, v)) = m.modified.ins {
                return Ok(Op::Put(
                    Self::put(
                        txn,
                        m.modified.page,
                        m.modified.mutable,
                        m.modified.skip_first,
                        &m.modified.c1,
                        k,
                        v,
                        m.modified.ins2,
                        m.modified.l,
                        m.modified.r,
                    )
                    .await?,
                ));
            } else if m.modified.skip_first {
                debug_assert!(m.modified.ins2.is_none());
                let (page, freed) = Self::del(
                    txn,
                    m.modified.page,
                    m.modified.mutable,
                    &m.modified.c1,
                    m.modified.l,
                )
                .await?;
                return Ok(Op::Put(Put::Ok(Ok { page, freed })));
            } else {
                let mut c1 = m.modified.c1.clone();
                let mut l = m.modified.l;
                if l == 0 {
                    Self::move_next(&mut c1);
                    l = m.modified.r;
                }
                return Ok(Op::Put(Put::Ok(
                    Self::update_left_child(txn, m.modified.page, m.modified.mutable, &c1, l)
                        .await?,
                )));
            }
        }
        // Finally, if we're here, we can rebalance. There are four
        // (relatively explicit) cases; see the `rebalance` submodule
        // to see how this is done.
        if m.mod_is_left {
            if m.modified.c0.is_leaf {
                rebalance_left::<_, _, _, Leaf>(txn, m).await
            } else {
                rebalance_left::<_, _, _, Internal>(txn, m).await
            }
        } else {
            rebalance_right::<_, _, _, Self>(txn, m).await
        }
    }
}
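// The offset shift performed by `del` on dirty pages, modelled on a
// plain array: deleting position `c` out of `n` offsets moves the
// `n - c - 1` trailing offsets one slot to the left.
#[test]
fn offset_shift_model() {
    let mut offs = [10u16, 20, 30, 40];
    let (n, c) = (4, 1); // delete the second of four entries
    offs.copy_within(c + 1..n, c);
    assert_eq!(&offs[..n - 1], &[10, 30, 40]);
}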
/// Size of a modified page (including the header).
fn size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    m: &ModifiedPage<K, V, Page<K, V>>,
) -> usize {
    let mut total = {
        let hdr = header(m.page.as_page());
        (hdr.left_page() & 0xfff) as usize
    };
    total += HDR;
    // Extra size for the offsets.
    let extra = if m.c1.is_leaf { 2 } else { 8 };
    if let Some((k, v)) = m.ins {
        total += extra + alloc_size(k, v) as usize;
        if let Some((k, v)) = m.ins2 {
            total += extra + alloc_size(k, v) as usize;
        }
    }
    if m.skip_first {
        total -= current_size::<K, V>(m.page.as_page(), &m.c1) as usize;
    }
    total
}

// Size of a pair of type `(K, V)`. This is computed in the same way
// as a struct with fields of type `K` and `V` in C.
fn alloc_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(k: &K, v: &V) -> usize {
    let s = ((k.size() + V::ALIGN - 1) & !(V::ALIGN - 1)) + v.size();
    let al = K::ALIGN.max(V::ALIGN);
    (s + al - 1) & !(al - 1)
}

// Total size of the entry for that cursor position, including the
// offset size.
fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    page: crate::Page,
    c: &PageCursor,
) -> usize {
    if c.is_leaf {
        Leaf::current_size::<K, V>(page, c.cur)
    } else {
        Internal::current_size::<K, V>(page, c.cur)
    }
}

pub(super) trait AllocWrite<K: ?Sized, V: ?Sized> {
    fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize);
    fn set_left_child(new: &mut MutPage, n: isize, l: u64);
}

/// Perform the modifications on a page, by copying it onto page `new`.
fn modify<
    K: core::fmt::Debug + ?Sized,
    V: core::fmt::Debug + ?Sized,
    P: BTreeMutPage<K, V>,
    L: AllocWrite<K, V>,
>(
    new: &mut MutPage,
    m: &mut ModifiedPage<K, V, P>,
    n: &mut isize,
) {
    let mut l = P::left_child(m.page.as_page(), &m.c0);
    while let Some((k, v, r)) = P::next(m.page.as_page(), &mut m.c0) {
        L::alloc_write(new, k, v, l, r, n);
        l = 0;
    }
    let mut rr = m.r;
    if let Some((k, v)) = m.ins {
        if let Some((k2, v2)) = m.ins2 {
            L::alloc_write(new, k, v, l, m.l, n);
            L::alloc_write(new, k2, v2, 0, m.r, n);
        } else if m.l > 0 {
            L::alloc_write(new, k, v, m.l, m.r, n);
        } else {
            L::alloc_write(new, k, v, l, m.r, n);
        }
        l = 0;
        rr = 0;
    } else if m.l > 0 {
        l = m.l
    }
    let mut c1 = m.c1.clone();
    if m.skip_first {
        P::move_next(&mut c1);
    }
    // Here's a confusing thing: if the first element of `c1` is the
    // last element of a page, we may be updating its right child (in
    // which case m.r > 0) rather than its left child like for all
    // other elements.
    //
    // This case only ever happens for this function when we're
    // updating the last child of a page p, and then merging p with
    // its right neighbour.
    while let Some((k, v, r)) = P::next(m.page.as_page(), &mut c1) {
        if rr > 0 {
            L::alloc_write(new, k, v, l, rr, n);
            rr = 0;
        } else {
            L::alloc_write(new, k, v, l, r, n);
        }
        l = 0;
    }
    if l != 0 {
        // The while loop above didn't run, i.e. the insertion
        // happened at the end of the page. In this case, we haven't
        // had a chance to update the left page, so do it now.
        L::set_left_child(new, *n, l)
    }
}
/// The very unsurprising `merge` function.
pub(super) async fn merge<
    'a,
    T: AllocPage + LoadPage,
    K: ?Sized + core::fmt::Debug,
    V: ?Sized + core::fmt::Debug,
    P: BTreeMutPage<K, V>,
    L: AllocWrite<K, V>,
>(
    txn: &mut T,
    mut m: Concat<'a, K, V, P>,
) -> Result<Op<'a, T, K, V>, T::Error> {
    // Here, we first allocate a page, then clone both pages onto it,
    // in a different order depending on whether the modified page is
    // the left or the right child.
    //
    // Note that in the case that this merge happens immediately after
    // a put that reallocated the two sides of the merge in order to
    // split (not all splits do that), we could be slightly more
    // efficient, but with considerably more code.
    let mut new = txn.alloc_page().await?;
    P::init(&mut new);
    let mut n = 0;
    if m.mod_is_left {
        modify::<_, _, _, L>(&mut new, &mut m.modified, &mut n);
        let mut rc = P::cursor_first(&m.other);
        let l = P::left_child(m.other.as_page(), &rc);
        L::alloc_write(&mut new, m.mid.0, m.mid.1, 0, l, &mut n);
        while let Some((k, v, r)) = P::next(m.other.as_page(), &mut rc) {
            L::alloc_write(&mut new, k, v, 0, r, &mut n);
        }
    } else {
        let mut rc = P::cursor_first(&m.other);
        let mut l = P::left_child(m.other.as_page(), &rc);
        while let Some((k, v, r)) = P::next(m.other.as_page(), &mut rc) {
            L::alloc_write(&mut new, k, v, l, r, &mut n);
            l = 0;
        }
        L::alloc_write(&mut new, m.mid.0, m.mid.1, 0, 0, &mut n);
        modify::<_, _, _, L>(&mut new, &mut m.modified, &mut n);
    }

    let b0 = if m.modified.page.is_dirty() { 1 } else { 0 };
    let b1 = if m.other.is_dirty() { 1 } else { 0 };
    Ok(Op::Merged {
        page: new,
        freed: [m.modified.page.offset | b0, m.other.offset | b1],
        marker: core::marker::PhantomData,
    })
}

fn clone<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized, L: Alloc>(
    page: crate::Page,
    new: &mut MutPage,
    s: Offsets<L::Offset>,
    n: &mut isize,
) {
    for off in s.0.iter() {
        let (r, off): (u64, usize) = (*off).into();
        unsafe {
            let ptr = page.data.as_ptr().add(off);
            let size = entry_size::<K, V>(ptr);
            // Reserve the space on the page.
            let hdr = header_mut(new);
            let data = hdr.data() as u16;
            let off_new = data - size as u16;
            hdr.set_data(off_new);
            hdr.set_n(hdr.n() + 1);
            if hdr.is_leaf() {
                hdr.incr(2 + size);
                let ptr = new.0.data.offset(HDR as isize + *n * 2) as *mut u16;
                *ptr = (off_new as u16).to_le();
            } else {
                hdr.incr(8 + size);
                // Set the offset to this new entry in the offset
                // array, along with the right child page.
                let ptr = new.0.data.offset(HDR as isize + *n * 8) as *mut u64;
                *ptr = (r | off_new as u64).to_le();
            }
            core::ptr::copy_nonoverlapping(ptr, new.0.data.offset(off_new as isize), size);
        }
        *n += 1;
    }
}
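// A worked example of the layout arithmetic in `alloc_size`, for
// K = [u8] (2-byte length prefix, ALIGN = 2) and V = u64 (ALIGN = 8):
// a 3-byte key occupies 2 + 3 = 5 bytes, padded to 8 for the u64,
// plus 8 bytes of value; the total, 16, is already a multiple of
// max(K::ALIGN, V::ALIGN) = 8.
#[test]
fn alloc_size_example() {
    let k: &[u8] = &[1, 2, 3];
    assert_eq!(alloc_size::<[u8], u64>(k, &0u64), 16);
}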
use super::cursor::*;
use super::*;

// The implementation here is mostly the same as in the sized case,
// except for leaves, which makes it hard to refactor.
pub(crate) async fn rebalance_left<
    'a,
    T: AllocPage,
    K: UnsizedStorable + ?Sized + core::fmt::Debug,
    V: UnsizedStorable + ?Sized + core::fmt::Debug,
    L: Alloc,
>(
    txn: &mut T,
    m: Concat<'a, K, V, Page<K, V>>,
) -> Result<Op<'a, T, K, V>, T::Error> {
    assert!(m.mod_is_left);
    // First element of the right page. We'll rotate it to the left
    // page, i.e. return a middle element, and append the current
    // middle element to the left page.
    let rc = super::cursor::PageCursor::new(&m.other, 0);
    let rl = <Page<K, V>>::left_child(m.other.as_page(), &rc);
    let (k, v, r) = <Page<K, V>>::current(m.other.as_page(), &rc).unwrap();
    let mut freed_ = [0, 0];
    // First, perform the modification on the modified page, which we
    // know is the left page, and return the resulting mutable page
    // (the modified page may or may not be mutable before we do this).
    let new_left = if let Some((k, v)) = m.modified.ins {
        let is_dirty = m.modified.page.is_dirty();
        let new_left = if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(
            txn,
            m.modified.page,
            m.modified.mutable,
            m.modified.skip_first,
            &m.modified.c1,
            k,
            v,
            m.modified.ins2,
            m.modified.l,
            m.modified.r,
        )
        .await?
        {
            if freed > 0 {
                let b = if is_dirty { 1 } else { 0 };
                freed_[0] = freed | b;
            }
            page
        } else {
            unreachable!()
        };
        // Append the middle element of the concatenation at the end
        // of the left page. We know the left page to be mutable by
        // now, and we also know there's enough space to do this.
        let lc = PageCursor::after(&new_left.0);
        if let Put::Ok(Ok { freed, page }) = <Page<K, V>>::put(
            txn, new_left.0, true, false, &lc, m.mid.0, m.mid.1, None, 0, rl,
        )
        .await?
        {
            if freed > 0 {
                let b = if is_dirty { 1 } else { 0 };
                freed_[0] = freed | b;
            }
            page
        } else {
            unreachable!()
        }
    } else if m.modified.skip_first {
        let is_dirty = m.modified.page.is_dirty();
        let (page, freed) = <Page<K, V>>::del(
            txn,
            m.modified.page,
            m.modified.mutable,
            &m.modified.c1,
            m.modified.l,
        )
        .await?;
        if freed > 0 {
            let b = if is_dirty { 1 } else { 0 };
            freed_[0] = freed | b;
        }
        // Append the middle element of the concatenation at the end
        // of the left page. We know the left page to be mutable by
        // now, and we also know there's enough space to do this.
        let lc = PageCursor::after(&page.0);
        if let Put::Ok(Ok { freed, page }) =
            <Page<K, V>>::put(txn, page.0, true, false, &lc, m.mid.0, m.mid.1, None, 0, rl).await?
        {
            if freed > 0 {
                let b = if is_dirty { 1 } else { 0 };
                freed_[0] = freed | b;
            }
            page
        } else {
            unreachable!()
        }
    } else {
        // Append the middle element of the concatenation at the end
        // of the left page. We know the left page to be mutable by
        // now, and we also know there's enough space to do this.
        let is_dirty = m.modified.page.is_dirty();
        let lc = PageCursor::after(&m.modified.page);
        if let Put::Ok(Ok { freed, page }) = <Page<K, V>>::put(
            txn,
            m.modified.page,
            m.modified.mutable,
            false,
            &lc,
            m.mid.0,
            m.mid.1,
            None,
            0,
            rl,
        )
        .await?
        {
            if m.modified.l > 0 {
                assert_eq!(m.modified.r, 0);
                unsafe {
                    let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur - 1);
                    *off = (m.modified.l | (u64::from_le(*off) & 0xfff)).to_le();
                }
            } else if m.modified.r > 0 {
                unsafe {
                    let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur);
                    *off = (m.modified.r | (u64::from_le(*off) & 0xfff)).to_le();
                }
            }
            if freed > 0 {
                let b = if is_dirty { 1 } else { 0 };
                freed_[0] = freed | b;
            }
            page
        } else {
            unreachable!()
        }
    };
    // We extend the pointer's lifetime here, because we know the
    // current deletion (we only rebalance during deletions) won't
    // touch this page anymore after this.
    let k = unsafe { core::mem::transmute(k) };
    let v = unsafe { core::mem::transmute(v) };
    // If this frees the old "other" page, add it to the "freed"
    // array.
    let is_dirty = m.other.is_dirty();
    let (new_right, freed) = <Page<K, V>>::del(txn, m.other, m.other_is_mutable, &rc, r).await?;
    if freed > 0 {
        freed_[1] = if is_dirty { freed | 1 } else { freed }
    }
    Ok(Op::Rebalanced {
        l: new_left.0.offset,
        r: new_right.0.offset,
        k,
        v,
        freed: freed_,
    })
}

// Surprisingly, the `rebalance_right` function is simpler, since:
//
// - if we call it to rebalance two internal nodes, we're in the easy
//   case of rebalance_left.
//
// - Else, the middle element is the last one on the left page, and
//   isn't erased by leaf deletions, because these just move entries
//   to the left.
//
// This implementation is shared with the sized one.
pub(crate) async fn rebalance_right<
    'a,
    T: AllocPage,
    K: ?Sized,
    V: ?Sized,
    P: BTreeMutPage<K, V, Cursor = super::PageCursor>,
>(
    txn: &mut T,
    m: Concat<'a, K, V, P>,
) -> Result<Op<'a, T, K, V>, T::Error> {
    assert!(!m.mod_is_left);
    // Take the last element of the left page.
    let lc = P::cursor_last(&m.other);
    let (k0, v0, r0) = P::current(m.other.as_page(), &lc).unwrap();
    let mut freed_ = [0, 0];
    // Perform the modification on the modified page.
    let new_right = if let Some((k, v)) = m.modified.ins {
        let is_dirty = m.modified.page.is_dirty();
        let new_right = if let Put::Ok(Ok { page, freed }) = P::put(
            txn,
            m.modified.page,
            m.modified.mutable,
            m.modified.skip_first,
            &m.modified.c1,
            k,
            v,
            m.modified.ins2,
            m.modified.l,
            m.modified.r,
        )
        .await?
        {
            if freed > 0 {
                freed_[0] = if is_dirty { freed | 1 } else { freed };
            }
            page
        } else {
            unreachable!()
        };
        // Add the middle element of the concatenation as the first
        // element of the right page. We know the right page is
        // mutable, since we just modified it (hence the
        // `assert_eq!(freed, 0)`).

        // First element of the right page (after potential
        // modification by `put` above).
        let rc = P::cursor_first(&new_right.0);
        let rl = P::left_child(new_right.0.as_page(), &rc);
        if let Put::Ok(Ok { freed, page }) = P::put(
            txn,
            new_right.0,
            true,
            false,
            &rc,
            m.mid.0,
            m.mid.1,
            None,
            r0,
            rl,
        )
        .await?
        {
            debug_assert_eq!(freed, 0);
            page
        } else {
            unreachable!()
        }
    } else if m.modified.skip_first {
        let is_dirty = m.modified.page.is_dirty();
        let (page, freed) = P::del(
            txn,
            m.modified.page,
            m.modified.mutable,
            &m.modified.c1,
            m.modified.l,
        )
        .await?;
        if freed > 0 {
            freed_[0] = if is_dirty { freed | 1 } else { freed };
        }
        // Add the middle element of the concatenation as the first
        // element of the right page. We know the right page is
        // mutable, since we just modified it.
        // Moreover, if it is compacted by the `put` below, we know
        // that the `del` didn't free anything, hence we can reuse
        // slot 0.

        // First element of the right page (after potential
        // modification by `del` above).
        let rc = P::cursor_first(&page.0);
        let rl = P::left_child(page.0.as_page(), &rc);
        if let Put::Ok(Ok { freed, page }) =
            P::put(txn, page.0, true, false, &rc, m.mid.0, m.mid.1, None, r0, rl).await?
        {
            if freed > 0 {
                freed_[0] = if is_dirty { freed | 1 } else { freed };
            }
            page
        } else {
            unreachable!()
        }
    } else {
        let is_dirty = m.modified.page.is_dirty();
        let rc = P::cursor_first(&m.modified.page);
        let rl = P::left_child(m.modified.page.as_page(), &rc);
        if let Put::Ok(Ok { freed, page }) = P::put(
            txn,
            m.modified.page,
            m.modified.mutable,
            false,
            &rc,
            m.mid.0,
            m.mid.1,
            None,
            r0,
            rl,
        )
        .await?
        {
            // Update the left and right offsets. We know that at
            // least one of them is 0, but it can be either one (or
            // both), depending on what happened on the page below.
            //
            // Since we inserted an entry at the beginning of the
            // page, we need to add 1 to the index given by
            // `m.modified.c1.cur`.
            if m.modified.l > 0 {
                assert_eq!(m.modified.r, 0);
                unsafe {
                    let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur);
                    *off = (m.modified.l | (u64::from_le(*off) & 0xfff)).to_le();
                }
            } else if m.modified.r > 0 {
                unsafe {
                    let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur + 1);
                    *off = (m.modified.r | (u64::from_le(*off) & 0xfff)).to_le();
                }
            }
            if freed > 0 {
                freed_[0] = if is_dirty { freed | 1 } else { freed };
            }
            page
        } else {
            unreachable!()
        }
    };
    // As explained in the general comment on this function, this
    // entry isn't erased by the deletion in `m.other` below, so we
    // can safely extend its lifetime.
    let k = unsafe { core::mem::transmute(k0) };
    let v = unsafe { core::mem::transmute(v0) };

    let is_dirty = m.other.is_dirty();
    let (new_left, freed) = P::del(txn, m.other, m.other_is_mutable, &lc, 0).await?;
    if freed > 0 {
        freed_[1] = if is_dirty { freed | 1 } else { freed }
    }
    Ok(Op::Rebalanced {
        l: new_left.0.offset,
        r: new_right.0.offset,
        k,
        v,
        freed: freed_,
    })
}
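// A plain-`Vec` model of the rotation performed by `rebalance_left`:
// the old middle element is appended to the left page, and the first
// element of the right page becomes the new middle element.
#[test]
fn rotation_model() {
    let mut left = vec![1, 2];
    let mut right = vec![5, 6, 7];
    let mut mid = 4;
    left.push(mid);
    mid = right.remove(0);
    assert_eq!(
        (left.as_slice(), mid, right.as_slice()),
        (&[1, 2, 4][..], 5, &[6, 7][..])
    );
}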
use super::header::{header, header_mut, HDR};
use super::*;

pub(super) async fn put<
    'a,
    T: AllocPage,
    K: UnsizedStorable + ?Sized + core::fmt::Debug,
    V: UnsizedStorable + ?Sized + core::fmt::Debug,
    L: Alloc + AllocWrite<K, V>,
>(
    txn: &mut T,
    page: CowPage,
    mutable: bool,
    replace: bool,
    u: usize,
    k0: &'a K,
    v0: &'a V,
    k1v1: Option<(&'a K, &'a V)>,
    l: u64,
    r: u64,
) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
    // Size of the new insertions, not counting the offsets.
    let size = alloc_size(k0, v0)
        + if let Some((k1, v1)) = k1v1 {
            alloc_size(k1, v1)
        } else {
            0
        };
    // Size occupied by the data in these insertions (not counting
    // the offsets), if we need to compact the page.
    let size_replaced = if replace {
        // We're always in an internal node if we replace.
        let cur_size = Internal::current_size::<K, V>(page.as_page(), u as isize);
        // Since `size` isn't counting the size of the 64-bit offset,
        // and `cur_size` is, we need to add 8.
        8 + size as isize - cur_size as isize
    } else {
        size as isize
    };
    // Number of extra offsets.
    let n_ins = {
        let n_ins = if k1v1.is_some() { 2 } else { 1 };
        if replace {
            n_ins - 1
        } else {
            n_ins
        }
    };
    let hdr = header(page.as_page());
    if mutable && page.is_dirty() && L::can_alloc(header(page.as_page()), size, n_ins) {
        // The page is dirty (allocated by this transaction) and
        // mutable, and the new entries fit: wrap it as a `MutPage`
        // and write in place.
        let mut page = MutPage(page);
        let mut n = u as isize;
        // If we replace, we first need to "unlink" the previous
        // value, by erasing its offset.
        if replace {
            let p = page.0.data;
            let hdr = header_mut(&mut page);
            // In this case, we know we're not in a leaf, so offsets
            // are of size 8.
            assert!(!hdr.is_leaf());
            unsafe {
                let ptr = p.add(HDR + n as usize * 8) as *mut u64;
                let off = (u64::from_le(*ptr) & 0xfff) as usize;
                let kv_ptr = p.add(off);
                let size = entry_size::<K, V>(kv_ptr);
                core::ptr::copy(ptr.offset(1), ptr, hdr.n() as usize - n as usize - 1);
                // We decrease these figures here; they'll be
                // increased again by the calls to `alloc_write`
                // below.
                hdr.decr(8 + size);
                hdr.set_n(hdr.n() - 1);
            }
        }
        // Do the actual insertions.
        if let Some((k1, v1)) = k1v1 {
            L::alloc_write(&mut page, k0, v0, 0, 0, &mut n);
            L::alloc_write(&mut page, k1, v1, l, r, &mut n);
        } else {
            L::alloc_write(&mut page, k0, v0, l, r, &mut n);
        }
        // Return: no page was freed.
        Ok(Put::Ok(Ok { page, freed: 0 }))
    } else if L::can_compact(hdr, size_replaced, n_ins) {
        // Allocate a new page, split the offsets at the position of
        // the insertion, and clone each side of the split onto the
        // new page, with the insertion between them.
        let mut new = txn.alloc_page().await?;
        <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
        // Clone the leftmost child.
        L::set_right_child(&mut new, -1, hdr.left_page() & !0xfff);
        // Split the offsets.
        let s = L::offset_slice(page.as_page());
        let (s0, s1) = s.split_at(u as usize);
        // Drop the first element of `s1` if we're replacing it.
        let s1 = if replace { s1.split_at(1).1 } else { s1 };
        // Finally, clone and insert.
        let mut n = 0;
        clone::<K, V, L>(page.as_page(), &mut new, s0, &mut n);
        if let Some((k1, v1)) = k1v1 {
            L::alloc_write(&mut new, k0, v0, 0, l, &mut n);
            L::alloc_write(&mut new, k1, v1, 0, r, &mut n);
        } else {
            L::alloc_write(&mut new, k0, v0, l, r, &mut n);
        }
        clone::<K, V, L>(page.as_page(), &mut new, s1, &mut n);
        // And return, freeing the old version of the page.
        Ok(Put::Ok(Ok {
            page: new,
            freed: page.offset | if page.is_dirty() { 1 } else { 0 },
        }))
    } else {
        split_unsized::<_, _, _, L>(txn, page, replace, u, k0, v0, k1v1, l, r).await
    }
}
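// A model of the space check behind `can_alloc`, under the page
// layout used here: the offset array grows upwards from the header,
// the key/value data grows downwards from the end of the page, and an
// insertion fits if the two regions don't meet. The constants below
// (header size, counts) are made up for the sketch; the real check
// lives in the private `Alloc` trait.
#[test]
fn can_alloc_model() {
    const OFFSET_SIZE: usize = 8; // internal nodes; 2 in leaves
    let hdr = 24; // hypothetical header size
    let n = 10; // entries already on the page
    let data = 3000; // current data pointer (grows downwards)
    let (size, n_ins) = (40, 1); // bytes and offsets to insert
    assert!(hdr + (n + n_ins) * OFFSET_SIZE + size <= data);
}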
elements linearly anyway to get their sizes.async fn split_unsized<'a,T: AllocPage,K: UnsizedStorable + ?Sized + core::fmt::Debug,V: UnsizedStorable + ?Sized + core::fmt::Debug,L: Alloc + AllocWrite<K, V>,>(txn: &mut T,page: CowPage,replace: bool,u: usize,k0: &'a K,v0: &'a V,k1v1: Option<(&'a K, &'a V)>,l0: u64,r0: u64,) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {let hdr = header(page.as_page());let mut left = txn.alloc_page().await?;<Page<K, V> as BTreeMutPage<K, V>>::init(&mut left);let mut right = txn.alloc_page().await?;<Page<K, V> as BTreeMutPage<K, V>>::init(&mut right);// Clone the leftmost child. If we're inserting at position 0,// this means this leftmost child must, and will be replaced.let left_child = hdr.left_page() & !0xfff;L::set_right_child(&mut left, -1, left_child);// Pointer to the split key and value.let mut split = None;// We start filling the left page, and we will change to filling// the right page when the left one is at least half-full.let mut current_page = &mut left;// There are two synchronised counters here: `hdr.n()` and// `n`. The reason for this is that operations on `hdr.n()` are// somewhat cumbersome, as they might involve extra operations to// convert to/from the little-endian encoding of `hdr.n()`.let mut n = 0;let s = L::offset_slice(page.as_page());let mut total = HDR;for (uu, off) in s.0.iter().enumerate() {// If the current position of the cursor is the insertion,// (i.e. `uu == u`), insert.if uu == u {// The following is tricky and makes assumptions on the// rest of the code. If we are inserting two elements// (i.e. if `k1v1.is_some()`), this means we're replacing// one on the page.header_mut(current_page).set_n(n as u16);if let Some((k1, v1)) = k1v1 {// As explained in the documentation for `put` in the// definition of `BTreeMutPage`, `l0` and `r0` are the// left and right children of k1v1, since this means// the right child of a deleted entry has split, and// we must replace the deleted entry with `(k0, v0)`.L::alloc_write(current_page, k0, v0, 0, l0, &mut n);total += alloc_size(k0, v0) + L::OFFSET_SIZE;L::alloc_write(current_page, k1, v1, 0, r0, &mut n);total += alloc_size(k1, v1) + L::OFFSET_SIZE;// Replacing the next element:debug_assert!(replace);continue;} else {L::alloc_write(current_page, k0, v0, l0, r0, &mut n);total += alloc_size(k0, v0) + L::OFFSET_SIZE;if replace {continue;}}}// Then, i.e. 
either if we're not at the position of an// insertion, or else if we're not replacing the current// position, just copy the current entry to the appropriate// page (left or right).let (r, off): (u64, usize) = (*off).into();assert_ne!(off, 0);unsafe {let ptr = page.data.add(off);let size = entry_size::<K, V>(ptr);// If the left side of the split is at least half-full, we// know that we can do all the insertions we need on the// right-hand side (even if there are two of them, and the// replaced value is of size 0), so we switch.if split.is_none() && total >= PAGE_SIZE / 2 {// Don't write the next entry onto any page, keep it// as the split entry.let (k, v) = read::<K, V>(ptr);split = Some((K::from_raw_ptr(k), V::from_raw_ptr(v)));// Set the entry count for the current page, before// switching and resetting the counter.header_mut(current_page).set_n(n as u16);// And set the leftmost child of the right page to// `r`.L::set_right_child(&mut right, -1, r);// Then, switch page and reset the counter.current_page = &mut right;n = 0;} else {// Else, we're simply allocating a new entry on the// current page.let off_new = {let hdr = header_mut(current_page);let data = hdr.data();assert!(HDR + hdr.n() as usize * L::OFFSET_SIZE + L::OFFSET_SIZE + size< data as usize);// Move the data pointer `size` bytes to the left.hdr.set_data(data - size as u16);// And increase the number of entries, and// occupied size of the page.hdr.incr(size + L::OFFSET_SIZE);data - size as u16};// Copy the entry.core::ptr::copy_nonoverlapping(ptr,current_page.0.data.offset(off_new as isize),size,);// And set the offset.L::set_offset(current_page, n, r | (off_new as u64));n += 1;}total += size + L::OFFSET_SIZE;}}header_mut(current_page).set_n(n as u16);// Finally, it is possible that we haven't had a chance to do the// insertions yet: this is the case iff we're inserting at the end// of all the entries, so handle this now.if u == s.0.len() {if let Some((k1, v1)) = k1v1 {L::alloc_write(current_page, k0, v0, 0, l0, &mut n);L::alloc_write(current_page, k1, v1, 0, r0, &mut n);} else {L::alloc_write(current_page, k0, v0, l0, r0, &mut n);}}let (split_key, split_value) = split.unwrap();Ok(Put::Split {split_key,split_value,left,right,freed: if page.is_dirty() {page.offset | 1} else {page.offset},})}
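// A standalone sketch of the fill policy used by `split_unsized`
// above (the sizes and page size here are assumptions, not crate
// API): entries are appended to the left page until it is at least
// half full; the next entry becomes the split entry, and everything
// after it goes to the right page.
#[test]
fn split_point_sketch() {
    fn split_point(entry_sizes: &[usize], hdr: usize, page: usize) -> usize {
        let mut total = hdr;
        for (i, s) in entry_sizes.iter().enumerate() {
            if total >= page / 2 {
                return i; // this entry would be kept as the split entry
            }
            total += s;
        }
        entry_sizes.len()
    }
    // With a 4096-byte page, the left side stops growing at 2048 bytes.
    assert_eq!(split_point(&[1000, 1000, 1000, 1000], 16, 4096), 3);
}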
use crate::PAGE_SIZE;#[derive(Debug)]#[repr(C)]pub struct Header {data: u16,n: u16,crc: u32,left_page: u64,}impl Header {pub(crate) fn init(&mut self) {self.data = (((PAGE_SIZE as u16) << 3) | 1).to_le();self.n = 0;self.crc = 0;self.left_page = 0;}pub(crate) fn n(&self) -> u16 {u16::from_le(self.n)}pub(crate) fn set_n(&mut self, n: u16) {self.n = n.to_le();}pub fn left_page(&self) -> u64 {u64::from_le(self.left_page)}pub fn set_left_page(&mut self, l: u64) {self.left_page = l.to_le()}pub fn data(&self) -> u16 {u16::from_le(self.data) >> 3}pub fn set_data(&mut self, d: u16) {let dirty = u16::from_le(self.data) & 7;self.data = ((d << 3) | dirty).to_le()}pub fn decr(&mut self, s: usize) {self.left_page = (self.left_page() - s as u64).to_le();}pub fn set_occupied(&mut self, size: u64) {self.left_page = ((self.left_page() & !0xfff) | size).to_le()}pub fn incr(&mut self, s: usize) {self.left_page = (self.left_page() + s as u64).to_le();}pub fn is_leaf(&self) -> bool {u64::from_le(self.left_page) <= 0xfff}}pub const HDR: usize = core::mem::size_of::<Header>();pub(crate) fn header<'a>(page: crate::Page<'a>) -> &'a Header {unsafe { &*(page.data.as_ptr() as *const Header) }}pub fn header_mut(page: &mut crate::MutPage) -> &mut Header {unsafe { &mut *(page.0.data as *mut Header) }}
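// A minimal standalone sketch of the bit-packing conventions used by
// `Header` above, mirroring its accessors: `data` keeps an in-page
// offset shifted left by 3 bits, the low 3 bits being reserved for
// flags such as the dirty bit; `left_page` keeps a page-aligned
// pointer in its 52 MSBs and the occupied size in its 12 LSBs (leaves
// have no child pointer, so the whole field stays <= 0xfff, which is
// exactly what `is_leaf` tests).
#[test]
fn header_packing_sketch() {
    // `data` as initialised by `Header::init`: offset 4096, flag bit set.
    let data: u16 = (4096u16 << 3) | 1;
    assert_eq!(data >> 3, 4096);
    assert_eq!(data & 7, 1);
    // `left_page` for an internal node: child pointer | occupied bytes.
    let child: u64 = 3 * 4096;
    let occupied: u64 = 160;
    let left_page = child | occupied;
    assert_eq!(left_page & !0xfff, child);
    assert_eq!(left_page & 0xfff, occupied);
    assert!(left_page > 0xfff); // i.e. not a leaf, cf. `is_leaf`
}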
use super::{header, Header};#[derive(Debug, Clone, Copy)]pub struct PageCursor {pub cur: isize,pub total: usize,pub is_leaf: bool,}impl PageCursor {/// Initialise a cursor on page `p` at position `cur`, checking/// that `cur` is a valid position.pub fn new(p: &crate::CowPage, cur: isize) -> Self {let hdr = unsafe { &*(p.data as *const Header) };assert!(cur >= -1 && cur < hdr.n() as isize);PageCursor {cur,total: hdr.n() as usize,is_leaf: hdr.is_leaf(),}}/// Initialise a cursor set after the last entry of page `p`.pub fn after(p: &crate::CowPage) -> Self {let hdr = unsafe { &*(p.data as *const Header) };let total = hdr.n();PageCursor {cur: total as isize,total: total as usize,is_leaf: hdr.is_leaf(),}}/// Initialise a cursor set on the last entry of page `p`.pub fn last(p: crate::Page) -> Self {let hdr = header(p);let total = hdr.n();PageCursor {cur: (total - 1) as isize,total: total as usize,is_leaf: hdr.is_leaf(),}}}
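// A standalone sketch of the position convention above: a cursor on a
// page with `total` entries ranges over -1..=total, where -1 means
// "strictly before the first entry" (`PageCursor::new(p, -1)`) and
// `total` means "after the last entry" (`PageCursor::after`).
#[test]
fn cursor_positions_sketch() {
    fn describe(cur: isize, total: usize) -> &'static str {
        if cur < 0 {
            "before the first entry"
        } else if (cur as usize) < total {
            "on an entry"
        } else {
            "after the last entry"
        }
    }
    assert_eq!(describe(-1, 3), "before the first entry");
    assert_eq!(describe(2, 3), "on an entry");
    assert_eq!(describe(3, 3), "after the last entry");
}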
use super::header::{header, header_mut, Header, HDR};use super::*;/// We'd love to share this trait with the sized implementation, but/// unfortunately, the type parameters of almost all methods are/// different.pub(crate) trait Alloc {const OFFSET_SIZE: usize;/// Size (including the offset size) of the entry at position/// `cur`.fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(page: crate::Page,cur: isize,) -> usize;/// Can we allocate an entry of `size` bytes, where `size` doesn't/// include the offset bytes?fn can_alloc(hdr: &Header, size: usize, n: usize) -> bool {(HDR as usize) + (hdr.n() as usize) * Self::OFFSET_SIZE + n * Self::OFFSET_SIZE + size<= hdr.data() as usize}/// If we cloned the page, could we allocate an entry of `size`/// bytes, where `size` doesn't include the offset bytes?fn can_compact(hdr: &Header, size: isize, n: usize) -> bool {(HDR as isize)+ ((hdr.left_page() & 0xfff) as isize)+ (n * Self::OFFSET_SIZE) as isize+ size<= PAGE_SIZE as isize}/// Set the right child of the `n`th entry to `r`. If `n == -1`,/// this method can be used to set the leftmost child of a page.fn set_right_child(_new: &mut MutPage, _n: isize, _r: u64) {}/// Set the n^th offset on the page to `r`, which encodes a page/// offset in its 52 MSBs, and an offset on the page in its 12 LSBs.fn set_offset(new: &mut MutPage, n: isize, r: u64);/// The type of offsets, u64 in internal nodes, u16 in leaves.type Offset: Into<(u64, usize)> + Copy + core::fmt::Debug;fn offset_slice<'a>(page: crate::Page<'a>) -> Offsets<'a, Self::Offset>;}#[derive(Debug, Clone, Copy)]pub struct Offsets<'a, A>(pub &'a [A]);impl<'a, A: Copy> Offsets<'a, A> {pub fn split_at(self, mid: usize) -> (Self, Self) {if mid < self.0.len() {let (a, b) = self.0.split_at(mid);(Offsets(a), Offsets(b))} else {debug_assert_eq!(mid, self.0.len());(self, Offsets(&[][..]))}}}pub struct Leaf {}pub struct Internal {}impl Alloc for Leaf {const OFFSET_SIZE: usize = 2;// Note: the size returned by this function includes the offset.fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(page: crate::Page,cur: isize,) -> usize {// Find the offset of the current position, get its size.unsafe {debug_assert!(cur >= 0);let off = *(page.data.as_ptr().add(HDR + 2 * cur as usize) as *const u16);Self::OFFSET_SIZE+ entry_size::<K, V>(page.data.as_ptr().add(u16::from_le(off) as usize))}}fn set_offset(new: &mut MutPage, n: isize, off: u64) {unsafe {let ptr = new.0.data.offset(HDR as isize + n * 2) as *mut u16;*ptr = (off as u16).to_le();}}type Offset = LeafOffset;fn offset_slice<'a>(page: crate::Page<'a>) -> Offsets<'a, Self::Offset> {let hdr_n = header(page).n();unsafe {Offsets(core::slice::from_raw_parts(page.data.as_ptr().add(HDR) as *const LeafOffset,hdr_n as usize,))}}}impl Alloc for Internal {const OFFSET_SIZE: usize = 8;fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(page: crate::Page,cur: isize,) -> usize {unsafe {8 + entry_size::<K, V>(page.data.as_ptr().add((u64::from_le(*(page.data.as_ptr().add(HDR) as *const u64).offset(cur)) & 0xfff)as usize,))}}/// Set the right-hand child in the offset array, preserving the/// current offset.fn set_right_child(page: &mut MutPage, n: isize, r: u64) {unsafe {debug_assert_eq!(r & 0xfff, 0);let ptr = page.0.data.offset(HDR as isize + n * 8) as *mut u64;let off = u64::from_le(*ptr) & 0xfff;*ptr = (r | off as u64).to_le();}}fn set_offset(new: &mut MutPage, n: isize, off: u64) {unsafe {let ptr = new.0.data.offset(HDR as isize + n * 8) as *mut u64;*ptr = 
off.to_le();}}type Offset = InternalOffset;fn offset_slice<'a>(page: crate::Page<'a>) -> Offsets<'a, Self::Offset> {let hdr_n = header(page).n() as usize;unsafe {Offsets(core::slice::from_raw_parts(page.data.as_ptr().add(HDR) as *const InternalOffset,hdr_n,))}}}#[derive(Debug, Clone, Copy)]#[repr(C)]pub struct LeafOffset(u16);impl Into<(u64, usize)> for LeafOffset {fn into(self) -> (u64, usize) {(0, self.0 as usize)}}impl Into<usize> for LeafOffset {fn into(self) -> usize {self.0 as usize}}#[derive(Debug, Clone, Copy)]#[repr(C)]pub struct InternalOffset(u64);impl Into<(u64, usize)> for InternalOffset {fn into(self) -> (u64, usize) {(self.0 & !0xfff, (self.0 & 0xfff) as usize)}}impl Into<usize> for InternalOffset {fn into(self) -> usize {self.0 as usize}}impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> AllocWrite<K, V> for Leaf {fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {alloc_write::<K, V, Self>(new, k0, v0, l, r, n)}fn set_left_child(_new: &mut MutPage, _n: isize, l: u64) {debug_assert_eq!(l, 0);}}impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> AllocWrite<K, V> for Internal {fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {alloc_write::<K, V, Self>(new, k0, v0, l, r, n)}fn set_left_child(new: &mut MutPage, n: isize, l: u64) {if l > 0 {Internal::set_right_child(new, n - 1, l);}}}// Allocate a new entry and copy (k0, v0) into the new slot.fn alloc_write<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized, L: Alloc>(new: &mut MutPage,k0: &K,v0: &V,l: u64,r: u64,n: &mut isize,) {let size = alloc_size(k0, v0);let off_new = alloc_insert::<K, V, L>(new, n, size, r);unsafe {let new_ptr = new.0.data.add(off_new);k0.write_to_page(new_ptr);let ks = k0.size();let v_ptr = new_ptr.add((ks + V::ALIGN - 1) & !(V::ALIGN - 1));v0.write_to_page(v_ptr);}if l > 0 {L::set_right_child(new, *n - 1, l);}*n += 1;}/// Reserve space on the page for `size` bytes of data (i.e. not/// counting the offset), set the right child of the new position/// to `r`, add the offset to the new data to the offset array,/// and return the new offset.fn alloc_insert<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized, L: Alloc>(new: &mut MutPage,n: &mut isize,size: usize,r: u64,) -> usize {let hdr = header_mut(new);let data = hdr.data();// If we're here, the following assertion has been checked by the// `can_alloc` method.debug_assert!(HDR + (hdr.n() as usize + 1) * L::OFFSET_SIZE + size <= data as usize);hdr.set_data(data - size as u16);hdr.set_n(hdr.n() + 1);hdr.incr(L::OFFSET_SIZE + size);let off_new = data - size as u16;let hdr_n = hdr.n();// Making space for the new offsetunsafe {core::ptr::copy(new.0.data.add(HDR + (*n as usize) * L::OFFSET_SIZE),new.0.data.add(HDR + (*n as usize) * L::OFFSET_SIZE + L::OFFSET_SIZE),(hdr_n as usize - (*n as usize)) * L::OFFSET_SIZE,);}L::set_offset(new, *n, r | (off_new as u64));off_new as usize}
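// A standalone sketch of the two offset encodings above: in internal
// nodes, an offset packs the right child page (52 MSBs) together with
// the position of the entry on this page (12 LSBs), exactly as decoded
// by `Into<(u64, usize)> for InternalOffset`; leaf offsets are bare
// u16 positions, so their child part is always 0.
#[test]
fn offset_decoding_sketch() {
    let child_page: u64 = 7 * 4096; // page-aligned file offset
    let in_page: u64 = 0x230; // where the entry lives on this page
    let packed = child_page | in_page;
    assert_eq!(packed & !0xfff, child_page);
    assert_eq!((packed & 0xfff) as usize, 0x230);
    // Leaf offsets decode as (0, position), cf. `Into<(u64, usize)> for LeafOffset`.
    let leaf: u16 = 0x230;
    assert_eq!((0u64, leaf as usize), (0, 0x230));
}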
//! Implementation of B tree pages for Sized types, i.e. types whose//! representation has a size known at compile time (and the same as//! [`core::mem::size_of()`]).//!//! The details of the implementation are as follows://!//! - The page starts with a 16 bytes header of the following form//! (where all the fields are encoded in Little-Endian)://!//! ```//! #[repr(C)]//! pub struct Header {//! /// Offset to the first entry in the page, shifted 3 bits//! /// to the right to allow for the dirty bit (plus two//! /// extra bits, zero for now), as explained in the//! /// documentation of CowPage, at the root of this//! /// crate. This is 4096 for empty pages, and it is unused//! /// for leaves. Moreover, this field can't be increased://! /// when it reaches the bottom, the page must be cloned.//! data: u16,//! /// Number of entries on the page.//! n: u16,//! /// CRC (if used).//! crc: u32,//! /// The 52 most significant bits are the left child of//! /// this page (0 for leaves), while the 12 LSBs represent//! /// the space this page would take when cloned from scratch,//! /// minus the header. The reason for this is that entries//! /// in internal nodes aren't really removed when deleted,//! /// they're only "unlinked" from the array of offsets (see//! /// below). Therefore, we must have a way to tell when a//! /// page can be "compacted" to reclaim space.//! left_page: u64,//! }//! ```//! - For leaves, the rest of the page has the same representation as//! an array of length `n`, of elements of type `Tuple<K, V>`://! ```//! #[repr(C)]//! struct Tuple<K, V> {//! k: K,//! v: V,//! }//! ```//! If the alignment of that structure is more than 16 bytes, we//! need to add some padding between the header and that array.//! - For internal nodes, the rest of the page starts with an array of//! length `n` of Little-Endian-encoded [`u64`], where the 12 least//! significant bits of each [`u64`] are an offset to a `Tuple<K, V>` in//! the page, and the 52 other bits are an offset in the file to the//! right child of the entry.//!//! Moreover, the offset represented by the 12 LSBs is after (or at)//! `header.data`.//! We say we can "allocate" in the page if the `data` of the header//! is greater than or equal to the position of the last "offset",//! plus the size we want to allocate (note that since we allocate//! from the end of the page, `data` is always a multiple of the//! alignment of `Tuple<K, V>`).use super::*;use crate::btree::del::*;use crate::btree::put::*;use core::cmp::Ordering;mod alloc; // The "alloc" trait, to provide common methods for leaves and internal nodes.mod put; // Inserting a new value onto a page (possibly splitting the page).mod rebalance; // Rebalance two sibling pages to the left or to the right.use super::page_unsized::{cursor::PageCursor, header};use alloc::*;use header::*;use rebalance::*;/// This struct is used to allocate a pair `(K, V)` on the stack, and/// copy it from a C pointer from a page.////// This is also used to form slices of pairs in order to use/// functions from the core library.#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]#[repr(C)]struct Tuple<K, V> {k: K,v: V,}/// Empty type implementing `BTreePage` and `BTreeMutPage`.#[derive(Debug)]pub struct Page<K, V> {k: core::marker::PhantomData<K>,v: core::marker::PhantomData<V>,}#[async_trait(?Send)]impl<K: Storable, V: Storable> super::BTreePage<K, V> for Page<K, V> {// Cursors are quite straightforward. 
One non-trivial thing is// that they represent both a position on a page and the interval// from that position to the end of the page, as is apparent in// their `split_at` method.//// Another thing to note is that -1 and `c.total` are valid// positions for a cursor `c`. The reason for this is that// position `-1` has a right child (which is the first element's// left child).type Cursor = PageCursor;fn is_empty(c: &Self::Cursor) -> bool {c.cur >= c.total as isize}fn is_init(c: &Self::Cursor) -> bool {c.cur < 0}fn cursor_before(p: &crate::CowPage) -> Self::Cursor {PageCursor::new(p, -1)}fn cursor_after(p: &crate::CowPage) -> Self::Cursor {PageCursor::after(p)}fn split_at(c: &Self::Cursor) -> (Self::Cursor, Self::Cursor) {(PageCursor {cur: 0,total: c.cur.max(0) as usize,is_leaf: c.is_leaf,},*c,)}fn move_next(c: &mut Self::Cursor) -> bool {if c.cur >= c.total as isize {return false;}c.cur += 1;true}fn move_prev(c: &mut Self::Cursor) -> bool {if c.cur < 0 {return false;}c.cur -= 1;true}fn current<'a>(page: crate::Page<'a>,c: &Self::Cursor,) -> Option<(&'a K, &'a V, u64)> {// First, there's no current entry if the cursor is outside// the range of entries.if c.cur < 0 || c.cur >= c.total as isize {None} else if c.is_leaf {// Else, if this is a leaf, the elements are packed// together in a contiguous array.//// This means that the header may be followed by padding// (in order to align the entries). These are constants// known at compile-time, so `al` and `hdr` are optimised// away by the compiler.let al = core::mem::align_of::<Tuple<K, V>>();// The following is a way to compute the first multiple of// `al` after `HDR`, assuming `al` is a power of 2 (which// is always the case since we get it from `align_of`).let hdr = (HDR + al - 1) & !(al - 1);// The position of the `Tuple<K, V>` we're looking for is// `f * cur` bytes after the padded header (because// `size_of` includes alignment padding).let f = core::mem::size_of::<Tuple<K, V>>();let kv = unsafe {&*(page.data.as_ptr().add(hdr + c.cur as usize * f) as *const Tuple<K, V>)};Some((&kv.k, &kv.v, 0))} else {// Internal nodes have an extra level of indirection: we// first need to find `off`, the offset in the page, in// the initial array of offsets. Since these offsets are// `u64`, and the header is of size 16 bytes, the array is// already aligned.unsafe {let off =u64::from_le(*(page.data.as_ptr().add(HDR + 8 * c.cur as usize) as *const u64));// Check that we aren't reading outside of the page// (for example because of a malformed offset).assert!((off as usize & 0xfff) + core::mem::size_of::<Tuple<K, V>>() <= 4096);// Once we have the offset, cast its 12 LSBs to a// position in the page, and read the `Tuple<K, V>` at// that position.let kv = &*(page.data.as_ptr().add((off as usize) & 0xfff) as *const Tuple<K, V>);Some((&kv.k, &kv.v, off & !0xfff))}}}// The left and right child methods aren't really surprising. 
One// thing to note is that cursors are always in positions between// `-1` and `c.total` (bounds included), so we only have to check// one side of the bound in the assertions.//// We also check, before entering the `unsafe` sections, that// we're only reading data that is on a page.fn left_child(page: crate::Page, c: &Self::Cursor) -> u64 {if c.is_leaf {0} else {assert!(c.cur >= 0 && HDR as isize + c.cur * 8 - 8 <= 4088);let off =unsafe { *(page.data.as_ptr().offset((HDR as isize + c.cur * 8) - 8) as *const u64) };u64::from_le(off) & !0xfff}}fn right_child(page: crate::Page, c: &Self::Cursor) -> u64 {if c.is_leaf {0} else {assert!(c.cur < c.total as isize && HDR as isize + c.cur * 8 <= 4088);let off =unsafe { *(page.data.as_ptr().offset(HDR as isize + c.cur * 8) as *const u64) };u64::from_le(off) & !0xfff}}async fn set_cursor<'a, 'b, T: LoadPage>(_txn: &'a T,page: crate::Page<'b>,c: &mut PageCursor,k0: &K,v0: Option<&V>,) -> Result<(&'a K, &'a V, u64), usize> {unsafe {// `lookup` has the same semantics as// `core::slice::binary_search`, i.e. it returns either// `Ok(n)`, where `n` is the position where `(k0, v0)` was// found, or `Err(n)` where `n` is the position where// `(k0, v0)` can be inserted to preserve the order.match lookup(page, c, k0, v0).await {Ok(n) => {c.cur = n as isize;// Just read the tuple and return it.if c.is_leaf {let f = core::mem::size_of::<Tuple<K, V>>();let al = core::mem::align_of::<Tuple<K, V>>();let hdr_size = (HDR + al - 1) & !(al - 1);let tup =&*(page.data.as_ptr().add(hdr_size + f * n) as *const Tuple<K, V>);Ok((&tup.k, &tup.v, 0))} else {let off =u64::from_le(*(page.data.as_ptr().add(HDR + n * 8) as *const u64));let tup =&*(page.data.as_ptr().add(off as usize & 0xfff) as *const Tuple<K, V>);Ok((&tup.k, &tup.v, off & !0xfff))}}Err(n) => {c.cur = n as isize;Err(n)}}}}}#[async_trait(?Send)]impl<K: Storable + core::fmt::Debug, V: Storable + core::fmt::Debug> super::BTreeMutPage<K, V>for Page<K, V>{// Once again, this is quite straightforward.fn init(page: &mut MutPage) {let h = header_mut(page);h.init();}// When deleting from internal nodes, we take a replacement from// one of the leaves (in our current implementation, the leftmost// entry of the right subtree). This method copies an entry from// the leaf onto the program stack, which is necessary since// deletions in leaves overwrite entries.//// Another design choice would have been to do the same as for the// unsized implementation, but in this case this would have meant// copying the saved value to the end of the leaf, potentially// preventing merges, and not even saving a memory copy.type Saved = (K, V);fn save_deleted_leaf_entry(k: &K, v: &V) -> Self::Saved {unsafe {let mut k0 = core::mem::MaybeUninit::uninit();let mut v0 = core::mem::MaybeUninit::uninit();core::ptr::copy_nonoverlapping(k, k0.as_mut_ptr(), 1);core::ptr::copy_nonoverlapping(v, v0.as_mut_ptr(), 1);(k0.assume_init(), v0.assume_init())}}unsafe fn from_saved<'a>(s: &Self::Saved) -> (&'a K, &'a V) {(core::mem::transmute(&s.0), core::mem::transmute(&s.1))}// `put` inserts one or two entries onto a node (internal or// leaf). 
This is implemented in the `put` module.async fn put<'a, T: AllocPage>(txn: &mut T,page: CowPage,mutable: bool,replace: bool,c: &PageCursor,k0: &'a K,v0: &'a V,k1v1: Option<(&'a K, &'a V)>,l: u64,r: u64,) -> Result<super::put::Put<'a, K, V>, T::Error> {assert!(c.cur >= 0);// In the sized case, deletions can never cause a split, so we// never have to insert two elements at the same position.assert!(k1v1.is_none());if r == 0 {put::put::<_, _, _, Leaf>(txn, page, mutable, replace, c.cur as usize, k0, v0, 0, 0).await} else {put::put::<_, _, _, Internal>(txn, page, mutable, replace, c.cur as usize, k0, v0, l, r).await}}unsafe fn put_mut(page: &mut MutPage, c: &mut Self::Cursor, k0: &K, v0: &V, r: u64) {use super::page_unsized::AllocWrite;let mut n = c.cur;if r == 0 {Leaf::alloc_write(page, k0, v0, 0, r, &mut n);} else {Internal::alloc_write(page, k0, v0, 0, r, &mut n);}c.total += 1;}unsafe fn set_left_child(page: &mut MutPage, c: &Self::Cursor, l: u64) {let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur - 1);*off = (l | (u64::from_le(*off) & 0xfff)).to_le();}// This function updates an internal node, setting the left child// of the cursor to `l`.async fn update_left_child<T: AllocPage>(txn: &mut T,page: CowPage,mutable: bool,c: &Self::Cursor,l: u64,) -> Result<crate::btree::put::Ok, T::Error> {assert!(!c.is_leaf && c.cur >= 0 && (c.cur as usize) <= c.total);let freed;let page = if mutable && page.is_dirty() {// If the page is mutable (dirty), just convert it to a// mutable page, and update.freed = 0;MutPage(page)} else {// Else, clone the page.let mut new = txn.alloc_page().await?;<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);// First clone the left child of the page.let l = header(page.as_page()).left_page() & !0xfff;let hdr = header_mut(&mut new);hdr.set_left_page(l);// And then the rest of the page.let s = Internal::offset_slice::<T, K, V>(page.as_page());clone::<K, V, Internal>(page.as_page(), &mut new, s, &mut 0);// Mark the former version of the page as free.freed = page.offset | if page.is_dirty() { 1 } else { 0 };new};// Finally, update the left child of the cursor.unsafe {let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur - 1);*off = (l | (u64::from_le(*off) & 0xfff)).to_le();}Ok(Ok { page, freed })}async fn del<T: AllocPage>(txn: &mut T,page: crate::CowPage,mutable: bool,c: &PageCursor,l: u64,) -> Result<(MutPage, u64), T::Error> {assert!(c.cur >= 0 && (c.cur as usize) < c.total);if mutable && page.is_dirty() {// In the mutable case, we just need to move some memory// around.let p = page.data;let mut page = MutPage(page);let hdr = header_mut(&mut page);let f = core::mem::size_of::<Tuple<K, V>>();if c.is_leaf {// In leaves, we need to move the n - c - 1 elements// that are strictly after the cursor, by `f` (the// size of an entry).//// Here's the reasoning to avoid off-by-one errors: if// `c` is 0 (i.e. 
we're deleting the first element on// the page), we remove one entry, so there are n - 1// remaining entries.let n = hdr.n() as usize;let hdr_size = {// As usual, header + paddinglet al = core::mem::align_of::<Tuple<K, V>>();(HDR + al - 1) & !(al - 1)};let off = hdr_size + c.cur as usize * f;unsafe {core::ptr::copy(p.add(off + f), p.add(off), f * (n - c.cur as usize - 1));}// Removing `f` bytes from the page.hdr.decr(f);} else {// Internal nodes are easier to deal with, as we just// have to move the offsets.unsafe {let ptr = p.add(HDR + c.cur as usize * 8) as *mut u64;core::ptr::copy(ptr.offset(1), ptr, hdr.n() as usize - c.cur as usize - 1);}// Removing `f` bytes from the page (the tuple), plus// one 8-byte offset.hdr.decr(f + 8);// Updating the left page if necessary.if l > 0 {unsafe {let off = (p.add(HDR) as *mut u64).offset(c.cur as isize - 1);*off = (l | (u64::from_le(*off) & 0xfff)).to_le();}}}hdr.set_n(hdr.n() - 1);// Returning the mutable page itself, and 0 (for "no page freed")Ok((page, 0))} else {// Immutable pages need to be cloned. The strategy is the// same in both cases: get an "offset slice", split it at// the cursor, remove the first entry of the second half,// and clone.let mut new = txn.alloc_page().await?;<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);if c.is_leaf {let s = Leaf::offset_slice::<T, K, V>(page.as_page());let (s0, s1) = s.split_at(c.cur as usize);let (_, s1) = s1.split_at(1);let mut n = 0;clone::<K, V, Leaf>(page.as_page(), &mut new, s0, &mut n);clone::<K, V, Leaf>(page.as_page(), &mut new, s1, &mut n);} else {// Internal nodes are a bit trickier, since the left child// is not counted in the "offset slice", so we need to// clone it separately. Also, the `l` argument to this// function might be non-zero in this case.let s = Internal::offset_slice::<T, K, V>(page.as_page());let (s0, s1) = s.split_at(c.cur as usize);let (_, s1) = s1.split_at(1);// First, clone the left child of the page.let hdr = header(page.as_page());let left = hdr.left_page() & !0xfff;unsafe { *(new.0.data.add(HDR - 8) as *mut u64) = left.to_le() };// Then, clone the entries strictly before the cursor// (i.e. clone `s0`).let mut n = 0;clone::<K, V, Internal>(page.as_page(), &mut new, s0, &mut n);// If we need to update the left child of the deleted// item, do it.if l > 0 {unsafe {let off = new.0.data.offset(HDR as isize + (n - 1) * 8) as *mut u64;*off = (l | (u64::from_le(*off) & 0xfff)).to_le();}}// Finally, clone the right half of the page (`s1`).clone::<K, V, Internal>(page.as_page(), &mut new, s1, &mut n);}Ok((new, page.offset))}}// Decide what to do with the concatenation of two neighbouring// pages, with a middle element.async fn merge_or_rebalance<'a, T: AllocPage>(txn: &mut T,m: Concat<'a, K, V, Self>,) -> Result<Op<'a, T, K, V>, T::Error> {// First evaluate the size of the middle element on a page.let (hdr_size, mid_size) = if m.modified.c0.is_leaf {let al = core::mem::align_of::<Tuple<K, V>>();((HDR + al - 1) & !(al - 1),core::mem::size_of::<Tuple<K, V>>(),)} else {(HDR, 8 + core::mem::size_of::<Tuple<K, V>>())};// Evaluate the size of the modified half of the concatenation// (which includes the header).let mod_size = size::<K, V>(&m.modified);// Add the "occupied" size (which excludes the header).let occupied = {let hdr = header(m.other.as_page());(hdr.left_page() & 0xfff) as usize};// One surprising observation here is that adding the sizes// works. This is surprising because of alignment and// padding. 
It works because we can split the sizes into an// offset part (always 8 bytes) and a data part (of a constant// alignment), and thus we never need any padding anywhere on// the page.if mod_size + mid_size + occupied <= PAGE_SIZE {// If the concatenation fits on a page, merge.return if m.modified.c0.is_leaf {super::page_unsized::merge::<_, _, _, _, Leaf>(txn, m).await} else {super::page_unsized::merge::<_, _, _, _, Internal>(txn, m).await};}// If the modified page is large enough, or if the other page// has only just barely the minimum number of elements to be// at least half-full, return.//// The situation where the other page isn't full enough might// happen for example if elements occupy exactly 1/5th of a// page + 1 byte, and the modified page has 2 of them after// the deletion, while the other page has 3.if mod_size >= PAGE_SIZE / 2 || hdr_size + occupied - mid_size < PAGE_SIZE / 2 {if let Some((k, v)) = m.modified.ins {// Perform the required modification and return.return Ok(Op::Put(Self::put(txn,m.modified.page,m.modified.mutable,m.modified.skip_first,&m.modified.c1,k,v,m.modified.ins2,m.modified.l,m.modified.r,).await?));} else if m.modified.skip_first {// `ins2` is only ever used when the page immediately// below a deletion inside an internal node has split,// and we need to replace the deleted value, *and*// insert the middle entry of the split.debug_assert!(m.modified.ins2.is_none());let (page, freed) = Self::del(txn,m.modified.page,m.modified.mutable,&m.modified.c1,m.modified.l,).await?;return Ok(Op::Put(Put::Ok(Ok { page, freed })));} else {let mut c1 = m.modified.c1.clone();let mut l = m.modified.l;if l == 0 {Self::move_next(&mut c1);l = m.modified.r;}return Ok(Op::Put(Put::Ok(Self::update_left_child(txn,m.modified.page,m.modified.mutable,&c1,l,).await?)));}}// Finally, if we're here, we can rebalance. There are four// (relatively explicit) cases; see the `rebalance` submodule// to see how this is done.if m.mod_is_left {if m.modified.c0.is_leaf {rebalance_left::<_, _, _, Leaf>(txn, m).await} else {rebalance_left::<_, _, _, Internal>(txn, m).await}} else {super::page_unsized::rebalance::rebalance_right::<_, _, _, Self>(txn, m).await}}}/// Size of a modified page (including the header).fn size<K: Storable, V: Storable>(m: &ModifiedPage<K, V, Page<K, V>>) -> usize {let mut total = {let hdr = header(m.page.as_page());(hdr.left_page() & 0xfff) as usize};if m.c1.is_leaf {let al = core::mem::align_of::<Tuple<K, V>>();total += (HDR + al - 1) & !(al - 1);} else {total += HDR};// Extra size for the offsets.let extra = if m.c1.is_leaf { 0 } else { 8 };if m.ins.is_some() {total += extra + core::mem::size_of::<Tuple<K, V>>();if m.ins2.is_some() {total += extra + core::mem::size_of::<Tuple<K, V>>()}}// If we skip the first entry of `m.c1`, remove its size.if m.skip_first {total -= extra + core::mem::size_of::<Tuple<K, V>>()}total}/// Linear search, leaf version. 
This is relatively straightforward.fn leaf_linear_search<K: Storable, V: Storable>(k0: &K,v0: Option<&V>,s: &[Tuple<K, V>],) -> Result<usize, usize> {let mut n = 0;for sm in s.iter() {match sm.k.compare(k0) {Ordering::Less => n += 1,Ordering::Greater => return Err(n),Ordering::Equal => {if let Some(v0) = v0 {match sm.v.compare(v0) {Ordering::Less => n += 1,Ordering::Greater => return Err(n),Ordering::Equal => return Ok(n),}} else {return Ok(n);}}}}Err(n)}/// An equivalent of `Ord::cmp` on `Tuple<K, V>` but using/// `Storable::compare` instead of `Ord::cmp` on `K` and `V`.fn cmp<K: Storable, V: Storable>(k0: &K,v0: Option<&V>,p: &[u8; 4096],off: u64,) -> Ordering {let off = u64::from_le(off) & 0xfff;if off as usize + core::mem::size_of::<Tuple<K, V>>() > PAGE_SIZE {panic!("off = {:?}, size = {:?}",off,core::mem::size_of::<Tuple<K, V>>());}let tup = unsafe { &*(p.as_ptr().offset(off as isize & 0xfff) as *const Tuple<K, V>) };match tup.k.compare(k0) {Ordering::Equal => {if let Some(v0) = v0 {tup.v.compare(v0)} else {Ordering::Equal}}o => o,}}/// Linear search for internal nodes. Does what it says.unsafe fn internal_search<K: Storable, V: Storable>(k0: &K,v0: Option<&V>,s: &[u64],p: &[u8; 4096],) -> Result<usize, usize> {for (n, off) in s.iter().enumerate() {match cmp(k0, v0, p, *off) {Ordering::Less => {}Ordering::Greater => return Err(n),Ordering::Equal => return Ok(n),}}Err(s.len())}/// Lookup just forms slices of offsets (for internal nodes) or values/// (for leaves), and runs an internal search on them.async unsafe fn lookup<'a, K: Storable, V: Storable>(page: crate::Page<'a>,c: &mut PageCursor,k0: &K,v0: Option<&V>,) -> Result<usize, usize> {let hdr = header(page);c.total = hdr.n() as usize;c.is_leaf = hdr.is_leaf();if c.is_leaf {let al = core::mem::align_of::<Tuple<K, V>>();let hdr_size = (HDR + al - 1) & !(al - 1);let s = core::slice::from_raw_parts(page.data.as_ptr().add(hdr_size) as *const Tuple<K, V>,hdr.n() as usize,);leaf_linear_search(k0, v0, s)} else {let s = core::slice::from_raw_parts(page.data.as_ptr().add(HDR) as *const u64,hdr.n() as usize,);internal_search(k0, v0, s, page.data)}}/// Clone a slice of offsets onto a page. This essentially does what/// it says. 
Note that the leftmost child of a page is not included in/// the offset slices, so we don't have to handle it here.////// This should really be in the `Alloc` trait, but Rust doesn't have/// associated type constructors, so we can't have an associated type/// that's sometimes a slice and sometimes a "Range".fn clone<K: Storable, V: Storable, L: Alloc>(page: crate::Page,new: &mut MutPage,s: Offsets,n: &mut isize,) {match s {Offsets::Slice(s) => {// We know we're in an internal node here.let size = core::mem::size_of::<Tuple<K, V>>();for off in s.iter() {let off = u64::from_le(*off);let r = off & !0xfff;let off = off & 0xfff;unsafe {let ptr = page.data.as_ptr().add(off as usize);// Reserve the space on the pagelet hdr = header_mut(new);let data = hdr.data() as u16;let off_new = data - size as u16;hdr.set_data(off_new);// Copy the entry from the original page to its// position on the new page.core::ptr::copy_nonoverlapping(ptr, new.0.data.add(off_new as usize), size);// Set the offset to this new entry in the offset// array, along with the right child page.let ptr = new.0.data.offset(HDR as isize + *n * 8) as *mut u64;*ptr = (r | off_new as u64).to_le();}*n += 1;}let hdr = header_mut(new);hdr.set_n(hdr.n() + s.len() as u16);hdr.incr((8 + size) * s.len());}Offsets::Range(r) => {let size = core::mem::size_of::<Tuple<K, V>>();let a = core::mem::align_of::<Tuple<K, V>>();let header_size = (HDR + a - 1) & !(a - 1);let len = r.len();for off in r {unsafe {let ptr = page.data.as_ptr().add(header_size + off * size);let new_ptr = new.0.data.add(header_size + (*n as usize) * size);core::ptr::copy_nonoverlapping(ptr, new_ptr, size);}*n += 1;}// On leaves, we do have to update everything manually,// because we're simply copying stuff.let hdr = header_mut(new);hdr.set_n(hdr.n() + len as u16);hdr.incr(size * len);}}}
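// A standalone sketch of the padding computation used throughout this
// module: `(HDR + al - 1) & !(al - 1)` is the first multiple of `al`
// at or after the header, which is correct because `align_of` always
// returns a power of two.
#[test]
fn padded_header_sketch() {
    fn padded(hdr: usize, al: usize) -> usize {
        (hdr + al - 1) & !(al - 1)
    }
    assert_eq!(padded(16, 8), 16); // already aligned: no padding
    assert_eq!(padded(16, 32), 32); // tuples aligned to 32: pad by 16
}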
use super::*;use core::mem::MaybeUninit;pub(crate) async fn rebalance_left<'a,T: AllocPage,K: Storable + core::fmt::Debug,V: Storable + core::fmt::Debug,L: Alloc,>(txn: &mut T,m: Concat<'a, K, V, Page<K, V>>,) -> Result<Op<'a, T, K, V>, T::Error> {assert!(m.mod_is_left);// First element of the right page. We'll rotate it to the left// page, i.e. return a middle element, and append the current// middle element to the left page.let rc = super::PageCursor::new(&m.other, 0);let rl = <Page<K, V>>::left_child(m.other.as_page(), &rc);let (k, v, r) = <Page<K, V>>::current(m.other.as_page(), &rc).unwrap();let mut freed_ = [0, 0];// First, perform the modification on the modified page, which we// know is the left page, and return the resulting mutable page// (the modified page may or may not be mutable before we do this).let new_left = if let Some((k, v)) = m.modified.ins {let is_dirty = m.modified.page.is_dirty();let page = if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(txn,m.modified.page,m.modified.mutable,m.modified.skip_first,&m.modified.c1,k,v,m.modified.ins2,m.modified.l,m.modified.r,).await? {if freed > 0 {let b = if is_dirty { 1 } else { 0 };freed_[0] = freed | b;}page} else {unreachable!()};// Append the middle element of the concatenation at the end// of the left page. We know the left page to be mutable by// now, and we also know there's enough space to do this.let lc = PageCursor::after(&page.0);if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(txn, page.0, true, false, &lc, m.mid.0, m.mid.1, None, 0, rl,).await? {if freed > 0 {let b = if is_dirty { 1 } else { 0 };// index 0 is ok here: if a page is freed, it means we// just compacted a page, which implies that the call// to `put` above didn't free.freed_[0] = freed | b}page} else {unreachable!()}} else if m.modified.skip_first {// Looking at module `super::del`, the only way we can be in// this case is when the deleted entry is on this page, so we// start by actually deleting it.let is_dirty = m.modified.page.is_dirty();let (page, freed) = <Page<K, V>>::del(txn,m.modified.page,m.modified.mutable,&m.modified.c1,m.modified.l,).await?;if freed > 0 {let b = if is_dirty { 1 } else { 0 };freed_[0] = freed | b}// Append the middle element of the concatenation at the end// of the left page. We know the left page to be mutable by// now, and we also know there's enough space to do this.let lc = PageCursor::after(&page.0);if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(txn, page.0, true, false, &lc, m.mid.0, m.mid.1, None, 0, rl,).await? {if freed > 0 {let b = if is_dirty { 1 } else { 0 };// index 0 is ok here: if we freed a page in the call// to `del` above, it is mutable and we can allocate// on it. Else, we just compacted a page, but that// also means `del` above didn't free.freed_[0] = freed | b}page} else {unreachable!()}} else {let is_dirty = m.modified.page.is_dirty();let lc = PageCursor::after(&m.modified.page);if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(txn, m.modified.page, m.modified.mutable, false, &lc, m.mid.0, m.mid.1, None, 0, rl,).await? 
{if m.modified.l > 0 {assert_eq!(m.modified.r, 0);unsafe {let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur - 1);*off = (m.modified.l | (u64::from_le(*off) & 0xfff)).to_le();}} else if m.modified.r > 0 {unsafe {let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur);*off = (m.modified.r | (u64::from_le(*off) & 0xfff)).to_le();}}if freed > 0 {let b = if is_dirty { 1 } else { 0 };freed_[0] = freed | b}page} else {unreachable!()}};let f = core::mem::size_of::<Tuple<K, V>>();let (new_right, k, v) = if r == 0 && m.other_is_mutable && m.other.is_dirty() {// Since `r == 0`, we know this is a leaf. Therefore, we need// to rewrite the deleted element to the end of the page, so// that the pointer to the new middle entry is valid when this// function returns.let data = m.other.data;let mut other = MutPage(m.other);let right_hdr = header_mut(&mut other);// Remove the space for one entry.let n = right_hdr.n() as usize;right_hdr.decr(f);right_hdr.set_n((n - 1) as u16);let al = core::mem::align_of::<Tuple<K, V>>();let hdr_size = (HDR + al - 1) & !(al - 1);let mut t: MaybeUninit<Tuple<K, V>> = MaybeUninit::uninit();unsafe {// Save the leftmost element (we can't directly copy it to// the end, because the page may be full).core::ptr::copy_nonoverlapping(data.add(hdr_size) as *mut Tuple<K, V>,t.as_mut_ptr(),1,);// Move the n - 1 last elements one entry to the left.core::ptr::copy(data.add(hdr_size + f) as *const Tuple<K, V>,data.add(hdr_size) as *mut Tuple<K, V>,n - 1,);// Copy the last element to the end of the page.let last = data.add(hdr_size + f * (n - 1)) as *mut Tuple<K, V>;core::ptr::copy_nonoverlapping(t.as_ptr(), last, 1);(other, &(&*last).k, &(&*last).v)}} else {// If this isn't a leaf, or isn't mutable, use the standard// deletion function, because we know this will leave the// pointer to the current entry valid.// We extend the pointer's lifetime here, because we know the// current deletion (we only rebalance during deletions) won't// touch this page anymore after this.let k = unsafe { core::mem::transmute(k) };let v = unsafe { core::mem::transmute(v) };// If this frees the old "other" page, add it to the "freed"// array.let is_dirty = m.other.is_dirty();let (page, freed) = <Page<K, V>>::del(txn, m.other, m.other_is_mutable, &rc, r).await?;if freed > 0 {freed_[1] = if is_dirty { freed | 1 } else { freed };}(page, k, v)};Ok(Op::Rebalanced {l: new_left.0.offset,r: new_right.0.offset,k,v,freed: freed_,})}
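// A conceptual sketch of the left rotation performed by
// `rebalance_left` above, on plain vectors instead of pages (a
// simplification, not the crate's API): the middle entry of the
// concatenation is appended to the left page, and the first entry of
// the right page becomes the new middle entry.
#[test]
fn rotate_left_sketch() {
    fn rotate_left(left: &mut Vec<u32>, mid: u32, right: &mut Vec<u32>) -> u32 {
        left.push(mid);
        right.remove(0) // the new middle entry
    }
    let mut left = vec![1, 2];
    let mut right = vec![5, 6, 7];
    let new_mid = rotate_left(&mut left, 4, &mut right);
    assert_eq!((left, new_mid, right), (vec![1, 2, 4], 5, vec![6, 7]));
}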
use super::*;pub(super) async fn put<'a,T: AllocPage,K: Storable + core::fmt::Debug,V: Storable + core::fmt::Debug,L: Alloc + super::super::page_unsized::AllocWrite<K, V>,>(txn: &mut T,page: CowPage,mutable: bool,replace: bool,u: usize,k0: &'a K,v0: &'a V,l: u64,r: u64,) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {let hdr = header(page.as_page());let is_dirty = page.is_dirty();if mutable && is_dirty && L::can_alloc::<K, V>(hdr) {// If the page is mutable (i.e. not shared with another tree)// and dirty, here's what we do:let mut page = MutPage(page);let p = page.0.data;let hdr = header_mut(&mut page);let mut n = u as isize;if replace {// If we're replacing a value, this can't be in a leaf,// hence the only thing that needs to be done is erasing// the offset in the offset array. This is a bit naive,// since we're moving that array back and forth.assert!(!hdr.is_leaf());unsafe {let ptr = p.add(HDR + n as usize * 8) as *mut u64;core::ptr::copy(ptr.offset(1), ptr, hdr.n() as usize - n as usize - 1);hdr.decr(8 + core::mem::size_of::<Tuple<K, V>>());hdr.set_n(hdr.n() - 1);}}// Do the actual insertions.L::alloc_write(&mut page, k0, v0, l, r, &mut n);// No page was freed, return the page.Ok(Put::Ok(Ok { page, freed: 0 }))} else if replace || L::can_compact::<K, V>(hdr) {// Here, we could allocate if we cloned, so let's clone:let mut new = txn.alloc_page().await?;<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);L::set_right_child(&mut new, -1, hdr.left_page() & !0xfff);// Take the offsets and split it at the cursor position.let s = L::offset_slice::<T, K, V>(page.as_page());let (s0, s1) = s.split_at(u as usize);// If we're replacing, remove the offset that needs to go.let s1 = if replace { s1.split_at(1).1 } else { s1 };// And then clone the page, inserting the new elements between// the two halves of the split offsets.let mut n = 0;clone::<K, V, L>(page.as_page(), &mut new, s0, &mut n);L::alloc_write(&mut new, k0, v0, l, r, &mut n);clone::<K, V, L>(page.as_page(), &mut new, s1, &mut n);Ok(Put::Ok(Ok {page: new,freed: if is_dirty {page.offset | 1} else {page.offset},}))} else {// Else, split the page.split::<_, _, _, L>(txn, page, mutable, u, k0, v0, l, r).await}}async fn split<'a,T: AllocPage,K: Storable + core::fmt::Debug,V: Storable + core::fmt::Debug,L: Alloc + super::super::page_unsized::AllocWrite<K, V>,>(txn: &mut T,page: CowPage,mutable: bool,u: usize,k0: &'a K,v0: &'a V,l: u64,r: u64,) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {let mut left;let hdr = header(page.as_page());let page_is_dirty = if page.is_dirty() { 1 } else { 0 };let mut n = hdr.n();let k = n / 2;n += 1;let s = L::offset_slice::<T, K, V>(page.as_page());let (s0, s1) = s.split_at(k as usize);// Find the split entry (i.e. the entry going up in the B// tree). Then, mid_child is the right child of the split// key/value, and `s1` is the offsets of the right side of the// split.let (mut split_key, mut split_value, mid_child, s1) = if u == k as usize {// The inserted element is exactly in the middle.(k0, v0, r, s1)} else {// Else, the split key is the first element of `s1`: set the// new, updated `s1`.let (s1a, s1b) = s1.split_at(1);let (k, v, r) = L::kv(page.as_page(), L::first::<K, V>(&s1a));(k, v, r, s1b)};let mut freed = 0;if u >= k as usize {// If we are here, u >= k, i.e. 
the insertion is in the// right-hand side of the split, or is the split entry itself.let mut right = txn.alloc_page().await?;<Page<K, V> as BTreeMutPage<K, V>>::init(&mut right);if u > k as usize {// if we're inserting in the right-hand side, clone to the// newly allocated `right` page, bylet mut n = 0;// Splitting the offsets to make space for an insertion,// and insert in the middle.//// Off-by-one error? Nope: since `k` is the split entry,// the right page starts at index `k + 1`, hence if `u ==// k + 1`, `kk` must be 0.let kk = u as usize - k as usize - 1;let (s1a, s1b) = s1.split_at(kk);L::set_right_child(&mut right, -1, mid_child);clone::<K, V, L>(page.as_page(), &mut right, s1a, &mut n);L::alloc_write(&mut right, k0, v0, l, r, &mut n);clone::<K, V, L>(page.as_page(), &mut right, s1b, &mut n);} else {// Else, just clone the page, we'll take care of returning// the split entry afterwards.clone::<K, V, L>(page.as_page(), &mut right, s1, &mut 0);}if mutable && page.is_dirty() {// (k0, v0) is to be inserted on the right-hand side of// the split, hence we don't have to clone the left-hand// side, we can just truncate it.left = MutPage(page);let hdr = header_mut(&mut left);hdr.set_n(k);hdr.decr((n - 1 - k) as usize * core::mem::size_of::<Tuple<K, V>>());} else {// Else, we need to clone the first `k-1` entries,// i.e. `s0`, onto a new left page.left = txn.alloc_page().await?;<Page<K, V> as BTreeMutPage<K, V>>::init(&mut left);let mut n = 0;L::set_right_child(&mut left, -1, hdr.left_page() & !0xfff);clone::<K, V, L>(page.as_page(), &mut left, s0, &mut n);freed = page.offset | page_is_dirty}// Finally, if `u` is the middle element, its left and right// children become the leftmost child of the right page, and// the rightmost child of the left page, respectively.if u == k as usize {// Insertion in the middle:// - `l` becomes the right child of the last element on `left`.L::set_right_child(&mut left, k as isize - 1, l);// - `r` (i.e. `mid_child`) becomes the left child of `right`.L::set_right_child(&mut right, -1, mid_child);}Ok(Put::Split {split_key,split_value,left,right,freed,})} else {// Else, the insertion is in the new left page. We first clone// the left page, inserting (k,v) at its position:left = txn.alloc_page().await?;<Page<K, V> as BTreeMutPage<K, V>>::init(&mut left);// Clone the leftmost pagelet ll = header(page.as_page()).left_page() & !0xfff;L::set_right_child(&mut left, -1, ll);// Clone the two sides, with an entry in between.let (s0a, s0b) = s0.split_at(u as usize);let mut n = 0;clone::<K, V, L>(page.as_page(), &mut left, s0a, &mut n);L::alloc_write(&mut left, k0, v0, l, r, &mut n);clone::<K, V, L>(page.as_page(), &mut left, s0b, &mut n);let mut right;let freed;// Then, for the right page:if mutable && page.is_dirty() {// If we can mutate the original page, remove the first// `k` entries, and return a pointer to the split key (at// position `k` on the page)right = MutPage(page);if let Some((k, v)) = L::truncate_left::<K, V>(&mut right, k as usize + 1) {unsafe {split_key = &*k;split_value = &*v;}}freed = 0;} else {// Else, clone the right page.right = txn.alloc_page().await?;<Page<K, V> as BTreeMutPage<K, V>>::init(&mut right);// The left child of the right page is `mid_child`,// i.e. the right child of the split entry.L::set_right_child(&mut right, -1, mid_child);// Clone the rest of the page.let mut n = 0;clone::<K, V, L>(page.as_page(), &mut right, s1, &mut n);freed = page.offset | page_is_dirty}Ok(Put::Split {split_key,split_value,left,right,freed,})}}
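// A standalone sketch of how `split` above chooses sides: with `n`
// entries and an insertion at position `u`, the entry at `k = n / 2`
// goes up as the split entry, unless `u == k`, in which case the
// inserted entry itself goes up.
#[test]
fn split_side_sketch() {
    fn side(u: usize, n: usize) -> &'static str {
        let k = n / 2;
        if u < k {
            "left page"
        } else if u == k {
            "goes up as the split entry"
        } else {
            "right page"
        }
    }
    assert_eq!(side(1, 8), "left page");
    assert_eq!(side(4, 8), "goes up as the split entry");
    assert_eq!(side(5, 8), "right page");
}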
use super::*;/// This trait contains methods common to leaves and internal/// nodes. These methods are meant to be just a few lines long each,/// and avoid duplicating code in higher-level functions where just a/// few constants change between internal nodes and leaves.pub(crate) trait Alloc {/// Is there enough room to add one entry into this page (without cloning)?fn can_alloc<K: Storable, V: Storable>(hdr: &Header) -> bool;/// If we clone the page, will there be enough room for one entry?fn can_compact<K: Storable, V: Storable>(hdr: &Header) -> bool;/// Remove the leftmost `n` elements from the page. On leaves,/// this moves the last truncated element to the end of the page/// and returns a pointer to that element (this function is only/// used in `crate::btree::put`, and the pointer becomes invalid/// at the end of the `crate::btree::put`).////// Returning the last deleted element isn't required for internal/// nodes, since the entries are still valid there.fn truncate_left<K: Storable, V: Storable>(page: &mut MutPage,n: usize,) -> Option<(*const K, *const V)>;/// Allocate a new entry (size inferred), and return the offset on/// the page where the new entry can be copied.fn alloc_insert<K: Storable, V: Storable>(new: &mut MutPage, n: &mut isize, r: u64) -> usize;/// Set the right child of the `n`th entry to `r`. If `n == -1`,/// this method can be used to set the leftmost child of a page.fn set_right_child(new: &mut MutPage, n: isize, r: u64);/// "Slices" of offsets, which aren't really slices in the case of/// leaves, but just intervals or "ranges".fn offset_slice<'a, T: LoadPage, K: Storable, V: Storable>(page: crate::Page<'a>,) -> Offsets<'a>;/// First element of a slice offset. For the sake of code/// simplicity, we return a `u64` even for leaves. 
In internal/// nodes, the 52 MSBs encode a child page, and the 12 LSBs encode/// an offset in the page.fn first<'a, K, V>(off: &Offsets<'a>) -> u64;/// The tuple and right child at offset `off`.fn kv<'a, K: Storable, V: Storable>(page: crate::Page, off: u64) -> (&'a K, &'a V, u64);}/// The type of offsets.#[derive(Debug, Clone)]pub enum Offsets<'a> {/// Slices of child pages and offset-in-page for internal nodes,Slice(&'a [u64]),/// Ranges for leaves.Range(core::ops::Range<usize>),}impl<'a> Offsets<'a> {/// Splitting an offset slice, with the same semantics as/// `split_at` for slices, except if `mid` is equal to the length,/// in which case we return `(self, &[])`.pub fn split_at(&self, mid: usize) -> (Self, Self) {match self {Offsets::Slice(s) if mid < s.len() => {let (a, b) = s.split_at(mid);(Offsets::Slice(a), Offsets::Slice(b))}Offsets::Slice(s) => {debug_assert_eq!(mid, s.len());(Offsets::Slice(s), Offsets::Slice(&[][..]))}Offsets::Range(r) => (Offsets::Range(r.start..r.start + mid),Offsets::Range(r.start + mid..r.end),),}}}pub struct Leaf {}pub struct Internal {}impl Alloc for Leaf {// Checking if there's enough room is just bookkeeping.fn can_alloc<K: Storable, V: Storable>(hdr: &Header) -> bool {let f = core::mem::size_of::<Tuple<K, V>>();let al = core::mem::align_of::<Tuple<K, V>>();let header_size = (HDR + al - 1) & !(al - 1);header_size + (hdr.n() as usize + 1) * f <= PAGE_SIZE}/// There's no possible compaction on leaves, it's the same as allocating.fn can_compact<K: Storable, V: Storable>(hdr: &Header) -> bool {Self::can_alloc::<K, V>(hdr)}fn set_right_child(_: &mut MutPage, _: isize, _: u64) {}fn offset_slice<'a, T: LoadPage, K: Storable, V: Storable>(page: crate::Page<'a>,) -> Offsets<'a> {let hdr = header(page);Offsets::Range(0..(hdr.n() as usize))}/// This returns an offset on the page, so we need to compute that.fn first<'a, K, V>(off: &Offsets<'a>) -> u64 {match off {Offsets::Slice(_) => unreachable!(),Offsets::Range(r) => {let size = core::mem::size_of::<Tuple<K, V>>();let al = core::mem::align_of::<Tuple<K, V>>();let hdr_size = (HDR + al - 1) & !(al - 1);(hdr_size + r.start * size) as u64}}}fn kv<'a, K: Storable, V: Storable>(page: crate::Page, off: u64) -> (&'a K, &'a V, u64) {unsafe {let tup = &*(page.data.as_ptr().add(off as usize) as *const Tuple<K, V>);(&tup.k, &tup.v, 0)}}/// Here, we just move `total - *n` elements to the right, and/// increase the bookkeeping values (occupied bytes and number of/// elements).fn alloc_insert<K: Storable, V: Storable>(new: &mut MutPage, n: &mut isize, _: u64) -> usize {let f = core::mem::size_of::<Tuple<K, V>>();let al = core::mem::align_of::<Tuple<K, V>>();let hdr_size = (HDR + al - 1) & !(al - 1);let hdr_n = header_mut(new).n();unsafe {core::ptr::copy(new.0.data.add(hdr_size + (*n as usize) * f),new.0.data.add(hdr_size + (*n as usize) * f + f),(hdr_n as usize - (*n as usize)) * f,);}let hdr = header_mut(new);hdr.set_n(hdr.n() + 1);hdr.incr(f);hdr_size + (*n as usize) * f}fn truncate_left<K: Storable, V: Storable>(page: &mut MutPage,n: usize,) -> Option<(*const K, *const V)> {let f = core::mem::size_of::<Tuple<K, V>>();let al = core::mem::align_of::<Tuple<K, V>>();let hdr_size = (HDR + al - 1) & !(al - 1);let hdr = header_mut(page);let hdr_n = hdr.n();hdr.set_n(hdr_n - n as u16);hdr.decr(n * f);// Now, this is a bit trickier. This performs a rotation by 1// without all the rotation machinery from the Rust core// library.//// Indeed, since we're in a leaf, we are effectively erasing// the split entry. 
There is a choice to be made here, between// passing the entry by value or by reference. Because splits// might cascade with the same middle entry, passing it by// value may be significantly worse if the tree is deep, and// is never significantly better (we'll end up copying this// entry twice anyway).unsafe {let mut swap: core::mem::MaybeUninit<Tuple<K, V>> = core::mem::MaybeUninit::uninit();core::ptr::copy(page.0.data.add(hdr_size + (n - 1) * f),swap.as_mut_ptr() as *mut u8,f,);core::ptr::copy(page.0.data.add(hdr_size + n * f),page.0.data.add(hdr_size),(hdr_n as usize - n) * f,);core::ptr::copy(swap.as_ptr() as *mut u8,page.0.data.add(hdr_size + (hdr_n as usize - n) * f),f,);let tup =&*(page.0.data.add(hdr_size + (hdr_n as usize - n) * f) as *const Tuple<K, V>);Some((&tup.k, &tup.v))}}}impl Alloc for Internal {fn can_alloc<K: Storable, V: Storable>(hdr: &Header) -> bool {(HDR as usize) + (hdr.n() as usize + 1) * 8 + core::mem::size_of::<Tuple<K, V>>()< hdr.data() as usize}fn can_compact<K: Storable, V: Storable>(hdr: &Header) -> bool {(HDR as usize)+ ((hdr.left_page() & 0xfff) as usize)+ 8+ core::mem::size_of::<Tuple<K, V>>()<= PAGE_SIZE}/// Set the right-hand child in the offset array, preserving the/// current offset.fn set_right_child(page: &mut MutPage, n: isize, r: u64) {unsafe {debug_assert_eq!(r & 0xfff, 0);let ptr = page.0.data.offset(HDR as isize + n * 8) as *mut u64;let off = u64::from_le(*ptr) & 0xfff;*ptr = (r | off as u64).to_le();}}fn offset_slice<'a, T, K: Storable, V: Storable>(page: crate::Page<'a>) -> Offsets<'a> {let hdr = header(page);unsafe {Offsets::Slice(core::slice::from_raw_parts(page.data.as_ptr().add(HDR) as *const u64,hdr.n() as usize,))}}fn first<'a, K, V>(off: &Offsets<'a>) -> u64 {match off {Offsets::Slice(s) => s[0],Offsets::Range(_) => unreachable!(),}}fn kv<'a, K: Storable, V: Storable>(page: crate::Page, off: u64) -> (&'a K, &'a V, u64) {unsafe {let tup = &*(page.data.as_ptr().add((off & 0xfff) as usize) as *const Tuple<K, V>);(&tup.k, &tup.v, off & !0xfff)}}// Much simpler than in leaves, because we just need to move the// offsets. There's a little bit of bookkeeping involved though.fn truncate_left<K: Storable, V: Storable>(page: &mut MutPage,n: usize,) -> Option<(*const K, *const V)> {let hdr_n = header_mut(page).n();unsafe {// We want to copy the left child of the first entry that// will be kept alive as the left child of the page.core::ptr::copy(page.0.data.add(HDR + (n - 1) * 8),page.0.data.add(HDR - 8),// We're removing n entries, so we need to copy the// remaining `hdr_n - n`, plus the leftmost child// (hence the + 1).(hdr_n as usize - n + 1) * 8,);}// Remaining occupied size on the page (minus the header).let size = (8 + core::mem::size_of::<Tuple<K, V>>()) * (hdr_n as usize - n);let hdr = header_mut(page);hdr.set_n(hdr_n - n as u16);// Because the leftmost child has been copied from an entry// containing an offset, its 12 LSBs are still encoding an// offset on the page, whereas this field should encode the occupied// bytes on the page. 
Reset it.hdr.set_occupied(size as u64);None}fn alloc_insert<K: Storable, V: Storable>(new: &mut MutPage, n: &mut isize, r: u64) -> usize {let hdr = header_mut(new);// Move the `data` field one entry to the left.let data = hdr.data() as u16;let off_new = data - core::mem::size_of::<Tuple<K, V>>() as u16;hdr.set_data(off_new);// Increment the number of entries, add the newly occupied size.hdr.set_n(hdr.n() + 1);hdr.incr(8 + core::mem::size_of::<Tuple<K, V>>());let hdr_n = hdr.n();unsafe {// Make space in the offset array by moving the last// `hdr_n - *n` elements.core::ptr::copy(new.0.data.add(HDR + (*n as usize) * 8),new.0.data.add(HDR + (*n as usize) * 8 + 8),(hdr_n as usize - (*n as usize)) * 8,);// Add the offset.let ptr = new.0.data.offset(HDR as isize + *n * 8) as *mut u64;*ptr = (r | off_new as u64).to_le();}off_new as usize}}impl<K: Storable, V: Storable> crate::btree::page_unsized::AllocWrite<K, V> for Internal {fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {alloc_write::<K, V, Self>(new, k0, v0, l, r, n)}fn set_left_child(new: &mut MutPage, n: isize, l: u64) {if l > 0 {Internal::set_right_child(new, n - 1, l);}}}impl<K: Storable, V: Storable> crate::btree::page_unsized::AllocWrite<K, V> for Leaf {fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {alloc_write::<K, V, Self>(new, k0, v0, l, r, n)}fn set_left_child(_new: &mut MutPage, _n: isize, l: u64) {debug_assert_eq!(l, 0);}}/// Simply allocate an entry in the page, copy it, and set its right/// and left children.fn alloc_write<K: Storable, V: Storable, L: Alloc>(new: &mut MutPage,k0: &K,v0: &V,l: u64,r: u64,n: &mut isize,) {let off_new = L::alloc_insert::<K, V>(new, n, r);unsafe {let new_ptr = &mut *(new.0.data.add(off_new as usize) as *mut Tuple<K, V>);core::ptr::copy_nonoverlapping(k0, &mut new_ptr.k, 1);core::ptr::copy_nonoverlapping(v0, &mut new_ptr.v, 1);}if l > 0 {L::set_right_child(new, *n - 1, l);}*n += 1;}
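// A standalone sketch of `truncate_left` on internal nodes, with a
// Vec of packed offsets instead of a raw page: after dropping the
// first `n` entries, the right child of entry `n - 1` becomes the
// page's new leftmost child (its 12 LSBs are masked off, since they
// encoded an in-page position rather than an occupied size).
#[test]
fn truncate_left_sketch() {
    fn truncate_left(offsets: &mut Vec<u64>, n: usize) -> u64 {
        let new_leftmost = offsets[n - 1] & !0xfff;
        offsets.drain(..n);
        new_leftmost
    }
    let mut offs = vec![(1 << 12) | 0x20, (2 << 12) | 0x40, (3 << 12) | 0x60];
    let leftmost = truncate_left(&mut offs, 2);
    assert_eq!(leftmost, 2 << 12);
    assert_eq!(offs, vec![(3 << 12) | 0x60]);
}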
//! An implementation of B trees. The core operations on B trees//! (lookup, iterate, put and del) are generic in the actual//! implementation of nodes, via the [`BTreePage`] and//! [`BTreeMutPage`] traits. This allows for simpler code in the//! high-level functions, as well as specialised, high-performance//! implementations for the nodes.//!//! Two different implementations are supplied: one in [`page`] for//! types with a size known at compile-time, which yields denser//! leaves, and hence shallower trees (if the values are really using//! the space). The other one, in [`page_unsized`], is for//! dynamically-sized types, or types whose representation is dynamic, for//! example enums that are `Sized` in Rust, but whose cases vary//! widely in size.use crate::*;#[doc(hidden)]pub mod cursor;pub use cursor::{iter, rev_iter, Cursor, Iter, RevIter};pub mod del;pub use del::{del, del_no_decr};pub mod put;pub use put::put;pub mod page;pub mod page_unsized;#[async_trait(?Send)]pub trait BTreePage<K: ?Sized, V: ?Sized>: Sized {type Cursor: Send + Sync + Clone + Copy + core::fmt::Debug;/// Whether this cursor is at the end of the page.fn is_empty(c: &Self::Cursor) -> bool;/// Whether this cursor is strictly before the first element.fn is_init(c: &Self::Cursor) -> bool;/// Returns a cursor set before the first element of the page/// (i.e. set to -1).fn cursor_before(p: &CowPage) -> Self::Cursor;/// Returns a cursor set to the first element of the page/// (i.e. 0). If the page is empty, the returned cursor might be/// empty.fn cursor_first(p: &CowPage) -> Self::Cursor {let mut c = Self::cursor_before(p);Self::move_next(&mut c);c}/// Returns a cursor set after the last element of the page/// (i.e. to element n).fn cursor_after(p: &CowPage) -> Self::Cursor;/// Returns a cursor set to the last element of the page. If the/// page is empty, this is the same as `cursor_before`.fn cursor_last(p: &CowPage) -> Self::Cursor {let mut c = Self::cursor_after(p);Self::move_prev(&mut c);c}/// Return the element currently pointed to by the cursor (if the/// cursor is not before or after the elements of the page), and/// move the cursor to the next element.fn next<'b>(p: Page<'b>,c: &mut Self::Cursor,) -> Option<(&'b K, &'b V, u64)> {if let Some((k, v, r)) = Self::current(p, c) {Self::move_next(c);Some((k, v, r))} else {None}}/// Move the cursor to the previous element, and return that/// element. Note that this is not symmetric to `next`.fn prev<'b>(p: Page<'b>,c: &mut Self::Cursor,) -> Option<(&'b K, &'b V, u64)> {if Self::move_prev(c) {Self::current(p, c)} else {None}}/// Move the cursor to the next position. Returns whether the/// cursor was actually moved (i.e. `true` if and only if the/// cursor isn't already after the last element).fn move_next(c: &mut Self::Cursor) -> bool;/// Move the cursor to the previous position. Returns whether the/// cursor was actually moved (i.e. `true` if and only if the/// cursor isn't strictly before the page).fn move_prev(c: &mut Self::Cursor) -> bool;/// Returns the current element, if the cursor is pointing at one.fn current<'a>(p: Page<'a>,c: &Self::Cursor,) -> Option<(&'a K, &'a V, u64)>;/// Returns the left child of the entry pointed to by the cursor.fn left_child(p: Page, c: &Self::Cursor) -> u64;/// Returns the right child of the entry pointed to by the cursor.fn right_child(p: Page, c: &Self::Cursor) -> u64;/// Sets the cursor to the last element less than or equal to `k0`/// if `v0.is_none()`, and to `(k0, v0)` if `v0.is_some()`.
If a/// match is found (on `k0` only if `v0.is_none()`, on `(k0, v0)`/// else), return the match along with the right child.////// Else (in the `Err` case of the `Result`), return the position/// at which `(k0, v0)` can be inserted.async fn set_cursor<'a, 'b, T: LoadPage>(txn: &'a T,page: Page<'b>,c: &mut Self::Cursor,k0: &K,v0: Option<&V>,) -> Result<(&'a K, &'a V, u64), usize>;/// Splits the cursor into two cursors: the elements strictly/// before the current position, and the elements greater than or/// equal to the current element.fn split_at(c: &Self::Cursor) -> (Self::Cursor, Self::Cursor);}pub struct PageIterator<'a, T: LoadPage, K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {cursor: P::Cursor,_txn: &'a T,page: Page<'a>,}impl<'a, T: LoadPage, K: ?Sized + 'a, V: ?Sized + 'a, P: BTreePage<K, V>> Iteratorfor PageIterator<'a, T, K, V, P>{type Item = (&'a K, &'a V, u64);fn next(&mut self) -> Option<Self::Item> {P::next(self.page, &mut self.cursor)}}#[async_trait(?Send)]pub trait BTreeMutPage<K: ?Sized, V: ?Sized>: BTreePage<K, V> + core::fmt::Debug {/// Initialise a page.fn init(page: &mut MutPage);/// Add an entry to the page, possibly splitting the page in the/// process.////// Makes the assumption that `k1v1.is_some()` implies/// `replace`. When `k1v1.is_some()`, we insert both `(k0, v0)`/// (as a replacement), followed by `(k1, v1)`. This is only ever/// used when deleting, and when the right child of a deleted/// entry (in an internal node) has split while we were looking/// for a replacement for the deleted entry.////// Since we only look for replacements in the right child, the/// left child of the insertion isn't modified, in which case `l`/// and `r` are interpreted as the left and right child of the new/// entry, `k1v1`.async fn put<'a, T: AllocPage>(txn: &mut T,page: CowPage,mutable: bool,replace: bool,c: &Self::Cursor,k0: &'a K,v0: &'a V,k1v1: Option<(&'a K, &'a V)>,l: u64,r: u64,) -> Result<crate::btree::put::Put<'a, K, V>, T::Error>;/// Add an entry to `page`, at position `c`. Does not check/// whether there is enough space to do so. This method is mostly/// useful for cloning pages.#[allow(unused_variables)]unsafe fn put_mut(page: &mut MutPage,c: &mut Self::Cursor,k0: &K,v0: &V,r: u64,);#[allow(unused_variables)]unsafe fn set_left_child(page: &mut MutPage,c: &Self::Cursor,l: u64);/// Update the left child of the position pointed to by the/// cursor.async fn update_left_child<T: AllocPage>(txn: &mut T,page: CowPage,mutable: bool,c: &Self::Cursor,r: u64,) -> Result<crate::btree::put::Ok, T::Error>;type Saved;/// Save a leaf entry as a replacement, when we delete at an/// internal node. This can be a pointer to the leaf if the leaf/// isn't mutated, or the actual value, or something else.fn save_deleted_leaf_entry(k: &K, v: &V) -> Self::Saved;/// Recover the saved value.unsafe fn from_saved<'a>(s: &Self::Saved) -> (&'a K, &'a V);/// Delete an entry on the page, returning the resulting page along/// with the offset of the freed page (or 0 if no page was freed,/// i.e.
if `page` is mutable).async fn del<T: AllocPage>(txn: &mut T,page: CowPage,mutable: bool,c: &Self::Cursor,l: u64,) -> Result<(MutPage, u64), T::Error>;/// Merge, rebalance or update a concatenation.async fn merge_or_rebalance<'a, T: AllocPage>(txn: &mut T,m: del::Concat<'a, K, V, Self>,) -> Result<del::Op<'a, T, K, V>, T::Error>;}/// A database, which is essentially just a page offset along with markers.#[derive(Debug)]pub struct Db_<K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {pub db: u64,pub k: core::marker::PhantomData<K>,pub v: core::marker::PhantomData<V>,pub p: core::marker::PhantomData<P>,}unsafe impl<K, V, P: BTreePage<K, V>> Sync for Db_<K, V, P>{}/// A database of sized values.pub type Db<K, V> = Db_<K, V, page::Page<K, V>>;#[async_trait(?Send)]impl<K:Storable, V:Storable, P: BTreePage<K, V> + Send + core::fmt::Debug> Storable for Db_<K, V, P> {type PageReferences = core::iter::Once<u64>;fn page_references(&self) -> Self::PageReferences {core::iter::once(self.db)}async fn drop<T: AllocPage>(&self, t: &mut T) -> Result<(), T::Error> {drop_(t, self).await}}/// A database of unsized values.pub type UDb<K, V> = Db_<K, V, page_unsized::Page<K, V>>;impl<K: ?Sized, V: ?Sized, P: BTreePage<K, V>> Db_<K, V, P> {/// Load a database from a page offset.pub fn from_page(db: u64) -> Self {Db_ {db,k: core::marker::PhantomData,v: core::marker::PhantomData,p: core::marker::PhantomData,}}}/// Create a database with an arbitrary page implementation.pub async fn create_db_<T: AllocPage, K: ?Sized, V: ?Sized, P: BTreeMutPage<K, V>>(txn: &mut T,) -> Result<Db_<K, V, P>, T::Error> {let mut page = txn.alloc_page().await?;P::init(&mut page);Ok(Db_ {db: page.0.offset,k: core::marker::PhantomData,v: core::marker::PhantomData,p: core::marker::PhantomData,})}/// Create a database for sized keys and values.pub async fn create_db<T: AllocPage, K: Storable, V: Storable>(txn: &mut T,) -> Result<Db_<K, V, page::Page<K, V>>, T::Error> {create_db_(txn).await}/// Fork a database.pub async fn fork_db<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(txn: &mut T,db: &Db_<K, V, P>,) -> Result<Db_<K, V, P>, T::Error> {txn.incr_rc(db.db).await?;Ok(Db_ {db: db.db,k: core::marker::PhantomData,v: core::marker::PhantomData,p: core::marker::PhantomData,})}/// Get the first entry of a database greater than or equal to `k` (or/// to `(k, v)` if `v.is_some()`), and return `true` if this entry is/// shared with another structure (i.e. the RC of at least one page/// along a path to the entry is at least 2).pub async fn get_shared<'a, T: LoadPage, K: 'a + Storable + ?Sized, V: 'a + Storable + ?Sized, P: BTreePage<K, V>>(txn: &'a T,db: &Db_<K, V, P>,k: &K,v: Option<&V>,) -> Result<Option<(&'a K, &'a V, bool)>, T::Error> {// Set the "cursor stack" by setting a skip list cursor in// each page from the root to the appropriate leaf.let mut last_match = None;let mut page = txn.load_page(db.db).await?;let mut is_shared = txn.rc(db.db).await? 
>= 2;// This is a for loop, to allow the compiler to unroll (maybe).for _ in 0..cursor::N_CURSORS {let mut cursor = P::cursor_before(&page);if let Ok((kk, vv, _)) = P::set_cursor(txn, page.as_page(), &mut cursor, k, v).await {if v.is_some() {return Ok(Some((kk, vv, is_shared)));}last_match = Some((kk, vv, is_shared));} else if let Some((k, v, _)) = P::current(page.as_page(), &cursor) {// Here, Rust says that `k` and `v` have the same lifetime// as `page`, but we actually know that they're alive for// as long as `txn` doesn't change, so we can safely// extend their lifetimes:unsafe { last_match = Some((core::mem::transmute(k), core::mem::transmute(v), is_shared)) }}// The cursor is set to the first element greater than or// equal to the (k, v) we're looking for, so we need to// explore the left subtree.let next_page = P::left_child(page.as_page(), &cursor);if next_page > 0 {page = txn.load_page(next_page).await?;is_shared |= txn.rc(next_page).await? >= 2;} else {break;}}Ok(last_match)}/// Get the first entry of a database greater than or equal to `k`/// (or to `(k, v)` if `v.is_some()`).pub async fn get<'a, T: LoadPage, K: 'a + Storable + ?Sized, V: 'a + Storable + ?Sized, P: BTreePage<K, V>>(txn: &'a T,db: &Db_<K, V, P>,k: &K,v: Option<&V>,) -> Result<Option<(&'a K, &'a V)>, T::Error> {// Unfortunately, the only difference between this function and// `get_shared` above is that `get_shared` also queries the RC of// the pages it visits, so there isn't much to refactor: this is// mostly a copy of the other function.// Set the "cursor stack" by setting a skip list cursor in// each page from the root to the appropriate leaf.let mut last_match = None;let mut page = txn.load_page(db.db).await?;// This is a for loop, to allow the compiler to unroll (maybe).for _ in 0..cursor::N_CURSORS {let mut cursor = P::cursor_before(&page);if let Ok((kk, vv, _)) = P::set_cursor(txn, page.as_page(), &mut cursor, k, v).await {if v.is_some() {return Ok(Some((kk, vv)));}last_match = Some((kk, vv));} else if let Some((k, v, _)) = P::current(page.as_page(), &cursor) {// Here, Rust says that `k` and `v` have the same lifetime// as `page`, but we actually know that they're alive for// as long as `txn` doesn't change, so we can safely// extend their lifetimes:unsafe { last_match = Some((core::mem::transmute(k), core::mem::transmute(v))) }}// The cursor is set to the first element greater than or// equal to the (k, v) we're looking for, so we need to// explore the left subtree.let next_page = P::left_child(page.as_page(), &cursor);if next_page > 0 {page = txn.load_page(next_page).await?;} else {break;}}Ok(last_match)}/// Drop a database recursively, dropping all referenced keys and/// values that aren't shared with other databases.pub async fn drop<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreePage<K, V>>(txn: &mut T,db: Db_<K, V, P>,) -> Result<(), T::Error> {drop_(txn, &db).await}use core::mem::MaybeUninit;struct PageCursor_<K:?Sized, V:?Sized, P: BTreePage<K, V>>(MaybeUninit<cursor::PageCursor<K, V, P>>);unsafe impl<K: ?Sized, V:?Sized, P: BTreePage<K, V>> Send for PageCursor_<K, V, P> {}async fn drop_<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreePage<K, V>>(txn: &mut T,db: &Db_<K, V, P>,) -> Result<(), T::Error> {let mut stack: [PageCursor_<K, V, P>; cursor::N_CURSORS] =
[PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),PageCursor_(MaybeUninit::uninit()),];let mut ptr = 0;// Push the root page of `db` onto the stack.let page = txn.load_page(db.db).await?;stack[0] = PageCursor_(MaybeUninit::new(cursor::PageCursor {cursor: P::cursor_before(&page),page,}));// Then perform a DFS:loop {// Look at the top element of the stack.let cur_off = unsafe { (&*stack[ptr].0.as_ptr()).page.offset };// If it needs to be dropped (i.e. if its RC is <= 1), iterate// its cursor and drop all its referenced pages.let rc = txn.rc(cur_off).await?;if rc <= 1 {// if there's a current element in the cursor (i.e. we// aren't before or after the elements), decrease its RC.if let Some((k, v, _)) = {let cur = unsafe { &mut *stack[ptr].0.as_mut_ptr() };P::current(cur.page.as_page(), &cur.cursor)} {k.drop(txn).await?;v.drop(txn).await?;}// In all cases, move next and push the left child onto// the stack. This works because pushed cursors are// initially set to before the page's elements.let r = {let cur = unsafe { &mut *stack[ptr].0.as_mut_ptr() };if P::move_next(&mut cur.cursor) {Some(P::left_child(cur.page.as_page(), &cur.cursor))} else {None}};if let Some(r) = r {if r > 0 {ptr += 1;let page = txn.load_page(r).await?;stack[ptr] = PageCursor_(MaybeUninit::new(cursor::PageCursor {cursor: P::cursor_before(&page),page,}))}continue;}}// Here, either rc > 1 (in which case the only thing we need// to do in this iteration is to decrement the RC), or else// `P::move_next` returned `false`, meaning that the cursor is// after the last element (in which case we are done with this// page, and also need to decrement its RC, in order to free// it).let (is_dirty, offset) = {let cur = unsafe { &mut *stack[ptr].0.as_mut_ptr() };(cur.page.is_dirty(), cur.page.offset)};if is_dirty {txn.decr_rc_owned(offset).await?;} else {txn.decr_rc(offset).await?;}// If this was the bottom element of the stack, stop, else, pop.if ptr == 0 {break;} else {ptr -= 1;}}Ok(())}
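// A hedged end-to-end sketch (not part of the original file) showing
// how the functions above fit together. The signature of `put` is
// assumed from the `pub use put::put` re-export above and may differ
// in detail; `create_db`, `get`, `del` and `drop` are defined in
// this module and its submodules.
#[allow(dead_code)]
async fn lifecycle_sketch<T: AllocPage + LoadPage>(txn: &mut T) -> Result<(), T::Error> {
    // Allocate a page and initialise an empty B tree on it.
    let mut db: Db<u64, u64> = create_db(txn).await?;
    // Insert a binding (assumed signature: txn, db, key, value).
    put(txn, &mut db, &1u64, &42u64).await?;
    // With `v = None`, `get` returns the first entry with key >= 1.
    if let Some((k, v)) = get(txn, &db, &1u64, None).await? {
        assert_eq!((*k, *v), (1, 42));
    }
    // Delete the first entry for that key, then free the whole tree.
    del(txn, &mut db, &1u64, None).await?;
    drop(txn, db).await?;
    Ok(())
}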
//! Deletions from a B tree.use super::cursor::*;use super::put::{Ok, Put};use super::*;use crate::LoadPage;/// Represents the result of a merge or rebalance, without touching/// the parent of the merge/rebalance.#[derive(Debug)]pub enum Op<'a, T, K: ?Sized, V: ?Sized> {Merged {// New merged page.page: MutPage,// Pages freed by the merge (0 meaning "no page").freed: [u64; 2],marker: core::marker::PhantomData<T>,},Rebalanced {// New middle key.k: &'a K,// New middle value.v: &'a V,// New left page.l: u64,// New right page.r: u64,// Pages freed by the rebalance (0 meaning "no page").freed: [u64; 2],},Put(crate::btree::put::Put<'a, K, V>),}/// Represents a page with modifications from a merge.#[derive(Debug)]pub struct ModifiedPage<'a, K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {pub page: CowPage,// Whether the page is mutable, i.e. not shared with another tree.pub mutable: bool,// Start copying c0 (keep `page`'s left child).pub c0: P::Cursor,// If > 0, replace the right child of c0's last element with `l`.pub l: u64,// If > 0, replace the right child of c1's first element with `r`.pub r: u64,// Possibly insert a new binding.pub ins: Option<(&'a K, &'a V)>,// If a rebalance causes a split, we might need to insert an entry// after the replacement.pub ins2: Option<(&'a K, &'a V)>,// The first element of c1 is to be deleted, the others must be copied.pub c1: P::Cursor,// Whether to skip `c1`'s first element.pub skip_first: bool,// Whether the page we just modified is `self.l`.pub mod_is_left: bool,}/// Represents the concatenation of a modified page and one of its/// siblings (left or right).#[derive(Debug)]pub struct Concat<'a, K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {/// Modified page.pub modified: ModifiedPage<'a, K, V, P>,/// Middle element.pub mid: (&'a K, &'a V),/// Sibling of the modified page.pub other: CowPage,/// Is the modified field on the left or on the right of the/// concatenation?pub mod_is_left: bool,/// Is the other page owned by this tree? If not, it means `other`/// is shared with another tree, and hence we need to increase the/// reference count of entries coming from `other`.pub other_is_mutable: bool,}/// If `value` is `None`, delete the first entry for `key` from the/// database, if present. Else, delete the entry for `(key, value)`,/// if present.pub async fn del<T: AllocPage + LoadPage,K: Storable + ?Sized + PartialEq + Sync,V: Storable + ?Sized + PartialEq + Sync,P: BTreeMutPage<K, V>,>(txn: &mut T,db: &mut Db_<K, V, P>,key: &K,value: Option<&V>,) -> Result<bool, T::Error> {let mut cursor = Cursor::new(txn, db).await?;// If the key and value weren't found, return.match (cursor.set(txn, key, value).await?, value) {(Some((k, v)), Some(value)) if k == key && v == value => {}(Some((k, _)), None) if k == key => {}_ => return Ok(false),}// Else, the cursor is set on `(key, value)` if `value.is_some()`// or on the first entry with key `key` else.//// We delete that position, which might be in a leaf or in an// internal node.del_at_cursor(txn, db, &mut cursor, true).await}/// If `value` is `None`, delete the first entry for `key` from the/// database, if present.
Else, delete the entry for `(key, value)`,/// if present. Unlike [`del`], this doesn't decrease the reference/// counts of the pages referenced by the deleted entry.pub async fn del_no_decr<T: AllocPage + LoadPage,K: Storable + ?Sized + PartialEq + Sync,V: Storable + ?Sized + PartialEq + Sync,P: BTreeMutPage<K, V>,>(txn: &mut T,db: &mut Db_<K, V, P>,key: &K,value: Option<&V>,) -> Result<bool, T::Error> {let mut cursor = Cursor::new(txn, db).await?;// If the key and value weren't found, return.match (cursor.set(txn, key, value).await?, value) {(Some((k, v)), Some(value)) if k == key && v == value => {}(Some((k, _)), None) if k == key => {}_ => return Ok(false),}// Else, the cursor is set on `(key, value)` if `value.is_some()`// or on the first entry with key `key` else.//// We delete that position, which might be in a leaf or in an// internal node.del_at_cursor(txn, db, &mut cursor, false).await}/// Delete the entry pointed to by `cursor`. The cursor can't be used/// after this.#[doc(hidden)]pub async fn del_at_cursor<T: AllocPage + LoadPage,K: Storable + ?Sized + PartialEq + Sync,V: Storable + ?Sized + Sync,P: BTreeMutPage<K, V>,>(txn: &mut T,db: &mut Db_<K, V, P>,cursor: &mut Cursor<K, V, P>,decr_del_rc: bool,) -> Result<bool, T::Error> {let p0 = cursor.len();// If `p0 < cursor.first_rc_len()`, the page containing the deleted// element is not shared with another tree. Therefore, the RC of// the pages referenced by the deleted element needs to be// decreased.//// Else, that RC doesn't need to be changed, since it's still// referenced by the same pages as before, just not by the pages// that are cloned by this deletion.//// Note in particular that if p0 == cursor.first_rc_len(), then we// don't need to touch the RC, since we're cloning this page, but// without including a reference to the deleted entry.if decr_del_rc && p0 < cursor.first_rc_len() {let (delk, delv, _) = {let cur = cursor.cur();P::current(cur.page.as_page(), &cur.cursor).unwrap()};delk.drop(txn).await?;delv.drop(txn).await?;}// Find the leftmost page in the right subtree of the deleted// element, in order to find a replacement.let cur = cursor.cur();let right_subtree = P::right_child(cur.page.as_page(), &cur.cursor);cursor.set_first_leaf(txn, right_subtree).await?;let leaf_is_shared = cursor.len() >= cursor.first_rc_len();// In the leaf, mark the replacement for deletion, and keep a// reference to it in k0v0 if the deletion happens in an internal// node (k0v0 is `Option::None` else).let (mut last_op, k0v0) = leaf_delete(txn, cursor, p0).await?;// If the leaf is shared with another tree, then since we're// cloning the leaf, update the reference counts of all the// references contained in the leaf.if leaf_is_shared {modify_rc(txn, &last_op).await?;}let mut free = [[0, 0]; N_CURSORS];// Then, climb up the stack, performing the operations lazily. At// each step, we are one level above the page that we plan to// modify, since `last_op` is the result of popping the// stack.//// We iterate up to the root.
The last iteration builds a modified// page for the root, but doesn't actually execute it.while cursor.len() > 0 {// Prepare a plan for merging the current modified page (which// is stored in `last_op`) with one of its neighbours.let concat = concat(txn, cursor, p0, &k0v0, last_op).await?;let mil = concat.mod_is_left;let incr_page = if !concat.other_is_mutable {Some(CowPage {offset: concat.other.offset,data: concat.other.data,})} else {None};let incr_mid = if cursor.len() >= cursor.first_rc_len() {Some(concat.mid)} else {None};// Execute the modification plan, resulting in one of four// different outcomes (described in the big match in// `handle_merge`). This mutates or clones the current// modified page, returning an `Op` describing what happened// (between update, merge, rebalance and split).let merge = P::merge_or_rebalance(txn, concat).await?;// Prepare a description (`last_op`) of the next page// modification, without mutating or cloning that page.last_op = handle_merge(txn, cursor, p0, &k0v0, incr_page, incr_mid, mil, merge, &mut free).await?;// If the modified page is shared with another tree, we will// clone it in the next round. Therefore, update the reference// counts of all the references in the modified page.//// Since `handle_merge` pops the stack, the modified page is// at level `cursor.len() + 1`.if cursor.len() + 1 >= cursor.first_rc_len() {modify_rc(txn, &last_op).await?;}}// The last modification plan was on the root, and still needs to// be executed.let root_is_shared = cursor.first_rc_len() == 1;update_root(txn, db, last_op, k0v0, root_is_shared, &mut free).await?;// Finally, free all the pages marked as free during the deletion,// now that we don't need to read them anymore.for p in free.iter() {if p[0] & 1 == 1 {txn.decr_rc_owned(p[0] ^ 1).await?;} else if p[0] > 0 {txn.decr_rc(p[0]).await?;}if p[1] & 1 == 1 {txn.decr_rc_owned(p[1] ^ 1).await?;} else if p[1] > 0 {txn.decr_rc(p[1]).await?;}}Ok(true)}/// Prepare a modified page for the leaf.async fn leaf_delete<'a,T: AllocPage + LoadPage,K: Storable + ?Sized + 'a,V: Storable + ?Sized + 'a,P: BTreeMutPage<K, V> + 'a,>(txn: &mut T,cursor: &mut Cursor<K, V, P>,p0: usize,) -> Result<(ModifiedPage<'a, K, V, P>, Option<P::Saved>), T::Error> {let is_rc = cursor.len() >= cursor.first_rc_len();let del_at_internal = p0 < cursor.len();let curs0 = cursor.pop().unwrap();let (c0, c1) = P::split_at(&curs0.cursor);let mut deleted = None;if del_at_internal {// In this case, we need to save the replacement, and if this// leaf is shared with another tree, we also need to increment// the replacement's references, since we are copying it.let (k, v, _) = P::current(curs0.page.as_page(), &c1).unwrap();if is_rc {for o in k.page_references().chain(v.page_references()) {txn.incr_rc(o).await?;}}deleted = Some(P::save_deleted_leaf_entry(k, v))}Ok((ModifiedPage {page: curs0.page,mutable: !is_rc,c0,c1,skip_first: true,l: 0,r: 0,ins: None,ins2: None,// The following (`mod_is_left`) is meaningless in this// context, and isn't actually used: indeed, the only// place where this field is used is when modifying the// root, when `ins.is_some()`.mod_is_left: true,},deleted,))}/// From a cursor at level `p = cursor.len()`, form the/// concatenation of `last_op` (which is a modified page at level/// `p + 1`) and its left or right sibling, depending on the case.async fn concat<'a,T: AllocPage + LoadPage,K: Storable + ?Sized,V: Storable + ?Sized,P: BTreeMutPage<K, V>,>(txn: &mut T,cursor: &mut Cursor<K, V, P>,p0: usize,k0v0: &Option<P::Saved>,last_op:
ModifiedPage<'a, K, V, P>,) -> Result<Concat<'a, K, V, P>, T::Error> {let p = cursor.len();let rc_level = cursor.first_rc_len();let curs = cursor.current_mut();if p == p0 {// Here, p == p0, meaning that we're deleting at an internal// node. If that's the case, k0v0 is `Some`, hence we can// safely unwrap the replacement.let (k0, v0) = unsafe { P::from_saved(k0v0.as_ref().unwrap()) };// Since we've picked the leftmost entry of the right child of// the deleted entry, the other page to consider in the// concatenation is the left child of the deleted entry.let other = txn.load_page(P::left_child(curs.page.as_page(), &curs.cursor)).await?;// Then, if the page at level `p` or one of its ancestors is// referenced at least twice (i.e. if `p >= rc_level`), or if// `other` is itself referenced at least twice, `other` is// immutable, meaning that we can't delete on it when// rebalancing.let other_is_mutable = (p < rc_level) && txn.rc(other.offset).await? < 2;Ok(Concat {modified: last_op,mid: (k0, v0),other,mod_is_left: false,other_is_mutable,})} else {// Here, the page at level `p` is not a leaf (but `last_op`'s// page might be), and does not contain the deleted entry.// In this case, the visited page at level `p+1` is always on// the left-hand side of the cursor at level `p` (this is an// invariant of cursors). However, if the cursor at level `p`// is past the end of the page, we need to move one step back// to find a middle element for the concatenation, in which// case the visited page becomes the right-hand side of the// cursor.let ((k, v, r), mod_is_left) =if let Some(x) = P::current(curs.page.as_page(), &curs.cursor) {// Not the last element of the page.(x, true)} else {// Last element of the page.let (k, v, _) = P::prev(curs.page.as_page(), &mut curs.cursor).unwrap();let l = P::left_child(curs.page.as_page(), &curs.cursor);((k, v, l), false)};let other = txn.load_page(r).await?;let other_is_mutable = (p < rc_level) && txn.rc(other.offset).await? < 2;Ok(Concat {modified: last_op,// The middle element comes from the page above this// concatenation, and hence has the same lifetime as that// page. However, our choice of lifetimes can't express// this, but we also know that we are not going to mutate// the parent before rebalancing or merging the// concatenation, and hence this pointer will be alive// during these operations (i.e. during the merge or// rebalance).mid: unsafe { (core::mem::transmute(k), core::mem::transmute(v)) },other,mod_is_left,other_is_mutable,})}}/// Prepare a modified version of the page at the current level `p =/// cursor.len()`.async fn handle_merge<'a,T: AllocPage + LoadPage,K: Storable + ?Sized + PartialEq,V: Storable + ?Sized,P: BTreeMutPage<K, V>,>(txn: &mut T,cursor: &mut Cursor<K, V, P>,p0: usize,k0v0: &Option<P::Saved>,incr_other: Option<CowPage>,incr_mid: Option<(&K, &V)>,mod_is_left: bool, // The modified page in the `merge` is the left one.merge: Op<'a, T, K, V>,free: &mut [[u64; 2]; N_CURSORS],) -> Result<ModifiedPage<'a, K, V, P>, T::Error> {let mutable = cursor.len() < cursor.first_rc_len();let mut last_op = {// Beware, a stack pop happens here, all subsequent references// to the pointer must be updated.let curs = cursor.pop().unwrap();let (c0, c1) = P::split_at(&curs.cursor);ModifiedPage {page: curs.page,mutable,c0,l: 0,r: 0,ins: None,ins2: None,c1,skip_first: true,mod_is_left,}};// For merges and rebalances, take care of the reference counts of// pages and key/values.match merge {Op::Merged {.. } | Op::Rebalanced { .. } => {// Increase the RC of the "other page's" descendants.
In// the case of a rebalance, this has the effect of// increasing the RC of the new middle entry if that entry// comes from a shared page, which is what we want.if let Some(other) = incr_other {let mut curs = P::cursor_first(&other);let left = P::left_child(other.as_page(), &curs);if left > 0 {txn.incr_rc(left).await?;}while let Some((k0, v0, r)) = P::next(other.as_page(), &mut curs) {for o in k0.page_references().chain(v0.page_references()) {txn.incr_rc(o).await?;}if r != 0 {txn.incr_rc(r).await?;}}}// If the middle element comes from a shared page,// increment its references.if let Some((ref k, ref v)) = incr_mid {for o in k.page_references().chain(v.page_references()) {txn.incr_rc(o).await?;}}}_ => {}}// Here, we match on `merge`, the operation that just happened at// level `cursor.len() + 1`, and build the modification plan// for the page at the current level (i.e. at level// `cursor.len()`).let freed = match merge {Op::Merged {page,freed,marker: _,} => {// If we're deleting at an internal node, the// replacement has already been included in the merged// page.last_op.l = page.0.offset;freed}Op::Rebalanced {k,v,l,r,freed,} => {// If we're deleting at an internal node, the// replacement is already in pages `l` or `r`.last_op.l = l;last_op.r = r;last_op.ins = Some((k, v));freed}Op::Put(Put::Ok(Ok { page, freed })) => {// No merge, split or rebalance has happened. If we're// deleting at an internal node (i.e. if cursor.len ==// p0), we need to mark the replacement here.if cursor.len() + 1 == p0 {// We have already incremented the references of the// replacement at the leaf.if let Some(k0v0) = k0v0 {last_op.ins = Some(unsafe { P::from_saved(k0v0) });}last_op.r = page.0.offset;} else {last_op.skip_first = false;if mod_is_left {last_op.l = page.0.offset;} else {last_op.r = page.0.offset;}}[freed, 0]}Op::Put(Put::Split {left,right,split_key,split_value,freed,}) => {// This case only happens if `(K, V)` is not `Sized`, when// either (1) a rebalance replaces a key/value pair with a// larger one or (2) another split has happened in a page// below.let split_key_is_k0 = if cursor.len() == p0 {// In this case, ins2 comes after ins, since the// replacement is in the right child of the deleted// key.if let Some(k0v0) = k0v0 {last_op.ins = Some(unsafe { P::from_saved(k0v0) });}last_op.ins2 = Some((split_key, split_value));false} else {last_op.ins = Some((split_key, split_value));last_op.skip_first = false;// If the page modified in the last step is the one on// the right of the current entry, move right one step// before inserting the split key/value.//// (remember that we popped the stack upon entering// this function).//// We need to do this because the split key/value is// inserted immediately *before* the current cursor// position, which is incorrect if the split key/value// comes from the right-hand side of the current// cursor position.//// This can happen in exactly two situations:// - when the element we are deleting is the one we are// skipping here.// - when we are deleting in the rightmost child of a// page.if cursor.len() > 0 && !mod_is_left {P::move_next(&mut last_op.c1);}// If the split key is the replacement element, we have// already increased its counter when we deleted it from// its original position at the bottom of the tree.//// This can happen if we replaced an element and that// replacement caused a split with the replacement as the// middle element.if let Some(k0v0) = k0v0 {assert!(cursor.len() + 1 < p0);let (k0, _) = unsafe { P::from_saved(k0v0) };core::ptr::eq(k0, split_key)} 
else {false}};// The `l` and `r` fields are relative to `ins2` if// `ins2.is_some()` or to `ins` else.last_op.l = left.0.offset;last_op.r = right.0.offset;if cursor.len() + 2 >= cursor.first_rc_len() && !split_key_is_k0 {for o in split_key.page_references().chain(split_value.page_references()){txn.incr_rc(o).await?;}}[freed, 0]}};// Free the page(s) at level `cursor.len() + 1` if it isn't// shared with another tree, or if it is the first level shared// with another tree.if cursor.len() + 1 < cursor.first_rc_len() {free[cursor.len() + 1] = freed;}Ok(last_op)}// This function modifies the reference counts of references of the// modified page, which is the page *about to be* modified.//// This function is always called when `m` is an internal node.async fn modify_rc<'a,T: AllocPage + LoadPage,K: Storable + ?Sized,V: Storable + ?Sized,P: BTreePage<K, V>,>(txn: &mut T,m: &ModifiedPage<'a, K, V, P>,) -> Result<(), T::Error> {let mut c0 = m.c0.clone();let mut c1 = m.c1.clone();let mut left = P::left_child(m.page.as_page(), &c0);while let Some((k, v, r)) = P::next(m.page.as_page(), &mut c0) {for o in k.page_references().chain(v.page_references()) {txn.incr_rc(o).await?;}if left != 0 {txn.incr_rc(left).await?;}left = r;}// The insertions are taken care of in `handle_merge` above,// so we can directly move to the `c1` part of the// modification.if m.skip_first {P::move_next(&mut c1);}// If we are not updating the left child of c1's first// element, increment that left child.if m.l == 0 {if left != 0 {txn.incr_rc(left).await?;}}// If we are not updating the right child of the first// element of `c1`, increment that right child's RC.if let Some((k, v, r)) = P::next(m.page.as_page(), &mut c1) {for o in k.page_references().chain(v.page_references()) {txn.incr_rc(o).await?;}// If `m.skip_first`, we have already skipped the entry above,// so this `r` has nothing to do with any update.//// Else, if we aren't skipping, but also aren't updating the// right child of the current entry, also increase the RC.if (m.skip_first || m.r == 0) && r != 0 {txn.incr_rc(r).await?;}}// Finally, increment the RCs of all other elements in `c1`.while let Some((k, v, r)) = P::next(m.page.as_page(), &mut c1) {for o in k.page_references().chain(v.page_references()) {txn.incr_rc(o).await?;}if r != 0 {txn.incr_rc(r).await?;}}Ok(())}async fn update_root<'a,T: AllocPage + LoadPage,K: Storable + ?Sized,V: Storable + ?Sized,P: BTreeMutPage<K, V>,>(txn: &mut T,db: &mut Db_<K, V, P>,last_op: ModifiedPage<'a, K, V, P>,k0v0: Option<P::Saved>,is_rc: bool,free: &mut [[u64; 2]; N_CURSORS],) -> Result<(), T::Error> {if let Some(d) = single_child(&last_op) {// If the root had only one element, and the last operation// was a merge, this decreases the depth of the tree.//// We don't do this if the table is empty (i.e. if `d == 0`),// in order to be consistent with put and drop.if d > 0 {if last_op.page.is_dirty() {txn.decr_rc_owned(last_op.page.offset).await?;} else {txn.decr_rc(last_op.page.offset).await?;}db.db = d;} else {// The page becomes empty.let (page, freed) = P::del(txn, last_op.page, last_op.mutable, &last_op.c1, d).await?;free[0][0] = freed;db.db = page.0.offset}return Ok(());}// Else, the last operation `last_op` was relative to the root,// but it hasn't been applied yet. We apply it now:match modify::<_, K, V, P>(txn, last_op).await? {Put::Ok(Ok { page, freed }) => {// Here, the root was simply updated (i.e. 
some elements// might have been deleted/inserted/updated), so we just// need to update the Db.free[0][0] = freed;db.db = page.0.offset}Put::Split {split_key: k,split_value: v,left: MutPage(CowPage { offset: l, .. }),right: MutPage(CowPage { offset: r, .. }),freed,} => {// Else, the root has split, and we need to allocate a new// page to hold the split key/value and the two sides of// the split.free[0][0] = freed;let mut page = txn.alloc_page().await?;P::init(&mut page);let mut c = P::cursor_before(&page.0);P::move_next(&mut c);let page = if let Put::Ok(p) = P::put(txn, page.0, true, false, &c, k, v, None, l, r).await? {p.page} else {unreachable!()};let split_key_is_k0 = if let Some(ref k0v0) = k0v0 {let (k0, _) = unsafe { P::from_saved(k0v0) };core::ptr::eq(k0, k)} else {false};// If the root is shared with another tree, and the// split key is different from the replacement, increment// the RCs of the references contained in the split// key/value.//// The replacement need not be incremented again, since it// was already incremented when we took it from the page in// `leaf_delete`.if is_rc && !split_key_is_k0 {for o in k.page_references().chain(v.page_references()) {txn.incr_rc(o).await?;}}// Finally, update the database.db.db = page.0.offset}}Ok(())}fn single_child<K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(m: &ModifiedPage<K, V, P>,) -> Option<u64> {let mut c1 = m.c1.clone();if m.skip_first {P::move_next(&mut c1);}if P::is_empty(&m.c0) && m.ins.is_none() && P::is_empty(&c1) {Some(m.l)} else {None}}/// Apply the modifications to a page. This is exclusively used for/// the root page, because other modifications are applied during/// merge/rebalance attempts.async fn modify<'a, T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(txn: &mut T,m: ModifiedPage<'a, K, V, P>,) -> Result<super::put::Put<'a, K, V>, T::Error> {if let Some((k, v)) = m.ins {// The following call might actually replace the first element// of `m.c1` if `m.skip_first`.let mut c1 = m.c1.clone();if !m.skip_first && !m.mod_is_left {// This means that the page below just split, since we// have to insert an extra entry on the root page.//// However, the extra entry is to be inserted (by// `P::put`) *before* `c1`'s first element, which is// incorrect since the page that split is the right child// of `c1`'s first element. Therefore, we need to move// `c1` one notch to the right.assert!(m.ins2.is_none());P::move_next(&mut c1);}P::put(txn,m.page,m.mutable,m.skip_first,&c1,k,v,m.ins2,m.l,m.r,).await} else {// If there's no insertion to do, we might still need to// delete elements, or update children.if m.skip_first {let (page, freed) = P::del(txn, m.page, m.mutable, &m.c1, m.l).await?;Ok(Put::Ok(Ok { freed, page }))} else if m.l > 0 {// If the left page of the current entry has changed,// update it.Ok(Put::Ok(P::update_left_child(txn, m.page, m.mutable, &m.c1, m.l,).await?))} else {// If there's no insertion, and `m.l == 0`, we need to// update the right child of the current entry. The// following moves one step to the right and updates the// left child:let mut c1 = m.c1.clone();P::move_next(&mut c1);Ok(Put::Ok(P::update_left_child(txn, m.page, m.mutable, &c1, m.r,).await?))}}}
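// A small self-contained sketch (not in the original file) of the
// `freed` encoding used by `del_at_cursor` above: page offsets are
// 4096-aligned, so bit 0 is free to record whether the freed page
// was dirty (owned by this transaction, hence freed with
// `decr_rc_owned`) rather than clean (freed with `decr_rc`).
#[cfg(test)]
mod freed_encoding_sketch {
    #[test]
    fn dirty_bit_roundtrip() {
        let offset: u64 = 7 * 4096;
        // A dirty page is stored with bit 0 set...
        let freed_dirty = offset | 1;
        assert_eq!(freed_dirty & 1, 1);
        // ...and its offset is recovered by clearing that bit.
        assert_eq!(freed_dirty ^ 1, offset);
        // A clean page is stored as-is, with bit 0 clear.
        assert_eq!(offset & 1, 0);
    }
}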
use super::*;use crate::{CowPage, LoadPage};use core::mem::MaybeUninit;#[derive(Debug)]pub(super) struct PageCursor<K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {pub page: CowPage,pub cursor: P::Cursor,}unsafe impl<K, V, P: BTreePage<K, V>> Send for PageCursor<K, V, P>{}unsafe impl<K, V, P: BTreePage<K, V>> Sync for PageCursor<K, V, P>{}// This is 1 + the maximal depth of a tree.//// Since pages are of size 2^12, there are at most 2^52 addressable// pages (potentially less depending on the platform). Since each page// of a B tree below the root has at least 2 elements (because each// page is at least half-full, and elements are at most 1/4th of a// page), the arity is at least 3, except for the root. Since 3^33 is// the smallest power of 3 larger than 2^52, the maximum depth is 33.pub(crate) const N_CURSORS: usize = 33;#[derive(Debug)]/// A position in a B tree.pub struct Cursor<K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {/// Invariant: the first `len` items are initialised.stack: [core::mem::MaybeUninit<PageCursor<K, V, P>>; N_CURSORS],/// The length of the stack at the position of the first page/// shared with another tree. This definition is a little bit/// convoluted, but allows for efficient comparisons between/// `first_rc_len` and `len` to check whether a page is shared/// with another tree.first_rc_len: usize,/// Number of initialised items on the stack.len: usize,}unsafe impl<K, V, P: BTreePage<K, V>> Send for Cursor<K, V, P>{}unsafe impl<K, V, P: BTreePage<K, V>> Sync for Cursor<K, V, P>{}impl<'a, K: 'a + ?Sized + Storable, V: 'a + ?Sized + Storable, P: BTreePage<K, V>> Cursor<K, V, P> {/// Create a new cursor, initialised to the first entry of the B tree.pub async fn new<T: LoadPage>(txn: &T, db: &Db_<K, V, P>) -> Result<Self, T::Error> {// Looking forward to getting array initialisation stabilised :)let mut stack = [core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),core::mem::MaybeUninit::uninit(),];let page = txn.load_page(db.db).await?;stack[0] = MaybeUninit::new(PageCursor {cursor: P::cursor_before(&page),page,});Ok(Cursor {stack,first_rc_len: N_CURSORS,len: 1,})}pub(super) fn push(&mut self, p: PageCursor<K, V, P>) {self.stack[self.len] = MaybeUninit::new(p);self.len += 1;}pub(super) fn cur(&self) -> &PageCursor<K, V, P> {assert!(self.len > 0);unsafe { &*self.stack[self.len - 1].as_ptr() }}pub(super) fn len(&self) -> usize {self.len}pub(super) fn first_rc_len(&self) -> usize {self.first_rc_len}pub(super) fn pop(&mut self) -> Option<PageCursor<K, V, P>> {if self.len > 0 {self.len -= 1;let 
result = core::mem::replace(&mut self.stack[self.len], MaybeUninit::uninit());Some(unsafe { result.assume_init() })} else {None}}pub(super) fn current_mut(&mut self) -> &mut PageCursor<K, V, P> {assert!(self.len > 0);unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() }}/// Push the leftmost path starting at page `left_page` onto the/// stack.pub(super) async fn set_first_leaf<T: LoadPage>(&mut self,txn: &T,mut left_page: u64,) -> Result<(), T::Error> {while left_page > 0 {if self.first_rc_len >= N_CURSORS && txn.rc(left_page).await? >= 2 {self.first_rc_len = self.len + 1}let page = txn.load_page(left_page).await?;let curs = P::cursor_first(&page);left_page = P::left_child(page.as_page(), &curs);self.push(PageCursor { page, cursor: curs });}Ok(())}/// Reset the cursor to the first element of the database.pub fn reset<T: LoadPage>(&mut self) {self.len = 1;let init = unsafe { &mut *self.stack[0].as_mut_ptr() };init.cursor = P::cursor_before(&init.page);}// An invariant of cursors, fundamental to understanding the// `next` and `prev` functions below, is that the lower levels (in// the tree, upper levels on the stack) are the left children of// the cursor's position on a page./// Set the cursor to the first position larger than or equal to/// the specified key and value.pub async fn set<T: LoadPage>(&mut self,txn: &'a T,k: &K,v: Option<&V>,) -> Result<Option<(&'a K, &'a V)>, T::Error> {// Set the "cursor stack" by setting a cursor in each page// on a path from the root to the appropriate leaf.// Start from the bottom of the stack, which is also the root// of the tree.self.len = 1;self.first_rc_len = N_CURSORS;let init = unsafe { &mut *self.stack[0].as_mut_ptr() };init.cursor = P::cursor_before(&init.page);let mut last_matching_page = N_CURSORS;let mut last_match = None;while self.len < N_CURSORS {let current = unsafe { &mut *self.stack.get_unchecked_mut(self.len - 1).as_mut_ptr() };if self.first_rc_len >= N_CURSORS && txn.rc(current.page.offset).await? >= 2 {self.first_rc_len = self.len}let ref mut cursor = current.cursor;if let Ok((kk, vv, _)) = P::set_cursor(txn, current.page.as_page(), cursor, k, v).await {if v.is_some() {return Ok(Some((kk, vv)));}last_match = Some((kk, vv));last_matching_page = self.len}let next_page = P::left_child(current.page.as_page(), cursor);if next_page > 0 {let page = txn.load_page(next_page).await?;self.push(PageCursor {cursor: P::cursor_before(&page),page,});} else {break;}}if last_matching_page < N_CURSORS {self.len = last_matching_page;Ok(last_match)} else {Ok(None)}}/// Set the cursor after the last element, so that [`Self::next`]/// returns `None`, and [`Self::prev`] returns the last element.pub async fn set_last< T: LoadPage>(&mut self,txn: &'a T,) -> Result<(), T::Error> {self.len = 1;self.first_rc_len = N_CURSORS;let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };current.cursor = P::cursor_after(¤t.page);loop {let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };if self.first_rc_len >= N_CURSORS && txn.rc(current.page.offset).await? 
>= 2 {self.first_rc_len = self.len}let l = P::left_child(current.page.as_page(), ¤t.cursor);if l > 0 {let page = txn.load_page(l).await?;self.push(PageCursor {cursor: P::cursor_after(&page),page,})} else {break;}}Ok(())}/// Return the current position of the cursor.pub fn current<T: LoadPage>(&mut self,_txn: &'a T,) -> Result<Option<(&'a K, &'a V)>, T::Error> {loop {let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };if P::is_init(¤t.cursor) {// The cursor hasn't been set.return Ok(None)} else if let Some((k, v, _)) = P::current(current.page.as_page(), ¤t.cursor){unsafe {return Ok(Some((core::mem::transmute(k), core::mem::transmute(v))));}} else if self.len > 1 {self.len -= 1} else {// We're past the last element at the root.return Ok(None);}}}/// Return the current position of the cursor, and move the cursor/// to the next position.pub async fn next< T: LoadPage>(&mut self,txn: &'a T,) -> Result<Option<(&'a K, &'a V)>, T::Error> {loop {let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };if P::is_empty(¤t.cursor) {if self.len > 1 {if self.first_rc_len == self.len {self.first_rc_len = N_CURSORS}self.len -= 1} else {return Ok(None);}} else {let (cur_entry, r) = if let Some((k, v, r)) = P::current(current.page.as_page(), ¤t.cursor) {(Some((k, v)), r)} else {(None, P::right_child(current.page.as_page(), ¤t.cursor))};P::move_next(&mut current.cursor);if r != 0 {let page = txn.load_page(r).await?;self.push(PageCursor {cursor: P::cursor_before(&page),page,});if self.first_rc_len >= N_CURSORS && txn.rc(r).await? >= 2 {self.first_rc_len = self.len}}if let Some((k, v)) = cur_entry {unsafe {return Ok(Some((core::mem::transmute(k), core::mem::transmute(v))));}}}}}/// Move the cursor to the previous entry, and return the entry/// that was current before the move. If the cursor is initially/// after all the entries, this moves it back by two steps.pub async fn prev< T: LoadPage>(&mut self,txn: &'a T,) -> Result<Option<(&'a K, &'a V)>, T::Error> {loop {let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };if P::is_init(¤t.cursor) {if self.len > 1 {if self.first_rc_len == self.len {self.first_rc_len = N_CURSORS}self.len -= 1;let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };P::move_prev(&mut current.cursor);} else {return Ok(None);}} else {let cur_entry = P::current(current.page.as_page(), ¤t.cursor);let left = P::left_child(current.page.as_page(), ¤t.cursor);if left != 0 {let page = txn.load_page(left).await?;self.push(PageCursor {cursor: P::cursor_after(&page),page,});if self.first_rc_len >= N_CURSORS && txn.rc(left).await? 
>= 2 {self.first_rc_len = self.len}} else {// We are at a leaf.P::move_prev(&mut current.cursor);}if let Some((k, v, _)) = cur_entry {unsafe {return Ok(Some((core::mem::transmute(k), core::mem::transmute(v))));}}}}}}pub struct Iter<'a, T: LoadPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreePage<K, V>> {txn: &'a T,cursor: Cursor<K, V, P>,}pub async fn iter<'a, T, K, V, P>(txn: &'a T,db: &super::Db_<K, V, P>,origin: Option<(&K, Option<&V>)>,) -> Result<Iter<'a, T, K, V, P>, T::Error>whereT: LoadPage,K: Storable + ?Sized,V: Storable + ?Sized,P: BTreePage<K, V>,{let mut cursor = Cursor::new(txn, db).await?;if let Some((k, v)) = origin {cursor.set(txn, k, v).await?;}Ok(Iter { cursor, txn })}impl<'a,T: LoadPage,K: Storable + ?Sized + 'a,V: Storable + ?Sized + 'a,P: BTreePage<K, V> + 'a,> Iter<'a, T, K, V, P>{pub async fn next(&mut self) -> Result<Option<(&'a K, &'a V)>, T::Error>{self.cursor.next(self.txn).await}}pub struct RevIter<'a, T: LoadPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreePage<K, V>>{txn: &'a T,cursor: Cursor<K, V, P>,}pub async fn rev_iter<'a, T, K, V, P>(txn: &'a T,db: &super::Db_<K, V, P>,origin: Option<(&K, Option<&V>)>,) -> Result<RevIter<'a, T, K, V, P>, T::Error>whereT: LoadPage,K: Storable + ?Sized,V: Storable + ?Sized,P: BTreePage<K, V>,{let mut cursor = Cursor::new(txn, db).await?;if let Some((k, v)) = origin {cursor.set(txn, k, v).await?;} else {cursor.set_last(txn).await?;}Ok(RevIter { cursor, txn })}impl<'a,T: LoadPage,K: Storable + ?Sized + 'a,V: Storable + ?Sized + 'a,P: BTreePage<K, V> + 'a,> RevIter<'a, T, K, V, P>{pub async fn next(&mut self) -> Result<Option<(&'a K, &'a V)>, T::Error>{self.cursor.prev(self.txn).await}}
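// A hedged usage sketch (not part of the original file): forward
// iteration over a whole database with the `iter` constructor
// defined above. `Db` is the sized-page database type from the
// parent module.
#[allow(dead_code)]
async fn iter_sketch<'a, T: LoadPage>(
    txn: &'a T,
    db: &super::Db<u64, u64>,
) -> Result<(), T::Error> {
    // `origin: None` starts from the first entry of the database.
    let mut it = iter(txn, db, None).await?;
    while let Some((k, v)) = it.next().await? {
        // Entries are visited in increasing key order.
        let _ = (k, v);
    }
    Ok(())
}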
[package]name = "sanakirja-core-async"version = "0.0.1"edition = "2018"description = "Copy-on-write datastructures, storable on disk (or elsewhere) with a stable format."authors = ["Pierre-Étienne Meunier", "Florent Becker"]license = "MIT/Apache-2.0"documentation = "https://docs.rs/sanakirja-core-async"repository = "https://nest.pijul.com/pijul/sanakirja"include = ["Cargo.toml","src/lib.rs","src","src/btree","src/btree/page_cursor.rs","src/btree/page_unsized","src/btree/page_unsized/alloc.rs","src/btree/page_unsized/rebalance.rs","src/btree/page_unsized/header.rs","src/btree/page_unsized/put.rs","src/btree/page_unsized/cursor.rs","src/btree/put.rs","src/btree/page","src/btree/page/alloc.rs","src/btree/page/rebalance.rs","src/btree/page/put.rs","src/btree/del.rs","src/btree/mod.rs","src/btree/page_unsized.rs","src/btree/cursor.rs","src/btree/page.rs"][features]crc32 = [ "crc32fast" ]std = []ed25519 = [ "ed25519-zebra", "ed25519-zebra/serde" ][dependencies]crc32fast = { version = "1.2", optional = true, default-features = false }uuid = { version = "0.8", optional = true }ed25519-zebra = { version = "2.2", optional = true }async-trait = "0.1"
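# A hedged illustration (not part of the original manifest): enabling
# this crate's optional features from a downstream crate's Cargo.toml.
# The feature names are the ones declared above; the version is the
# one published in this manifest.
#
# [dependencies]
# sanakirja-core-async = { version = "0.0.1", features = ["std", "uuid"] }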