use crate::pristine::sanakirja::{Channel, MutTxn, SanakirjaError, P, UP};
use crate::pristine::*;
use crate::HashSet;
use crate::TxnT;
use log::*;
use serde_derive::*;
use std::io::Read;
use std::path::Path;
use std::sync::{Arc, RwLock};
/// Preamble written at the very start of a tag file by `from_channel` and
/// read back by `OpenTagFile::open`.
///
/// The struct is (de)serialized with bincode, which encodes fields in
/// declaration order, so the order below is part of the file format.
#[derive(Debug, Serialize, Deserialize, Default)]
struct FileHeader {
/// Format version; `from_channel` stores `VERSION` here.
version: u64,
/// Byte offset at which the serialized `ChangeHeader` starts, i.e. the
/// bincode-serialized size of this struct.
header: u64,
/// Byte offset at which the compressed channel data starts.
channel: u64,
/// Byte offset just past the compressed channel data; `restore_channel`
/// reads `unhashed - channel` bytes starting at `channel`.
unhashed: u64,
/// `from_channel` sets this equal to `unhashed`.
total: u64,
/// Root page offsets of the tables inside the decompressed channel image.
offsets: DbOffsets,
/// Channel state returned by `current_state` when the tag was written.
state: Merkle,
}
/// Locations of the B-tree roots inside the uncompressed channel image, as
/// returned by `copy` for each table, plus a few channel-level values.
///
/// Serialized with bincode as part of `FileHeader`; keep the field order.
#[derive(Debug, Serialize, Deserialize, Default)]
struct DbOffsets {
/// Root page of the copy of the transaction's `internal` table.
internal: u64,
/// Root page of the copy of the transaction's `external` table;
/// `restore_channel` uses it to map stored change ids back to hashes.
external: u64,
/// Root page of the channel's graph table.
graph: u64,
/// Root page of the channel's `changes` table.
changes: u64,
/// Root page of the channel's `revchanges` table.
revchanges: u64,
/// Root page of the channel's `states` table.
states: u64,
/// Root page from which `restore_channel` rebuilds the channel's `tags` table.
tags: u64,
/// Value of the channel's `apply_counter` when the tag was written.
apply_counter: u64,
/// Number of uncompressed bytes fed to the compressor; `restore_channel`
/// allocates a buffer of this size before decompressing.
size: u64,
}
/// A tag file whose `FileHeader` has been read; produced by
/// `OpenTagFile::open` and consumed by `restore_channel`.
pub struct OpenTagFile {
/// Header decoded from the beginning of `file`.
header: FileHeader,
/// The underlying file handle, still open for reading the channel data.
file: std::fs::File,
}
/// Errors returned by the functions that read and write tag files.
#[derive(Debug, Error)]
pub enum TagError {
/// The tag file's format version is not the one this code understands.
#[error("Version mismatch")]
VersionMismatch,
/// Reading from or writing to the tag file failed.
#[error(transparent)]
Io(#[from] std::io::Error),
/// Encoding or decoding a header with bincode failed.
#[error(transparent)]
Bincode(#[from] bincode::Error),
/// The zstd seekable (de)compressor reported an error.
#[error(transparent)]
Zstd(#[from] zstd_seekable::Error),
/// An error from the pristine transaction; built from `TxnErr` through the
/// `From` impl on this type rather than through `#[from]`.
#[error(transparent)]
Txn(SanakirjaError),
/// A channel between `compress_channel`/`copy` and the compression thread
/// was closed while the other side still needed it.
#[error("Synchronisation error")]
Sync,
}
impl From<TxnErr<SanakirjaError>> for TagError {
fn from(e: TxnErr<SanakirjaError>) -> Self {
TagError::Txn(e.0)
}
}
impl OpenTagFile {
pub fn open<P: AsRef<Path>>(p: P) -> Result<Self, TagError> {
let mut file = std::fs::File::open(p)?;
let mut off = [0u8; std::mem::size_of::<FileHeader>() as usize];
file.read_exact(&mut off)?;
let header = bincode::deserialize(&off)?;
Ok(OpenTagFile { header, file })
}
}
pub const VERSION: u64 = 5;
const BLOCK_SIZE: usize = 4096;
/// Rebuilds a channel called `name` inside `txn` from an opened tag file.
///
/// The compressed channel image is read from the byte range
/// `header.channel..header.unhashed` of the file, decompressed into memory,
/// and each table (graph, changes, revchanges, states, tags) is copied page by
/// page into `txn` with [`restore`]. Change ids stored in the image are
/// translated: each one is looked up in the image's `external` table to get
/// its hash, and `make_changeid` yields the id to use in `txn`.
///
/// The new channel is registered in `txn.open_channels` and returned.
///
/// # Errors
///
/// * [`TagError::Io`] if seeking/reading fails, or if the header describes a
///   channel section that ends before it starts.
/// * [`TagError::Zstd`] if the image cannot be decompressed.
/// * [`TagError::Txn`] if the transaction fails while rebuilding a table.
///
/// # Panics
///
/// Panics if a change id referenced by a table is missing from the image's
/// `external` table, or if the `open_channels` lock is poisoned.
pub fn restore_channel(
    mut tag: OpenTagFile,
    txn: &mut MutTxn<()>,
    name: &str,
) -> Result<ChannelRef<MutTxn<()>>, TagError> {
    use std::io::{Seek, SeekFrom};
    debug!("tag header {:?}", tag.header);
    // A corrupt header must not underflow (panic in debug builds, absurd
    // allocation in release builds).
    let comp_len = tag
        .header
        .unhashed
        .checked_sub(tag.header.channel)
        .ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "tag header: channel section ends before it starts",
            )
        })?;
    tag.file.seek(SeekFrom::Start(tag.header.channel))?;
    let mut comp = vec![0; comp_len as usize];
    tag.file.read_exact(&mut comp)?;
    // Only log what is actually there: the section may be shorter than 20 bytes.
    debug!("{:?} {:?}", &comp[..comp.len().min(20)], comp.len());
    debug!("{:?}", &comp[comp.len().saturating_sub(20)..]);
    let mut buf = vec![0; tag.header.offsets.size as usize];
    zstd_seekable::Seekable::init_buf(&comp)?.decompress(&mut buf, 0)?;
    let filetxn = Txn::from_slice(&mut buf);
    // Table of the image mapping stored change ids to their hashes.
    let external: ::sanakirja::btree::Db_<ChangeId, SerializedHash, UP<ChangeId, SerializedHash>> =
        ::sanakirja::btree::Db_::from_page(tag.header.offsets.external);
    debug!("restoring graph");
    let graph = restore(
        &filetxn,
        txn,
        tag.header.offsets.graph,
        |file_txn, txn, k: &Vertex<ChangeId>, v: &SerializedEdge| {
            // Root ids are kept as they are; every other id is translated.
            let k = if k.change.is_root() {
                *k
            } else {
                debug!("btree get: {:?}", k.change);
                let (kc, h) =
                    ::sanakirja::btree::get(file_txn, &external, &k.change, None)?.unwrap();
                assert_eq!(k.change, *kc);
                Vertex {
                    change: crate::pristine::make_changeid(txn, &h.into())?,
                    ..*k
                }
            };
            let dest = v.dest();
            let dest = {
                if dest.change.is_root() {
                    dest
                } else {
                    let (vd, change) =
                        ::sanakirja::btree::get(file_txn, &external, &dest.change, None)?.unwrap();
                    assert_eq!(v.dest().change, *vd);
                    Position {
                        change: crate::pristine::make_changeid(txn, &change.into())?,
                        ..v.dest()
                    }
                }
            };
            let introduced_by = v.introduced_by();
            let introduced_by = if introduced_by.is_root() {
                introduced_by
            } else {
                let (vi, change) =
                    ::sanakirja::btree::get(file_txn, &external, &introduced_by, None)?.unwrap();
                assert_eq!(introduced_by, *vi);
                crate::pristine::make_changeid(txn, &change.into())?
            };
            let v = Edge {
                dest,
                introduced_by,
                ..v.into()
            };
            Ok((k, v.into()))
        },
    )?;
    debug!("restoring changes");
    let changes = restore(
        &filetxn,
        txn,
        tag.header.offsets.changes,
        |file_txn, txn, k: &ChangeId, v: &L64| {
            let (k_, h) = ::sanakirja::btree::get(file_txn, &external, k, None)?.unwrap();
            assert_eq!(k, k_);
            let k = crate::pristine::make_changeid(txn, &h.into())?;
            Ok((k, *v))
        },
    )?;
    debug!("restoring revchanges");
    let revchanges = restore(
        &filetxn,
        txn,
        tag.header.offsets.revchanges,
        |file_txn, txn, k: &L64, v: &Pair<ChangeId, SerializedMerkle>| {
            let (v0, h) = ::sanakirja::btree::get(file_txn, &external, &v.a, None)?.unwrap();
            assert_eq!(v.a, *v0);
            let v_ = crate::pristine::make_changeid(txn, &h.into())?;
            Ok((
                *k,
                Pair {
                    a: v_,
                    b: v.b.clone(),
                },
            ))
        },
    )?;
    debug!("restoring states");
    let states = restore(
        &filetxn,
        txn,
        tag.header.offsets.states,
        |_, _, k: &SerializedMerkle, v: &L64| Ok((k.clone(), *v)),
    )?;
    debug!("restoring tags");
    let tags = restore(
        &filetxn,
        txn,
        tag.header.offsets.tags,
        |_, _, k: &L64, v: &SerializedHash| Ok((*k, *v)),
    )?;
    let name = crate::small_string::SmallString::from_str(name);
    let br = ChannelRef {
        r: Arc::new(RwLock::new(Channel {
            graph,
            changes,
            revchanges,
            states,
            tags,
            apply_counter: tag.header.offsets.apply_counter,
            name: name.clone(),
            last_modified: 0,
        })),
    };
    txn.open_channels.lock().unwrap().insert(name, br.clone());
    Ok(br)
}
/// Read-only view over a decompressed channel image, used as a page source
/// for sanakirja while restoring a channel.
struct Txn<'a> {
/// Pointer to the first byte of the image; page offsets are added to it.
/// The length of the image is not recorded here.
data: *mut u8,
/// Carries the lifetime `'a` taken from the slice passed to `from_slice`.
marker: std::marker::PhantomData<&'a ()>,
}
impl<'a> Txn<'a> {
fn from_slice(s: &'a mut [u8]) -> Self {
Txn {
data: s.as_mut_ptr(),
marker: std::marker::PhantomData,
}
}
}
/// Serves pages straight out of the in-memory image: the page at offset `off`
/// is the memory starting `off` bytes after the beginning of the buffer.
impl<'a> ::sanakirja::LoadPage for Txn<'a> {
    type Error = ::sanakirja::CRCError;
    fn load_page(&self, off: u64) -> Result<::sanakirja::CowPage, ::sanakirja::CRCError> {
        // SAFETY: not checked here; the caller must pass an offset that lies
        // inside the buffer this `Txn` was created from.
        let data = unsafe { self.data.add(off as usize) };
        let page = ::sanakirja::CowPage { data, offset: off };
        Ok(page)
    }
}
/// Rebuilds, inside `txn`, the B-tree whose root page sits at offset `pending`
/// of the decompressed image behind `file_txn`.
///
/// Pages are processed from a work list. For each source page a destination
/// page is allocated in `txn`, every `(key, value)` entry is passed through
/// `f` (which may rewrite it, e.g. to translate change ids) and appended to
/// the destination page, and every non-zero right child is queued together
/// with a freshly allocated destination page whose offset replaces the child
/// pointer.
///
/// Returns a `Db_` rooted at the first page allocated. Errors from page
/// allocation or from `f` are propagated.
///
/// Panics if the same source page is referenced as a child more than once.
fn restore<
K: ::sanakirja::UnsizedStorable,
V: ::sanakirja::UnsizedStorable,
P: ::sanakirja::btree::BTreeMutPage<K, V>,
F,
>(
file_txn: &Txn,
txn: &mut crate::pristine::sanakirja::MutTxn<()>,
pending: u64,
f: F,
) -> Result<::sanakirja::btree::Db_<K, V, P>, TxnErr<SanakirjaError>>
where
F: Fn(
&Txn,
&mut crate::pristine::sanakirja::MutTxn<()>,
&K,
&V,
) -> Result<(K, V), TxnErr<SanakirjaError>>,
{
use ::sanakirja::AllocPage;
// Source child offsets already queued; used to detect a page reached twice.
let mut dict = HashSet::default();
// Destination page for the root; its offset is the result.
let page = txn.txn.alloc_page()?;
let result = page.0.offset;
// Work list of (source page offset, destination page).
let mut pending = vec![(pending, page)];
while let Some((offset, mut new_page_)) = pending.pop() {
debug!("{:?}", offset);
// View the source page directly inside the image; `offset` is not
// checked against the size of the image here.
let page = ::sanakirja::CowPage {
data: unsafe { file_txn.data.offset(offset as isize) },
offset,
};
let mut curs = P::cursor_first(&page);
// NOTE(review): the destination cursor is created before `P::init` runs,
// whereas `copy` initializes the page first — confirm the order does not
// matter for the cursor.
let mut new_curs = P::cursor_first(&new_page_.0);
P::init(&mut new_page_);
// NOTE(review): the left child stored here is the offset read from the
// source page, unchanged; it is not queued in `pending` — confirm this is
// intended for pages that have a left child.
unsafe {
P::set_left_child(
&mut new_page_,
&new_curs,
P::left_child(page.as_page(), &curs),
);
}
while let Some((k, v, r)) = P::next(&txn.txn, page.as_page(), &mut curs) {
// Let the caller rewrite the entry before it is stored.
let (k, v) = f(file_txn, txn, k, v)?;
// A non-zero right child gets its own destination page; the entry
// then points to that page instead of the source offset.
let r = if r > 0 {
assert!(dict.insert(r));
let new_page = txn.txn.alloc_page()?;
let off = new_page.0.offset;
pending.push((r, new_page));
off
} else {
0
};
unsafe { P::put_mut(&mut new_page_, &new_curs, &k, &v, r) }
P::move_next(&mut new_curs);
}
}
Ok(::sanakirja::btree::Db_::from_page(result))
}
/// Writes a tag file for `channel` to `w` and returns the hash of everything
/// written.
///
/// The output consists of three consecutive parts, all of which are fed to
/// the hasher:
///
/// 1. the bincode-serialized [`FileHeader`] (version, section offsets, table
///    roots and channel state),
/// 2. the bincode-serialized change `header`,
/// 3. the compressed channel image produced by [`compress_channel`].
///
/// # Errors
///
/// Returns an error if the channel cannot be compressed, if either header
/// cannot be serialized, or if writing to `w` fails.
pub fn from_channel<W: std::io::Write, T: ::sanakirja::LoadPage<Error = ::sanakirja::Error>>(
    txn: &crate::pristine::sanakirja::GenericTxn<T>,
    channel: &str,
    header: &crate::change::ChangeHeader,
    mut w: W,
) -> Result<Hash, TagError> {
    let out = Vec::with_capacity(1 << 16);
    let (out, offsets, state) = compress_channel(txn, channel, out)?;
    // Only log what is actually there: never slice past the ends of `out`.
    debug!("{:?} {:?}", &out[..out.len().min(20)], out.len());
    debug!("{:?}", &out[out.len().saturating_sub(20)..]);
    let mut header_buf = Vec::with_capacity(1 << 10);
    bincode::serialize_into(&mut header_buf, header)?;
    let mut off = FileHeader {
        version: VERSION,
        header: 0,
        channel: 0,
        unhashed: 0,
        total: 0,
        offsets,
        state,
    };
    // The section offsets depend on the serialized size of the header itself,
    // which is computed first with the offsets still zeroed.
    off.header = bincode::serialized_size(&off)?;
    off.channel = off.header + header_buf.len() as u64;
    off.unhashed = off.channel + out.len() as u64;
    off.total = off.unhashed;
    let mut hasher = Hasher::default();
    let mut off_buf = Vec::with_capacity(off.header as usize);
    bincode::serialize_into(&mut off_buf, &off)?;
    debug!("off_buf = {:?}", off_buf.len());
    w.write_all(&off_buf)?;
    hasher.update(&off_buf);
    debug!("header_buf = {:?}", header_buf.len());
    w.write_all(&header_buf)?;
    hasher.update(&header_buf);
    debug!("out = {:?}", out.len());
    w.write_all(&out)?;
    hasher.update(&out);
    Ok(hasher.finish())
}
/// Compression level passed to `zstd_seekable::SeekableCStream::new`.
const LEVEL: usize = 10;
/// Capacity of the two channels linking `compress_channel` to its compression
/// thread, and number of page buffers circulating between them.
const PIPE_LEN: usize = 10;
/// Serializes the tables of `channel` (plus the transaction's `internal` and
/// `external` tables) into a compact page image and writes the zstd-seekable
/// compressed image to `to`.
///
/// Pages are produced on the calling thread by [`copy`] and compressed on a
/// helper thread. The two sides exchange a fixed pool of `PIPE_LEN` buffers of
/// `BLOCK_SIZE` bytes: filled pages travel through `sender`, and the emptied
/// buffers come back through `bsender`.
///
/// Returns the writer, the root page offset of each table inside the
/// uncompressed image (with the uncompressed size and the channel's apply
/// counter), and the current state of the channel.
///
/// # Errors
///
/// * [`TagError::Sync`] if one side of the pipeline goes away early.
/// * [`TagError::Zstd`] / [`TagError::Io`] if compressing or writing fails.
/// * [`TagError::Txn`] if the channel cannot be loaded or read.
///
/// # Panics
///
/// Panics if the channel does not exist, if its lock is poisoned, or if the
/// compression thread panicked.
fn compress_channel<
    W: std::io::Write + Send + 'static,
    T: ::sanakirja::LoadPage<Error = ::sanakirja::Error>,
>(
    txn: &crate::pristine::sanakirja::GenericTxn<T>,
    channel: &str,
    mut to: W,
) -> Result<(W, DbOffsets, Merkle), TagError> {
    debug!("int = {:?}", txn.internal.db);
    let (sender, receiver) = std::sync::mpsc::sync_channel::<Vec<u8>>(PIPE_LEN);
    let (bsender, breceiver) = std::sync::mpsc::sync_channel::<Vec<u8>>(PIPE_LEN);
    // Pre-fill the pool of page buffers handed out to `copy`.
    for _ in 0..PIPE_LEN {
        bsender
            .send(vec![0; BLOCK_SIZE])
            .map_err(|_| TagError::Sync)?;
    }
    let t = std::thread::spawn(move || -> Result<(W, usize), TagError> {
        let mut comp = zstd_seekable::SeekableCStream::new(LEVEL, BLOCK_SIZE)?;
        let mut out = [0; BLOCK_SIZE];
        // Total number of uncompressed bytes received.
        let mut n = 0;
        while let Ok(input) = receiver.recv() {
            n += BLOCK_SIZE;
            let mut input_off = 0;
            let mut output_off = 0;
            while input_off < BLOCK_SIZE {
                let (a, b) = comp.compress(&mut out[output_off..], &input[input_off..])?;
                output_off += a;
                input_off += b;
            }
            to.write_all(&out[..output_off])?;
            // Hand the buffer back to the producer for reuse.
            bsender.send(input).map_err(|_| TagError::Sync)?;
        }
        // Flush what the compressor still holds. A failure here means the
        // image is incomplete, so it must be reported rather than ignored.
        loop {
            let written = comp.end_stream(&mut out)?;
            if written == 0 {
                break;
            }
            to.write_all(&out[..written])?;
        }
        Ok((to, n))
    });
    let channel = txn.load_channel(channel)?.unwrap();
    let channel = channel.read().unwrap();
    // Offset of the next free page in the image being built.
    let mut new = 0;
    debug!("copying internal");
    let internal = copy::<SerializedHash, ChangeId, UP<SerializedHash, ChangeId>, _>(
        txn,
        txn.internal.db,
        &mut new,
        &sender,
        &breceiver,
    )?;
    debug!("copying external");
    let external = copy::<ChangeId, SerializedHash, UP<ChangeId, SerializedHash>, _>(
        txn,
        txn.external.db,
        &mut new,
        &sender,
        &breceiver,
    )?;
    debug!("copying graph");
    let graph = copy::<Vertex<ChangeId>, SerializedEdge, P<Vertex<ChangeId>, SerializedEdge>, _>(
        txn,
        channel.graph.db,
        &mut new,
        &sender,
        &breceiver,
    )?;
    debug!("copying changes");
    let changes = copy::<ChangeId, L64, P<ChangeId, L64>, _>(
        txn,
        channel.changes.db,
        &mut new,
        &sender,
        &breceiver,
    )?;
    debug!("copying revchanges");
    let revchanges = copy::<
        L64,
        Pair<ChangeId, SerializedMerkle>,
        UP<L64, Pair<ChangeId, SerializedMerkle>>,
        _,
    >(txn, channel.revchanges.db, &mut new, &sender, &breceiver)?;
    debug!("copying states");
    let states = copy::<SerializedMerkle, L64, UP<SerializedMerkle, L64>, _>(
        txn,
        channel.states.db,
        &mut new,
        &sender,
        &breceiver,
    )?;
    debug!("copying tags");
    // The tags table has its own root: it must not be read from `states`.
    let tags = copy::<L64, SerializedHash, UP<L64, SerializedHash>, _>(
        txn,
        channel.tags.db,
        &mut new,
        &sender,
        &breceiver,
    )?;
    // Closing the channel lets the compression thread finish the stream.
    std::mem::drop(sender);
    let (w, n) = t.join().unwrap()?;
    let state = crate::pristine::current_state(txn, &channel)?;
    Ok((
        w,
        DbOffsets {
            internal,
            external,
            graph,
            changes,
            revchanges,
            states,
            tags,
            apply_counter: channel.apply_counter,
            size: n as u64,
        },
        state,
    ))
}
/// Copies the B-tree rooted at page `pending` of `txn` into consecutive pages
/// of the image being built, sending each finished page to the compression
/// thread.
///
/// * `new_page` is the offset of the next free page in the image; it is
///   advanced by `BLOCK_SIZE` for every page reserved.
/// * `buffers` yields empty page buffers, and `sender` takes the filled ones.
///
/// Pages are processed from a work list. Every non-zero right child is given
/// the next free offset, queued, and the copied entry points to that new
/// offset.
///
/// Returns the offset, inside the image, of the copied root page.
///
/// # Errors
///
/// Returns [`TagError::Sync`] if either channel to the compression thread has
/// been closed.
///
/// # Panics
///
/// Panics if a page cannot be loaded from `txn`, or if the same source page is
/// referenced as a child more than once.
fn copy<
    K: ::sanakirja::UnsizedStorable,
    V: ::sanakirja::UnsizedStorable,
    P: ::sanakirja::btree::BTreeMutPage<K, V>,
    T: ::sanakirja::LoadPage<Error = ::sanakirja::Error>,
>(
    txn: &crate::pristine::sanakirja::GenericTxn<T>,
    pending: u64,
    new_page: &mut u64,
    sender: &std::sync::mpsc::SyncSender<Vec<u8>>,
    buffers: &std::sync::mpsc::Receiver<Vec<u8>>,
) -> Result<u64, TagError> {
    // Source child offsets already queued; used to detect a page reached twice.
    let mut dict = HashSet::default();
    let result = *new_page;
    // Work list of (source page offset, destination offset in the image).
    let mut pending = vec![(pending, *new_page)];
    *new_page += BLOCK_SIZE as u64;
    while let Some((old_page_off, new_page_)) = pending.pop() {
        let page = txn.txn.load_page(old_page_off).unwrap();
        // Borrow a buffer from the pool and build the new page inside it.
        let mut memory = buffers.recv().map_err(|_| TagError::Sync)?;
        let mut new_page_ = ::sanakirja::MutPage(::sanakirja::CowPage {
            data: memory.as_mut_ptr(),
            offset: new_page_,
        });
        P::init(&mut new_page_);
        let mut curs = P::cursor_first(&page);
        let mut new_curs = P::cursor_first(&new_page_.0);
        // NOTE(review): the left child stored here is the offset read from the
        // source page, unchanged — confirm this is intended.
        unsafe {
            P::set_left_child(
                &mut new_page_,
                &new_curs,
                P::left_child(page.as_page(), &curs),
            );
        }
        while let Some((k, v, r)) = P::next(&txn.txn, page.as_page(), &mut curs) {
            let r = if r > 0 {
                assert!(dict.insert(r));
                let new = *new_page;
                *new_page += BLOCK_SIZE as u64;
                pending.push((r, new));
                // NOTE(review): this call uses the cursor of the source page
                // (`curs`), not `new_curs` — confirm this is intended.
                unsafe {
                    P::set_left_child(&mut new_page_, &curs, new);
                }
                new
            } else {
                0
            };
            debug!("put {:?} {:?}", k, v);
            unsafe { P::put_mut(&mut new_page_, &new_curs, k, v, r) }
            P::move_next(&mut new_curs);
        }
        // If the compression thread has stopped, report it like the `recv`
        // above instead of panicking.
        sender.send(memory).map_err(|_| TagError::Sync)?;
    }
    Ok(result)
}