Pre-tags cleanup + fast Sanakirja
[?]
Mar 7, 2021, 4:35 PM
MFTN7GBWZNQAFHKER57MLZAJGVEAHV2GYAQN2QTDHTPCEURDVIGQCDependencies
- [2]
ZBNKSYA6Fixing a bus error when starting a transaction on a full disk - [3]
ZHABNS3SCanonicalize all paths - [4]
WI5BS6BSNew published versions - [5]
CCLLB7OIUpgrading to Sanakirja 0.15 + version bump - [6]
ERV3644QAdding the block module - [7]
X243Z3Y5Recording only the required metadata (can even be changed later!) - [8]
A3DMBJJAUpgrading the `git` subcommand to the latest Sanakirja and Libpijul - [9]
JL4WKA5PImplement the Sanakirja concurrency model in a cross-process way - [10]
H23LO7U7a few more clippy lints addressed - [11]
VE2UWMW4Trying to fix channel touch - [12]
YN63NUZOSanakirja 1.0 - [13]
WZVCLZKYaddress clippy lints - [14]
B3QWIGDEFixing the Git features with the latest Pijul (+ conflicts in Cargo.toml) - [15]
Q7CAYX5NFixing Windows compilation - [16]
QQZNSB26Permission update (after #X243) - [17]
HDGRZISMVersion updates - [18]
RXNT67OTSanakirja version, and removing an unwrap - [19]
I52XSRUHMassive cleanup, and simplification - [20]
VO5OQW4WRemoving anyhow in libpijul - [21]
G65S7FAWVersion bump and cleanup - [22]
33SQMZYXNew versions of dependencies - [23]
SXEYMYF7Fixing the bad changes in history (unfortunately, by rebooting). - [24]
6DOXSHWGCleanup, and version bump - [25]
TPEH2XNB1.0.0-alpha.28, with Tokio 1.0 - [26]
PJ7T2VFLDo not hang on locked repositories - [27]
UFCZKKLXUpgrading to the latest Sanakirja/Rand - [28]
G6S6PWZEDo not touch the channel if this is a partial record - [29]
JRENVH5DReqwest 0.11 - [30]
HKEOO4QJVersion bump - [31]
G734WNM6flake.nix: use crate2nix - [32]
3AMEP2Y5More convenient interface for channels - [33]
IIV3EL2XCleanup, formatting, and fixing the Git feature - [34]
7ZFRYVVQCargo.nix and formatting - [35]
YTQS4ES3Fixing a parsing problem (related to permissions), and the associated permissions - [36]
DJYHARZ7Skipping old files when recording - [37]
VYHHOEYHVersions and formatting - [38]
GHO6DWPIRefactoring iterators - [39]
KUMJITTFVersion bump in the lockfiles
Change contents
- edit in pijul/src/commands/record.rs at line 154
let oldest = if let Ok(t) = oldest.duration_since(std::time::SystemTime::UNIX_EPOCH) {t.as_secs() as u64} else {0}; - edit in pijul/src/commands/record.rs at line 156
let mut oldest = oldest.duration_since(std::time::SystemTime::UNIX_EPOCH).unwrap().as_secs() as u64;if oldest == 0 {// If no diff was done at all, it means that no// existing file changed since last time (some// files may have been added, deleted or moved,// but `touch` isn't about those).oldest = std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH).unwrap().as_secs() as u64;} - edit in pijul/Cargo.toml at line 37
"src/repository/unix_lock.rs","src/repository/basic_lock.rs", - replacement in pijul/Cargo.toml at line 68
sanakirja = { version = "1.1.6", features = [ "crc32" ] }sanakirja = { version = "1.1.7", features = [ "crc32" ] } - file deletion: channel_dump.rs channel_dump.rs[3.525229]→[3.1160:1177](∅→∅),[3.1177]→[3.650052:650052](∅→∅),[3.525229]→[3.650034:650051](∅→∅),[3.650051]→[3.650052:650052](∅→∅)
use super::*;use byteorder::ByteOrder;enum DumpChannelState<T: ChannelTxnT + GraphIter + DepsTxnT<DepsError = <T as GraphTxnT>::GraphError>,RT: std::ops::Deref<Target = T>,> {Changes {log: crate::pristine::Cursor<T, RT, T::RevchangesetCursor, u64, (ChangeId, Merkle)>,current: ChangeId,deps: Option<crate::pristine::Cursor<T, RT, T::DepCursor, ChangeId, ChangeId>>,},Graph {cursor: T::GraphCursor,current: Vertex<ChangeId>,is_first: bool,},}pub struct DumpChannel<T: ChannelTxnT + GraphIter + DepsTxnT<DepsError = <T as GraphTxnT>::GraphError>,RT: std::ops::Deref<Target = T>,C: std::ops::Deref<Target = T::Channel>,> {state: Option<DumpChannelState<T, RT>>,txn: RT,channel: C,}pub enum DumpChunk {Dep([u8; 9]),Hash([u8; 42]),Edge([u8; 50]),End([u8; 25]),}impl std::ops::Deref for DumpChunk {type Target = [u8];fn deref(&self) -> &Self::Target {match *self {DumpChunk::Dep(ref h) => h,DumpChunk::Hash(ref h) => h,DumpChunk::Edge(ref e) => e,DumpChunk::End(ref e) => e,}}}#[repr(u8)]enum Msg {Hash = 253,Dep = 254,Vertex = 255,}impl<T: ChannelTxnT + GraphIter + DepsTxnT<DepsError = <T as GraphTxnT>::GraphError>,RT: std::ops::Deref<Target = T> + Clone,C: std::ops::Deref<Target = T::Channel>,> Iterator for DumpChannel<T, RT, C>{type Item = Result<DumpChunk, TxnErr<T::GraphError>>;fn next(&mut self) -> Option<Self::Item> {loop {match self.state.take() {Some(DumpChannelState::Changes {mut log,current,deps,}) => {if let Some(mut deps) = deps {while let Some(x) = deps.next() {let (h, dep) = match x {Ok(x) => x,Err(e) => return Some(Err(e)),};if h > current {break;} else if h < current {continue;}let mut msg = [Msg::Dep as u8; 9];byteorder::BigEndian::write_u64(&mut msg[1..], dep.0);self.state = Some(DumpChannelState::Changes {log,current,deps: Some(deps),});debug!("dep msg = {:?}", msg);return Some(Ok(DumpChunk::Dep(msg)));}}if let Some(x) = log.next() {let (_, (h, _)) = match x {Ok(e) => e,Err(e) => return Some(Err(e)),};let deps = match T::iter_dep_ref(self.txn.clone(), h) {Ok(e) => Some(e),Err(e) => return Some(Err(e)),};self.state = Some(DumpChannelState::Changes {log,current: h,deps,});let ext = match self.txn.get_external(h) {Ok(ext) => ext.unwrap(),Err(e) => return Some(Err(e)),};let mut msg = [Msg::Hash as u8; 1 + 33 + 8];(&mut msg[1..34]).clone_from_slice(&ext.to_bytes()[..]);byteorder::BigEndian::write_u64(&mut msg[34..], h.0);return Some(Ok(DumpChunk::Hash(msg)));} else {self.state = Some(DumpChannelState::Graph {cursor: match self.txn.iter_graph(self.txn.graph(&self.channel)) {Ok(c) => c,Err(e) => return Some(Err(e)),},current: Vertex::ROOT,is_first: true,})}}Some(DumpChannelState::Graph {mut cursor,mut current,is_first,}) => {if let Some(x) = self.txn.next_graph(self.txn.graph(&self.channel), &mut cursor){let (v, e) = match x {Err(e) => return Some(Err(e)),Ok(x) => x,};if !e.flag.contains(EdgeFlags::PARENT) {self.state = Some(DumpChannelState::Graph {cursor,current,is_first,});continue;}if v != current || is_first {let mut buf = [Msg::Vertex as u8; 50];byteorder::LittleEndian::write_u64(&mut buf[1..], v.change.0);byteorder::LittleEndian::write_u64(&mut buf[9..], v.start.0);byteorder::LittleEndian::write_u64(&mut buf[17..], v.end.0);current = v;buf[25] = e.flag.bits();byteorder::LittleEndian::write_u64(&mut buf[26..], e.dest.change.0);byteorder::LittleEndian::write_u64(&mut buf[34..], e.dest.pos.0);byteorder::LittleEndian::write_u64(&mut buf[42..], e.introduced_by.0);self.state = Some(DumpChannelState::Graph {cursor,current,is_first: false,});debug!("sending {:?}", &buf[..]);return Some(Ok(DumpChunk::Edge(buf)));} else {let mut buf = [0; 25];buf[0] = e.flag.bits();byteorder::LittleEndian::write_u64(&mut buf[1..], e.dest.change.0);byteorder::LittleEndian::write_u64(&mut buf[9..], e.dest.pos.0);byteorder::LittleEndian::write_u64(&mut buf[17..], e.introduced_by.0);self.state = Some(DumpChannelState::Graph {cursor,current,is_first: false,});debug!("sending {:?}", &buf[..]);return Some(Ok(DumpChunk::End(buf)));}} else {self.state = None;let mut buf = [Msg::Vertex as u8; 25];byteorder::LittleEndian::write_u64(&mut buf[1..], Vertex::BOTTOM.change.0);byteorder::LittleEndian::write_u64(&mut buf[9..], Vertex::BOTTOM.start.0);byteorder::LittleEndian::write_u64(&mut buf[17..], Vertex::BOTTOM.end.0);debug!("sending {:?}", buf);return Some(Ok(DumpChunk::End(buf)));}}None => return None,}}}}pub fn dump_channel<T: ChannelTxnT + GraphIter + DepsTxnT<DepsError = <T as GraphTxnT>::GraphError>,RT: std::ops::Deref<Target = T> + Clone,C: std::ops::Deref<Target = T::Channel>,>(txn: RT,channel: C,) -> Result<DumpChannel<T, RT, C>, TxnErr<T::GraphError>> {Ok(DumpChannel {state: Some(DumpChannelState::Changes {log: changeid_log_ref(txn.clone(), &channel, 0)?,current: ChangeId::ROOT,deps: None,}),txn,channel,})}pub struct ChannelFromDump<'a, T: ChannelMutTxnT> {txn: &'a mut T,channel: ChannelRef<T>,buf: Buf,current: Vertex<ChangeId>,reverses: Vec<(Vertex<ChangeId>, Edge)>,starts: HashMap<Position<ChangeId>, ChangePosition>,current_changeid: Option<ChangeId>,local_changeid: HashMap<ChangeId, ChangeId>,pub alive: HashSet<ChangeId>,}/// The following type does zero-copy buffering: if there are enough/// bytes in the part we just read, we just return these bytes. Else,/// we copy the bytes in cache, and complete the cache once we have/// enough bytes.////// The size of the cache could be generic, but since edges and/// vertices take 25 bytes, hashes 33, and [u8; 33] doesn't implement/// AsMut<[u8]>, we just use a fixed-sized array of length 33.pub struct Buf {/// Internal cache.buf: [u8; 68],/// Length of the internal cache that is currently used.buf_len: usize,/// Position in the last buffer that was read. The `read` method/// must be called with the same buffer until that method returns/// `None`.pos: usize,}impl Buf {/// Create a new buffer.fn new() -> Self {Buf {buf: [0; 68],buf_len: 0,pos: 0,}}/// Read `wanted` number of bytes from `bytes` using the internal/// cache if needed. This method must be called with the same/// `bytes` buffer until it returns `None`.fn read<'a>(&'a mut self, bytes: &'a [u8], wanted: usize) -> Option<&'a [u8]> {trace!("bytes = {:?}, self.buf = {:?} {:?} {:?}",bytes,&self.buf[..],self.buf_len,self.pos);assert!(wanted < self.buf.len());if self.buf_len > 0 {let needs = wanted - self.buf_len;if self.pos + needs > bytes.len() {// Not enough bytes to complete the internal buffer,// we need to save these extra bytes.let len = self.buf_len + bytes.len();(&mut self.buf[self.buf_len..len]).clone_from_slice(&bytes[self.pos..]);self.buf_len = len;self.pos = 0;None} else {// There are enough bytes, output them.(&mut self.buf[self.buf_len..wanted]).clone_from_slice(&bytes[self.pos..self.pos + needs]);self.buf_len = 0;self.pos += needs;Some(&self.buf[..wanted])}} else if bytes.len() - self.pos >= wanted {// The internal buffer is empty, and `bytes` is long enough.let buf = &bytes[self.pos..self.pos + wanted];self.pos += wanted;Some(buf)} else {// The internal buffer is empty and `bytes` is too short,// save the extra bytes.self.buf_len = bytes.len() - self.pos;(&mut self.buf[..self.buf_len]).clone_from_slice(&bytes[self.pos..]);self.pos = 0;None}}fn current<'a>(&self, bytes: &'a [u8]) -> Option<&'a u8> {// debug!("self.pos = {:?}", self.pos);bytes.get(self.pos)}}impl<'a, T: ChannelMutTxnT + DepsMutTxnT + DepsTxnT<DepsError = <T as GraphTxnT>::GraphError>>ChannelFromDump<'a, T>{pub fn new(txn: &'a mut T, channel: ChannelRef<T>) -> Self {let mut starts = HashMap::with_capacity(4096);starts.insert(Position::ROOT, Position::ROOT.pos);ChannelFromDump {txn,channel,buf: Buf::new(),current: Vertex::ROOT,reverses: Vec::with_capacity(4096),starts,current_changeid: None,local_changeid: HashMap::new(),alive: HashSet::new(),}}pub fn read(&mut self, bytes: &[u8]) -> Result<bool, TxnErr<T::GraphError>> {let mut channel = self.channel.borrow_mut();while let Some(&cur) = self.buf.current(bytes) {debug!("cur = {:?}", cur);if cur == Msg::Hash as u8 {if let Some(buf) = self.buf.read(bytes, 42) {let hash = Hash::from_bytes(&buf[1..34]).unwrap();let mut p = ChangeId(byteorder::BigEndian::read_u64(&buf[34..]));// Test if `p` is already taken for another hash.if let Some(hh) = self.txn.get_external(p)? {if hh != hash {let pp = make_changeid(self.txn, &hash)?;self.local_changeid.insert(p, pp);p = pp}}let t = self.txn.apply_counter(&channel);debug!("hash = {:?} {:?}", hash, p);self.txn.put_external(p, hash)?;self.txn.put_internal(hash, p)?;self.txn.put_changes(&mut channel, p, t, &hash)?;self.current_changeid = Some(p);}} else if cur == Msg::Dep as u8 {debug!("dep");if let Some(buf) = self.buf.read(bytes, 9) {let mut a = ChangeId(byteorder::BigEndian::read_u64(&buf[1..9]));if let Some(aa) = self.local_changeid.get(&a) {a = *aa}if let Some(cur) = self.current_changeid {self.txn.put_dep(cur, a)?;self.txn.put_revdep(a, cur)?;}debug!("cur = {:?}, a = {:?}", self.current_changeid, a);}} else if cur == Msg::Vertex as u8 {// New vertexif let Some(buf) = self.buf.read(bytes, 25) {self.current = read_vertex(buf);debug!("new vertex {:?}", self.current);if self.current == Vertex::BOTTOM {finish_channel(self.txn, &mut channel, &self.reverses, &self.starts)?;return Ok(true);}if let Some(aa) = self.local_changeid.get(&self.current.change) {self.current.change = *aa}} else {break;}} else if let Some(buf) = self.buf.read(bytes, 25) {// Edgelet mut edge = read_edge(buf);if let Some(aa) = self.local_changeid.get(&edge.dest.change) {edge.dest.change = *aa}if let Some(aa) = self.local_changeid.get(&edge.introduced_by) {edge.introduced_by = *aa}if !edge.flag.contains(EdgeFlags::DELETED) {self.alive.insert(edge.dest.change);}debug!("put edge {:?} {:?}", self.current, edge);self.txn.put_graph(T::graph_mut(&mut channel), self.current, edge)?;self.reverses.push((self.current, edge));self.starts.insert(self.current.end_pos(), self.current.start);} else {break;}}self.buf.pos = 0;Ok(false)}pub fn edges(&self) -> &[(Vertex<ChangeId>, Edge)] {&self.reverses[..]}}fn finish_channel<T: super::ChannelMutTxnT>(txn: &mut T,channel: &mut T::Channel,reverses: &[(Vertex<ChangeId>, Edge)],ends: &HashMap<Position<ChangeId>, ChangePosition>,) -> Result<(), TxnErr<T::GraphError>> {debug!("ends: {:?}", ends);for &(v, e) in reverses {debug!("{:?}", e);let u = Vertex {change: e.dest.change,start: *ends.get(&e.dest).unwrap(),end: e.dest.pos,};let mut e = e;e.flag ^= EdgeFlags::PARENT;e.dest = v.start_pos();txn.put_graph(T::graph_mut(channel), u, e)?;}Ok(())}fn read_vertex(bytes: &[u8]) -> Vertex<ChangeId> {let change = byteorder::LittleEndian::read_u64(&bytes[1..]);let start = byteorder::LittleEndian::read_u64(&bytes[9..]);let end = byteorder::LittleEndian::read_u64(&bytes[17..]);Vertex {change: ChangeId(change),start: ChangePosition(start),end: ChangePosition(end),}}fn read_edge(bytes: &[u8]) -> Edge {let flag = EdgeFlags::from_bits(bytes[0]).unwrap();let change = byteorder::LittleEndian::read_u64(&bytes[1..]);let p = byteorder::LittleEndian::read_u64(&bytes[9..]);let introduced_by = byteorder::LittleEndian::read_u64(&bytes[17..]);Edge {flag,dest: Position {change: ChangeId(change),pos: ChangePosition(p),},introduced_by: ChangeId(introduced_by),}}#[derive(Debug, Error)]pub enum ChannelDumpError<T: std::error::Error + 'static> {#[error(transparent)]Txn(T),#[error("Channel name already exists: {0}")]ChannelNameExists(String),}impl<T: std::error::Error + 'static> std::convert::From<TxnErr<T>> for ChannelDumpError<T> {fn from(e: TxnErr<T>) -> Self {ChannelDumpError::Txn(e.0)}} - edit in libpijul/src/pristine/mod.rs at line 24
// #[cfg(feature = "dump")]// pub mod channel_dump; - edit in libpijul/Cargo.toml at line 48
"src/pristine/channel_dump.rs", - replacement in libpijul/Cargo.toml at line 97
sanakirja = { version = "1.1.6", features = [ "crc32" ] }sanakirja = { version = "1.1.7", features = [ "crc32" ] } - replacement in Cargo.nix at line 4561
version = "1.1.6";version = "1.1.7"; - replacement in Cargo.nix at line 4563
sha256 = "0vpannz78bnayw7874jdymsz3rz6l1liy6vxcaxdd7bykkhwh1s5";sha256 = "1v73b74y36jyqjab2qmycchbqkkg4flwybzwij2v7lazc77qfbgi"; - replacement in Cargo.lock at line 1587
version = "1.1.6"version = "1.1.7" - replacement in Cargo.lock at line 1589
checksum = "4507c8e19c7e9dd6ba627d1b1f69a0e6e7f175f54d92830ef7ca2e74beb5ea6e"checksum = "f12d87cf615fd1b3858cfc2fcfa9236f4ebc2063be62b194c45e9ae1c959e3ec"