Pre-tags cleanup + fast Sanakirja

[?]
Mar 7, 2021, 4:35 PM
MFTN7GBWZNQAFHKER57MLZAJGVEAHV2GYAQN2QTDHTPCEURDVIGQC

Dependencies

  • [2] ZBNKSYA6 Fixing a bus error when starting a transaction on a full disk
  • [3] ZHABNS3S Canonicalize all paths
  • [4] WI5BS6BS New published versions
  • [5] CCLLB7OI Upgrading to Sanakirja 0.15 + version bump
  • [6] ERV3644Q Adding the block module
  • [7] X243Z3Y5 Recording only the required metadata (can even be changed later!)
  • [8] A3DMBJJA Upgrading the `git` subcommand to the latest Sanakirja and Libpijul
  • [9] JL4WKA5P Implement the Sanakirja concurrency model in a cross-process way
  • [10] H23LO7U7 a few more clippy lints addressed
  • [11] VE2UWMW4 Trying to fix channel touch
  • [12] YN63NUZO Sanakirja 1.0
  • [13] WZVCLZKY address clippy lints
  • [14] B3QWIGDE Fixing the Git features with the latest Pijul (+ conflicts in Cargo.toml)
  • [15] Q7CAYX5N Fixing Windows compilation
  • [16] QQZNSB26 Permission update (after #X243)
  • [17] HDGRZISM Version updates
  • [18] RXNT67OT Sanakirja version, and removing an unwrap
  • [19] I52XSRUH Massive cleanup, and simplification
  • [20] VO5OQW4W Removing anyhow in libpijul
  • [21] G65S7FAW Version bump and cleanup
  • [22] 33SQMZYX New versions of dependencies
  • [23] SXEYMYF7 Fixing the bad changes in history (unfortunately, by rebooting).
  • [24] 6DOXSHWG Cleanup, and version bump
  • [25] TPEH2XNB 1.0.0-alpha.28, with Tokio 1.0
  • [26] PJ7T2VFL Do not hang on locked repositories
  • [27] UFCZKKLX Upgrading to the latest Sanakirja/Rand
  • [28] G6S6PWZE Do not touch the channel if this is a partial record
  • [29] JRENVH5D Reqwest 0.11
  • [30] HKEOO4QJ Version bump
  • [31] G734WNM6 flake.nix: use crate2nix
  • [32] 3AMEP2Y5 More convenient interface for channels
  • [33] IIV3EL2X Cleanup, formatting, and fixing the Git feature
  • [34] 7ZFRYVVQ Cargo.nix and formatting
  • [35] YTQS4ES3 Fixing a parsing problem (related to permissions), and the associated permissions
  • [36] DJYHARZ7 Skipping old files when recording
  • [37] VYHHOEYH Versions and formatting
  • [38] GHO6DWPI Refactoring iterators
  • [39] KUMJITTF Version bump in the lockfiles

Change contents

  • edit in pijul/src/commands/record.rs at line 154
    [3.105252][3.110:298]()
    let oldest = if let Ok(t) = oldest.duration_since(std::time::SystemTime::UNIX_EPOCH) {
    t.as_secs() as u64
    } else {
    0
    };
  • edit in pijul/src/commands/record.rs at line 156
    [3.82]
    [3.82]
    let mut oldest = oldest
    .duration_since(std::time::SystemTime::UNIX_EPOCH)
    .unwrap()
    .as_secs() as u64;
    if oldest == 0 {
    // If no diff was done at all, it means that no
    // existing file changed since last time (some
    // files may have been added, deleted or moved,
    // but `touch` isn't about those).
    oldest = std::time::SystemTime::now()
    .duration_since(std::time::SystemTime::UNIX_EPOCH)
    .unwrap()
    .as_secs() as u64;
    }
  • edit in pijul/Cargo.toml at line 37
    [3.197209][3.1996:2059]()
    "src/repository/unix_lock.rs",
    "src/repository/basic_lock.rs",
  • replacement in pijul/Cargo.toml at line 68
    [3.197874][2.1141:1199]()
    sanakirja = { version = "1.1.6", features = [ "crc32" ] }
    [3.197874]
    [3.197895]
    sanakirja = { version = "1.1.7", features = [ "crc32" ] }
  • file deletion: channel_dump.rs (----------)channel_dump.rs (-xw-x--x--)
    [3.525229][3.1160:1177](),[3.1177][3.650052:650052](),[3.525229][3.650034:650051](),[3.650051][3.650052:650052]()
    use super::*;
    use byteorder::ByteOrder;
    /// Progress of a `DumpChannel` iterator.
    ///
    /// A dump starts in `Changes` and moves to `Graph` once the change log is
    /// exhausted.
    enum DumpChannelState<T, RT>
    where
        T: ChannelTxnT + GraphIter + DepsTxnT<DepsError = <T as GraphTxnT>::GraphError>,
        RT: std::ops::Deref<Target = T>,
    {
        /// Emitting the change log, along with the dependencies of each change.
        Changes {
            /// Cursor over the channel's change log.
            log: crate::pristine::Cursor<T, RT, T::RevchangesetCursor, u64, (ChangeId, Merkle)>,
            /// Change whose dependencies are currently being emitted.
            current: ChangeId,
            /// Cursor over the dependency table, if one has been opened for
            /// `current`.
            deps: Option<crate::pristine::Cursor<T, RT, T::DepCursor, ChangeId, ChangeId>>,
        },
        /// Emitting the edges of the channel's graph.
        Graph {
            /// Cursor over the graph table.
            cursor: T::GraphCursor,
            /// Source vertex of the last edge that was emitted.
            current: Vertex<ChangeId>,
            /// `true` until the first edge has been emitted.
            is_first: bool,
        },
    }
    /// Iterator serialising a channel as a sequence of `DumpChunk`s.
    ///
    /// Built by `dump_channel`.
    pub struct DumpChannel<T, RT, C>
    where
        T: ChannelTxnT + GraphIter + DepsTxnT<DepsError = <T as GraphTxnT>::GraphError>,
        RT: std::ops::Deref<Target = T>,
        C: std::ops::Deref<Target = T::Channel>,
    {
        /// Current position in the dump, or `None` once the dump is over.
        state: Option<DumpChannelState<T, RT>>,
        /// Handle to the transaction the channel is read from.
        txn: RT,
        /// Handle to the channel being dumped.
        channel: C,
    }
    /// One fixed-size message of a channel dump.
    ///
    /// Dereferences to the raw bytes of the message, whichever the variant.
    pub enum DumpChunk {
        /// 9-byte message.
        Dep([u8; 9]),
        /// 42-byte message.
        Hash([u8; 42]),
        /// 50-byte message.
        Edge([u8; 50]),
        /// 25-byte message.
        End([u8; 25]),
    }

    impl std::ops::Deref for DumpChunk {
        type Target = [u8];

        /// Borrow the payload of the chunk as a byte slice.
        fn deref(&self) -> &Self::Target {
            match self {
                Self::Dep(payload) => &payload[..],
                Self::Hash(payload) => &payload[..],
                Self::Edge(payload) => &payload[..],
                Self::End(payload) => &payload[..],
            }
        }
    }
    /// Tag stored in the first byte of a dump message to identify its kind.
    /// In this file, a message whose first byte is none of these values is
    /// read as a bare edge.
    #[repr(u8)]
    enum Msg {
    // Introduces a change: external hash followed by its change id.
    Hash = 253,
    // Introduces a dependency of the change announced by the last `Hash`.
    Dep = 254,
    // Introduces a vertex; the edges that follow start from that vertex.
    Vertex = 255,
    }
    /// Produces the dump one chunk at a time.
    ///
    /// The iterator is a state machine: the state is taken out of `self.state`
    /// at the start of each step and stored back before a chunk is returned.
    /// The change log (with dependencies) is emitted first, then the graph,
    /// then a terminating message; after that the iterator yields `None`.
    impl<
    T: ChannelTxnT + GraphIter + DepsTxnT<DepsError = <T as GraphTxnT>::GraphError>,
    RT: std::ops::Deref<Target = T> + Clone,
    C: std::ops::Deref<Target = T::Channel>,
    > Iterator for DumpChannel<T, RT, C>
    {
    type Item = Result<DumpChunk, TxnErr<T::GraphError>>;
    fn next(&mut self) -> Option<Self::Item> {
    loop {
    match self.state.take() {
    Some(DumpChannelState::Changes {
    mut log,
    current,
    deps,
    }) => {
    // First emit the remaining dependencies of `current`, one per call.
    if let Some(mut deps) = deps {
    while let Some(x) = deps.next() {
    let (h, dep) = match x {
    Ok(x) => x,
    Err(e) => return Some(Err(e)),
    };
    // Entries keyed by a larger change id end the scan, entries
    // keyed by a smaller one are skipped.
    if h > current {
    break;
    } else if h < current {
    continue;
    }
    // Dep message: tag byte, then the dependency's id (big-endian).
    let mut msg = [Msg::Dep as u8; 9];
    byteorder::BigEndian::write_u64(&mut msg[1..], dep.0);
    self.state = Some(DumpChannelState::Changes {
    log,
    current,
    deps: Some(deps),
    });
    debug!("dep msg = {:?}", msg);
    return Some(Ok(DumpChunk::Dep(msg)));
    }
    }
    // No dependency left for `current`: move on to the next log entry.
    if let Some(x) = log.next() {
    let (_, (h, _)) = match x {
    Ok(e) => e,
    Err(e) => return Some(Err(e)),
    };
    let deps = match T::iter_dep_ref(self.txn.clone(), h) {
    Ok(e) => Some(e),
    Err(e) => return Some(Err(e)),
    };
    self.state = Some(DumpChannelState::Changes {
    log,
    current: h,
    deps,
    });
    // NOTE(review): panics if the change has no external hash.
    let ext = match self.txn.get_external(h) {
    Ok(ext) => ext.unwrap(),
    Err(e) => return Some(Err(e)),
    };
    // Hash message: tag byte, 33 bytes of hash, change id (big-endian).
    let mut msg = [Msg::Hash as u8; 1 + 33 + 8];
    (&mut msg[1..34]).clone_from_slice(&ext.to_bytes()[..]);
    byteorder::BigEndian::write_u64(&mut msg[34..], h.0);
    return Some(Ok(DumpChunk::Hash(msg)));
    } else {
    // The log is exhausted: switch to dumping the graph. Nothing is
    // returned here, the loop runs again in the new state.
    self.state = Some(DumpChannelState::Graph {
    cursor: match self.txn.iter_graph(self.txn.graph(&self.channel)) {
    Ok(c) => c,
    Err(e) => return Some(Err(e)),
    },
    current: Vertex::ROOT,
    is_first: true,
    })
    }
    }
    Some(DumpChannelState::Graph {
    mut cursor,
    mut current,
    is_first,
    }) => {
    if let Some(x) = self
    .txn
    .next_graph(self.txn.graph(&self.channel), &mut cursor)
    {
    let (v, e) = match x {
    Err(e) => return Some(Err(e)),
    Ok(x) => x,
    };
    // Only edges carrying the PARENT flag are dumped; the others are
    // skipped.
    if !e.flag.contains(EdgeFlags::PARENT) {
    self.state = Some(DumpChannelState::Graph {
    cursor,
    current,
    is_first,
    });
    continue;
    }
    if v != current || is_first {
    // New source vertex: a 25-byte vertex message (tag, change, start,
    // end) immediately followed by the 25-byte edge (flag, destination
    // change, destination position, introducing change), little-endian.
    let mut buf = [Msg::Vertex as u8; 50];
    byteorder::LittleEndian::write_u64(&mut buf[1..], v.change.0);
    byteorder::LittleEndian::write_u64(&mut buf[9..], v.start.0);
    byteorder::LittleEndian::write_u64(&mut buf[17..], v.end.0);
    current = v;
    buf[25] = e.flag.bits();
    byteorder::LittleEndian::write_u64(&mut buf[26..], e.dest.change.0);
    byteorder::LittleEndian::write_u64(&mut buf[34..], e.dest.pos.0);
    byteorder::LittleEndian::write_u64(&mut buf[42..], e.introduced_by.0);
    self.state = Some(DumpChannelState::Graph {
    cursor,
    current,
    is_first: false,
    });
    debug!("sending {:?}", &buf[..]);
    return Some(Ok(DumpChunk::Edge(buf)));
    } else {
    // Same source vertex as the previous edge: send the edge alone. The
    // 25-byte `End` variant is used as the container.
    let mut buf = [0; 25];
    buf[0] = e.flag.bits();
    byteorder::LittleEndian::write_u64(&mut buf[1..], e.dest.change.0);
    byteorder::LittleEndian::write_u64(&mut buf[9..], e.dest.pos.0);
    byteorder::LittleEndian::write_u64(&mut buf[17..], e.introduced_by.0);
    self.state = Some(DumpChannelState::Graph {
    cursor,
    current,
    is_first: false,
    });
    debug!("sending {:?}", &buf[..]);
    return Some(Ok(DumpChunk::End(buf)));
    }
    } else {
    // The graph is exhausted: send `Vertex::BOTTOM` as a vertex message to
    // mark the end of the dump, and leave the state empty so that the next
    // call returns `None`.
    self.state = None;
    let mut buf = [Msg::Vertex as u8; 25];
    byteorder::LittleEndian::write_u64(&mut buf[1..], Vertex::BOTTOM.change.0);
    byteorder::LittleEndian::write_u64(&mut buf[9..], Vertex::BOTTOM.start.0);
    byteorder::LittleEndian::write_u64(&mut buf[17..], Vertex::BOTTOM.end.0);
    debug!("sending {:?}", buf);
    return Some(Ok(DumpChunk::End(buf)));
    }
    }
    None => return None,
    }
    }
    }
    }
    /// Start dumping `channel`, read through `txn`.
    ///
    /// The returned iterator starts at the beginning of the channel's change
    /// log (position 0), with no dependency cursor open yet.
    ///
    /// # Errors
    ///
    /// Fails if the cursor over the change log cannot be created.
    pub fn dump_channel<T, RT, C>(
        txn: RT,
        channel: C,
    ) -> Result<DumpChannel<T, RT, C>, TxnErr<T::GraphError>>
    where
        T: ChannelTxnT + GraphIter + DepsTxnT<DepsError = <T as GraphTxnT>::GraphError>,
        RT: std::ops::Deref<Target = T> + Clone,
        C: std::ops::Deref<Target = T::Channel>,
    {
        let log = changeid_log_ref(txn.clone(), &channel, 0)?;
        let initial = DumpChannelState::Changes {
            log,
            current: ChangeId::ROOT,
            deps: None,
        };
        Ok(DumpChannel {
            state: Some(initial),
            txn,
            channel,
        })
    }
    /// Rebuilds a channel from the byte stream of a dump, fed piecewise
    /// through `read`.
    pub struct ChannelFromDump<'a, T: ChannelMutTxnT> {
    // Transaction the channel is written into.
    txn: &'a mut T,
    // Channel being filled.
    channel: ChannelRef<T>,
    // Buffer used to reassemble messages split across calls to `read`.
    buf: Buf,
    // Last vertex read; edges read afterwards start from it.
    current: Vertex<ChangeId>,
    // Every (source vertex, edge) pair inserted so far; used at the end of
    // the dump to insert the opposite edges.
    reverses: Vec<(Vertex<ChangeId>, Edge)>,
    // Maps the end position of each source vertex to its start position.
    starts: HashMap<Position<ChangeId>, ChangePosition>,
    // Change announced by the last hash message; dependencies read afterwards
    // are attached to it.
    current_changeid: Option<ChangeId>,
    // Maps change ids of the dump to the ids allocated locally, for ids that
    // were already taken by a different hash in `txn`.
    local_changeid: HashMap<ChangeId, ChangeId>,
    /// Changes that are the destination of at least one edge read without
    /// the `DELETED` flag.
    pub alive: HashSet<ChangeId>,
    }
    /// Reassembles fixed-size messages from a stream delivered as arbitrary
    /// slices, copying only when a message straddles two slices.
    ///
    /// If the slice handed to `read` holds the whole message, a sub-slice of it
    /// is returned directly. Otherwise the bytes available are stashed in an
    /// internal cache, and the message is completed from the next slice.
    ///
    /// The cache is a fixed 68-byte array; `read` asserts that the requested
    /// length is strictly smaller than that.
    pub struct Buf {
        /// Cache holding the beginning of a message split across slices.
        buf: [u8; 68],
        /// Number of bytes of `buf` currently in use.
        buf_len: usize,
        /// Offset of the next unread byte in the slice last given to `read`.
        /// `read` must be called with the same slice until it returns `None`.
        pos: usize,
    }

    impl Buf {
        /// Create an empty buffer.
        fn new() -> Self {
            Self {
                buf: [0; 68],
                buf_len: 0,
                pos: 0,
            }
        }

        /// Try to get the next `wanted` bytes of the stream.
        ///
        /// Returns `None` when `bytes` does not hold enough data: what is left
        /// of `bytes` is then saved in the cache and the offset is reset, so the
        /// next call must be made with the following slice of the stream.
        ///
        /// # Panics
        ///
        /// Panics if `wanted` is not strictly smaller than the cache size.
        fn read<'a>(&'a mut self, bytes: &'a [u8], wanted: usize) -> Option<&'a [u8]> {
            trace!(
                "bytes = {:?}, self.buf = {:?} {:?} {:?}",
                bytes,
                &self.buf[..],
                self.buf_len,
                self.pos
            );
            assert!(wanted < self.buf.len());

            let cached = self.buf_len;
            if cached == 0 {
                let available = bytes.len() - self.pos;
                if available >= wanted {
                    // Nothing cached and `bytes` is long enough: no copy needed.
                    let start = self.pos;
                    self.pos = start + wanted;
                    return Some(&bytes[start..start + wanted]);
                }
                // Nothing cached but `bytes` is too short: keep the tail for
                // the next call.
                self.buf_len = available;
                (&mut self.buf[..available]).clone_from_slice(&bytes[self.pos..]);
                self.pos = 0;
                return None;
            }

            // Part of the message is cached already.
            let needs = wanted - cached;
            let stop = self.pos + needs;
            if stop > bytes.len() {
                // Still not enough to complete the message: cache these bytes
                // as well.
                let filled = cached + bytes.len();
                (&mut self.buf[cached..filled]).clone_from_slice(&bytes[self.pos..]);
                self.buf_len = filled;
                self.pos = 0;
                return None;
            }
            // Complete the cached message and hand it out.
            (&mut self.buf[cached..wanted]).clone_from_slice(&bytes[self.pos..stop]);
            self.buf_len = 0;
            self.pos = stop;
            Some(&self.buf[..wanted])
        }

        /// Peek at the byte of `bytes` located at the current offset, if any.
        fn current<'a>(&self, bytes: &'a [u8]) -> Option<&'a u8> {
            // debug!("self.pos = {:?}", self.pos);
            bytes.get(self.pos)
        }
    }
    impl<'a, T: ChannelMutTxnT + DepsMutTxnT + DepsTxnT<DepsError = <T as GraphTxnT>::GraphError>>
        ChannelFromDump<'a, T>
    {
        /// Create a reader that fills `channel` inside `txn`.
        ///
        /// The root position is registered as the end of a vertex from the
        /// start, so that edges pointing to the root can be reversed at the end
        /// of the dump.
        pub fn new(txn: &'a mut T, channel: ChannelRef<T>) -> Self {
            let mut starts = HashMap::with_capacity(4096);
            starts.insert(Position::ROOT, Position::ROOT.pos);
            ChannelFromDump {
                txn,
                channel,
                buf: Buf::new(),
                current: Vertex::ROOT,
                reverses: Vec::with_capacity(4096),
                starts,
                current_changeid: None,
                local_changeid: HashMap::new(),
                alive: HashSet::new(),
            }
        }

        /// Consume the next slice of the dump.
        ///
        /// Messages may be split across calls at any byte: an incomplete
        /// message is kept in the internal buffer and completed by the next
        /// call.
        ///
        /// Returns `Ok(true)` once the terminating vertex (`Vertex::BOTTOM`) has
        /// been read and the reverse edges have been inserted, and `Ok(false)`
        /// when all of `bytes` has been consumed without reaching the end of
        /// the dump.
        ///
        /// # Errors
        ///
        /// Propagates any error of the underlying transaction.
        ///
        /// # Panics
        ///
        /// Panics if a hash or the flags of an edge cannot be decoded.
        pub fn read(&mut self, bytes: &[u8]) -> Result<bool, TxnErr<T::GraphError>> {
            let mut channel = self.channel.borrow_mut();
            loop {
                // The kind of a message is given by its first byte. When the
                // beginning of a message was cached by a previous call, that
                // byte is in the cache, not in `bytes`.
                let cur = if self.buf.buf_len > 0 {
                    self.buf.buf[0]
                } else if let Some(&c) = self.buf.current(bytes) {
                    c
                } else {
                    break;
                };
                debug!("cur = {:?}", cur);
                if cur == Msg::Hash as u8 {
                    if let Some(buf) = self.buf.read(bytes, 42) {
                        let hash = Hash::from_bytes(&buf[1..34]).unwrap();
                        let mut p = ChangeId(byteorder::BigEndian::read_u64(&buf[34..]));
                        // Test if `p` is already taken for another hash.
                        if let Some(hh) = self.txn.get_external(p)? {
                            if hh != hash {
                                let pp = make_changeid(self.txn, &hash)?;
                                self.local_changeid.insert(p, pp);
                                p = pp
                            }
                        }
                        let t = self.txn.apply_counter(&channel);
                        debug!("hash = {:?} {:?}", hash, p);
                        self.txn.put_external(p, hash)?;
                        self.txn.put_internal(hash, p)?;
                        self.txn.put_changes(&mut channel, p, t, &hash)?;
                        self.current_changeid = Some(p);
                    } else {
                        // Incomplete message: `bytes` has been fully consumed
                        // and must not be read again.
                        break;
                    }
                } else if cur == Msg::Dep as u8 {
                    debug!("dep");
                    if let Some(buf) = self.buf.read(bytes, 9) {
                        let mut a = ChangeId(byteorder::BigEndian::read_u64(&buf[1..9]));
                        if let Some(aa) = self.local_changeid.get(&a) {
                            a = *aa
                        }
                        if let Some(cur) = self.current_changeid {
                            self.txn.put_dep(cur, a)?;
                            self.txn.put_revdep(a, cur)?;
                        }
                        debug!("cur = {:?}, a = {:?}", self.current_changeid, a);
                    } else {
                        // Incomplete message, same as above.
                        break;
                    }
                } else if cur == Msg::Vertex as u8 {
                    // New vertex
                    if let Some(buf) = self.buf.read(bytes, 25) {
                        self.current = read_vertex(buf);
                        debug!("new vertex {:?}", self.current);
                        if self.current == Vertex::BOTTOM {
                            // End of the dump: insert the opposite of every
                            // edge read so far.
                            finish_channel(self.txn, &mut channel, &self.reverses, &self.starts)?;
                            return Ok(true);
                        }
                        if let Some(aa) = self.local_changeid.get(&self.current.change) {
                            self.current.change = *aa
                        }
                    } else {
                        break;
                    }
                } else if let Some(buf) = self.buf.read(bytes, 25) {
                    // Edge
                    let mut edge = read_edge(buf);
                    if let Some(aa) = self.local_changeid.get(&edge.dest.change) {
                        edge.dest.change = *aa
                    }
                    if let Some(aa) = self.local_changeid.get(&edge.introduced_by) {
                        edge.introduced_by = *aa
                    }
                    if !edge.flag.contains(EdgeFlags::DELETED) {
                        self.alive.insert(edge.dest.change);
                    }
                    debug!("put edge {:?} {:?}", self.current, edge);
                    self.txn
                        .put_graph(T::graph_mut(&mut channel), self.current, edge)?;
                    self.reverses.push((self.current, edge));
                    self.starts
                        .insert(self.current.end_pos(), self.current.start);
                } else {
                    break;
                }
            }
            // The next call comes with a new slice, to be read from its start.
            self.buf.pos = 0;
            Ok(false)
        }

        /// All the (source vertex, edge) pairs read so far.
        pub fn edges(&self) -> &[(Vertex<ChangeId>, Edge)] {
            &self.reverses[..]
        }
    }
    /// Insert the opposite of every edge collected while reading a dump.
    ///
    /// For each `(vertex, edge)` pair of `reverses`, an edge is added from the
    /// vertex ending at the destination of `edge` back to the start of
    /// `vertex`, with the `PARENT` flag toggled. `ends` maps the end position
    /// of a vertex to its start position.
    ///
    /// # Panics
    ///
    /// Panics if the destination of an edge is not a key of `ends`.
    fn finish_channel<T: super::ChannelMutTxnT>(
        txn: &mut T,
        channel: &mut T::Channel,
        reverses: &[(Vertex<ChangeId>, Edge)],
        ends: &HashMap<Position<ChangeId>, ChangePosition>,
    ) -> Result<(), TxnErr<T::GraphError>> {
        debug!("ends: {:?}", ends);
        for &(source, forward) in reverses {
            debug!("{:?}", forward);
            // Vertex the forward edge points to: it ends at the edge's
            // destination.
            let target = Vertex {
                change: forward.dest.change,
                start: *ends.get(&forward.dest).unwrap(),
                end: forward.dest.pos,
            };
            let mut backward = forward;
            backward.flag ^= EdgeFlags::PARENT;
            backward.dest = source.start_pos();
            txn.put_graph(T::graph_mut(channel), target, backward)?;
        }
        Ok(())
    }
    /// Decode a vertex message: the first byte (the message tag) is ignored,
    /// then come the change id, the start and the end of the vertex, as
    /// little-endian 64-bit integers at offsets 1, 9 and 17.
    fn read_vertex(bytes: &[u8]) -> Vertex<ChangeId> {
        let word = |offset: usize| byteorder::LittleEndian::read_u64(&bytes[offset..]);
        let (change, start, end) = (word(1), word(9), word(17));
        Vertex {
            change: ChangeId(change),
            start: ChangePosition(start),
            end: ChangePosition(end),
        }
    }
    /// Decode an edge message: one byte of flags, then the destination change,
    /// the destination position and the introducing change, as little-endian
    /// 64-bit integers at offsets 1, 9 and 17.
    ///
    /// # Panics
    ///
    /// Panics if the first byte is not a valid set of edge flags.
    fn read_edge(bytes: &[u8]) -> Edge {
        let flag = EdgeFlags::from_bits(bytes[0]).unwrap();
        let word = |offset: usize| byteorder::LittleEndian::read_u64(&bytes[offset..]);
        let (dest_change, dest_pos, introduced_by) = (word(1), word(9), word(17));
        let dest = Position {
            change: ChangeId(dest_change),
            pos: ChangePosition(dest_pos),
        };
        Edge {
            flag,
            dest,
            introduced_by: ChangeId(introduced_by),
        }
    }
    #[derive(Debug, Error)]
    pub enum ChannelDumpError<T: std::error::Error + 'static> {
    #[error(transparent)]
    Txn(T),
    #[error("Channel name already exists: {0}")]
    ChannelNameExists(String),
    }
    impl<T: std::error::Error + 'static> std::convert::From<TxnErr<T>> for ChannelDumpError<T> {
    fn from(e: TxnErr<T>) -> Self {
    ChannelDumpError::Txn(e.0)
    }
    }
  • edit in libpijul/src/pristine/mod.rs at line 24
    [3.587220][3.98621:98674]()
    // #[cfg(feature = "dump")]
    // pub mod channel_dump;
  • edit in libpijul/Cargo.toml at line 48
    [3.1021836][3.1021836:1021868]()
    "src/pristine/channel_dump.rs",
  • replacement in libpijul/Cargo.toml at line 97
    [3.1022980][2.3621:3679]()
    sanakirja = { version = "1.1.6", features = [ "crc32" ] }
    [3.1022980]
    [3.1023001]
    sanakirja = { version = "1.1.7", features = [ "crc32" ] }
  • replacement in Cargo.nix at line 4561
    [3.144463][2.3680:3707]()
    version = "1.1.6";
    [3.144463]
    [3.144491]
    version = "1.1.7";
  • replacement in Cargo.nix at line 4563
    [3.144517][2.3708:3781]()
    sha256 = "0vpannz78bnayw7874jdymsz3rz6l1liy6vxcaxdd7bykkhwh1s5";
    [3.144517]
    [3.144590]
    sha256 = "1v73b74y36jyqjab2qmycchbqkkg4flwybzwij2v7lazc77qfbgi";
  • replacement in Cargo.lock at line 1587
    [3.1072132][2.3782:3800]()
    version = "1.1.6"
    [3.1072132]
    [3.1072151]
    version = "1.1.7"
  • replacement in Cargo.lock at line 1589
    [3.1072216][2.3801:3879]()
    checksum = "4507c8e19c7e9dd6ba627d1b1f69a0e6e7f175f54d92830ef7ca2e74beb5ea6e"
    [3.1072216]
    [3.1072294]
    checksum = "f12d87cf615fd1b3858cfc2fcfa9236f4ebc2063be62b194c45e9ae1c959e3ec"