use super::*;
pub(super) async fn put<
'a,
T: AllocPage,
K: Storable + core::fmt::Debug,
V: Storable + core::fmt::Debug,
L: Alloc + super::super::page_unsized::AllocWrite<K, V>,
>(
txn: &mut T,
page: CowPage,
mutable: bool,
replace: bool,
u: usize,
k0: &'a K,
v0: &'a V,
l: u64,
r: u64,
) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
let hdr = header(page.as_page());
let is_dirty = page.is_dirty();
if mutable && is_dirty && L::can_alloc::<K, V>(hdr) {
let mut page = MutPage(page);
let p = Mut(page.0.data);
let hdr = header_mut(&mut page);
let mut n = u as isize;
if replace {
assert!(!hdr.is_leaf());
unsafe {
let ptr = p.0.add(HDR + n as usize * 8) as *mut u64;
core::ptr::copy(ptr.offset(1), ptr, hdr.n() as usize - n as usize - 1);
hdr.decr(8 + core::mem::size_of::<Tuple<K, V>>());
hdr.set_n(hdr.n() - 1);
}
}
L::alloc_write(txn, &mut page, k0, v0, l, r, &mut n).await;
Ok(Put::Ok(Ok { page, freed: 0 }))
} else if replace || L::can_compact::<K, V>(hdr) {
let mut new = unsafe { txn.alloc_page().await? };
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
L::set_right_child(&mut new, -1, hdr.left_page() & !0xfff);
let s = L::offset_slice::<T, K, V>(page.as_page());
let (s0, s1) = s.split_at(u as usize);
let s1 = if replace { s1.split_at(1).1 } else { s1 };
let mut n = 0;
clone::<K, V, L>(page.as_page(), &mut new, s0, &mut n);
L::alloc_write(txn, &mut new, k0, v0, l, r, &mut n).await;
clone::<K, V, L>(page.as_page(), &mut new, s1, &mut n);
Ok(Put::Ok(Ok {
page: new,
freed: if is_dirty {
page.offset | 1
} else {
page.offset
},
}))
} else {
split::<_, _, _, L>(txn, page, mutable, u, k0, v0, l, r).await
}
}
async fn split<
'a,
T: AllocPage,
K: Storable + core::fmt::Debug,
V: Storable + core::fmt::Debug,
L: Alloc + super::super::page_unsized::AllocWrite<K, V>,
>(
txn: &mut T,
page: CowPage,
mutable: bool,
u: usize,
k0: &'a K,
v0: &'a V,
l: u64,
r: u64,
) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
let mut left;
let hdr = header(page.as_page());
let page_is_dirty = if page.is_dirty() { 1 } else { 0 };
let mut n = hdr.n();
let k = n / 2;
n += 1;
let s = L::offset_slice::<T, K, V>(page.as_page());
let (s0, s1) = s.split_at(k as usize);
let (mut split_key, mut split_value, mid_child, s1) = if u == k as usize {
(k0, v0, r, s1)
} else {
let (s1a, s1b) = s1.split_at(1);
let (k, v, r) = L::kv(page.as_page(), L::first::<K, V>(&s1a));
(k, v, r, s1b)
};
let mut freed = 0;
if u >= k as usize {
let mut right = unsafe { txn.alloc_page().await? };
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut right);
if u > k as usize {
let mut n = 0;
let kk = u as usize - k as usize - 1;
let (s1a, s1b) = s1.split_at(kk);
L::set_right_child(&mut right, -1, mid_child);
clone::<K, V, L>(page.as_page(), &mut right, s1a, &mut n);
L::alloc_write(txn, &mut right, k0, v0, l, r, &mut n).await;
clone::<K, V, L>(page.as_page(), &mut right, s1b, &mut n);
} else {
clone::<K, V, L>(page.as_page(), &mut right, s1, &mut 0);
}
if mutable && page.is_dirty() {
left = MutPage(page);
let hdr = header_mut(&mut left);
hdr.set_n(k);
hdr.decr((n - 1 - k) as usize * core::mem::size_of::<Tuple<K, V>>());
} else {
left = unsafe { txn.alloc_page().await? };
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut left);
let mut n = 0;
L::set_right_child(&mut left, -1, hdr.left_page() & !0xfff);
clone::<K, V, L>(page.as_page(), &mut left, s0, &mut n);
freed = page.offset | page_is_dirty
}
if u == k as usize {
L::set_right_child(&mut left, k as isize - 1, l);
L::set_right_child(&mut right, -1, mid_child);
}
Ok(Put::Split {
split_key,
split_value,
left,
right,
freed,
})
} else {
left = unsafe { txn.alloc_page().await? };
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut left);
let ll = header(page.as_page()).left_page() & !0xfff;
L::set_right_child(&mut left, -1, ll);
let (s0a, s0b) = s0.split_at(u as usize);
let mut n = 0;
clone::<K, V, L>(page.as_page(), &mut left, s0a, &mut n);
L::alloc_write(txn, &mut left, k0, v0, l, r, &mut n).await;
clone::<K, V, L>(page.as_page(), &mut left, s0b, &mut n);
let mut right;
let freed;
if mutable && page.is_dirty() {
right = MutPage(page);
if let Some((k, v)) = L::truncate_left::<K, V>(&mut right, k as usize + 1) {
unsafe {
split_key = &*k;
split_value = &*v;
}
}
freed = 0;
} else {
right = unsafe { txn.alloc_page().await? };
<Page<K, V> as BTreeMutPage<K, V>>::init(&mut right);
L::set_right_child(&mut right, -1, mid_child);
let mut n = 0;
clone::<K, V, L>(page.as_page(), &mut right, s1, &mut n);
freed = page.offset | page_is_dirty
}
Ok(Put::Split {
split_key,
split_value,
left,
right,
freed,
})
}
}