use super::header::{header, header_mut, HDR};
use super::*;

pub(super) async fn put<
    'a,
    T: AllocPage,
    K: UnsizedStorable + ?Sized + core::fmt::Debug,
    V: UnsizedStorable + ?Sized + core::fmt::Debug,
    L: Alloc + AllocWrite<K, V>,
>(
    txn: &mut T,
    page: CowPage,
    mutable: bool,
    replace: bool,
    u: usize,
    k0: &'a K,
    v0: &'a V,
    k1v1: Option<(&'a K, &'a V)>,
    l: u64,
    r: u64,
) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
    // Size of the new insertions, not counting the offsets.
    let size = alloc_size(k0, v0)
        + if let Some((k1, v1)) = k1v1 {
            alloc_size(k1, v1)
        } else {
            0
        };
    // Sized occupied by the data in these insertions (not counting
    // the offsets), if we need to compact the page.
    let size_replaced = if replace {
        // We're always in an internal node if we replace.
        let cur_size = Internal::current_size::<K, V>(page.as_page(), u as isize);
        // Since `size` isn't counting the size of the 64-bit offset,
        // and `cur_size` is, we need to add 8.
        8 + size as isize - cur_size as isize
    } else {
        size as isize
    };
    // Number of extra offsets.
    let n_ins = {
        let n_ins = if k1v1.is_some() { 2 } else { 1 };
        if replace {
            n_ins - 1
        } else {
            n_ins
        }
    };
    let hdr = header(page.as_page());
    if mutable && page.is_dirty() && L::can_alloc(header(page.as_page()), size, n_ins) {
        // First, if the page is dirty and mutable, mark it mutable.
        let mut page = MutPage(page);
        let mut n = u as isize;

        // If we replace, we first need to "unlink" the previous
        // value, by erasing its offset.
        if replace {
            let p = page.0.data;
            let hdr = header_mut(&mut page);
            // In this case, we know we're not in a leaf, so offsets
            // are of size 8.
            assert!(!hdr.is_leaf());
            unsafe {
                let ptr = p.add(HDR + n as usize * 8) as *mut u64;
                let off = (u64::from_le(*ptr) & 0xfff) as usize;
                let kv_ptr = p.add(off);
                let size = entry_size::<K, V>(kv_ptr);
                core::ptr::copy(ptr.offset(1), ptr, hdr.n() as usize - n as usize - 1);
                // Decreasing these figures here, we'll increase them
                // again in the calls to `alloc_write` below.
                hdr.decr(8 + size);
                hdr.set_n(hdr.n() - 1);
            }
        }
        // Do the actual insertions.
        if let Some((k1, v1)) = k1v1 {
            L::alloc_write(txn, &mut page, k0, v0, 0, 0, &mut n).await;
            L::alloc_write(txn, &mut page, k1, v1, l, r, &mut n).await;
        } else {
            L::alloc_write(txn, &mut page, k0, v0, l, r, &mut n).await;
        }
        // Return: no page was freed.
        Ok(Put::Ok(Ok { page, freed: 0 }))
    } else if L::can_compact(hdr, size_replaced, n_ins) {
        // Allocate a new page, split the offsets at the position of
        // the insertion, and each side of the split onto the new
        // page, with the insertion between them.
        let mut new = unsafe { txn.alloc_page().await? };
        <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);

        // Clone the leftmost child.
        L::set_right_child(&mut new, -1, hdr.left_page() & !0xfff);

        // Split the offsets.
        let s = L::offset_slice(page.as_page());
        let (s0, s1) = s.split_at(u as usize);
        // Drop the first element of `s1` if we're replacing it.
        let s1 = if replace { s1.split_at(1).1 } else { s1 };

        // Finally, clone and insert.
        let mut n = 0;
        clone::<K, V, L>(page.as_page(), &mut new, s0, &mut n);
        if let Some((k1, v1)) = k1v1 {
            L::alloc_write(txn, &mut new, k0, v0, 0, l, &mut n).await;
            L::alloc_write(txn, &mut new, k1, v1, 0, r, &mut n).await;
        } else {
            L::alloc_write(txn, &mut new, k0, v0, l, r, &mut n).await;
        }
        clone::<K, V, L>(page.as_page(), &mut new, s1, &mut n);

        // And return, freeing the old version of the page.
        Ok(Put::Ok(Ok {
            page: new,
            freed: page.offset | if page.is_dirty() { 1 } else { 0 },
        }))
    } else {
        split_unsized::<_, _, _, L>(txn, page, replace, u, k0, v0, k1v1, l, r).await
    }
}

// Surprisingly, the following function is simpler than in the sized
// case, because we can't be too efficient in this case, since we have
// to go through all the elements linearly anyway to get their sizes.
async fn split_unsized<
    'a,
    T: AllocPage,
    K: UnsizedStorable + ?Sized + core::fmt::Debug,
    V: UnsizedStorable + ?Sized + core::fmt::Debug,
    L: Alloc + AllocWrite<K, V>,
>(
    txn: &mut T,
    page: CowPage,
    replace: bool,
    u: usize,
    k0: &'a K,
    v0: &'a V,
    k1v1: Option<(&'a K, &'a V)>,
    l0: u64,
    r0: u64,
) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
    let hdr = header(page.as_page());
    let mut left = unsafe { txn.alloc_page().await? };
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut left);

    let mut right = unsafe { txn.alloc_page().await? };
    <Page<K, V> as BTreeMutPage<K, V>>::init(&mut right);

    // Clone the leftmost child. If we're inserting at position 0,
    // this means this leftmost child must, and will be replaced.
    let left_child = hdr.left_page() & !0xfff;
    L::set_right_child(&mut left, -1, left_child);

    // Pointer to the split key and value.
    let mut split = None;

    // We start filling the left page, and we will change to filling
    // the right page when the left one is at least half-full.
    let mut current_page = &mut left;

    // There are two synchronised counters here: `hdr.n()` and
    // `n`. The reason for this is that operations on `hdr.n()` are
    // somewhat cumbersome, as they might involve extra operations to
    // convert to/from the little-endian encoding of `hdr.n()`.
    let mut n = 0;
    let s = L::offset_slice(page.as_page());
    let mut total = HDR;
    for (uu, off) in s.0.iter().enumerate() {
        // If the current position of the cursor is the insertion,
        // (i.e. `uu == u`), insert.
        if uu == u {
            // The following is tricky and makes assumptions on the
            // rest of the code. If we are inserting two elements
            // (i.e. if `k1v1.is_some()`), this means we're replacing
            // one on the page.
            header_mut(current_page).set_n(n as u16);
            if let Some((k1, v1)) = k1v1 {
                // As explained in the documentation for `put` in the
                // definition of `BTreeMutPage`, `l0` and `r0` are the
                // left and right children of k1v1, since this means
                // the right child of a deleted entry has split, and
                // we must replace the deleted entry with `(k0, v0)`.
                L::alloc_write(txn, current_page, k0, v0, 0, l0, &mut n).await;
                total += alloc_size(k0, v0) + L::OFFSET_SIZE;
                L::alloc_write(txn, current_page, k1, v1, 0, r0, &mut n).await;
                total += alloc_size(k1, v1) + L::OFFSET_SIZE;

                // Replacing the next element:
                debug_assert!(replace);
                continue;
            } else {
                L::alloc_write(txn, current_page, k0, v0, l0, r0, &mut n).await;
                total += alloc_size(k0, v0) + L::OFFSET_SIZE;
                if replace {
                    continue;
                }
            }
        }
        // Then, i.e. either if we're not at the position of an
        // insertion, or else if we're not replacing the current
        // position, just copy the current entry to the appropriate
        // page (left or right).
        let (r, off): (u64, usize) = (*off).into();
        assert_ne!(off, 0);
        unsafe {
            let ptr = Const(page.data.add(off));
            let size = entry_size::<K, V>(ptr.0);

            // If the left side of the split is at least half-full, we
            // know that we can do all the insertions we need on the
            // right-hand side (even if there are two of them, and the
            // replaced value is of size 0), so we switch.
            if split.is_none() && total >= PAGE_SIZE / 2 {
                // Don't write the next entry onto any page, keep it
                // as the split entry.
                let (k, v) = {
                    let (k, v) = read::<K, V>(ptr.0);
                    (Const(k), Const(v))
                };
                split = Some((K::from_raw_ptr(txn, k).await, V::from_raw_ptr(txn, v).await));
                // Set the entry count for the current page, before
                // switching and resetting the counter.
                header_mut(current_page).set_n(n as u16);

                // And set the leftmost child of the right page to
                // `r`.
                L::set_right_child(&mut right, -1, r);

                // Then, switch page and reset the counter.
                current_page = &mut right;
                n = 0;
            } else {
                // Else, we're simply allocating a new entry on the
                // current page.
                let off_new = {
                    let hdr = header_mut(current_page);
                    let data = hdr.data();
                    assert!(
                        HDR + hdr.n() as usize * L::OFFSET_SIZE + L::OFFSET_SIZE + size
                            < data as usize
                    );
                    // Move the data pointer `size` bytes to the left.
                    hdr.set_data(data - size as u16);
                    // And increase the number of entries, and
                    // occupied size of the page.
                    hdr.incr(size + L::OFFSET_SIZE);
                    data - size as u16
                };
                // Copy the entry.
                core::ptr::copy_nonoverlapping(
                    ptr.0,
                    current_page.0.data.offset(off_new as isize),
                    size,
                );
                // And set the offset.
                L::set_offset(current_page, n, r | (off_new as u64));
                n += 1;
            }
            total += size + L::OFFSET_SIZE;
        }
    }
    header_mut(current_page).set_n(n as u16);

    // Finally, it is possible that we haven't had a chance to do the
    // insertions yet: this is the case iff we're inserting at the end
    // of all the entries, so handle this now.
    if u == s.0.len() {
        if let Some((k1, v1)) = k1v1 {
            L::alloc_write(txn, current_page, k0, v0, 0, l0, &mut n).await;
            L::alloc_write(txn, current_page, k1, v1, 0, r0, &mut n).await;
        } else {
            L::alloc_write(txn, current_page, k0, v0, l0, r0, &mut n).await;
        }
    }
    let (split_key, split_value) = split.unwrap();
    Ok(Put::Split {
        split_key,
        split_value,
        left,
        right,
        freed: if page.is_dirty() {
            page.offset | 1
        } else {
            page.offset
        },
    })
}