use super::*;
use crate::HashMap;
use ::sanakirja::*;
use parking_lot::Mutex;
use std::collections::hash_map::Entry;
#[cfg(feature = "mmap")]
use std::path::Path;
use std::sync::Arc;

/// A Sanakirja pristine.
pub struct Pristine {
    pub env: Arc<::sanakirja::Env>,
}

pub(crate) type P<K, V> = btree::page::Page<K, V>;
pub type Db<K, V> = btree::Db<K, V>;
pub(crate) type UP<K, V> = btree::page_unsized::Page<K, V>;
pub type UDb<K, V> = btree::Db_<K, V, UP<K, V>>;

#[derive(Debug, Error)]
pub enum SanakirjaError {
    #[error(transparent)]
    Sanakirja(#[from] ::sanakirja::Error),
    #[error("Pristine locked")]
    PristineLocked,
    #[error("Pristine corrupt")]
    PristineCorrupt,
    #[error(transparent)]
    Borrow(#[from] std::cell::BorrowError),
    #[error("Cannot dropped a borrowed channel: {:?}", c)]
    ChannelRc { c: String },
    #[error("Pristine version mismatch. Cloning over the network can fix this.")]
    Version,
}

impl std::convert::From<::sanakirja::CRCError> for SanakirjaError {
    fn from(_: ::sanakirja::CRCError) -> Self {
        SanakirjaError::PristineCorrupt
    }
}

impl std::convert::From<::sanakirja::CRCError> for TxnErr<SanakirjaError> {
    fn from(_: ::sanakirja::CRCError) -> Self {
        TxnErr(SanakirjaError::PristineCorrupt)
    }
}

impl std::convert::From<::sanakirja::Error> for TxnErr<SanakirjaError> {
    fn from(e: ::sanakirja::Error) -> Self {
        TxnErr(e.into())
    }
}

impl std::convert::From<::sanakirja::Error> for TreeErr<SanakirjaError> {
    fn from(e: ::sanakirja::Error) -> Self {
        TreeErr(e.into())
    }
}

impl std::convert::From<TxnErr<::sanakirja::Error>> for TxnErr<SanakirjaError> {
    fn from(e: TxnErr<::sanakirja::Error>) -> Self {
        TxnErr(e.0.into())
    }
}

impl std::convert::From<TreeErr<::sanakirja::Error>> for TreeErr<SanakirjaError> {
    fn from(e: TreeErr<::sanakirja::Error>) -> Self {
        TreeErr(e.0.into())
    }
}

impl Pristine {
    #[cfg(feature = "mmap")]
    pub fn new<P: AsRef<Path>>(name: P) -> Result<Self, SanakirjaError> {
        Self::new_with_size(name, 1 << 20)
    }
    #[cfg(feature = "mmap")]
    pub unsafe fn new_nolock<P: AsRef<Path>>(name: P) -> Result<Self, SanakirjaError> {
        Self::new_with_size_nolock(name, 1 << 20)
    }

    #[cfg(feature = "mmap")]
    pub fn new_with_size<P: AsRef<Path>>(name: P, size: u64) -> Result<Self, SanakirjaError> {
        let env = ::sanakirja::Env::new(name, size, 2);
        match env {
            Ok(env) => Ok(Pristine { env: Arc::new(env) }),
            Err(::sanakirja::Error::IO(e)) => {
                if let std::io::ErrorKind::WouldBlock = e.kind() {
                    Err(SanakirjaError::PristineLocked)
                } else {
                    Err(SanakirjaError::Sanakirja(::sanakirja::Error::IO(e)))
                }
            }
            Err(e) => Err(SanakirjaError::Sanakirja(e)),
        }
    }
    #[cfg(feature = "mmap")]
    pub unsafe fn new_with_size_nolock<P: AsRef<Path>>(
        name: P,
        size: u64,
    ) -> Result<Self, SanakirjaError> {
        Ok(Pristine {
            env: Arc::new(::sanakirja::Env::new_nolock(name, size, 2)?),
        })
    }
    pub fn new_anon() -> Result<Self, SanakirjaError> {
        Self::new_anon_with_size(1 << 20)
    }
    pub fn new_anon_with_size(size: u64) -> Result<Self, SanakirjaError> {
        Ok(Pristine {
            env: Arc::new(::sanakirja::Env::new_anon(size, 2)?),
        })
    }
}

#[derive(Debug, PartialEq, Clone, Copy)]
#[repr(usize)]
pub enum Root {
    Version,
    Tree,
    RevTree,
    Inodes,
    RevInodes,
    Internal,
    External,
    RevDep,
    Channels,
    TouchedFiles,
    Dep,
    RevTouchedFiles,
    Partials,
    Remotes,
}

const VERSION: u64 = 1u64;

impl Pristine {
    pub fn txn_begin(&self) -> Result<Txn, SanakirjaError> {
        let txn = ::sanakirja::Env::txn_begin(self.env.clone())?;
        if txn.root(Root::Version as usize) != VERSION {
            return Err(SanakirjaError::Version);
        }
        debug!("txn_begin");
        fn begin(txn: ::sanakirja::Txn<Arc<::sanakirja::Env>>) -> Option<Txn> {
            Some(Txn {
                channels: txn.root_db(Root::Channels as usize)?,
                external: txn.root_db(Root::External as usize)?,
                internal: txn.root_db(Root::Internal as usize)?,
                inodes: txn.root_db(Root::Inodes as usize)?,
                revinodes: txn.root_db(Root::RevInodes as usize)?,
                tree: txn.root_db(Root::Tree as usize)?,
                revtree: txn.root_db(Root::RevTree as usize)?,
                revdep: txn.root_db(Root::RevDep as usize)?,
                touched_files: txn.root_db(Root::TouchedFiles as usize)?,
                rev_touched_files: txn.root_db(Root::RevTouchedFiles as usize)?,
                partials: txn.root_db(Root::Partials as usize)?,
                dep: txn.root_db(Root::Dep as usize)?,
                remotes: txn.root_db(Root::Remotes as usize)?,
                open_channels: Mutex::new(HashMap::default()),
                open_remotes: Mutex::new(HashMap::default()),
                txn,
                counter: 0,
                cur_channel: None,
            })
        }
        debug!("txn begin done");
        if let Some(txn) = begin(txn) {
            Ok(txn)
        } else {
            Err(SanakirjaError::PristineCorrupt)
        }
    }

    pub fn arc_txn_begin(&self) -> Result<ArcTxn<MutTxn<()>>, SanakirjaError> {
        Ok(ArcTxn(Arc::new(RwLock::new(self.mut_txn_begin()?))))
    }

    pub fn mut_txn_begin(&self) -> Result<MutTxn<()>, SanakirjaError> {
        unsafe {
            let mut txn = ::sanakirja::Env::mut_txn_begin(self.env.clone()).unwrap();
            if let Some(version) = txn.root(Root::Version as usize) {
                if version != VERSION {
                    return Err(SanakirjaError::Version.into());
                }
            } else {
                txn.set_root(Root::Version as usize, VERSION);
            }
            Ok(MutTxn {
                channels: if let Some(db) = txn.root_db(Root::Channels as usize) {
                    db
                } else {
                    btree::create_db_(&mut txn)?
                },
                external: if let Some(db) = txn.root_db(Root::External as usize) {
                    db
                } else {
                    btree::create_db_(&mut txn)?
                },
                internal: if let Some(db) = txn.root_db(Root::Internal as usize) {
                    db
                } else {
                    btree::create_db_(&mut txn)?
                },
                inodes: if let Some(db) = txn.root_db(Root::Inodes as usize) {
                    db
                } else {
                    btree::create_db_(&mut txn)?
                },
                revinodes: if let Some(db) = txn.root_db(Root::RevInodes as usize) {
                    db
                } else {
                    btree::create_db_(&mut txn)?
                },
                tree: if let Some(db) = txn.root_db(Root::Tree as usize) {
                    db
                } else {
                    btree::create_db_(&mut txn)?
                },
                revtree: if let Some(db) = txn.root_db(Root::RevTree as usize) {
                    db
                } else {
                    btree::create_db_(&mut txn)?
                },
                revdep: if let Some(db) = txn.root_db(Root::RevDep as usize) {
                    db
                } else {
                    btree::create_db_(&mut txn)?
                },
                dep: if let Some(db) = txn.root_db(Root::Dep as usize) {
                    db
                } else {
                    btree::create_db_(&mut txn)?
                },
                touched_files: if let Some(db) = txn.root_db(Root::TouchedFiles as usize) {
                    db
                } else {
                    btree::create_db_(&mut txn)?
                },
                rev_touched_files: if let Some(db) = txn.root_db(Root::RevTouchedFiles as usize) {
                    db
                } else {
                    btree::create_db_(&mut txn)?
                },
                partials: if let Some(db) = txn.root_db(Root::Partials as usize) {
                    db
                } else {
                    btree::create_db_(&mut txn)?
                },
                remotes: if let Some(db) = txn.root_db(Root::Remotes as usize) {
                    db
                } else {
                    btree::create_db_(&mut txn)?
                },
                open_channels: Mutex::new(HashMap::default()),
                open_remotes: Mutex::new(HashMap::default()),
                txn,
                counter: 0,
                cur_channel: None,
            })
        }
    }
}

pub type Txn = GenericTxn<::sanakirja::Txn<Arc<::sanakirja::Env>>>;
pub type MutTxn<T> = GenericTxn<::sanakirja::MutTxn<Arc<::sanakirja::Env>, T>>;

/// A transaction, used both for mutable and immutable transactions,
/// depending on type parameter `T`.
///
/// In Sanakirja, both `sanakirja::Txn` and `sanakirja::MutTxn`
/// implement `sanakirja::Transaction`, explaining our implementation
/// of `TxnT` for `Txn<T>` for all `T: sanakirja::Transaction`. This
/// covers both mutable and immutable transactions in a single
/// implementation.
pub struct GenericTxn<T: ::sanakirja::LoadPage<Error = ::sanakirja::Error> + ::sanakirja::RootPage>
{
    #[doc(hidden)]
    pub txn: T,
    #[doc(hidden)]
    pub internal: UDb<SerializedHash, ChangeId>,
    #[doc(hidden)]
    pub external: UDb<ChangeId, SerializedHash>,
    pub inodes: Db<Inode, Position<ChangeId>>,
    pub revinodes: Db<Position<ChangeId>, Inode>,

    pub tree: UDb<PathId, Inode>,
    pub revtree: UDb<Inode, PathId>,

    revdep: Db<ChangeId, ChangeId>,
    dep: Db<ChangeId, ChangeId>,

    touched_files: Db<Position<ChangeId>, ChangeId>,
    rev_touched_files: Db<ChangeId, Position<ChangeId>>,

    partials: UDb<SmallStr, Position<ChangeId>>,
    channels: UDb<SmallStr, SerializedChannel>,
    remotes: UDb<RemoteId, SerializedRemote>,

    pub(crate) open_channels: Mutex<HashMap<SmallString, ChannelRef<Self>>>,
    open_remotes: Mutex<HashMap<RemoteId, RemoteRef<Self>>>,
    counter: usize,
    cur_channel: Option<String>,
}

direct_repr!(SerializedPublicKey);

/// This is actually safe because the only non-Send fields are
/// `open_channels` and `open_remotes`, but we can't do anything with
/// a `ChannelRef` whose transaction has been moved to another thread.
unsafe impl<T: ::sanakirja::LoadPage<Error = ::sanakirja::Error> + ::sanakirja::RootPage> Send
    for GenericTxn<T>
{
}

impl Txn {
    pub fn check_database(&self, refs: &mut std::collections::BTreeMap<u64, usize>) {
        unsafe {
            use ::sanakirja::debug::Check;
            debug!("check: internal 0x{:x}", self.internal.db);
            self.internal.add_refs(&self.txn, refs).unwrap();
            debug!("check: external 0x{:x}", self.external.db);
            self.external.add_refs(&self.txn, refs).unwrap();
            debug!("check: inodes 0x{:x}", self.inodes.db);
            self.inodes.add_refs(&self.txn, refs).unwrap();
            debug!("check: revinodes 0x{:x}", self.revinodes.db);
            self.revinodes.add_refs(&self.txn, refs).unwrap();
            debug!("check: tree 0x{:x}", self.tree.db);
            self.tree.add_refs(&self.txn, refs).unwrap();
            debug!("check: revtree 0x{:x}", self.revtree.db);
            self.revtree.add_refs(&self.txn, refs).unwrap();
            debug!("check: revdep 0x{:x}", self.revdep.db);
            self.revdep.add_refs(&self.txn, refs).unwrap();
            debug!("check: dep 0x{:x}", self.dep.db);
            self.dep.add_refs(&self.txn, refs).unwrap();
            debug!("check: touched_files 0x{:x}", self.touched_files.db);
            self.touched_files.add_refs(&self.txn, refs).unwrap();
            debug!("check: rev_touched_files 0x{:x}", self.rev_touched_files.db);
            self.rev_touched_files.add_refs(&self.txn, refs).unwrap();
            debug!("check: partials 0x{:x}", self.partials.db);
            self.partials.add_refs(&self.txn, refs).unwrap();
            debug!("check: channels 0x{:x}", self.channels.db);
            self.channels.add_refs(&self.txn, refs).unwrap();
            for x in btree::iter(&self.txn, &self.channels, None).unwrap() {
                let (name, tup) = x.unwrap();
                debug!("check: channel name: {:?}", name.as_str());
                let graph: Db<Vertex<ChangeId>, SerializedEdge> = Db::from_page(tup.graph.into());
                let changes: Db<ChangeId, L64> = Db::from_page(tup.changes.into());
                let revchanges: UDb<L64, Pair<ChangeId, SerializedMerkle>> =
                    UDb::from_page(tup.revchanges.into());
                let states: UDb<SerializedMerkle, L64> = UDb::from_page(tup.states.into());
                let tags: Db<L64, Pair<SerializedMerkle, SerializedMerkle>> =
                    Db::from_page(tup.tags.into());
                debug!("check: graph 0x{:x}", graph.db);
                graph.add_refs(&self.txn, refs).unwrap();
                debug!("check: changes 0x{:x}", changes.db);
                changes.add_refs(&self.txn, refs).unwrap();
                debug!("check: revchanges 0x{:x}", revchanges.db);
                revchanges.add_refs(&self.txn, refs).unwrap();
                debug!("check: states 0x{:x}", states.db);
                states.add_refs(&self.txn, refs).unwrap();
                debug!("check: tags 0x{:x}", tags.db);
                tags.add_refs(&self.txn, refs).unwrap();
            }
            debug!("check: remotes 0x{:x}", self.remotes.db);
            self.remotes.add_refs(&self.txn, refs).unwrap();
            for x in btree::iter(&self.txn, &self.remotes, None).unwrap() {
                let (name, tup) = x.unwrap();
                debug!("check: remote name: {:?}", name);
                let remote: UDb<L64, Pair<SerializedHash, SerializedMerkle>> =
                    UDb::from_page(tup.remote.into());

                let rev: UDb<SerializedHash, L64> = UDb::from_page(tup.rev.into());
                let states: UDb<SerializedMerkle, L64> = UDb::from_page(tup.states.into());
                let tags: UDb<L64, Pair<SerializedMerkle, SerializedMerkle>> =
                    UDb::from_page(tup.tags.into());
                debug!("check: remote 0x{:x}", remote.db);
                remote.add_refs(&self.txn, refs).unwrap();
                debug!("check: rev 0x{:x}", rev.db);
                rev.add_refs(&self.txn, refs).unwrap();
                debug!("check: states 0x{:x}", states.db);
                states.add_refs(&self.txn, refs).unwrap();
                debug!("check: tags 0x{:x}", tags.db);
                tags.add_refs(&self.txn, refs).unwrap();
            }
            ::sanakirja::debug::add_free_refs(&self.txn, refs).unwrap();
            ::sanakirja::debug::check_free(&self.txn, &refs);
        }
    }
}

impl<T: ::sanakirja::LoadPage<Error = ::sanakirja::Error> + ::sanakirja::RootPage> GraphTxnT
    for GenericTxn<T>
{
    type Graph = Channel;
    type GraphError = SanakirjaError;

    fn get_graph<'txn>(
        &'txn self,
        db: &Self::Graph,
        key: &Vertex<ChangeId>,
        value: Option<&SerializedEdge>,
    ) -> Result<Option<&'txn SerializedEdge>, TxnErr<Self::GraphError>> {
        match ::sanakirja::btree::get(&self.txn, &db.graph, key, value) {
            Ok(Some((k, v))) if k == key => Ok(Some(v)),
            Ok(_) => Ok(None),
            Err(e) => {
                error!("{:?}", e);
                Err(TxnErr(SanakirjaError::PristineCorrupt))
            }
        }
    }

    fn get_external(
        &self,
        p: &ChangeId,
    ) -> Result<Option<&SerializedHash>, TxnErr<Self::GraphError>> {
        debug!("get_external {:?}", p);
        if p.is_root() {
            Ok(Some(&HASH_NONE))
        } else {
            match btree::get(&self.txn, &self.external, p, None) {
                Ok(Some((k, v))) if k == p => Ok(Some(v)),
                Ok(_) => Ok(None),
                Err(e) => {
                    error!("{:?}", e);
                    Err(TxnErr(SanakirjaError::PristineCorrupt))
                }
            }
        }
    }

    fn get_internal(
        &self,
        p: &SerializedHash,
    ) -> Result<Option<&ChangeId>, TxnErr<Self::GraphError>> {
        if p.t == HashAlgorithm::None as u8 {
            Ok(Some(&ChangeId::ROOT))
        } else {
            match btree::get(&self.txn, &self.internal, &p, None) {
                Ok(Some((k, v))) if k == p => Ok(Some(v)),
                Ok(_) => Ok(None),
                Err(e) => {
                    error!("{:?}", e);
                    Err(TxnErr(SanakirjaError::PristineCorrupt))
                }
            }
        }
    }

    type Adj = Adj;

    fn init_adj(
        &self,
        g: &Self::Graph,
        key: Vertex<ChangeId>,
        dest: Position<ChangeId>,
        min_flag: EdgeFlags,
        max_flag: EdgeFlags,
    ) -> Result<Self::Adj, TxnErr<Self::GraphError>> {
        let edge = SerializedEdge::new(min_flag, dest.change, dest.pos, ChangeId::ROOT);
        let mut cursor = btree::cursor::Cursor::new(&self.txn, &g.graph).map_err(TxnErr)?;
        cursor.set(&self.txn, &key, Some(&edge))?;
        Ok(Adj {
            cursor,
            key,
            min_flag,
            max_flag,
        })
    }

    fn next_adj<'a>(
        &'a self,
        _: &Self::Graph,
        a: &mut Self::Adj,
    ) -> Option<Result<&'a SerializedEdge, TxnErr<Self::GraphError>>> {
        next_adj(&self.txn, a).map(|x| x.map_err(|x| TxnErr(x.into())))
    }

    fn find_block(
        &self,
        graph: &Self::Graph,
        p: Position<ChangeId>,
    ) -> Result<&Vertex<ChangeId>, BlockError<Self::GraphError>> {
        Ok(find_block(&self.txn, &graph.graph, p)?)
    }

    fn find_block_end(
        &self,
        graph: &Self::Graph,
        p: Position<ChangeId>,
    ) -> Result<&Vertex<ChangeId>, BlockError<Self::GraphError>> {
        Ok(find_block_end(&self.txn, &graph.graph, p)?)
    }
}

impl std::convert::From<BlockError<::sanakirja::Error>> for BlockError<SanakirjaError> {
    fn from(e: BlockError<::sanakirja::Error>) -> Self {
        match e {
            BlockError::Txn(t) => BlockError::Txn(t.into()),
            BlockError::Block { block } => BlockError::Block { block },
        }
    }
}

#[doc(hidden)]
pub fn next_adj<'a, T: ::sanakirja::LoadPage>(
    txn: &'a T,
    a: &mut Adj,
) -> Option<Result<&'a SerializedEdge, T::Error>>
where
    T::Error: std::error::Error,
{
    loop {
        let x: Result<Option<(&Vertex<ChangeId>, &SerializedEdge)>, _> = a.cursor.next(txn);
        match x {
            Ok(Some((v, e))) => {
                if *v == a.key {
                    if e.flag() >= a.min_flag {
                        if e.flag() <= a.max_flag {
                            return Some(Ok(e));
                        } else {
                            return None;
                        }
                    }
                } else if *v > a.key {
                    return None;
                }
            }
            Err(e) => return Some(Err(e.into())),
            Ok(None) => {
                return None;
            }
        }
    }
}

#[doc(hidden)]
pub fn find_block<'a, T: ::sanakirja::LoadPage>(
    txn: &'a T,
    graph: &::sanakirja::btree::Db<Vertex<ChangeId>, SerializedEdge>,
    p: Position<ChangeId>,
) -> Result<&'a Vertex<ChangeId>, BlockError<T::Error>>
where
    T::Error: std::error::Error,
{
    if p.change.is_root() {
        return Ok(&Vertex::ROOT);
    }
    let key = Vertex {
        change: p.change,
        start: p.pos,
        end: p.pos,
    };
    let mut cursor =
        btree::cursor::Cursor::new(txn, &graph).map_err(|x| BlockError::Txn(x.into()))?;
    let mut k = if let Some((k, _)) = cursor
        .set(txn, &key, None)
        .map_err(|x| BlockError::Txn(x.into()))?
    {
        k
    } else if let Some((k, _)) = cursor.prev(txn).map_err(|x| BlockError::Txn(x.into()))? {
        k
    } else {
        debug!("find_block: BLOCK ERROR");
        return Err(BlockError::Block { block: p });
    };
    // The only guarantee here is that k is either the first key >=
    // `key`. We might need to rewind by one step if key is strictly
    // larger than the result (i.e. if `p` is in the middle of the
    // key).
    while k.change > p.change || (k.change == p.change && k.start > p.pos) {
        if let Some((k_, _)) = cursor.prev(txn).map_err(|x| BlockError::Txn(x.into()))? {
            k = k_
        } else {
            break;
        }
    }
    loop {
        if k.change == p.change && k.start <= p.pos {
            if k.end > p.pos || (k.start == k.end && k.end == p.pos) {
                return Ok(k);
            }
        } else if k.change > p.change {
            debug!("find_block: BLOCK ERROR");
            return Err(BlockError::Block { block: p });
        }
        if let Some((k_, _)) = cursor.next(txn).map_err(|x| BlockError::Txn(x.into()))? {
            k = k_
        } else {
            break;
        }
    }
    debug!("find_block: BLOCK ERROR");
    Err(BlockError::Block { block: p })
}

#[doc(hidden)]
pub fn find_block_end<'a, T: ::sanakirja::LoadPage>(
    txn: &'a T,
    graph: &::sanakirja::btree::Db<Vertex<ChangeId>, SerializedEdge>,
    p: Position<ChangeId>,
) -> Result<&'a Vertex<ChangeId>, BlockError<T::Error>>
where
    T::Error: std::error::Error,
{
    if p.change.is_root() {
        return Ok(&Vertex::ROOT);
    }
    let key = Vertex {
        change: p.change,
        start: p.pos,
        end: p.pos,
    };
    let mut cursor =
        btree::cursor::Cursor::new(txn, graph).map_err(|x| BlockError::Txn(x.into()))?;
    debug!("key {:?}", key);
    let (mut k, v) = match cursor.set(txn, &key, None) {
        Ok(Some((k, v))) => (k, v),
        Ok(None) => {
            if let Some((k, v)) = cursor.prev(txn).map_err(|x| BlockError::Txn(x.into()))? {
                (k, v)
            } else {
                debug!("find_block_end, no prev");
                return Err(BlockError::Block { block: p });
            }
        }
        Err(e) => {
            debug!("find_block_end: BLOCK ERROR 0 {:?}", e);
            return Err(BlockError::Txn(e.into()));
        }
    };
    debug!("cursor {:?} {:?}", k, v);
    loop {
        debug!("find_block_end, loop, k = {:?}, p = {:?}", k, p);
        if k.change < p.change {
            break;
        } else if k.change == p.change {
            // Here we want to create an edge pointing between `p`
            // and its successor. If k.start == p.pos, the only
            // case where that's what we want is if k.start ==
            // k.end.
            if k.start == p.pos && k.end == p.pos {
                return Ok(k);
            } else if k.start < p.pos {
                break;
            }
        }
        if let Some((k_, _)) = cursor.prev(txn).map_err(|x| BlockError::Txn(x.into()))? {
            k = k_
        } else {
            break;
        }
    }
    // We also want k.end >= p.pos, so we just call next() until
    // we have that.
    debug!("find_block_end, {:?} {:?}", k, p);
    while k.change < p.change || (k.change == p.change && p.pos > k.end) {
        if let Some((k_, _)) = cursor.next(txn).map_err(|x| BlockError::Txn(x.into()))? {
            k = k_
        } else {
            break;
        }
    }
    debug!("find_block_end, {:?} {:?}", k, p);
    if k.change == p.change
        && ((k.start < p.pos && p.pos <= k.end) || (k.start == k.end && k.start == p.pos))
    {
        Ok(k)
    } else {
        debug!("find_block_end: BLOCK ERROR");
        Err(BlockError::Block { block: p })
    }
}

pub struct Adj {
    pub cursor: ::sanakirja::btree::cursor::Cursor<
        Vertex<ChangeId>,
        SerializedEdge,
        P<Vertex<ChangeId>, SerializedEdge>,
    >,
    pub key: Vertex<ChangeId>,
    pub min_flag: EdgeFlags,
    pub max_flag: EdgeFlags,
}

impl<T: ::sanakirja::LoadPage<Error = ::sanakirja::Error> + ::sanakirja::RootPage> GraphIter
    for GenericTxn<T>
{
    type GraphCursor = ::sanakirja::btree::cursor::Cursor<
        Vertex<ChangeId>,
        SerializedEdge,
        P<Vertex<ChangeId>, SerializedEdge>,
    >;

    fn graph_cursor(
        &self,
        g: &Self::Graph,
        s: Option<&Vertex<ChangeId>>,
    ) -> Result<Self::GraphCursor, TxnErr<Self::GraphError>> {
        let mut c = ::sanakirja::btree::cursor::Cursor::new(&self.txn, &g.graph)?;
        if let Some(s) = s {
            c.set(&self.txn, s, None)?;
        }
        Ok(c)
    }

    fn next_graph<'txn>(
        &'txn self,
        _: &Self::Graph,
        a: &mut Self::GraphCursor,
    ) -> Option<Result<(&'txn Vertex<ChangeId>, &'txn SerializedEdge), TxnErr<Self::GraphError>>>
    {
        match a.next(&self.txn) {
            Ok(Some(x)) => Some(Ok(x)),
            Ok(None) => None,
            Err(e) => {
                error!("{:?}", e);
                Some(Err(TxnErr(SanakirjaError::PristineCorrupt)))
            }
        }
    }
}

// There is a choice here: the datastructure for `revchanges` is
// intuitively a list. Moreover, when removing a change, we must
// recompute the entire merkle tree after the removed change.
//
// This seems to indicate that a linked list could be an appropriate
// structure (a growable array is excluded because amortised
// complexity is not really acceptable here).
//
// However, we want to be able to answers queries such as "when was
// change X introduced?" without having to read the entire database.
//
// Additionally, even though `SerializedMerkle` has only one
// implementation, and is therefore sized in the current
// implementation, we can't exclude that other algorithms may be
// added, which means that the pages inside linked lists won't even be
// randomly-accessible arrays.
pub struct Channel {
    pub graph: Db<Vertex<ChangeId>, SerializedEdge>,
    pub changes: Db<ChangeId, L64>,
    pub revchanges: UDb<L64, Pair<ChangeId, SerializedMerkle>>,
    pub states: UDb<SerializedMerkle, L64>,
    pub tags: Db<L64, Pair<SerializedMerkle, SerializedMerkle>>,
    pub apply_counter: ApplyTimestamp,
    pub name: SmallString,
    pub last_modified: u64,
    pub id: RemoteId,
}

impl<T: ::sanakirja::LoadPage<Error = ::sanakirja::Error> + ::sanakirja::RootPage> ChannelTxnT
    for GenericTxn<T>
{
    type Channel = Channel;

    fn graph<'a>(&self, c: &'a Self::Channel) -> &'a Channel {
        c
    }
    fn name<'a>(&self, c: &'a Self::Channel) -> &'a str {
        c.name.as_str()
    }
    fn id<'a>(&self, c: &'a Self::Channel) -> Option<&'a RemoteId> {
        Some(&c.id)
    }
    fn apply_counter(&self, channel: &Self::Channel) -> u64 {
        channel.apply_counter.into()
    }
    fn last_modified(&self, channel: &Self::Channel) -> u64 {
        channel.last_modified.into()
    }
    fn changes<'a>(&self, channel: &'a Self::Channel) -> &'a Self::Changeset {
        &channel.changes
    }
    fn rev_changes<'a>(&self, channel: &'a Self::Channel) -> &'a Self::RevChangeset {
        &channel.revchanges
    }
    fn tags<'a>(&self, channel: &'a Self::Channel) -> &'a Self::Tags {
        &channel.tags
    }

    type Changeset = Db<ChangeId, L64>;
    type RevChangeset = UDb<L64, Pair<ChangeId, SerializedMerkle>>;

    fn get_changeset(
        &self,
        channel: &Self::Changeset,
        c: &ChangeId,
    ) -> Result<Option<&L64>, TxnErr<Self::GraphError>> {
        match btree::get(&self.txn, channel, c, None) {
            Ok(Some((k, x))) if k == c => Ok(Some(x)),
            Ok(x) => {
                debug!("get_changeset = {:?}", x);
                Ok(None)
            }
            Err(e) => {
                error!("{:?}", e);
                Err(TxnErr(SanakirjaError::PristineCorrupt))
            }
        }
    }
    fn get_revchangeset(
        &self,
        revchanges: &Self::RevChangeset,
        c: &L64,
    ) -> Result<Option<&Pair<ChangeId, SerializedMerkle>>, TxnErr<Self::GraphError>> {
        match btree::get(&self.txn, revchanges, c, None) {
            Ok(Some((k, x))) if k == c => Ok(Some(x)),
            Ok(_) => Ok(None),
            Err(e) => {
                error!("{:?}", e);
                Err(TxnErr(SanakirjaError::PristineCorrupt))
            }
        }
    }

    type ChangesetCursor = ::sanakirja::btree::cursor::Cursor<ChangeId, L64, P<ChangeId, L64>>;

    fn cursor_changeset<'a>(
        &'a self,
        channel: &Self::Changeset,
        pos: Option<ChangeId>,
    ) -> Result<Cursor<Self, &'a Self, Self::ChangesetCursor, ChangeId, L64>, TxnErr<SanakirjaError>>
    {
        let mut cursor = btree::cursor::Cursor::new(&self.txn, &channel)?;
        if let Some(k) = pos {
            cursor.set(&self.txn, &k, None)?;
        }
        Ok(Cursor {
            cursor,
            txn: self,
            k: std::marker::PhantomData,
            v: std::marker::PhantomData,
            t: std::marker::PhantomData,
        })
    }

    type RevchangesetCursor = ::sanakirja::btree::cursor::Cursor<
        L64,
        Pair<ChangeId, SerializedMerkle>,
        UP<L64, Pair<ChangeId, SerializedMerkle>>,
    >;

    fn cursor_revchangeset_ref<'a, RT: std::ops::Deref<Target = Self>>(
        txn: RT,
        channel: &Self::RevChangeset,
        pos: Option<L64>,
    ) -> Result<
        Cursor<Self, RT, Self::RevchangesetCursor, L64, Pair<ChangeId, SerializedMerkle>>,
        TxnErr<SanakirjaError>,
    > {
        let mut cursor = btree::cursor::Cursor::new(&txn.txn, channel)?;
        if let Some(k) = pos {
            cursor.set(&txn.txn, &k, None)?;
        }
        Ok(Cursor {
            cursor,
            txn,
            k: std::marker::PhantomData,
            v: std::marker::PhantomData,
            t: std::marker::PhantomData,
        })
    }

    fn rev_cursor_revchangeset<'a>(
        &'a self,
        channel: &Self::RevChangeset,
        pos: Option<L64>,
    ) -> Result<
        RevCursor<Self, &'a Self, Self::RevchangesetCursor, L64, Pair<ChangeId, SerializedMerkle>>,
        TxnErr<SanakirjaError>,
    > {
        let mut cursor = btree::cursor::Cursor::new(&self.txn, channel)?;
        if let Some(ref pos) = pos {
            cursor.set(&self.txn, pos, None)?;
        } else {
            cursor.set_last(&self.txn)?;
        };
        Ok(RevCursor {
            cursor,
            txn: self,
            k: std::marker::PhantomData,
            v: std::marker::PhantomData,
            t: std::marker::PhantomData,
        })
    }

    fn cursor_revchangeset_next(
        &self,
        cursor: &mut Self::RevchangesetCursor,
    ) -> Result<Option<(&L64, &Pair<ChangeId, SerializedMerkle>)>, TxnErr<SanakirjaError>> {
        if let Ok(x) = cursor.next(&self.txn) {
            Ok(x)
        } else {
            Err(TxnErr(SanakirjaError::PristineCorrupt))
        }
    }
    fn cursor_revchangeset_prev(
        &self,
        cursor: &mut Self::RevchangesetCursor,
    ) -> Result<Option<(&L64, &Pair<ChangeId, SerializedMerkle>)>, TxnErr<SanakirjaError>> {
        if let Ok(x) = cursor.prev(&self.txn) {
            Ok(x)
        } else {
            Err(TxnErr(SanakirjaError::PristineCorrupt))
        }
    }

    fn cursor_changeset_next(
        &self,
        cursor: &mut Self::ChangesetCursor,
    ) -> Result<Option<(&ChangeId, &L64)>, TxnErr<SanakirjaError>> {
        if let Ok(x) = cursor.next(&self.txn) {
            Ok(x)
        } else {
            Err(TxnErr(SanakirjaError::PristineCorrupt))
        }
    }
    fn cursor_changeset_prev(
        &self,
        cursor: &mut Self::ChangesetCursor,
    ) -> Result<Option<(&ChangeId, &L64)>, TxnErr<SanakirjaError>> {
        if let Ok(x) = cursor.prev(&self.txn) {
            Ok(x)
        } else {
            Err(TxnErr(SanakirjaError::PristineCorrupt))
        }
    }

    type States = UDb<SerializedMerkle, L64>;
    fn states<'a>(&self, channel: &'a Self::Channel) -> &'a Self::States {
        &channel.states
    }
    fn channel_has_state(
        &self,
        channel: &Self::States,
        m: &SerializedMerkle,
    ) -> Result<Option<L64>, TxnErr<Self::GraphError>> {
        match btree::get(&self.txn, channel, m, None)? {
            Some((k, v)) if k == m => Ok(Some(*v)),
            _ => Ok(None),
        }
    }

    type Tags = Db<L64, Pair<SerializedMerkle, SerializedMerkle>>;

    fn is_tagged(&self, tags: &Self::Tags, t: u64) -> Result<bool, TxnErr<Self::GraphError>> {
        let t: L64 = t.into();
        match btree::get(&self.txn, tags, &t, None)? {
            Some((k, _)) => Ok(k == &t),
            _ => Ok(false),
        }
    }

    type TagsCursor = ::sanakirja::btree::cursor::Cursor<
        L64,
        Pair<SerializedMerkle, SerializedMerkle>,
        P<L64, Pair<SerializedMerkle, SerializedMerkle>>,
    >;
    fn cursor_tags<'txn>(
        &'txn self,
        channel: &Self::Tags,
        k: Option<L64>,
    ) -> Result<
        crate::pristine::Cursor<
            Self,
            &'txn Self,
            Self::TagsCursor,
            L64,
            Pair<SerializedMerkle, SerializedMerkle>,
        >,
        TxnErr<Self::GraphError>,
    > {
        let mut cursor = btree::cursor::Cursor::new(&self.txn, channel)?;
        if let Some(k) = k {
            cursor.set(&self.txn, &k, None)?;
        }
        Ok(Cursor {
            cursor,
            txn: self,
            k: std::marker::PhantomData,
            v: std::marker::PhantomData,
            t: std::marker::PhantomData,
        })
    }
    fn cursor_tags_next(
        &self,
        cursor: &mut Self::TagsCursor,
    ) -> Result<Option<(&L64, &Pair<SerializedMerkle, SerializedMerkle>)>, TxnErr<Self::GraphError>>
    {
        if let Ok(x) = cursor.next(&self.txn) {
            Ok(x)
        } else {
            Err(TxnErr(SanakirjaError::PristineCorrupt))
        }
    }

    fn cursor_tags_prev(
        &self,
        cursor: &mut Self::TagsCursor,
    ) -> Result<Option<(&L64, &Pair<SerializedMerkle, SerializedMerkle>)>, TxnErr<Self::GraphError>>
    {
        if let Ok(x) = cursor.prev(&self.txn) {
            Ok(x)
        } else {
            Err(TxnErr(SanakirjaError::PristineCorrupt))
        }
    }

    fn iter_tags(
        &self,
        channel: &Self::Tags,
        from: u64,
    ) -> Result<
        super::Cursor<Self, &Self, Self::TagsCursor, L64, Pair<SerializedMerkle, SerializedMerkle>>,
        TxnErr<Self::GraphError>,
    > {
        self.cursor_tags(channel, Some(from.into()))
    }

    fn rev_iter_tags(
        &self,
        channel: &Self::Tags,
        from: Option<u64>,
    ) -> Result<
        super::RevCursor<
            Self,
            &Self,
            Self::TagsCursor,
            L64,
            Pair<SerializedMerkle, SerializedMerkle>,
        >,
        TxnErr<Self::GraphError>,
    > {
        let mut cursor = btree::cursor::Cursor::new(&self.txn, channel)?;
        if let Some(from) = from {
            cursor.set(&self.txn, &from.into(), None)?;
        } else {
            cursor.set_last(&self.txn)?;
        };
        Ok(RevCursor {
            cursor,
            txn: self,
            k: std::marker::PhantomData,
            v: std::marker::PhantomData,
            t: std::marker::PhantomData,
        })
    }
}

impl<T: ::sanakirja::LoadPage<Error = ::sanakirja::Error> + ::sanakirja::RootPage> DepsTxnT
    for GenericTxn<T>
{
    type DepsError = SanakirjaError;
    type Dep = Db<ChangeId, ChangeId>;
    type Revdep = Db<ChangeId, ChangeId>;

    sanakirja_table_get!(dep, ChangeId, ChangeId, DepsError);
    sanakirja_table_get!(revdep, ChangeId, ChangeId, DepsError);
    type DepCursor = ::sanakirja::btree::cursor::Cursor<ChangeId, ChangeId, P<ChangeId, ChangeId>>;
    sanakirja_cursor_ref!(dep, ChangeId, ChangeId);
    fn iter_dep_ref<RT: std::ops::Deref<Target = Self> + Clone>(
        txn: RT,
        p: &ChangeId,
    ) -> Result<super::Cursor<Self, RT, Self::DepCursor, ChangeId, ChangeId>, TxnErr<Self::DepsError>>
    {
        Self::cursor_dep_ref(txn.clone(), &txn.dep, Some((p, None)))
    }

    sanakirja_table_get!(touched_files, Position<ChangeId>, ChangeId, DepsError);
    sanakirja_table_get!(rev_touched_files, ChangeId, Position<ChangeId>, DepsError);

    type Touched_files = Db<Position<ChangeId>, ChangeId>;

    type Rev_touched_files = Db<ChangeId, Position<ChangeId>>;

    type Touched_filesCursor = ::sanakirja::btree::cursor::Cursor<
        Position<ChangeId>,
        ChangeId,
        P<Position<ChangeId>, ChangeId>,
    >;
    sanakirja_iter!(touched_files, Position<ChangeId>, ChangeId);

    type Rev_touched_filesCursor = ::sanakirja::btree::cursor::Cursor<
        ChangeId,
        Position<ChangeId>,
        P<ChangeId, Position<ChangeId>>,
    >;
    sanakirja_iter!(rev_touched_files, ChangeId, Position<ChangeId>);
    fn iter_revdep(
        &self,
        k: &ChangeId,
    ) -> Result<
        super::Cursor<Self, &Self, Self::DepCursor, ChangeId, ChangeId>,
        TxnErr<Self::DepsError>,
    > {
        self.cursor_dep(&self.revdep, Some((k, None)))
    }

    fn iter_dep(
        &self,
        k: &ChangeId,
    ) -> Result<
        super::Cursor<Self, &Self, Self::DepCursor, ChangeId, ChangeId>,
        TxnErr<Self::DepsError>,
    > {
        self.cursor_dep(&self.dep, Some((k, None)))
    }

    fn iter_touched(
        &self,
        k: &Position<ChangeId>,
    ) -> Result<
        super::Cursor<Self, &Self, Self::Touched_filesCursor, Position<ChangeId>, ChangeId>,
        TxnErr<Self::DepsError>,
    > {
        self.cursor_touched_files(&self.touched_files, Some((k, None)))
    }

    fn iter_rev_touched(
        &self,
        k: &ChangeId,
    ) -> Result<
        super::Cursor<Self, &Self, Self::Rev_touched_filesCursor, ChangeId, Position<ChangeId>>,
        TxnErr<Self::DepsError>,
    > {
        self.cursor_rev_touched_files(&self.rev_touched_files, Some((k, None)))
    }
}

impl<T: ::sanakirja::LoadPage<Error = ::sanakirja::Error> + ::sanakirja::RootPage> TreeTxnT
    for GenericTxn<T>
{
    type TreeError = SanakirjaError;
    type Inodes = Db<Inode, Position<ChangeId>>;
    type Revinodes = Db<Position<ChangeId>, Inode>;
    sanakirja_table_get!(inodes, Inode, Position<ChangeId>, TreeError, TreeErr);
    sanakirja_table_get!(revinodes, Position<ChangeId>, Inode, TreeError, TreeErr);
    sanakirja_cursor!(inodes, Inode, Position<ChangeId>, TreeErr);
    // #[cfg(debug_assertions)]
    sanakirja_cursor!(revinodes, Position<ChangeId>, Inode, TreeErr);

    type Tree = UDb<PathId, Inode>;
    sanakirja_table_get!(tree, PathId, Inode, TreeError, TreeErr);
    type TreeCursor = ::sanakirja::btree::cursor::Cursor<PathId, Inode, UP<PathId, Inode>>;
    sanakirja_iter!(tree, PathId, Inode, TreeErr);
    type RevtreeCursor = ::sanakirja::btree::cursor::Cursor<Inode, PathId, UP<Inode, PathId>>;
    sanakirja_iter!(revtree, Inode, PathId, TreeErr);

    type Revtree = UDb<Inode, PathId>;
    sanakirja_table_get!(revtree, Inode, PathId, TreeError, TreeErr);

    type Partials = UDb<SmallStr, Position<ChangeId>>;
    type PartialsCursor = ::sanakirja::btree::cursor::Cursor<
        SmallStr,
        Position<ChangeId>,
        UP<SmallStr, Position<ChangeId>>,
    >;
    sanakirja_cursor!(partials, SmallStr, Position<ChangeId>, TreeErr);
    type InodesCursor =
        ::sanakirja::btree::cursor::Cursor<Inode, Position<ChangeId>, P<Inode, Position<ChangeId>>>;
    fn iter_inodes(
        &self,
    ) -> Result<
        super::Cursor<Self, &Self, Self::InodesCursor, Inode, Position<ChangeId>>,
        TreeErr<Self::TreeError>,
    > {
        self.cursor_inodes(&self.inodes, None)
    }

    // #[cfg(debug_assertions)]
    type RevinodesCursor =
        ::sanakirja::btree::cursor::Cursor<Position<ChangeId>, Inode, P<Position<ChangeId>, Inode>>;
    // #[cfg(debug_assertions)]
    fn iter_revinodes(
        &self,
    ) -> Result<
        super::Cursor<Self, &Self, Self::RevinodesCursor, Position<ChangeId>, Inode>,
        TreeErr<SanakirjaError>,
    > {
        self.cursor_revinodes(&self.revinodes, None)
    }

    fn iter_partials<'txn>(
        &'txn self,
        k: &str,
    ) -> Result<
        super::Cursor<Self, &'txn Self, Self::PartialsCursor, SmallStr, Position<ChangeId>>,
        TreeErr<SanakirjaError>,
    > {
        let k0 = SmallString::from_str(k);
        self.cursor_partials(&self.partials, Some((&k0, None)))
    }
}

impl<T: ::sanakirja::LoadPage<Error = ::sanakirja::Error> + ::sanakirja::RootPage> GenericTxn<T> {
    #[doc(hidden)]
    pub unsafe fn unsafe_load_channel(
        &self,
        name: SmallString,
    ) -> Result<Option<Channel>, TxnErr<SanakirjaError>> {
        debug!("unsafe load channel");
        match btree::get(&self.txn, &self.channels, &name, None)? {
            Some((name_, tup)) if name_ == name.as_ref() => {
                debug!("load_channel: {:?} {:?}", name, tup);
                Ok(Some(Channel {
                    graph: Db::from_page(tup.graph.into()),
                    changes: Db::from_page(tup.changes.into()),
                    revchanges: UDb::from_page(tup.revchanges.into()),
                    states: UDb::from_page(tup.states.into()),
                    tags: Db::from_page(tup.tags.into()),
                    apply_counter: tup.apply_counter.into(),
                    last_modified: tup.last_modified.into(),
                    id: tup.id,
                    name,
                }))
            }
            _ => {
                debug!("unsafe_load_channel: not found");
                Ok(None)
            }
        }
    }
}

impl<T: ::sanakirja::LoadPage<Error = ::sanakirja::Error> + ::sanakirja::RootPage> TxnT
    for GenericTxn<T>
{
    fn hash_from_prefix(
        &self,
        s: &str,
    ) -> Result<(Hash, ChangeId), super::HashPrefixError<Self::GraphError>> {
        let h: SerializedHash = if let Some(ref h) = Hash::from_prefix(s) {
            h.into()
        } else {
            return Err(super::HashPrefixError::Parse(s.to_string()));
        };
        let mut result = None;
        debug!("h = {:?}", h);
        for x in btree::iter(&self.txn, &self.internal, Some((&h, None)))
            .map_err(|e| super::HashPrefixError::Txn(e.into()))?
        {
            let (e, i) = x.map_err(|e| super::HashPrefixError::Txn(e.into()))?;
            debug!("{:?} {:?}", e, i);
            if e < &h {
                continue;
            } else {
                let e: Hash = e.into();
                let b32 = e.to_base32();
                debug!("{:?}", b32);
                let (b32, _) = b32.split_at(s.len().min(b32.len()));
                if b32 != s {
                    break;
                } else if result.is_none() {
                    result = Some((e, *i))
                } else {
                    return Err(super::HashPrefixError::Ambiguous(s.to_string()));
                }
            }
        }
        if let Some(result) = result {
            Ok(result)
        } else {
            Err(super::HashPrefixError::NotFound(s.to_string()))
        }
    }

    fn state_from_prefix(
        &self,
        channel: &Self::States,
        s: &str,
    ) -> Result<(Merkle, L64), super::HashPrefixError<Self::GraphError>> {
        let h: SerializedMerkle = if let Some(ref h) = Merkle::from_prefix(s) {
            h.into()
        } else {
            return Err(super::HashPrefixError::Parse(s.to_string()));
        };
        let mut result = None;
        debug!("h = {:?}", h);
        for x in btree::iter(&self.txn, &channel, Some((&h, None)))
            .map_err(|e| super::HashPrefixError::Txn(e.into()))?
        {
            let (e, i) = x.map_err(|e| super::HashPrefixError::Txn(e.into()))?;
            debug!("{:?} {:?}", e, i);
            if e < &h {
                continue;
            } else {
                let e: Merkle = e.into();
                let b32 = e.to_base32();
                debug!("{:?}", b32);
                let (b32, _) = b32.split_at(s.len().min(b32.len()));
                if b32 != s {
                    break;
                } else if result.is_none() {
                    result = Some((e, *i))
                } else {
                    return Err(super::HashPrefixError::Ambiguous(s.to_string()));
                }
            }
        }
        if let Some(result) = result {
            Ok(result)
        } else {
            Err(super::HashPrefixError::NotFound(s.to_string()))
        }
    }

    fn hash_from_prefix_remote<'txn>(
        &'txn self,
        remote: &RemoteRef<Self>,
        s: &str,
    ) -> Result<Hash, super::HashPrefixError<Self::GraphError>> {
        let remote = remote.db.lock();
        let h: SerializedHash = if let Some(h) = Hash::from_prefix(s) {
            (&h).into()
        } else {
            return Err(super::HashPrefixError::Parse(s.to_string()));
        };
        let mut result = None;
        debug!("h = {:?}", h);
        for x in btree::iter(&self.txn, &remote.rev, Some((&h, None)))
            .map_err(|e| super::HashPrefixError::Txn(e.into()))?
        {
            let (e, _) = x.map_err(|e| super::HashPrefixError::Txn(e.into()))?;
            debug!("{:?}", e);
            if e < &h {
                continue;
            } else {
                let e: Hash = e.into();
                let b32 = e.to_base32();
                debug!("{:?}", b32);
                let (b32, _) = b32.split_at(s.len().min(b32.len()));
                if b32 != s {
                    break;
                } else if result.is_none() {
                    result = Some(e)
                } else {
                    return Err(super::HashPrefixError::Ambiguous(s.to_string()));
                }
            }
        }
        if let Some(result) = result {
            Ok(result)
        } else {
            Err(super::HashPrefixError::NotFound(s.to_string()))
        }
    }

    fn load_channel(
        &self,
        name: &str,
    ) -> Result<Option<ChannelRef<Self>>, TxnErr<Self::GraphError>> {
        let name = SmallString::from_str(name);
        match self.open_channels.lock().entry(name.clone()) {
            Entry::Vacant(v) => {
                if let Some(c) = unsafe { self.unsafe_load_channel(name)? } {
                    Ok(Some(
                        v.insert(ChannelRef {
                            r: Arc::new(RwLock::new(c)),
                        })
                        .clone(),
                    ))
                } else {
                    Ok(None)
                }
            }
            Entry::Occupied(occ) => Ok(Some(occ.get().clone())),
        }
    }

    fn load_remote(
        &self,
        name: &RemoteId,
    ) -> Result<Option<RemoteRef<Self>>, TxnErr<Self::GraphError>> {
        unsafe {
            let name = name.to_owned();
            match self.open_remotes.lock().entry(name.clone()) {
                Entry::Vacant(v) => match btree::get(&self.txn, &self.remotes, &name, None)? {
                    Some((name_, remote)) if name == *name_ => {
                        debug!("load_remote: {:?} {:?}", name_, remote);
                        let r = Remote {
                            remote: UDb::from_page(remote.remote.into()),
                            rev: UDb::from_page(remote.rev.into()),
                            states: UDb::from_page(remote.states.into()),
                            id_rev: remote.id_rev.into(),
                            tags: Db::from_page(remote.tags.into()),
                            path: remote.path.to_owned(),
                        };
                        for x in btree::iter(&self.txn, &r.remote, None).unwrap() {
                            debug!("remote -> {:?}", x);
                        }
                        for x in btree::iter(&self.txn, &r.rev, None).unwrap() {
                            debug!("rev -> {:?}", x);
                        }
                        for x in btree::iter(&self.txn, &r.states, None).unwrap() {
                            debug!("states -> {:?}", x);
                        }

                        for x in self.iter_remote(&r.remote, 0).unwrap() {
                            debug!("ITER {:?}", x);
                        }

                        let r = RemoteRef {
                            db: Arc::new(Mutex::new(r)),
                            id: name,
                        };
                        Ok(Some(v.insert(r).clone()))
                    }
                    _ => return Ok(None),
                },
                Entry::Occupied(occ) => Ok(Some(occ.get().clone())),
            }
        }
    }

    ///
    type Channels = UDb<SmallStr, SerializedChannel>;
    type ChannelsCursor = ::sanakirja::btree::cursor::Cursor<
        SmallStr,
        SerializedChannel,
        UP<SmallStr, SerializedChannel>,
    >;
    sanakirja_cursor!(channels, SmallStr, SerializedChannel);
    fn channels<'txn>(
        &'txn self,
        start: &str,
    ) -> Result<Vec<ChannelRef<Self>>, TxnErr<Self::GraphError>> {
        let name = SmallString::from_str(start);
        let mut cursor = btree::cursor::Cursor::new(&self.txn, &self.channels)?;
        cursor.set(&self.txn, &name, None)?;
        while let Ok(Some((name, _))) = self.cursor_channels_next(&mut cursor) {
            self.load_channel(name.as_str())?;
        }
        Ok(self
            .open_channels
            .lock()
            .iter()
            .map(|(_, x)| x.clone())
            .collect())
    }

    type Remotes = UDb<RemoteId, SerializedRemote>;
    type RemotesCursor = ::sanakirja::btree::cursor::Cursor<
        RemoteId,
        SerializedRemote,
        UP<RemoteId, SerializedRemote>,
    >;
    sanakirja_cursor!(remotes, RemoteId, SerializedRemote);
    fn iter_remotes<'txn>(
        &'txn self,
        start: &RemoteId,
    ) -> Result<RemotesIterator<'txn, Self>, TxnErr<Self::GraphError>> {
        let mut cursor = btree::cursor::Cursor::new(&self.txn, &self.remotes)?;
        cursor.set(&self.txn, start, None)?;
        Ok(RemotesIterator { cursor, txn: self })
    }

    type Remote = UDb<L64, Pair<SerializedHash, SerializedMerkle>>;
    type Revremote = UDb<SerializedHash, L64>;
    type Remotestates = UDb<SerializedMerkle, L64>;
    type Remotetags = UDb<L64, Pair<SerializedMerkle, SerializedMerkle>>;
    type RemoteCursor = ::sanakirja::btree::cursor::Cursor<
        L64,
        Pair<SerializedHash, SerializedMerkle>,
        UP<L64, Pair<SerializedHash, SerializedMerkle>>,
    >;
    sanakirja_cursor!(remote, L64, Pair<SerializedHash, SerializedMerkle>);
    sanakirja_rev_cursor!(remote, L64, Pair<SerializedHash, SerializedMerkle>);

    fn iter_remote<'txn>(
        &'txn self,
        remote: &Self::Remote,
        k: u64,
    ) -> Result<
        super::Cursor<
            Self,
            &'txn Self,
            Self::RemoteCursor,
            L64,
            Pair<SerializedHash, SerializedMerkle>,
        >,
        TxnErr<Self::GraphError>,
    > {
        self.cursor_remote(remote, Some((&k.into(), None)))
    }

    fn iter_rev_remote<'txn>(
        &'txn self,
        remote: &Self::Remote,
        k: Option<L64>,
    ) -> Result<
        super::RevCursor<
            Self,
            &'txn Self,
            Self::RemoteCursor,
            L64,
            Pair<SerializedHash, SerializedMerkle>,
        >,
        TxnErr<Self::GraphError>,
    > {
        self.rev_cursor_remote(remote, k.as_ref().map(|k| (k, None)))
    }

    fn get_remote(
        &mut self,
        name: RemoteId,
    ) -> Result<Option<RemoteRef<Self>>, TxnErr<Self::GraphError>> {
        unsafe {
            let name = name.to_owned();
            match self.open_remotes.lock().entry(name.clone()) {
                Entry::Vacant(v) => match btree::get(&self.txn, &self.remotes, &name, None)? {
                    Some((name_, remote)) if *name_ == name => {
                        let r = RemoteRef {
                            db: Arc::new(Mutex::new(Remote {
                                remote: UDb::from_page(remote.remote.into()),
                                rev: UDb::from_page(remote.rev.into()),
                                states: UDb::from_page(remote.states.into()),
                                id_rev: remote.id_rev.into(),
                                tags: Db::from_page(remote.tags.into()),
                                path: remote.path.to_owned(),
                            })),
                            id: name,
                        };
                        v.insert(r);
                    }
                    _ => return Ok(None),
                },
                Entry::Occupied(_) => {}
            }
            Ok(self.open_remotes.lock().get(&name).cloned())
        }
    }

    fn last_remote(
        &self,
        remote: &Self::Remote,
    ) -> Result<Option<(u64, &Pair<SerializedHash, SerializedMerkle>)>, TxnErr<Self::GraphError>>
    {
        debug!("last_remote: {:?}", remote);
        if let Some(x) = btree::rev_iter(&self.txn, remote, None)?.next() {
            let (&k, v) = x?;
            Ok(Some((k.into(), v)))
        } else {
            Ok(None)
        }
    }

    fn last_remote_tag(
        &self,
        remote: &Self::Tags,
    ) -> Result<Option<(u64, &SerializedMerkle, &SerializedMerkle)>, TxnErr<Self::GraphError>> {
        if let Some(x) = btree::rev_iter(&self.txn, remote, None)?.next() {
            let (&k, v) = x?;
            Ok(Some((k.into(), &v.a, &v.b)))
        } else {
            Ok(None)
        }
    }

    fn get_remote_state(
        &self,
        remote: &Self::Remote,
        n: u64,
    ) -> Result<Option<(u64, &Pair<SerializedHash, SerializedMerkle>)>, TxnErr<Self::GraphError>>
    {
        let n = n.into();
        for x in btree::iter(&self.txn, remote, Some((&n, None)))? {
            let (&k, m) = x?;
            if k >= n {
                return Ok(Some((k.into(), m)));
            }
        }
        Ok(None)
    }

    fn get_remote_tag(
        &self,
        remote: &Self::Tags,
        n: u64,
    ) -> Result<Option<(u64, &Pair<SerializedMerkle, SerializedMerkle>)>, TxnErr<Self::GraphError>>
    {
        let n = n.into();
        if let Some(x) = btree::rev_iter(&self.txn, remote, Some((&n, None)))?.next() {
            let (&k, m) = x?;
            Ok(Some((k.into(), m)))
        } else {
            Ok(None)
        }
    }

    fn remote_has_change(
        &self,
        remote: &RemoteRef<Self>,
        hash: &SerializedHash,
    ) -> Result<bool, TxnErr<Self::GraphError>> {
        match btree::get(&self.txn, &remote.db.lock().rev, hash, None)? {
            Some((k, _)) if k == hash => Ok(true),
            _ => Ok(false),
        }
    }
    fn remote_has_state(
        &self,
        remote: &RemoteRef<Self>,
        m: &SerializedMerkle,
    ) -> Result<Option<u64>, TxnErr<Self::GraphError>> {
        match btree::get(&self.txn, &remote.db.lock().states, m, None)? {
            Some((k, v)) if k == m => Ok(Some((*v).into())),
            _ => Ok(None),
        }
    }
    fn current_channel(&self) -> Result<&str, Self::GraphError> {
        if let Some(ref c) = self.cur_channel {
            Ok(c)
        } else {
            unsafe {
                let b = self.txn.root_page();
                let len = b[4096 - 256] as usize;
                if len == 0 {
                    Ok("main")
                } else {
                    let s = std::slice::from_raw_parts(b.as_ptr().add(4096 - 255), len);
                    Ok(std::str::from_utf8(s).unwrap_or("main"))
                }
            }
        }
    }
}

impl GraphMutTxnT for MutTxn<()> {
    fn put_graph(
        &mut self,
        graph: &mut Self::Graph,
        k: &Vertex<ChangeId>,
        e: &SerializedEdge,
    ) -> Result<bool, TxnErr<Self::GraphError>> {
        Ok(btree::put(&mut self.txn, &mut graph.graph, k, e)?)
    }

    fn del_graph(
        &mut self,
        graph: &mut Self::Graph,
        k: &Vertex<ChangeId>,
        e: Option<&SerializedEdge>,
    ) -> Result<bool, TxnErr<Self::GraphError>> {
        Ok(btree::del(&mut self.txn, &mut graph.graph, k, e)?)
    }

    fn debug(&mut self, graph: &mut Self::Graph, extra: &str) {
        ::sanakirja::debug::debug(
            &self.txn,
            &[&graph.graph],
            format!("debug{}{}", self.counter, extra),
            true,
        );
    }

    sanakirja_put_del!(internal, SerializedHash, ChangeId, GraphError);
    sanakirja_put_del!(external, ChangeId, SerializedHash, GraphError);

    fn split_block(
        &mut self,
        graph: &mut Self::Graph,
        key: &Vertex<ChangeId>,
        pos: ChangePosition,
        buf: &mut Vec<SerializedEdge>,
    ) -> Result<(), TxnErr<Self::GraphError>> {
        assert!(pos > key.start);
        assert!(pos < key.end);
        let mut cursor = btree::cursor::Cursor::new(&self.txn, &graph.graph)?;
        cursor.set(&self.txn, key, None)?;
        loop {
            match cursor.next(&self.txn) {
                Ok(Some((k, v))) => {
                    if k > key {
                        break;
                    } else if k < key {
                        continue;
                    }
                    buf.push(*v)
                }
                Ok(None) => break,
                Err(e) => {
                    error!("{:?}", e);
                    return Err(TxnErr(SanakirjaError::PristineCorrupt));
                }
            }
        }
        for chi in buf.drain(..) {
            assert!(
                chi.introduced_by() != ChangeId::ROOT || chi.flag().contains(EdgeFlags::PSEUDO)
            );
            if chi.flag().contains(EdgeFlags::PARENT | EdgeFlags::BLOCK) {
                put_graph_with_rev(
                    self,
                    graph,
                    chi.flag() - EdgeFlags::PARENT,
                    Vertex {
                        change: key.change,
                        start: key.start,
                        end: pos,
                    },
                    Vertex {
                        change: key.change,
                        start: pos,
                        end: key.end,
                    },
                    chi.introduced_by(),
                )?;
            }

            self.del_graph(graph, key, Some(&chi))?;
            self.put_graph(
                graph,
                &if chi.flag().contains(EdgeFlags::PARENT) {
                    Vertex {
                        change: key.change,
                        start: key.start,
                        end: pos,
                    }
                } else {
                    Vertex {
                        change: key.change,
                        start: pos,
                        end: key.end,
                    }
                },
                &chi,
            )?;
        }
        Ok(())
    }
}

impl ChannelMutTxnT for MutTxn<()> {
    fn graph_mut(c: &mut Self::Channel) -> &mut Self::Graph {
        c
    }
    fn touch_channel(&mut self, channel: &mut Self::Channel, t: Option<u64>) {
        use std::time::SystemTime;
        debug!("touch_channel: {:?}", t);
        if let Some(t) = t {
            channel.last_modified = t.into()
        } else if let Ok(duration) = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
            channel.last_modified = (duration.as_millis() as u64 / 1000) * 1000
        }
    }

    fn put_changes(
        &mut self,
        channel: &mut Self::Channel,
        p: ChangeId,
        t: ApplyTimestamp,
        h: &Hash,
    ) -> Result<Option<Merkle>, TxnErr<Self::GraphError>> {
        debug!("put_changes {:?} {:?}", p, h);
        if let Some(m) = self.get_changeset(&channel.changes, &p)? {
            debug!("found m = {:?}, p = {:?}", m, p);
            Ok(None)
        } else {
            channel.apply_counter += 1;
            debug!("put_changes {:?} {:?}", t, p);
            let m = if let Some(x) = btree::rev_iter(&self.txn, &channel.revchanges, None)?.next() {
                let (a, b) = x?;
                let a: u64 = (*a).into();
                assert!(a < t);
                (&b.b).into()
            } else {
                Merkle::zero()
            };
            let m = m.next(h);
            assert!(self
                .get_revchangeset(&channel.revchanges, &t.into())?
                .is_none());
            assert!(btree::put(
                &mut self.txn,
                &mut channel.changes,
                &p,
                &t.into()
            )?);
            assert!(btree::put(
                &mut self.txn,
                &mut channel.revchanges,
                &t.into(),
                &Pair { a: p, b: m.into() }
            )?);
            assert!(btree::put(
                &mut self.txn,
                &mut channel.states,
                &m.into(),
                &t.into(),
            )?);
            Ok(Some(m.into()))
        }
    }

    fn del_changes(
        &mut self,
        channel: &mut Self::Channel,
        p: ChangeId,
        t: ApplyTimestamp,
    ) -> Result<bool, TxnErr<Self::GraphError>> {
        let mut repl = Vec::new();
        let tl = t.into();
        for x in btree::iter(&self.txn, &channel.revchanges, Some((&tl, None)))? {
            let (t_, p) = x?;
            if *t_ >= tl {
                repl.push((*t_, p.a, p.b))
            }
        }
        let mut m = Merkle::zero();
        for x in btree::rev_iter(&self.txn, &channel.revchanges, Some((&tl, None)))? {
            let (t_, mm) = x?;
            if t_ < &tl {
                m = (&mm.b).into();
                break;
            }
        }
        for (t_, p, m0) in repl.iter() {
            debug!("del_changes {:?} {:?}", t_, p);
            btree::del(&mut self.txn, &mut channel.revchanges, t_, None)?;
            btree::del(&mut self.txn, &mut channel.states, m0, None)?;
            if *t_ > tl {
                m = m.next(self.get_external(p)?.unwrap());
                btree::put(
                    &mut self.txn,
                    &mut channel.revchanges,
                    t_,
                    &Pair { a: *p, b: m.into() },
                )?;
                btree::put(&mut self.txn, &mut channel.states, &m.into(), t_)?;
            }
        }
        btree::del(&mut self.txn, &mut channel.tags, &t.into(), None)?;
        Ok(btree::del(
            &mut self.txn,
            &mut channel.changes,
            &p,
            Some(&t.into()),
        )?)
    }

    fn tags_mut<'a>(&mut self, channel: &'a mut Self::Channel) -> &'a mut Self::Tags {
        &mut channel.tags
    }

    fn put_tags(
        &mut self,
        channel: &mut Self::Tags,
        n: u64,
        m: &Merkle,
    ) -> Result<(), TxnErr<Self::GraphError>> {
        debug!("put_tags {:?}", m);
        let mm: SerializedMerkle = m.into();
        if btree::get(&self.txn, &channel, &n.into(), None)?.is_some() {
            debug!("already tagged");
            Ok(())
        } else {
            let tl = n.into();
            let mut repl = vec![(tl, mm)];
            replay_tags(self, channel, tl, &mut repl)?;
            Ok(())
        }
    }

    fn del_tags(
        &mut self,
        channel: &mut Self::Tags,
        t: u64,
    ) -> Result<(), TxnErr<Self::GraphError>> {
        replay_tags(self, channel, t.into(), &mut Vec::new())?;
        Ok(())
    }
}

fn replay_tags(
    txn: &mut MutTxn<()>,
    channel: &mut Db<L64, Pair<SerializedMerkle, SerializedMerkle>>,
    tl: L64,
    repl: &mut Vec<(L64, SerializedMerkle)>,
) -> Result<(), TxnErr<SanakirjaError>> {
    let del = repl.is_empty();
    for x in btree::iter(&txn.txn, &channel, Some((&tl, None)))? {
        let (t_, p) = x?;
        if *t_ >= tl {
            repl.push((*t_, p.a))
        }
    }
    let mut m = Merkle::zero();
    for x in btree::rev_iter(&txn.txn, &channel, Some((&tl, None)))? {
        let (t_, mm) = x?;
        if t_ < &tl {
            m = (&mm.b).into();
            break;
        }
    }
    for (t_, p) in repl.iter() {
        debug!("del_tags {:?} {:?}", t_, p);
        btree::del(&mut txn.txn, channel, t_, None)?;
        if *t_ > tl || !del {
            m = m.next(p);
            btree::put(&mut txn.txn, channel, t_, &Pair { a: *p, b: m.into() })?;
        }
    }
    Ok(())
}

impl DepsMutTxnT for MutTxn<()> {
    sanakirja_put_del!(dep, ChangeId, ChangeId, DepsError);
    sanakirja_put_del!(revdep, ChangeId, ChangeId, DepsError);
    sanakirja_put_del!(touched_files, Position<ChangeId>, ChangeId, DepsError);
    sanakirja_put_del!(rev_touched_files, ChangeId, Position<ChangeId>, DepsError);
}

impl TreeMutTxnT for MutTxn<()> {
    sanakirja_put_del!(inodes, Inode, Position<ChangeId>, TreeError, TreeErr);
    sanakirja_put_del!(revinodes, Position<ChangeId>, Inode, TreeError, TreeErr);

    sanakirja_put_del!(tree, PathId, Inode, TreeError, TreeErr);
    sanakirja_put_del!(revtree, Inode, PathId, TreeError, TreeErr);

    fn put_partials(
        &mut self,
        k: &str,
        e: Position<ChangeId>,
    ) -> Result<bool, TreeErr<Self::TreeError>> {
        let k = SmallString::from_str(k);
        btree::put(&mut self.txn, &mut self.partials, &k, &e).map_err(|e| TreeErr(e.into()))
    }

    fn del_partials(
        &mut self,
        k: &str,
        e: Option<Position<ChangeId>>,
    ) -> Result<bool, TreeErr<Self::TreeError>> {
        let k = SmallString::from_str(k);
        btree::del(&mut self.txn, &mut self.partials, &k, e.as_ref()).map_err(|e| TreeErr(e.into()))
    }
}

impl MutTxnT for MutTxn<()> {
    fn put_remote(
        &mut self,
        remote: &mut RemoteRef<Self>,
        k: u64,
        v: (Hash, Merkle),
    ) -> Result<bool, TxnErr<Self::GraphError>> {
        let mut remote = remote.db.lock();
        let h = (&v.0).into();
        let m: SerializedMerkle = (&v.1).into();
        btree::put(
            &mut self.txn,
            &mut remote.remote,
            &k.into(),
            &Pair { a: h, b: m.clone() },
        )?;
        debug!("remote.remote after put: {:?}", remote.remote);
        btree::put(&mut self.txn, &mut remote.states, &m, &k.into())?;
        // if v.2 {
        //     self.put_tags(&mut remote.tags, k, &v.1)?;
        // }
        Ok(btree::put(&mut self.txn, &mut remote.rev, &h, &k.into())?)
    }

    fn del_remote(
        &mut self,
        remote: &mut RemoteRef<Self>,
        k: u64,
    ) -> Result<bool, TxnErr<Self::GraphError>> {
        let mut remote = remote.db.lock();
        let k = k.into();
        match btree::get(&self.txn, &remote.remote, &k, None)? {
            Some((k0, p)) if k0 == &k => {
                debug!("del_remote {:?} {:?}", k0, p);
                let p = p.clone();
                btree::del(&mut self.txn, &mut remote.rev, &p.a, None)?;
                btree::del(&mut self.txn, &mut remote.states, &p.b, None)?;
                Ok(btree::del(
                    &mut self.txn,
                    &mut remote.remote,
                    &k.into(),
                    None,
                )?)
            }
            x => {
                debug!("not found, {:?}", x);
                Ok(false)
            }
        }
    }

    fn open_or_create_channel(&mut self, name: &str) -> Result<ChannelRef<Self>, Self::GraphError> {
        unsafe {
            let name = crate::small_string::SmallString::from_str(name);
            let mut commit = None;
            let result = match self.open_channels.lock().entry(name.clone()) {
                Entry::Vacant(v) => {
                    let r = match btree::get(&self.txn, &self.channels, &name, None)? {
                        Some((name_, b)) if name_ == name.as_ref() => ChannelRef {
                            r: Arc::new(RwLock::new(Channel {
                                graph: Db::from_page(b.graph.into()),
                                changes: Db::from_page(b.changes.into()),
                                revchanges: UDb::from_page(b.revchanges.into()),
                                states: UDb::from_page(b.states.into()),
                                tags: Db::from_page(b.tags.into()),
                                apply_counter: b.apply_counter.into(),
                                last_modified: b.last_modified.into(),
                                id: b.id,
                                name: name.clone(),
                            })),
                        },
                        _ => {
                            let br = ChannelRef {
                                r: Arc::new(RwLock::new(Channel {
                                    graph: btree::create_db_(&mut self.txn)?,
                                    changes: btree::create_db_(&mut self.txn)?,
                                    revchanges: btree::create_db_(&mut self.txn)?,
                                    states: btree::create_db_(&mut self.txn)?,
                                    tags: btree::create_db_(&mut self.txn)?,
                                    id: {
                                        let mut rng = rand::thread_rng();
                                        use rand::Rng;
                                        let mut x = RemoteId([0; 16]);
                                        for x in x.0.iter_mut() {
                                            *x = rng.gen()
                                        }
                                        x
                                    },
                                    apply_counter: 0,
                                    last_modified: 0,
                                    name: name.clone(),
                                })),
                            };
                            commit = Some(br.clone());
                            br
                        }
                    };
                    v.insert(r).clone()
                }
                Entry::Occupied(occ) => occ.get().clone(),
            };
            if let Some(commit) = commit {
                self.put_channel(commit)?;
            }
            Ok(result)
        }
    }

    fn fork(
        &mut self,
        channel: &ChannelRef<Self>,
        new_name: &str,
    ) -> Result<ChannelRef<Self>, ForkError<Self::GraphError>> {
        let channel = channel.r.read();
        let name = SmallString::from_str(new_name);
        match btree::get(&self.txn, &self.channels, &name, None)
            .map_err(|e| ForkError::Txn(e.into()))?
        {
            Some((name_, _)) if name_ == name.as_ref() => {
                Err(super::ForkError::ChannelNameExists(new_name.to_string()))
            }
            _ => {
                let br = ChannelRef {
                    r: Arc::new(RwLock::new(Channel {
                        graph: btree::fork_db(&mut self.txn, &channel.graph)
                            .map_err(|e| ForkError::Txn(e.into()))?,
                        changes: btree::fork_db(&mut self.txn, &channel.changes)
                            .map_err(|e| ForkError::Txn(e.into()))?,
                        revchanges: btree::fork_db(&mut self.txn, &channel.revchanges)
                            .map_err(|e| ForkError::Txn(e.into()))?,
                        states: btree::fork_db(&mut self.txn, &channel.states)
                            .map_err(|e| ForkError::Txn(e.into()))?,
                        tags: btree::fork_db(&mut self.txn, &channel.tags)
                            .map_err(|e| ForkError::Txn(e.into()))?,
                        name: name.clone(),
                        apply_counter: channel.apply_counter,
                        last_modified: channel.last_modified,
                        id: {
                            let mut rng = rand::thread_rng();
                            use rand::Rng;
                            let mut x = RemoteId([0; 16]);
                            for x in x.0.iter_mut() {
                                *x = rng.gen()
                            }
                            x
                        },
                    })),
                };
                self.open_channels.lock().insert(name, br.clone());
                Ok(br)
            }
        }
    }

    fn rename_channel(
        &mut self,
        channel: &mut ChannelRef<Self>,
        new_name: &str,
    ) -> Result<(), ForkError<Self::GraphError>> {
        let name = SmallString::from_str(new_name);
        match btree::get(&self.txn, &self.channels, &name, None)
            .map_err(|e| ForkError::Txn(e.into()))?
        {
            Some((name_, _)) if name_ == name.as_ref() => {
                Err(super::ForkError::ChannelNameExists(new_name.to_string()))
            }
            _ => {
                btree::del(
                    &mut self.txn,
                    &mut self.channels,
                    &channel.r.read().name,
                    None,
                )
                .map_err(|e| ForkError::Txn(e.into()))?;
                std::mem::drop(
                    self.open_channels
                        .lock()
                        .remove(&channel.r.read().name)
                        .unwrap(),
                );
                channel.r.write().name = name.clone();
                self.open_channels.lock().insert(name, channel.clone());
                Ok(())
            }
        }
    }

    fn drop_channel(&mut self, name0: &str) -> Result<bool, Self::GraphError> {
        unsafe {
            debug!(target: "drop_channel", "drop channel {:?}", name0);
            let name = SmallString::from_str(name0);
            let channel = if let Some(channel) = self.open_channels.lock().remove(&name) {
                let channel = Arc::try_unwrap(channel.r)
                    .map_err(|_| SanakirjaError::ChannelRc {
                        c: name0.to_string(),
                    })?
                    .into_inner();
                Some((
                    channel.graph,
                    channel.changes,
                    channel.revchanges,
                    channel.states,
                    channel.tags,
                ))
            } else if let Some((name_, chan)) = btree::get(&self.txn, &self.channels, &name, None)?
            {
                if name_ == name.as_ref() {
                    Some((
                        Db::from_page(chan.graph.into()),
                        Db::from_page(chan.changes.into()),
                        UDb::from_page(chan.revchanges.into()),
                        UDb::from_page(chan.states.into()),
                        Db::from_page(chan.tags.into()),
                    ))
                } else {
                    None
                }
            } else {
                None
            };
            btree::del(&mut self.txn, &mut self.channels, &name, None)?;
            if let Some((a, b, c, d, e)) = channel {
                let mut unused_changes = Vec::new();
                'outer: for x in btree::rev_iter(&self.txn, &c, None)? {
                    let (_, p) = x?;
                    debug!(target: "drop_channel", "testing unused change: {:?}", p);
                    for chan in self.channels("").map_err(|e| e.0)? {
                        debug!(target: "drop_channel", "channel: {:?}", name);
                        let chan = chan.read();
                        assert_ne!(chan.name.as_str(), name0);
                        if self
                            .channel_has_state(&chan.states, &p.b)
                            .map_err(|e| e.0)?
                            .is_some()
                        {
                            // This other channel is in the same state as
                            // our dropped channel is, so all subsequent
                            // patches are in use.
                            break 'outer;
                        }
                        if self
                            .get_changeset(&chan.changes, &p.a)
                            .map_err(|e| e.0)?
                            .is_some()
                        {
                            // This channel has a patch, move on.
                            continue 'outer;
                        }
                    }

                    debug!(target: "drop_channel", "actually unused: {:?}", p);
                    unused_changes.push(p.a);
                }
                let mut deps = Vec::new();
                for ch in unused_changes.iter() {
                    for x in btree::iter(&self.txn, &self.dep, Some((ch, None)))? {
                        let (k, v) = x?;
                        if k > ch {
                            break;
                        }
                        deps.push((*k, *v));
                    }
                    for (k, v) in deps.drain(..) {
                        debug!(target: "drop_channel", "deleting from revdep: {:?} {:?}", k, v);
                        btree::del(&mut self.txn, &mut self.dep, &k, Some(&v))?;
                        btree::del(&mut self.txn, &mut self.revdep, &v, Some(&k))?;
                    }
                }
                btree::drop(&mut self.txn, a)?;
                btree::drop(&mut self.txn, b)?;
                btree::drop(&mut self.txn, c)?;
                btree::drop(&mut self.txn, d)?;
                btree::drop(&mut self.txn, e)?;
                Ok(true)
            } else {
                Ok(false)
            }
        }
    }

    fn open_or_create_remote(
        &mut self,
        id: RemoteId,
        path: &str,
    ) -> Result<RemoteRef<Self>, Self::GraphError> {
        unsafe {
            let mut commit = None;
            match self.open_remotes.lock().entry(id) {
                Entry::Vacant(v) => {
                    let r = match btree::get(&self.txn, &self.remotes, &id, None)? {
                        Some((name_, remote)) if *name_ == id => RemoteRef {
                            db: Arc::new(Mutex::new(Remote {
                                remote: UDb::from_page(remote.remote.into()),
                                rev: UDb::from_page(remote.rev.into()),
                                states: UDb::from_page(remote.states.into()),
                                id_rev: remote.id_rev.into(),
                                tags: Db::from_page(remote.tags.into()),
                                path: SmallString::from_str(path),
                            })),
                            id,
                        },
                        _ => {
                            let br = RemoteRef {
                                db: Arc::new(Mutex::new(Remote {
                                    remote: btree::create_db_(&mut self.txn)?,
                                    rev: btree::create_db_(&mut self.txn)?,
                                    states: btree::create_db_(&mut self.txn)?,
                                    id_rev: 0u64.into(),
                                    tags: btree::create_db(&mut self.txn)?,
                                    path: SmallString::from_str(path),
                                })),
                                id,
                            };
                            commit = Some(br.clone());
                            br
                        }
                    };
                    v.insert(r);
                }
                Entry::Occupied(_) => {}
            }
            if let Some(commit) = commit {
                self.put_remotes(commit)?;
            }
            Ok(self.open_remotes.lock().get(&id).unwrap().clone())
        }
    }

    fn drop_remote(&mut self, remote: RemoteRef<Self>) -> Result<bool, Self::GraphError> {
        let r = self.open_remotes.lock().remove(&remote.id).unwrap();
        std::mem::drop(remote);
        assert_eq!(Arc::strong_count(&r.db), 1);
        Ok(btree::del(&mut self.txn, &mut self.remotes, &r.id, None)?)
    }

    fn drop_named_remote(&mut self, id: RemoteId) -> Result<bool, Self::GraphError> {
        if let Some(r) = self.open_remotes.lock().remove(&id) {
            assert_eq!(Arc::strong_count(&r.db), 1);
        }
        Ok(btree::del(&mut self.txn, &mut self.remotes, &id, None)?)
    }

    fn commit(mut self) -> Result<(), Self::GraphError> {
        use std::ops::DerefMut;
        {
            let open_channels =
                std::mem::replace(self.open_channels.lock().deref_mut(), HashMap::default());
            for (name, channel) in open_channels {
                debug!("commit_channel {:?}", name);
                self.commit_channel(channel)?
            }
        }
        {
            let open_remotes =
                std::mem::replace(self.open_remotes.lock().deref_mut(), HashMap::default());
            for (name, remote) in open_remotes {
                debug!("commit remote {:?}", name);
                self.commit_remote(remote)?
            }
        }
        if let Some(ref cur) = self.cur_channel {
            unsafe {
                assert!(cur.len() < 256);
                let b = self.txn.root_page_mut();
                b[4096 - 256] = cur.len() as u8;
                std::ptr::copy(cur.as_ptr(), b.as_mut_ptr().add(4096 - 255), cur.len())
            }
        }
        // No need to set `Root::Version`, it is set at init.
        debug!(
            "{:x} {:x} {:x} {:x} {:x} {:x} {:x} {:x} {:x} {:x} {:x} {:x} {:x}",
            self.tree.db,
            self.revtree.db,
            self.inodes.db,
            self.revinodes.db,
            self.internal.db,
            self.external.db,
            self.revdep.db,
            self.channels.db,
            self.remotes.db,
            self.touched_files.db,
            self.dep.db,
            self.rev_touched_files.db,
            self.partials.db,
        );
        self.txn
            .set_root(Root::Tree as usize, u64::from(self.tree.db).into());
        self.txn
            .set_root(Root::RevTree as usize, u64::from(self.revtree.db).into());
        self.txn
            .set_root(Root::Inodes as usize, u64::from(self.inodes.db).into());
        self.txn
            .set_root(Root::RevInodes as usize, self.revinodes.db.into());
        self.txn
            .set_root(Root::Internal as usize, self.internal.db.into());
        self.txn
            .set_root(Root::External as usize, self.external.db.into());
        self.txn
            .set_root(Root::RevDep as usize, self.revdep.db.into());
        self.txn
            .set_root(Root::Channels as usize, self.channels.db.into());
        self.txn
            .set_root(Root::Remotes as usize, self.remotes.db.into());
        self.txn
            .set_root(Root::TouchedFiles as usize, self.touched_files.db.into());
        self.txn.set_root(Root::Dep as usize, self.dep.db.into());
        self.txn.set_root(
            Root::RevTouchedFiles as usize,
            self.rev_touched_files.db.into(),
        );
        self.txn
            .set_root(Root::Partials as usize, self.partials.db.into());
        self.txn.commit()?;
        Ok(())
    }

    fn set_current_channel(&mut self, cur: &str) -> Result<(), Self::GraphError> {
        self.cur_channel = Some(cur.to_string());
        Ok(())
    }
}

impl Txn {
    pub fn load_const_channel(&self, name: &str) -> Result<Option<Channel>, SanakirjaError> {
        unsafe {
            let name = SmallString::from_str(name);
            match btree::get(&self.txn, &self.channels, &name, None)? {
                Some((name_, c)) if name.as_ref() == name_ => {
                    debug!("load_const_channel = {:?} {:?}", name_, c);
                    Ok(Some(Channel {
                        graph: Db::from_page(c.graph.into()),
                        changes: Db::from_page(c.changes.into()),
                        revchanges: UDb::from_page(c.revchanges.into()),
                        states: UDb::from_page(c.states.into()),
                        tags: Db::from_page(c.tags.into()),
                        apply_counter: c.apply_counter.into(),
                        last_modified: c.last_modified.into(),
                        id: c.id,
                        name,
                    }))
                }
                _ => Ok(None),
            }
        }
    }
}

impl<T> MutTxn<T> {
    fn put_channel(&mut self, channel: ChannelRef<Self>) -> Result<(), SanakirjaError> {
        debug!("Commit_channel.");
        let channel = channel.r.read();
        debug!("Commit_channel, dbs_channels = {:?}", self.channels);
        btree::del(&mut self.txn, &mut self.channels, &channel.name, None)?;
        debug!(
            "channels: {:x} {:x} {:x} {:x} {:x}",
            channel.graph.db,
            channel.changes.db,
            channel.revchanges.db,
            channel.states.db,
            channel.tags.db,
        );
        let sc = SerializedChannel {
            graph: u64::from(channel.graph.db).into(),
            changes: u64::from(channel.changes.db).into(),
            revchanges: u64::from(channel.revchanges.db).into(),
            states: u64::from(channel.states.db).into(),
            tags: u64::from(channel.tags.db).into(),
            apply_counter: channel.apply_counter.into(),
            last_modified: channel.last_modified.into(),
            id: channel.id,
        };
        btree::put(&mut self.txn, &mut self.channels, &channel.name, &sc)?;
        debug!("Commit_channel, self.channels = {:?}", self.channels);
        Ok(())
    }

    fn commit_channel(&mut self, channel: ChannelRef<Self>) -> Result<(), SanakirjaError> {
        std::mem::drop(self.open_channels.lock().remove(&channel.r.read().name));
        self.put_channel(channel)
    }

    fn put_remotes(&mut self, remote: RemoteRef<Self>) -> Result<(), SanakirjaError> {
        btree::del(&mut self.txn, &mut self.remotes, &remote.id, None)?;
        debug!("Commit_remote, dbs_remotes = {:?}", self.remotes);
        let r = remote.db.lock();
        let rr = OwnedSerializedRemote {
            _remote: u64::from(r.remote.db).into(),
            _rev: u64::from(r.rev.db).into(),
            _states: u64::from(r.states.db).into(),
            _id_rev: r.id_rev.into(),
            _tags: u64::from(r.tags.db).into(),
            _path: r.path.clone(),
        };
        debug!("put {:?}", rr);
        btree::put(&mut self.txn, &mut self.remotes, &remote.id, &rr)?;
        debug!("Commit_remote, self.dbs.remotes = {:?}", self.remotes);
        Ok(())
    }

    fn commit_remote(&mut self, remote: RemoteRef<Self>) -> Result<(), SanakirjaError> {
        std::mem::drop(self.open_remotes.lock().remove(&remote.id));
        // assert_eq!(Rc::strong_count(&remote.db), 1);
        self.put_remotes(remote)
    }
}

direct_repr!(ChangeId);
impl ::sanakirja::debug::Check for ChangeId {}

direct_repr!(Vertex<ChangeId>);
impl ::sanakirja::debug::Check for Vertex<ChangeId> {}

direct_repr!(Position<ChangeId>);
impl ::sanakirja::debug::Check for Position<ChangeId> {}

direct_repr!(SerializedEdge);
impl ::sanakirja::debug::Check for SerializedEdge {}

impl ::sanakirja::debug::Check for PathId {}
impl Storable for PathId {
    fn compare<T>(&self, _: &T, x: &Self) -> std::cmp::Ordering {
        self.cmp(x)
    }
    type PageReferences = std::iter::Empty<u64>;
    fn page_references(&self) -> Self::PageReferences {
        std::iter::empty()
    }
}
impl UnsizedStorable for PathId {
    const ALIGN: usize = 8;
    fn size(&self) -> usize {
        9 + self.basename.len()
    }
    unsafe fn onpage_size(p: *const u8) -> usize {
        let len = *(p.add(8)) as usize;
        9 + len
    }
    unsafe fn from_raw_ptr<'a, T>(_: &T, p: *const u8) -> &'a Self {
        path_id_from_raw_ptr(p)
    }
    unsafe fn write_to_page_alloc<T: AllocPage>(&self, t: &mut T, p: *mut u8) {
        *(p as *mut u64) = (self.parent_inode.0).0;
        self.basename.write_to_page_alloc(t, p.add(8))
    }
}

unsafe fn path_id_from_raw_ptr<'a>(p: *const u8) -> &'a PathId {
    let len = *(p.add(8)) as usize;
    std::mem::transmute(std::slice::from_raw_parts(p, 1 + len as usize))
}

#[test]
fn pathid_repr() {
    let o = OwnedPathId {
        parent_inode: Inode::ROOT,
        basename: SmallString::from_str("blablabla"),
    };
    let mut x = vec![0u8; 200];

    unsafe {
        o.write_to_page(x.as_mut_ptr());
        let p = path_id_from_raw_ptr(x.as_ptr());
        assert_eq!(p.basename.as_str(), "blablabla");
        assert_eq!(p.parent_inode, Inode::ROOT);
    }
}

direct_repr!(Inode);
impl ::sanakirja::debug::Check for Inode {}
direct_repr!(SerializedMerkle);
impl ::sanakirja::debug::Check for SerializedMerkle {}
direct_repr!(SerializedHash);
impl ::sanakirja::debug::Check for SerializedHash {}

impl<A: ::sanakirja::debug::Check, B: ::sanakirja::debug::Check> ::sanakirja::debug::Check
    for Pair<A, B>
{
    fn add_refs<T: LoadPage>(
        &self,
        txn: &T,
        pages: &mut std::collections::BTreeMap<u64, usize>,
    ) -> Result<(), T::Error>
    where
        T::Error: std::fmt::Debug,
    {
        self.a.add_refs(txn, pages)?;
        self.b.add_refs(txn, pages)
    }
}
impl<A: Storable, B: Storable> Storable for Pair<A, B> {
    type PageReferences = core::iter::Chain<A::PageReferences, B::PageReferences>;
    fn page_references(&self) -> Self::PageReferences {
        self.a.page_references().chain(self.b.page_references())
    }
    fn compare<T: LoadPage>(&self, t: &T, b: &Self) -> core::cmp::Ordering {
        match self.a.compare(t, &b.a) {
            core::cmp::Ordering::Equal => self.b.compare(t, &b.b),
            ord => ord,
        }
    }
}

impl<A: Ord + UnsizedStorable, B: Ord + UnsizedStorable> UnsizedStorable for Pair<A, B> {
    const ALIGN: usize = std::mem::align_of::<(A, B)>();

    fn size(&self) -> usize {
        let a = self.a.size();
        let b_off = (a + (B::ALIGN - 1)) & !(B::ALIGN - 1);
        (b_off + self.b.size() + (Self::ALIGN - 1)) & !(Self::ALIGN - 1)
    }
    unsafe fn onpage_size(p: *const u8) -> usize {
        let a = A::onpage_size(p);
        let b_off = (a + (B::ALIGN - 1)) & !(B::ALIGN - 1);
        let b_size = B::onpage_size(p.add(b_off));
        (b_off + b_size + (Self::ALIGN - 1)) & !(Self::ALIGN - 1)
    }
    unsafe fn from_raw_ptr<'a, T>(_: &T, p: *const u8) -> &'a Self {
        &*(p as *const Self)
    }
    unsafe fn write_to_page_alloc<T: sanakirja::AllocPage>(&self, t: &mut T, p: *mut u8) {
        self.a.write_to_page_alloc(t, p);
        let off = (self.a.size() + (B::ALIGN - 1)) & !(B::ALIGN - 1);
        self.b.write_to_page_alloc(t, p.add(off));
    }
}

impl ::sanakirja::debug::Check for SerializedRemote {}
impl Storable for SerializedRemote {
    type PageReferences = std::iter::Empty<u64>;
    fn page_references(&self) -> Self::PageReferences {
        std::iter::empty()
    }
    fn compare<T: LoadPage>(&self, _t: &T, b: &Self) -> core::cmp::Ordering {
        self.cmp(b)
    }
}

const REMOTE_LEN: usize = 40;

impl UnsizedStorable for SerializedRemote {
    const ALIGN: usize = 8;

    fn size(&self) -> usize {
        REMOTE_LEN + 1 + self.path.len()
    }
    unsafe fn onpage_size(p: *const u8) -> usize {
        REMOTE_LEN + 1 + (*p.add(REMOTE_LEN)) as usize
    }
    unsafe fn from_raw_ptr<'a, T>(_: &T, p: *const u8) -> &'a Self {
        let len = *p.add(REMOTE_LEN) as usize;
        let m: &SerializedRemote =
            std::mem::transmute(std::slice::from_raw_parts(p, 1 + len as usize));
        m
    }
    unsafe fn write_to_page_alloc<T: sanakirja::AllocPage>(&self, _: &mut T, p: *mut u8) {
        std::ptr::copy(
            &self.remote as *const L64 as *const u8,
            p,
            REMOTE_LEN + 1 + self.path.len(),
        );
        debug!(
            "write_to_page: {:?}",
            std::slice::from_raw_parts(p, REMOTE_LEN + 1 + self.path.len())
        );
    }
}

#[derive(Debug)]
#[repr(C)]
struct OwnedSerializedRemote {
    _remote: L64,
    _rev: L64,
    _states: L64,
    _id_rev: L64,
    _tags: L64,
    _path: SmallString,
}

impl std::ops::Deref for OwnedSerializedRemote {
    type Target = SerializedRemote;
    fn deref(&self) -> &Self::Target {
        let len = REMOTE_LEN + 1 + self._path.len() as usize;
        unsafe {
            std::mem::transmute(std::slice::from_raw_parts(
                self as *const Self as *const u8,
                len,
            ))
        }
    }
}

direct_repr!(SerializedChannel);
impl ::sanakirja::debug::Check for SerializedChannel {}

direct_repr!(RemoteId);
impl ::sanakirja::debug::Check for RemoteId {}