//! Insertions into a B tree.
use super::cursor::*;
use super::*;

/// The representation of the update to a page.
#[derive(Debug)]
pub struct Ok {
    /// The new page, possibly the same as the previous version.
    pub page: MutPage,
    /// The freed page, or 0 if no page was freed.
    pub freed: u64,
}

/// The result of an insertion, i.e. either an update or a split.
#[derive(Debug)]
pub enum Put<'a, K: ?Sized, V: ?Sized> {
    Ok(Ok),
    Split {
        split_key: &'a K,
        split_value: &'a V,
        left: MutPage,
        right: MutPage,
        freed: u64,
    },
}

/// Insert an entry into a database, returning `false` if and only if
/// the exact same entry (key *and* value) was already in the
/// database.
pub async fn put<
    T: AllocPage,
    K: Storable + ?Sized + Sync,
    V: Storable + ?Sized + Sync,
    P: BTreeMutPage<K, V>,
>(
    txn: &mut T,
    db: &mut Db_<K, V, P>,
    key: &K,
    value: &V,
) -> Result<bool, T::Error> {
    // Set the cursor to the first element larger than or equal to
    // `(key, value)`. We will insert at that position (where "insert"
    // is meant in the sense of Rust's `Vec::insert`.
    let mut cursor = unsafe { Cursor::new(txn, db).await? };
    if cursor.set(txn, key, Some(value)).await?.is_some() {
        // If `(key, value)` already exists in the tree, return.
        return Ok(false);
    }
    // Else, we are at a leaf, since cursors that don't find an
    // element go all the way to the leaves.
    //
    // We put on the leaf page, resulting in a `Put`, i.e. either
    // `Put::Ok` or `Put::Split`.
    let p = cursor.len(); // Save the position of the leaf cursor.
    let is_owned = p < cursor.first_rc_len();

    // Insert the key and value at the leaf, i.e. pop the top level of
    // the stack (the leaf) and insert in there.
    let cur = cursor.pop().unwrap();
    let put = P::put(
        txn,
        cur.page,
        is_owned,
        false,
        &cur.cursor,
        key,
        value,
        None,
        0,
        0,
    )
    .await?;

    // In both cases (`Ok` and `Split`), we need to walk up the tree
    // and update each node.

    // Moreover, since the middle elements of the splits may be on
    // pages that must be freed at the end of this call, we collect
    // them in the `free` array below, and free them only at the end
    // of this function.
    //
    // If we didn't hold on to these pages, they could be reallocated
    // in subsequent splits, and the middle element could be
    // lost. This is important when the middle element climbs up more
    // than one level (i.e. the middle element of the split of a leaf
    // is also the middle element of the split of the leaf's parent,
    // etc).
    let mut free = [0; N_CURSORS];
    db.db = put_cascade(txn, &mut cursor, put, &mut free)
        .await?
        .0
        .offset;
    unsafe {
        for f in &free[..p] {
            if *f & 1 != 0 {
                txn.decr_rc_owned((*f) ^ 1).await?;
            } else if *f > 0 {
                txn.decr_rc(*f).await?;
            }
        }
    }
    Ok(true)
}

async fn put_cascade<
    'a,
    T: AllocPage,
    K: Storable + ?Sized,
    V: Storable + ?Sized,
    P: BTreeMutPage<K, V> + Send,
>(
    txn: &mut T,
    cursor: &mut Cursor<K, V, P>,
    mut put: Put<'a, K, V>,
    free: &mut [u64; N_CURSORS],
) -> Result<MutPage, T::Error> {
    loop {
        match put {
            Put::Split {
                split_key,
                split_value,
                left,
                right,
                freed,
            } => {
                // Here, we are copying all children of the freed
                // page, except possibly the last freed page (which is
                // a child of the current node, if we aren't at a
                // leaf). This includes the split key/value, also
                // incremented in the following call:
                incr_descendants::<T, K, V, P>(txn, cursor, free, freed).await?;
                // Then, insert the split key/value in the relevant page:
                let is_owned = cursor.len() < cursor.first_rc_len();
                if let Some(cur) = cursor.pop() {
                    // In this case, there's a page above the page
                    // that just split (since we can pop the stack),
                    // so the page that just split isn't the root (but
                    // `cur` might be).
                    put = P::put(
                        txn,
                        cur.page,
                        is_owned,
                        false,
                        &cur.cursor,
                        split_key,
                        split_value,
                        None,
                        left.0.offset,
                        right.0.offset,
                    )
                    .await?;
                } else {
                    // No page above the split, so the root has just
                    // split. Insert the split key/value into a new
                    // page above the entire tree.
                    let mut p = unsafe { txn.alloc_page().await? };
                    let cursor = P::cursor_first(&p.0);
                    P::init(&mut p);
                    if let Put::Ok(p) = P::put(
                        txn,
                        p.0,
                        true,
                        false,
                        &cursor,
                        split_key,
                        split_value,
                        None,
                        left.0.offset,
                        right.0.offset,
                    )
                    .await?
                    {
                        // Return the new root.
                        return Ok(p.page);
                    } else {
                        unreachable!()
                    }
                }
            }
            Put::Ok(Ok { page, freed }) => {
                // Same as above: increment the relevant reference
                // counters.
                incr_descendants::<T, K, V, P>(txn, cursor, free, freed).await?;
                // And update the left child of the current cursor,
                // since the main invariant of cursors is that we're
                // always visiting the left child (if we're visiting
                // the last child of a page, the cursor is set to the
                // position strictly after the entries).
                let is_owned = cursor.len() < cursor.first_rc_len();
                if let Some(curs) = cursor.pop() {
                    // If we aren't at the root, just update the child.
                    put = Put::Ok(
                        P::update_left_child(txn, curs.page, is_owned, &curs.cursor, page.0.offset)
                            .await?,
                    )
                } else {
                    // If we are at the root, `page` is the updated root.
                    return Ok(page);
                }
            }
        }
    }
}

/// This function does all the memory management for `put`.
async fn incr_descendants<
    T: AllocPage + LoadPage,
    K: Storable + ?Sized,
    V: Storable + ?Sized,
    P: BTreePage<K, V> + Send,
>(
    txn: &mut T,
    cursor: &mut Cursor<K, V, P>,
    free: &mut [u64; N_CURSORS],
    mut freed: u64,
) -> Result<(), T::Error> {
    // The freed page is on the page below.
    if cursor.len() < cursor.first_rc_len() {
        // If a page has split or was immutable (allocated by a
        // previous transaction) and has been updated, we need to free
        // the old page.
        free[cursor.len()] = freed;
    } else {
        // This means that the new version of the page (either split
        // or not) contains references to all the children of the
        // freed page, except the last freed page (because the new
        // version of that page isn't shared).
        let cur = cursor.cur();

        // Start a cursor at the leftmost position and increase the
        // leftmost child page's RC (if we aren't at a leaf, and if
        // that child isn't the last freed page).
        let mut c = P::cursor_first(&cur.page);
        let left = P::left_child(cur.page.as_page(), &c);
        if left != 0 {
            if left != (freed & !1) {
                txn.incr_rc(left).await?;
            } else if cursor.len() == cursor.first_rc_len() {
                freed = 0
            }
        }
        // Then, for each entry of the page, increase the RCs of
        // references contained in the keys and values, and increase
        // the RC of the right child.
        while let Some((k, v, r)) = P::next(txn, cur.page.as_page(), &mut c).await {
            for o in k.page_references().chain(v.page_references()) {
                txn.incr_rc(o).await?;
            }
            if r != 0 {
                if r != (freed & !1) {
                    txn.incr_rc(r).await?;
                } else if cursor.len() == cursor.first_rc_len() {
                    freed = 0
                }
            }
        }
        // Else, the "freed" page is shared with another tree, and
        // hence we just need to decrement its RC.
        if freed > 0 && cursor.len() == cursor.first_rc_len() {
            free[cursor.len()] = freed;
        }
    }
    Ok(())
}