use super::cursor::*;
use super::*;

/// Rebalances two sibling pages by rotating one entry from the right page
/// into the left one, for the case where the modified page is the left page.
///
/// The implementation is mostly the same as in the sized case, except for
/// leaves, which makes the two hard to share.
///
/// Steps, in order:
///
/// 1. Read the first entry of the right page (`m.other`) together with its
///    left child. That entry becomes the new middle element.
/// 2. Apply the pending modification (`m.modified`) to the left page.
/// 3. Append the current middle element (`m.mid`) at the end of the left
///    page, using the right page's former leftmost child as its right child.
/// 4. Delete the first entry of the right page.
///
/// Returns `Op::Rebalanced` holding the offsets of the new left and right
/// pages, the new middle key/value, and a two-slot `freed` array: slot 0 for
/// the left (modified) page and slot 1 for the right page. A slot is only
/// written when the corresponding operation reported a non-zero freed value;
/// its lowest bit is set when the page was dirty before the operation.
///
/// The type parameter `L` is not used by the body; it is part of the existing
/// signature and is left untouched.
///
/// # Panics
///
/// Panics if `m.mod_is_left` is false, if the right page has no entry at
/// index 0, or if any `put` returns something other than `Put::Ok`.
pub(crate) async fn rebalance_left<
    'a,
    T: AllocPage,
    K: UnsizedStorable + ?Sized + core::fmt::Debug,
    V: UnsizedStorable + ?Sized + core::fmt::Debug,
    L: Alloc,
>(
    txn: &mut T,
    m: Concat<'a, K, V, Page<K, V>>,
) -> Result<Op<'a, T, K, V>, T::Error> {
    assert!(m.mod_is_left);

    // Entry 0 of the right page is the one being rotated: it is returned as
    // the new middle element, while the current middle element goes to the
    // end of the left page.
    let right_first = PageCursor::new(&m.other, 0);
    let mid_right_child = <Page<K, V>>::left_child(m.other.as_page(), &right_first);
    let (next_k, next_v, next_r) = <Page<K, V>>::current(txn, m.other.as_page(), &right_first)
        .await
        .unwrap();

    let mut freed_pages = [0, 0];

    // Dirtiness of the left page *before* anything is done to it; used to tag
    // every value recorded in `freed_pages[0]`.
    let left_was_dirty = m.modified.page.is_dirty();

    let new_left = if m.modified.ins.is_some() || m.modified.skip_first {
        // There is an actual insertion or deletion to carry out first. Either
        // way we end up with a mutable page (the modified page may or may not
        // have been mutable beforehand).
        let (page, freed) = if let Some((ins_k, ins_v)) = m.modified.ins {
            match <Page<K, V>>::put(
                txn,
                m.modified.page,
                m.modified.mutable,
                m.modified.skip_first,
                &m.modified.c1,
                ins_k,
                ins_v,
                m.modified.ins2,
                m.modified.l,
                m.modified.r,
            )
            .await?
            {
                Put::Ok(Ok { page, freed }) => (page, freed),
                _ => unreachable!(),
            }
        } else {
            <Page<K, V>>::del(
                txn,
                m.modified.page,
                m.modified.mutable,
                &m.modified.c1,
                m.modified.l,
            )
            .await?
        };
        if freed > 0 {
            freed_pages[0] = if left_was_dirty { freed | 1 } else { freed };
        }

        // Append the middle element of the concatenation at the end of the
        // left page. The page is mutable by now, and there is known to be
        // enough space for the extra entry.
        let end = PageCursor::after(&page.0);
        match <Page<K, V>>::put(
            txn,
            page.0,
            true,
            false,
            &end,
            m.mid.0,
            m.mid.1,
            None,
            0,
            mid_right_child,
        )
        .await?
        {
            Put::Ok(Ok { page, freed }) => {
                if freed > 0 {
                    freed_pages[0] = if left_was_dirty { freed | 1 } else { freed };
                }
                page
            }
            _ => unreachable!(),
        }
    } else {
        // Nothing to insert or delete on this page: only append the middle
        // element, then patch in the child pointers coming from below. Here
        // the page is passed with its own `mutable` flag, so `put` may have
        // to produce a fresh page.
        let end = PageCursor::after(&m.modified.page);
        match <Page<K, V>>::put(
            txn,
            m.modified.page,
            m.modified.mutable,
            false,
            &end,
            m.mid.0,
            m.mid.1,
            None,
            0,
            mid_right_child,
        )
        .await?
        {
            Put::Ok(Ok { page, freed }) => {
                // Overwrite the child pointer stored in the `u64` table that
                // starts `HDR` bytes into the page, keeping the low 12 bits
                // of the existing slot (presumably the entry's in-page
                // offset -- confirm against the page layout). Appending at
                // the end does not shift indices, so `l` goes at `cur - 1`
                // and `r` at `cur`.
                if m.modified.l > 0 {
                    assert_eq!(m.modified.r, 0);
                    unsafe {
                        let slot =
                            (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur - 1);
                        *slot = (m.modified.l | (u64::from_le(*slot) & 0xfff)).to_le();
                    }
                } else if m.modified.r > 0 {
                    unsafe {
                        let slot = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur);
                        *slot = (m.modified.r | (u64::from_le(*slot) & 0xfff)).to_le();
                    }
                }
                if freed > 0 {
                    freed_pages[0] = if left_was_dirty { freed | 1 } else { freed };
                }
                page
            }
            _ => unreachable!(),
        }
    };

    // Extend the lifetime of the rotated entry: rebalancing only happens
    // during deletions, and the current deletion won't touch this page
    // anymore after this point.
    let k = unsafe { core::mem::transmute(next_k) };
    let v = unsafe { core::mem::transmute(next_v) };

    // Remove the rotated entry from the right page; if that frees the old
    // "other" page, record it in the second slot.
    let right_was_dirty = m.other.is_dirty();
    let (new_right, freed) =
        <Page<K, V>>::del(txn, m.other, m.other_is_mutable, &right_first, next_r).await?;
    if freed > 0 {
        freed_pages[1] = if right_was_dirty { freed | 1 } else { freed };
    }

    Ok(Op::Rebalanced {
        l: new_left.0.offset,
        r: new_right.0.offset,
        k,
        v,
        freed: freed_pages,
    })
}

/// Rebalances two sibling pages by rotating the last entry of the left page
/// into the right one, for the case where the modified page is the right
/// page.
///
/// Surprisingly, this direction is simpler than `rebalance_left`:
///
/// - when rebalancing two internal nodes, we are in the easy case of
///   `rebalance_left`;
/// - otherwise, the new middle element is the last one on the left page, and
///   it isn't erased by leaf deletions, because these just move entries to
///   the left.
///
/// This implementation is shared with the sized one.
///
/// Steps, in order:
///
/// 1. Read the last entry of the left page (`m.other`); it becomes the new
///    middle element.
/// 2. Apply the pending modification (`m.modified`) to the right page.
/// 3. Insert the current middle element (`m.mid`) as the first entry of the
///    right page, with the rotated entry's right child as its left child.
/// 4. Delete the last entry of the left page.
///
/// Returns `Op::Rebalanced` holding the offsets of the new left and right
/// pages, the new middle key/value, and a two-slot `freed` array: slot 0 for
/// the right (modified) page and slot 1 for the left page. A slot is only
/// written when the corresponding operation reported a non-zero freed value;
/// its lowest bit is set when the page was dirty before the operation.
///
/// # Panics
///
/// Panics if `m.mod_is_left` is true, if the left page has no last entry, or
/// if any `put` returns something other than `Put::Ok`. In debug builds it
/// also panics if, after an insertion, adding the middle element reports a
/// freed page.
pub(crate) async fn rebalance_right<
    'a,
    T: AllocPage,
    K: ?Sized,
    V: ?Sized,
    P: BTreeMutPage<K, V, Cursor = super::PageCursor>,
>(
    txn: &mut T,
    m: Concat<'a, K, V, P>,
) -> Result<Op<'a, T, K, V>, T::Error> {
    assert!(!m.mod_is_left);

    // The entry being rotated is the last one of the left page.
    let left_last = P::cursor_last(&m.other);
    let (last_k, last_v, last_r) = P::current(txn, m.other.as_page(), &left_last)
        .await
        .unwrap();

    let mut freed_pages = [0, 0];

    // Dirtiness of the right page *before* anything is done to it; used to
    // tag every value recorded in `freed_pages[0]`.
    let right_was_dirty = m.modified.page.is_dirty();

    // Perform the modification on the modified page.
    let new_right = if let Some((ins_k, ins_v)) = m.modified.ins {
        let inserted = match P::put(
            txn,
            m.modified.page,
            m.modified.mutable,
            m.modified.skip_first,
            &m.modified.c1,
            ins_k,
            ins_v,
            m.modified.ins2,
            m.modified.l,
            m.modified.r,
        )
        .await?
        {
            Put::Ok(Ok { page, freed }) => {
                if freed > 0 {
                    freed_pages[0] = if right_was_dirty { freed | 1 } else { freed };
                }
                page
            }
            _ => unreachable!(),
        };

        // Add the middle element of the concatenation as the first element
        // of the right page, i.e. in front of its first element as it stands
        // after the `put` above. The right page is mutable since we just
        // modified it, hence the `debug_assert_eq!(freed, 0)`.
        //
        // NOTE(review): in release builds a non-zero `freed` would be
        // silently dropped here -- confirm that it really cannot happen.
        let front = P::cursor_first(&inserted.0);
        let first_left_child = P::left_child(inserted.0.as_page(), &front);
        match P::put(
            txn,
            inserted.0,
            true,
            false,
            &front,
            m.mid.0,
            m.mid.1,
            None,
            last_r,
            first_left_child,
        )
        .await?
        {
            Put::Ok(Ok { page, freed }) => {
                debug_assert_eq!(freed, 0);
                page
            }
            _ => unreachable!(),
        }
    } else if m.modified.skip_first {
        let (trimmed, freed) = P::del(
            txn,
            m.modified.page,
            m.modified.mutable,
            &m.modified.c1,
            m.modified.l,
        )
        .await?;
        if freed > 0 {
            freed_pages[0] = if right_was_dirty { freed | 1 } else { freed };
        }

        // Add the middle element of the concatenation as the first element
        // of the right page, i.e. in front of its first element as it stands
        // after the `del` above. The right page is mutable since we just
        // modified it. Moreover, if it is compacted by the `put` below, we
        // know that the `del` didn't free anything, hence slot 0 can be
        // reused.
        let front = P::cursor_first(&trimmed.0);
        let first_left_child = P::left_child(trimmed.0.as_page(), &front);
        match P::put(
            txn,
            trimmed.0,
            true,
            false,
            &front,
            m.mid.0,
            m.mid.1,
            None,
            last_r,
            first_left_child,
        )
        .await?
        {
            Put::Ok(Ok { page, freed }) => {
                if freed > 0 {
                    freed_pages[0] = if right_was_dirty { freed | 1 } else { freed };
                }
                page
            }
            _ => unreachable!(),
        }
    } else {
        // Nothing to insert or delete on this page: only prepend the middle
        // element, then patch in the child pointers coming from below.
        let front = P::cursor_first(&m.modified.page);
        let first_left_child = P::left_child(m.modified.page.as_page(), &front);
        match P::put(
            txn,
            m.modified.page,
            m.modified.mutable,
            false,
            &front,
            m.mid.0,
            m.mid.1,
            None,
            last_r,
            first_left_child,
        )
        .await?
        {
            Put::Ok(Ok { page, freed }) => {
                // Update the left and right child pointers. At least one of
                // them is 0, but it can be either one (or both), depending on
                // what happened on the page below.
                //
                // The pointers live in the `u64` table that starts `HDR`
                // bytes into the page; the low 12 bits of each slot are kept
                // as they are. Since an entry was just inserted at the
                // beginning of the page, indices are shifted by one with
                // respect to `m.modified.c1.cur`: `l` goes at `cur` and `r`
                // at `cur + 1`.
                if m.modified.l > 0 {
                    assert_eq!(m.modified.r, 0);
                    unsafe {
                        let slot = (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur);
                        *slot = (m.modified.l | (u64::from_le(*slot) & 0xfff)).to_le();
                    }
                } else if m.modified.r > 0 {
                    unsafe {
                        let slot =
                            (page.0.data.add(HDR) as *mut u64).offset(m.modified.c1.cur + 1);
                        *slot = (m.modified.r | (u64::from_le(*slot) & 0xfff)).to_le();
                    }
                }
                if freed > 0 {
                    freed_pages[0] = if right_was_dirty { freed | 1 } else { freed };
                }
                page
            }
            _ => unreachable!(),
        }
    };

    // As explained in the general comment on this function, the rotated
    // entry isn't erased by the deletion in `m.other` below, so its lifetime
    // can safely be extended.
    let k = unsafe { core::mem::transmute(last_k) };
    let v = unsafe { core::mem::transmute(last_v) };

    // Remove the rotated entry from the left page; if that frees the old
    // "other" page, record it in the second slot.
    let left_was_dirty = m.other.is_dirty();
    let (new_left, freed) = P::del(txn, m.other, m.other_is_mutable, &left_last, 0).await?;
    if freed > 0 {
        freed_pages[1] = if left_was_dirty { freed | 1 } else { freed };
    }

    Ok(Op::Rebalanced {
        l: new_left.0.offset,
        r: new_right.0.offset,
        k,
        v,
        freed: freed_pages,
    })
}