E4MD6T3LNOYWVFTFFWCUKRNS4M2XVSKRLDWPYHMZHGDNO2T5JREQC
TSMS6W4DOKQNUQ4PEMTLOIODR33VFPN6MMNS73ZPSU4BOQVRGPNAC
W2MIZD5BNL7A5HVFWTESF57QU7T6QMEF4RBSLFQXMEEU3XD2NU2QC
OTWDDJE7TTE73D6BGF4ZN6BH2NFUFLPME2VJ3CPALH463UGWLEIQC
6DCQHIFPEH4GZKSRRS32GMKDRPZH4MTCGOUEI7YEUVKWENBF3JWAC
UAQX27N4PI4LHEW6LSHJETIE5MV7JTEMPLTJFYUBMYVPC43H7VOAC
YWFYZNLZ5JHLIFVBRKZK4TSWVPROUPRG77ZB5M7UHT2OKPL4ZSRQC
X3QVVQIS7B7L3XYZAWL3OOBUXOJ6RMOKQ45YMLLGAHYPEEKZ45ZAC
MSRWB47YP6L5BVTS53QQPBOHY5SXTSTR5KD6IIF35UWCTEUOCQWQC
Q7DRIBBRE4MNG4NP3PVIXAJF5PQYLFWYIVK2O4VVLEO6XY3BOSFQC
QYDGYIZRNFRIQD7RUCY5YAN3F2THZA74E5UOHPIFWSULEJFAFVJQC
LSQ6V7M66TEGLJ7QBLRVDX4E7UKJTDQTEXZOS3KGPGFKVXNLPKBQC
OFINGD26ZWCRDVVDI2ZIBLMHXKEMJA6MRNLANJYUHQPIJLPA7J2AC
G4JEQLLX6Q7VVFVAEJZAVQXX33MQ36CSCYSMJ5NQM5VZ76DXKU6QC
H3FVSQIQGFCFKCPXVOSFHP4OSUOBBURJESCZNGQTNDAAD3WQSBEQC
AFKBHYVE25QMIU2WZATEGWG2EXSBQZ44DVDPACKNS3M4QGBF4ONAC
EYNN7RLSFVBWDLRTLNNFUAF46Q6OX3BR5SUEJIOOHBSNP7FVBXGAC
LROAI3NBBSCU4T2YA6EHJYKKKL75AU5A7C7WIRCGIQ56S6HPLRXQC
XEU2QVLCHPYOOD4TQIPEEVYOVSFMKFPLJYWEJYXYJAZ7S54KWDZAC
NXMFNPZ7VWJRLC3M5QJJVTICXCMGE24F3HVIZA7A7RLVMLQMLDVQC
OP6SVMOD2GTQ7VNJ4E5KYFG4MIYA7HBMXJTADALMZH4PY7OQRMZQC
SO25TWFLSRQIVTJTTSN77LO5FZQVQPIZTSBULH7MWBBDEWSK3OCAC
AOX2XQISHGWNNAFBYRN44Q6AWG7H5DPBK5YMFHK42HQNZ2TMHEJQC
S4V4QZ5CF5LUDYWNR2UMWH6CHJDJ5FPGAZCQYM5GY7FJMJV4NN4QC
YXKP4AIWDBIWBBUDWF66YIPG5ECMHNKEV3PX6KYXOVXY3EWG3WGQC
EAAYH6BQWDK52EC5RG3BEZQU3FJPN5RRRN4U5KDKDVPKXBVJMNDAC
KMT3MF5NLEQIPZLHCRYDGQ5EA46HJCG3C2ANEPMZGKGHDK77ADPAC
WS4ZQM4RMIHZ6XZKSDQJGHN5SSSWFL4H236USOPUA33S6RC53RFAC
UUUVNC4DWEEL7WV5IRPKPZ6HZMYCPA53XM7LJWICUD4E6GN37IRQC
ONES3V466GLO5CXKRF5ENK7VFOQPWM3YXLVRGWB56V5SH3W7XNBQC
6UVFCERMGSGNRWCVC3GWO5HWV6MSWE433DXBJVC7KRPP6LLJLCSQC
6DMPXOAT5GQ3BQQOMUZN2GMBQPRA4IB7CCPHTQTIFGO3KWWAKF3QC
T7QB6QEPWBXAU3RL7LE4GRDWWNQ65ZU2YNNTWBYLORJOABAQFEZQC
OHUZ73MKWD7SSB4DKKA532DEQKXQDS6PZ6HJ3EC2DLVJSLQH3NLAC
assert!(t.elapsed().unwrap() >= std::time::Duration::from_millis(900));
info!("started child mutable txn {:?}", txn.env.root.lock());
assert!(t.elapsed().unwrap() >= std::time::Duration::from_millis(90));
info!("started child mutable txn {:?}", txn.root);
unsafe { libc::wait(&mut child) };
std::thread::sleep(std::time::Duration::from_millis(100));
let txn = Env::txn_begin(&env).unwrap();
info!("started parent txn {:?}", txn.root);
// The parent committed, this is a new transaction.
assert_eq!(txn.root, 0);
let mut status = 1;
unsafe { libc::wait(&mut status) };
assert_eq!(status, 0);
}
#[test]
fn more_than_two_versions() {
env_logger::try_init().unwrap_or(());
let n = 5;
let env = Env::new_anon(40960, n).unwrap();
let mut txn = Env::mut_txn_begin(&env).unwrap();
// Allocate two pages.
for i in 0..n {
let page = txn.alloc_page().unwrap();
debug!("page = {:?}", page);
txn.set_root(i, page.0.offset);
}
txn.commit().unwrap();
for i in 0..n {
let mut txn = Env::mut_txn_begin(&env).unwrap();
// Free one of the pages.
debug!("root(0) = {:?}", txn.root(i));
txn.decr_rc(txn.root(i).unwrap()).unwrap();
txn.remove_root(i);
txn.commit().unwrap();
}
let mut txn = Env::mut_txn_begin(&env).unwrap();
unsafe {
let p = &*(env.mmaps.lock()[0].ptr.add(txn.root * PAGE_SIZE) as *const GlobalHeader);
debug!("free page: 0x{:x}", u64::from_le(p.free_db));
let db: Db<u64, ()> = Db {
db: u64::from_le(p.free_db),
k: std::marker::PhantomData,
v: std::marker::PhantomData,
p: std::marker::PhantomData,
};
for x in iter(&txn, &db, None).unwrap() {
debug!("0x{:x}", x.unwrap().0);
}
}
let page = txn.alloc_page().unwrap();
debug!("page = {:?}", page);
//!
//! The binary format of a Sanakirja database is the following:
//!
//! - There is a fixed number of "current versions", set at file
//! initialisation. If a file has n versions, then for all k between 0
//! and n-1 (included), the k^th page (i.e. the byte positions between
//! `k * 4096` and `(k+1) * 4096`, also written as `k << 12` and
//! `(k+1) << 12`) stores the data relative to that version, and is
//! called the "root page" of that version.
//!
//! This is a way to handle concurrent access: indeed, mutable
//! transactions do not exclude readers, but readers that started
//! before the commit of a mutable transaction will keep reading the
//! database as it was before the commit. However, this means that
//! older versions of the database have to be kept "alive", and the
//! "number of current versions" here is the limit on the number of
//! versions that can be kept "alive" at the same time.
//!
//! When a reader starts, it takes a shared file lock on the file
//! representing the youngest committed version. When a writer starts,
//! it takes an exclusive file lock on the file representing the
//! oldest committed version. This implies that if readers are still
//! reading that version, the writer will wait for the exclusive lock.
//!
//! After taking a lock, the writer (also called "mutable
//! transaction" or [`MutTxn`]) copies the entire root page of the
//! youngest committed version onto the root page of the oldest
//! committed version, hence erasing the root page of the oldest
//! version.
//!
//! - Root pages have the following format: a 32-bytes header
//! (described below), followed by 4064 bytes, usable in a more or
//! less free format. The current implementation defines two methods
//! on [`MutTxn`], [`MutTxn::set_root`] and [`MutTxn::remove_root`],
//! treating that space as an array of type `[u64; 510]`. A reasonable
//! use for these is to point to different datastructures allocated in
//! the file, such as the offsets in the file to the root pages of B
//! trees.
//!
//! Now, about the header, there's a version identifier in the first
//! 16 bits (two bytes), followed by two bytes: `root` is the version used by the
//! current mutable transaction (if there is a current mutable
//! transaction), or by the next mutable transaction otherwise. The
//! `n_roots` field is the total number of versions.
//!
//! ```
//! #[repr(C)]
//! pub struct GlobalHeader {
//! /// Version of Sanakirja
//! pub version: u16,
//! /// Which page is currently the root page? (only valid for page 0).
//! pub root: u8,
//! /// Total number of versions (or "root pages")
//! pub n_roots: u8,
//! /// CRC of this page.
//! pub crc: u32,
//! /// First free page at the end of the file (only valid for page 0).
//! pub length: u64,
//! /// Offset of the free list.
//! pub free_db: u64,
//! /// Offset of the RC database.
//! pub rc_db: u64,
//! }
//! ```
pub use environment::{Env, MutTxn, Txn};
pub use sanakirja_core::{btree, direct_repr, CowPage, LoadPage, MutPage, Page, Storable};
pub use environment::{Commit, Env, MutTxn, RootDb, Txn};
pub use sanakirja_core::{btree, direct_repr, LoadPage, Storable, UnsizedStorable};
/// Reference counts use a strange encoding, meant to avoid code
/// bloat: indeed, the list of free pages uses `Db<u64, ()>`, so
/// we're just reusing the same code here, encoding the reference
/// counts in the 12 least significant bits of the keys, and the
/// actual pages in the 52 most significant bits.
///
/// Since we can't reuse them in the same transaction, another
/// option would be to put them directly into the table of free
/// pages. However, since calls to `put` may allocate and free
/// pages, this could recurse infinitely, which is why we store
/// them outside of the file.
let root = env.root.lock();
debug!("unlock exclusive {:?}", *root);
env.roots[*root].rw.unlock_exclusive();
if let Some(ref f) = env.roots[*root].lock_file {
f.unlock().unwrap_or(())
}
env.mut_txn_unlock().unwrap_or(());
env.roots[self.root].rw.unlock_exclusive();
env.unlock(self.root).unwrap_or(())
#[cfg(feature = "mmap")]
/// Take the process-wide mutable-transaction lock, and then (since
/// the "mmap" feature is enabled) an exclusive lock on the backing
/// file, if there is one. The in-process mutex is taken first, so
/// within one process only a single thread ever waits on the file
/// lock.
fn mut_txn_lock(&self) -> Result<(), Error> {
    // Blocks until no other thread in this process holds the
    // mutable-transaction mutex.
    self.mut_txn_lock.lock();
    // NOTE(review): `self.file` is presumably `None` for anonymous
    // (non-file-backed) environments — confirm against the
    // constructors.
    if let Some(ref f) = self.file {
        f.lock_exclusive()?;
    }
    Ok(())
}
#[cfg(not(feature = "mmap"))]
/// Take the process-wide mutable-transaction lock. Without the
/// "mmap" feature there is no backing file to lock, so only the
/// in-process mutex is taken.
fn mut_txn_lock(&self) -> Result<(), Error> {
    self.mut_txn_lock.lock();
    Ok(())
}
#[cfg(feature = "mmap")]
/// Release the mutable-transaction lock: first the in-process
/// mutex, then the file lock (if the environment is file-backed).
fn mut_txn_unlock(&self) -> Result<(), Error> {
    // SAFETY: raw `parking_lot` mutexes require the caller to
    // guarantee the lock is currently held — NOTE(review): this
    // assumes `mut_txn_lock()` was called earlier; confirm all call
    // sites pair the two.
    unsafe {
        self.mut_txn_lock.unlock();
    }
    if let Some(ref f) = self.file {
        f.unlock()?
    }
    Ok(())
}
#[cfg(not(feature = "mmap"))]
/// Release the mutable-transaction lock. Without the "mmap" feature
/// there is no file lock, so only the in-process mutex is released.
fn mut_txn_unlock(&self) -> Result<(), Error> {
    // SAFETY: the raw mutex must currently be held; see
    // `mut_txn_lock`.
    unsafe {
        self.mut_txn_lock.unlock();
    }
    Ok(())
}
let (header, free, rc) = {
let env_ = env.borrow();
let maps = env_.mmaps.lock()[0].ptr;
let v = env_.root.lock();
let n = env_.roots.len();
env_.roots[*v].rw.lock_exclusive();
if let Some(ref f) = env_.roots[*v].lock_file {
f.lock_exclusive()?
}
// Root of the last MutTxn.
let v0 = (*v + n - 1) % env_.roots.len();
debug!("v = {:?} v0 = {:?}", v, v0);
// Copy the roots of the last transaction onto this one.
let page_ptr = maps.offset((v0 * PAGE_SIZE) as isize);
let next_page_ptr = maps.offset((*v * PAGE_SIZE) as isize);
std::ptr::copy_nonoverlapping(page_ptr.add(8), next_page_ptr.add(8), PAGE_SIZE - 8);
let env_ = env.borrow();
let header = GlobalHeader::from_le(&*(page_ptr as *const GlobalHeader));
env_.check_crc(v0)?;
// First, take an exclusive file lock on the whole file to
// make sure that no other process is starting a mutable
// transaction at the same time. The worst that can happen
// here is if the other process commits while we're still
// waiting for a lock on the current page, because if that
// happens, this new transaction will erase the
// transaction in the other process.
env_.mut_txn_lock()?;
// Then, we can lock the root page of this transaction.
let maps = env_.mmaps.lock()[0].ptr;
let root = (&*(maps as *const GlobalHeader)).root as usize;
debug!("BEGIN_TXN root = {:?}", root);
env_.roots[root].rw.lock_exclusive();
env_.lock_exclusive(root)?;
// Root of the last MutTxn.
let v0 = (root + env_.roots.len() - 1) % env_.roots.len();
env_.check_crc(v0)?;
// Copy the root page of the last transaction onto this
// one.
let page_ptr = maps.offset((v0 * PAGE_SIZE) as isize);
let next_page_ptr = maps.offset((root * PAGE_SIZE) as isize);
std::ptr::copy_nonoverlapping(page_ptr.add(8), next_page_ptr.add(8), PAGE_SIZE - 8);
let free = header.free_db;
let rc = if header.rc_db == 0 {
None
} else {
Some(btree::Db {
db: header.rc_db,
k: std::marker::PhantomData,
v: std::marker::PhantomData,
p: std::marker::PhantomData,
})
};
(header, free, rc)
};
let length = if header.length == 0 {
(PAGE_SIZE as u64) * (header.n_roots as u64)
} else {
header.length
};
// Finally, read the header and start the transaction.
let header = GlobalHeader::from_le(&*(next_page_ptr as *const GlobalHeader));
debug!("n_roots = {:?}", header.n_roots);
rc,
length,
free,
rc: if header.rc_db == 0 {
None
} else {
Some(btree::Db::from_page(header.rc_db))
},
length: if header.length == 0 {
(PAGE_SIZE as u64) * (header.n_roots as u64)
} else {
header.length
},
free: header.free_db,
unsafe {
let mut free_db: btree::Db<u64, ()> = btree::Db::from_page(self.free);
while !self.free_owned_pages.is_empty() || !self.free_pages.is_empty() {
let mut free_owned_pages =
std::mem::replace(&mut self.free_owned_pages, Vec::new());
let mut free_pages = std::mem::replace(&mut self.free_pages, Vec::new());
for p in free_owned_pages.drain(..).chain(free_pages.drain(..)) {
btree::put(&mut self, &mut free_db, &p, &())?;
debug!("COMMIT");
// If there's no tree of free pages, and no pages to free,
// don't bother with free pages at all (don't even allocate a
// tree).
let free_db =
if self.free == 0 && self.free_owned_pages.is_empty() && self.free_pages.is_empty() {
None
} else {
// Else, allocate or load the tree of free pages.
let mut free_db: btree::Db<u64, ()> = if self.free == 0 {
btree::create_db(&mut self)?
} else {
btree::Db::from_page(self.free)
};
debug!("free_db = {:?}", free_db);
// Adding all the pages freed during the transaction to the
// tree of free pages. If this call to `btree::put` frees
// pages, add them again. This converges in at most log n
// iterations (where n is the total number of free pages).
// First, set the free table to 0 in this transaction, to
// avoid recursing in the calls to `put` below (indeed,
// the table of free pages is used when allocating new
// pages, which may happen in a call to `put`).
self.free = 0;
// Then, while there are pages to free, free them. Since
// the calls to `put` below might free pages (and add
// pages to these two vectors), the (outer) loop might run
// for more than one iteration.
while !self.free_pages.is_empty() || !self.free_owned_pages.is_empty() {
while let Some(p) = self.free_pages.pop() {
let p = p & !0xfff;
btree::put(&mut self, &mut free_db, &p.to_le(), &())?;
}
while let Some(p) = self.free_owned_pages.pop() {
let p = p & !0xfff;
btree::put(&mut self, &mut free_db, &p.to_le(), &())?;
}
}
for p in self.occupied_owned_pages.iter_mut() {
clear_dirty(p);
Some(free_db)
};
// Clear the dirty bit of all pages we've touched. If they've
// been freed and have already been flushed by the kernel, we
// don't want to resurrect them to the main memory, so we
// check that.
let mut occ = std::mem::replace(&mut self.occupied_owned_pages, Vec::new());
for p in occ.iter_mut() {
if let Some(ref free_db) = free_db {
if let Some((pp, ())) = btree::get(&self, free_db, &p.0.offset, None)? {
if *pp == p.0.offset {
continue;
}
}
let env = self.env.borrow();
let mut maps = env.mmaps.lock();
let mut root = env.root.lock();
let globptr = maps[0].ptr.add(*root * PAGE_SIZE) as *mut GlobalHeader;
(&mut *globptr).length = self.length.to_le();
(&mut *globptr).free_db = free_db.db.to_le();
self.free = free_db.db;
clear_dirty(p);
}
let root_dbs = std::slice::from_raw_parts_mut(
maps[0].ptr.add(*root * PAGE_SIZE + GLOBAL_HEADER_SIZE) as *mut u64,
let env = self.env.borrow();
let mut maps = env.mmaps.lock();
// Get this transaction's root page.
let globptr =
unsafe { &mut *(maps[0].ptr.add(self.root * PAGE_SIZE) as *mut GlobalHeader) };
// Set the length and free database.
globptr.length = self.length.to_le();
if let Some(free_db) = free_db {
debug!("COMMIT: free_db = 0x{:x}", free_db.db);
globptr.free_db = free_db.db.to_le();
}
// Set the "root databases" modified by this transaction.
let root_dbs = unsafe {
std::slice::from_raw_parts_mut(
maps[0].ptr.add(self.root * PAGE_SIZE + GLOBAL_HEADER_SIZE) as *mut u64,
);
for (&r, rr) in self.roots.iter().zip(root_dbs.iter_mut()) {
debug!("root_db: {:?}", rr as *mut u64);
debug!("committing root: {:?} {:?}", r, rr);
if r > 0 {
*rr = r
}
)
};
for (&r, rr) in self.roots.iter().zip(root_dbs.iter_mut()) {
debug!("root_db: {:?}", rr);
debug!("committing root: {:?} {:?}", r, rr);
if r > 0 {
*rr = r
// Moving the root page, both on page 0, and on the environment.
(&mut *(maps[0].ptr as *mut GlobalHeader)).root = *root as u8;
// Move the current global root page by one page on page 0.
unsafe {
(&mut *(maps[0].ptr as *mut GlobalHeader)).root =
(self.root as u8 + 1) % (env.roots.len() as u8);
}
for m in maps.iter_mut() {
m.flush()?
}
debug!("commit: unlock {:?}", *root);
env.roots[*root].rw.unlock_exclusive();
if let Some(ref f) = env.roots[*root].lock_file {
debug!("commit: unlock file");
f.unlock().unwrap_or(())
}
*env.first_unused_page.lock() = self.length;
*root = (*root + 1) % env.roots.len();
Ok(())
// Flush all the maps.
for m in maps.iter_mut() {
m.flush()?
// And finally, unlock the root page in the environment.
debug!("commit: unlock {:?}", self.root);
unsafe { env.roots[self.root].rw.unlock_exclusive() };
// Unlock the root page on the file lock (if relevant).
env.unlock(self.root)?;
// And unlock the global mutable transaction mutex.
env.mut_txn_unlock()?;
debug!("/COMMIT");
Ok(())
/// Setting the `num`th element of the initial page, treated as a
/// `[u64; 510]`, to `value`. This doesn't actually write anything
/// to that page, since that page is written during the commit.
///
/// In the current implementation, `value` is probably going to be
/// the offset in the file of the root page of a B tree.
let mut db: btree::Db<u64, ()> = btree::Db {
db: self.free,
k: std::marker::PhantomData,
v: std::marker::PhantomData,
p: std::marker::PhantomData,
};
let mut db: btree::Db<u64, ()> = btree::Db::from_page(self.free);
// Check whether this page is also free for the other
// versions.
/// Check whether page `f` (a byte offset in the file) is free for
/// every other "alive" version of the database, by looking it up in
/// the free-page tree of each of the other root pages.
// Check whether this page is also free for the other
// versions.
fn free_for_all(&self, f: u64) -> Result<bool, Error> {
    let env = self.env.borrow();
    // We already know it's free for the youngest previous
    // transaction and for the current one (because the tree of
    // free pages was copied from there), so we only have
    // `self.roots.len() - 2` root pages to check.
    for i in 1..env.roots.len() - 1 {
        let db: btree::Db<u64, ()> = unsafe {
            // Root page of version `(self.root + i) % n_roots`.
            let p = &*(env.mmaps.lock()[0]
                .ptr
                .add(((self.root + i) % env.roots.len()) * PAGE_SIZE)
                as *const GlobalHeader);
            if f >= u64::from_le(p.length) {
                // Page `f` was allocated strictly after
                // transaction `i`, so it cannot appear there.
                continue;
            }
            if p.free_db == 0 {
                // This version doesn't have any free page.
                return Ok(false);
            }
            btree::Db::from_page(p.free_db)
        };
        // Look `f` up in that version's tree of free pages.
        // NOTE(review): when `get` returns `None`, `f` is treated
        // as free for this version — confirm that matches
        // `btree::get`'s lookup semantics.
        if let Some((&f_, ())) = btree::get(self, &db, &f, None)? {
            if f_ != f {
                return Ok(false);
            }
        }
    }
    Ok(true)
}
fn decr_rc(&mut self, off: u64) -> Result<usize, Error> {
debug!("decr_rc {:?} {:?}", off, self.rc);
/// Increment the reference count for page `off`.
fn incr_rc(&mut self, off: u64) -> Result<usize, Error> {
btree::del(self, &mut rc_, &rc, None)?;
if rc & 0xfff > 2 {
btree::put(self, &mut rc_, &(rc - 1), &())?;
self.rc = Some(rc_);
} else {
// Implicit "1".
self.rc = Some(rc_)
}
return Ok((rc & 0xfff) as usize - 1);
} else {
self.rc = Some(rc_)
btree::del::del_at_cursor(self, &mut rc_, &mut curs)?;
debug!("decr_rc_owned {:?} {:?}", off, self.rc);
let rc = self.decr_rc_(off)?;
if rc == 0 {
self.free_owned_page(off);
}
Ok(rc)
}
}
impl<E: Borrow<Env>, A> MutTxn<E, A> {
/// Decrement the reference count of page `off`, freeing that page
/// if the RC reaches 0 after decrementing it.
fn decr_rc_(&mut self, off: u64) -> Result<usize, Error> {
debug!("decr_rc 0x{:x} {:?}", off, self.rc);
// If there's no RC table, free the page. Also, in order to
// avoid infinite recursion (since `del` and `put` below might
// free pages), we `take` the reference counter table.
}
fn incr_rc(&mut self, off: u64) -> Result<usize, Error> {
debug!("incr_rc {:?}", off);
if let Some(mut rc_) = self.rc.take() {
let mut curs = btree::cursor::Cursor::new(self, &rc_)?;
let rc = if let Some((rc, _)) = curs.set(self, &off, None)? {
if *rc & !0xfff == off {
*rc & 0xfff
} else {
1
}
} else {
1
};
if rc > 1 {
btree::del::del_at_cursor(self, &mut rc_, &mut curs)?;
}
assert!(rc + 1 <= 0xfff);
btree::put(self, &mut rc_, &(off | (rc + 1)), &())?;
self.rc = Some(rc_);
Ok(rc as usize + 1)
} else {
let mut rc = btree::create_db(self)?;
debug!("put in RC: {:?} {:?}", rc, off | 2);
btree::put(self, &mut rc, &(off | 2), &())?;
debug!("done");
self.rc = Some(rc);
Ok(2)
}
impl<E: Borrow<Env>, T> RootDb for MutTxn<E, T> {
fn root_db<K: Storable + ?Sized, V: Storable + ?Sized, P: crate::btree::BTreePage<K, V>>(
&self,
n: usize,
) -> Option<sanakirja_core::btree::Db_<K, V, P>> {
impl<E: Borrow<Env>, T> MutTxn<E, T> {
/// Low-level method to get the root page number `n`, if that page
/// isn't a B tree (use the [`RootDb`] trait else).
pub fn root(&self, n: usize) -> Option<u64> {
}
}
}
impl<E: Borrow<Env>, T> RootDb for MutTxn<E, T> {
// Just call method `root` and convert the result to a `Db`.
fn root_db<K: Storable + ?Sized, V: Storable + ?Sized, P: crate::btree::BTreePage<K, V>>(
&self,
n: usize,
) -> Option<sanakirja_core::btree::Db_<K, V, P>> {
if let Some(db) = self.root(n) {
Some(sanakirja_core::btree::Db_::from_page(db))
} else {
None
/// Environment, required to start any transactions. Thread-safe, but
/// opening the same database several times in the same process is not
/// cross-platform.
/// An environment, which may be either a memory-mapped file, or
/// memory allocated with [`std::alloc`].
roots: Vec<RootLock>,
// Root number of the next MutTxn.
pub(crate) root: Mutex<usize>,
}
struct RootLock {
/// It is undefined behavior to have a file mmapped more than once.
#[cfg(feature = "mmap")]
lock_file: Option<std::fs::File>,
rw: parking_lot::RawRwLock,
n_txn: AtomicUsize,
pub(crate) roots: Vec<RootLock>,
#[cfg(feature = "mmap")]
/// Unmap every memory map still owned by the environment when it is
/// dropped.
impl Drop for Env {
    fn drop(&mut self) {
        // Drain the map list in order, explicitly dropping each
        // underlying mmap.
        self.mmaps.lock().drain(..).for_each(|entry| drop(entry.mmap));
    }
}
/// An immutable transaction.
pub struct Txn<E: Borrow<Env>> {
    // The environment, owned or borrowed (anything implementing
    // `Borrow<Env>`).
    env: E,
    // Index of the root page this transaction reads.
    root: usize,
}
#[cfg(feature = "mmap")]
#[test]
#[should_panic]
fn nroots_test() {
    // A 1 << 15 byte file holds 8 pages of 4096 bytes; asking for
    // 19 root pages cannot fit, so `Env::new` must panic.
    let dir = tempfile::tempdir().unwrap();
    let db_path = dir.path().join("db");
    let length = 1 << 15; // 8 pages
    Env::new(&db_path, length, 19).unwrap();
}
#[cfg(feature = "mmap")]
#[test]
fn mmap_growth_test() {
    let dir = tempfile::tempdir().unwrap();
    let db_path = dir.path().join("db");
    let l0 = 1 << 15; // 8 pages
    {
        // Open an environment, create two successive maps, and
        // flush both before the environment is dropped.
        let env = Env::new(&db_path, l0, 2).unwrap();
        let first = env.open_mmap(0, l0).unwrap();
        println!("{:?}", first);
        let second = env.open_mmap(1, l0).unwrap();
        println!("{:?}", second);
        first.flush().unwrap();
        second.flush().unwrap();
    }
    // The file must have grown to (l0 << 2) - l0 = 3 * l0 bytes.
    let len = std::fs::metadata(&db_path).unwrap().len();
    assert_eq!(len, (l0 << 2) - l0);
}
/// A lock on a root page for this process only, because taking
/// multiple locks on the same file from a single process isn't
/// cross-platform (or even properly defined).
///
/// Usage is as follows:
///
/// - For read-only transactions, we first take a read lock on the `rw`
/// field, and increment `n_txn`, locking the file if the former value
/// is 0.
///
/// - For read-write transactions, we first take a write lock on the
/// `rw` field, and then take an exclusive lock on the file (this is
/// valid since only one read-write transaction can be active in a
/// process at the same time).
///
pub(crate) struct RootLock {
/// It is undefined behavior to have a file mmapped more than once.
#[cfg(feature = "mmap")]
lock_file: Option<std::fs::File>,
#[cfg(feature = "crc32")]
fn set_crc(ptr: *mut u8) {
unsafe {
let root_page = std::slice::from_raw_parts(ptr.add(8), PAGE_SIZE - 8);
let mut h = HASHER.clone();
h.update(root_page);
let globptr = ptr as *mut GlobalHeader;
(&mut *globptr).crc = h.finalize().to_le();
}
/// Count of read-only transactions.
n_txn: AtomicUsize,
/// This method is provided because waiting for a lock on the file
/// system may block the whole process, whereas waiting on an
/// in-process lock blocks only the calling thread.
/// The database is very likely to get corrupted if an environment
/// is opened from multiple processes, or more than once by the
/// same process, if at least one of these instances can start a
/// mutable transaction.
/// However, the database is very likely to get corrupted if open
/// more than once at the same time, even within the same process.
///
/// Therefore, do not use this method without another locking
/// mechanism in place to avoid that situation.
/// The `n_roots` argument is ignored if the database already
/// exists, and is used to initialise the first `n_roots` pages of
/// the file else.
if initialise {
for i in 0..n_roots {
*(map.add(i * PAGE_SIZE) as *mut GlobalHeader) = (GlobalHeader {
version: CURRENT_VERSION,
root: 0,
n_roots: n_roots as u8,
crc: 0,
length: (n_roots * PAGE_SIZE) as u64,
free_db: 0,
rc_db: 0,
})
.to_le();
let n_roots = if initialise {
// Initialise the first `n_roots` pages at the start of
// the file.
init(map, n_roots);
// Since the first `n_roots` pages are occupied by roots,
// the first unused page is found at offset `n_roots *
// PAGE_SIZE`.
n_roots
} else {
// Read the root and number of roots from the first page's
// header.
GlobalHeader::from_le(&*(map as *const GlobalHeader)).n_roots as usize
};
set_crc(map.add(i * PAGE_SIZE));
}
}
let glob = GlobalHeader::from_le(&*(map as *const GlobalHeader));
let mut roots = Vec::with_capacity(glob.n_roots as usize);
for _ in 0..glob.n_roots {
roots.push(RootLock {
rw: <parking_lot::RawRwLock as parking_lot::lock_api::RawRwLock>::INIT,
n_txn: AtomicUsize::new(0),
lock_file: None,
})
}
// Finally, create the environment.
first_unused_page: Mutex::new(2),
roots,
root: Mutex::new(glob.root as usize),
mut_txn_lock: RawMutex::INIT,
// Initialise a different `RootLock` for each root page.
roots: (0..n_roots)
.map(|_| RootLock {
rw: RawRwLock::INIT,
n_txn: AtomicUsize::new(0),
lock_file: None,
})
.collect(),
unsafe fn new_nolock_mmap(length: u64, initialise: bool) -> Result<Env, Error> {
unsafe fn new_nolock_(length: u64, initialise: bool, n_roots: usize) -> Result<Env, Error> {
assert!(n_roots >= 1);
assert!(n_roots <= ((length >> 12) as usize));
assert!(n_roots < 256);
let n_roots = 2;
for i in 0..n_roots {
*(map.offset((i * PAGE_SIZE) as isize) as *mut GlobalHeader) = (GlobalHeader {
version: CURRENT_VERSION,
root: 0,
n_roots: n_roots as u8,
crc: 0,
length: n_roots as u64 * PAGE_SIZE as u64,
free_db: 0,
rc_db: 0,
})
.to_le();
}
let glob = GlobalHeader::from_le(&*(map as *const GlobalHeader));
let mut roots = Vec::with_capacity(glob.n_roots as usize);
for _ in 0..glob.n_roots {
roots.push(RootLock {
rw: <parking_lot::RawRwLock as parking_lot::lock_api::RawRwLock>::INIT,
n_txn: AtomicUsize::new(0),
})
}
init(map, n_roots);
first_unused_page: Mutex::new(2),
roots,
root: Mutex::new(glob.root as usize),
mut_txn_lock: RawMutex::INIT,
// Initialise a different `RootLock` for each root page.
roots: (0..n_roots)
.map(|_| RootLock {
rw: RawRwLock::INIT,
n_txn: AtomicUsize::new(0),
})
.collect(),
}
}
/// Initialise the first `n_roots` pages of `map` as root pages:
/// each receives a little-endian `GlobalHeader`, then its CRC.
unsafe fn init(map: *mut u8, n_roots: usize) {
    for i in 0..n_roots {
        *(map.offset((i * PAGE_SIZE) as isize) as *mut GlobalHeader) = (GlobalHeader {
            version: CURRENT_VERSION,
            root: 0,
            n_roots: n_roots as u8,
            crc: 0,
            // The first unused byte is right after the root pages.
            length: n_roots as u64 * PAGE_SIZE as u64,
            // No free-page tree and no RC tree yet.
            free_db: 0,
            rc_db: 0,
        })
        .to_le();
        // Write the page's CRC after the header (a no-op unless the
        // "crc32" feature is enabled).
        set_crc(map.add(i * PAGE_SIZE));
    }
}
/// Close this repository, releasing the locks. It is undefined
/// behaviour to use the environment afterwards. This method can
/// be used for instance to release the locks before allocating a
/// new environment (note that `std::mem::replace` followed by
/// `Drop::drop` of the previous value would not release the locks
/// in the correct order).
///
/// The safe alternative to this method is to use an `Option<Env>`
/// instead of an `Env`.
#[cfg(feature = "mmap")]
pub unsafe fn close(&mut self) {
    // Dropping each mmap unmaps it. NOTE(review): this body only
    // drops the maps — confirm the file locks are released by the
    // `std::fs::File` handles being dropped elsewhere.
    for m in self.mmaps.lock().drain(..) {
        drop(m.mmap);
    }
}
/// If the CRC feature is disabled, we're not checking CRCs.
#[cfg(not(feature = "crc32"))]
fn check_crc(&self, _root: usize) -> Result<(), crate::CRCError> {
    // Nothing to verify without the "crc32" feature: always succeed.
    Ok(())
}
/// Else, we are checking CRCs, so return a CRC error if the check
/// fails (the CRC is a 32 bit integer encoded little-endian at
/// bytes [8,12[ of the root pages).
#[cfg(feature = "crc32")]
fn check_crc(&self, root: usize) -> Result<(), crate::CRCError> {
unsafe {
let maps = self.mmaps.lock();
let page_ptr = maps[0].ptr.add(root * PAGE_SIZE);
let crc = (&*(page_ptr as *const GlobalHeader)).crc;
let root_page = std::slice::from_raw_parts(page_ptr.add(8), PAGE_SIZE - 8);
let mut h = HASHER.clone();
h.update(root_page);
if h.finalize() != crc {
return Err(crate::CRCError {});
}
}
#[cfg(feature = "mmap")]
#[test]
#[should_panic]
fn nroots_test() {
    // 19 root pages cannot fit in a file of 8 pages, so creating
    // the environment must panic.
    let tmp = tempfile::tempdir().unwrap();
    let file = tmp.path().join("db");
    let size = 1 << 15; // 8 pages
    Env::new(&file, size, 19).unwrap();
}
#[cfg(feature = "mmap")]
#[test]
fn mmap_growth_test() {
    let tmp = tempfile::tempdir().unwrap();
    let file = tmp.path().join("db");
    let l0 = 1 << 15; // 8 pages
    {
        // Create two maps in a fresh environment and flush them
        // both; the scope closes the environment afterwards.
        let env = Env::new(&file, l0, 2).unwrap();
        let m0 = env.open_mmap(0, l0).unwrap();
        println!("{:?}", m0);
        let m1 = env.open_mmap(1, l0).unwrap();
        println!("{:?}", m1);
        m0.flush().unwrap();
        m1.flush().unwrap();
    }
    // Expected final size: (l0 << 2) - l0, i.e. 3 * l0 bytes.
    let len = std::fs::metadata(&file).unwrap().len();
    assert_eq!(len, (l0 << 2) - l0);
}
#[cfg(not(feature = "crc32"))]
/// Without the "crc32" feature, writing the page CRC is a no-op.
fn set_crc(_ptr: *mut u8) {}
#[cfg(feature = "crc32")]
/// Compute and store the CRC of the root page starting at `ptr`.
/// The checksum covers bytes `[8, PAGE_SIZE)`, i.e. everything
/// after the first 8 header bytes (`version`, `root`, `n_roots`
/// and the CRC field itself), so the stored CRC does not feed into
/// its own computation.
fn set_crc(ptr: *mut u8) {
    unsafe {
        // Hash the page contents, skipping the 8-byte prefix that
        // contains the CRC slot.
        let root_page = std::slice::from_raw_parts(ptr.add(8), PAGE_SIZE - 8);
        let mut h = HASHER.clone();
        h.update(root_page);
        let globptr = ptr as *mut GlobalHeader;
        // Store little-endian, matching the on-disk header format.
        (&mut *globptr).crc = h.finalize().to_le();
    }
}
/// An immutable transaction.
pub struct Txn<E: Borrow<Env>> {
env: E,
pub(crate) root: usize,
debug!("root lock");
let root = env_.root.lock();
let root = (*root + env_.roots.len() - 1) % env_.roots.len();
debug!("shared {:?}", root);
// Find the youngest committed version and lock it. Note
// that there may be processes incrementing the version
// number in parallel to this process. If that happens,
// then since we take a shared file lock on a root page
// (at the end of this function), the only thing that may
// happen is that we don't open the very last version.
let cur_mut_root =
unsafe { (&*(env_.mmaps.lock()[0].ptr as *const GlobalHeader)).root as usize };
// Last committed root page.
let root = (cur_mut_root + env_.roots.len() - 1) % env_.roots.len();
#[cfg(feature = "crc32")]
fn check_crc(&self, root: usize) -> Result<(), crate::CRCError> {
unsafe {
let maps = self.mmaps.lock();
let page_ptr = maps[0].ptr.add(root * PAGE_SIZE);
let crc = (&*(page_ptr as *const GlobalHeader)).crc;
let root_page = std::slice::from_raw_parts(page_ptr.add(8), PAGE_SIZE - 8);
let mut h = HASHER.clone();
h.update(root_page);
if h.finalize() != crc {
return Err(crate::CRCError {});
}
#[cfg(not(feature = "mmap"))]
/// Without the "mmap" feature there is no backing file, so taking
/// a shared lock on root page `_root` is a no-op.
fn lock_shared(&self, _root: usize) -> Result<(), Error> {
    Ok(())
}
#[cfg(feature = "mmap")]
/// Take an exclusive file lock on the lock file associated with
/// root page `root`, if that root page has one.
fn lock_exclusive(&self, root: usize) -> Result<(), Error> {
    if let Some(ref f) = self.roots[root].lock_file {
        f.lock_exclusive()?;
    }
    Ok(())
}
#[cfg(not(feature = "mmap"))]
/// Without the "mmap" feature there is no file to lock, so taking
/// an exclusive lock on root page `_root` is a no-op.
fn lock_exclusive(&self, _root: usize) -> Result<(), Error> {
    Ok(())
}
#[cfg(feature = "mmap")]
fn unlock(&self, root: usize) -> Result<(), Error> {
if let Some(ref f) = self.roots[root].lock_file {
f.unlock()?