pijul nest
guest [sign in]

Adding sanakirja-core-async

pmeunier
Nov 11, 2022, 12:24 PM
KK3SBH4P3FBZ3ER344TO6WEK7EX5AFB57UX2G5PKANG3NUF5IQLAC

Dependencies

  • [2] RLVQDUPY Fixing a double-free error introduced in 1.2.13
  • [3] OP6SVMOD Resetting history

Change contents

  • file addition: sanakirja-core-async (d--r------)
    [2.2]
  • file addition: src (d--r------)
    [0.32]
  • file addition: lib.rs (----------)
    [0.49]
    //! This crate defines tools to implement datastructures that can live
    //! in main memory or on-disk, meaning that their natural habitat is
    //! memory-mapped files, but if that environment is threatened, they
    //! might seek refuge in lower-level environments.
    //!
    //! One core building block of this library is the notion of virtual
    //! memory pages, which are allocated and freed by an
    //! externally-provided allocator (see how the `sanakirja` crate does
    //! this). The particular implementation used here is meant to allow a
    //! transactional system with readers reading the structures
    //! concurrently with one writer at a time.
    //!
    //! At the moment, only B trees are implemented, as well as the
    //! following general traits:
    //!
    //! - [`LoadPage`] is a trait used to get a pointer to a page. In the
    //! most basic version, this may just return a pointer to the file,
    //! offset by the requested offset. In more sophisticated versions,
    //! this can be used to encrypt and compress pages.
    //! - [`AllocPage`] allocates and frees pages, because as
    //! datastructures need to be persisted on disk, we can't rely on
    //! Rust's memory management to do it for us. Users of this crate
    //! don't have to worry about this though.
    //!
    //! Moreover, two other traits can be used to store things on pages:
    //! [`Storable`] is a simple trait that all `Sized + Ord` types
    //! without references can readily implement (the [`direct_repr!`]
    //! macro does that). For types containing references to pages
    //! allocated in the database, the comparison function can be
    //! customised. Moreover, these types must supply an iterator over
    //! these references, in order for reference-counting to work properly
    //! when the datastructures referencing these types are forked.
    //!
    //! Dynamically-sized types, or types that need to be represented in a
    //! dynamically-sized way, can use the [`UnsizedStorable`] format.
    use async_trait::async_trait;
    pub mod btree;
    /// There's a hard-coded assumption that pages have 4K bytes. This is
    /// true for normal memory pages on almost all platforms.
    pub const PAGE_SIZE: usize = 4096;
    /// Types that can be stored on disk. This trait may be used in
    /// conjunction with `Sized` in order to determine the on-disk size,
    /// or with [`UnsizedStorable`] when special arrangements are needed.
    #[async_trait(?Send)]
    pub trait Storable: Send + Sync + core::fmt::Debug {
    /// This is required for B trees, not necessarily for other
    /// structures. The default implementation panics.
    fn compare(&self, _b: &Self) -> core::cmp::Ordering {
    unimplemented!()
    }
    /// If this value is an offset to another page at offset `offset`,
    /// return `Some(offset)`. Return `None` else.
    fn page_references(&self) -> Self::PageReferences;
    /// If this value is an offset to another page at offset `offset`,
    /// return `Some(offset)`. Return `None` else.
    async fn drop<T: AllocPage>(&self, txn: &mut T) -> Result<(), T::Error> {
    for p in self.page_references() {
    txn.decr_rc(p).await?;
    }
    Ok(())
    }
    /// An iterator over the offsets to pages contained in this
    /// value. Only values from this crate can generate non-empty
    /// iterators, but combined values (like tuples) must chain the
    /// iterators returned by method `page_offsets`.
    type PageReferences: Iterator<Item = u64> + Send;
    }
    /// A macro to implement [`Storable`] on "plain" types,
    /// i.e. fixed-sized types that are `repr(C)` and don't hold
    /// references.
    #[macro_export]
    macro_rules! direct_repr {
    ($t: ty) => {
    impl Storable for $t {
    type PageReferences = core::iter::Empty<u64>;
    fn page_references(&self) -> Self::PageReferences {
    core::iter::empty()
    }
    fn compare(&self, b: &Self) -> core::cmp::Ordering {
    self.cmp(b)
    }
    }
    impl UnsizedStorable for $t {
    const ALIGN: usize = core::mem::align_of::<$t>();
    /// If `Self::SIZE.is_some()`, this must return the same
    /// value. The default implementation is `Self;:SIZE.unwrap()`.
    fn size(&self) -> usize {
    core::mem::size_of::<Self>()
    }
    /// Read the size from an on-page entry.
    unsafe fn onpage_size(_: *const u8) -> usize {
    core::mem::size_of::<Self>()
    }
    /// Write to a page. Must not overwrite the allocated size, but
    /// this isn't checked (which is why it's unsafe).
    unsafe fn write_to_page(&self, p: *mut u8) {
    core::ptr::copy_nonoverlapping(self, p as *mut Self, 1)
    }
    unsafe fn from_raw_ptr<'a>(p: *const u8) -> &'a Self {
    &*(p as *const Self)
    }
    }
    };
    }
    direct_repr!(());
    direct_repr!(u8);
    direct_repr!(i8);
    direct_repr!(u16);
    direct_repr!(i16);
    direct_repr!(u32);
    direct_repr!(i32);
    direct_repr!(u64);
    direct_repr!(i64);
    direct_repr!([u8; 16]);
    #[cfg(feature = "std")]
    extern crate std;
    #[cfg(feature = "std")]
    direct_repr!(std::net::Ipv4Addr);
    #[cfg(feature = "std")]
    direct_repr!(std::net::Ipv6Addr);
    #[cfg(feature = "std")]
    direct_repr!(std::net::IpAddr);
    #[cfg(feature = "std")]
    direct_repr!(std::net::SocketAddr);
    #[cfg(feature = "std")]
    direct_repr!(std::time::SystemTime);
    #[cfg(feature = "std")]
    direct_repr!(std::time::Duration);
    #[cfg(feature = "uuid")]
    direct_repr!(uuid::Uuid);
    #[cfg(feature = "ed25519")]
    direct_repr!(ed25519_zebra::VerificationKeyBytes);
    /// Types that can be stored on disk.
    pub trait UnsizedStorable: Storable {
    const ALIGN: usize;
    /// If `Self::SIZE.is_some()`, this must return the same
    /// value. The default implementation is `Self;:SIZE.unwrap()`.
    fn size(&self) -> usize;
    /// Read the size from an on-page entry. If `Self::SIZE.is_some()`
    /// this must be the same value.
    unsafe fn onpage_size(_: *const u8) -> usize;
    /// Write to a page. Must not overwrite the allocated size, but
    /// this isn't checked (which is why it's unsafe).
    unsafe fn write_to_page(&self, p: *mut u8);
    unsafe fn from_raw_ptr<'a>(p: *const u8) -> &'a Self;
    }
    impl Storable for [u8] {
    type PageReferences = core::iter::Empty<u64>;
    fn page_references(&self) -> Self::PageReferences {
    core::iter::empty()
    }
    fn compare(&self, b: &Self) -> core::cmp::Ordering {
    self.cmp(b)
    }
    }
    impl UnsizedStorable for [u8] {
    const ALIGN: usize = 2;
    fn size(&self) -> usize {
    2 + self.len()
    }
    unsafe fn from_raw_ptr<'a>(p: *const u8) -> &'a Self {
    let len = u16::from_le(*(p as *const u16));
    assert_ne!(len, 0);
    assert_eq!(len & 0xf000, 0);
    core::slice::from_raw_parts(p.add(2), len as usize)
    }
    unsafe fn onpage_size(p: *const u8) -> usize {
    let len = u16::from_le(*(p as *const u16));
    2 + len as usize
    }
    unsafe fn write_to_page(&self, p: *mut u8) {
    assert!(self.len() <= 510);
    *(p as *mut u16) = (self.len() as u16).to_le();
    core::ptr::copy_nonoverlapping(self.as_ptr(), p.add(2), self.len())
    }
    }
    unsafe fn read<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    k: *const u8,
    ) -> (*const u8, *const u8) {
    let s = K::onpage_size(k);
    let v = k.add(s);
    let al = v.align_offset(V::ALIGN);
    let v = v.add(al);
    (k, v)
    }
    unsafe fn entry_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    k: *const u8,
    ) -> usize {
    assert_eq!(k.align_offset(K::ALIGN), 0);
    let ks = K::onpage_size(k);
    // next multiple of va, assuming va is a power of 2.
    let v_off = (ks + V::ALIGN - 1) & !(V::ALIGN - 1);
    let v_ptr = k.add(v_off);
    let vs = V::onpage_size(v_ptr);
    let ka = K::ALIGN.max(V::ALIGN);
    let size = v_off + vs;
    (size + ka - 1) & !(ka - 1)
    }
    /// Representation of a mutable or shared page. This is an owned page
    /// (like `Vec` in Rust's std), but we do not know whether we can
    /// mutate it or not.
    ///
    /// The least-significant bit of the first byte of each page is 1 if
    /// and only if the page was allocated by the current transaction (and
    /// hence isn't visible to any other transaction, meaning we can write
    /// on it).
    #[derive(Debug)]
    #[repr(C)]
    pub struct CowPage {
    pub data: *mut u8,
    pub offset: u64,
    }
    /// Representation of a borrowed, or immutable page, like a slice in
    /// Rust.
    #[derive(Debug, Clone, Copy)]
    #[repr(C)]
    pub struct Page<'a> {
    pub data: &'a [u8; PAGE_SIZE],
    pub offset: u64,
    }
    impl CowPage {
    /// Borrows the page.
    pub fn as_page(&self) -> Page {
    Page {
    data: unsafe { &*(self.data as *const [u8; PAGE_SIZE]) },
    offset: self.offset,
    }
    }
    #[cfg(feature = "crc32")]
    pub fn crc(&self, hasher: &crc32fast::Hasher) -> u32 {
    let mut hasher = hasher.clone();
    hasher.reset();
    // Hash the beginning and the end of the page (i.e. remove
    // the CRC).
    unsafe {
    // Remove the dirty bit.
    let x = [(*self.data) & 0xfe];
    hasher.update(&x[..]);
    hasher.update(core::slice::from_raw_parts(self.data.add(1), 3));
    hasher.update(core::slice::from_raw_parts(
    self.data.add(8),
    PAGE_SIZE - 8,
    ));
    }
    hasher.finalize()
    }
    #[cfg(feature = "crc32")]
    pub fn crc_check(&self, hasher: &crc32fast::Hasher) -> bool {
    let crc = unsafe { u32::from_le(*(self.data as *const u32).add(1)) };
    self.crc(hasher) == crc
    }
    }
    /// An owned page on which we can write. This is just a wrapper around
    /// `CowPage` to avoid checking the "dirty" bit at runtime.
    #[derive(Debug)]
    pub struct MutPage(pub CowPage);
    #[cfg(not(feature = "crc32"))]
    pub fn clear_dirty(data: &mut [u8]) {
    data[0] &= 0xfe
    }
    #[cfg(feature = "crc32")]
    pub fn clear_dirty(data: &mut [u8], hasher: &crc32fast::Hasher) {
    unsafe {
    data[0] &= 0xfe;
    let crc = (self.0.data.as_mut_ptr() as *mut u32).add(1);
    *crc = self.0.crc(hasher)
    }
    }
    impl MutPage {
    #[cfg(not(feature = "crc32"))]
    pub fn clear_dirty(&mut self) {
    unsafe { *self.0.data &= 0xfe }
    }
    #[cfg(feature = "crc32")]
    pub fn clear_dirty(&mut self, hasher: &crc32fast::Hasher) {
    unsafe {
    *self.0.data &= 0xfe;
    let crc = (self.0.data as *mut u32).add(1);
    *crc = self.0.crc(hasher)
    }
    }
    }
    unsafe impl Sync for CowPage {}
    unsafe impl Send for CowPage {}
    impl CowPage {
    /// Checks the dirty bit of a page.
    pub fn is_dirty(&self) -> bool {
    unsafe { (*self.data) & 1 != 0 }
    }
    }
    /// Trait for loading a page.
    #[async_trait(?Send)]
    pub trait LoadPage: Send + Sync {
    type Error;
    /// Loading a page.
    async fn load_page(&self, off: u64) -> Result<CowPage, Self::Error>;
    /// Reference-counting. Since reference-counts are designed to be
    /// storable into B trees by external allocators, pages referenced
    /// once aren't stored, and hence are indistinguishable from pages
    /// that are never referenced. The default implementation returns
    /// 0.
    ///
    /// This has the extra benefit of requiring less disk space, and
    /// isn't more unsafe than storing the reference count, since we
    /// aren't supposed to hold a reference to a page with "logical
    /// RC" 0, so storing "1" for that page would be redundant anyway.
    async fn rc(&self, _off: u64) -> Result<u64, Self::Error> {
    Ok(0)
    }
    }
    /// Trait for allocating and freeing pages.
    #[async_trait(?Send)]
    pub trait AllocPage: LoadPage {
    /// Allocate a new page.
    async fn alloc_page(&mut self) -> Result<MutPage, Self::Error>;
    /// Increment the page's reference count.
    async fn incr_rc(&mut self, off: u64) -> Result<usize, Self::Error>;
    /// Decrement the page's reference count, assuming the page was
    /// first allocated by another transaction. If the RC reaches 0,
    /// free the page. Must return the new RC (0 if freed).
    async fn decr_rc(&mut self, off: u64) -> Result<usize, Self::Error>;
    /// Same as [`Self::decr_rc`], but for pages allocated by the current
    /// transaction. This is an important distinction, as pages
    /// allocated by the current transaction can be reused immediately
    /// after being freed.
    async fn decr_rc_owned(&mut self, off: u64) -> Result<usize, Self::Error>;
    }
  • file addition: btree (d--r------)
    [0.49]
  • file addition: put_async.rs (----------)
    [0.12617]
    //! Insertions into a B tree.
    use super::cursor::*;
    use super::*;
    /// The representation of the update to a page.
    #[derive(Debug)]
    pub struct Ok {
    /// The new page, possibly the same as the previous version.
    pub page: MutPage,
    /// The freed page, or 0 if no page was freed.
    pub freed: u64,
    }
    /// The result of an insertion, i.e. either an update or a split.
    #[derive(Debug)]
    pub enum Put<'a, K: ?Sized, V: ?Sized> {
    Ok(Ok),
    Split {
    split_key: &'a K,
    split_value: &'a V,
    left: MutPage,
    right: MutPage,
    freed: u64,
    },
    }
    /// Insert an entry into a database, returning `false` if and only if
    /// the exact same entry (key *and* value) was already in the
    /// database.
    pub async fn put_async<
    T: AsyncAllocPage,
    K: Storable + ?Sized,
    V: Storable + ?Sized,
    P: BTreeMutPage<K, V>,
    >(
    txn: &mut T,
    db: &mut Db_<K, V, P>,
    key: &K,
    value: &V,
    ) -> Result<bool, T::Error> {
    // Set the cursor to the first element larger than or equal to
    // `(key, value)`. We will insert at that position (where "insert"
    // is meant in the sense of Rust's `Vec::insert`.
    let mut cursor = Cursor::async_new(txn, db).await?;
    if cursor.async_set(txn, key, Some(value)).await?.is_some() {
    // If `(key, value)` already exists in the tree, return.
    return Ok(false);
    }
    // Else, we are at a leaf, since cursors that don't find an
    // element go all the way to the leaves.
    //
    // We put on the leaf page, resulting in a `Put`, i.e. either
    // `Put::Ok` or `Put::Split`.
    let p = cursor.len(); // Save the position of the leaf cursor.
    let is_owned = p < cursor.first_rc_len();
    // Insert the key and value at the leaf, i.e. pop the top level of
    // the stack (the leaf) and insert in there.
    let cur = cursor.pop().unwrap();
    let put = P::put(
    txn,
    cur.page,
    is_owned,
    false,
    &cur.cursor,
    key,
    value,
    None,
    0,
    0,
    )?;
    // In both cases (`Ok` and `Split`), we need to walk up the tree
    // and update each node.
    // Moreover, since the middle elements of the splits may be on
    // pages that must be freed at the end of this call, we collect
    // them in the `free` array below, and free them only at the end
    // of this function.
    //
    // If we didn't hold on to these pages, they could be reallocated
    // in subsequent splits, and the middle element could be
    // lost. This is important when the middle element climbs up more
    // than one level (i.e. the middle element of the split of a leaf
    // is also the middle element of the split of the leaf's parent,
    // etc).
    let mut free = [0; N_CURSORS];
    db.db = put_cascade(txn, &mut cursor, put, &mut free).await?.0.offset;
    for f in &free[..p] {
    if *f & 1 != 0 {
    txn.decr_rc_owned((*f) ^ 1)?;
    } else if *f > 0 {
    txn.decr_rc(*f)?;
    }
    }
    Ok(true)
    }
    async fn put_cascade<
    'a,
    T: AsyncAllocPage,
    K: Storable + ?Sized,
    V: Storable + ?Sized,
    P: BTreeMutPage<K, V>,
    >(
    txn: &mut T,
    cursor: &mut Cursor<K, V, P>,
    mut put: Put<'a, K, V>,
    free: &mut [u64; N_CURSORS],
    ) -> Result<MutPage, T::Error> {
    loop {
    match put {
    Put::Split {
    split_key,
    split_value,
    left,
    right,
    freed,
    } => {
    // Here, we are copying all children of the freed
    // page, except possibly the last freed page (which is
    // a child of the current node, if we aren't at a
    // leaf). This includes the split key/value, also
    // incremented in the following call:
    incr_descendants::<T, K, V, P>(txn, cursor, free, freed)?;
    // Then, insert the split key/value in the relevant page:
    let is_owned = cursor.len() < cursor.first_rc_len();
    if let Some(cur) = cursor.pop() {
    // In this case, there's a page above the page
    // that just split (since we can pop the stack),
    // so the page that just split isn't the root (but
    // `cur` might be).
    put = P::put(
    txn,
    cur.page,
    is_owned,
    false,
    &cur.cursor,
    split_key,
    split_value,
    None,
    left.0.offset,
    right.0.offset,
    )?;
    } else {
    // No page above the split, so the root has just
    // split. Insert the split key/value into a new
    // page above the entire tree.
    let mut p = txn.alloc_page()?;
    let cursor = P::cursor_first(&p.0);
    P::init(&mut p);
    if let Put::Ok(p) = P::put(
    txn,
    p.0,
    true,
    false,
    &cursor,
    split_key,
    split_value,
    None,
    left.0.offset,
    right.0.offset,
    )? {
    // Return the new root.
    return Ok(p.page);
    } else {
    unreachable!()
    }
    }
    }
    Put::Ok(Ok { page, freed }) => {
    // Same as above: increment the relevant reference
    // counters.
    incr_descendants::<T, K, V, P>(txn, cursor, free, freed)?;
    // And update the left child of the current cursor,
    // since the main invariant of cursors is that we're
    // always visiting the left child (if we're visiting
    // the last child of a page, the cursor is set to the
    // position strictly after the entries).
    let is_owned = cursor.len() < cursor.first_rc_len();
    if let Some(curs) = cursor.pop() {
    // If we aren't at the root, just update the child.
    put = Put::Ok(P::update_left_child(
    txn,
    curs.page,
    is_owned,
    &curs.cursor,
    page.0.offset,
    )?)
    } else {
    // If we are at the root, `page` is the updated root.
    return Ok(page);
    }
    }
    }
    }
    }
    /// This function does all the memory management for `put`.
    fn incr_descendants<
    T: AllocPage + LoadPage,
    K: Storable + ?Sized,
    V: Storable + ?Sized,
    P: BTreePage<K, V>,
    >(
    txn: &mut T,
    cursor: &mut Cursor<K, V, P>,
    free: &mut [u64; N_CURSORS],
    mut freed: u64,
    ) -> Result<(), T::Error> {
    // The freed page is on the page below.
    if cursor.len() < cursor.first_rc_len() {
    // If a page has split or was immutable (allocated by a
    // previous transaction) and has been updated, we need to free
    // the old page.
    free[cursor.len()] = freed;
    } else {
    // This means that the new version of the page (either split
    // or not) contains references to all the children of the
    // freed page, except the last freed page (because the new
    // version of that page isn't shared).
    let cur = cursor.cur();
    // Start a cursor at the leftmost position and increase the
    // leftmost child page's RC (if we aren't at a leaf, and if
    // that child isn't the last freed page).
    let mut c = P::cursor_first(&cur.page);
    let left = P::left_child(cur.page.as_page(), &c);
    if left != 0 {
    if left != (freed & !1) {
    txn.incr_rc(left)?;
    } else if cursor.len() == cursor.first_rc_len() {
    freed = 0
    }
    }
    // Then, for each entry of the page, increase the RCs of
    // references contained in the keys and values, and increase
    // the RC of the right child.
    while let Some((k, v, r)) = P::next(txn, cur.page.as_page(), &mut c) {
    for o in k.page_references().chain(v.page_references()) {
    txn.incr_rc(o)?;
    }
    if r != 0 {
    if r != (freed & !1) {
    txn.incr_rc(r)?;
    } else if cursor.len() == cursor.first_rc_len() {
    freed = 0
    }
    }
    }
    // Else, the "freed" page is shared with another tree, and
    // hence we just need to decrement its RC.
    if freed > 0 && cursor.len() == cursor.first_rc_len() {
    free[cursor.len()] = freed;
    }
    }
    Ok(())
    }
  • file addition: put.rs (----------)
    [0.12617]
    //! Insertions into a B tree.
    use super::cursor::*;
    use super::*;
    /// The representation of the update to a page.
    #[derive(Debug)]
    pub struct Ok {
    /// The new page, possibly the same as the previous version.
    pub page: MutPage,
    /// The freed page, or 0 if no page was freed.
    pub freed: u64,
    }
    /// The result of an insertion, i.e. either an update or a split.
    #[derive(Debug)]
    pub enum Put<'a, K: ?Sized, V: ?Sized> {
    Ok(Ok),
    Split {
    split_key: &'a K,
    split_value: &'a V,
    left: MutPage,
    right: MutPage,
    freed: u64,
    },
    }
    /// Insert an entry into a database, returning `false` if and only if
    /// the exact same entry (key *and* value) was already in the
    /// database.
    pub async fn put<
    T: AllocPage,
    K: Storable + ?Sized + Sync,
    V: Storable + ?Sized + Sync,
    P: BTreeMutPage<K, V>,
    >(
    txn: &mut T,
    db: &mut Db_<K, V, P>,
    key: &K,
    value: &V,
    ) -> Result<bool, T::Error> {
    // Set the cursor to the first element larger than or equal to
    // `(key, value)`. We will insert at that position (where "insert"
    // is meant in the sense of Rust's `Vec::insert`.
    let mut cursor = Cursor::new(txn, db).await?;
    if cursor.set(txn, key, Some(value)).await?.is_some() {
    // If `(key, value)` already exists in the tree, return.
    return Ok(false);
    }
    // Else, we are at a leaf, since cursors that don't find an
    // element go all the way to the leaves.
    //
    // We put on the leaf page, resulting in a `Put`, i.e. either
    // `Put::Ok` or `Put::Split`.
    let p = cursor.len(); // Save the position of the leaf cursor.
    let is_owned = p < cursor.first_rc_len();
    // Insert the key and value at the leaf, i.e. pop the top level of
    // the stack (the leaf) and insert in there.
    let cur = cursor.pop().unwrap();
    let put = P::put(
    txn,
    cur.page,
    is_owned,
    false,
    &cur.cursor,
    key,
    value,
    None,
    0,
    0,
    ).await?;
    // In both cases (`Ok` and `Split`), we need to walk up the tree
    // and update each node.
    // Moreover, since the middle elements of the splits may be on
    // pages that must be freed at the end of this call, we collect
    // them in the `free` array below, and free them only at the end
    // of this function.
    //
    // If we didn't hold on to these pages, they could be reallocated
    // in subsequent splits, and the middle element could be
    // lost. This is important when the middle element climbs up more
    // than one level (i.e. the middle element of the split of a leaf
    // is also the middle element of the split of the leaf's parent,
    // etc).
    let mut free = [0; N_CURSORS];
    db.db = put_cascade(txn, &mut cursor, put, &mut free).await?.0.offset;
    for f in &free[..p] {
    if *f & 1 != 0 {
    txn.decr_rc_owned((*f) ^ 1).await?;
    } else if *f > 0 {
    txn.decr_rc(*f).await?;
    }
    }
    Ok(true)
    }
    async fn put_cascade<
    'a,
    T: AllocPage,
    K: Storable + ?Sized,
    V: Storable + ?Sized,
    P: BTreeMutPage<K, V>,
    >(
    txn: &mut T,
    cursor: &mut Cursor<K, V, P>,
    mut put: Put<'a, K, V>,
    free: &mut [u64; N_CURSORS],
    ) -> Result<MutPage, T::Error> {
    loop {
    match put {
    Put::Split {
    split_key,
    split_value,
    left,
    right,
    freed,
    } => {
    // Here, we are copying all children of the freed
    // page, except possibly the last freed page (which is
    // a child of the current node, if we aren't at a
    // leaf). This includes the split key/value, also
    // incremented in the following call:
    incr_descendants::<T, K, V, P>(txn, cursor, free, freed).await?;
    // Then, insert the split key/value in the relevant page:
    let is_owned = cursor.len() < cursor.first_rc_len();
    if let Some(cur) = cursor.pop() {
    // In this case, there's a page above the page
    // that just split (since we can pop the stack),
    // so the page that just split isn't the root (but
    // `cur` might be).
    put = P::put(
    txn,
    cur.page,
    is_owned,
    false,
    &cur.cursor,
    split_key,
    split_value,
    None,
    left.0.offset,
    right.0.offset,
    ).await?;
    } else {
    // No page above the split, so the root has just
    // split. Insert the split key/value into a new
    // page above the entire tree.
    let mut p = txn.alloc_page().await?;
    let cursor = P::cursor_first(&p.0);
    P::init(&mut p);
    if let Put::Ok(p) = P::put(
    txn,
    p.0,
    true,
    false,
    &cursor,
    split_key,
    split_value,
    None,
    left.0.offset,
    right.0.offset,
    ).await? {
    // Return the new root.
    return Ok(p.page);
    } else {
    unreachable!()
    }
    }
    }
    Put::Ok(Ok { page, freed }) => {
    // Same as above: increment the relevant reference
    // counters.
    incr_descendants::<T, K, V, P>(txn, cursor, free, freed).await?;
    // And update the left child of the current cursor,
    // since the main invariant of cursors is that we're
    // always visiting the left child (if we're visiting
    // the last child of a page, the cursor is set to the
    // position strictly after the entries).
    let is_owned = cursor.len() < cursor.first_rc_len();
    if let Some(curs) = cursor.pop() {
    // If we aren't at the root, just update the child.
    put = Put::Ok(P::update_left_child(
    txn,
    curs.page,
    is_owned,
    &curs.cursor,
    page.0.offset,
    ).await?)
    } else {
    // If we are at the root, `page` is the updated root.
    return Ok(page);
    }
    }
    }
    }
    }
    /// This function does all the memory management for `put`.
    async fn incr_descendants<
    T: AllocPage + LoadPage,
    K: Storable + ?Sized,
    V: Storable + ?Sized,
    P: BTreePage<K, V>,
    >(
    txn: &mut T,
    cursor: &mut Cursor<K, V, P>,
    free: &mut [u64; N_CURSORS],
    mut freed: u64,
    ) -> Result<(), T::Error> {
    // The freed page is on the page below.
    if cursor.len() < cursor.first_rc_len() {
    // If a page has split or was immutable (allocated by a
    // previous transaction) and has been updated, we need to free
    // the old page.
    free[cursor.len()] = freed;
    } else {
    // This means that the new version of the page (either split
    // or not) contains references to all the children of the
    // freed page, except the last freed page (because the new
    // version of that page isn't shared).
    let cur = cursor.cur();
    // Start a cursor at the leftmost position and increase the
    // leftmost child page's RC (if we aren't at a leaf, and if
    // that child isn't the last freed page).
    let mut c = P::cursor_first(&cur.page);
    let left = P::left_child(cur.page.as_page(), &c);
    if left != 0 {
    if left != (freed & !1) {
    txn.incr_rc(left).await?;
    } else if cursor.len() == cursor.first_rc_len() {
    freed = 0
    }
    }
    // Then, for each entry of the page, increase the RCs of
    // references contained in the keys and values, and increase
    // the RC of the right child.
    while let Some((k, v, r)) = P::next(cur.page.as_page(), &mut c) {
    for o in k.page_references().chain(v.page_references()) {
    txn.incr_rc(o).await?;
    }
    if r != 0 {
    if r != (freed & !1) {
    txn.incr_rc(r).await?;
    } else if cursor.len() == cursor.first_rc_len() {
    freed = 0
    }
    }
    }
    // Else, the "freed" page is shared with another tree, and
    // hence we just need to decrement its RC.
    if freed > 0 && cursor.len() == cursor.first_rc_len() {
    free[cursor.len()] = freed;
    }
    }
    Ok(())
    }
  • file addition: page_unsized.rs (----------)
    [0.12617]
    //! Implementation of B tree pages for Unsized types, or types with an
    //! dynamically-sized representation (for example enums with widely
    //! different sizes).
    //!
    //! This module follows the same organisation as the sized
    //! implementation, and contains types shared between the two
    //! implementations.
    //!
    //! The types that can be used with this implementation must implement
    //! the [`UnsizedStorable`] trait, which essentially replaces the
    //! [`core::mem`] functions for determining the size and alignment of
    //! values.
    //!
    //! One key difference is the implementation of leaves (internal nodes
    //! have the same format): indeed, in this implementation, leaves have
    //! almost the same format as internal nodes, except that their
    //! offsets are written on the page as little-endian-encoded [`u16`]
    //! (with only the 12 LSBs used, i.e. 4 bits unused).
    use super::*;
    use crate::btree::del::*;
    use crate::btree::put::*;
    use core::cmp::Ordering;
    // The header is the same as for the sized implementation, so we share
    // it here.
    pub(super) mod header;
    // Like in the sized implementation, we have the same three submodules.
    mod alloc;
    // This is a common module with the sized implementation.
    pub(super) mod cursor;
    mod put;
    pub(super) mod rebalance;
    use alloc::*;
    use cursor::*;
    use header::*;
    use rebalance::*;
    #[derive(Debug)]
    pub struct Page<K: ?Sized, V: ?Sized> {
    k: core::marker::PhantomData<K>,
    v: core::marker::PhantomData<V>,
    }
    // Many of these functions are the same as in the Sized implementations.
    #[async_trait(?Send)]
    impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> super::BTreePage<K, V>
    for Page<K, V>
    {
    fn is_empty(c: &Self::Cursor) -> bool {
    c.cur >= c.total as isize
    }
    fn is_init(c: &Self::Cursor) -> bool {
    c.cur < 0
    }
    type Cursor = PageCursor;
    fn cursor_before(p: &crate::CowPage) -> Self::Cursor {
    PageCursor::new(p, -1)
    }
    fn cursor_after(p: &crate::CowPage) -> Self::Cursor {
    PageCursor::after(p)
    }
    // Split a cursor, returning two cursors `(a, b)` where b is the
    // same as `c`, and `a` is a cursor running from the first element
    // of the page to `c` (excluding `c`).
    fn split_at(c: &Self::Cursor) -> (Self::Cursor, Self::Cursor) {
    (
    PageCursor {
    cur: 0,
    total: c.cur.max(0) as usize,
    is_leaf: c.is_leaf,
    },
    *c,
    )
    }
    fn move_next(c: &mut Self::Cursor) -> bool {
    if c.cur < c.total as isize {
    c.cur += 1;
    true
    } else {
    false
    }
    }
    fn move_prev(c: &mut Self::Cursor) -> bool {
    if c.cur > 0 {
    c.cur -= 1;
    true
    } else {
    c.cur = -1;
    false
    }
    }
    // This function is the same as the sized implementation for
    // internal nodes, since the only difference between leaves and
    // internal nodes in this implementation is the size of offsets (2
    // bytes for leaves, 8 bytes for internal nodes).
    fn current<'a>(
    page: crate::Page<'a>,
    c: &Self::Cursor,
    ) -> Option<(&'a K, &'a V, u64)> {
    if c.cur < 0 || c.cur >= c.total as isize {
    None
    } else if c.is_leaf {
    unsafe {
    let off =
    u16::from_le(*(page.data.as_ptr().add(HDR + c.cur as usize * 2) as *const u16));
    let (k, v) = read::<K, V>(page.data.as_ptr().add(off as usize));
    Some((
    K::from_raw_ptr(k as *const u8),
    V::from_raw_ptr(v as *const u8),
    0,
    ))
    }
    } else {
    unsafe {
    let off =
    u64::from_le(*(page.data.as_ptr().add(HDR + c.cur as usize * 8) as *const u64));
    let (k, v) = read::<K, V>(page.data.as_ptr().add((off & 0xfff) as usize));
    Some((
    K::from_raw_ptr(k as *const u8),
    V::from_raw_ptr(v as *const u8),
    off & !0xfff,
    ))
    }
    }
    }
    // These methods are the same as in the sized implementation.
    fn left_child(page: crate::Page, c: &Self::Cursor) -> u64 {
    assert!(c.cur >= 0);
    if c.is_leaf {
    0
    } else {
    assert!(c.cur >= 0 && HDR as isize + c.cur * 8 - 8 <= 4088);
    let off =
    unsafe { *(page.data.as_ptr().offset(HDR as isize + c.cur * 8 - 8) as *const u64) };
    u64::from_le(off) & !0xfff
    }
    }
    fn right_child(page: crate::Page, c: &Self::Cursor) -> u64 {
    assert!(c.cur < c.total as isize);
    if c.is_leaf {
    0
    } else {
    assert!(c.cur < c.total as isize && HDR as isize + c.cur * 8 - 8 <= 4088);
    let off = unsafe { *(page.data.as_ptr().offset(HDR as isize + c.cur * 8) as *const u64) };
    u64::from_le(off) & !0xfff
    }
    }
    async fn set_cursor<'a, 'b, T: LoadPage>(
    _txn: &'a T,
    page: crate::Page<'b>,
    c: &mut PageCursor,
    k0: &K,
    v0: Option<&V>,
    ) -> Result<(&'a K, &'a V, u64), usize> {
    unsafe {
    // `lookup` has the same semantic as
    // `core::slice::binary_search`, i.e. it returns either
    // `Ok(n)`, where `n` is the position where `(k0, v0)` was
    // found, or `Err(n)` where `n` is the position where
    // `(k0, v0)` can be inserted to preserve the order.
    match lookup(page, c, k0, v0).await {
    Ok(n) => {
    c.cur = n as isize;
    if c.is_leaf {
    let off =
    u16::from_le(*(page.data.as_ptr().add(HDR + n * 2) as *const u16));
    let (k, v) = read::<K, V>(page.data.as_ptr().add(off as usize));
    Ok((K::from_raw_ptr(k), V::from_raw_ptr(v), 0))
    } else {
    let off =
    u64::from_le(*(page.data.as_ptr().add(HDR + n * 8) as *const u64));
    let (k, v) =
    read::<K, V>(page.data.as_ptr().add(off as usize & 0xfff));
    Ok((
    K::from_raw_ptr(k),
    V::from_raw_ptr(v),
    off & !0xfff,
    ))
    }
    }
    Err(n) => {
    c.cur = n as isize;
    Err(n)
    }
    }
    }
    }
    }
    // There quite some duplicated code in the following function, because
    // we're forming a slice of offsets, and the using the core library's
    // `binary_search_by` method on slices.
    async unsafe fn lookup<'a, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    page: crate::Page<'a>,
    c: &mut PageCursor,
    k0: &K,
    v0: Option<&V>,
    ) -> Result<usize, usize> {
    let hdr = header(page);
    c.total = hdr.n() as usize;
    c.is_leaf = hdr.is_leaf();
    if c.is_leaf {
    lookup_leaf(page, k0, v0, hdr)
    } else {
    lookup_internal(page, k0, v0, hdr)
    }
    }
    unsafe fn lookup_internal<'a, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    page: crate::Page<'a>,
    k0: &K,
    v0: Option<&V>,
    hdr: &header::Header,
    ) -> Result<usize, usize> {
    let s = core::slice::from_raw_parts(
    page.data.as_ptr().add(HDR) as *const u64,
    hdr.n() as usize,
    );
    if let Some(v0) = v0 {
    s.binary_search_by(|&off| {
    let off = u64::from_le(off) & 0xfff;
    let (k, v) = read::<K, V>(page.data.as_ptr().offset(off as isize & 0xfff));
    let k = K::from_raw_ptr(k);
    match k.compare(k0) {
    Ordering::Equal => {
    let v = V::from_raw_ptr(v);
    v.compare(v0)
    }
    cmp => cmp,
    }
    })
    } else {
    match s.binary_search_by(|&off| {
    let off = u64::from_le(off) & 0xfff;
    let (k, _) = read::<K, V>(page.data.as_ptr().offset(off as isize & 0xfff));
    let k = K::from_raw_ptr(k);
    k.compare(k0)
    }) {
    Err(i) => Err(i),
    Ok(mut i) => {
    // Rewind if there are multiple matching keys.
    while i > 0 {
    let off = u64::from_le(s[i-1]) & 0xfff;
    let (k, _) = read::<K, V>(page.data.as_ptr().offset(off as isize));
    let k = K::from_raw_ptr(k);
    if let Ordering::Equal = k.compare(k0) {
    i -= 1
    } else {
    break
    }
    }
    Ok(i)
    }
    }
    }
    }
    unsafe fn lookup_leaf<'a, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    page: crate::Page<'a>,
    k0: &K,
    v0: Option<&V>,
    hdr: &header::Header,
    ) -> Result<usize, usize> {
    let s = core::slice::from_raw_parts(
    page.data.as_ptr().add(HDR) as *const u16,
    hdr.n() as usize,
    );
    if let Some(v0) = v0 {
    s.binary_search_by(|&off| {
    let off = u16::from_le(off);
    let (k, v) = read::<K, V>(page.data.as_ptr().offset(off as isize));
    let k = K::from_raw_ptr(k as *const u8);
    match k.compare(k0) {
    Ordering::Equal => {
    let v = V::from_raw_ptr(v as *const u8);
    v.compare(v0)
    }
    cmp => cmp,
    }
    })
    } else {
    match s.binary_search_by(|&off| {
    let off = u16::from_le(off);
    let (k, _) = read::<K, V>(page.data.as_ptr().offset(off as isize));
    let k = K::from_raw_ptr(k);
    k.compare(k0)
    }) {
    Err(e) => Err(e),
    Ok(mut i) => {
    // Rewind if there are multiple matching keys.
    while i > 0 {
    let off = u16::from_le(s[i-1]);
    let (k, _) = read::<K, V>(page.data.as_ptr().offset(off as isize));
    let k = K::from_raw_ptr(k);
    if let Ordering::Equal = k.compare(k0){
    i -= 1
    } else {
    break
    }
    }
    Ok(i)
    }
    }
    }
    }
    #[async_trait(?Send)]
    impl<
    K: UnsizedStorable + ?Sized + core::fmt::Debug,
    V: UnsizedStorable + ?Sized + core::fmt::Debug,
    > super::BTreeMutPage<K, V> for Page<K, V>
    {
    // The init function is straightforward.
    fn init(page: &mut MutPage) {
    let h = header_mut(page);
    h.init();
    }
    // Deletions in internal nodes of a B tree need to replace the
    // deleted value with a value from a leaf.
    //
    // In this implementation of pages, we never actually wipe any
    // data from pages, we're even only appending data, or cloning the
    // pages to compact them. Therefore, raw pointers to leaves are
    // always valid for as long as the page isn't freed, which can
    // only happen at the very last step of an insertion or deletion.
    type Saved = (*const K, *const V);
    fn save_deleted_leaf_entry(k: &K, v: &V) -> Self::Saved {
    (k as *const K, v as *const V)
    }
    unsafe fn from_saved<'a>(s: &Self::Saved) -> (&'a K, &'a V) {
    (&*s.0, &*s.1)
    }
    // As in the sized implementation, `put` is split into its own submodule.
    async fn put<'a, T: AllocPage>(
    txn: &mut T,
    page: CowPage,
    mutable: bool,
    replace: bool,
    c: &PageCursor,
    k0: &'a K,
    v0: &'a V,
    k1v1: Option<(&'a K, &'a V)>,
    l: u64,
    r: u64,
    ) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
    debug_assert!(c.cur >= 0);
    debug_assert!(k1v1.is_none() || replace);
    if r == 0 {
    put::put::<_, _, _, Leaf>(
    txn,
    page,
    mutable,
    replace,
    c.cur as usize,
    k0,
    v0,
    k1v1,
    0,
    0,
    ).await
    } else {
    put::put::<_, _, _, Internal>(
    txn,
    page,
    mutable,
    replace,
    c.cur as usize,
    k0,
    v0,
    k1v1,
    l,
    r,
    ).await
    }
    }
    unsafe fn put_mut(
    page: &mut MutPage,
    c: &mut Self::Cursor,
    k0: &K,
    v0: &V,
    r: u64,
    ) {
    let mut n = c.cur;
    if r == 0 {
    Leaf::alloc_write(page, k0, v0, 0, r, &mut n);
    } else {
    Internal::alloc_write(page, k0, v0, 0, r, &mut n);
    }
    c.total += 1;
    }
    unsafe fn set_left_child(
    page: &mut MutPage,
    c: &Self::Cursor,
    l: u64
    ) {
    let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur - 1);
    *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
    }
    // Update the left child of the entry pointed to by cursor `c`.
    async fn update_left_child<T: AllocPage>(
    txn: &mut T,
    page: CowPage,
    mutable: bool,
    c: &Self::Cursor,
    l: u64,
    ) -> Result<crate::btree::put::Ok, T::Error> {
    assert!(!c.is_leaf);
    let freed;
    let page = if mutable && page.is_dirty() {
    // If the page is dirty (allocated by this transaction)
    // and isn't shared, just make it mutable.
    freed = 0;
    MutPage(page)
    } else {
    // Else, clone the page:
    let mut new = txn.alloc_page().await?;
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
    // Copy the left child
    let l = header(page.as_page()).left_page() & !0xfff;
    let hdr = header_mut(&mut new);
    hdr.set_left_page(l);
    // Copy all the entries
    let s = Internal::offset_slice(page.as_page());
    clone::<K, V, Internal>(page.as_page(), &mut new, s, &mut 0);
    // Mark the old version for freeing.
    freed = page.offset | if page.is_dirty() { 1 } else { 0 };
    new
    };
    // Finally, update the left children of the cursor. We know
    // that all valid positions of a cursor except the leftmost
    // one (-1) have a left child.
    assert!(c.cur >= 0);
    unsafe {
    let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur as isize - 1);
    *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
    }
    Ok(Ok { page, freed })
    }
    // Here is how deletions work: if the page is dirty and mutable,
    // we "unlink" the value by moving the end of the offset array to
    // the left by one offset (2 bytes in leaves, 8 bytes in internal
    // nodes).
    async fn del<T: AllocPage>(
    txn: &mut T,
    page: crate::CowPage,
    mutable: bool,
    c: &PageCursor,
    l: u64,
    ) -> Result<(MutPage, u64), T::Error> {
    // Check that the cursor is at a valid position for a deletion.
    debug_assert!(c.cur >= 0 && c.cur < c.total as isize);
    if mutable && page.is_dirty() {
    let p = page.data;
    let mut page = MutPage(page);
    let hdr = header_mut(&mut page);
    let n = hdr.n();
    if c.is_leaf {
    unsafe {
    let ptr = p.add(HDR + c.cur as usize * 2) as *mut u16;
    // Get the offset in the page of the key/value data.
    let off = u16::from_le(*ptr);
    assert_eq!(off & 0xf000, 0);
    // Erase the offset by shifting the last (`n -
    // c.cur - 1`) offsets. The reasoning against
    // "off-by-one errors" is that if the cursor is
    // positioned on the first element (`c.cur == 0`),
    // there are `n-1` elements to shift.
    core::ptr::copy(ptr.offset(1), ptr, n as usize - c.cur as usize - 1);
    // Remove the size of the actualy key/value, plus
    // 2 bytes (the offset).
    hdr.decr(2 + entry_size::<K, V>(p.add(off as usize)));
    }
    } else {
    unsafe {
    let ptr = p.add(HDR + c.cur as usize * 8) as *mut u64;
    // Offset to the key/value data.
    let off = u64::from_le(*ptr);
    // Shift the offsets like in the leaf case above.
    core::ptr::copy(ptr.offset(1), ptr, n as usize - c.cur as usize - 1);
    if l > 0 {
    // In an internal node, we may have to update
    // the left child of the current
    // position. After the move, the current
    // offset is at `ptr`, so we need to find the
    // offset one step to the left.
    let p = ptr.offset(-1);
    *p = (l | (u64::from_le(*p) & 0xfff)).to_le();
    }
    // Remove the size of the key/value, plus 8 bytes
    // (the offset).
    hdr.decr(8 + entry_size::<K, V>(p.add((off & 0xfff) as usize)));
    }
    };
    hdr.set_n(n - 1);
    // Return the page, and 0 for "nothing was freed".
    Ok((page, 0))
    } else {
    // If the page cannot be mutated, we allocate a new page and clone.
    let mut new = txn.alloc_page().await?;
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
    if c.is_leaf {
    // In a leaf, this is just a matter of getting the
    // offset slice, removing the current position and
    // cloning.
    let s = Leaf::offset_slice(page.as_page());
    let (s0, s1) = s.split_at(c.cur as usize);
    let (_, s1) = s1.split_at(1);
    let mut n = 0;
    clone::<K, V, Leaf>(page.as_page(), &mut new, s0, &mut n);
    clone::<K, V, Leaf>(page.as_page(), &mut new, s1, &mut n);
    } else {
    // In an internal node, things are a bit trickier,
    // since we might need to update the left child.
    //
    // First, clone the leftmost child of the page.
    let hdr = header(page.as_page());
    let left = hdr.left_page() & !0xfff;
    unsafe {
    *(new.0.data.add(HDR) as *mut u64).offset(-1) = left.to_le();
    }
    // Then, clone the first half of the page.
    let s = Internal::offset_slice(page.as_page());
    let (s0, s1) = s.split_at(c.cur as usize);
    let (_, s1) = s1.split_at(1);
    let mut n = 0;
    clone::<K, V, Internal>(page.as_page(), &mut new, s0, &mut n);
    // Update the left child, which was written by the
    // call to `clone` as the right child of the last
    // entry in `s0`.
    if l > 0 {
    unsafe {
    let off = (new.0.data.add(HDR) as *mut u64).offset(n - 1);
    *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
    }
    }
    // Then, clone the second half of the page.
    clone::<K, V, Internal>(page.as_page(), &mut new, s1, &mut n);
    }
    // Return the new page, and the offset of the freed page.
    Ok((new, page.offset))
    }
    }
    // Decide what to do with the concatenation of two neighbouring
    // pages, with a middle element.
    //
    // This is highly similar to the sized case.
    async fn merge_or_rebalance<'a, T: AllocPage>(
    txn: &mut T,
    m: Concat<'a, K, V, Self>,
    ) -> Result<Op<'a, T, K, V>, T::Error> {
    // First evaluate the size of the middle element on a
    // page. Contrarily to the sized case, the offsets are
    // aligned, so the header is always the same size (no
    // padding).
    let mid_size = if m.modified.c0.is_leaf {
    2 + alloc_size::<K, V>(m.mid.0, m.mid.1)
    } else {
    8 + alloc_size::<K, V>(m.mid.0, m.mid.1)
    };
    // Evaluate the size of the modified page of the concatenation
    // (which includes the header).
    let mod_size = size::<K, V>(&m.modified);
    // Add the "occupied" size (which excludes the header).
    let occupied = {
    let hdr = header(m.other.as_page());
    (hdr.left_page() & 0xfff) as usize
    };
    if mod_size + mid_size + occupied <= PAGE_SIZE {
    // If the concatenation fits on a page, merge.
    return if m.modified.c0.is_leaf {
    merge::<_, _, _, _, Leaf>(txn, m).await
    } else {
    merge::<_, _, _, _, Internal>(txn, m).await
    };
    }
    // If we can't merge, evaluate the size of the first binding
    // on the other page, to see if we can rebalance.
    let first_other = PageCursor::new(&m.other, 0);
    let first_other_size = current_size::<K, V>(m.other.as_page(), &first_other);
    // If the modified page is at least half-full, or if removing
    // the first entry on the other page would make that other
    // page less than half-full, don't rebalance. See the Sized
    // implementation to see cases where this happens.
    if mod_size >= PAGE_SIZE / 2 || HDR + occupied - first_other_size < PAGE_SIZE / 2 {
    if let Some((k, v)) = m.modified.ins {
    return Ok(Op::Put(Self::put(
    txn,
    m.modified.page,
    m.modified.mutable,
    m.modified.skip_first,
    &m.modified.c1,
    k,
    v,
    m.modified.ins2,
    m.modified.l,
    m.modified.r,
    ).await?));
    } else if m.modified.skip_first {
    debug_assert!(m.modified.ins2.is_none());
    let (page, freed) = Self::del(
    txn,
    m.modified.page,
    m.modified.mutable,
    &m.modified.c1,
    m.modified.l,
    ).await?;
    return Ok(Op::Put(Put::Ok(Ok { page, freed })));
    } else {
    let mut c1 = m.modified.c1.clone();
    let mut l = m.modified.l;
    if l == 0 {
    Self::move_next(&mut c1);
    l = m.modified.r;
    }
    return Ok(Op::Put(Put::Ok(Self::update_left_child(
    txn,
    m.modified.page,
    m.modified.mutable,
    &c1,
    l,
    ).await?)));
    }
    }
    // Finally, if we're here, we can rebalance. There are four
    // (relatively explicit) cases, see the `rebalance` submodule
    // to see how this is done.
    if m.mod_is_left {
    if m.modified.c0.is_leaf {
    rebalance_left::<_, _, _, Leaf>(txn, m).await
    } else {
    rebalance_left::<_, _, _, Internal>(txn, m).await
    }
    } else {
    rebalance_right::<_, _, _, Self>(txn, m).await
    }
    }
    }
    /// Size of a modified page (including the header).
    fn size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    m: &ModifiedPage<K, V, Page<K, V>>,
    ) -> usize {
    let mut total = {
    let hdr = header(m.page.as_page());
    (hdr.left_page() & 0xfff) as usize
    };
    total += HDR;
    // Extra size for the offsets.
    let extra = if m.c1.is_leaf { 2 } else { 8 };
    if let Some((k, v)) = m.ins {
    total += extra + alloc_size(k, v) as usize;
    if let Some((k, v)) = m.ins2 {
    total += extra + alloc_size(k, v) as usize;
    }
    }
    if m.skip_first {
    total -= current_size::<K, V>(m.page.as_page(), &m.c1) as usize;
    }
    total
    }
    // Size of a pair of type `(K, V)`. This is computed in the same way
    // as a struct with fields of type `K` and `V` in C.
    fn alloc_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(k: &K, v: &V) -> usize {
    let s = ((k.size() + V::ALIGN - 1) & !(V::ALIGN - 1)) + v.size();
    let al = K::ALIGN.max(V::ALIGN);
    (s + al - 1) & !(al - 1)
    }
    // Total size of the entry for that cursor position, including the
    // offset size.
    fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    page: crate::Page,
    c: &PageCursor,
    ) -> usize {
    if c.is_leaf {
    Leaf::current_size::<K, V>(page, c.cur)
    } else {
    Internal::current_size::<K, V>(page, c.cur)
    }
    }
    pub(super) trait AllocWrite<K: ?Sized, V: ?Sized> {
    fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize);
    fn set_left_child(new: &mut MutPage, n: isize, l: u64);
    }
    /// Perform the modifications on a page, by copying it onto page `new`.
    fn modify<K: core::fmt::Debug + ?Sized, V: core::fmt::Debug + ?Sized, P: BTreeMutPage<K, V>, L: AllocWrite<K, V>>(
    new: &mut MutPage,
    m: &mut ModifiedPage<K, V, P>,
    n: &mut isize,
    ) {
    let mut l = P::left_child(m.page.as_page(), &m.c0);
    while let Some((k, v, r)) = P::next(m.page.as_page(), &mut m.c0) {
    L::alloc_write(new, k, v, l, r, n);
    l = 0;
    }
    let mut rr = m.r;
    if let Some((k, v)) = m.ins {
    if let Some((k2, v2)) = m.ins2 {
    L::alloc_write(new, k, v, l, m.l, n);
    L::alloc_write(new, k2, v2, 0, m.r, n);
    } else if m.l > 0 {
    L::alloc_write(new, k, v, m.l, m.r, n);
    } else {
    L::alloc_write(new, k, v, l, m.r, n);
    }
    l = 0;
    rr = 0;
    } else if m.l > 0 {
    l = m.l
    }
    let mut c1 = m.c1.clone();
    if m.skip_first {
    P::move_next(&mut c1);
    }
    // Here's a confusing thing: if the first element of `c1` is the
    // last element of a page, we may be updating its right child (in
    // which case m.r > 0) rather than its left child like for all
    // other elements.
    //
    // This case only ever happens for this function when we're
    // updating the last child of a page p, and then merging p with
    // its right neighbour.
    while let Some((k, v, r)) = P::next(m.page.as_page(), &mut c1) {
    if rr > 0 {
    L::alloc_write(new, k, v, l, rr, n);
    rr = 0;
    } else {
    L::alloc_write(new, k, v, l, r, n);
    }
    l = 0;
    }
    if l != 0 {
    // The while loop above didn't run, i.e. the insertion
    // happened at the end of the page. In this case, we haven't
    // had a chance to update the left page, so do it now.
    L::set_left_child(new, *n, l)
    }
    }
    /// The very unsurprising `merge` function
    pub(super) async fn merge<
    'a,
    T: AllocPage + LoadPage,
    K: ?Sized + core::fmt::Debug,
    V: ?Sized + core::fmt::Debug,
    P: BTreeMutPage<K, V>,
    L: AllocWrite<K, V>,
    >(
    txn: &mut T,
    mut m: Concat<'a, K, V, P>,
    ) -> Result<Op<'a, T, K, V>, T::Error> {
    // Here, we first allocate a page, then clone both pages onto it,
    // in a different order depending on whether the modified page is
    // the left or the right child.
    //
    // Note that in the case that this merge happens immediately after
    // a put that reallocated the two sides of the merge in order to
    // split (not all splits do that), we could be slightly more
    // efficient, but with considerably more code.
    let mut new = txn.alloc_page().await?;
    P::init(&mut new);
    let mut n = 0;
    if m.mod_is_left {
    modify::<_, _, _, L>(&mut new, &mut m.modified, &mut n);
    let mut rc = P::cursor_first(&m.other);
    let l = P::left_child(m.other.as_page(), &rc);
    L::alloc_write(&mut new, m.mid.0, m.mid.1, 0, l, &mut n);
    while let Some((k, v, r)) = P::next(m.other.as_page(), &mut rc) {
    L::alloc_write(&mut new, k, v, 0, r, &mut n);
    }
    } else {
    let mut rc = P::cursor_first(&m.other);
    let mut l = P::left_child(m.other.as_page(), &rc);
    while let Some((k, v, r)) = P::next(m.other.as_page(), &mut rc) {
    L::alloc_write(&mut new, k, v, l, r, &mut n);
    l = 0;
    }
    L::alloc_write(&mut new, m.mid.0, m.mid.1, 0, 0, &mut n);
    modify::<_, _, _, L>(&mut new, &mut m.modified, &mut n);
    }
    let b0 = if m.modified.page.is_dirty() { 1 } else { 0 };
    let b1 = if m.other.is_dirty() { 1 } else { 0 };
    Ok(Op::Merged {
    page: new,
    freed: [m.modified.page.offset | b0, m.other.offset | b1],
    marker: core::marker::PhantomData,
    })
    }
    fn clone<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized, L: Alloc>(
    page: crate::Page,
    new: &mut MutPage,
    s: Offsets<L::Offset>,
    n: &mut isize,
    ) {
    for off in s.0.iter() {
    let (r, off): (u64, usize) = (*off).into();
    unsafe {
    let ptr = page.data.as_ptr().add(off);
    let size = entry_size::<K, V>(ptr);
    // Reserve the space on the page
    let hdr = header_mut(new);
    let data = hdr.data() as u16;
    let off_new = data - size as u16;
    hdr.set_data(off_new);
    hdr.set_n(hdr.n() + 1);
    if hdr.is_leaf() {
    hdr.incr(2 + size);
    let ptr = new.0.data.offset(HDR as isize + *n * 2) as *mut u16;
    *ptr = (off_new as u16).to_le();
    } else {
    hdr.incr(8 + size);
    // Set the offset to this new entry in the offset
    // array, along with the right child page.
    let ptr = new.0.data.offset(HDR as isize + *n * 8) as *mut u64;
    *ptr = (r | off_new as u64).to_le();
    }
    core::ptr::copy_nonoverlapping(ptr, new.0.data.offset(off_new as isize), size);
    }
    *n += 1;
    }
    }
  • file addition: page_unsized (d--r------)
    [0.12617]
  • file addition: rebalance.rs (----------)
    [0.61868]
    use super::cursor::*;
    use super::*;
    // The implementation here is mostly the same as in the sized case,
    // except for leaves, which makes it hard to refactor.
    pub(crate) async fn rebalance_left<
    'a,
    T: AllocPage,
    K: UnsizedStorable + ?Sized + core::fmt::Debug,
    V: UnsizedStorable + ?Sized + core::fmt::Debug,
    L: Alloc,
    >(
    txn: &mut T,
    m: Concat<'a, K, V, Page<K, V>>,
    ) -> Result<Op<'a, T, K, V>, T::Error> {
    assert!(m.mod_is_left);
    // First element of the right page. We'll rotate it to the left
    // page, i.e. return a middle element, and append the current
    // middle element to the left page.
    let rc = super::cursor::PageCursor::new(&m.other, 0);
    let rl = <Page<K, V>>::left_child(m.other.as_page(), &rc);
    let (k, v, r) = <Page<K, V>>::current(m.other.as_page(), &rc).unwrap();
    let mut freed_ = [0, 0];
    // First, perform the modification on the modified page, which we
    // know is the left page, and return the resulting mutable page
    // (the modified page may or may not be mutable before we do this).
    let new_left = if let Some((k, v)) = m.modified.ins {
    let is_dirty = m.modified.page.is_dirty();
    let new_left = if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(
    txn,
    m.modified.page,
    m.modified.mutable,
    m.modified.skip_first,
    &m.modified.c1,
    k,
    v,
    m.modified.ins2,
    m.modified.l,
    m.modified.r,
    ).await? {
    if freed > 0 {
    let b = if is_dirty { 1 } else { 0 };
    freed_[0] = freed | b;
    }
    page
    } else {
    unreachable!()
    };
    // Append the middle element of the concatenation at the end
    // of the left page. We know the left page to be mutable by
    // now, and we also know there's enough space to do this.
    let lc = PageCursor::after(&new_left.0);
    if let Put::Ok(Ok { freed, page }) = <Page<K, V>>::put(
    txn, new_left.0, true, false, &lc, m.mid.0, m.mid.1, None, 0, rl,
    ).await? {
    if freed > 0 {
    let b = if is_dirty { 1 } else { 0 };
    freed_[0] = freed | b;
    }
    page
    } else {
    unreachable!()
    }
    } else if m.modified.skip_first {
    let is_dirty = m.modified.page.is_dirty();
    let (page, freed) = <Page<K, V>>::del(
    txn,
    m.modified.page,
    m.modified.mutable,
    &m.modified.c1,
    m.modified.l,
    ).await?;
    if freed > 0 {
    let b = if is_dirty { 1 } else { 0 };
    freed_[0] = freed | b;
    }
    // Append the middle element of the concatenation at the end
    // of the left page. We know the left page to be mutable by
    // now, and we also know there's enough space to do this.
    let lc = PageCursor::after(&page.0);
    if let Put::Ok(Ok { freed, page }) =
    <Page<K, V>>::put(txn, page.0, true, false, &lc, m.mid.0, m.mid.1, None, 0, rl).await?
    {
    if freed > 0 {
    let b = if is_dirty { 1 } else { 0 };
    freed_[0] = freed | b;
    }
    page
    } else {
    unreachable!()
    }
    } else {
    // Append the middle element of the concatenation at the end
    // of the left page. We know the left page to be mutable by
    // now, and we also know there's enough space to do this.
    let is_dirty = m.modified.page.is_dirty();
    let lc = PageCursor::after(&m.modified.page);
    if let Put::Ok(Ok { freed, page }) = <Page<K, V>>::put(
    txn,
    m.modified.page,
    m.modified.mutable,
    false,
    &lc,
    m.mid.0,
    m.mid.1,
    None,
    0,
    rl,
    ).await? {
    if m.modified.l > 0 {
    assert_eq!(m.modified.r, 0);
    unsafe {
    let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur - 1);
    *off = (m.modified.l | (u64::from_le(*off) & 0xfff)).to_le();
    }
    } else if m.modified.r > 0 {
    unsafe {
    let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur);
    *off = (m.modified.r | (u64::from_le(*off) & 0xfff)).to_le();
    }
    }
    if freed > 0 {
    let b = if is_dirty { 1 } else { 0 };
    freed_[0] = freed | b;
    }
    page
    } else {
    unreachable!()
    }
    };
    // We extend the pointer's lifetime here, because we know the
    // current deletion (we only rebalance during deletions) won't
    // touch this page anymore after this.
    let k = unsafe { core::mem::transmute(k) };
    let v = unsafe { core::mem::transmute(v) };
    // If this frees the old "other" page, add it to the "freed"
    // array.
    let is_dirty = m.other.is_dirty();
    let (new_right, freed) = <Page<K, V>>::del(txn, m.other, m.other_is_mutable, &rc, r).await?;
    if freed > 0 {
    freed_[1] = if is_dirty { freed | 1 } else { freed }
    }
    Ok(Op::Rebalanced {
    l: new_left.0.offset,
    r: new_right.0.offset,
    k,
    v,
    freed: freed_,
    })
    }
    // Surprisingly, the `rebalance_right` function is simpler,
    // since:
    //
    // - if we call it to rebalance two internal nodes, we're in the easy
    // case of rebalance_left.
    //
    // - Else, the middle element is the last one on the left page, and
    // isn't erased be leaf deletions, because these just move entries to
    // the left.
    //
    // This implementation is shared with the sized one.
    pub(crate) async fn rebalance_right<
    'a,
    T: AllocPage,
    K: ?Sized,
    V: ?Sized,
    P: BTreeMutPage<K, V, Cursor = super::PageCursor>,
    >(
    txn: &mut T,
    m: Concat<'a, K, V, P>,
    ) -> Result<Op<'a, T, K, V>, T::Error> {
    assert!(!m.mod_is_left);
    // Take the last element of the left page.
    let lc = P::cursor_last(&m.other);
    let (k0, v0, r0) = P::current(m.other.as_page(), &lc).unwrap();
    let mut freed_ = [0, 0];
    // Perform the modification on the modified page.
    let new_right = if let Some((k, v)) = m.modified.ins {
    let is_dirty = m.modified.page.is_dirty();
    let new_right = if let Put::Ok(Ok { page, freed }) = P::put(
    txn,
    m.modified.page,
    m.modified.mutable,
    m.modified.skip_first,
    &m.modified.c1,
    k,
    v,
    m.modified.ins2,
    m.modified.l,
    m.modified.r,
    ).await? {
    if freed > 0 {
    freed_[0] = if is_dirty { freed | 1 } else { freed };
    }
    page
    } else {
    unreachable!()
    };
    // Add the middle element of the concatenation as the first
    // element of the right page. We know the right page is
    // mutable, since we just modified it (hence the
    // `assert_eq!(freed, 0)`.
    // First element of the right page (after potential
    // modification by `put` above).
    let rc = P::cursor_first(&new_right.0);
    let rl = P::left_child(new_right.0.as_page(), &rc);
    if let Put::Ok(Ok { freed, page }) = P::put(
    txn,
    new_right.0,
    true,
    false,
    &rc,
    m.mid.0,
    m.mid.1,
    None,
    r0,
    rl,
    ).await? {
    debug_assert_eq!(freed, 0);
    page
    } else {
    unreachable!()
    }
    } else if m.modified.skip_first {
    let is_dirty = m.modified.page.is_dirty();
    let (page, freed) = P::del(
    txn,
    m.modified.page,
    m.modified.mutable,
    &m.modified.c1,
    m.modified.l,
    ).await?;
    if freed > 0 {
    freed_[0] = if is_dirty { freed | 1 } else { freed };
    }
    // Add the middle element of the concatenation as the first
    // element of the right page. We know the right page is
    // mutable, since we just modified it. Moreover, if it is
    // compacted by the `put` below, we know that the `del` didn't
    // free anything, hence we can reuse the slot 0..
    // First element of the right page (after potential
    // modification by `del` above).
    let rc = P::cursor_first(&page.0);
    let rl = P::left_child(page.0.as_page(), &rc);
    if let Put::Ok(Ok { freed, page }) = P::put(
    txn, page.0, true, false, &rc, m.mid.0, m.mid.1, None, r0, rl,
    ).await? {
    if freed > 0 {
    freed_[0] = if is_dirty { freed | 1 } else { freed };
    }
    page
    } else {
    unreachable!()
    }
    } else {
    let is_dirty = m.modified.page.is_dirty();
    let rc = P::cursor_first(&m.modified.page);
    let rl = P::left_child(m.modified.page.as_page(), &rc);
    if let Put::Ok(Ok { freed, page }) = P::put(
    txn,
    m.modified.page,
    m.modified.mutable,
    false,
    &rc,
    m.mid.0,
    m.mid.1,
    None,
    r0,
    rl,
    ).await? {
    // Update the left and right offsets. We know that at
    // least one of them is 0, but it can be either one (or
    // both), depending on what happened on the page below.
    //
    // Since we inserted an entry at the beginning of the
    // page, we need to add 1 to the index given by
    // `m.modified.c1.cur`.
    if m.modified.l > 0 {
    assert_eq!(m.modified.r, 0);
    unsafe {
    let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur);
    *off = (m.modified.l | (u64::from_le(*off) & 0xfff)).to_le();
    }
    } else if m.modified.r > 0 {
    unsafe {
    let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur + 1);
    *off = (m.modified.r | (u64::from_le(*off) & 0xfff)).to_le();
    }
    }
    if freed > 0 {
    freed_[0] = if is_dirty { freed | 1 } else { freed };
    }
    page
    } else {
    unreachable!()
    }
    };
    // As explained in the general comment on this function, this
    // entry isn't erased by the deletion in `m.other` below, so we
    // can safely extend its lifetime.
    let k = unsafe { core::mem::transmute(k0) };
    let v = unsafe { core::mem::transmute(v0) };
    let is_dirty = m.other.is_dirty();
    let (new_left, freed) = P::del(txn, m.other, m.other_is_mutable, &lc, 0).await?;
    if freed > 0 {
    freed_[1] = if is_dirty { freed | 1 } else { freed }
    }
    Ok(Op::Rebalanced {
    l: new_left.0.offset,
    r: new_right.0.offset,
    k,
    v,
    freed: freed_,
    })
    }
  • file addition: put.rs (----------)
    [0.61868]
    use super::header::{header, header_mut, HDR};
    use super::*;
    pub(super) async fn put<
    'a,
    T: AllocPage,
    K: UnsizedStorable + ?Sized + core::fmt::Debug,
    V: UnsizedStorable + ?Sized + core::fmt::Debug,
    L: Alloc + AllocWrite<K, V>,
    >(
    txn: &mut T,
    page: CowPage,
    mutable: bool,
    replace: bool,
    u: usize,
    k0: &'a K,
    v0: &'a V,
    k1v1: Option<(&'a K, &'a V)>,
    l: u64,
    r: u64,
    ) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
    // Size of the new insertions, not counting the offsets.
    let size = alloc_size(k0, v0)
    + if let Some((k1, v1)) = k1v1 {
    alloc_size(k1, v1)
    } else {
    0
    };
    // Sized occupied by the data in these insertions (not counting
    // the offsets), if we need to compact the page.
    let size_replaced = if replace {
    // We're always in an internal node if we replace.
    let cur_size = Internal::current_size::<K, V>(page.as_page(), u as isize);
    // Since `size` isn't counting the size of the 64-bit offset,
    // and `cur_size` is, we need to add 8.
    8 + size as isize - cur_size as isize
    } else {
    size as isize
    };
    // Number of extra offsets.
    let n_ins = {
    let n_ins = if k1v1.is_some() { 2 } else { 1 };
    if replace {
    n_ins - 1
    } else {
    n_ins
    }
    };
    let hdr = header(page.as_page());
    if mutable && page.is_dirty() && L::can_alloc(header(page.as_page()), size, n_ins) {
    // First, if the page is dirty and mutable, mark it mutable.
    let mut page = MutPage(page);
    let mut n = u as isize;
    // If we replace, we first need to "unlink" the previous
    // value, by erasing its offset.
    if replace {
    let p = page.0.data;
    let hdr = header_mut(&mut page);
    // In this case, we know we're not in a leaf, so offsets
    // are of size 8.
    assert!(!hdr.is_leaf());
    unsafe {
    let ptr = p.add(HDR + n as usize * 8) as *mut u64;
    let off = (u64::from_le(*ptr) & 0xfff) as usize;
    let kv_ptr = p.add(off);
    let size = entry_size::<K, V>(kv_ptr);
    core::ptr::copy(ptr.offset(1), ptr, hdr.n() as usize - n as usize - 1);
    // Decreasing these figures here, we'll increase them
    // again in the calls to `alloc_write` below.
    hdr.decr(8 + size);
    hdr.set_n(hdr.n() - 1);
    }
    }
    // Do the actual insertions.
    if let Some((k1, v1)) = k1v1 {
    L::alloc_write(&mut page, k0, v0, 0, 0, &mut n);
    L::alloc_write(&mut page, k1, v1, l, r, &mut n);
    } else {
    L::alloc_write(&mut page, k0, v0, l, r, &mut n);
    }
    // Return: no page was freed.
    Ok(Put::Ok(Ok { page, freed: 0 }))
    } else if L::can_compact(hdr, size_replaced, n_ins) {
    // Allocate a new page, split the offsets at the position of
    // the insertion, and each side of the split onto the new
    // page, with the insertion between them.
    let mut new = txn.alloc_page().await?;
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
    // Clone the leftmost child.
    L::set_right_child(&mut new, -1, hdr.left_page() & !0xfff);
    // Split the offsets.
    let s = L::offset_slice(page.as_page());
    let (s0, s1) = s.split_at(u as usize);
    // Drop the first element of `s1` if we're replacing it.
    let s1 = if replace { s1.split_at(1).1 } else { s1 };
    // Finally, clone and insert.
    let mut n = 0;
    clone::<K, V, L>(page.as_page(), &mut new, s0, &mut n);
    if let Some((k1, v1)) = k1v1 {
    L::alloc_write(&mut new, k0, v0, 0, l, &mut n);
    L::alloc_write(&mut new, k1, v1, 0, r, &mut n);
    } else {
    L::alloc_write(&mut new, k0, v0, l, r, &mut n);
    }
    clone::<K, V, L>(page.as_page(), &mut new, s1, &mut n);
    // And return, freeing the old version of the page.
    Ok(Put::Ok(Ok {
    page: new,
    freed: page.offset | if page.is_dirty() { 1 } else { 0 },
    }))
    } else {
    split_unsized::<_, _, _, L>(txn, page, replace, u, k0, v0, k1v1, l, r).await
    }
    }
    // Surprisingly, the following function is simpler than in the sized
    // case, because we can't be too efficient in this case, since we have
    // to go through all the elements linearly anyway to get their sizes.
    async fn split_unsized<
    'a,
    T: AllocPage,
    K: UnsizedStorable + ?Sized + core::fmt::Debug,
    V: UnsizedStorable + ?Sized + core::fmt::Debug,
    L: Alloc + AllocWrite<K, V>,
    >(
    txn: &mut T,
    page: CowPage,
    replace: bool,
    u: usize,
    k0: &'a K,
    v0: &'a V,
    k1v1: Option<(&'a K, &'a V)>,
    l0: u64,
    r0: u64,
    ) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
    let hdr = header(page.as_page());
    let mut left = txn.alloc_page().await?;
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut left);
    let mut right = txn.alloc_page().await?;
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut right);
    // Clone the leftmost child. If we're inserting at position 0,
    // this means this leftmost child must, and will be replaced.
    let left_child = hdr.left_page() & !0xfff;
    L::set_right_child(&mut left, -1, left_child);
    // Pointer to the split key and value.
    let mut split = None;
    // We start filling the left page, and we will change to filling
    // the right page when the left one is at least half-full.
    let mut current_page = &mut left;
    // There are two synchronised counters here: `hdr.n()` and
    // `n`. The reason for this is that operations on `hdr.n()` are
    // somewhat cumbersome, as they might involve extra operations to
    // convert to/from the little-endian encoding of `hdr.n()`.
    let mut n = 0;
    let s = L::offset_slice(page.as_page());
    let mut total = HDR;
    for (uu, off) in s.0.iter().enumerate() {
    // If the current position of the cursor is the insertion,
    // (i.e. `uu == u`), insert.
    if uu == u {
    // The following is tricky and makes assumptions on the
    // rest of the code. If we are inserting two elements
    // (i.e. if `k1v1.is_some()`), this means we're replacing
    // one on the page.
    header_mut(current_page).set_n(n as u16);
    if let Some((k1, v1)) = k1v1 {
    // As explained in the documentation for `put` in the
    // definition of `BTreeMutPage`, `l0` and `r0` are the
    // left and right children of k1v1, since this means
    // the right child of a deleted entry has split, and
    // we must replace the deleted entry with `(k0, v0)`.
    L::alloc_write(current_page, k0, v0, 0, l0, &mut n);
    total += alloc_size(k0, v0) + L::OFFSET_SIZE;
    L::alloc_write(current_page, k1, v1, 0, r0, &mut n);
    total += alloc_size(k1, v1) + L::OFFSET_SIZE;
    // Replacing the next element:
    debug_assert!(replace);
    continue;
    } else {
    L::alloc_write(current_page, k0, v0, l0, r0, &mut n);
    total += alloc_size(k0, v0) + L::OFFSET_SIZE;
    if replace {
    continue;
    }
    }
    }
    // Then, i.e. either if we're not at the position of an
    // insertion, or else if we're not replacing the current
    // position, just copy the current entry to the appropriate
    // page (left or right).
    let (r, off): (u64, usize) = (*off).into();
    assert_ne!(off, 0);
    unsafe {
    let ptr = page.data.add(off);
    let size = entry_size::<K, V>(ptr);
    // If the left side of the split is at least half-full, we
    // know that we can do all the insertions we need on the
    // right-hand side (even if there are two of them, and the
    // replaced value is of size 0), so we switch.
    if split.is_none() && total >= PAGE_SIZE / 2 {
    // Don't write the next entry onto any page, keep it
    // as the split entry.
    let (k, v) = read::<K, V>(ptr);
    split = Some((K::from_raw_ptr(k), V::from_raw_ptr(v)));
    // Set the entry count for the current page, before
    // switching and resetting the counter.
    header_mut(current_page).set_n(n as u16);
    // And set the leftmost child of the right page to
    // `r`.
    L::set_right_child(&mut right, -1, r);
    // Then, switch page and reset the counter.
    current_page = &mut right;
    n = 0;
    } else {
    // Else, we're simply allocating a new entry on the
    // current page.
    let off_new = {
    let hdr = header_mut(current_page);
    let data = hdr.data();
    assert!(
    HDR + hdr.n() as usize * L::OFFSET_SIZE + L::OFFSET_SIZE + size
    < data as usize
    );
    // Move the data pointer `size` bytes to the left.
    hdr.set_data(data - size as u16);
    // And increase the number of entries, and
    // occupied size of the page.
    hdr.incr(size + L::OFFSET_SIZE);
    data - size as u16
    };
    // Copy the entry.
    core::ptr::copy_nonoverlapping(
    ptr,
    current_page.0.data.offset(off_new as isize),
    size,
    );
    // And set the offset.
    L::set_offset(current_page, n, r | (off_new as u64));
    n += 1;
    }
    total += size + L::OFFSET_SIZE;
    }
    }
    header_mut(current_page).set_n(n as u16);
    // Finally, it is possible that we haven't had a chance to do the
    // insertions yet: this is the case iff we're inserting at the end
    // of all the entries, so handle this now.
    if u == s.0.len() {
    if let Some((k1, v1)) = k1v1 {
    L::alloc_write(current_page, k0, v0, 0, l0, &mut n);
    L::alloc_write(current_page, k1, v1, 0, r0, &mut n);
    } else {
    L::alloc_write(current_page, k0, v0, l0, r0, &mut n);
    }
    }
    let (split_key, split_value) = split.unwrap();
    Ok(Put::Split {
    split_key,
    split_value,
    left,
    right,
    freed: if page.is_dirty() {
    page.offset | 1
    } else {
    page.offset
    },
    })
    }
  • file addition: header.rs (----------)
    [0.61868]
    use crate::PAGE_SIZE;
    #[derive(Debug)]
    #[repr(C)]
    pub struct Header {
    data: u16,
    n: u16,
    crc: u32,
    left_page: u64,
    }
    impl Header {
    pub(crate) fn init(&mut self) {
    self.data = (((PAGE_SIZE as u16) << 3) | 1).to_le();
    self.n = 0;
    self.crc = 0;
    self.left_page = 0;
    }
    pub(crate) fn n(&self) -> u16 {
    u16::from_le(self.n)
    }
    pub(crate) fn set_n(&mut self, n: u16) {
    self.n = n.to_le();
    }
    pub fn left_page(&self) -> u64 {
    u64::from_le(self.left_page)
    }
    pub fn set_left_page(&mut self, l: u64) {
    self.left_page = l.to_le()
    }
    pub fn data(&self) -> u16 {
    u16::from_le(self.data) >> 3
    }
    pub fn set_data(&mut self, d: u16) {
    let dirty = u16::from_le(self.data) & 7;
    self.data = ((d << 3) | dirty).to_le()
    }
    pub fn decr(&mut self, s: usize) {
    self.left_page = (self.left_page() - s as u64).to_le();
    }
    pub fn set_occupied(&mut self, size: u64) {
    self.left_page = ((self.left_page() & !0xfff) | size).to_le()
    }
    pub fn incr(&mut self, s: usize) {
    self.left_page = (self.left_page() + s as u64).to_le();
    }
    pub fn is_leaf(&self) -> bool {
    u64::from_le(self.left_page) <= 0xfff
    }
    }
    pub const HDR: usize = core::mem::size_of::<Header>();
    pub(crate) fn header<'a>(page: crate::Page<'a>) -> &'a Header {
    unsafe { &*(page.data.as_ptr() as *const Header) }
    }
    pub fn header_mut(page: &mut crate::MutPage) -> &mut Header {
    unsafe { &mut *(page.0.data as *mut Header) }
    }
  • file addition: cursor.rs (----------)
    [0.61868]
    use super::{header, Header};
    #[derive(Debug, Clone, Copy)]
    pub struct PageCursor {
    pub cur: isize,
    pub total: usize,
    pub is_leaf: bool,
    }
    impl PageCursor {
    /// Initialise a cursor on page `p` at position `cur`, checking
    /// that `cur` is a valid position.
    pub fn new(p: &crate::CowPage, cur: isize) -> Self {
    let hdr = unsafe { &*(p.data as *const Header) };
    assert!(cur >= -1 && cur < hdr.n() as isize);
    PageCursor {
    cur,
    total: hdr.n() as usize,
    is_leaf: hdr.is_leaf(),
    }
    }
    /// Initialise a cursor set after the last entry of page `p`.
    pub fn after(p: &crate::CowPage) -> Self {
    let hdr = unsafe { &*(p.data as *const Header) };
    let total = hdr.n();
    PageCursor {
    cur: total as isize,
    total: total as usize,
    is_leaf: hdr.is_leaf(),
    }
    }
    /// Initialise a cursor set on the last entry of page `p`.
    pub fn last(p: crate::Page) -> Self {
    let hdr = header(p);
    let total = hdr.n();
    PageCursor {
    cur: (total - 1) as isize,
    total: total as usize,
    is_leaf: hdr.is_leaf(),
    }
    }
    }
  • file addition: alloc.rs (----------)
    [0.61868]
    use super::header::{header, header_mut, Header, HDR};
    use super::*;
    /// We'd love to share this trait with the sized implementation, but
    /// unfortunately, the type parameters of almost all methods are
    /// different.
    pub(crate) trait Alloc {
    const OFFSET_SIZE: usize;
    /// Size (including the offset size) of the entry at position
    /// `cur`.
    fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    page: crate::Page,
    cur: isize,
    ) -> usize;
    /// Can we allocate an entry of `size` bytes, where `size` doesn't
    /// include the offset bytes?
    fn can_alloc(hdr: &Header, size: usize, n: usize) -> bool {
    (HDR as usize) + (hdr.n() as usize) * Self::OFFSET_SIZE + n * Self::OFFSET_SIZE + size
    <= hdr.data() as usize
    }
    /// If we cloned the page, could we allocate an entry of `size`
    /// bytes, where `size` doesn't include the offset bytes?
    fn can_compact(hdr: &Header, size: isize, n: usize) -> bool {
    (HDR as isize)
    + ((hdr.left_page() & 0xfff) as isize)
    + (n * Self::OFFSET_SIZE) as isize
    + size
    <= PAGE_SIZE as isize
    }
    /// Set the right child of the `n`th entry to `r`. If `n == -1`,
    /// this method can be used to set the leftmost child of a page.
    fn set_right_child(_new: &mut MutPage, _n: isize, _r: u64) {}
    /// Set the n^th offset on the page to `r`, which encodes a page
    /// offset in its 52 MSBs, and an offset on the page in its 12 LSBs.
    fn set_offset(new: &mut MutPage, n: isize, r: u64);
    /// The type of offsets, u64 in internal nodes, u16 in leaves.
    type Offset: Into<(u64, usize)> + Copy + core::fmt::Debug;
    fn offset_slice<'a>(page: crate::Page<'a>) -> Offsets<'a, Self::Offset>;
    }
    #[derive(Debug, Clone, Copy)]
    pub struct Offsets<'a, A>(pub &'a [A]);
    impl<'a, A: Copy> Offsets<'a, A> {
    pub fn split_at(self, mid: usize) -> (Self, Self) {
    if mid < self.0.len() {
    let (a, b) = self.0.split_at(mid);
    (Offsets(a), Offsets(b))
    } else {
    debug_assert_eq!(mid, self.0.len());
    (self, Offsets(&[][..]))
    }
    }
    }
    pub struct Leaf {}
    pub struct Internal {}
    impl Alloc for Leaf {
    const OFFSET_SIZE: usize = 2;
    // Note: the size returned by this function includes the offset.
    fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    page: crate::Page,
    cur: isize,
    ) -> usize {
    // Find the offset of the current position, get its size.
    unsafe {
    debug_assert!(cur >= 0);
    let off = *(page.data.as_ptr().add(HDR + 2 * cur as usize) as *const u16);
    Self::OFFSET_SIZE
    + entry_size::<K, V>(page.data.as_ptr().add(u16::from_le(off) as usize))
    }
    }
    fn set_offset(new: &mut MutPage, n: isize, off: u64) {
    unsafe {
    let ptr = new.0.data.offset(HDR as isize + n * 2) as *mut u16;
    *ptr = (off as u16).to_le();
    }
    }
    type Offset = LeafOffset;
    fn offset_slice<'a>(page: crate::Page<'a>) -> Offsets<'a, Self::Offset> {
    let hdr_n = header(page).n();
    unsafe {
    Offsets(core::slice::from_raw_parts(
    page.data.as_ptr().add(HDR) as *const LeafOffset,
    hdr_n as usize,
    ))
    }
    }
    }
    impl Alloc for Internal {
    const OFFSET_SIZE: usize = 8;
    fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    page: crate::Page,
    cur: isize,
    ) -> usize {
    unsafe {
    8 + entry_size::<K, V>(page.data.as_ptr().add(
    (u64::from_le(*(page.data.as_ptr().add(HDR) as *const u64).offset(cur)) & 0xfff)
    as usize,
    ))
    }
    }
    /// Set the right-hand child in the offset array, preserving the
    /// current offset.
    fn set_right_child(page: &mut MutPage, n: isize, r: u64) {
    unsafe {
    debug_assert_eq!(r & 0xfff, 0);
    let ptr = page.0.data.offset(HDR as isize + n * 8) as *mut u64;
    let off = u64::from_le(*ptr) & 0xfff;
    *ptr = (r | off as u64).to_le();
    }
    }
    fn set_offset(new: &mut MutPage, n: isize, off: u64) {
    unsafe {
    let ptr = new.0.data.offset(HDR as isize + n * 8) as *mut u64;
    *ptr = off.to_le();
    }
    }
    type Offset = InternalOffset;
    fn offset_slice<'a>(page: crate::Page<'a>) -> Offsets<'a, Self::Offset> {
    let hdr_n = header(page).n() as usize;
    unsafe {
    Offsets(core::slice::from_raw_parts(
    page.data.as_ptr().add(HDR) as *const InternalOffset,
    hdr_n,
    ))
    }
    }
    }
    #[derive(Debug, Clone, Copy)]
    #[repr(C)]
    pub struct LeafOffset(u16);
    impl Into<(u64, usize)> for LeafOffset {
    fn into(self) -> (u64, usize) {
    (0, self.0 as usize)
    }
    }
    impl Into<usize> for LeafOffset {
    fn into(self) -> usize {
    self.0 as usize
    }
    }
    #[derive(Debug, Clone, Copy)]
    #[repr(C)]
    pub struct InternalOffset(u64);
    impl Into<(u64, usize)> for InternalOffset {
    fn into(self) -> (u64, usize) {
    (self.0 & !0xfff, (self.0 & 0xfff) as usize)
    }
    }
    impl Into<usize> for InternalOffset {
    fn into(self) -> usize {
    self.0 as usize
    }
    }
    impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> AllocWrite<K, V> for Leaf {
    fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {
    alloc_write::<K, V, Self>(new, k0, v0, l, r, n)
    }
    fn set_left_child(_new: &mut MutPage, _n: isize, l: u64) {
    debug_assert_eq!(l, 0);
    }
    }
    impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> AllocWrite<K, V> for Internal {
    fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {
    alloc_write::<K, V, Self>(new, k0, v0, l, r, n)
    }
    fn set_left_child(new: &mut MutPage, n: isize, l: u64) {
    if l > 0 {
    Internal::set_right_child(new, n - 1, l);
    }
    }
    }
    // Allocate a new entry and copy (k0, v0) into the new slot.
    fn alloc_write<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized, L: Alloc>(
    new: &mut MutPage,
    k0: &K,
    v0: &V,
    l: u64,
    r: u64,
    n: &mut isize,
    ) {
    let size = alloc_size(k0, v0);
    let off_new = alloc_insert::<K, V, L>(new, n, size, r);
    unsafe {
    let new_ptr = new.0.data.add(off_new);
    k0.write_to_page(new_ptr);
    let ks = k0.size();
    let v_ptr = new_ptr.add((ks + V::ALIGN - 1) & !(V::ALIGN - 1));
    v0.write_to_page(v_ptr);
    }
    if l > 0 {
    L::set_right_child(new, *n - 1, l);
    }
    *n += 1;
    }
    /// Reserve space on the page for `size` bytes of data (i.e. not
    /// counting the offset), set the right child of the new position
    /// to `r`, add the offset to the new data to the offset array,
    /// and return the new offset.
    fn alloc_insert<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized, L: Alloc>(
    new: &mut MutPage,
    n: &mut isize,
    size: usize,
    r: u64,
    ) -> usize {
    let hdr = header_mut(new);
    let data = hdr.data();
    // If we're here, the following assertion has been checked by the
    // `can_alloc` method.
    debug_assert!(HDR + (hdr.n() as usize + 1) * L::OFFSET_SIZE + size <= data as usize);
    hdr.set_data(data - size as u16);
    hdr.set_n(hdr.n() + 1);
    hdr.incr(L::OFFSET_SIZE + size);
    let off_new = data - size as u16;
    let hdr_n = hdr.n();
    // Making space for the new offset
    unsafe {
    core::ptr::copy(
    new.0.data.add(HDR + (*n as usize) * L::OFFSET_SIZE),
    new.0
    .data
    .add(HDR + (*n as usize) * L::OFFSET_SIZE + L::OFFSET_SIZE),
    (hdr_n as usize - (*n as usize)) * L::OFFSET_SIZE,
    );
    }
    L::set_offset(new, *n, r | (off_new as u64));
    off_new as usize
    }
  • file addition: page.rs (----------)
    [0.12617]
    //! Implementation of B tree pages for Sized types, i.e. types whose
    //! representation has a size known at compile time (and the same as
    //! [`core::mem::size_of()`]).
    //!
    //! The details of the implementation are as follows:
    //!
    //! - The page starts with a 16 bytes header of the following form
    //! (where all the fields are encoded in Little-Endian):
    //!
    //! ```
    //! #[repr(C)]
    //! pub struct Header {
    //! /// Offset to the first entry in the page, shifted 3 bits
    //! /// to the right to allow for the dirty bit (plus two
    //! /// extra bits, zero for now), as explained in the
    //! /// documentation of CowPage, at the root of this
    //! /// crate. This is 4096 for empty pages, and it is unused
    //! /// for leaves. Moreover, this field can't be increased:
    //! /// when it reaches the bottom, the page must be cloned.
    //! data: u16,
    //! /// Number of entries on the page.
    //! n: u16,
    //! /// CRC (if used).
    //! crc: u32,
    //! /// The 52 most significant bits are the left child of
    //! /// this page (0 for leaves), while the 12 LSBs represent
    //! /// the space this page would take when cloned from scratch,
    //! /// minus the header. The reason for this is that entries
    //! /// in internal nodes aren't really removed when deleted,
    //! /// they're only "unlinked" from the array of offsets (see
    //! /// below). Therefore, we must have a way to tell when a
    //! /// page can be "compacted" to reclaim space.
    //! left_page: u64,
    //! }
    //! ```
    //! - For leaves, the rest of the page has the same representation as
    //! an array of length `n`, of elements of type `Tuple<K, V>`:
    //! ```
    //! #[repr(C)]
    //! struct Tuple<K, V> {
    //! k: K,
    //! v: V,
    //! }
    //! ```
    //! If the alignment of that structure is more than 16 bytes, we
    //! need to add some padding between the header and that array.
    //! - For internal nodes, the rest of the page starts with an array of
    //! length `n` of Little-Endian-encoded [`u64`], where the 12 least
    //! significant bits of each [`u64`] are an offset to a `Tuple<K, V>` in
    //! the page, and the 52 other bits are an offset in the file to the
    //! right child of the entry.
    //!
    //! Moreover, the offset represented by the 12 LSBs is after (or at)
    //! `header.data`.
    //! We say we can "allocate" in the page if the `data` of the header
    //! is greater than or equal to the position of the last "offset",
    //! plus the size we want to allocate (note that since we allocate
    //! from the end of the page, `data` is always a multiple of the
    //! alignment of `Tuple<K, V>`).
    use super::*;
    use crate::btree::del::*;
    use crate::btree::put::*;
    use core::cmp::Ordering;
    mod alloc; // The "alloc" trait, to provide common methods for leaves and internal nodes.
    mod put; // Inserting a new value onto a page (possibly splitting the page).
    mod rebalance; // Rebalance two sibling pages to the left or to the right.
    use super::page_unsized::{cursor::PageCursor, header};
    use alloc::*;
    use header::*;
    use rebalance::*;
    /// This struct is used to allocate a pair `(K, V)` on the stack, and
    /// copy it from a C pointer from a page.
    ///
    /// This is also used to form slices of pairs in order to use
    /// functions from the core library.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
    #[repr(C)]
    struct Tuple<K, V> {
    k: K,
    v: V,
    }
    /// Empty type implementing `BTreePage` and `BTreeMutPage`.
    #[derive(Debug)]
    pub struct Page<K, V> {
    k: core::marker::PhantomData<K>,
    v: core::marker::PhantomData<V>,
    }
    #[async_trait(?Send)]
    impl<K: Storable, V: Storable> super::BTreePage<K, V> for Page<K, V> {
    // Cursors are quite straightforward. One non-trivial thing is
    // that they represent both a position on a page and the interval
    // from that position to the end of the page, as is apparent in
    // their `split_at` method.
    //
    // Another thing to note is that -1 and `c.total` are valid
    // positions for a cursor `c`. The reason for this is that
    // position `-1` has a right child (which is the first element's
    // left child).
    type Cursor = PageCursor;
    fn is_empty(c: &Self::Cursor) -> bool {
    c.cur >= c.total as isize
    }
    fn is_init(c: &Self::Cursor) -> bool {
    c.cur < 0
    }
    fn cursor_before(p: &crate::CowPage) -> Self::Cursor {
    PageCursor::new(p, -1)
    }
    fn cursor_after(p: &crate::CowPage) -> Self::Cursor {
    PageCursor::after(p)
    }
    fn split_at(c: &Self::Cursor) -> (Self::Cursor, Self::Cursor) {
    (
    PageCursor {
    cur: 0,
    total: c.cur.max(0) as usize,
    is_leaf: c.is_leaf,
    },
    *c,
    )
    }
    fn move_next(c: &mut Self::Cursor) -> bool {
    if c.cur >= c.total as isize {
    return false;
    }
    c.cur += 1;
    true
    }
    fn move_prev(c: &mut Self::Cursor) -> bool {
    if c.cur < 0 {
    return false;
    }
    c.cur -= 1;
    true
    }
    fn current<'a>(
    page: crate::Page<'a>,
    c: &Self::Cursor,
    ) -> Option<(&'a K, &'a V, u64)> {
    // First, there's no current entry if the cursor is outside
    // the range of entries.
    if c.cur < 0 || c.cur >= c.total as isize {
    None
    } else if c.is_leaf {
    // Else, if this is a leaf, the elements are packed
    // together in a contiguous array.
    //
    // This means that the header may be followed by padding
    // (in order to align the entries). These are constants
    // known at compile-time, so `al` and `hdr` are optimised
    // away by the compiler.
    let al = core::mem::align_of::<Tuple<K, V>>();
    // The following is a way to compute the first multiple of
    // `al` after `HDR`, assuming `al` is a power of 2 (which
    // is always the case since we get it from `align_of`).
    let hdr = (HDR + al - 1) & !(al - 1);
    // The position of the `Tuple<K, V>` we're looking for is
    // `f * cur` bytes after the padded header (because
    // `size_of` includes alignment padding).
    let f = core::mem::size_of::<Tuple<K, V>>();
    let kv = unsafe {
    &*(page.data.as_ptr().add(hdr + c.cur as usize * f) as *const Tuple<K, V>)
    };
    Some((&kv.k, &kv.v, 0))
    } else {
    // Internal nodes have an extra level of indirection: we
    // first need to find `off`, the offset in the page, in
    // the initial array of offsets. Since these offsets are
    // `u64`, and the header is of size 16 bytes, the array is
    // already aligned.
    unsafe {
    let off =
    u64::from_le(*(page.data.as_ptr().add(HDR + 8 * c.cur as usize) as *const u64));
    // Check that we aren't reading outside of the page
    // (for example because of a malformed offset).
    assert!((off as usize & 0xfff) + core::mem::size_of::<Tuple<K, V>>() <= 4096);
    // Once we have the offset, cast its 12 LSBs to a
    // position in the page, and read the `Tuple<K, V>` at
    // that position.
    let kv = &*(page.data.as_ptr().add((off as usize) & 0xfff) as *const Tuple<K, V>);
    Some((&kv.k, &kv.v, off & !0xfff))
    }
    }
    }
    // The left and right child methods aren't really surprising. One
    // thing to note is that cursors are always in positions between
    // `-1` and `c.total` (bounds included), so we only have to check
    // one side of the bound in the assertions.
    //
    // We also check, before entering the `unsafe` sections, that
    // we're only reading data that is on a page.
    fn left_child(page: crate::Page, c: &Self::Cursor) -> u64 {
    if c.is_leaf {
    0
    } else {
    assert!(c.cur >= 0 && HDR as isize + c.cur * 8 - 8 <= 4088);
    let off =
    unsafe { *(page.data.as_ptr().offset((HDR as isize + c.cur * 8) - 8) as *const u64) };
    u64::from_le(off) & !0xfff
    }
    }
    fn right_child(page: crate::Page, c: &Self::Cursor) -> u64 {
    if c.is_leaf {
    0
    } else {
    assert!(c.cur < c.total as isize && HDR as isize + c.cur * 8 <= 4088);
    let off =
    unsafe { *(page.data.as_ptr().offset(HDR as isize + c.cur * 8) as *const u64) };
    u64::from_le(off) & !0xfff
    }
    }
    async fn set_cursor<'a, 'b, T: LoadPage>(
    _txn: &'a T,
    page: crate::Page<'b>,
    c: &mut PageCursor,
    k0: &K,
    v0: Option<&V>,
    ) -> Result<(&'a K, &'a V, u64), usize> {
    unsafe {
    // `lookup` has the same semantic as
    // `core::slice::binary_search`, i.e. it returns either
    // `Ok(n)`, where `n` is the position where `(k0, v0)` was
    // found, or `Err(n)` where `n` is the position where
    // `(k0, v0)` can be inserted to preserve the order.
    match lookup(page, c, k0, v0).await {
    Ok(n) => {
    c.cur = n as isize;
    // Just read the tuple and return it.
    if c.is_leaf {
    let f = core::mem::size_of::<Tuple<K, V>>();
    let al = core::mem::align_of::<Tuple<K, V>>();
    let hdr_size = (HDR + al - 1) & !(al - 1);
    let tup =
    &*(page.data.as_ptr().add(hdr_size + f * n) as *const Tuple<K, V>);
    Ok((&tup.k, &tup.v, 0))
    } else {
    let off =
    u64::from_le(*(page.data.as_ptr().add(HDR + n * 8) as *const u64));
    let tup =
    &*(page.data.as_ptr().add(off as usize & 0xfff) as *const Tuple<K, V>);
    Ok((&tup.k, &tup.v, off & !0xfff))
    }
    }
    Err(n) => {
    c.cur = n as isize;
    Err(n)
    }
    }
    }
    }
    }
    #[async_trait(?Send)]
    impl<K: Storable + core::fmt::Debug, V: Storable + core::fmt::Debug> super::BTreeMutPage<K, V>
    for Page<K, V>
    {
    // Once again, this is quite straightforward.
    fn init(page: &mut MutPage) {
    let h = header_mut(page);
    h.init();
    }
    // When deleting from internal nodes, we take a replacement from
    // one of the leaves (in our current implementation, the leftmost
    // entry of the right subtree). This method copies an entry from
    // the leaf onto the program stack, which is necessary since
    // deletions in leaves overwrites entries.
    //
    // Another design choice would have been to do the same as for the
    // unsized implementation, but in this case this would have meant
    // copying the saved value to the end of the leaf, potentially
    // preventing merges, and not even saving a memory copy.
    type Saved = (K, V);
    fn save_deleted_leaf_entry(k: &K, v: &V) -> Self::Saved {
    unsafe {
    let mut k0 = core::mem::MaybeUninit::uninit();
    let mut v0 = core::mem::MaybeUninit::uninit();
    core::ptr::copy_nonoverlapping(k, k0.as_mut_ptr(), 1);
    core::ptr::copy_nonoverlapping(v, v0.as_mut_ptr(), 1);
    (k0.assume_init(), v0.assume_init())
    }
    }
    unsafe fn from_saved<'a>(s: &Self::Saved) -> (&'a K, &'a V) {
    (core::mem::transmute(&s.0), core::mem::transmute(&s.1))
    }
    // `put` inserts one or two entries onto a node (internal or
    // leaf). This is implemented in the `put` module.
    async fn put<'a, T: AllocPage>(
    txn: &mut T,
    page: CowPage,
    mutable: bool,
    replace: bool,
    c: &PageCursor,
    k0: &'a K,
    v0: &'a V,
    k1v1: Option<(&'a K, &'a V)>,
    l: u64,
    r: u64,
    ) -> Result<super::put::Put<'a, K, V>, T::Error> {
    assert!(c.cur >= 0);
    // In the sized case, deletions can never cause a split, so we
    // never have to insert two elements at the same position.
    assert!(k1v1.is_none());
    if r == 0 {
    put::put::<_, _, _, Leaf>(txn, page, mutable, replace, c.cur as usize, k0, v0, 0, 0).await
    } else {
    put::put::<_, _, _, Internal>(txn, page, mutable, replace, c.cur as usize, k0, v0, l, r).await
    }
    }
    unsafe fn put_mut(page: &mut MutPage, c: &mut Self::Cursor, k0: &K, v0: &V, r: u64) {
    use super::page_unsized::AllocWrite;
    let mut n = c.cur;
    if r == 0 {
    Leaf::alloc_write(page, k0, v0, 0, r, &mut n);
    } else {
    Internal::alloc_write(page, k0, v0, 0, r, &mut n);
    }
    c.total += 1;
    }
    unsafe fn set_left_child(page: &mut MutPage, c: &Self::Cursor, l: u64) {
    let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur - 1);
    *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
    }
    // This function updates an internal node, setting the left child
    // of the cursor to `l`.
    async fn update_left_child<T: AllocPage>(
    txn: &mut T,
    page: CowPage,
    mutable: bool,
    c: &Self::Cursor,
    l: u64,
    ) -> Result<crate::btree::put::Ok, T::Error> {
    assert!(!c.is_leaf && c.cur >= 0 && (c.cur as usize) <= c.total);
    let freed;
    let page = if mutable && page.is_dirty() {
    // If the page is mutable (dirty), just convert it to a
    // mutable page, and update.
    freed = 0;
    MutPage(page)
    } else {
    // Else, clone the page.
    let mut new = txn.alloc_page().await?;
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
    // First clone the left child of the page.
    let l = header(page.as_page()).left_page() & !0xfff;
    let hdr = header_mut(&mut new);
    hdr.set_left_page(l);
    // And then the rest of the page.
    let s = Internal::offset_slice::<T, K, V>(page.as_page());
    clone::<K, V, Internal>(page.as_page(), &mut new, s, &mut 0);
    // Mark the former version of the page as free.
    freed = page.offset | if page.is_dirty() { 1 } else { 0 };
    new
    };
    // Finally, update the left child of the cursor.
    unsafe {
    let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur - 1);
    *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
    }
    Ok(Ok { page, freed })
    }
    async fn del<T: AllocPage>(
    txn: &mut T,
    page: crate::CowPage,
    mutable: bool,
    c: &PageCursor,
    l: u64,
    ) -> Result<(MutPage, u64), T::Error> {
    assert!(c.cur >= 0 && (c.cur as usize) < c.total);
    if mutable && page.is_dirty() {
    // In the mutable case, we just need to move some memory
    // around.
    let p = page.data;
    let mut page = MutPage(page);
    let hdr = header_mut(&mut page);
    let f = core::mem::size_of::<Tuple<K, V>>();
    if c.is_leaf {
    // In leaves, we need to move the n - c - 1 elements
    // that are strictly after the cursor, by `f` (the
    // size of an entry).
    //
    // Here's the reasoning to avoid off-by-one errors: if
    // `c` is 0 (i.e. we're deleting the first element on
    // the page), we remove one entry, so there are n - 1
    // remaining entries.
    let n = hdr.n() as usize;
    let hdr_size = {
    // As usual, header + padding
    let al = core::mem::align_of::<Tuple<K, V>>();
    (HDR + al - 1) & !(al - 1)
    };
    let off = hdr_size + c.cur as usize * f;
    unsafe {
    core::ptr::copy(p.add(off + f), p.add(off), f * (n - c.cur as usize - 1));
    }
    // Removing `f` bytes from the page.
    hdr.decr(f);
    } else {
    // Internal nodes are easier to deal with, as we just
    // have to move the offsets.
    unsafe {
    let ptr = p.add(HDR + c.cur as usize * 8) as *mut u64;
    core::ptr::copy(ptr.offset(1), ptr, hdr.n() as usize - c.cur as usize - 1);
    }
    // Removing `f` bytes from the page (the tuple), plus
    // one 8-bytes offset.
    hdr.decr(f + 8);
    // Updating the left page if necessary.
    if l > 0 {
    unsafe {
    let off = (p.add(HDR) as *mut u64).offset(c.cur as isize - 1);
    *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
    }
    }
    }
    hdr.set_n(hdr.n() - 1);
    // Returning the mutable page itself, and 0 (for "no page freed")
    Ok((page, 0))
    } else {
    // Immutable pages need to be cloned. The strategy is the
    // same in both cases: get an "offset slice", split it at
    // the cursor, remove the first entry of the second half,
    // and clone.
    let mut new = txn.alloc_page().await?;
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
    if c.is_leaf {
    let s = Leaf::offset_slice::<T, K, V>(page.as_page());
    let (s0, s1) = s.split_at(c.cur as usize);
    let (_, s1) = s1.split_at(1);
    let mut n = 0;
    clone::<K, V, Leaf>(page.as_page(), &mut new, s0, &mut n);
    clone::<K, V, Leaf>(page.as_page(), &mut new, s1, &mut n);
    } else {
    // Internal nodes a bit trickier, since the left child
    // is not counted in the "offset slice", so we need to
    // clone it separately. Also, the `l` argument to this
    // function might be non-zero in this case.
    let s = Internal::offset_slice::<T, K, V>(page.as_page());
    let (s0, s1) = s.split_at(c.cur as usize);
    let (_, s1) = s1.split_at(1);
    // First, clone the left child of the page.
    let hdr = header(page.as_page());
    let left = hdr.left_page() & !0xfff;
    unsafe { *(new.0.data.add(HDR - 8) as *mut u64) = left.to_le() };
    // Then, clone the entries strictly before the cursor
    // (i.e. clone `s0`).
    let mut n = 0;
    clone::<K, V, Internal>(page.as_page(), &mut new, s0, &mut n);
    // If we need to update the left child of the deleted
    // item, do it.
    if l > 0 {
    unsafe {
    let off = new.0.data.offset(HDR as isize + (n - 1) * 8) as *mut u64;
    *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
    }
    }
    // Finally, clone the right half of the page (`s1`).
    clone::<K, V, Internal>(page.as_page(), &mut new, s1, &mut n);
    }
    Ok((new, page.offset))
    }
    }
    // Decide what to do with the concatenation of two neighbouring
    // pages, with a middle element.
    async fn merge_or_rebalance<'a, T: AllocPage>(
    txn: &mut T,
    m: Concat<'a, K, V, Self>,
    ) -> Result<Op<'a, T, K, V>, T::Error> {
    // First evaluate the size of the middle element on a page.
    let (hdr_size, mid_size) = if m.modified.c0.is_leaf {
    let al = core::mem::align_of::<Tuple<K, V>>();
    (
    (HDR + al - 1) & !(al - 1),
    core::mem::size_of::<Tuple<K, V>>(),
    )
    } else {
    (HDR, 8 + core::mem::size_of::<Tuple<K, V>>())
    };
    // Evaluate the size of the modified half of the concatenation
    // (which includes the header).
    let mod_size = size::<K, V>(&m.modified);
    // Add the "occupied" size (which excludes the header).
    let occupied = {
    let hdr = header(m.other.as_page());
    (hdr.left_page() & 0xfff) as usize
    };
    // One surprising observation here is that adding the sizes
    // works. This is surprising because of alignment and
    // padding. It works because we can split the sizes into an
    // offset part (always 8 bytes) and a data part (of a constant
    // alignment), and thus we never need any padding anywhere on
    // the page.
    if mod_size + mid_size + occupied <= PAGE_SIZE {
    // If the concatenation fits on a page, merge.
    return if m.modified.c0.is_leaf {
    super::page_unsized::merge::<_, _, _, _, Leaf>(txn, m).await
    } else {
    super::page_unsized::merge::<_, _, _, _, Internal>(txn, m).await
    };
    }
    // If the modified page is large enough, or if the other page
    // has only just barely the minimum number of elements to be
    // at least half-full, return.
    //
    // The situation where the other page isn't full enough might
    // happen for example if elements occupy exactly 1/5th of a
    // page + 1 byte, and the modified page has 2 of them after
    // the deletion, while the other page has 3.
    if mod_size >= PAGE_SIZE / 2 || hdr_size + occupied - mid_size < PAGE_SIZE / 2 {
    if let Some((k, v)) = m.modified.ins {
    // Perform the required modification and return.
    return Ok(Op::Put(Self::put(
    txn,
    m.modified.page,
    m.modified.mutable,
    m.modified.skip_first,
    &m.modified.c1,
    k,
    v,
    m.modified.ins2,
    m.modified.l,
    m.modified.r,
    ).await?));
    } else if m.modified.skip_first {
    // `ins2` is only ever used when the page immediately
    // below a deletion inside an internal node has split,
    // and we need to replace the deleted value, *and*
    // insert the middle entry of the split.
    debug_assert!(m.modified.ins2.is_none());
    let (page, freed) = Self::del(
    txn,
    m.modified.page,
    m.modified.mutable,
    &m.modified.c1,
    m.modified.l,
    ).await?;
    return Ok(Op::Put(Put::Ok(Ok { page, freed })));
    } else {
    let mut c1 = m.modified.c1.clone();
    let mut l = m.modified.l;
    if l == 0 {
    Self::move_next(&mut c1);
    l = m.modified.r;
    }
    return Ok(Op::Put(Put::Ok(Self::update_left_child(
    txn,
    m.modified.page,
    m.modified.mutable,
    &c1,
    l,
    ).await?)));
    }
    }
    // Finally, if we're here, we can rebalance. There are four
    // (relatively explicit) cases, see the `rebalance` submodule
    // to see how this is done.
    if m.mod_is_left {
    if m.modified.c0.is_leaf {
    rebalance_left::<_, _, _, Leaf>(txn, m).await
    } else {
    rebalance_left::<_, _, _, Internal>(txn, m).await
    }
    } else {
    super::page_unsized::rebalance::rebalance_right::<_, _, _, Self>(txn, m).await
    }
    }
    }
    /// Size of a modified page (including the header).
    fn size<K: Storable, V: Storable>(m: &ModifiedPage<K, V, Page<K, V>>) -> usize {
    let mut total = {
    let hdr = header(m.page.as_page());
    (hdr.left_page() & 0xfff) as usize
    };
    if m.c1.is_leaf {
    let al = core::mem::align_of::<Tuple<K, V>>();
    total += (HDR + al - 1) & (!al - 1);
    } else {
    total += HDR
    };
    // Extra size for the offsets.
    let extra = if m.c1.is_leaf { 0 } else { 8 };
    if m.ins.is_some() {
    total += extra + core::mem::size_of::<Tuple<K, V>>();
    if m.ins2.is_some() {
    total += extra + core::mem::size_of::<Tuple<K, V>>()
    }
    }
    // If we skip the first entry of `m.c1`, remove its size.
    if m.skip_first {
    total -= extra + core::mem::size_of::<Tuple<K, V>>()
    }
    total
    }
    /// Linear search, leaf version. This is relatively straightforward.
    fn leaf_linear_search<K: Storable, V: Storable>(
    k0: &K,
    v0: Option<&V>,
    s: &[Tuple<K, V>],
    ) -> Result<usize, usize> {
    let mut n = 0;
    for sm in s.iter() {
    match sm.k.compare(k0) {
    Ordering::Less => n += 1,
    Ordering::Greater => return Err(n),
    Ordering::Equal => {
    if let Some(v0) = v0 {
    match sm.v.compare(v0) {
    Ordering::Less => n += 1,
    Ordering::Greater => return Err(n),
    Ordering::Equal => return Ok(n),
    }
    } else {
    return Ok(n);
    }
    }
    }
    }
    Err(n)
    }
    /// An equivalent of `Ord::cmp` on `Tuple<K, V>` but using
    /// `Storable::compare` instead of `Ord::cmp` on `K` and `V`.
    fn cmp<K: Storable, V: Storable>(
    k0: &K,
    v0: Option<&V>,
    p: &[u8; 4096],
    off: u64,
    ) -> Ordering {
    let off = u64::from_le(off) & 0xfff;
    if off as usize + core::mem::size_of::<Tuple<K, V>>() > PAGE_SIZE {
    panic!(
    "off = {:?}, size = {:?}",
    off,
    core::mem::size_of::<Tuple<K, V>>()
    );
    }
    let tup = unsafe { &*(p.as_ptr().offset(off as isize & 0xfff) as *const Tuple<K, V>) };
    match tup.k.compare(k0) {
    Ordering::Equal => {
    if let Some(v0) = v0 {
    tup.v.compare(v0)
    } else {
    Ordering::Equal
    }
    }
    o => o,
    }
    }
    /// Linear search for internal nodes. Does what it says.
    unsafe fn internal_search<K: Storable, V: Storable>(
    k0: &K,
    v0: Option<&V>,
    s: &[u64],
    p: &[u8; 4096],
    ) -> Result<usize, usize> {
    for (n, off) in s.iter().enumerate() {
    match cmp(k0, v0, p, *off) {
    Ordering::Less => {}
    Ordering::Greater => return Err(n),
    Ordering::Equal => return Ok(n),
    }
    }
    Err(s.len())
    }
    /// Lookup just forms slices of offsets (for internal nodes) or values
    /// (for leaves), and runs an internal search on them.
    async unsafe fn lookup<'a, K: Storable, V: Storable>(
    page: crate::Page<'a>,
    c: &mut PageCursor,
    k0: &K,
    v0: Option<&V>,
    ) -> Result<usize, usize> {
    let hdr = header(page);
    c.total = hdr.n() as usize;
    c.is_leaf = hdr.is_leaf();
    if c.is_leaf {
    let al = core::mem::align_of::<Tuple<K, V>>();
    let hdr_size = (HDR + al - 1) & !(al - 1);
    let s = core::slice::from_raw_parts(
    page.data.as_ptr().add(hdr_size) as *const Tuple<K, V>,
    hdr.n() as usize,
    );
    leaf_linear_search(k0, v0, s)
    } else {
    let s = core::slice::from_raw_parts(
    page.data.as_ptr().add(HDR) as *const u64,
    hdr.n() as usize,
    );
    internal_search(k0, v0, s, page.data)
    }
    }
    /// Clone a slice of offsets onto a page. This essentially does what
    /// it says. Note that the leftmost child of a page is not included in
    /// the offset slices, so we don't have to handle it here.
    ///
    /// This should really be in the `Alloc` trait, but Rust doesn't have
    /// associated type constructors, so we can't have an associated type
    /// that's sometimes a slice and sometimes a "Range".
    fn clone<K: Storable, V: Storable, L: Alloc>(
    page: crate::Page,
    new: &mut MutPage,
    s: Offsets,
    n: &mut isize,
    ) {
    match s {
    Offsets::Slice(s) => {
    // We know we're in an internal node here.
    let size = core::mem::size_of::<Tuple<K, V>>();
    for off in s.iter() {
    let off = u64::from_le(*off);
    let r = off & !0xfff;
    let off = off & 0xfff;
    unsafe {
    let ptr = page.data.as_ptr().add(off as usize);
    // Reserve the space on the page
    let hdr = header_mut(new);
    let data = hdr.data() as u16;
    let off_new = data - size as u16;
    hdr.set_data(off_new);
    // Copy the entry from the original page to its
    // position on the new page.
    core::ptr::copy_nonoverlapping(ptr, new.0.data.add(off_new as usize), size);
    // Set the offset to this new entry in the offset
    // array, along with the right child page.
    let ptr = new.0.data.offset(HDR as isize + *n * 8) as *mut u64;
    *ptr = (r | off_new as u64).to_le();
    }
    *n += 1;
    }
    let hdr = header_mut(new);
    hdr.set_n(hdr.n() + s.len() as u16);
    hdr.incr((8 + size) * s.len());
    }
    Offsets::Range(r) => {
    let size = core::mem::size_of::<Tuple<K, V>>();
    let a = core::mem::align_of::<Tuple<K, V>>();
    let header_size = (HDR + a - 1) & !(a - 1);
    let len = r.len();
    for off in r {
    unsafe {
    let ptr = page.data.as_ptr().add(header_size + off * size);
    let new_ptr = new.0.data.add(header_size + (*n as usize) * size);
    core::ptr::copy_nonoverlapping(ptr, new_ptr, size);
    }
    *n += 1;
    }
    // On leaves, we do have to update everything manually,
    // because we're simply copying stuff.
    let hdr = header_mut(new);
    hdr.set_n(hdr.n() + len as u16);
    hdr.incr(size * len);
    }
    }
    }
  • file addition: page (d--r------)
    [0.12617]
  • file addition: rebalance.rs (----------)
    [0.125961]
    use super::*;
    use core::mem::MaybeUninit;
    pub(crate) async fn rebalance_left<
    'a,
    T: AllocPage,
    K: Storable + core::fmt::Debug,
    V: Storable + core::fmt::Debug,
    L: Alloc,
    >(
    txn: &mut T,
    m: Concat<'a, K, V, Page<K, V>>,
    ) -> Result<Op<'a, T, K, V>, T::Error> {
    assert!(m.mod_is_left);
    // First element of the right page. We'll rotate it to the left
    // page, i.e. return a middle element, and append the current
    // middle element to the left page.
    let rc = super::PageCursor::new(&m.other, 0);
    let rl = <Page<K, V>>::left_child(m.other.as_page(), &rc);
    let (k, v, r) = <Page<K, V>>::current(m.other.as_page(), &rc).unwrap();
    let mut freed_ = [0, 0];
    // First, perform the modification on the modified page, which we
    // know is the left page, and return the resulting mutable page
    // (the modified page may or may not be mutable before we do this).
    let new_left = if let Some((k, v)) = m.modified.ins {
    let is_dirty = m.modified.page.is_dirty();
    let page = if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(
    txn,
    m.modified.page,
    m.modified.mutable,
    m.modified.skip_first,
    &m.modified.c1,
    k,
    v,
    m.modified.ins2,
    m.modified.l,
    m.modified.r,
    ).await? {
    if freed > 0 {
    let b = if is_dirty { 1 } else { 0 };
    freed_[0] = freed | b;
    }
    page
    } else {
    unreachable!()
    };
    // Append the middle element of the concatenation at the end
    // of the left page. We know the left page to be mutable by
    // now, and we also know there's enough space to do this.
    let lc = PageCursor::after(&page.0);
    if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(
    txn, page.0, true, false, &lc, m.mid.0, m.mid.1, None, 0, rl,
    ).await? {
    if freed > 0 {
    let b = if is_dirty { 1 } else { 0 };
    // index 0 is ok here: if a page is freed, it means we
    // just compacted a page, which implies that the call
    // to `put` above didn't free.
    freed_[0] = freed | b
    }
    page
    } else {
    unreachable!()
    }
    } else if m.modified.skip_first {
    // Looking at module `super::del`, the only way we can be in
    // this case
    let is_dirty = m.modified.page.is_dirty();
    let (page, freed) = <Page<K, V>>::del(
    txn,
    m.modified.page,
    m.modified.mutable,
    &m.modified.c1,
    m.modified.l,
    ).await?;
    if freed > 0 {
    let b = if is_dirty { 1 } else { 0 };
    freed_[0] = freed | b
    }
    // Append the middle element of the concatenation at the end
    // of the left page. We know the left page to be mutable by
    // now, and we also know there's enough space to do this.
    let lc = PageCursor::after(&page.0);
    if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(
    txn, page.0, true, false, &lc, m.mid.0, m.mid.1, None, 0, rl,
    ).await? {
    if freed > 0 {
    let b = if is_dirty { 1 } else { 0 };
    // index 0 is ok here: if we freed a page in the call
    // to `del` above, it is mutable and we can allocate
    // on it. Else, we just compacted a page, but that
    // also means `del` above didn't free.
    freed_[0] = freed | b
    }
    page
    } else {
    unreachable!()
    }
    } else {
    let is_dirty = m.modified.page.is_dirty();
    let lc = PageCursor::after(&m.modified.page);
    if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(
    txn, m.modified.page, m.modified.mutable, false, &lc, m.mid.0, m.mid.1, None, 0, rl,
    ).await? {
    if m.modified.l > 0 {
    assert_eq!(m.modified.r, 0);
    unsafe {
    let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur - 1);
    *off = (m.modified.l | (u64::from_le(*off) & 0xfff)).to_le();
    }
    } else if m.modified.r > 0 {
    unsafe {
    let off = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur);
    *off = (m.modified.r | (u64::from_le(*off) & 0xfff)).to_le();
    }
    }
    if freed > 0 {
    let b = if is_dirty { 1 } else { 0 };
    freed_[0] = freed | b
    }
    page
    } else {
    unreachable!()
    }
    };
    let f = core::mem::size_of::<Tuple<K, V>>();
    let (new_right, k, v) = if r == 0 && m.other_is_mutable && m.other.is_dirty() {
    // Since `r == 0`, we know this is a leaf. Therefore, we need
    // to rewrite the deleted element to the end of the page, so
    // that the pointer to the new middle entry is valid when this
    // function returns.
    let data = m.other.data;
    let mut other = MutPage(m.other);
    let right_hdr = header_mut(&mut other);
    // Remove the space for one entry.
    let n = right_hdr.n() as usize;
    right_hdr.decr(f);
    right_hdr.set_n((n - 1) as u16);
    let al = core::mem::align_of::<Tuple<K, V>>();
    let hdr_size = (HDR + al - 1) & !(al - 1);
    let mut t: MaybeUninit<Tuple<K, V>> = MaybeUninit::uninit();
    unsafe {
    // Save the leftmost element (we can't directly copy it to
    // the end, because the page may be full).
    core::ptr::copy_nonoverlapping(
    data.add(hdr_size) as *mut Tuple<K, V>,
    t.as_mut_ptr(),
    1,
    );
    // Move the n - 1 last elements one entry to the left.
    core::ptr::copy(
    data.add(hdr_size + f) as *const Tuple<K, V>,
    data.add(hdr_size) as *mut Tuple<K, V>,
    n - 1,
    );
    // Copy the last element to the end of the page.
    let last = data.add(hdr_size + f * (n - 1)) as *mut Tuple<K, V>;
    core::ptr::copy_nonoverlapping(t.as_ptr(), last, 1);
    (other, &(&*last).k, &(&*last).v)
    }
    } else {
    // If this isn't a leaf, or isn't mutable, use the standard
    // deletion function, because we know this will leave the
    // pointer to the current entry valid.
    // We extend the pointer's lifetime here, because we know the
    // current deletion (we only rebalance during deletions) won't
    // touch this page anymore after this.
    let k = unsafe { core::mem::transmute(k) };
    let v = unsafe { core::mem::transmute(v) };
    // If this frees the old "other" page, add it to the "freed"
    // array.
    let is_dirty = m.other.is_dirty();
    let (page, freed) = <Page<K, V>>::del(txn, m.other, m.other_is_mutable, &rc, r).await?;
    if freed > 0 {
    freed_[1] = if is_dirty { freed | 1 } else { freed };
    }
    (page, k, v)
    };
    Ok(Op::Rebalanced {
    l: new_left.0.offset,
    r: new_right.0.offset,
    k,
    v,
    freed: freed_,
    })
    }
  • file addition: put.rs (----------)
    [0.125961]
    use super::*;
    pub(super) async fn put<
    'a,
    T: AllocPage,
    K: Storable + core::fmt::Debug,
    V: Storable + core::fmt::Debug,
    L: Alloc + super::super::page_unsized::AllocWrite<K, V>,
    >(
    txn: &mut T,
    page: CowPage,
    mutable: bool,
    replace: bool,
    u: usize,
    k0: &'a K,
    v0: &'a V,
    l: u64,
    r: u64,
    ) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
    let hdr = header(page.as_page());
    let is_dirty = page.is_dirty();
    if mutable && is_dirty && L::can_alloc::<K, V>(hdr) {
    // If the page is mutable (i.e. not shared with another tree)
    // and dirty, here's what we do:
    let mut page = MutPage(page);
    let p = page.0.data;
    let hdr = header_mut(&mut page);
    let mut n = u as isize;
    if replace {
    // If we're replacing a value, this can't be in a leaf,
    // hence the only thing that needs to be done is erasing
    // the offset in the offset array. This is a bit naive,
    // since we're moving that array back and forth.
    assert!(!hdr.is_leaf());
    unsafe {
    let ptr = p.add(HDR + n as usize * 8) as *mut u64;
    core::ptr::copy(ptr.offset(1), ptr, hdr.n() as usize - n as usize - 1);
    hdr.decr(8 + core::mem::size_of::<Tuple<K, V>>());
    hdr.set_n(hdr.n() - 1);
    }
    }
    // Do the actual insertions.
    L::alloc_write(&mut page, k0, v0, l, r, &mut n);
    // No page was freed, return the page.
    Ok(Put::Ok(Ok { page, freed: 0 }))
    } else if replace || L::can_compact::<K, V>(hdr) {
    // Here, we could allocate if we cloned, so let's clone:
    let mut new = txn.alloc_page().await?;
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
    L::set_right_child(&mut new, -1, hdr.left_page() & !0xfff);
    // Take the offsets and split it at the cursor position.
    let s = L::offset_slice::<T, K, V>(page.as_page());
    let (s0, s1) = s.split_at(u as usize);
    // If we're replacing, remove the offset that needs to go.
    let s1 = if replace { s1.split_at(1).1 } else { s1 };
    // And then clone the page, inserting the new elements between
    // the two halves of the split offsets.
    let mut n = 0;
    clone::<K, V, L>(page.as_page(), &mut new, s0, &mut n);
    L::alloc_write(&mut new, k0, v0, l, r, &mut n);
    clone::<K, V, L>(page.as_page(), &mut new, s1, &mut n);
    Ok(Put::Ok(Ok {
    page: new,
    freed: if is_dirty {
    page.offset | 1
    } else {
    page.offset
    },
    }))
    } else {
    // Else, split the page.
    split::<_, _, _, L>(txn, page, mutable, u, k0, v0, l, r).await
    }
    }
    async fn split<
    'a,
    T: AllocPage,
    K: Storable + core::fmt::Debug,
    V: Storable + core::fmt::Debug,
    L: Alloc + super::super::page_unsized::AllocWrite<K, V>,
    >(
    txn: &mut T,
    page: CowPage,
    mutable: bool,
    u: usize,
    k0: &'a K,
    v0: &'a V,
    l: u64,
    r: u64,
    ) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
    let mut left;
    let hdr = header(page.as_page());
    let page_is_dirty = if page.is_dirty() { 1 } else { 0 };
    let mut n = hdr.n();
    let k = n / 2;
    n += 1;
    let s = L::offset_slice::<T, K, V>(page.as_page());
    let (s0, s1) = s.split_at(k as usize);
    // Find the split entry (i.e. the entry going up in the B
    // tree). Then, mid_child is the right child of the split
    // key/value, and `s1` is the offsets of the right side of the
    // split.
    let (mut split_key, mut split_value, mid_child, s1) = if u == k as usize {
    // The inserted element is exactly in the middle.
    (k0, v0, r, s1)
    } else {
    // Else, the split key is the first element of `s1`: set the
    // new, updated `s1`.
    let (s1a, s1b) = s1.split_at(1);
    let (k, v, r) = L::kv(page.as_page(), L::first::<K, V>(&s1a));
    (k, v, r, s1b)
    };
    let mut freed = 0;
    if u >= k as usize {
    // If we are here, u >= k, i.e. the insertion is in the
    // right-hand side of the split, or is the split entry itself.
    let mut right = txn.alloc_page().await?;
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut right);
    if u > k as usize {
    // if we're inserting in the right-hand side, clone to the
    // newly allocated `right` page, by
    let mut n = 0;
    // Splitting the offsets to make space for an insertion,
    // and insert in the middle.
    //
    // Off-by-one error? Nope: since `k` is the split entry,
    // the right page starts at index `k + 1`, hence if `u ==
    // k + 1`, `kk` must be 0.
    let kk = u as usize - k as usize - 1;
    let (s1a, s1b) = s1.split_at(kk);
    L::set_right_child(&mut right, -1, mid_child);
    clone::<K, V, L>(page.as_page(), &mut right, s1a, &mut n);
    L::alloc_write(&mut right, k0, v0, l, r, &mut n);
    clone::<K, V, L>(page.as_page(), &mut right, s1b, &mut n);
    } else {
    // Else, just clone the page, we'll take care of returning
    // the split entry afterwards.
    clone::<K, V, L>(page.as_page(), &mut right, s1, &mut 0);
    }
    if mutable && page.is_dirty() {
    // (k0, v0) is to be inserted on the right-hand side of
    // the split, hence we don't have to clone the left-hand
    // side, we can just truncate it.
    left = MutPage(page);
    let hdr = header_mut(&mut left);
    hdr.set_n(k);
    hdr.decr((n - 1 - k) as usize * core::mem::size_of::<Tuple<K, V>>());
    } else {
    // Else, we need to clone the first `k-1` entries,
    // i.e. `s0`, onto a new left page.
    left = txn.alloc_page().await?;
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut left);
    let mut n = 0;
    L::set_right_child(&mut left, -1, hdr.left_page() & !0xfff);
    clone::<K, V, L>(page.as_page(), &mut left, s0, &mut n);
    freed = page.offset | page_is_dirty
    }
    // Finally, if `u` is the middle element, its left and right
    // children become the leftmost child of the right page, and
    // the rightmost child of the left page, respectively.
    if u == k as usize {
    // Insertion in the middle:
    // - `l` becomes the right child of the last element on `left`.
    L::set_right_child(&mut left, k as isize - 1, l);
    // - `r` (i.e. `mid_child`) becomes the left child of `right`.
    L::set_right_child(&mut right, -1, mid_child);
    }
    Ok(Put::Split {
    split_key,
    split_value,
    left,
    right,
    freed,
    })
    } else {
    // Else, the insertion is in the new left page. We first clone
    // the left page, inserting (k,v) at its position:
    left = txn.alloc_page().await?;
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut left);
    // Clone the leftmost page
    let ll = header(page.as_page()).left_page() & !0xfff;
    L::set_right_child(&mut left, -1, ll);
    // Clone the two sides, with an entry in between.
    let (s0a, s0b) = s0.split_at(u as usize);
    let mut n = 0;
    clone::<K, V, L>(page.as_page(), &mut left, s0a, &mut n);
    L::alloc_write(&mut left, k0, v0, l, r, &mut n);
    clone::<K, V, L>(page.as_page(), &mut left, s0b, &mut n);
    let mut right;
    let freed;
    // Then, for the right page:
    if mutable && page.is_dirty() {
    // If we can mutate the original page, remove the first
    // `k` entries, and return a pointer to the split key (at
    // position `k` on the page)
    right = MutPage(page);
    if let Some((k, v)) = L::truncate_left::<K, V>(&mut right, k as usize + 1) {
    unsafe {
    split_key = &*k;
    split_value = &*v;
    }
    }
    freed = 0;
    } else {
    // Else, clone the right page.
    right = txn.alloc_page().await?;
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut right);
    // The left child of the right page is `mid_child`,
    // i.e. the right child of the split entry.
    L::set_right_child(&mut right, -1, mid_child);
    // Clone the rest of the page.
    let mut n = 0;
    clone::<K, V, L>(page.as_page(), &mut right, s1, &mut n);
    freed = page.offset | page_is_dirty
    }
    Ok(Put::Split {
    split_key,
    split_value,
    left,
    right,
    freed,
    })
    }
    }
  • file addition: alloc.rs (----------)
    [0.125961]
    use super::*;
    /// This trait contains methods common to leaves and internal
    /// nodes. These methods are meant to be just a few lines long each,
    /// and avoid duplicating code in higher-level functions where just a
    /// few constants change between internal nodes and leaves.
    pub(crate) trait Alloc {
    /// Is there enough room to add one entry into this page (without cloning)?
    fn can_alloc<K: Storable, V: Storable>(hdr: &Header) -> bool;
    /// If we clone the page, will there be enough room for one entry?
    fn can_compact<K: Storable, V: Storable>(hdr: &Header) -> bool;
    /// Remove the leftmost `n` elements from the page. On leaves,
    /// this moves the last truncated element to the end of the page
    /// and returns a pointer to that element (this function is only
    /// used in `crate::btree::put`, and the pointer becomes invalid
    /// at the end of the `crate::btree::put`).
    ///
    /// Returning the last deleted element isn't required for internal
    /// nodes, since the entries are still valid there.
    fn truncate_left<K: Storable, V: Storable>(
    page: &mut MutPage,
    n: usize,
    ) -> Option<(*const K, *const V)>;
    /// Allocate a new entry (size inferred), and return the offset on
    /// the page where the new entry can be copied.
    fn alloc_insert<K: Storable, V: Storable>(new: &mut MutPage, n: &mut isize, r: u64) -> usize;
    /// Set the right child of the `n`th entry to `r`. If `n == -1`,
    /// this method can be used to set the leftmost child of a page.
    fn set_right_child(new: &mut MutPage, n: isize, r: u64);
    /// "Slices" of offsets, which aren't really slices in the case of
    /// leaves, but just intervals or "ranges".
    fn offset_slice<'a, T: LoadPage, K: Storable, V: Storable>(
    page: crate::Page<'a>,
    ) -> Offsets<'a>;
    /// First element of a slice offset. For the sake of code
    /// simplicity, we return a `u64` even for leaves. In internal
    /// nodes, the 52 MSBs encode a child page, and the 12 LSBs encode
    /// an offset in the page.
    fn first<'a, K, V>(off: &Offsets<'a>) -> u64;
    /// The tuple and right child at offset `off`.
    fn kv<'a, K: Storable, V: Storable>(page: crate::Page, off: u64) -> (&'a K, &'a V, u64);
    }
    /// The type of offsets.
    #[derive(Debug, Clone)]
    pub enum Offsets<'a> {
    /// Slices of child pages and offset-in-page for internal nodes,
    Slice(&'a [u64]),
    /// Ranges for leaves.
    Range(core::ops::Range<usize>),
    }
    impl<'a> Offsets<'a> {
    /// Splitting an offset slice, with the same semantics as
    /// `split_at` for slices, except if `mid` is equal to the length,
    /// in which case we return `(self, &[])`.
    pub fn split_at(&self, mid: usize) -> (Self, Self) {
    match self {
    Offsets::Slice(s) if mid < s.len() => {
    let (a, b) = s.split_at(mid);
    (Offsets::Slice(a), Offsets::Slice(b))
    }
    Offsets::Slice(s) => {
    debug_assert_eq!(mid, s.len());
    (Offsets::Slice(s), Offsets::Slice(&[][..]))
    }
    Offsets::Range(r) => (
    Offsets::Range(r.start..r.start + mid),
    Offsets::Range(r.start + mid..r.end),
    ),
    }
    }
    }
    pub struct Leaf {}
    pub struct Internal {}
    impl Alloc for Leaf {
    // Checking if there's enough room is just bookkeeping.
    fn can_alloc<K: Storable, V: Storable>(hdr: &Header) -> bool {
    let f = core::mem::size_of::<Tuple<K, V>>();
    let al = core::mem::align_of::<Tuple<K, V>>();
    let header_size = (HDR + al - 1) & !(al - 1);
    header_size + (hdr.n() as usize + 1) * f <= PAGE_SIZE
    }
    /// There's no possible compaction on leaves, it's the same as allocating.
    fn can_compact<K: Storable, V: Storable>(hdr: &Header) -> bool {
    Self::can_alloc::<K, V>(hdr)
    }
    fn set_right_child(_: &mut MutPage, _: isize, _: u64) {}
    fn offset_slice<'a, T: LoadPage, K: Storable, V: Storable>(
    page: crate::Page<'a>,
    ) -> Offsets<'a> {
    let hdr = header(page);
    Offsets::Range(0..(hdr.n() as usize))
    }
    /// This returns an offset on the page, so we need to compute that.
    fn first<'a, K, V>(off: &Offsets<'a>) -> u64 {
    match off {
    Offsets::Slice(_) => unreachable!(),
    Offsets::Range(r) => {
    let size = core::mem::size_of::<Tuple<K, V>>();
    let al = core::mem::align_of::<Tuple<K, V>>();
    let hdr_size = (HDR + al - 1) & !(al - 1);
    (hdr_size + r.start * size) as u64
    }
    }
    }
    fn kv<'a, K: Storable, V: Storable>(page: crate::Page, off: u64) -> (&'a K, &'a V, u64) {
    unsafe {
    let tup = &*(page.data.as_ptr().add(off as usize) as *const Tuple<K, V>);
    (&tup.k, &tup.v, 0)
    }
    }
    /// Here, we just move `total - *n` elements to the right, and
    /// increase the bookkeeping values (occupied bytes and number of
    /// elements).
    fn alloc_insert<K: Storable, V: Storable>(new: &mut MutPage, n: &mut isize, _: u64) -> usize {
    let f = core::mem::size_of::<Tuple<K, V>>();
    let al = core::mem::align_of::<Tuple<K, V>>();
    let hdr_size = (HDR + al - 1) & !(al - 1);
    let hdr_n = header_mut(new).n();
    unsafe {
    core::ptr::copy(
    new.0.data.add(hdr_size + (*n as usize) * f),
    new.0.data.add(hdr_size + (*n as usize) * f + f),
    (hdr_n as usize - (*n as usize)) * f,
    );
    }
    let hdr = header_mut(new);
    hdr.set_n(hdr.n() + 1);
    hdr.incr(f);
    hdr_size + (*n as usize) * f
    }
    fn truncate_left<K: Storable, V: Storable>(
    page: &mut MutPage,
    n: usize,
    ) -> Option<(*const K, *const V)> {
    let f = core::mem::size_of::<Tuple<K, V>>();
    let al = core::mem::align_of::<Tuple<K, V>>();
    let hdr_size = (HDR + al - 1) & !(al - 1);
    let hdr = header_mut(page);
    let hdr_n = hdr.n();
    hdr.set_n(hdr_n - n as u16);
    hdr.decr(n * f);
    // Now, this is a bit trickier. This performs a rotation by 1
    // without all the rotation machinery from the Rust core
    // library.
    //
    // Indeed, since we're in a leaf, we are effectively erasing
    // the split entry. There is a choice to be made here, between
    // passing the entry by value or by reference. Because splits
    // might cascade with the same middle entry, passing it by
    // value may be significantly worse if the tree is deep, and
    // is never significantly better (we'll end up copying this
    // entry twice anyway).
    unsafe {
    let mut swap: core::mem::MaybeUninit<Tuple<K, V>> = core::mem::MaybeUninit::uninit();
    core::ptr::copy(
    page.0.data.add(hdr_size + (n - 1) * f),
    swap.as_mut_ptr() as *mut u8,
    f,
    );
    core::ptr::copy(
    page.0.data.add(hdr_size + n * f),
    page.0.data.add(hdr_size),
    (hdr_n as usize - n) * f,
    );
    core::ptr::copy(
    swap.as_ptr() as *mut u8,
    page.0.data.add(hdr_size + (hdr_n as usize - n) * f),
    f,
    );
    let tup =
    &*(page.0.data.add(hdr_size + (hdr_n as usize - n) * f) as *const Tuple<K, V>);
    Some((&tup.k, &tup.v))
    }
    }
    }
    impl Alloc for Internal {
    fn can_alloc<K: Storable, V: Storable>(hdr: &Header) -> bool {
    (HDR as usize) + (hdr.n() as usize + 1) * 8 + core::mem::size_of::<Tuple<K, V>>()
    < hdr.data() as usize
    }
    fn can_compact<K: Storable, V: Storable>(hdr: &Header) -> bool {
    (HDR as usize)
    + ((hdr.left_page() & 0xfff) as usize)
    + 8
    + core::mem::size_of::<Tuple<K, V>>()
    <= PAGE_SIZE
    }
    /// Set the right-hand child in the offset array, preserving the
    /// current offset.
    fn set_right_child(page: &mut MutPage, n: isize, r: u64) {
    unsafe {
    debug_assert_eq!(r & 0xfff, 0);
    let ptr = page.0.data.offset(HDR as isize + n * 8) as *mut u64;
    let off = u64::from_le(*ptr) & 0xfff;
    *ptr = (r | off as u64).to_le();
    }
    }
    fn offset_slice<'a, T, K: Storable, V: Storable>(page: crate::Page<'a>) -> Offsets<'a> {
    let hdr = header(page);
    unsafe {
    Offsets::Slice(core::slice::from_raw_parts(
    page.data.as_ptr().add(HDR) as *const u64,
    hdr.n() as usize,
    ))
    }
    }
    fn first<'a, K, V>(off: &Offsets<'a>) -> u64 {
    match off {
    Offsets::Slice(s) => s[0],
    Offsets::Range(_) => unreachable!(),
    }
    }
    fn kv<'a, K: Storable, V: Storable>(page: crate::Page, off: u64) -> (&'a K, &'a V, u64) {
    unsafe {
    let tup = &*(page.data.as_ptr().add((off & 0xfff) as usize) as *const Tuple<K, V>);
    (&tup.k, &tup.v, off & !0xfff)
    }
    }
    // Much simpler than in leaves, because we just need to move the
    // offsets. There's a little bit of bookkeeping involved though.
    fn truncate_left<K: Storable, V: Storable>(
    page: &mut MutPage,
    n: usize,
    ) -> Option<(*const K, *const V)> {
    let hdr_n = header_mut(page).n();
    unsafe {
    // We want to copy the left child of the first entry that
    // will be kept alive as the left child of the page.
    core::ptr::copy(
    page.0.data.add(HDR + (n - 1) * 8),
    page.0.data.add(HDR - 8),
    // We're removing n entries, so we need to copy the
    // remaining `hdr_n - n`, plus the leftmost child
    // (hence the + 1).
    (hdr_n as usize - n + 1) * 8,
    );
    }
    // Remaining occupied size on the page (minus the header).
    let size = (8 + core::mem::size_of::<Tuple<K, V>>()) * (hdr_n as usize - n);
    let hdr = header_mut(page);
    hdr.set_n(hdr_n - n as u16);
    // Because the leftmost child has been copied from an entry
    // containing an offset, its 12 LBSs are still enconding an
    // offset on the page, whereas it should encode the occupied
    // bytes on the page. Reset it.
    hdr.set_occupied(size as u64);
    None
    }
    fn alloc_insert<K: Storable, V: Storable>(new: &mut MutPage, n: &mut isize, r: u64) -> usize {
    let hdr = header_mut(new);
    // Move the `data` field one entry to the left.
    let data = hdr.data() as u16;
    let off_new = data - core::mem::size_of::<Tuple<K, V>>() as u16;
    hdr.set_data(off_new);
    // Increment the number of entries, add the newly occupied size.
    hdr.set_n(hdr.n() + 1);
    hdr.incr(8 + core::mem::size_of::<Tuple<K, V>>());
    let hdr_n = hdr.n();
    unsafe {
    // Make space in the offset array by moving the last
    // `hdr_n - *n` elements.
    core::ptr::copy(
    new.0.data.add(HDR + (*n as usize) * 8),
    new.0.data.add(HDR + (*n as usize) * 8 + 8),
    (hdr_n as usize - (*n as usize)) * 8,
    );
    // Add the offset.
    let ptr = new.0.data.offset(HDR as isize + *n * 8) as *mut u64;
    *ptr = (r | off_new as u64).to_le();
    }
    off_new as usize
    }
    }
    impl<K: Storable, V: Storable> crate::btree::page_unsized::AllocWrite<K, V> for Internal {
    fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {
    alloc_write::<K, V, Self>(new, k0, v0, l, r, n)
    }
    fn set_left_child(new: &mut MutPage, n: isize, l: u64) {
    if l > 0 {
    Internal::set_right_child(new, n - 1, l);
    }
    }
    }
    impl<K: Storable, V: Storable> crate::btree::page_unsized::AllocWrite<K, V> for Leaf {
    fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {
    alloc_write::<K, V, Self>(new, k0, v0, l, r, n)
    }
    fn set_left_child(_new: &mut MutPage, _n: isize, l: u64) {
    debug_assert_eq!(l, 0);
    }
    }
    /// Simply allocate an entry in the page, copy it, and set its right
    /// and left children.
    fn alloc_write<K: Storable, V: Storable, L: Alloc>(
    new: &mut MutPage,
    k0: &K,
    v0: &V,
    l: u64,
    r: u64,
    n: &mut isize,
    ) {
    let off_new = L::alloc_insert::<K, V>(new, n, r);
    unsafe {
    let new_ptr = &mut *(new.0.data.add(off_new as usize) as *mut Tuple<K, V>);
    core::ptr::copy_nonoverlapping(k0, &mut new_ptr.k, 1);
    core::ptr::copy_nonoverlapping(v0, &mut new_ptr.v, 1);
    }
    if l > 0 {
    L::set_right_child(new, *n - 1, l);
    }
    *n += 1;
    }
  • file addition: mod.rs (----------)
    [0.12617]
    //! An implementation of B trees. The core operations on B trees
    //! (lookup, iterate, put and del) are generic in the actual
    //! implementation of nodes, via the [`BTreePage`] and
    //! [`BTreeMutPage`]. This allows for a simpler code for the
    //! high-level functions, as well as specialised, high-performance
    //! implementations for the nodes.
    //!
    //! Two different implementations are supplied: one in [`page`] for
    //! types with a size known at compile-time, which yields denser
    //! leaves, and hence shallower trees (if the values are really using
    //! the space). The other one, in [`page_unsized`], is for
    //! dynamic-sized types, or types whose representation is dynamic, for
    //! example enums that are `Sized` in Rust, but whose cases vary
    //! widely in size.
    use crate::*;
    #[doc(hidden)]
    pub mod cursor;
    pub use cursor::{iter, rev_iter, Cursor, Iter, RevIter};
    pub mod del;
    pub use del::{del, del_no_decr};
    pub mod put;
    pub use put::put;
    pub mod page;
    pub mod page_unsized;
    #[async_trait(?Send)]
    pub trait BTreePage<K: ?Sized, V: ?Sized>: Sized {
    type Cursor: Send + Sync + Clone + Copy + core::fmt::Debug;
    /// Whether this cursor is at the end of the page.
    fn is_empty(c: &Self::Cursor) -> bool;
    /// Whether this cursor is strictly before the first element.
    fn is_init(c: &Self::Cursor) -> bool;
    /// Returns a cursor set before the first element of the page
    /// (i.e. set to -1).
    fn cursor_before(p: &CowPage) -> Self::Cursor;
    /// Returns a cursor set to the first element of the page
    /// (i.e. 0). If the page is empty, the returned cursor might be
    /// empty.
    fn cursor_first(p: &CowPage) -> Self::Cursor {
    let mut c = Self::cursor_before(p);
    Self::move_next(&mut c);
    c
    }
    /// Returns a cursor set after the last element of the page
    /// (i.e. to element n)
    fn cursor_after(p: &CowPage) -> Self::Cursor;
    /// Returns a cursor set to the last element of the page. If the
    /// cursor is empty, this is the same as `cursor_before`.
    fn cursor_last(p: &CowPage) -> Self::Cursor {
    let mut c = Self::cursor_after(p);
    Self::move_prev(&mut c);
    c
    }
    /// Return the element currently pointed to by the cursor (if the
    /// cursor is not before or after the elements of the page), and
    /// move the cursor to the next element.
    fn next<'b>(
    p: Page<'b>,
    c: &mut Self::Cursor,
    ) -> Option<(&'b K, &'b V, u64)> {
    if let Some((k, v, r)) = Self::current(p, c) {
    Self::move_next(c);
    Some((k, v, r))
    } else {
    None
    }
    }
    /// Move the cursor to the previous element, and return that
    /// element. Note that this is not the symmetric of `next`.
    fn prev<'b>(
    p: Page<'b>,
    c: &mut Self::Cursor,
    ) -> Option<(&'b K, &'b V, u64)> {
    if Self::move_prev(c) {
    Self::current(p, c)
    } else {
    None
    }
    }
    /// Move the cursor to the next position. Returns whether the
    /// cursor was actually moved (i.e. `true` if and only if the
    /// cursor isn't already after the last element).
    fn move_next(c: &mut Self::Cursor) -> bool;
    /// Move the cursor to the previous position. Returns whether the
    /// cursor was actually moved (i.e. `true` if and only if the
    /// cursor isn't strictly before the page).
    fn move_prev(c: &mut Self::Cursor) -> bool;
    /// Returns the current element, if the cursor is pointing at one.
    fn current<'a>(
    p: Page<'a>,
    c: &Self::Cursor,
    ) -> Option<(&'a K, &'a V, u64)>;
    /// Returns the left child of the entry pointed to by the cursor.
    fn left_child(p: Page, c: &Self::Cursor) -> u64;
    /// Returns the right child of the entry pointed to by the cursor.
    fn right_child(p: Page, c: &Self::Cursor) -> u64;
    /// Sets the cursor to the last element less than or equal to `k0`
    /// if `v0.is_none()`, and to `(k0, v0)` if `v0.is_some()`. If a
    /// match is found (on `k0` only if `v0.is_none()`, on `(k0, v0)`
    /// else), return the match along with the right child.
    ///
    /// Else (in the `Err` case of the `Result`), return the position
    /// at which `(k0, v0)` can be inserted.
    async fn set_cursor<'a, 'b, T: LoadPage>(
    txn: &'a T,
    page: Page<'b>,
    c: &mut Self::Cursor,
    k0: &K,
    v0: Option<&V>,
    ) -> Result<(&'a K, &'a V, u64), usize>;
    /// Splits the cursor into two cursors: the elements strictly
    /// before the current position, and the elements greater than or
    /// equal the current element.
    fn split_at(c: &Self::Cursor) -> (Self::Cursor, Self::Cursor);
    }
    pub struct PageIterator<'a, T: LoadPage, K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {
    cursor: P::Cursor,
    _txn: &'a T,
    page: Page<'a>,
    }
    impl<'a, T: LoadPage, K: ?Sized + 'a, V: ?Sized + 'a, P: BTreePage<K, V>> Iterator
    for PageIterator<'a, T, K, V, P>
    {
    type Item = (&'a K, &'a V, u64);
    fn next(&mut self) -> Option<Self::Item> {
    P::next(self.page, &mut self.cursor)
    }
    }
    #[async_trait(?Send)]
    pub trait BTreeMutPage<K: ?Sized, V: ?Sized>: BTreePage<K, V> + core::fmt::Debug {
    /// Initialise a page.
    fn init(page: &mut MutPage);
    /// Add an entry to the page, possibly splitting the page in the
    /// process.
    ///
    /// Makes the assumption that `k1v1.is_some()` implies
    /// `replace`. When `k1v1.is_some()`, we insert both `(k0, v0)`
    /// (as a replacement), followed by `(k1, v1)`. This is only ever
    /// used when deleting, and when the right child of a deleted
    /// entry (in an internal node) has split while we were looking
    /// for a replacement for the deleted entry.
    ///
    /// Since we only look for replacements in the right child, the
    /// left child of the insertion isn't modified, in which case `l`
    /// and `r` are interpreted as the left and right child of the new
    /// entry, `k1v1`.
    async fn put<'a, T: AllocPage>(
    txn: &mut T,
    page: CowPage,
    mutable: bool,
    replace: bool,
    c: &Self::Cursor,
    k0: &'a K,
    v0: &'a V,
    k1v1: Option<(&'a K, &'a V)>,
    l: u64,
    r: u64,
    ) -> Result<crate::btree::put::Put<'a, K, V>, T::Error>;
    /// Add an entry to `page`, at position `c`. Does not check
    /// whether there is enough space to do so. This method is mostly
    /// useful for cloning pages.
    #[allow(unused_variables)]
    unsafe fn put_mut(
    page: &mut MutPage,
    c: &mut Self::Cursor,
    k0: &K,
    v0: &V,
    r: u64,
    );
    #[allow(unused_variables)]
    unsafe fn set_left_child(
    page: &mut MutPage,
    c: &Self::Cursor,
    l: u64
    );
    /// Update the left child of the position pointed to by the
    /// cursor.
    async fn update_left_child<T: AllocPage>(
    txn: &mut T,
    page: CowPage,
    mutable: bool,
    c: &Self::Cursor,
    r: u64,
    ) -> Result<crate::btree::put::Ok, T::Error>;
    type Saved;
    /// Save a leaf entry as a replacement, when we delete at an
    /// internal node. This can be a pointer to the leaf if the leaf
    /// isn't mutated, or the actual value, or something else.
    fn save_deleted_leaf_entry(k: &K, v: &V) -> Self::Saved;
    /// Recover the saved value.
    unsafe fn from_saved<'a>(s: &Self::Saved) -> (&'a K, &'a V);
    /// Delete an entry on the page, returning the resuting page along
    /// with the offset of the freed page (or 0 if no page was freed,
    /// i.e. if `page` is mutable).
    async fn del<T: AllocPage>(
    txn: &mut T,
    page: CowPage,
    mutable: bool,
    c: &Self::Cursor,
    l: u64,
    ) -> Result<(MutPage, u64), T::Error>;
    /// Merge, rebalance or update a concatenation.
    async fn merge_or_rebalance<'a, T: AllocPage>(
    txn: &mut T,
    m: del::Concat<'a, K, V, Self>,
    ) -> Result<del::Op<'a, T, K, V>, T::Error>;
    }
    /// A database, which is essentially just a page offset along with markers.
    #[derive(Debug)]
    pub struct Db_<K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {
    pub db: u64,
    pub k: core::marker::PhantomData<K>,
    pub v: core::marker::PhantomData<V>,
    pub p: core::marker::PhantomData<P>,
    }
    unsafe impl<K, V, P: BTreePage<K, V>> Sync for Db_<K, V, P>{}
    /// A database of sized values.
    pub type Db<K, V> = Db_<K, V, page::Page<K, V>>;
    #[async_trait(?Send)]
    impl<K:Storable, V:Storable, P: BTreePage<K, V> + Send + core::fmt::Debug> Storable for Db_<K, V, P> {
    type PageReferences = core::iter::Once<u64>;
    fn page_references(&self) -> Self::PageReferences {
    core::iter::once(self.db)
    }
    async fn drop<T: AllocPage>(&self, t: &mut T) -> Result<(), T::Error> {
    drop_(t, self).await
    }
    }
    /// A database of unsized values.
    pub type UDb<K, V> = Db_<K, V, page_unsized::Page<K, V>>;
    impl<K: ?Sized, V: ?Sized, P: BTreePage<K, V>> Db_<K, V, P> {
    /// Load a database from a page offset.
    pub fn from_page(db: u64) -> Self {
    Db_ {
    db,
    k: core::marker::PhantomData,
    v: core::marker::PhantomData,
    p: core::marker::PhantomData,
    }
    }
    }
    /// Create a database with an arbitrary page implementation.
    pub async fn create_db_<T: AllocPage, K: ?Sized, V: ?Sized, P: BTreeMutPage<K, V>>(
    txn: &mut T,
    ) -> Result<Db_<K, V, P>, T::Error> {
    let mut page = txn.alloc_page().await?;
    P::init(&mut page);
    Ok(Db_ {
    db: page.0.offset,
    k: core::marker::PhantomData,
    v: core::marker::PhantomData,
    p: core::marker::PhantomData,
    })
    }
    /// Create a database for sized keys and values.
    pub async fn create_db<T: AllocPage, K: Storable, V: Storable>(
    txn: &mut T,
    ) -> Result<Db_<K, V, page::Page<K, V>>, T::Error> {
    create_db_(txn).await
    }
    /// Fork a database.
    pub async fn fork_db<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(
    txn: &mut T,
    db: &Db_<K, V, P>,
    ) -> Result<Db_<K, V, P>, T::Error> {
    txn.incr_rc(db.db).await?;
    Ok(Db_ {
    db: db.db,
    k: core::marker::PhantomData,
    v: core::marker::PhantomData,
    p: core::marker::PhantomData,
    })
    }
    /// Get the first entry of a database greater than or equal to `k` (or
    /// to `(k, v)` if `v.is_some()`), and return `true` if this entry is
    /// shared with another structure (i.e. the RC of at least one page
    /// along a path to the entry is at least 2).
    pub async fn get_shared<'a, T: LoadPage, K: 'a + Storable + ?Sized, V: 'a + Storable + ?Sized, P: BTreePage<K, V>>(
    txn: &'a T,
    db: &Db_<K, V, P>,
    k: &K,
    v: Option<&V>,
    ) -> Result<Option<(&'a K, &'a V, bool)>, T::Error> {
    // Set the "cursor stack" by setting a skip list cursor in
    // each page from the root to the appropriate leaf.
    let mut last_match = None;
    let mut page = txn.load_page(db.db).await?;
    let mut is_shared = txn.rc(db.db).await? >= 2;
    // This is a for loop, to allow the compiler to unroll (maybe).
    for _ in 0..cursor::N_CURSORS {
    let mut cursor = P::cursor_before(&page);
    if let Ok((kk, vv, _)) = P::set_cursor(txn, page.as_page(), &mut cursor, k, v).await {
    if v.is_some() {
    return Ok(Some((kk, vv, is_shared)));
    }
    last_match = Some((kk, vv, is_shared));
    } else if let Some((k, v, _)) = P::current(page.as_page(), &cursor) {
    // Here, Rust says that `k` and `v` have the same lifetime
    // as `page`, but we actually know that they're alive for
    // as long as `txn` doesn't change, so we can safely
    // extend their lifetimes:
    unsafe { last_match = Some((core::mem::transmute(k), core::mem::transmute(v), is_shared)) }
    }
    // The cursor is set to the first element greater than or
    // equal to the (k, v) we're looking for, so we need to
    // explore the left subtree.
    let next_page = P::left_child(page.as_page(), &cursor);
    if next_page > 0 {
    page = txn.load_page(next_page).await?;
    is_shared |= txn.rc(db.db).await? >= 2;
    } else {
    break;
    }
    }
    Ok(last_match)
    }
    /// Get the first entry of a database greater than or equal to `k` (or
    /// to `(k, v)` if `v.is_some()`), and return `true` if this entry is
    /// shared with another structure (i.e. the RC of at least one page
    /// along a path to the entry is at least 2).
    pub async fn get<'a, T: LoadPage, K: 'a + Storable + ?Sized, V: 'a + Storable + ?Sized, P: BTreePage<K, V>>(
    txn: &'a T,
    db: &Db_<K, V, P>,
    k: &K,
    v: Option<&V>,
    ) -> Result<Option<(&'a K, &'a V)>, T::Error> {
    // Unfortunately, since the above function `get_shared` uses this
    // one in order to get the RC of pages, we can't really refactor,
    // so this is mostly a copy of the other function.
    // Set the "cursor stack" by setting a skip list cursor in
    // each page from the root to the appropriate leaf.
    let mut last_match = None;
    let mut page = txn.load_page(db.db).await?;
    // This is a for loop, to allow the compiler to unroll (maybe).
    for _ in 0..cursor::N_CURSORS {
    let mut cursor = P::cursor_before(&page);
    if let Ok((kk, vv, _)) = P::set_cursor(txn, page.as_page(), &mut cursor, k, v).await {
    if v.is_some() {
    return Ok(Some((kk, vv)));
    }
    last_match = Some((kk, vv));
    } else if let Some((k, v, _)) = P::current(page.as_page(), &cursor) {
    // Here, Rust says that `k` and `v` have the same lifetime
    // as `page`, but we actually know that they're alive for
    // as long as `txn` doesn't change, so we can safely
    // extend their lifetimes:
    unsafe { last_match = Some((core::mem::transmute(k), core::mem::transmute(v))) }
    }
    // The cursor is set to the first element greater than or
    // equal to the (k, v) we're looking for, so we need to
    // explore the left subtree.
    let next_page = P::left_child(page.as_page(), &cursor);
    if next_page > 0 {
    page = txn.load_page(next_page).await?;
    } else {
    break;
    }
    }
    Ok(last_match)
    }
    /// Drop a database recursively, dropping all referenced keys and
    /// values that aren't shared with other databases.
    pub async fn drop<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreePage<K, V>>(
    txn: &mut T,
    db: Db_<K, V, P>,
    ) -> Result<(), T::Error> {
    drop_(txn, &db).await
    }
    use core::mem::MaybeUninit;
    struct PageCursor_<K:?Sized, V:?Sized, P: BTreePage<K, V>>(MaybeUninit<cursor::PageCursor<K, V, P>>);
    unsafe impl<K: ?Sized, V:?Sized, P: BTreePage<K, V>> Send for PageCursor_<K, V, P> {}
    async fn drop_<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreePage<K, V>>(
    txn: &mut T,
    db: &Db_<K, V, P>,
    ) -> Result<(), T::Error> {
    let mut stack: [PageCursor_<K, V, P>; cursor::N_CURSORS] = [
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    PageCursor_(MaybeUninit::uninit()),
    ];
    let mut ptr = 0;
    // Push the root page of `db` onto the stack.
    let page = txn.load_page(db.db).await?;
    stack[0] = PageCursor_(MaybeUninit::new(cursor::PageCursor {
    cursor: P::cursor_before(&page),
    page,
    }));
    // Then perform a DFS:
    loop {
    // Look at the top element of the stack.
    let cur_off = unsafe { (&*stack[ptr].0.as_ptr()).page.offset };
    // If it needs to be dropped (i.e. if its RC is <= 1), iterate
    // its cursor and drop all its referenced pages.
    let rc = txn.rc(cur_off).await?;
    if rc <= 1 {
    // if there's a current element in the cursor (i.e. we
    // aren't before or after the elements), decrease its RC.
    if let Some((k, v, _)) = {
    let cur = unsafe { &mut *stack[ptr].0.as_mut_ptr() };
    P::current(cur.page.as_page(), &cur.cursor)
    } {
    k.drop(txn).await?;
    v.drop(txn).await?;
    }
    // In all cases, move next and push the left child onto
    // the stack. This works because pushed cursors are
    // initially set to before the page's elements.
    let r = {
    let cur = unsafe { &mut *stack[ptr].0.as_mut_ptr() };
    if P::move_next(&mut cur.cursor) {
    Some(P::left_child(cur.page.as_page(), &cur.cursor))
    } else {
    None
    }
    };
    if let Some(r) = r {
    if r > 0 {
    ptr += 1;
    let page = txn.load_page(r).await?;
    stack[ptr] = PageCursor_(MaybeUninit::new(cursor::PageCursor {
    cursor: P::cursor_before(&page),
    page,
    }))
    }
    continue;
    }
    }
    // Here, either rc > 1 (in which case the only thing we need
    // to do in this iteration is to decrement the RC), or else
    // `P::move_next` returned `false`, meaning that the cursor is
    // after the last element (in which case we are done with this
    // page, and also need to decrement its RC, in order to free
    // it).
    let (is_dirty, offset) = {
    let cur = unsafe { &mut *stack[ptr].0.as_mut_ptr() };
    (cur.page.is_dirty(), cur.page.offset)
    };
    if is_dirty {
    txn.decr_rc_owned(offset).await?;
    } else {
    txn.decr_rc(offset).await?;
    }
    // If this was the bottom element of the stack, stop, else, pop.
    if ptr == 0 {
    break;
    } else {
    ptr -= 1;
    }
    }
    Ok(())
    }
  • file addition: del.rs (----------)
    [0.12617]
    //! Deletions from a B tree.
    use super::cursor::*;
    use super::put::{Ok, Put};
    use super::*;
    use crate::LoadPage;
    /// Represents the result of a merge or rebalance, without touching
    /// the parent of the merge/rebalance.
    #[derive(Debug)]
    pub enum Op<'a, T, K: ?Sized, V: ?Sized> {
    Merged {
    // New merged page.
    page: MutPage,
    // Pages freed by the merge (0 meaning "no page").
    freed: [u64; 2],
    marker: core::marker::PhantomData<T>,
    },
    Rebalanced {
    // New middle key.
    k: &'a K,
    // New middle value.
    v: &'a V,
    // New left page.
    l: u64,
    // New right page.
    r: u64,
    // Pages freed by the rebalance (0 meaning "no page").
    freed: [u64; 2],
    },
    Put(crate::btree::put::Put<'a, K, V>),
    }
    /// Represents a page with modifications from a merge.
    #[derive(Debug)]
    pub struct ModifiedPage<'a, K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {
    pub page: CowPage,
    // Whether the page is mutable with another tree.
    pub mutable: bool,
    // Start copying c0 (keep `page`'s left child).
    pub c0: P::Cursor,
    // If > 0, replace the right child of c0's last element with `l`.
    pub l: u64,
    // If > 0, replace the left child of c1's last element with `r`.
    pub r: u64,
    // Possibly insert a new binding.
    pub ins: Option<(&'a K, &'a V)>,
    // If a rebalance causes a split, we might need to insert an entry
    // after the replacement.
    pub ins2: Option<(&'a K, &'a V)>,
    // The first element of c1 is to be deleted, the others must be copied.
    pub c1: P::Cursor,
    // Whether to skip `c1`'s first element.
    pub skip_first: bool,
    // Whether the page we just modified is `self.l`.
    pub mod_is_left: bool,
    }
    /// Represents the concatenation of a modified page and one of its
    /// sibling (left or right).
    #[derive(Debug)]
    pub struct Concat<'a, K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {
    /// Modified page.
    pub modified: ModifiedPage<'a, K, V, P>,
    /// Middle element.
    pub mid: (&'a K, &'a V),
    /// Sibling of the modified page.
    pub other: CowPage,
    /// Is the modified field on the left or on the right of the
    /// concatenation?
    pub mod_is_left: bool,
    /// Is the other page owned by this tree? If not, it means `other`
    /// is shared with another tree, and hence we need to increase the
    /// reference count of entries coming from `other`.
    pub other_is_mutable: bool,
    }
    /// If `value` is `None`, delete the first entry for `key` from the
    /// database, if present. Else, delete the entry for `(key, value)`,
    /// if present.
    pub async fn del<
    T: AllocPage + LoadPage,
    K: Storable + ?Sized + PartialEq + Sync,
    V: Storable + ?Sized + PartialEq + Sync,
    P: BTreeMutPage<K, V>,
    >(
    txn: &mut T,
    db: &mut Db_<K, V, P>,
    key: &K,
    value: Option<&V>,
    ) -> Result<bool, T::Error> {
    let mut cursor = Cursor::new(txn, db).await?;
    // If the key and value weren't found, return.
    match (cursor.set(txn, key, value).await?, value) {
    (Some((k, v)), Some(value)) if k == key && v == value => {}
    (Some((k, _)), None) if k == key => {}
    _ => return Ok(false),
    }
    // Else, the cursor is set on `(key, value)` if `value.is_some()`
    // or on the first entry with key `key` else.
    //
    // We delete that position, which might be in a leaf or in an
    // internal node.
    del_at_cursor(txn, db, &mut cursor, true).await
    }
    /// If `value` is `None`, delete the first entry for `key` from the
    /// database, if present. Else, delete the entry for `(key, value)`,
    /// if present.
    pub async fn del_no_decr<
    T: AllocPage + LoadPage,
    K: Storable + ?Sized + PartialEq + Sync,
    V: Storable + ?Sized + PartialEq + Sync,
    P: BTreeMutPage<K, V>,
    >(
    txn: &mut T,
    db: &mut Db_<K, V, P>,
    key: &K,
    value: Option<&V>,
    ) -> Result<bool, T::Error> {
    let mut cursor = Cursor::new(txn, db).await?;
    // If the key and value weren't found, return.
    match (cursor.set(txn, key, value).await?, value) {
    (Some((k, v)), Some(value)) if k == key && v == value => {}
    (Some((k, _)), None) if k == key => {}
    _ => return Ok(false),
    }
    // Else, the cursor is set on `(key, value)` if `value.is_some()`
    // or on the first entry with key `key` else.
    //
    // We delete that position, which might be in a leaf or in an
    // internal node.
    del_at_cursor(txn, db, &mut cursor, false).await
    }
    /// Delete the entry pointed to by `cursor`. The cursor can't be used
    /// after this.
    #[doc(hidden)]
    pub async fn del_at_cursor<
    T: AllocPage + LoadPage,
    K: Storable + ?Sized + PartialEq + Sync,
    V: Storable + ?Sized + Sync,
    P: BTreeMutPage<K, V>,
    >(
    txn: &mut T,
    db: &mut Db_<K, V, P>,
    cursor: &mut Cursor<K, V, P>,
    decr_del_rc: bool,
    ) -> Result<bool, T::Error> {
    let p0 = cursor.len();
    // If p0 < cursor.first_rc_level, the page containing the deleted
    // element is not shared with another table. Therefore, the RC of
    // the pages referenced by the deleted element needs to be
    // decreased.
    //
    // Else, that RC doesn't need to be changed, since it's still
    // referenced by the same pages as before, just not by the pages
    // that are cloned by this deletion.
    //
    // Note in particular that if p0 == cursor.first_rc_len(), then we
    // don't need to touch the RC, since we're cloning this page, but
    // without including a reference to the deleted entry.
    if decr_del_rc && p0 < cursor.first_rc_len() {
    let (delk, delv, _) = {
    let cur = cursor.cur();
    P::current(cur.page.as_page(), &cur.cursor).unwrap()
    };
    delk.drop(txn).await?;
    delv.drop(txn).await?;
    }
    // Find the leftmost page in the right subtree of the deleted
    // element, in order to find a replacement.
    let cur = cursor.cur();
    let right_subtree = P::right_child(cur.page.as_page(), &cur.cursor);
    cursor.set_first_leaf(txn, right_subtree).await?;
    let leaf_is_shared = cursor.len() >= cursor.first_rc_len();
    // In the leaf, mark the replacement for deletion, and keep a
    // reference to it in k0v0 if the deletion happens in an internal
    // node (k0v0 is `Option::None` else).
    let (mut last_op, k0v0) = leaf_delete(txn, cursor, p0).await?;
    // If the leaf is shared with another tree, then since we're
    // cloning the leaf, update the reference counts of all the
    // references contained in the leaf.
    if leaf_is_shared {
    modify_rc(txn, &last_op).await?;
    }
    let mut free = [[0, 0]; N_CURSORS];
    // Then, climb up the stack, performing the operations lazily. At
    // each step, we are one level above the page that we plan to
    // modify, since `last_op` is the result of popping the
    // stack.
    //
    // We iterate up to the root. The last iteration builds a modified
    // page for the root, but doesn't actually execute it.
    while cursor.len() > 0 {
    // Prepare a plan for merging the current modified page (which
    // is stored in `last_op`) with one of its neighbours.
    let concat = concat(txn, cursor, p0, &k0v0, last_op).await?;
    let mil = concat.mod_is_left;
    let incr_page = if !concat.other_is_mutable {
    Some(CowPage {
    offset: concat.other.offset,
    data: concat.other.data,
    })
    } else {
    None
    };
    let incr_mid = if cursor.len() >= cursor.first_rc_len() {
    Some(concat.mid)
    } else {
    None
    };
    // Execute the modification plan, resulting in one of four
    // different outcomes (described in the big match in
    // `handle_merge`). This mutates or clones the current
    // modified page, returning an `Op` describing what happened
    // (between update, merge, rebalance and split).
    let merge = P::merge_or_rebalance(txn, concat).await?;
    // Prepare a description (`last_op`) of the next page
    // modification, without mutating nor cloning that page.
    last_op = handle_merge(txn, cursor, p0, &k0v0, incr_page, incr_mid, mil, merge, &mut free).await?;
    // If the modified page is shared with another tree, we will
    // clone it in the next round. Therefore, update the reference
    // counts of all the references in the modified page.
    //
    // Since `handle_merge` pops the stack, the modified page is
    // at level `cursor.len() + 1`.
    if cursor.len() + 1 >= cursor.first_rc_len() {
    modify_rc(txn, &last_op).await?;
    }
    }
    // The last modification plan was on the root, and still needs to
    // be executed.
    let root_is_shared = cursor.first_rc_len() == 1;
    update_root(txn, db, last_op, k0v0, root_is_shared, &mut free).await?;
    // Finally, free all the pages marked as free during the deletion,
    // now that we don't need to read them anymore.
    for p in free.iter() {
    if p[0] & 1 == 1 {
    txn.decr_rc_owned(p[0] ^ 1).await?;
    } else if p[0] > 0 {
    txn.decr_rc(p[0]).await?;
    }
    if p[1] & 1 == 1 {
    txn.decr_rc_owned(p[1] ^ 1).await?;
    } else if p[1] > 0 {
    txn.decr_rc(p[1]).await?;
    }
    }
    Ok(true)
    }
    /// Preparing a modified page for the leaf.
    async fn leaf_delete<
    'a,
    T: AllocPage + LoadPage,
    K: Storable + ?Sized + 'a,
    V: Storable + ?Sized + 'a,
    P: BTreeMutPage<K, V> + 'a,
    >(
    txn: &mut T,
    cursor: &mut Cursor<K, V, P>,
    p0: usize,
    ) -> Result<(ModifiedPage<'a, K, V, P>, Option<P::Saved>), T::Error> {
    let is_rc = cursor.len() >= cursor.first_rc_len();
    let del_at_internal = p0 < cursor.len();
    let curs0 = cursor.pop().unwrap();
    let (c0, c1) = P::split_at(&curs0.cursor);
    let mut deleted = None;
    if del_at_internal {
    // In this case, we need to save the replacement, and if this
    // leaf is shared with another tree, we also need increment
    // the replacement's references, since we are copying it.
    let (k, v, _) = P::current(curs0.page.as_page(), &c1).unwrap();
    if is_rc {
    for o in k.page_references().chain(v.page_references()) {
    txn.incr_rc(o).await?;
    }
    }
    deleted = Some(P::save_deleted_leaf_entry(k, v))
    }
    Ok((
    ModifiedPage {
    page: curs0.page,
    mutable: !is_rc,
    c0,
    c1,
    skip_first: true,
    l: 0,
    r: 0,
    ins: None,
    ins2: None,
    // The following (`mod_is_left`) is meaningless in this
    // context, and isn't actually used: indeed, the only
    // place where this field is used is when modifying the
    // root, when `ins.is_some()`.
    mod_is_left: true,
    },
    deleted,
    ))
    }
    /// From a cursor at level `p = cursor.len()`, form the
    /// concatenation of `last_op` (which is a modified page at level p +
    /// 1`, and its left or right sibling, depending on the case.
    async fn concat<
    'a,
    T: AllocPage + LoadPage,
    K: Storable + ?Sized,
    V: Storable + ?Sized,
    P: BTreeMutPage<K, V>,
    >(
    txn: &mut T,
    cursor: &mut Cursor<K, V, P>,
    p0: usize,
    k0v0: &Option<P::Saved>,
    last_op: ModifiedPage<'a, K, V, P>,
    ) -> Result<Concat<'a, K, V, P>, T::Error> {
    let p = cursor.len();
    let rc_level = cursor.first_rc_len();
    let curs = cursor.current_mut();
    if p == p0 {
    // Here, p == p0, meaning that we're deleting at an internal
    // node. If that's the case, k0v0 is `Some`, hence we can
    // safely unwrap the replacement.
    let (k0, v0) = unsafe { P::from_saved(k0v0.as_ref().unwrap()) };
    // Since we've picked the leftmost entry of the right child of
    // the deleted entry, the other page to consider in the
    // concatenation is the left child of the deleted entry.
    let other = txn.load_page(P::left_child(curs.page.as_page(), &curs.cursor)).await?;
    // Then, if the page at level `p` or one of its ancestor, is
    // pointed at least twice (i.e. if `p >= rc_level`, or
    // alternatively if `other` is itself pointed at least twice,
    // the page is immutable, meaning that we can't delete on it
    // when rebalancing.
    let other_is_mutable = (p < rc_level) && txn.rc(other.offset).await? < 2;
    Ok(Concat {
    modified: last_op,
    mid: (k0, v0),
    other,
    mod_is_left: false,
    other_is_mutable,
    })
    } else {
    // Here, `p` is not a leaf (but `last_op` might be), and does
    // not contain the deleted entry.
    // In this case, the visited page at level `p+1` is always on
    // the left-hand side of the cursor at level `p` (this is an
    // invariant of cursors). However, if the cursor at level `p`
    // is past the end of the page, we need to move one step back
    // to find a middle element for the concatenation, in which
    // case the visited page becomes the right-hand side of the
    // cursor.
    let ((k, v, r), mod_is_left) =
    if let Some(x) = P::current(curs.page.as_page(), &curs.cursor) {
    // Not the last element of the page.
    (x, true)
    } else {
    // Last element of the page.
    let (k, v, _) = P::prev(curs.page.as_page(), &mut curs.cursor).unwrap();
    let l = P::left_child(curs.page.as_page(), &curs.cursor);
    ((k, v, l), false)
    };
    let other = txn.load_page(r).await?;
    let other_is_mutable = (p < rc_level) && txn.rc(other.offset).await? < 2;
    Ok(Concat {
    modified: last_op,
    // The middle element comes from the page above this
    // concatenation, and hence has the same lifetime as that
    // page. However, our choice of lifetimes can't express
    // this, but we also know that we are not going to mutate
    // the parent before rebalancing or merging the
    // concatenation, and hence this pointer will be alive
    // during these operations (i.e. during the merge or
    // rebalance).
    mid: unsafe { (core::mem::transmute(k), core::mem::transmute(v)) },
    other,
    mod_is_left,
    other_is_mutable,
    })
    }
    }
    /// Prepare a modified version of the page at the current level `p =
    /// cursor.len()`.
    async fn handle_merge<
    'a,
    T: AllocPage + LoadPage,
    K: Storable + ?Sized + PartialEq,
    V: Storable + ?Sized,
    P: BTreeMutPage<K, V>,
    >(
    txn: &mut T,
    cursor: &mut Cursor<K, V, P>,
    p0: usize,
    k0v0: &Option<P::Saved>,
    incr_other: Option<CowPage>,
    incr_mid: Option<(&K, &V)>,
    mod_is_left: bool, // The modified page in the `merge` is the left one.
    merge: Op<'a, T, K, V>,
    free: &mut [[u64; 2]; N_CURSORS],
    ) -> Result<ModifiedPage<'a, K, V, P>, T::Error> {
    let mutable = cursor.len() < cursor.first_rc_len();
    let mut last_op = {
    // Beware, a stack pop happens here, all subsequent references
    // to the pointer must be updated.
    let curs = cursor.pop().unwrap();
    let (c0, c1) = P::split_at(&curs.cursor);
    ModifiedPage {
    page: curs.page,
    mutable,
    c0,
    l: 0,
    r: 0,
    ins: None,
    ins2: None,
    c1,
    skip_first: true,
    mod_is_left,
    }
    };
    // For merges and rebalances, take care of the reference counts of
    // pages and key/values.
    match merge {
    Op::Merged {.. } | Op::Rebalanced { .. } => {
    // Increase the RC of the "other page's" descendants. In
    // the case of a rebalance, this has the effect of
    // increasing the RC of the new middle entry if that entry
    // comes from a shared page, which is what we want.
    if let Some(other) = incr_other {
    let mut curs = P::cursor_first(&other);
    let left = P::left_child(other.as_page(), &curs);
    if left > 0 {
    txn.incr_rc(left).await?;
    }
    while let Some((k0, v0, r)) = P::next(other.as_page(), &mut curs) {
    for o in k0.page_references().chain(v0.page_references()) {
    txn.incr_rc(o).await?;
    }
    if r != 0 {
    txn.incr_rc(r).await?;
    }
    }
    }
    // If the middle element comes from a shared page,
    // increment its references.
    if let Some((ref k, ref v)) = incr_mid {
    for o in k.page_references().chain(v.page_references()) {
    txn.incr_rc(o).await?;
    }
    }
    }
    _ => {}
    }
    // Here, we match on `merge`, the operation that just happened at
    // level `cursor.len() + 1`, and build the modification plan
    // for the page at the current level (i.e. at level
    // `cursor.len()`).
    let freed = match merge {
    Op::Merged {
    page,
    freed,
    marker: _,
    } => {
    // If we're deleting at an internal node, the
    // replacement has already been included in the merged
    // page.
    last_op.l = page.0.offset;
    freed
    }
    Op::Rebalanced {
    k,
    v,
    l,
    r,
    freed,
    } => {
    // If we're deleting at an internal node, the
    // replacement is already in pages `l` or `r`.
    last_op.l = l;
    last_op.r = r;
    last_op.ins = Some((k, v));
    freed
    }
    Op::Put(Put::Ok(Ok { page, freed })) => {
    // No merge, split or rebalance has happened. If we're
    // deleting at an internal node (i.e. if cursor.len ==
    // p0), we need to mark the replacement here.
    if cursor.len() + 1 == p0 {
    // We have already incremented the references of the
    // replacement at the leaf.
    if let Some(k0v0) = k0v0 {
    last_op.ins = Some(unsafe { P::from_saved(k0v0) });
    }
    last_op.r = page.0.offset;
    } else {
    last_op.skip_first = false;
    if mod_is_left {
    last_op.l = page.0.offset;
    } else {
    last_op.r = page.0.offset;
    }
    }
    [freed, 0]
    }
    Op::Put(Put::Split {
    left,
    right,
    split_key,
    split_value,
    freed,
    }) => {
    // This case only happens if `(K, V)` is not `Sized`, when
    // either (1) a rebalance replaces a key/value pair with a
    // larger one or (2) another split has happened in a page
    // below.
    let split_key_is_k0 = if cursor.len() == p0 {
    // In this case, ins2 comes after ins, since the
    // replacement is in the right child of the deleted
    // key.
    if let Some(k0v0) = k0v0 {
    last_op.ins = Some(unsafe { P::from_saved(k0v0) });
    }
    last_op.ins2 = Some((split_key, split_value));
    false
    } else {
    last_op.ins = Some((split_key, split_value));
    last_op.skip_first = false;
    // If the page modified in the last step is the one on
    // the right of the current entry, move right one step
    // before inserting the split key/value.
    //
    // (remember that we popped the stack upon entering
    // this function).
    //
    // We need to do this because the split key/value is
    // inserted immediately *before* the current cursor
    // position, which is incorrect if the split key/value
    // comes from the right-hand side of the current
    // cursor position.
    //
    // This can happen in exactly two situations:
    // - when the element we are deleting is the one we are
    // skipping here.
    // - when we are deleting in the rightmost child of a
    // page.
    if cursor.len() > 0 && !mod_is_left {
    P::move_next(&mut last_op.c1);
    }
    // If the split key is the replacement element, we have
    // already increased its counter when we deleted it from
    // its original position at the bottom of the tree.
    //
    // This can happen if we replaced an element and that
    // replacement caused a split with the replacement as the
    // middle element.
    if let Some(k0v0) = k0v0 {
    assert!(cursor.len() + 1 < p0);
    let (k0, _) = unsafe { P::from_saved(k0v0) };
    core::ptr::eq(k0, split_key)
    } else {
    false
    }
    };
    // The `l` and `r` fields are relative to `ins2` if
    // `ins2.is_some()` or to `ins` else.
    last_op.l = left.0.offset;
    last_op.r = right.0.offset;
    if cursor.len() + 2 >= cursor.first_rc_len() && !split_key_is_k0 {
    for o in split_key
    .page_references()
    .chain(split_value.page_references())
    {
    txn.incr_rc(o).await?;
    }
    }
    [freed, 0]
    }
    };
    // Free the page(s) at level `cursor.len() + 1` if it isn't
    // shared with another tree, or if it is the first level shared
    // with another tree.
    if cursor.len() + 1 < cursor.first_rc_len() {
    free[cursor.len() + 1] = freed;
    }
    Ok(last_op)
    }
    // This function modifies the reference counts of references of the
    // modified page, which is the page *about to be* modified.
    //
    // This function is always called when `m` is an internal node.
    async fn modify_rc<
    'a,
    T: AllocPage + LoadPage,
    K: Storable + ?Sized,
    V: Storable + ?Sized,
    P: BTreePage<K, V>,
    >(
    txn: &mut T,
    m: &ModifiedPage<'a, K, V, P>,
    ) -> Result<(), T::Error> {
    let mut c0 = m.c0.clone();
    let mut c1 = m.c1.clone();
    let mut left = P::left_child(m.page.as_page(), &c0);
    while let Some((k, v, r)) = P::next(m.page.as_page(), &mut c0) {
    for o in k.page_references().chain(v.page_references()) {
    txn.incr_rc(o).await?;
    }
    if left != 0 {
    txn.incr_rc(left).await?;
    }
    left = r;
    }
    // The insertions are taken care of in `handle_merge` above,
    // so we can directly move to the `c1` part of the
    // modification.
    if m.skip_first {
    P::move_next(&mut c1);
    }
    // If we are not updating the left child of c1's first
    // element, increment that left child.
    if m.l == 0 {
    if left != 0 {
    txn.incr_rc(left).await?;
    }
    }
    // If we are not updating the right child of the first
    // element of `c1`, increment that right child's RC.
    if let Some((k, v, r)) = P::next(m.page.as_page(), &mut c1) {
    for o in k.page_references().chain(v.page_references()) {
    txn.incr_rc(o).await?;
    }
    // If `m.skip_first`, we have already skipped the entry above,
    // so this `r` has nothing to do with any update.
    //
    // Else, if we aren't skipping, but also aren't updating the
    // right child of the current entry, also increase the RC.
    if (m.skip_first || m.r == 0) && r != 0 {
    txn.incr_rc(r).await?;
    }
    }
    // Finally, increment the RCs of all other elements in `c1`.
    while let Some((k, v, r)) = P::next(m.page.as_page(), &mut c1) {
    for o in k.page_references().chain(v.page_references()) {
    txn.incr_rc(o).await?;
    }
    if r != 0 {
    txn.incr_rc(r).await?;
    }
    }
    Ok(())
    }
    async fn update_root<
    'a,
    T: AllocPage + LoadPage,
    K: Storable + ?Sized,
    V: Storable + ?Sized,
    P: BTreeMutPage<K, V>,
    >(
    txn: &mut T,
    db: &mut Db_<K, V, P>,
    last_op: ModifiedPage<'a, K, V, P>,
    k0v0: Option<P::Saved>,
    is_rc: bool,
    free: &mut [[u64; 2]; N_CURSORS],
    ) -> Result<(), T::Error> {
    if let Some(d) = single_child(&last_op) {
    // If the root had only one element, and the last operation
    // was a merge, this decreases the depth of the tree.
    //
    // We don't do this if the table is empty (i.e. if `d == 0`),
    // in order to be consistent with put and drop.
    if d > 0 {
    if last_op.page.is_dirty() {
    txn.decr_rc_owned(last_op.page.offset).await?;
    } else {
    txn.decr_rc(last_op.page.offset).await?;
    }
    db.db = d;
    } else {
    // The page becomes empty.
    let (page, freed) = P::del(txn, last_op.page, last_op.mutable, &last_op.c1, d).await?;
    free[0][0] = freed;
    db.db = page.0.offset
    }
    return Ok(());
    }
    // Else, the last operation `last_op` was relative to the root,
    // but it hasn't been applied yet. We apply it now:
    match modify::<_, K, V, P>(txn, last_op).await? {
    Put::Ok(Ok { page, freed }) => {
    // Here, the root was simply updated (i.e. some elements
    // might have been deleted/inserted/updated), so we just
    // need to update the Db.
    free[0][0] = freed;
    db.db = page.0.offset
    }
    Put::Split {
    split_key: k,
    split_value: v,
    left: MutPage(CowPage { offset: l, .. }),
    right: MutPage(CowPage { offset: r, .. }),
    freed,
    } => {
    // Else, the root has split, and we need to allocate a new
    // page to hold the split key/value and the two sides of
    // the split.
    free[0][0] = freed;
    let mut page = txn.alloc_page().await?;
    P::init(&mut page);
    let mut c = P::cursor_before(&page.0);
    P::move_next(&mut c);
    let page = if let Put::Ok(p) = P::put(txn, page.0, true, false, &c, k, v, None, l, r).await? {
    p.page
    } else {
    unreachable!()
    };
    let split_key_is_k0 = if let Some(ref k0v0) = k0v0 {
    let (k0, _) = unsafe { P::from_saved(k0v0) };
    core::ptr::eq(k0, k)
    } else {
    false
    };
    // If the root isn't shared with another tree, and the
    // split key is different from the replacement, increment
    // the RCs of the references contained in the split
    // key/value.
    //
    // The replacement need not be incremented again, since it
    // was already increment when we took it from the page in
    // `leaf_delete`.
    if is_rc && !split_key_is_k0 {
    for o in k.page_references().chain(v.page_references()) {
    txn.incr_rc(o).await?;
    }
    }
    // Finally, update the database.
    db.db = page.0.offset
    }
    }
    Ok(())
    }
    fn single_child<K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(
    m: &ModifiedPage<K, V, P>,
    ) -> Option<u64> {
    let mut c1 = m.c1.clone();
    if m.skip_first {
    P::move_next(&mut c1);
    }
    if P::is_empty(&m.c0) && m.ins.is_none() && P::is_empty(&c1) {
    Some(m.l)
    } else {
    None
    }
    }
    /// Apply the modifications to a page. This is exclusively used for
    /// the root page, because other modifications are applied during a
    /// merge/rebalance attempts.
    async fn modify<'a, T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(
    txn: &mut T,
    m: ModifiedPage<'a, K, V, P>,
    ) -> Result<super::put::Put<'a, K, V>, T::Error> {
    if let Some((k, v)) = m.ins {
    // The following call might actually replace the first element
    // of `m.c1` if `m.skip_first`.
    let mut c1 = m.c1.clone();
    if !m.skip_first && !m.mod_is_left {
    // This means that the page below just split, since we
    // have to insert an extra entry on the root page.
    //
    // However, the extra entry is to be inserted (by
    // `P::put`) *before* `c1`'s first element, which is
    // incorrect since the page that split is the right child
    // of `c1`'s first element. Therefore, we need to move
    // `c1` one notch to the right.
    assert!(m.ins2.is_none());
    P::move_next(&mut c1);
    }
    P::put(
    txn,
    m.page,
    m.mutable,
    m.skip_first,
    &c1,
    k,
    v,
    m.ins2,
    m.l,
    m.r,
    ).await
    } else {
    // If there's no insertion to do, we might still need to
    // delete elements, or update children.
    if m.skip_first {
    let (page, freed) = P::del(txn, m.page, m.mutable, &m.c1, m.l).await?;
    Ok(Put::Ok(Ok { freed, page }))
    } else if m.l > 0 {
    // If the left page of the current entry has changed,
    // update it.
    Ok(Put::Ok(P::update_left_child(
    txn, m.page, m.mutable, &m.c1, m.l,
    ).await?))
    } else {
    // If there's no insertion, and `m.l == 0`, we need to
    // update the right child of the current entry. The
    // following moves one step to the right and updates the
    // left child:
    let mut c1 = m.c1.clone();
    P::move_next(&mut c1);
    Ok(Put::Ok(P::update_left_child(
    txn, m.page, m.mutable, &c1, m.r,
    ).await?))
    }
    }
    }
  • file addition: cursor.rs (----------)
    [0.12617]
    use super::*;
    use crate::{CowPage, LoadPage};
    use core::mem::MaybeUninit;
    #[derive(Debug)]
    pub(super) struct PageCursor<K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {
    pub page: CowPage,
    pub cursor: P::Cursor,
    }
    unsafe impl<K, V, P: BTreePage<K, V>> Send for PageCursor<K, V, P>{}
    unsafe impl<K, V, P: BTreePage<K, V>> Sync for PageCursor<K, V, P>{}
    // This is 1 + the maximal depth of a tree.
    //
    // Since pages are of size 2^12, there are at most 2^52 addressable
    // pages (potentially less depending on the platform). Since each page
    // of a B tree below the root has at least 2 elements (because each
    // page is at least half-full, and elements are at most 1/4th of a
    // page), the arity is at least 3, except for the root. Since 3^33 is
    // the smallest power of 3 larger than 2^52, the maximum depth is 33.
    pub(crate) const N_CURSORS: usize = 33;
    #[derive(Debug)]
    /// A position in a B tree.
    pub struct Cursor<K: ?Sized, V: ?Sized, P: BTreePage<K, V>> {
    /// Invariant: the first `len` items are initialised.
    stack: [core::mem::MaybeUninit<PageCursor<K, V, P>>; N_CURSORS],
    /// The length of the stack at the position of the first page
    /// shared with another tree. This definition is a little bit
    /// convoluted, but allows for efficient comparisons between
    /// `first_rc_len` and `len` to check whether a page is shared
    /// with another tree.
    first_rc_len: usize,
    /// Number of initialised items on the stack.
    len: usize,
    }
    unsafe impl<K, V, P: BTreePage<K, V>> Send for Cursor<K, V, P>{}
    unsafe impl<K, V, P: BTreePage<K, V>> Sync for Cursor<K, V, P>{}
    impl<'a, K: 'a + ?Sized + Storable, V: 'a + ?Sized + Storable, P: BTreePage<K, V>> Cursor<K, V, P> {
    /// Create a new cursor, initialised to the first entry of the B tree.
    pub async fn new<T: LoadPage>(txn: &T, db: &Db_<K, V, P>) -> Result<Self, T::Error> {
    // Looking forward to getting array initialisation stabilised :)
    let mut stack = [
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    core::mem::MaybeUninit::uninit(),
    ];
    let page = txn.load_page(db.db).await?;
    stack[0] = MaybeUninit::new(PageCursor {
    cursor: P::cursor_before(&page),
    page,
    });
    Ok(Cursor {
    stack,
    first_rc_len: N_CURSORS,
    len: 1,
    })
    }
    pub(super) fn push(&mut self, p: PageCursor<K, V, P>) {
    self.stack[self.len] = MaybeUninit::new(p);
    self.len += 1;
    }
    pub(super) fn cur(&self) -> &PageCursor<K, V, P> {
    assert!(self.len > 0);
    unsafe { &*self.stack[self.len - 1].as_ptr() }
    }
    pub(super) fn len(&self) -> usize {
    self.len
    }
    pub(super) fn first_rc_len(&self) -> usize {
    self.first_rc_len
    }
    pub(super) fn pop(&mut self) -> Option<PageCursor<K, V, P>> {
    if self.len > 0 {
    self.len -= 1;
    let result = core::mem::replace(&mut self.stack[self.len], MaybeUninit::uninit());
    Some(unsafe { result.assume_init() })
    } else {
    None
    }
    }
    pub(super) fn current_mut(&mut self) -> &mut PageCursor<K, V, P> {
    assert!(self.len > 0);
    unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() }
    }
    /// Push the leftmost path starting at page `left_page` onto the
    /// stack.
    pub(super) async fn set_first_leaf<T: LoadPage>(
    &mut self,
    txn: &T,
    mut left_page: u64,
    ) -> Result<(), T::Error> {
    while left_page > 0 {
    if self.first_rc_len >= N_CURSORS && txn.rc(left_page).await? >= 2 {
    self.first_rc_len = self.len + 1
    }
    let page = txn.load_page(left_page).await?;
    let curs = P::cursor_first(&page);
    left_page = P::left_child(page.as_page(), &curs);
    self.push(PageCursor { page, cursor: curs });
    }
    Ok(())
    }
    /// Reset the cursor to the first element of the database.
    pub fn reset<T: LoadPage>(&mut self) {
    self.len = 1;
    let init = unsafe { &mut *self.stack[0].as_mut_ptr() };
    init.cursor = P::cursor_before(&init.page);
    }
    // An invariant of cursors, fundamental to understanding the
    // `next` and `prev` functions below, is that the lower levels (in
    // the tree, upper levels on the stack) are the left children of
    // the cursor's position on a page.
    /// Set the cursor to the first position larger than or equal to
    /// the specified key and value.
    pub async fn set<T: LoadPage>(
    &mut self,
    txn: &'a T,
    k: &K,
    v: Option<&V>,
    ) -> Result<Option<(&'a K, &'a V)>, T::Error> {
    // Set the "cursor stack" by setting a cursor in each page
    // on a path from the root to the appropriate leaf.
    // Start from the bottom of the stack, which is also the root
    // of the tree.
    self.len = 1;
    self.first_rc_len = N_CURSORS;
    let init = unsafe { &mut *self.stack[0].as_mut_ptr() };
    init.cursor = P::cursor_before(&init.page);
    let mut last_matching_page = N_CURSORS;
    let mut last_match = None;
    while self.len < N_CURSORS {
    let current = unsafe { &mut *self.stack.get_unchecked_mut(self.len - 1).as_mut_ptr() };
    if self.first_rc_len >= N_CURSORS && txn.rc(current.page.offset).await? >= 2 {
    self.first_rc_len = self.len
    }
    let ref mut cursor = current.cursor;
    if let Ok((kk, vv, _)) = P::set_cursor(txn, current.page.as_page(), cursor, k, v).await {
    if v.is_some() {
    return Ok(Some((kk, vv)));
    }
    last_match = Some((kk, vv));
    last_matching_page = self.len
    }
    let next_page = P::left_child(current.page.as_page(), cursor);
    if next_page > 0 {
    let page = txn.load_page(next_page).await?;
    self.push(PageCursor {
    cursor: P::cursor_before(&page),
    page,
    });
    } else {
    break;
    }
    }
    if last_matching_page < N_CURSORS {
    self.len = last_matching_page;
    Ok(last_match)
    } else {
    Ok(None)
    }
    }
    /// Set the cursor after the last element, so that [`Self::next`]
    /// returns `None`, and [`Self::prev`] returns the last element.
    pub async fn set_last< T: LoadPage>(
    &mut self,
    txn: &'a T,
    ) -> Result<(), T::Error> {
    self.len = 1;
    self.first_rc_len = N_CURSORS;
    let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };
    current.cursor = P::cursor_after(&current.page);
    loop {
    let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };
    if self.first_rc_len >= N_CURSORS && txn.rc(current.page.offset).await? >= 2 {
    self.first_rc_len = self.len
    }
    let l = P::left_child(current.page.as_page(), &current.cursor);
    if l > 0 {
    let page = txn.load_page(l).await?;
    self.push(PageCursor {
    cursor: P::cursor_after(&page),
    page,
    })
    } else {
    break;
    }
    }
    Ok(())
    }
    /// Return the current position of the cursor.
    pub fn current<T: LoadPage>(
    &mut self,
    _txn: &'a T,
    ) -> Result<Option<(&'a K, &'a V)>, T::Error> {
    loop {
    let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };
    if P::is_init(&current.cursor) {
    // The cursor hasn't been set.
    return Ok(None)
    } else if let Some((k, v, _)) = P::current(current.page.as_page(), &current.cursor)
    {
    unsafe {
    return Ok(Some((core::mem::transmute(k), core::mem::transmute(v))));
    }
    } else if self.len > 1 {
    self.len -= 1
    } else {
    // We're past the last element at the root.
    return Ok(None);
    }
    }
    }
    /// Return the current position of the cursor, and move the cursor
    /// to the next position.
    pub async fn next< T: LoadPage>(
    &mut self,
    txn: &'a T,
    ) -> Result<Option<(&'a K, &'a V)>, T::Error> {
    loop {
    let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };
    if P::is_empty(&current.cursor) {
    if self.len > 1 {
    if self.first_rc_len == self.len {
    self.first_rc_len = N_CURSORS
    }
    self.len -= 1
    } else {
    return Ok(None);
    }
    } else {
    let (cur_entry, r) = if let Some((k, v, r)) = P::current(current.page.as_page(), &current.cursor) {
    (Some((k, v)), r)
    } else {
    (None, P::right_child(current.page.as_page(), &current.cursor))
    };
    P::move_next(&mut current.cursor);
    if r != 0 {
    let page = txn.load_page(r).await?;
    self.push(PageCursor {
    cursor: P::cursor_before(&page),
    page,
    });
    if self.first_rc_len >= N_CURSORS && txn.rc(r).await? >= 2 {
    self.first_rc_len = self.len
    }
    }
    if let Some((k, v)) = cur_entry {
    unsafe {
    return Ok(Some((core::mem::transmute(k), core::mem::transmute(v))));
    }
    }
    }
    }
    }
    /// Move the cursor to the previous entry, and return the entry
    /// that was current before the move. If the cursor is initially
    /// after all the entries, this moves it back by two steps.
    pub async fn prev< T: LoadPage>(
    &mut self,
    txn: &'a T,
    ) -> Result<Option<(&'a K, &'a V)>, T::Error> {
    loop {
    let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };
    if P::is_init(&current.cursor) {
    if self.len > 1 {
    if self.first_rc_len == self.len {
    self.first_rc_len = N_CURSORS
    }
    self.len -= 1;
    let current = unsafe { &mut *self.stack[self.len - 1].as_mut_ptr() };
    P::move_prev(&mut current.cursor);
    } else {
    return Ok(None);
    }
    } else {
    let cur_entry = P::current(current.page.as_page(), &current.cursor);
    let left = P::left_child(current.page.as_page(), &current.cursor);
    if left != 0 {
    let page = txn.load_page(left).await?;
    self.push(PageCursor {
    cursor: P::cursor_after(&page),
    page,
    });
    if self.first_rc_len >= N_CURSORS && txn.rc(left).await? >= 2 {
    self.first_rc_len = self.len
    }
    } else {
    // We are at a leaf.
    P::move_prev(&mut current.cursor);
    }
    if let Some((k, v, _)) = cur_entry {
    unsafe {
    return Ok(Some((core::mem::transmute(k), core::mem::transmute(v))));
    }
    }
    }
    }
    }
    }
    pub struct Iter<'a, T: LoadPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreePage<K, V>> {
    txn: &'a T,
    cursor: Cursor<K, V, P>,
    }
    pub async fn iter<'a, T, K, V, P>(
    txn: &'a T,
    db: &super::Db_<K, V, P>,
    origin: Option<(&K, Option<&V>)>,
    ) -> Result<Iter<'a, T, K, V, P>, T::Error>
    where
    T: LoadPage,
    K: Storable + ?Sized,
    V: Storable + ?Sized,
    P: BTreePage<K, V>,
    {
    let mut cursor = Cursor::new(txn, db).await?;
    if let Some((k, v)) = origin {
    cursor.set(txn, k, v).await?;
    }
    Ok(Iter { cursor, txn })
    }
    impl<
    'a,
    T: LoadPage,
    K: Storable + ?Sized + 'a,
    V: Storable + ?Sized + 'a,
    P: BTreePage<K, V> + 'a,
    > Iter<'a, T, K, V, P>
    {
    pub async fn next(&mut self) -> Result<Option<(&'a K, &'a V)>, T::Error>{
    self.cursor.prev(self.txn).await
    }
    }
    pub struct RevIter<'a, T: LoadPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreePage<K, V>>
    {
    txn: &'a T,
    cursor: Cursor<K, V, P>,
    }
    pub async fn rev_iter<'a, T, K, V, P>(
    txn: &'a T,
    db: &super::Db_<K, V, P>,
    origin: Option<(&K, Option<&V>)>,
    ) -> Result<RevIter<'a, T, K, V, P>, T::Error>
    where
    T: LoadPage,
    K: Storable + ?Sized,
    V: Storable + ?Sized,
    P: BTreePage<K, V>,
    {
    let mut cursor = Cursor::new(txn, db).await?;
    if let Some((k, v)) = origin {
    cursor.set(txn, k, v).await?;
    } else {
    cursor.set_last(txn).await?;
    }
    Ok(RevIter { cursor, txn })
    }
    impl<
    'a,
    T: LoadPage,
    K: Storable + ?Sized + 'a,
    V: Storable + ?Sized + 'a,
    P: BTreePage<K, V> + 'a,
    > RevIter<'a, T, K, V, P>
    {
    pub async fn next(&mut self) -> Result<Option<(&'a K, &'a V)>, T::Error>{
    self.cursor.prev(self.txn).await
    }
    }
  • file addition: Cargo.toml (----------)
    [0.32]
    [package]
    name = "sanakirja-core-async"
    version = "0.0.1"
    edition = "2018"
    description = "Copy-on-write datastructures, storable on disk (or elsewhere) with a stable format."
    authors = ["Pierre-Étienne Meunier", "Florent Becker"]
    license = "MIT/Apache-2.0"
    documentation = "https://docs.rs/sanakirja-core-async"
    repository = "https://nest.pijul.com/pijul/sanakirja"
    include = [
    "Cargo.toml",
    "src/lib.rs",
    "src",
    "src/btree",
    "src/btree/page_cursor.rs",
    "src/btree/page_unsized",
    "src/btree/page_unsized/alloc.rs",
    "src/btree/page_unsized/rebalance.rs",
    "src/btree/page_unsized/header.rs",
    "src/btree/page_unsized/put.rs",
    "src/btree/page_unsized/cursor.rs",
    "src/btree/put.rs",
    "src/btree/page",
    "src/btree/page/alloc.rs",
    "src/btree/page/rebalance.rs",
    "src/btree/page/put.rs",
    "src/btree/del.rs",
    "src/btree/mod.rs",
    "src/btree/page_unsized.rs",
    "src/btree/cursor.rs",
    "src/btree/page.rs"
    ]
    [features]
    crc32 = [ "crc32fast" ]
    std = []
    ed25519 = [ "ed25519-zebra", "ed25519-zebra/serde" ]
    [dependencies]
    crc32fast = { version = "1.2", optional = true, default-features = false }
    uuid = { version = "0.8", optional = true }
    ed25519-zebra = { version = "2.2", optional = true }
    async-trait = "0.1"
  • replacement in Cargo.toml at line 2
    [3.108848][3.108848:108891]()
    members = [ "sanakirja-core", "sanakirja" ]
    [3.108848]
    members = [ "sanakirja-core", "sanakirja", "sanakirja-core-async" ]