52X5P7NDBQHIJDIYNY3XUPDHHOO3PDPPNKGO2PGLXKVNM3EVECTQC
TSMS6W4DOKQNUQ4PEMTLOIODR33VFPN6MMNS73ZPSU4BOQVRGPNAC
WS4ZQM4RMIHZ6XZKSDQJGHN5SSSWFL4H236USOPUA33S6RC53RFAC
OP6SVMOD2GTQ7VNJ4E5KYFG4MIYA7HBMXJTADALMZH4PY7OQRMZQC
OTWDDJE7TTE73D6BGF4ZN6BH2NFUFLPME2VJ3CPALH463UGWLEIQC
QYDGYIZRNFRIQD7RUCY5YAN3F2THZA74E5UOHPIFWSULEJFAFVJQC
H3FVSQIQGFCFKCPXVOSFHP4OSUOBBURJESCZNGQTNDAAD3WQSBEQC
6UVFCERMGSGNRWCVC3GWO5HWV6MSWE433DXBJVC7KRPP6LLJLCSQC
RV2L6CZWTMUQ2A52YDAFVHDFGURZL3H4SSCDC347UGN23D3J5KZQC
LSQ6V7M66TEGLJ7QBLRVDX4E7UKJTDQTEXZOS3KGPGFKVXNLPKBQC
XEU2QVLCHPYOOD4TQIPEEVYOVSFMKFPLJYWEJYXYJAZ7S54KWDZAC
APPY2E7M5NHNC6MFYXSVEKJVAILK7YAZVTVE3W75EK2JNFVS3XBQC
OFINGD26ZWCRDVVDI2ZIBLMHXKEMJA6MRNLANJYUHQPIJLPA7J2AC
HN6Z5DU4WYMAIOOSNVHLIIMNF6Q53TNJ7YC27SLKWNXVYCTACQKQC
ESUI5EUZUBDPHNN3APU33IFORYPYR6J3WEMEZG57FKF3EH66ZBHAC
Q7DRIBBRE4MNG4NP3PVIXAJF5PQYLFWYIVK2O4VVLEO6XY3BOSFQC
MSRWB47YP6L5BVTS53QQPBOHY5SXTSTR5KD6IIF35UWCTEUOCQWQC
7P43FPFAXMDYUIQV7U2DGPN4UI3RNHYFWEUEA22UBDSQTYBJRAQQC
73Z2UB3JGRLFNFORE7D64O4IHIFSZASD4G4FLJ4FJLHANT75MGIAC
KX3WVNZW5KHVEH6EOQTZ4RBEFFJ3SGF5I467X3JWZ74PURRK4HVAC
LROAI3NBBSCU4T2YA6EHJYKKKL75AU5A7C7WIRCGIQ56S6HPLRXQC
AOX2XQISHGWNNAFBYRN44Q6AWG7H5DPBK5YMFHK42HQNZ2TMHEJQC
T73WR2BX2QDQ6APOREBXUKNH52FDLJNBGWPQUYB2TAF2PT7XCL2AC
SQ7MD7OWKFYNQR525YNOG7APHDNMKB7PPVWLIYFJJQPL3MWNFXLQC
W26CFMAQOXMUK4ZOJMAN4SMBXMWFQHO7HCTEVW73FQSRMJZFGJIQC
JEHCE5FNOINH47N5MZ2I7JP7DKGN24PW5TVDRVALRPZHTIQCMMDQC
QEUTVAZ4F4EJXRDMWDMYXF6XEDMX7YPVG4IIXEKPIR3K54E5W5OAC
X3QVVQIS7B7L3XYZAWL3OOBUXOJ6RMOKQ45YMLLGAHYPEEKZ45ZAC
DV4A2LR7Q5LAEGAQHLO34PZCHGJUHPAMRZFGT7GUFNKVQKPJNOYQC
EAAYH6BQWDK52EC5RG3BEZQU3FJPN5RRRN4U5KDKDVPKXBVJMNDAC
6DCQHIFPEH4GZKSRRS32GMKDRPZH4MTCGOUEI7YEUVKWENBF3JWAC
UAQX27N4PI4LHEW6LSHJETIE5MV7JTEMPLTJFYUBMYVPC43H7VOAC
}
fn alloc_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(k: &K, v: &V) -> usize {
let s = ((k.size() + V::ALIGN - 1) & !(V::ALIGN - 1)) + v.size();
let al = K::ALIGN.max(V::ALIGN);
(s + al - 1) & !(al - 1)
//! Implementation of B tree pages for Unsized types, or types with a
//! dynamically-sized representation (for example enums with widely
//! different sizes).
//!
//! This module follows the same organisation as the sized
//! implementation, and contains types shared between the two
//! implementations.
//!
//! The types that can be used with this implementation must implement
//! the [`UnsizedStorable`] trait, which essentially replaces the
//! [`core::mem`] functions for determining the size and alignment of
//! values.
//!
//! One key difference is the implementation of leaves (internal nodes
//! have the same format): indeed, in this implementation, leaves have
//! almost the same format as internal nodes, except that their
//! offsets are written on the page as little-endian-encoded [`u16`]
//! (with only the 12 LSBs used, i.e. 4 bits unused).
}
// Many of these functions are the same as in the Sized implementations.
impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> super::BTreePage<K, V>
for Page<K, V>
{
/// Returns `true` when the cursor position is at or beyond `total`,
/// i.e. there is no entry left to read at or after the cursor.
fn is_empty(c: &Self::Cursor) -> bool {
    let end = c.total as isize;
    c.cur >= end
}
/// Returns `true` when the cursor position is negative, which is the
/// state produced by `cursor_before` (position `-1`).
fn is_init(c: &Self::Cursor) -> bool {
    c.cur.is_negative()
}
type Cursor = PageCursor;
/// Build a cursor on page `p` positioned just before the first entry
/// (position `-1`).
fn cursor_before(p: &crate::CowPage) -> Self::Cursor {
    let before_first: isize = -1;
    PageCursor::new(p, before_first)
}
/// Build a cursor on page `p` positioned after the last entry, by
/// delegating to `PageCursor::after`.
fn cursor_after(p: &crate::CowPage) -> Self::Cursor {
PageCursor::after(p)
}
// Split the cursor `c` into a pair `(a, b)`: `b` is a copy of `c`,
// and `a` starts at the first entry of the page and stops right
// before the position of `c` (which is excluded).
fn split_at(c: &Self::Cursor) -> (Self::Cursor, Self::Cursor) {
    // A negative position has nothing before it, hence a length of 0.
    let before_len = if c.cur < 0 { 0 } else { c.cur as usize };
    let before = PageCursor {
        cur: 0,
        total: before_len,
        is_leaf: c.is_leaf,
    };
    (before, *c)
}
/// Advance the cursor by one position if it is still below `total`.
/// Returns whether the cursor was moved.
fn move_next(c: &mut Self::Cursor) -> bool {
    let can_advance = c.cur < c.total as isize;
    if can_advance {
        c.cur += 1;
    }
    can_advance
}
/// Move the cursor one position back. If the cursor is already at
/// position 0 (or before), it is set to `-1` and `false` is
/// returned; otherwise it is decremented and `true` is returned.
fn move_prev(c: &mut Self::Cursor) -> bool {
    let can_retreat = c.cur > 0;
    c.cur = if can_retreat { c.cur - 1 } else { -1 };
    can_retreat
}
// Same logic as the sized implementation for internal nodes: leaves
// and internal nodes only differ by the width of their offsets (2
// bytes in leaves, 8 bytes in internal nodes).
//
// Returns the key, the value and the right child at the cursor
// position (the child is always 0 in a leaf), or `None` if the
// cursor is outside of `0..total`.
fn current<'a, T: LoadPage>(
    txn: &T,
    page: crate::Page<'a>,
    c: &Self::Cursor,
) -> Option<(&'a K, &'a V, u64)> {
    if c.cur < 0 || c.cur >= c.total as isize {
        return None;
    }
    let idx = c.cur as usize;
    let base = page.data.as_ptr();
    unsafe {
        // Decode the offset slot into (offset of the entry on the
        // page, right child page).
        let (entry_off, child) = if c.is_leaf {
            let raw = u16::from_le(*(base.add(HDR + idx * 2) as *const u16));
            (raw as usize, 0)
        } else {
            // The 12 LSBs are the offset on the page, the other bits
            // are the child page.
            let raw = u64::from_le(*(base.add(HDR + idx * 8) as *const u64));
            ((raw & 0xfff) as usize, raw & !0xfff)
        };
        let (k, v) = read::<T, K, V>(txn, base.add(entry_off));
        Some((
            K::from_raw_ptr(txn, k as *const u8),
            V::from_raw_ptr(txn, v as *const u8),
            child,
        ))
    }
}
// Same as in the sized implementation.
//
// Left child of the entry at the cursor position: 0 in a leaf, else
// the page stored in the offset slot just before the cursor (with
// the 12 LSBs masked out). Panics if the cursor position is
// negative.
fn left_child(page: crate::Page, c: &Self::Cursor) -> u64 {
    assert!(c.cur >= 0);
    if c.is_leaf {
        return 0;
    }
    assert!(c.cur >= 0 && HDR as isize + c.cur * 8 - 8 <= 4088);
    // The slot to the left of position `cur` starts 8 bytes earlier.
    let slot = (HDR + c.cur as usize * 8) - 8;
    let raw = unsafe { *(page.data.as_ptr().add(slot) as *const u64) };
    u64::from_le(raw) & !0xfff
}
/// Right child of the entry at the cursor position: 0 in a leaf, else
/// the page stored in the offset slot at the cursor position (with
/// the 12 LSBs masked out).
///
/// The cursor may be at position `-1` (as produced by
/// `cursor_before`), in which case the slot read is the 8 bytes just
/// before the offset array, i.e. the leftmost child of the page.
///
/// Panics if the cursor is at or after `total`, or if the slot would
/// not fit on the page.
fn right_child(page: crate::Page, c: &Self::Cursor) -> u64 {
    assert!(c.cur < c.total as isize);
    if c.is_leaf {
        0
    } else {
        // The slot is 8 bytes wide and starts at `HDR + cur * 8`, so
        // it must start at byte 4088 or earlier.
        assert!(c.cur < c.total as isize && HDR as isize + c.cur * 8 <= 4088);
        // Use a signed pointer offset: `c.cur as usize * 8` would
        // overflow when `c.cur == -1`.
        let off = unsafe { *(page.data.as_ptr().add(HDR) as *const u64).offset(c.cur) };
        u64::from_le(off) & !0xfff
    }
}
/// Position the cursor `c` on `(k0, v0)` in `page` (or on `k0` only
/// if `v0` is `None`).
///
/// Returns `Ok((k, v, r))` with the entry found and its right child
/// (0 in a leaf), or `Err(n)` where `n` is the position at which the
/// entry could be inserted. In both cases `c.cur` is set to that
/// position.
fn set_cursor<'a, T: LoadPage>(
    txn: &'a T,
    page: crate::Page,
    c: &mut PageCursor,
    k0: &K,
    v0: Option<&V>,
) -> Result<(&'a K, &'a V, u64), usize> {
    unsafe {
        // `lookup` follows the convention of
        // `core::slice::binary_search`: `Ok(n)` if the entry was
        // found at position `n`, `Err(n)` if it wasn't, where `n` is
        // the insertion position preserving the order.
        let n = match lookup(txn, page, c, k0, v0) {
            Ok(n) => n,
            Err(n) => {
                c.cur = n as isize;
                return Err(n);
            }
        };
        c.cur = n as isize;
        let base = page.data.as_ptr();
        // Decode the offset slot into (offset of the entry on the
        // page, right child page).
        let (entry_off, child) = if c.is_leaf {
            let raw = u16::from_le(*(base.add(HDR + n * 2) as *const u16));
            (raw as usize, 0)
        } else {
            let raw = u64::from_le(*(base.add(HDR + n * 8) as *const u64));
            ((raw & 0xfff) as usize, raw & !0xfff)
        };
        let (k, v) = read::<T, K, V>(txn, base.add(entry_off));
        Ok((K::from_raw_ptr(txn, k), V::from_raw_ptr(txn, v), child))
    }
}
}
// There is quite some duplicated code in the following function,
// because we're forming a slice of offsets, and then using the core
// library's `binary_search_by` method on slices.
unsafe fn lookup<T: LoadPage, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
txn: &T,
page: crate::Page,
c: &mut PageCursor,
k0: &K,
v0: Option<&V>,
) -> Result<usize, usize> {
let hdr = header(page);
c.total = hdr.n() as usize;
c.is_leaf = hdr.is_leaf();
if c.is_leaf {
let s = core::slice::from_raw_parts(
page.data.as_ptr().add(HDR) as *const u16,
hdr.n() as usize,
);
if let Some(v0) = v0 {
s.binary_search_by(|&off| {
let off = u16::from_le(off);
let (k, v) = read::<T, K, V>(txn, page.data.as_ptr().offset(off as isize));
let k = K::from_raw_ptr(txn, k as *const u8);
match k.compare(txn, k0) {
Ordering::Equal => {
let v = V::from_raw_ptr(txn, v as *const u8);
v.compare(txn, v0)
}
cmp => cmp,
}
})
} else {
s.binary_search_by(|&off| {
let off = u16::from_le(off);
let (k, _) = read::<T, K, V>(txn, page.data.as_ptr().offset(off as isize));
let k = K::from_raw_ptr(txn, k);
k.compare(txn, k0)
})
}
} else {
let s = core::slice::from_raw_parts(
page.data.as_ptr().add(HDR) as *const u64,
hdr.n() as usize,
);
if let Some(v0) = v0 {
s.binary_search_by(|&off| {
let off = u64::from_le(off) & 0xfff;
let (k, v) = read::<T, K, V>(txn, page.data.as_ptr().offset(off as isize & 0xfff));
let k = K::from_raw_ptr(txn, k);
match k.compare(txn, k0) {
Ordering::Equal => {
let v = V::from_raw_ptr(txn, v);
v.compare(txn, v0)
}
cmp => cmp,
}
})
} else {
s.binary_search_by(|&off| {
let off = u64::from_le(off) & 0xfff;
let (k, _) = read::<T, K, V>(txn, page.data.as_ptr().offset(off as isize & 0xfff));
let k = K::from_raw_ptr(txn, k);
k.compare(txn, k0)
})
}
}
// Deletions in internal nodes of a B tree need to replace the
// deleted value with a value from a leaf.
//
// In this implementation of pages, we never actually wipe any
// data from pages, we're even only appending data, or cloning the
// pages to compact them. Therefore, raw pointers to leaves are
// always valid for as long as the page isn't freed, which can
// only happen at the very last step of an insertion or deletion.
let s = Internal::offset_slice::<K, V>(page.as_page());
let mut n = 0;
clone::<K, V, Internal>(page.as_page(), &mut new, s, &mut n);
let b = if page.is_dirty() { 1 } else { 0 };
freed = page.offset | b;
// Copy all the entries
let s = Internal::offset_slice(page.as_page());
clone::<K, V, Internal>(page.as_page(), &mut new, s, &mut 0);
// Mark the old version for freeing.
freed = page.offset | if page.is_dirty() { 1 } else { 0 };
let kv_ptr = p.add((*ptr) as usize);
let size = entry_size::<K, V>(kv_ptr);
core::ptr::copy(ptr.offset(1), ptr, n - c.cur as usize - 1);
hdr.decr(size);
} else {
let ptr = p.add(HDR + c.cur as usize * 8) as *mut u64;
let off = (u64::from_le(*ptr) & 0xfff) as usize;
let kv_ptr = p.add(off);
let size = entry_size::<K, V>(kv_ptr);
core::ptr::copy(ptr.offset(1), ptr, hdr.n() as usize - c.cur as usize - 1);
(&mut *hdr).decr(size);
// Get the offset in the page of the key/value data.
let off = u16::from_le(*ptr);
assert_eq!(off & 0xf000, 0);
// Erase the offset by shifting the last (`n -
// c.cur - 1`) offsets. The reasoning against
// "off-by-one errors" is that if the cursor is
// positioned on the first element (`c.cur == 0`),
// there are `n-1` elements to shift.
core::ptr::copy(ptr.offset(1), ptr, n as usize - c.cur as usize - 1);
// Remove the size of the actual key/value, plus
// 2 bytes (the offset).
hdr.decr(2 + entry_size::<K, V>(p.add(off as usize)));
let off = (p.add(HDR) as *mut u64).offset(c.cur as isize - 1);
*off = (l | (u64::from_le(*off) & 0xfff)).to_le();
let ptr = p.add(HDR + c.cur as usize * 8) as *mut u64;
// Offset to the key/value data.
let off = u64::from_le(*ptr);
// Shift the offsets like in the leaf case above.
core::ptr::copy(ptr.offset(1), ptr, n as usize - c.cur as usize - 1);
if l > 0 {
// In an internal node, we may have to update
// the left child of the current
// position. After the move, the current
// offset is at `ptr`, so we need to find the
// offset one step to the left.
let p = ptr.offset(-1);
*p = (l | (u64::from_le(*p) & 0xfff)).to_le();
}
// Remove the size of the key/value, plus 8 bytes
// (the offset).
hdr.decr(8 + entry_size::<K, V>(p.add(off as usize)));
unsafe {
let mut new = txn.alloc_page()?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
if c.is_leaf {
let s = Leaf::offset_slice::<K, V>(page.as_page());
let (s0, s1) = s.split_at(c.cur as usize);
let (_, s1) = s1.split_at(1);
let mut n = 0;
clone::<K, V, Leaf>(page.as_page(), &mut new, s0, &mut n);
clone::<K, V, Leaf>(page.as_page(), &mut new, s1, &mut n);
} else {
let s = Internal::offset_slice::<K, V>(page.as_page());
let (s0, s1) = s.split_at(c.cur as usize);
let (_, s1) = s1.split_at(1);
let mut n = 0;
clone::<K, V, Internal>(page.as_page(), &mut new, s0, &mut n);
if l > 0 {
// If the page cannot be mutated, we allocate a new page and clone.
let mut new = txn.alloc_page()?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
if c.is_leaf {
// In a leaf, this is just a matter of getting the
// offset slice, removing the current position and
// cloning.
let s = Leaf::offset_slice(page.as_page());
let (s0, s1) = s.split_at(c.cur as usize);
let (_, s1) = s1.split_at(1);
let mut n = 0;
clone::<K, V, Leaf>(page.as_page(), &mut new, s0, &mut n);
clone::<K, V, Leaf>(page.as_page(), &mut new, s1, &mut n);
} else {
// In an internal node, things are a bit trickier,
// since we might need to update the left child.
//
// First, clone the leftmost child of the page.
let hdr = header(page.as_page());
let left = hdr.left_page() & !0xfff;
unsafe {
*(new.0.data.add(HDR) as *mut u64).offset(-1) = left.to_le();
}
// Then, clone the first half of the page.
let s = Internal::offset_slice(page.as_page());
let (s0, s1) = s.split_at(c.cur as usize);
let (_, s1) = s1.split_at(1);
let mut n = 0;
clone::<K, V, Internal>(page.as_page(), &mut new, s0, &mut n);
// Update the left child, which was written by the
// call to `clone` as the right child of the last
// entry in `s0`.
if l > 0 {
unsafe {
let size = hdr_size + mod_size + mid_size + occupied;
debug!(
"size = {:?} {:?} {:?} {:?} {:?}",
mod_size, mid_size, occupied, hdr_size, size
);
if size <= PAGE_SIZE {
// merge
let mut new = txn.alloc_page()?;
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
let freed = {
let b0 = if m.modified.page.is_dirty() { 1 } else { 0 };
let b1 = if m.other.is_dirty() { 1 } else { 0 };
[m.modified.page.offset | b0, m.other.offset | b1]
};
if m.modified.c0.is_leaf {
merge::<_, _, _, Leaf>(txn, &mut new, m)
if mod_size + mid_size + occupied <= PAGE_SIZE {
// If the concatenation fits on a page, merge.
return if m.modified.c0.is_leaf {
merge::<_, _, _, _, Leaf>(txn, m)
merge::<_, _, _, Internal>(txn, &mut new, m)
}
return Ok(Op::Merged {
page: new,
freed,
marker: core::marker::PhantomData,
});
merge::<_, _, _, _, Internal>(txn, m)
};
let rc = PageCursor::new(&m.other, 0);
let first_size = <Page<K, V>>::current_size(m.other.as_page(), &rc);
// If we can't rebalance, modify and return.
if mod_size >= PAGE_SIZE / 2 || occupied - first_size < PAGE_SIZE / 2 {
// If we can't merge, evaluate the size of the first binding
// on the other page, to see if we can rebalance.
let first_other = PageCursor::new(&m.other, 0);
let first_other_size = current_size::<K, V>(m.other.as_page(), &first_other);
// If the modified page is at least half-full, or if removing
// the first entry on the other page would make that other
// page less than half-full, don't rebalance. See the Sized
// implementation to see cases where this happens.
if mod_size >= PAGE_SIZE / 2 || HDR + occupied - first_other_size < PAGE_SIZE / 2 {
impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> super::BTreePage<K, V>
for Page<K, V>
{
/// Returns `true` when no entry is left at or after the cursor, i.e.
/// when `total` is not greater than the cursor position.
fn is_empty(c: &Self::Cursor) -> bool {
    c.total as isize <= c.cur
}
/// Returns `true` when the cursor is before the first entry, i.e.
/// when its position is `-1` or less.
fn is_init(c: &Self::Cursor) -> bool {
    c.cur <= -1
}
type Cursor = PageCursor;
/// Cursor on page `p`, placed at position `-1`, i.e. before the
/// first entry.
fn cursor_before(p: &crate::CowPage) -> Self::Cursor {
    let start: isize = -1;
    PageCursor::new(p, start)
}
/// Cursor on page `p`, placed after the last entry; this simply
/// forwards to `PageCursor::after`.
fn cursor_after(p: &crate::CowPage) -> Self::Cursor {
PageCursor::after(p)
}
/// Entry at the cursor position, as `(key, value, right child)`. The
/// right child is always 0 in a leaf. Returns `None` if the cursor
/// is outside of `0..total`.
fn current<'a, T: LoadPage>(
    txn: &T,
    page: crate::Page<'a>,
    c: &Self::Cursor,
) -> Option<(&'a K, &'a V, u64)> {
    let in_range = c.cur >= 0 && c.cur < c.total as isize;
    if !in_range {
        return None;
    }
    unsafe {
        let base = page.data.as_ptr();
        let (entry, right) = if c.is_leaf {
            // Leaves store 2-byte little-endian offsets.
            let slot = base.add(HDR + c.cur as usize * 2) as *const u16;
            (u16::from_le(*slot) as usize, 0)
        } else {
            // Internal nodes store 8-byte little-endian words: the
            // offset on the page in the 12 LSBs, the child page in
            // the other bits.
            let slot = (base.add(HDR) as *const u64).offset(c.cur);
            let word = u64::from_le(*slot);
            ((word & 0xfff) as usize, word & !0xfff)
        };
        let (k, v) = read::<T, K, V>(txn, base.add(entry));
        Some((
            K::from_raw_ptr(txn, k as *const u8),
            V::from_raw_ptr(txn, v as *const u8),
            right,
        ))
    }
}
/// Size of the entry at the cursor position, dispatching to the
/// `Leaf` or `Internal` implementation depending on the kind of page
/// recorded in the cursor.
fn current_size(page: crate::Page, c: &Self::Cursor) -> usize {
    if !c.is_leaf {
        return Internal::current_size::<K, V>(page, c.cur);
    }
    Leaf::current_size::<K, V>(page, c.cur)
}
/// Move the cursor one position forward, unless it is already at (or
/// past) `total`. Returns `true` if the cursor moved.
fn move_next(c: &mut Self::Cursor) -> bool {
    if c.cur >= c.total as isize {
        return false;
    }
    c.cur += 1;
    true
}
/// Move the cursor one position backward and return `true`, or, if
/// the cursor is at position 0 or before, reset it to `-1` and
/// return `false`.
fn move_prev(c: &mut Self::Cursor) -> bool {
    match c.cur {
        cur if cur > 0 => {
            c.cur = cur - 1;
            true
        }
        _ => {
            c.cur = -1;
            false
        }
    }
}
/// Left child of the entry at the cursor position: 0 in a leaf, else
/// the page stored in the 8-byte slot preceding the cursor position
/// (with the 12 LSBs masked out). Panics if the cursor position is
/// negative.
fn left_child(page: crate::Page, c: &Self::Cursor) -> u64 {
    assert!(c.cur >= 0);
    if c.is_leaf {
        return 0;
    }
    let byte_off = (HDR + c.cur as usize * 8) - 8;
    let raw = unsafe { *(page.data.as_ptr().add(byte_off) as *const u64) };
    u64::from_le(raw) & !0xfff
}
/// Right child of the entry at the cursor position: 0 in a leaf, else
/// the page stored in the 8-byte slot at the cursor position (with
/// the 12 LSBs masked out).
///
/// The cursor may be at position `-1` (as produced by
/// `cursor_before`), in which case the slot read is the 8 bytes just
/// before the offset array, i.e. the leftmost child of the page.
///
/// Panics if the cursor is at or after `total`.
fn right_child(page: crate::Page, c: &Self::Cursor) -> u64 {
    assert!(c.cur < c.total as isize);
    if c.is_leaf {
        0
    } else {
        // Use a signed pointer offset, like `current` does:
        // `c.cur as usize * 8` would overflow when `c.cur == -1`.
        let off = unsafe { *(page.data.as_ptr().add(HDR) as *const u64).offset(c.cur) };
        u64::from_le(off) & !0xfff
    }
}
/// Position the cursor `c` on `(k0, v0)` in `page` (or on `k0` only
/// if `v0` is `None`).
///
/// Returns `Ok` with the entry found and its right child, or
/// `Err(n)` where `n` is the position at which the entry could be
/// inserted. In both cases `c.cur` is set to that position.
fn set_cursor<'a, T: LoadPage>(
    txn: &'a T,
    page: crate::Page,
    c: &mut PageCursor,
    k0: &K,
    v0: Option<&V>,
) -> Result<(&'a K, &'a V, u64), usize> {
    unsafe {
        // `lookup` follows the convention of
        // `core::slice::binary_search`: `Ok(n)` if `(k0, v0)` was
        // found at position `n`, else `Err(n)` where `n` is the
        // insertion position preserving the order.
        match lookup(txn, page, c, k0, v0) {
            Ok(n) => {
                let found = if c.is_leaf {
                    // Leaf offsets carry no child page.
                    let off =
                        u16::from_le(*(page.data.as_ptr().add(HDR + n * 2) as *const u16));
                    Leaf::kv(txn, page, (0, off))
                } else {
                    // Split the word into (child page, offset on
                    // the page).
                    let off =
                        u64::from_le(*(page.data.as_ptr().add(HDR + n * 8) as *const u64));
                    Internal::kv(txn, page, (off & !0xfff, (off & 0xfff) as u16))
                };
                c.cur = n as isize;
                Ok(found)
            }
            Err(n) => {
                c.cur = n as isize;
                Err(n)
            }
        }
    }
}
/// Split the cursor `c` into a pair `(a, b)`, where `b` is a copy of
/// `c` and `a` runs from the first entry of the page up to, but not
/// including, the position of `c`.
fn split_at(c: &Self::Cursor) -> (Self::Cursor, Self::Cursor) {
    // Nothing precedes a cursor at a negative position.
    let prefix_len = if c.cur > 0 { c.cur as usize } else { 0 };
    let prefix = PageCursor {
        cur: 0,
        total: prefix_len,
        is_leaf: c.is_leaf,
    };
    (prefix, *c)
}
// Size of a pair of type `(K, V)`. This is computed in the same way
// as a struct with fields of type `K` and `V` in C.
fn alloc_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(k: &K, v: &V) -> usize {
let s = ((k.size() + V::ALIGN - 1) & !(V::ALIGN - 1)) + v.size();
let al = K::ALIGN.max(V::ALIGN);
(s + al - 1) & !(al - 1)
unsafe fn lookup<T: LoadPage, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
txn: &T,
// Total size of the entry for that cursor position, including the
// offset size.
fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
let s = core::slice::from_raw_parts(
page.data.as_ptr().add(HDR) as *const u16,
hdr.n() as usize,
);
if let Some(v0) = v0 {
s.binary_search_by(|&off| {
let off = u16::from_le(off);
let (k, v) = read::<T, K, V>(txn, page.data.as_ptr().offset(off as isize));
let k = K::from_raw_ptr(txn, k as *const u8);
match k.compare(txn, k0) {
Ordering::Equal => {
let v = V::from_raw_ptr(txn, v as *const u8);
v.compare(txn, v0)
}
cmp => cmp,
}
})
} else {
s.binary_search_by(|&off| {
let off = u16::from_le(off);
let (k, _) = read::<T, K, V>(txn, page.data.as_ptr().offset(off as isize));
let k = K::from_raw_ptr(txn, k);
k.compare(txn, k0)
})
}
Leaf::current_size::<K, V>(page, c.cur)
let s = core::slice::from_raw_parts(
page.data.as_ptr().add(HDR) as *const u64,
hdr.n() as usize,
);
if let Some(v0) = v0 {
s.binary_search_by(|&off| {
let off = u64::from_le(off) & 0xfff;
let (k, v) = read::<T, K, V>(txn, page.data.as_ptr().offset(off as isize & 0xfff));
let k = K::from_raw_ptr(txn, k);
match k.compare(txn, k0) {
Ordering::Equal => {
let v = V::from_raw_ptr(txn, v);
v.compare(txn, v0)
}
cmp => cmp,
}
})
} else {
s.binary_search_by(|&off| {
let off = u64::from_le(off) & 0xfff;
let (k, _) = read::<T, K, V>(txn, page.data.as_ptr().offset(off as isize & 0xfff));
let k = K::from_raw_ptr(txn, k);
k.compare(txn, k0)
})
}
Internal::current_size::<K, V>(page, c.cur)
#[derive(Debug, Clone, Copy)]
pub struct PageCursor {
pub(super) cur: isize,
pub(super) total: usize,
pub(super) is_leaf: bool,
pub(super) trait AllocWrite<K: ?Sized, V: ?Sized> {
fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize);
impl PageCursor {
    /// Cursor on page `p` at position `cur`, with `total` and
    /// `is_leaf` read from the page header.
    ///
    /// Panics if `cur` is not strictly less than the number of
    /// entries recorded in the header.
    pub(super) fn new(p: &crate::CowPage, cur: isize) -> Self {
        // NOTE(review): assumes `p.data` points to a page that starts
        // with a `Header` -- confirm against `CowPage`.
        let hdr = unsafe { &*(p.data as *const Header) };
        assert!(cur < hdr.n() as isize);
        PageCursor {
            cur,
            total: hdr.n() as usize,
            is_leaf: hdr.is_leaf(),
        }
    }

    /// Cursor on page `p` positioned after the last entry, i.e. at
    /// position `total`.
    pub(super) fn after(p: &crate::CowPage) -> Self {
        // NOTE(review): same assumption on `p.data` as in `new`.
        let hdr = unsafe { &*(p.data as *const Header) };
        let total = hdr.n();
        PageCursor {
            cur: total as isize,
            total: total as usize,
            is_leaf: hdr.is_leaf(),
        }
    }

    /// Cursor on page `p` positioned on the last entry, i.e. at
    /// position `total - 1`. On an empty page, this is position `-1`.
    pub(super) fn last(p: crate::Page) -> Self {
        let hdr = header(p);
        let total = hdr.n();
        PageCursor {
            // Convert before subtracting: `total - 1` in the unsigned
            // type of `hdr.n()` would underflow on an empty page.
            cur: total as isize - 1,
            total: total as usize,
            is_leaf: hdr.is_leaf(),
        }
    }
}
fn modify<
T: LoadPage,
K: UnsizedStorable + ?Sized + core::fmt::Debug,
V: UnsizedStorable + ?Sized + core::fmt::Debug,
L: Alloc,
>(
/// Perform the modifications on a page, by copying it onto page `new`.
fn modify<T: LoadPage, K: ?Sized, V: ?Sized, P: BTreeMutPage<K, V>, L: AllocWrite<K, V>>(
debug!("modify {:?}", m);
let mut l = <Page<K, V>>::left_child(m.page.as_page(), &m.c0);
while let Some((k, v, r)) = <Page<K, V>>::next(txn, m.page.as_page(), &mut m.c0) {
alloc::<_, _, L>(new, k, v, l, r, n);
// This is the exact same implementation as in the sized
// module. I'm not convinced
let mut l = P::left_child(m.page.as_page(), &m.c0);
while let Some((k, v, r)) = P::next(txn, m.page.as_page(), &mut m.c0) {
L::alloc_write(new, k, v, l, r, n);
let mut is_first = m.skip_first;
while let Some((k, v, r)) = <Page<K, V>>::next(txn, m.page.as_page(), &mut m.c1) {
if is_first {
is_first = false;
continue;
}
alloc::<_, _, L>(new, k, v, l, r, n);
let mut c1 = m.c1.clone();
if m.skip_first {
P::move_next(&mut c1);
}
while let Some((k, v, r)) = P::next(txn, m.page.as_page(), &mut c1) {
L::alloc_write(new, k, v, l, r, n);
fn merge<
T: LoadPage,
K: UnsizedStorable + ?Sized + core::fmt::Debug,
V: UnsizedStorable + ?Sized + core::fmt::Debug,
L: Alloc,
/// The very unsurprising `merge` function
pub(super) fn merge<
'a,
T: AllocPage + LoadPage,
K: ?Sized,
V: ?Sized,
P: BTreeMutPage<K, V>,
L: AllocWrite<K, V>,
txn: &T,
new: &mut MutPage,
mut m: Concat<K, V, Page<K, V>>,
) {
txn: &mut T,
mut m: Concat<K, V, P>,
) -> Result<Op<'a, T, K, V>, T::Error> {
// This algorithm is probably a bit naive, especially for leaves,
// where if the left page is mutable, we can copy the other page
// onto it.
// Indeed, we first allocate a page, then clone both pages onto
// it, in a different order depending on whether the modified page
// is the left or the right child.
let mut new = txn.alloc_page()?;
P::init(&mut new);
modify::<_, _, _, L>(txn, new, &mut m.modified, &mut n);
let mut rc = PageCursor::new(&m.other, 0);
let l = <Page<K, V>>::left_child(m.other.as_page(), &rc);
alloc::<_, _, L>(new, m.mid.0, m.mid.1, 0, l, &mut n);
while let Some((k, v, r)) = <Page<K, V>>::next(txn, m.other.as_page(), &mut rc) {
alloc::<_, _, L>(new, k, v, 0, r, &mut n);
modify::<_, _, _, _, L>(txn, &mut new, &mut m.modified, &mut n);
let mut rc = P::cursor_first(&m.other);
let l = P::left_child(m.other.as_page(), &rc);
L::alloc_write(&mut new, m.mid.0, m.mid.1, 0, l, &mut n);
while let Some((k, v, r)) = P::next(txn, m.other.as_page(), &mut rc) {
L::alloc_write(&mut new, k, v, 0, r, &mut n);
let mut rc = PageCursor::new(&m.other, 0);
let mut l = <Page<K, V>>::left_child(m.other.as_page(), &rc);
while let Some((k, v, r)) = <Page<K, V>>::next(txn, m.other.as_page(), &mut rc) {
alloc::<_, _, L>(new, k, v, l, r, &mut n);
let mut rc = P::cursor_first(&m.other);
let mut l = P::left_child(m.other.as_page(), &rc);
while let Some((k, v, r)) = P::next(txn, m.other.as_page(), &mut rc) {
L::alloc_write(&mut new, k, v, l, r, &mut n);
alloc::<_, _, L>(new, m.mid.0, m.mid.1, 0, 0, &mut n);
modify::<_, _, _, L>(txn, new, &mut m.modified, &mut n);
L::alloc_write(&mut new, m.mid.0, m.mid.1, 0, 0, &mut n);
modify::<_, _, _, _, L>(txn, &mut new, &mut m.modified, &mut n);
match s {
Offsets::Slice(s) => {
debug!("clone: {:?} {:?}", s, s.len());
for off in s.iter() {
let (r, off): (u64, u16) = (*off).into();
debug!("clone: {:?} {:?}", r, off);
unsafe {
let ptr = page.data.as_ptr().add(off as usize);
let size = entry_size::<K, V>(ptr);
debug!("size: {:?}", size);
let hdr = header_mut(new);
let off_new = L::alloc(hdr, size, K::ALIGN.max(V::ALIGN));
debug!("off_new: {:?}", off_new);
core::ptr::copy_nonoverlapping(ptr, new.0.data.offset(off_new as isize), size);
L::set_offset(new, *n, r, off_new);
}
*n += 1;
for off in s.0.iter() {
let (r, off): (u64, usize) = (*off).into();
unsafe {
let ptr = page.data.as_ptr().add(off);
let size = entry_size::<K, V>(ptr);
// Reserve the space on the page
let hdr = header_mut(new);
let data = hdr.data() as u16;
let off_new = data - size as u16;
hdr.set_data(off_new);
hdr.set_n(hdr.n() + 1);
if hdr.is_leaf() {
hdr.incr(2 + size);
} else {
hdr.incr(8 + size);
// Set the offset to this new entry in the offset
// array, along with the right child page.
let ptr = new.0.data.offset(HDR as isize + *n * 8) as *mut u64;
*ptr = (r | off_new as u64).to_le();
}
}
fn alloc<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized, L: Alloc>(
new: &mut MutPage,
k0: &K,
v0: &V,
l: u64,
r: u64,
n: &mut isize,
) {
let size = alloc_size(k0, v0);
let off_new = L::alloc_insert::<K, V>(new, n, size, r);
unsafe {
let new_ptr = new.0.data.add(off_new);
k0.write_to_page(new_ptr);
let ks = k0.size();
let v_ptr = new_ptr.add((ks + V::ALIGN - 1) & !(V::ALIGN - 1));
v0.write_to_page(v_ptr);
*n += 1;
let b = {
let hdr = &*header(m.other.as_page());
if hdr.is_dirty() {
1
} else {
0
}
};
// We extend the pointer's lifetime here, because we know the
// current deletion (we only rebalance during deletions) won't
// touch this page anymore after this.
let k = unsafe { core::mem::transmute(k) };
let v = unsafe { core::mem::transmute(v) };
// If this frees the old "other" page, add it to the "freed"
// array.
let is_dirty = m.other.is_dirty();
pub(crate) fn rebalance_right<
'a,
T: AllocPage,
K: UnsizedStorable + ?Sized + core::fmt::Debug,
V: UnsizedStorable + ?Sized + core::fmt::Debug,
L: Alloc,
>(
// Surprisingly, the `rebalance_right` function is simpler,
// since:
//
// - if we call it to rebalance two internal nodes, we're in the easy
// case of rebalance_left.
//
// - Else, the middle element is the last one on the left page, and
// isn't erased by leaf deletions, because these just move entries to
// the left.
//
// This implementation is shared with the sized one.
pub(crate) fn rebalance_right<'a, T: AllocPage, K: ?Sized, V: ?Sized, P: BTreeMutPage<K, V>>(
let lc = super::PageCursor::last(m.other.as_page());
let (k0, v0, r0) = <Page<K, V>>::current(txn, m.other.as_page(), &lc).unwrap();
let lc = P::cursor_last(&m.other);
let (k0, v0, r0) = P::current(txn, m.other.as_page(), &lc).unwrap();
let new_right = if let Put::Ok(Ok { freed, page }) = <Page<K, V>>::put(
// Add the middle element of the concatenation as the first
// element of the right page. We know the right page is mutable,
// since we just modified it (hence the `assert_eq!(freed, 0)`).
let new_right = if let Put::Ok(Ok { freed, page }) = P::put(
let b = {
let hdr = &*header(m.other.as_page());
if hdr.is_dirty() {
1
} else {
0
}
};
let (new_left, freed) = <Page<K, V>>::del(txn, m.other, m.other_is_mutable, &lc, 0)?;
let is_dirty = m.other.is_dirty();
let (new_left, freed) = P::del(txn, m.other, m.other_is_mutable, &lc, 0)?;
let is_dirty = hdr.is_dirty();
debug!("put {:?} {:?} {:?}", u, mutable, is_dirty);
if mutable && is_dirty && L::can_alloc(header(page.as_page()), size) {
debug!("can alloc");
if mutable && page.is_dirty() && L::can_alloc(header(page.as_page()), size, n_ins) {
// First, if the page is dirty and mutable, mark it mutable.
} else if L::can_compact(hdr, size_replaced) {
} else if L::can_compact(hdr, size_replaced, n_ins) {
// Allocate a new page, split the offsets at the position of
// the insertion, and clone each side of the split onto the new
// page, with the insertion between them.
// There are two synchronised counters here: `hdr.n()` and
// `n`. The reason for this is that operations on `hdr.n()` are
// somewhat cumbersome, as they might involve extra operations to
// convert to/from the little-endian encoding of `hdr.n()`.
alloc::<K, V, L>(current_page, k0, v0, 0, l0, &mut n);
total += alloc_size(k0, v0) + L::extra_size();
alloc::<K, V, L>(current_page, k1, v1, 0, r0, &mut n);
total += alloc_size(k1, v1) + L::extra_size();
} else {
alloc::<K, V, L>(current_page, k0, v0, l0, r0, &mut n);
total += alloc_size(k0, v0) + L::extra_size();
}
if replace {
// As explained in the documentation for `put` in the
// definition of `BTreeMutPage`, `l0` and `r0` are the
// left and right children of k1v1, since this means
// the right child of a deleted entry has split, and
// we must replace the deleted entry with `(k0, v0)`.
L::alloc_write(current_page, k0, v0, 0, l0, &mut n);
total += alloc_size(k0, v0) + L::OFFSET_SIZE;
L::alloc_write(current_page, k1, v1, 0, r0, &mut n);
total += alloc_size(k1, v1) + L::OFFSET_SIZE;
// Replacing the next element:
debug_assert!(replace);
let (r, off): (u64, u16) = (*off).into();
// Then, i.e. either if we're not at the position of an
// insertion, or else if we're not replacing the current
// position, just copy the current entry to the appropriate
// page (left or right).
let (r, off): (u64, usize) = (*off).into();
if is_left && total >= PAGE_SIZE / 2 {
is_left = false;
split = read::<T, K, V>(txn, ptr);
// If the left side of the split is at least half-full, we
// know that we can do all the insertions we need on the
// right-hand side (even if there are two of them, and the
// replaced value is of size 0), so we switch.
if split.is_none() && total >= PAGE_SIZE / 2 {
// Don't write the next entry onto any page, keep it
// as the split entry.
let (k, v) = read::<T, K, V>(txn, ptr);
split = Some((K::from_raw_ptr(txn, k), V::from_raw_ptr(txn, v)));
// Set the entry count for the current page, before
// switching and resetting the counter.
header_mut(current_page).set_n(n as u16);
// And set the leftmost child of the right page to
// `r`.
let off_new = L::alloc(header_mut(current_page), size, K::ALIGN.max(V::ALIGN));
// Else, we're simply allocating a new entry on the
// current page.
let off_new = {
let hdr = header_mut(current_page);
let data = hdr.data();
assert!(
HDR + hdr.n() as usize * L::OFFSET_SIZE + L::OFFSET_SIZE + size
< data as usize
);
// Move the data pointer `size` bytes to the left.
hdr.set_data(data - size as u16);
// And increase the number of entries, and
// occupied size of the page.
hdr.incr(size + L::OFFSET_SIZE);
data - size as u16
};
// Copy the entry.
fn can_alloc(hdr: &Header, size: usize) -> bool;
fn can_compact(hdr: &Header, size: isize) -> bool;
fn alloc(hdr: &mut Header, size: usize, align: usize) -> u16;
/// Is there room on the page for `n` more offsets plus `size` bytes
/// of entry data? `size` doesn't include the offset bytes.
fn can_alloc(hdr: &Header, size: usize, n: usize) -> bool {
    // End of the offset array as it currently stands.
    let offsets_end = (HDR as usize) + (hdr.n() as usize) * Self::OFFSET_SIZE;
    // New offsets plus the entry data itself.
    let needed = n * Self::OFFSET_SIZE + size;
    offsets_end + needed <= hdr.data() as usize
}
/// Would there be room for `n` more offsets plus `size` bytes of
/// entry data if the page were cloned first? `size` doesn't include
/// the offset bytes.
fn can_compact(hdr: &Header, size: isize, n: usize) -> bool {
    let offsets = (hdr.n() as isize) * Self::OFFSET_SIZE as isize;
    // 12 LSBs of `left_page()`: presumably the number of bytes in use
    // on the page -- confirm against `Header`.
    let low_bits = (hdr.left_page() & 0xfff) as isize;
    let new_offsets = (n * Self::OFFSET_SIZE) as isize;
    (HDR as isize) + offsets + low_bits + new_offsets + size <= PAGE_SIZE as isize
}
/// Set the right child of the `n`th entry to `r`. If `n == -1`,
/// this method can be used to set the leftmost child of a page.
///
/// This default implementation does nothing and ignores its
/// arguments.
fn set_right_child(_new: &mut MutPage, _n: isize, _r: u64) {}
// n = number of items to remove
fn truncate_left<T, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
txn: &T,
page: &mut MutPage,
n: usize,
) -> Option<(*const K, *const V)>;
/// Set the n^th offset on the page to `r`, which encodes a page
/// offset in its 52 MSBs, and an offset on the page in its 12 LSBs.
fn set_offset(new: &mut MutPage, n: isize, r: u64);
fn alloc_insert<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
new: &mut MutPage,
n: &mut isize,
size: usize,
r: u64,
) -> usize;
fn set_offset(new: &mut MutPage, n: isize, r: u64, off: u16);
fn set_right_child(new: &mut MutPage, n: isize, r: u64);
type Offset: Into<(u64, u16)> + Copy + core::fmt::Debug;
fn offset_slice<'a, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
page: crate::Page<'a>,
) -> Offsets<'a, Self::Offset>;
fn kv<'a, T: LoadPage, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
txn: &T,
page: crate::Page,
off: (u64, u16),
) -> (&'a K, &'a V, u64);
/// The type of offsets, u64 in internal nodes, u16 in leaves.
type Offset: Into<(u64, usize)> + Copy;
fn offset_slice<'a>(page: crate::Page<'a>) -> Offsets<'a, Self::Offset>;
impl<'a, A: Into<(u64, u16)> + Copy> Offsets<'a, A> {
pub fn split_at(&self, mid: usize) -> (Self, Self) {
match self {
Offsets::Slice(s) if mid < s.len() => {
debug!("split_at: {:?} {:?}", s.len(), mid);
let (a, b) = s.split_at(mid);
(Offsets::Slice(a), Offsets::Slice(b))
}
Offsets::Slice(s) => {
debug_assert_eq!(mid, s.len());
(Offsets::Slice(s), Offsets::Slice(&[][..]))
}
impl<'a, A: Copy> Offsets<'a, A> {
pub fn split_at(self, mid: usize) -> (Self, Self) {
if mid < self.0.len() {
let (a, b) = self.0.split_at(mid);
(Offsets(a), Offsets(b))
} else {
debug_assert_eq!(mid, self.0.len());
(self, Offsets(&[][..]))
2 + entry_size::<K, V>(page.data.as_ptr().add(u16::from_le(
*(page.data.as_ptr().add(HDR) as *const u16).offset(cur),
) as usize))
debug_assert!(cur >= 0);
let off = *(page.data.as_ptr().add(HDR + 2 * cur as usize) as *const u16);
Self::OFFSET_SIZE
+ entry_size::<K, V>(page.data.as_ptr().add(u16::from_le(off) as usize))
/// Leaf version: checks whether one more entry of `size` data bytes,
/// plus its 2-byte offset slot, fits before the position reported by
/// `hdr.data()`.
fn can_alloc(hdr: &Header, size: usize) -> bool {
    debug!(
        "can_alloc, leaf: {:?} {:?} {:?} {:?}",
        HDR,
        hdr.n() * 2,
        hdr.data(),
        size
    );
    // Header, existing offsets, the new offset, then the entry itself.
    let required_end = (HDR as usize) + (hdr.n() as usize) * 2 + 2 + size;
    required_end <= hdr.data() as usize
}
/// Leaf version: checks whether one more entry of `size` data bytes,
/// plus its 2-byte offset slot, would fit within `PAGE_SIZE` given
/// the header, the existing offsets and the low 12 bits of
/// `hdr.left_page()`.
fn can_compact(hdr: &Header, size: isize) -> bool {
    debug!(
        "can_compact, leaf: {:?} {:?} {:?}",
        HDR,
        hdr.left_page() & 0xfff,
        size
    );
    let offsets_end = (HDR as isize) + (hdr.n() as isize) * 2;
    // Presumably the number of occupied bytes; confirm against `Header`.
    let low_bits = (hdr.left_page() & 0xfff) as isize;
    offsets_end + low_bits + 2 + size <= PAGE_SIZE as isize
}
fn truncate_left<T, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
_txn: &T,
page: &mut MutPage,
n: usize,
) -> Option<(*const K, *const V)> {
debug!("truncate_left {:?} {:?}", page, n);
let hdr_n = header_mut(page).n();
fn set_offset(new: &mut MutPage, n: isize, off: u64) {
core::ptr::copy(
page.0.data.add(HDR + n * 2),
page.0.data.add(HDR),
(hdr_n as usize - n) * 2,
);
}
let deleted_offsets =
unsafe { core::slice::from_raw_parts(page.0.data.add(HDR) as *const u16, n as usize) };
let deleted_size: u64 = deleted_offsets
.iter()
.map(|&off| 2 + unsafe { entry_size::<K, V>(page.0.data.add(off as usize)) } as u64)
.sum();
let hdr = header_mut(page);
hdr.set_n(hdr_n - n as u16);
hdr.decr(deleted_size as usize);
None
}
/// Leaf version: reserves `size` bytes of entry data on the page
/// described by `hdr` and returns the new value of the data pointer,
/// i.e. the page offset at which the entry must be written.
///
/// The data pointer is moved `size` bytes to the left, the entry
/// count is incremented and `size` is added through `hdr.incr`.
///
/// # Panics
///
/// Panics if `size` is not a multiple of `align`, or if the entry and
/// its 2-byte offset slot do not fit before the current data pointer.
/// The bound is inclusive so that it agrees with `can_alloc`: an
/// entry that exactly fills the remaining space is accepted by
/// `can_alloc` and must therefore not be rejected here.
fn alloc(hdr: &mut Header, size: usize, align: usize) -> u16 {
    assert_eq!(size % align, 0);
    let data = hdr.data();
    assert!(HDR + hdr.n() as usize * 2 + 2 + size <= data as usize);
    let new_data = data - size as u16;
    hdr.set_data(new_data);
    hdr.set_n(hdr.n() + 1);
    hdr.incr(size);
    new_data
}
/// Leaf version: reserves `size` bytes for a new entry, opens a
/// 2-byte slot at position `*n` of the offset array, stores the
/// entry's page offset there and returns that offset.
///
/// The `u64` argument is ignored, and `*n` is left unchanged.
fn alloc_insert<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    new: &mut MutPage,
    n: &mut isize,
    size: usize,
    _: u64,
) -> usize {
    let hdr = header_mut(new);
    // Number of entries *before* the allocation bumps the counter.
    let old_count = hdr.n() as usize;
    let entry_off = Self::alloc(&mut *hdr, size, K::ALIGN.max(V::ALIGN));

    // Shift the offsets from position `*n` onwards one slot to the
    // right to make room for the new one.
    let idx = *n as usize;
    let slot = HDR + idx * 2;
    unsafe {
        core::ptr::copy(
            new.0.data.add(slot),
            new.0.data.add(slot + 2),
            (old_count - idx) * 2,
        );
    }
    Self::set_offset(new, *n, 0, entry_off);
    entry_off as usize
}
fn set_offset(new: &mut MutPage, n: isize, _: u64, off: u16) {
unsafe {
fn offset_slice<'a, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
page: crate::Page<'a>,
) -> Offsets<'a, Self::Offset> {
let hdr = header(page);
fn offset_slice<'a>(page: crate::Page<'a>) -> Offsets<'a, Self::Offset> {
let hdr_n = header(page).n();
/// Leaf version: reads the key and value stored at the page offset
/// given by the second component of `off`. The first component is
/// ignored and the returned `u64` is always 0.
fn kv<'a, T: LoadPage, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    txn: &T,
    page: crate::Page,
    off: (u64, u16),
) -> (&'a K, &'a V, u64) {
    let (_, entry_off) = off;
    unsafe {
        let entry = page.data.as_ptr().add(entry_off as usize);
        let (key_ptr, value_ptr) = read::<T, K, V>(txn, entry);
        let key = K::from_raw_ptr(txn, key_ptr);
        let value = V::from_raw_ptr(txn, value_ptr);
        (key, value, 0)
    }
}
/// Internal-node version: checks whether one more entry of `size`
/// data bytes, plus its 8-byte offset slot, fits before the position
/// reported by `hdr.data()`.
fn can_alloc(hdr: &Header, size: usize) -> bool {
    debug!(
        "can_alloc, internal: {:?} {:?} {:?} {:?}",
        HDR,
        hdr.n() * 8,
        hdr.data(),
        size
    );
    // Header, existing offsets, the new offset, then the entry itself.
    let required_end = (HDR as usize) + (hdr.n() as usize) * 8 + 8 + size;
    required_end <= hdr.data() as usize
}
/// Internal-node version: checks whether one more entry of `size`
/// data bytes, plus its 8-byte offset slot, would fit within
/// `PAGE_SIZE` given the header, the existing offsets and the low 12
/// bits of `hdr.left_page()`.
fn can_compact(hdr: &Header, size: isize) -> bool {
    debug!(
        "can_compact, internal: {:?} {:?} {:?}",
        HDR,
        hdr.left_page() & 0xfff,
        size
    );
    let offsets_end = (HDR as isize) + (hdr.n() as isize) * 8;
    // Presumably the number of occupied bytes; confirm against `Header`.
    let low_bits = (hdr.left_page() & 0xfff) as isize;
    offsets_end + low_bits + 8 + size <= PAGE_SIZE as isize
}
fn truncate_left<T, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
_txn: &T,
page: &mut MutPage,
n: usize,
) -> Option<(*const K, *const V)> {
// The following line copies the left child of the last entry
// (hence the `- 8` and `- 1`)
let hdr_n = header_mut(page).n();
/// Set the right-hand child in the offset array, preserving the
/// current offset.
fn set_right_child(page: &mut MutPage, n: isize, r: u64) {
core::ptr::copy(
page.0.data.add(HDR + (n - 1) * 8),
page.0.data.add(HDR - 8),
(hdr_n as usize - n + 1) * 8,
);
debug_assert_eq!(r & 0xfff, 0);
let ptr = page.0.data.offset(HDR as isize + n * 8) as *mut u64;
let off = u64::from_le(*ptr) & 0xfff;
*ptr = (r | off as u64).to_le();
let size = {
let offsets = unsafe {
core::slice::from_raw_parts(
page.0.data.add(HDR + n * 8) as *const u16,
hdr_n as usize - n as usize,
)
};
offsets
.iter()
.map(|&off| 8 + unsafe { entry_size::<K, V>(page.0.data.add(off as usize)) } as u64)
.sum()
};
let hdr = header_mut(page);
hdr.set_n(hdr_n - n as u16);
debug!("truncate_left {:?} {:?}", hdr.left_page(), size);
hdr.set_occupied(size);
None
}
/// Internal-node version: reserves `size` bytes of entry data on the
/// page described by `hdr` and returns the new value of the data
/// pointer, i.e. the page offset at which the entry must be written.
///
/// # Panics
///
/// Panics if `size` is not a multiple of `align`, or if the entry and
/// its 8-byte offset slot do not fit before the current data pointer.
fn alloc(hdr: &mut Header, size: usize, align: usize) -> u16 {
    assert_eq!(size % align, 0);
    let data = hdr.data();
    assert!(HDR + hdr.n() as usize * 8 + 8 + size <= data as usize);
    // The data area grows towards the left, so the new entry starts
    // `size` bytes before the old data pointer.
    let entry_off = data - size as u16;
    hdr.set_data(entry_off);
    hdr.set_n(hdr.n() + 1);
    hdr.incr(size);
    entry_off
}
fn alloc_insert<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
new: &mut MutPage,
n: &mut isize,
size: usize,
r: u64,
) -> usize {
let (hdr_n, off_new) = {
let hdr = header_mut(new);
(
hdr.n(),
Self::alloc(&mut *hdr, size, K::ALIGN.max(V::ALIGN)),
)
};
unsafe {
core::ptr::copy(
new.0.data.add(HDR + (*n as usize) * 8),
new.0.data.add(HDR + (*n as usize) * 8 + 8),
(hdr_n as usize - (*n as usize)) * 8,
);
}
Self::set_offset(new, *n, r, off_new);
off_new as usize
fn offset_slice<'a, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
page: crate::Page<'a>,
) -> Offsets<'a, Self::Offset> {
let hdr = header(page);
debug!("internal page {:?} {:?}", page, hdr.n());
fn offset_slice<'a>(page: crate::Page<'a>) -> Offsets<'a, Self::Offset> {
let hdr_n = header(page).n() as usize;
}
}
/// Internal-node version: reads the key and value stored at the page
/// offset given by the second component of `off`, and returns them
/// together with the first component of `off` unchanged.
fn kv<'a, T: LoadPage, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    txn: &T,
    page: crate::Page,
    off: (u64, u16),
) -> (&'a K, &'a V, u64) {
    let (right, entry_off) = off;
    unsafe {
        let entry = page.data.as_ptr().add(entry_off as usize);
        let (key_ptr, value_ptr) = read::<T, K, V>(txn, entry);
        let key = K::from_raw_ptr(txn, key_ptr);
        let value = V::from_raw_ptr(txn, value_ptr);
        (key, value, right)
    }
}
// `AllocWrite` for leaves: a thin adapter around the generic
// `alloc_write` function.
impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> AllocWrite<K, V> for Leaf {
/// Forwards all arguments to the free function `alloc_write`, with
/// `Self` (i.e. `Leaf`) as its third type parameter.
fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {
alloc_write::<K, V, Self>(new, k0, v0, l, r, n)
}
}
// `AllocWrite` for internal nodes: a thin adapter around the generic
// `alloc_write` function.
impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> AllocWrite<K, V> for Internal {
/// Forwards all arguments to the free function `alloc_write`, with
/// `Self` (i.e. `Internal`) as its third type parameter.
fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {
alloc_write::<K, V, Self>(new, k0, v0, l, r, n)
}
}
/// Allocates a slot for `(k0, v0)` at position `*n` of `new`, writes
/// both into it, and advances `*n` by one.
///
/// `r` is passed on to `alloc_insert`. When `l` is non-zero it is
/// installed through `L::set_right_child` on position `*n - 1`.
fn alloc_write<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized, L: Alloc>(
    new: &mut MutPage,
    k0: &K,
    v0: &V,
    l: u64,
    r: u64,
    n: &mut isize,
) {
    let entry_size = alloc_size(k0, v0);
    let entry_off = alloc_insert::<K, V, L>(new, n, entry_size, r);
    // The value starts right after the key, rounded up to `V::ALIGN`.
    let value_off = (k0.size() + V::ALIGN - 1) & !(V::ALIGN - 1);
    unsafe {
        let key_ptr = new.0.data.add(entry_off);
        k0.write_to_page(key_ptr);
        v0.write_to_page(key_ptr.add(value_off));
    }
    if l != 0 {
        L::set_right_child(new, *n - 1, l);
    }
    *n += 1;
}
/// Reserve space on the page for `size` bytes of data (i.e. not
/// counting the offset), set the right child of the new position
/// to `r`, add the offset to the new data to the offset array,
/// and return the new offset.
fn alloc_insert<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized, L: Alloc>(
new: &mut MutPage,
n: &mut isize,
size: usize,
r: u64,
) -> usize {
let hdr = header_mut(new);
let data = hdr.data();
// If we're here, the following assertion has been checked by the
// `can_alloc` method.
debug_assert!(HDR + (hdr.n() as usize + 1) * L::OFFSET_SIZE + size <= data as usize);
hdr.set_data(data - size as u16);
hdr.set_n(hdr.n() + 1);
hdr.incr(L::OFFSET_SIZE + size);
let off_new = data - size as u16;
let hdr_n = hdr.n();
// Making space for the new offset
unsafe {
core::ptr::copy(
new.0.data.add(HDR + (*n as usize) * L::OFFSET_SIZE),
new.0
.data
.add(HDR + (*n as usize) * L::OFFSET_SIZE + L::OFFSET_SIZE),
(hdr_n as usize - (*n as usize)) * L::OFFSET_SIZE,
);
//! length `n` of Little-Endian-encoded `u64`, where the 12 least
//! significant bits of each `u64` are an offset to a `Tuple<K, V>` in
//! length `n` of Little-Endian-encoded [`u64`], where the 12 least
//! significant bits of each [`u64`] are an offset to a `Tuple<K, V>` in
}
fn current_size(_page: crate::Page, c: &Self::Cursor) -> usize {
assert!(c.cur >= 0 && c.cur < c.total as isize);
if c.is_leaf {
core::mem::size_of::<Tuple<K, V>>()
} else {
8 + core::mem::size_of::<Tuple<K, V>>()
}
put::put::<_, _, _, Leaf>(
txn,
page,
mutable,
replace,
c.cur as usize,
k0,
v0,
k1v1,
0,
0,
)
put::put::<_, _, _, Leaf>(txn, page, mutable, replace, c.cur as usize, k0, v0, 0, 0)
put::put::<_, _, _, Internal>(
txn,
page,
mutable,
replace,
c.cur as usize,
k0,
v0,
k1v1,
l,
r,
)
put::put::<_, _, _, Internal>(txn, page, mutable, replace, c.cur as usize, k0, v0, l, r)
/// Applies the modifications described by `m` by copying the
/// resulting sequence of entries onto page `new`, starting at
/// position `*n` (which is advanced by each `alloc` call).
///
/// Entries are copied in three phases: everything yielded by cursor
/// `m.c0`, then the one or two inserted entries (if any), then
/// everything yielded by a clone of `m.c1`, optionally skipping its
/// first element.
fn modify<T: LoadPage, K: Storable, V: Storable, L: Alloc>(
    txn: &T,
    new: &mut MutPage,
    m: &mut ModifiedPage<K, V, Page<K, V>>,
    n: &mut isize,
) {
    // A left child still waiting to be written. It is handed to the
    // next `alloc` call and reset to 0 right away, so that it is
    // written at most once per assignment.
    let mut pending_left = <Page<K, V>>::left_child(m.page.as_page(), &m.c0);
    while let Some((key, value, right)) = <Page<K, V>>::next(txn, m.page.as_page(), &mut m.c0) {
        let left = core::mem::replace(&mut pending_left, 0);
        alloc::<_, _, L>(new, key, value, left, right, n);
    }

    // One or two insertions, if the modification carries any.
    match m.ins {
        Some((k, v)) => match m.ins2 {
            Some((k2, v2)) => {
                alloc::<_, _, L>(new, k, v, 0, 0, n);
                alloc::<_, _, L>(new, k2, v2, m.l, m.r, n);
            }
            None => alloc::<_, _, L>(new, k, v, m.l, m.r, n),
        },
        // Without an insertion there has been no opportunity to write
        // the updated left child, so queue it for the next entry.
        None => pending_left = m.l,
    }

    // Finally copy what `m.c1` yields, working on a clone so that the
    // cursor stored in `m` is left untouched.
    let mut rest = m.c1.clone();
    if m.skip_first {
        <Page<K, V>>::move_next(&mut rest);
    }
    while let Some((key, value, right)) = <Page<K, V>>::next(txn, m.page.as_page(), &mut rest) {
        let left = core::mem::replace(&mut pending_left, 0);
        alloc::<_, _, L>(new, key, value, left, right, n);
    }
}
/// Merges the two pages of the concatenation `m`, together with its
/// middle element, onto a freshly allocated page.
///
/// Returns the new page and a two-element array holding the offsets
/// of the modified page and of the other page, each with its lowest
/// bit set when the corresponding page is dirty.
fn merge<
    T: AllocPage + LoadPage,
    K: Storable + core::fmt::Debug,
    V: Storable + core::fmt::Debug,
    L: Alloc,
>(
    txn: &mut T,
    mut m: Concat<K, V, Page<K, V>>,
) -> Result<(MutPage, [u64; 2]), T::Error> {
    // This is probably a bit naive, especially for leaves: if the
    // left page is mutable, the other page could be copied onto it
    // instead. Here a new page is always allocated, and both pages
    // are cloned onto it, in an order that depends on which side the
    // modified page is.
    let mut new = txn.alloc_page()?;
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
    let mut n: isize = 0;

    if m.mod_is_left {
        // Modified page, then the middle element, then the other page.
        modify::<_, _, _, L>(txn, &mut new, &mut m.modified, &mut n);
        let mut cursor = PageCursor::new(&m.other, 0);
        let other_left = <Page<K, V>>::left_child(m.other.as_page(), &cursor);
        alloc::<_, _, L>(&mut new, m.mid.0, m.mid.1, 0, other_left, &mut n);
        while let Some((key, value, right)) =
            <Page<K, V>>::next(txn, m.other.as_page(), &mut cursor)
        {
            alloc::<_, _, L>(&mut new, key, value, 0, right, &mut n);
        }
    } else {
        // Other page, then the middle element, then the modified page.
        let mut cursor = PageCursor::new(&m.other, 0);
        let mut left = <Page<K, V>>::left_child(m.other.as_page(), &cursor);
        while let Some((key, value, right)) =
            <Page<K, V>>::next(txn, m.other.as_page(), &mut cursor)
        {
            alloc::<_, _, L>(&mut new, key, value, left, right, &mut n);
            left = 0;
        }
        alloc::<_, _, L>(&mut new, m.mid.0, m.mid.1, 0, 0, &mut n);
        modify::<_, _, _, L>(txn, &mut new, &mut m.modified, &mut n);
    }

    // Report both source pages, tagging dirty ones in the lowest bit.
    let mut freed = [m.modified.page.offset, m.other.offset];
    if m.modified.page.is_dirty() {
        freed[0] |= 1;
    }
    if m.other.is_dirty() {
        freed[1] |= 1;
    }
    Ok((new, freed))
}
/// Allocates a `Tuple<K, V>` slot at position `*n` of `new`, copies
/// `k0` and `v0` into it bitwise, and advances `*n` by one.
///
/// `r` is passed on to `L::alloc_insert`. When `l` is non-zero it is
/// installed through `L::set_right_child` on position `*n - 1`.
fn alloc<K: Storable, V: Storable, L: Alloc>(
    new: &mut MutPage,
    k0: &K,
    v0: &V,
    l: u64,
    r: u64,
    n: &mut isize,
) {
    let entry_off = L::alloc_insert::<K, V>(new, n, r) as usize;
    unsafe {
        let slot = new.0.data.add(entry_off) as *mut Tuple<K, V>;
        core::ptr::copy_nonoverlapping(k0, &mut (*slot).k, 1);
        core::ptr::copy_nonoverlapping(v0, &mut (*slot).v, 1);
    }
    if l != 0 {
        L::set_right_child(new, *n - 1, l);
    }
    *n += 1;
}
freed: freed_,
})
}
// Surprisingly, the `rebalance_right` function is simpler,
// since:
//
// - if we call it to rebalance two internal nodes, we're in the easy
// case of rebalance_left.
//
// - Else, the middle element is the last one on the left page, and
// isn't erased by leaf deletions, because these just move entries to
// the left.
pub(crate) fn rebalance_right<
'a,
T: AllocPage,
K: Storable + core::fmt::Debug,
V: Storable + core::fmt::Debug,
L: Alloc,
>(
txn: &mut T,
m: Concat<'a, K, V, Page<K, V>>,
) -> Result<Op<'a, T, K, V>, T::Error> {
assert!(!m.mod_is_left);
// Take the last element of the left page.
let lc = PageCursor::last(m.other.as_page());
let (k0, v0, r0) = <Page<K, V>>::current(txn, m.other.as_page(), &lc).unwrap();
// First element of the right page.
let rc = super::PageCursor::new(&m.modified.page, 0);
let rl = <Page<K, V>>::left_child(m.modified.page.as_page(), &rc);
let mut freed_ = [0, 0];
// Perform the modification on the modified page.
let new_right = if let Some((k, v)) = m.modified.ins {
let is_dirty = m.modified.page.is_dirty();
if let Put::Ok(Ok { page, freed }) = <Page<K, V>>::put(
txn,
m.modified.page,
m.modified.mutable,
m.modified.skip_first,
&m.modified.c1,
k,
v,
m.modified.ins2,
m.modified.l,
m.modified.r,
)? {
let b = if is_dirty { 1 } else { 0 };
freed_[0] = freed | b;
page
} else {
unreachable!()
}
} else {
let is_dirty = m.modified.page.is_dirty();
let (page, freed) = <Page<K, V>>::del(
txn,
m.modified.page,
m.modified.mutable,
&m.modified.c1,
m.modified.l,
)?;
if freed > 0 {
let b = if is_dirty { 1 } else { 0 };
freed_[0] = freed | b;
}
page
};
// Add the middle element of the concatenation as the first
// element of the right page. We know the right page is mutable,
// since we just modified it (hence the `assert_eq!(freed, 0)`).
let new_right = if let Put::Ok(Ok { freed, page }) = <Page<K, V>>::put(
txn,
new_right.0,
true,
false,
&rc,
m.mid.0,
m.mid.1,
None,
r0,
rl,
)? {
assert_eq!(freed, 0);
page
} else {
unreachable!()
};
// As explained in the general comment on this function, this
// entry isn't erased by the deletion in `m.other` below, so we
// can safely extend its lifetime.
let k = unsafe { core::mem::transmute(k0) };
let v = unsafe { core::mem::transmute(v0) };
let is_dirty = m.other.is_dirty();
let (new_left, freed) = <Page<K, V>>::del(txn, m.other, m.other_is_mutable, &lc, 0)?;
if freed > 0 {
freed_[1] = freed | if is_dirty { 1 } else { 0 }
}
Ok(Op::Rebalanced {
l: new_left.0.offset,
r: new_right.0.offset,
k,
v,
incr_kv_rc: !m.modified.mutable,
let is_dirty = hdr.is_dirty();
if mutable && is_dirty && L::can_alloc::<K, V>(header(page.as_page()), size) {
let is_dirty = page.is_dirty();
if mutable && is_dirty && L::can_alloc::<K, V>(header(page.as_page())) {
// If the page is mutable (i.e. not shared with another tree)
// and dirty, here's what we do:
if let Some((k1, v1)) = k1v1 {
alloc::<_, _, L>(&mut page, k0, v0, 0, 0, &mut n);
alloc::<_, _, L>(&mut page, k1, v1, l, r, &mut n);
} else {
debug!("lrn = {:?} {:?} {:?}", l, r, n);
alloc::<_, _, L>(&mut page, k0, v0, l, r, &mut n);
}
// Do the actual insertions.
L::alloc_write(&mut page, k0, v0, l, r, &mut n);
// No page was freed, return the page.
debug!("alloc: {:?} {:?} {:?}", l, r, n);
if let Some((k1, v1)) = k1v1 {
alloc::<_, _, L>(&mut new, k0, v0, 0, 0, &mut n);
alloc::<_, _, L>(&mut new, k1, v1, l, r, &mut n);
} else {
alloc::<_, _, L>(&mut new, k0, v0, l, r, &mut n);
}
debug!("alloc: {:?}", new);
debug!("alloc: {:?}", header(new.0.as_page()).left_page());
L::alloc_write(&mut new, k0, v0, l, r, &mut n);
let s = core::mem::size_of::<Tuple<K, V>>();
assert!(k1v1.is_none());
split::<_, _, _, L>(txn, page, mutable, s, u, k0, v0, l, r)
// Else, split the page.
split::<_, _, _, L>(txn, page, mutable, u, k0, v0, l, r)
if mutable && hdr.is_dirty() {
right = unsafe { core::mem::transmute(page) };
// Then, for the right page:
if mutable && page.is_dirty() {
// If we can mutate the original page, remove the first
// `k` entries, and return a pointer to the split key (at
// position `k` on the page)
right = MutPage(page);
fn can_alloc<K: Storable, V: Storable>(hdr: &Header, n: usize) -> bool {
(HDR as usize) + (hdr.n() as usize) * 8 + n * (8 + core::mem::size_of::<Tuple<K, V>>())
fn can_alloc<K: Storable, V: Storable>(hdr: &Header) -> bool {
(HDR as usize) + (hdr.n() as usize + 1) * 8 + core::mem::size_of::<Tuple<K, V>>()
// `page_unsized::AllocWrite` for internal nodes holding `Storable`
// keys and values: a thin adapter around the `alloc_write` function.
impl<K: Storable, V: Storable> crate::btree::page_unsized::AllocWrite<K, V> for Internal {
/// Forwards all arguments to the free function `alloc_write`, with
/// `Self` (i.e. `Internal`) as its third type parameter.
fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {
alloc_write::<K, V, Self>(new, k0, v0, l, r, n)
}
}
// `page_unsized::AllocWrite` for leaves holding `Storable` keys and
// values: a thin adapter around the `alloc_write` function.
impl<K: Storable, V: Storable> crate::btree::page_unsized::AllocWrite<K, V> for Leaf {
/// Forwards all arguments to the free function `alloc_write`, with
/// `Self` (i.e. `Leaf`) as its third type parameter.
fn alloc_write(new: &mut MutPage, k0: &K, v0: &V, l: u64, r: u64, n: &mut isize) {
alloc_write::<K, V, Self>(new, k0, v0, l, r, n)
}
}
/// Allocates a `Tuple<K, V>` slot at position `*n` of `new`, copies
/// `k0` and `v0` into it bitwise, and advances `*n` by one.
///
/// `r` is passed on to `L::alloc_insert`. When `l` is non-zero it is
/// installed through `L::set_right_child` on position `*n - 1`.
fn alloc_write<K: Storable, V: Storable, L: Alloc>(
    new: &mut MutPage,
    k0: &K,
    v0: &V,
    l: u64,
    r: u64,
    n: &mut isize,
) {
    let entry_off = L::alloc_insert::<K, V>(new, n, r) as usize;
    unsafe {
        let slot = new.0.data.add(entry_off) as *mut Tuple<K, V>;
        core::ptr::copy_nonoverlapping(k0, &mut (*slot).k, 1);
        core::ptr::copy_nonoverlapping(v0, &mut (*slot).v, 1);
    }
    if l != 0 {
        L::set_right_child(new, *n - 1, l);
    }
    *n += 1;
}
///
/// Makes the assumption that `k1v1.is_some()` implies
/// `replace`. The "double insertion" is only ever used when
/// deleting, and when the right child of a deleted entry (in an
/// internal node) has split while we were looking for a
/// replacement for the deleted entry.
///
/// Since we only look for replacements in the right child, the
/// left child of the insertion isn't modified, in which case `l`
/// and `r` are interpreted as the left and right child of the new
/// entry, `k1v1`.