E4MD6T3LNOYWVFTFFWCUKRNS4M2XVSKRLDWPYHMZHGDNO2T5JREQC TSMS6W4DOKQNUQ4PEMTLOIODR33VFPN6MMNS73ZPSU4BOQVRGPNAC W2MIZD5BNL7A5HVFWTESF57QU7T6QMEF4RBSLFQXMEEU3XD2NU2QC OTWDDJE7TTE73D6BGF4ZN6BH2NFUFLPME2VJ3CPALH463UGWLEIQC 6DCQHIFPEH4GZKSRRS32GMKDRPZH4MTCGOUEI7YEUVKWENBF3JWAC UAQX27N4PI4LHEW6LSHJETIE5MV7JTEMPLTJFYUBMYVPC43H7VOAC YWFYZNLZ5JHLIFVBRKZK4TSWVPROUPRG77ZB5M7UHT2OKPL4ZSRQC X3QVVQIS7B7L3XYZAWL3OOBUXOJ6RMOKQ45YMLLGAHYPEEKZ45ZAC MSRWB47YP6L5BVTS53QQPBOHY5SXTSTR5KD6IIF35UWCTEUOCQWQC Q7DRIBBRE4MNG4NP3PVIXAJF5PQYLFWYIVK2O4VVLEO6XY3BOSFQC QYDGYIZRNFRIQD7RUCY5YAN3F2THZA74E5UOHPIFWSULEJFAFVJQC LSQ6V7M66TEGLJ7QBLRVDX4E7UKJTDQTEXZOS3KGPGFKVXNLPKBQC OFINGD26ZWCRDVVDI2ZIBLMHXKEMJA6MRNLANJYUHQPIJLPA7J2AC G4JEQLLX6Q7VVFVAEJZAVQXX33MQ36CSCYSMJ5NQM5VZ76DXKU6QC H3FVSQIQGFCFKCPXVOSFHP4OSUOBBURJESCZNGQTNDAAD3WQSBEQC AFKBHYVE25QMIU2WZATEGWG2EXSBQZ44DVDPACKNS3M4QGBF4ONAC EYNN7RLSFVBWDLRTLNNFUAF46Q6OX3BR5SUEJIOOHBSNP7FVBXGAC LROAI3NBBSCU4T2YA6EHJYKKKL75AU5A7C7WIRCGIQ56S6HPLRXQC XEU2QVLCHPYOOD4TQIPEEVYOVSFMKFPLJYWEJYXYJAZ7S54KWDZAC NXMFNPZ7VWJRLC3M5QJJVTICXCMGE24F3HVIZA7A7RLVMLQMLDVQC OP6SVMOD2GTQ7VNJ4E5KYFG4MIYA7HBMXJTADALMZH4PY7OQRMZQC SO25TWFLSRQIVTJTTSN77LO5FZQVQPIZTSBULH7MWBBDEWSK3OCAC AOX2XQISHGWNNAFBYRN44Q6AWG7H5DPBK5YMFHK42HQNZ2TMHEJQC S4V4QZ5CF5LUDYWNR2UMWH6CHJDJ5FPGAZCQYM5GY7FJMJV4NN4QC YXKP4AIWDBIWBBUDWF66YIPG5ECMHNKEV3PX6KYXOVXY3EWG3WGQC EAAYH6BQWDK52EC5RG3BEZQU3FJPN5RRRN4U5KDKDVPKXBVJMNDAC KMT3MF5NLEQIPZLHCRYDGQ5EA46HJCG3C2ANEPMZGKGHDK77ADPAC WS4ZQM4RMIHZ6XZKSDQJGHN5SSSWFL4H236USOPUA33S6RC53RFAC UUUVNC4DWEEL7WV5IRPKPZ6HZMYCPA53XM7LJWICUD4E6GN37IRQC ONES3V466GLO5CXKRF5ENK7VFOQPWM3YXLVRGWB56V5SH3W7XNBQC 6UVFCERMGSGNRWCVC3GWO5HWV6MSWE433DXBJVC7KRPP6LLJLCSQC 6DMPXOAT5GQ3BQQOMUZN2GMBQPRA4IB7CCPHTQTIFGO3KWWAKF3QC T7QB6QEPWBXAU3RL7LE4GRDWWNQ65ZU2YNNTWBYLORJOABAQFEZQC OHUZ73MKWD7SSB4DKKA532DEQKXQDS6PZ6HJ3EC2DLVJSLQH3NLAC assert!(t.elapsed().unwrap() >= std::time::Duration::from_millis(900));info!("started child mutable txn {:?}", txn.env.root.lock());
assert!(t.elapsed().unwrap() >= std::time::Duration::from_millis(90));info!("started child mutable txn {:?}", txn.root);
unsafe { libc::wait(&mut child) };
std::thread::sleep(std::time::Duration::from_millis(100));let txn = Env::txn_begin(&env).unwrap();info!("started parent txn {:?}", txn.root);// The parent committed, this is a new transaction.assert_eq!(txn.root, 0);let mut status = 1;unsafe { libc::wait(&mut status) };assert_eq!(status, 0);
}#[test]fn more_than_two_versions() {env_logger::try_init().unwrap_or(());let n = 5;let env = Env::new_anon(40960, n).unwrap();let mut txn = Env::mut_txn_begin(&env).unwrap();// Allocate two pages.for i in 0..n {let page = txn.alloc_page().unwrap();debug!("page = {:?}", page);txn.set_root(i, page.0.offset);}txn.commit().unwrap();for i in 0..n {let mut txn = Env::mut_txn_begin(&env).unwrap();// Free one of the pages.debug!("root(0) = {:?}", txn.root(i));txn.decr_rc(txn.root(i).unwrap()).unwrap();txn.remove_root(i);txn.commit().unwrap();}let mut txn = Env::mut_txn_begin(&env).unwrap();unsafe {let p = &*(env.mmaps.lock()[0].ptr.add(txn.root * PAGE_SIZE) as *const GlobalHeader);debug!("free page: 0x{:x}", u64::from_le(p.free_db));let db: Db<u64, ()> = Db {db: u64::from_le(p.free_db),k: std::marker::PhantomData,v: std::marker::PhantomData,p: std::marker::PhantomData,};for x in iter(&txn, &db, None).unwrap() {debug!("0x{:x}", x.unwrap().0);}}let page = txn.alloc_page().unwrap();debug!("page = {:?}", page);
//!//! The binary format of a Sanakirja database is the following://!//! - There is a fixed number of "current versions", set at file//! initialisation. If a file has n versions, then for all k between 0//! and n-1 (included), the k^th page (i.e. the byte positions between//! `k * 4096` and `(k+1) * 4096`, also written as `k << 12` and//! `(k+1) << 12`) stores the data relative to that version, and is//! called the "root page" of that version.//!//! This is a way to handle concurrent access: indeed, mutable//! transactions do not exclude readers, but readers that started//! before the commit of a mutable transaction will keep reading the//! database as it was before the commit. However, this means that//! older versions of the database have to be kept "alive", and the//! "number of current versions" here is the limit on the number of//! versions that can be kept "alive" at the same time.//!//! When a reader starts, it takes a shared file lock on the file//! representing the youngest committed version. When a writer starts,//! it takes an exclusive file lock on the file representing the//! oldest committed version. This implies that if readers are still//! reading that version, the writer will wait for the exclusive lock.//!//! After taking a lock, the writer (also called "mutable//! transaction" or [`MutTxn`]) copies the entire root page of the//! youngest committed version onto the root page of the oldest//! committed version, hence erasing the root page of the oldest//! version.//!//! - Root pages have the following format: a 32-bytes header//! (described below), followed by 4064 bytes, usable in a more or//! less free format. The current implementation defines two methods//! on [`MutTxn`], [`MutTxn::set_root`] and [`MutTxn::remove_root`],//! treating that space as an array of type `[u64; 510]`. A reasonable//! use for these is to point to different datastructures allocated in//! the file, such as the offsets in the file to the root pages of B//! trees.//!//! 
Now, about the header, there's a version identifier on the first//! 16 bits, followed by two bytes: `root` is the version used by the//! current mutable transaction (if there is a current mutable//! transaction), or by the next mutable transaction (otherwise). The//! `n_roots` field is the total number of versions.//!//! ```//! #[repr(C)]//! pub struct GlobalHeader {//! /// Version of Sanakirja//! pub version: u16,//! /// Which page is currently the root page? (only valid for page 0).//! pub root: u8,//! /// Total number of versions (or "root pages")//! pub n_roots: u8,//! /// CRC of this page.//! pub crc: u32,//! /// First free page at the end of the file (only valid for page 0).//! pub length: u64,//! /// Offset of the free list.//! pub free_db: u64,//! /// Offset of the RC database.//! pub rc_db: u64,//! }//! ```
pub use environment::{Env, MutTxn, Txn};pub use sanakirja_core::{btree, direct_repr, CowPage, LoadPage, MutPage, Page, Storable};
pub use environment::{Commit, Env, MutTxn, RootDb, Txn};pub use sanakirja_core::{btree, direct_repr, LoadPage, Storable, UnsizedStorable};
/// Reference counts use a strange encoding, meant to avoid code/// bloat: indeed, the list of free pages uses `Db<u64, ()>`, so/// we're just reusing the same code here, encoding the reference/// counts in the 12 least significant bits of the keys, and the/// actual pages in the 52 most significant bits.
////// Since we can't reuse them in the same transaction, another/// option would be to put them directly into the table of free/// pages. However, since calls to `put` may allocate and free/// pages, this could recurse infinitely, which is why we store/// them outside of the file.
let root = env.root.lock();debug!("unlock exclusive {:?}", *root);env.roots[*root].rw.unlock_exclusive();if let Some(ref f) = env.roots[*root].lock_file {f.unlock().unwrap_or(())}
env.mut_txn_unlock().unwrap_or(());env.roots[self.root].rw.unlock_exclusive();env.unlock(self.root).unwrap_or(())
#[cfg(feature = "mmap")]fn mut_txn_lock(&self) -> Result<(), Error> {self.mut_txn_lock.lock();if let Some(ref f) = self.file {f.lock_exclusive()?;}Ok(())}#[cfg(not(feature = "mmap"))]fn mut_txn_lock(&self) -> Result<(), Error> {self.mut_txn_lock.lock();Ok(())}#[cfg(feature = "mmap")]fn mut_txn_unlock(&self) -> Result<(), Error> {unsafe {self.mut_txn_lock.unlock();}if let Some(ref f) = self.file {f.unlock()?}Ok(())}#[cfg(not(feature = "mmap"))]fn mut_txn_unlock(&self) -> Result<(), Error> {unsafe {self.mut_txn_lock.unlock();}Ok(())}
let (header, free, rc) = {let env_ = env.borrow();let maps = env_.mmaps.lock()[0].ptr;let v = env_.root.lock();let n = env_.roots.len();env_.roots[*v].rw.lock_exclusive();if let Some(ref f) = env_.roots[*v].lock_file {f.lock_exclusive()?}// Root of the last MutTxn.let v0 = (*v + n - 1) % env_.roots.len();debug!("v = {:?} v0 = {:?}", v, v0);// Copy the roots of the last transaction onto this one.let page_ptr = maps.offset((v0 * PAGE_SIZE) as isize);let next_page_ptr = maps.offset((*v * PAGE_SIZE) as isize);std::ptr::copy_nonoverlapping(page_ptr.add(8), next_page_ptr.add(8), PAGE_SIZE - 8);
let env_ = env.borrow();
let header = GlobalHeader::from_le(&*(page_ptr as *const GlobalHeader));env_.check_crc(v0)?;
// First, take an exclusive file lock on the whole file to// make sure that no other process is starting a mutable// transaction at the same time. The worst that can happen// here is if the other process commits while we're still// waiting for a lock on the current page, because if that// happens, this new transaction will erase the// transaction in the other process.env_.mut_txn_lock()?;// Then, we can lock the root page of this transaction.let maps = env_.mmaps.lock()[0].ptr;let root = (&*(maps as *const GlobalHeader)).root as usize;debug!("BEGIN_TXN root = {:?}", root);env_.roots[root].rw.lock_exclusive();env_.lock_exclusive(root)?;// Root of the last MutTxn.let v0 = (root + env_.roots.len() - 1) % env_.roots.len();env_.check_crc(v0)?;// Copy the root page of the last transaction onto this// one.let page_ptr = maps.offset((v0 * PAGE_SIZE) as isize);let next_page_ptr = maps.offset((root * PAGE_SIZE) as isize);std::ptr::copy_nonoverlapping(page_ptr.add(8), next_page_ptr.add(8), PAGE_SIZE - 8);
let free = header.free_db;let rc = if header.rc_db == 0 {None} else {Some(btree::Db {db: header.rc_db,k: std::marker::PhantomData,v: std::marker::PhantomData,p: std::marker::PhantomData,})};(header, free, rc)};let length = if header.length == 0 {(PAGE_SIZE as u64) * (header.n_roots as u64)} else {header.length};
// Finally, read the header and start the transaction.let header = GlobalHeader::from_le(&*(next_page_ptr as *const GlobalHeader));debug!("n_roots = {:?}", header.n_roots);
rc,length,free,
rc: if header.rc_db == 0 {None} else {Some(btree::Db::from_page(header.rc_db))},length: if header.length == 0 {(PAGE_SIZE as u64) * (header.n_roots as u64)} else {header.length},free: header.free_db,
unsafe {let mut free_db: btree::Db<u64, ()> = btree::Db::from_page(self.free);while !self.free_owned_pages.is_empty() || !self.free_pages.is_empty() {let mut free_owned_pages =std::mem::replace(&mut self.free_owned_pages, Vec::new());let mut free_pages = std::mem::replace(&mut self.free_pages, Vec::new());for p in free_owned_pages.drain(..).chain(free_pages.drain(..)) {btree::put(&mut self, &mut free_db, &p, &())?;
debug!("COMMIT");// If there's no tree of free pages, and no pages to free,// don't bother with free pages at all (don't even allocate a// tree).let free_db =if self.free == 0 && self.free_owned_pages.is_empty() && self.free_pages.is_empty() {None} else {// Else, allocate or load the tree of free pages.let mut free_db: btree::Db<u64, ()> = if self.free == 0 {btree::create_db(&mut self)?} else {btree::Db::from_page(self.free)};debug!("free_db = {:?}", free_db);// Adding all the pages freed during the transaction to the// tree of free pages. If this call to `btree::put` frees// pages, add them again. This converges in at most log n// iterations (where n is the total number of free pages).// First, set the free table to 0 in this transaction, to// avoid recursing in the calls to `put` below (indeed,// the table of free pages is used when allocating new// pages, which may happen in a call to `put`).self.free = 0;// Then, while there are pages to free, free them. Since// the calls to `put` below might free pages (and add// pages to these two vectors), the (outer) loop might run// for more than one iteration.while !self.free_pages.is_empty() || !self.free_owned_pages.is_empty() {while let Some(p) = self.free_pages.pop() {let p = p & !0xfff;btree::put(&mut self, &mut free_db, &p.to_le(), &())?;}while let Some(p) = self.free_owned_pages.pop() {let p = p & !0xfff;btree::put(&mut self, &mut free_db, &p.to_le(), &())?;}
}for p in self.occupied_owned_pages.iter_mut() {clear_dirty(p);
Some(free_db)};// Clear the dirty bit of all pages we've touched. If they've// been freed and have already been flushed by the kernel, we// don't want to resurrect them to the main memory, so we// check that.let mut occ = std::mem::replace(&mut self.occupied_owned_pages, Vec::new());for p in occ.iter_mut() {if let Some(ref free_db) = free_db {if let Some((pp, ())) = btree::get(&self, free_db, &p.0.offset, None)? {if *pp == p.0.offset {continue;}}
let env = self.env.borrow();let mut maps = env.mmaps.lock();let mut root = env.root.lock();let globptr = maps[0].ptr.add(*root * PAGE_SIZE) as *mut GlobalHeader;(&mut *globptr).length = self.length.to_le();(&mut *globptr).free_db = free_db.db.to_le();self.free = free_db.db;
clear_dirty(p);}
let root_dbs = std::slice::from_raw_parts_mut(maps[0].ptr.add(*root * PAGE_SIZE + GLOBAL_HEADER_SIZE) as *mut u64,
let env = self.env.borrow();let mut maps = env.mmaps.lock();// Get this transaction's root page.let globptr =unsafe { &mut *(maps[0].ptr.add(self.root * PAGE_SIZE) as *mut GlobalHeader) };// Set the length and free database.globptr.length = self.length.to_le();if let Some(free_db) = free_db {debug!("COMMIT: free_db = 0x{:x}", free_db.db);globptr.free_db = free_db.db.to_le();}// Set the "root databases" modified by this transaction.let root_dbs = unsafe {std::slice::from_raw_parts_mut(maps[0].ptr.add(self.root * PAGE_SIZE + GLOBAL_HEADER_SIZE) as *mut u64,
);for (&r, rr) in self.roots.iter().zip(root_dbs.iter_mut()) {debug!("root_db: {:?}", rr as *mut u64);debug!("committing root: {:?} {:?}", r, rr);if r > 0 {*rr = r}
)};for (&r, rr) in self.roots.iter().zip(root_dbs.iter_mut()) {debug!("root_db: {:?}", rr);debug!("committing root: {:?} {:?}", r, rr);if r > 0 {*rr = r
// Moving the root page, both on page 0, and on the environment.(&mut *(maps[0].ptr as *mut GlobalHeader)).root = *root as u8;
// Move the current global root page by one page on page 0.unsafe {(&mut *(maps[0].ptr as *mut GlobalHeader)).root =(self.root as u8 + 1) % (env.roots.len() as u8);}
for m in maps.iter_mut() {m.flush()?}debug!("commit: unlock {:?}", *root);env.roots[*root].rw.unlock_exclusive();if let Some(ref f) = env.roots[*root].lock_file {debug!("commit: unlock file");f.unlock().unwrap_or(())}*env.first_unused_page.lock() = self.length;*root = (*root + 1) % env.roots.len();Ok(())
// Flush all the maps.for m in maps.iter_mut() {m.flush()?
// And finally, unlock the root page in the environment.debug!("commit: unlock {:?}", self.root);unsafe { env.roots[self.root].rw.unlock_exclusive() };// Unlock the root page on the file lock (if relevant).env.unlock(self.root)?;// And unlock the global mutable transaction mutex.env.mut_txn_unlock()?;debug!("/COMMIT");Ok(())
/// Setting the `num`th element of the initial page, treated as a/// `[u64; 510]`, to `value`. This doesn't actually write anything/// to that page, since that page is written during the commit.////// In the current implementation, `value` is probably going to be/// the offset in the file of the root page of a B tree.
let mut db: btree::Db<u64, ()> = btree::Db {db: self.free,k: std::marker::PhantomData,v: std::marker::PhantomData,p: std::marker::PhantomData,};
let mut db: btree::Db<u64, ()> = btree::Db::from_page(self.free);
// Check whether this page is also free for the other// versions.fn free_for_all(&self, f: u64) -> Result<bool, Error> {let env = self.env.borrow();// We already know it's free for the youngest previous// transaction and for the current one (because the tree of// free pages was copied from there), so we only have// `self.roots.len() - 2` root pages to check.for i in 1..env.roots.len() - 1 {let db: btree::Db<u64, ()> = unsafe {let p = &*(env.mmaps.lock()[0].ptr.add(((self.root + i) % env.roots.len()) * PAGE_SIZE)as *const GlobalHeader);if f >= u64::from_le(p.length) {// Page `f` was allocated strictyl after// transaction `i`.continue;}if p.free_db == 0 {// This version doesn't have any free page.return Ok(false);}btree::Db::from_page(p.free_db)};if let Some((&f_, ())) = btree::get(self, &db, &f, None)? {if f_ != f {return Ok(false);}}}Ok(true)}
fn decr_rc(&mut self, off: u64) -> Result<usize, Error> {debug!("decr_rc {:?} {:?}", off, self.rc);
/// Increment the reference count for page `off`.fn incr_rc(&mut self, off: u64) -> Result<usize, Error> {
btree::del(self, &mut rc_, &rc, None)?;if rc & 0xfff > 2 {btree::put(self, &mut rc_, &(rc - 1), &())?;self.rc = Some(rc_);} else {// Implicit "1".self.rc = Some(rc_)}return Ok((rc & 0xfff) as usize - 1);} else {self.rc = Some(rc_)
btree::del::del_at_cursor(self, &mut rc_, &mut curs)?;
debug!("decr_rc_owned {:?} {:?}", off, self.rc);
let rc = self.decr_rc_(off)?;if rc == 0 {self.free_owned_page(off);}Ok(rc)}}impl<E: Borrow<Env>, A> MutTxn<E, A> {/// Decrement the reference count of page `off`, freeing that page/// if the RC reaches 0 after decrementing it.fn decr_rc_(&mut self, off: u64) -> Result<usize, Error> {debug!("decr_rc 0x{:x} {:?}", off, self.rc);// If there's no RC table, free the page. Also, in order to// avoid infinite recursion (since `del` and `put` below might// free pages), we `take` the reference counter table.
}fn incr_rc(&mut self, off: u64) -> Result<usize, Error> {debug!("incr_rc {:?}", off);if let Some(mut rc_) = self.rc.take() {let mut curs = btree::cursor::Cursor::new(self, &rc_)?;let rc = if let Some((rc, _)) = curs.set(self, &off, None)? {if *rc & !0xfff == off {*rc & 0xfff} else {1}} else {1};if rc > 1 {btree::del::del_at_cursor(self, &mut rc_, &mut curs)?;}assert!(rc + 1 <= 0xfff);btree::put(self, &mut rc_, &(off | (rc + 1)), &())?;self.rc = Some(rc_);Ok(rc as usize + 1)} else {let mut rc = btree::create_db(self)?;debug!("put in RC: {:?} {:?}", rc, off | 2);btree::put(self, &mut rc, &(off | 2), &())?;debug!("done");self.rc = Some(rc);Ok(2)}
impl<E: Borrow<Env>, T> RootDb for MutTxn<E, T> {fn root_db<K: Storable + ?Sized, V: Storable + ?Sized, P: crate::btree::BTreePage<K, V>>(&self,n: usize,) -> Option<sanakirja_core::btree::Db_<K, V, P>> {
impl<E: Borrow<Env>, T> MutTxn<E, T> {/// Low-level method to get the root page number `n`, if that page/// isn't a B tree (use the [`RootDb`] trait else).pub fn root(&self, n: usize) -> Option<u64> {
}}}impl<E: Borrow<Env>, T> RootDb for MutTxn<E, T> {// Just call method `root` and convert the result to a `Db`.fn root_db<K: Storable + ?Sized, V: Storable + ?Sized, P: crate::btree::BTreePage<K, V>>(&self,n: usize,) -> Option<sanakirja_core::btree::Db_<K, V, P>> {if let Some(db) = self.root(n) {Some(sanakirja_core::btree::Db_::from_page(db))} else {None
/// Environment, required to start any transactions. Thread-safe, but/// opening the same database several times in the same process is not/// cross-platform.
/// An environment, which may be either a memory-mapped file, or/// memory allocated with [`std::alloc`].
roots: Vec<RootLock>,// Root number of the next MutTxn.pub(crate) root: Mutex<usize>,}struct RootLock {/// It is undefined behavior to have a file mmapped for than once.#[cfg(feature = "mmap")]lock_file: Option<std::fs::File>,rw: parking_lot::RawRwLock,n_txn: AtomicUsize,
pub(crate) roots: Vec<RootLock>,
// When an environment goes away, empty its list of memory maps and
// explicitly drop the `mmap` handle of each of them.
#[cfg(feature = "mmap")]
impl Drop for Env {
    fn drop(&mut self) {
        let mut all_maps = self.mmaps.lock();
        all_maps.drain(..).for_each(|entry| drop(entry.mmap));
    }
}
/// An immutable transaction.
pub struct Txn<E: Borrow<Env>> {
    // The environment this transaction belongs to (anything that can be
    // borrowed as an `Env`).
    env: E,
    // NOTE(review): presumably the index of the root page (version) this
    // transaction reads from -- confirm against `txn_begin`.
    root: usize,
}

// Creating an environment with 19 root pages in a file of only 8 pages
// is expected to panic (hence `#[should_panic]`).
#[cfg(feature = "mmap")]
#[test]
#[should_panic]
fn nroots_test() {
    let path = tempfile::tempdir().unwrap();
    let path = path.path().join("db");
    let l0 = 1 << 15; // 8 pages
    Env::new(&path, l0, 19).unwrap();
}

// Opens two memory maps (numbers 0 and 1) on a fresh environment,
// flushes them, and then checks the resulting size of the file on disk.
#[cfg(feature = "mmap")]
#[test]
fn mmap_growth_test() {
    let path = tempfile::tempdir().unwrap();
    let path = path.path().join("db");
    let l0 = 1 << 15; // 8 pages
    {
        // The environment and both maps are dropped at the end of this
        // scope, before the file length is inspected.
        let env = Env::new(&path, l0, 2).unwrap();
        let map1 = env.open_mmap(0, l0).unwrap();
        println!("{:?}", map1);
        let map2 = env.open_mmap(1, l0).unwrap();
        println!("{:?}", map2);
        map1.flush().unwrap();
        map2.flush().unwrap();
    }
    let len = std::fs::metadata(&path).unwrap().len();
    // `(l0 << 2) - l0` is `3 * l0`.
    // NOTE(review): presumably `l0` for the first map plus `2 * l0` for
    // the second -- confirm against `open_mmap`.
    assert_eq!(len, (l0 << 2) - l0);
}
/// A lock on a root page for this process only, because taking/// multiple locks on the same file from a single process isn't/// cross-platform (or even properly defined).////// Usage is as follows:////// - For read-only transactions, we first take a read lock on the `rw`/// field, and increment `n_txn`, locking the file if the former value/// is 0.////// - For read-write transactions, we first take a write lock on the/// `rw` field, and then take an exclusive lock on the file (this is/// valid since only one read-write transaction can be active in a/// process at the same time).///pub(crate) struct RootLock {/// It is undefined behavior to have a file mmapped for than once.#[cfg(feature = "mmap")]lock_file: Option<std::fs::File>,
#[cfg(feature = "crc32")]fn set_crc(ptr: *mut u8) {unsafe {let root_page = std::slice::from_raw_parts(ptr.add(8), PAGE_SIZE - 8);let mut h = HASHER.clone();h.update(root_page);let globptr = ptr as *mut GlobalHeader;(&mut *globptr).crc = h.finalize().to_le();}
/// Count of read-only transactions.n_txn: AtomicUsize,
/// This method is provided because waiting for a lock on the file/// system may block the whole process, whereas.
/// The database is very likely to get corrupted if an environment/// is opened from multiple processes, or more than once by the/// same process, if at least one of these instances can start a/// mutable transaction.
/// However, the database is very likely to get corrupted if opened/// more than once at the same time, even within the same process.////// Therefore, do not use this method without another locking/// mechanism in place to avoid that situation.
/// The `n_roots` argument is ignored if the database already/// exists, and is used to initialise the first `n_roots` pages of/// the file otherwise.
if initialise {for i in 0..n_roots {*(map.add(i * PAGE_SIZE) as *mut GlobalHeader) = (GlobalHeader {version: CURRENT_VERSION,root: 0,n_roots: n_roots as u8,crc: 0,length: (n_roots * PAGE_SIZE) as u64,free_db: 0,rc_db: 0,}).to_le();
let n_roots = if initialise {// Initialise the first `n_roots` pages at the start of// the file.init(map, n_roots);// Since the first `n_roots` pages are occupied by roots,// the first unused page is found at offset `n_roots *// PAGE_SIZE`.n_roots} else {// Read the root and number of roots from the first page's// header.GlobalHeader::from_le(&*(map as *const GlobalHeader)).n_roots as usize};
set_crc(map.add(i * PAGE_SIZE));}}let glob = GlobalHeader::from_le(&*(map as *const GlobalHeader));let mut roots = Vec::with_capacity(glob.n_roots as usize);for _ in 0..glob.n_roots {roots.push(RootLock {rw: <parking_lot::RawRwLock as parking_lot::lock_api::RawRwLock>::INIT,n_txn: AtomicUsize::new(0),lock_file: None,})}
// Finally, create the environment.
first_unused_page: Mutex::new(2),roots,root: Mutex::new(glob.root as usize),
mut_txn_lock: RawMutex::INIT,// Initialise a different `RootLock` for each root page.roots: (0..n_roots).map(|_| RootLock {rw: RawRwLock::INIT,n_txn: AtomicUsize::new(0),lock_file: None,}).collect(),
unsafe fn new_nolock_mmap(length: u64, initialise: bool) -> Result<Env, Error> {
unsafe fn new_nolock_(length: u64, initialise: bool, n_roots: usize) -> Result<Env, Error> {assert!(n_roots >= 1);assert!(n_roots <= ((length >> 12) as usize));assert!(n_roots < 256);
let n_roots = 2;for i in 0..n_roots {*(map.offset((i * PAGE_SIZE) as isize) as *mut GlobalHeader) = (GlobalHeader {version: CURRENT_VERSION,root: 0,n_roots: n_roots as u8,crc: 0,length: n_roots as u64 * PAGE_SIZE as u64,free_db: 0,rc_db: 0,}).to_le();}let glob = GlobalHeader::from_le(&*(map as *const GlobalHeader));let mut roots = Vec::with_capacity(glob.n_roots as usize);for _ in 0..glob.n_roots {roots.push(RootLock {rw: <parking_lot::RawRwLock as parking_lot::lock_api::RawRwLock>::INIT,n_txn: AtomicUsize::new(0),})}
init(map, n_roots);
first_unused_page: Mutex::new(2),roots,root: Mutex::new(glob.root as usize),
mut_txn_lock: RawMutex::INIT,// Initialise a different `RootLock` for each root page.roots: (0..n_roots).map(|_| RootLock {rw: RawRwLock::INIT,n_txn: AtomicUsize::new(0),}).collect(),
}}unsafe fn init(map: *mut u8, n_roots: usize) {for i in 0..n_roots {*(map.offset((i * PAGE_SIZE) as isize) as *mut GlobalHeader) = (GlobalHeader {version: CURRENT_VERSION,root: 0,n_roots: n_roots as u8,crc: 0,length: n_roots as u64 * PAGE_SIZE as u64,free_db: 0,rc_db: 0,}).to_le();set_crc(map.add(i * PAGE_SIZE));
}}/// Close this repository, releasing the locks. It is undefined/// behaviour to use the environment afterwards. This method can/// be used for instance to release the locks before allocating a/// new environment (note that `std::mem::replace` followed by/// `Drop::drop` of the previous value would not release the locks/// in the correct order).////// The safe alternative to this method is to use an `Option<Env>`/// instead of an `Env`.#[cfg(feature = "mmap")]pub unsafe fn close(&mut self) {for m in self.mmaps.lock().drain(..) {drop(m.mmap);
}}/// If the CRC feature is disabled, we're not checking CRCs.#[cfg(not(feature = "crc32"))]fn check_crc(&self, _root: usize) -> Result<(), crate::CRCError> {Ok(())}/// Else, we are checking CRCs, so return a CRC error if the check/// fails (the CRC is a 32 bit integer encoded little-endian at/// bytes [8,12[ of the root pages).#[cfg(feature = "crc32")]fn check_crc(&self, root: usize) -> Result<(), crate::CRCError> {unsafe {let maps = self.mmaps.lock();let page_ptr = maps[0].ptr.add(root * PAGE_SIZE);let crc = (&*(page_ptr as *const GlobalHeader)).crc;let root_page = std::slice::from_raw_parts(page_ptr.add(8), PAGE_SIZE - 8);let mut h = HASHER.clone();h.update(root_page);if h.finalize() != crc {return Err(crate::CRCError {});}
}#[cfg(feature = "mmap")]#[test]#[should_panic]fn nroots_test() {let path = tempfile::tempdir().unwrap();let path = path.path().join("db");let l0 = 1 << 15; // 8 pagesEnv::new(&path, l0, 19).unwrap();}#[cfg(feature = "mmap")]#[test]fn mmap_growth_test() {let path = tempfile::tempdir().unwrap();let path = path.path().join("db");let l0 = 1 << 15; // 8 pages{let env = Env::new(&path, l0, 2).unwrap();let map1 = env.open_mmap(0, l0).unwrap();println!("{:?}", map1);let map2 = env.open_mmap(1, l0).unwrap();println!("{:?}", map2);map1.flush().unwrap();map2.flush().unwrap();}let len = std::fs::metadata(&path).unwrap().len();assert_eq!(len, (l0 << 2) - l0);}#[cfg(not(feature = "crc32"))]fn set_crc(_ptr: *mut u8) {}#[cfg(feature = "crc32")]fn set_crc(ptr: *mut u8) {unsafe {let root_page = std::slice::from_raw_parts(ptr.add(8), PAGE_SIZE - 8);let mut h = HASHER.clone();h.update(root_page);let globptr = ptr as *mut GlobalHeader;(&mut *globptr).crc = h.finalize().to_le();}}/// An immutable transaction.pub struct Txn<E: Borrow<Env>> {env: E,pub(crate) root: usize,
debug!("root lock");let root = env_.root.lock();let root = (*root + env_.roots.len() - 1) % env_.roots.len();debug!("shared {:?}", root);
// Find the youngest committed version and lock it. Note// that there may be processes incrementing the version// number in parallel to this process. If that happens,// then since we take a shared file lock on a root page// (at the end of this function), the only thing that may// happen is that we don't open the very last version.let cur_mut_root =unsafe { (&*(env_.mmaps.lock()[0].ptr as *const GlobalHeader)).root as usize };// Last committed root page.let root = (cur_mut_root + env_.roots.len() - 1) % env_.roots.len();
#[cfg(feature = "crc32")]fn check_crc(&self, root: usize) -> Result<(), crate::CRCError> {unsafe {let maps = self.mmaps.lock();let page_ptr = maps[0].ptr.add(root * PAGE_SIZE);let crc = (&*(page_ptr as *const GlobalHeader)).crc;let root_page = std::slice::from_raw_parts(page_ptr.add(8), PAGE_SIZE - 8);let mut h = HASHER.clone();h.update(root_page);if h.finalize() != crc {return Err(crate::CRCError {});}
#[cfg(not(feature = "mmap"))]fn lock_shared(&self, _root: usize) -> Result<(), Error> {Ok(())}#[cfg(feature = "mmap")]fn lock_exclusive(&self, root: usize) -> Result<(), Error> {if let Some(ref f) = self.roots[root].lock_file {f.lock_exclusive()?;}Ok(())}#[cfg(not(feature = "mmap"))]fn lock_exclusive(&self, _root: usize) -> Result<(), Error> {Ok(())}#[cfg(feature = "mmap")]fn unlock(&self, root: usize) -> Result<(), Error> {if let Some(ref f) = self.roots[root].lock_file {f.unlock()?