Adding a single-threaded version of record, for working copies that are neither "Send" nor "Sync"

pmeunier
Oct 12, 2022, 11:41 AM
HEA2T44CJZIJCYCH3DYMVN74W2OWZYMJDJX655T6GCVCCSFGNNSQC

Dependencies

  • [2] ZDK3GNDB Tag transactions (including a massive refactoring of errors)
  • [3] AJEH3FSP Properly propagate errors in `libpijul::record::record` instead of unwrap()ing
  • [4] I24UEJQL Various post-fire fixes
  • [5] FXEDPLRI Resurrecting tests, and type cleanup (no need for Arc<RwLock<…>> anymore)
  • [6] RMDMAYRX Adding a root inode (aka supporting submodules)
  • [7] 2VXTRPO4 Custom diff separators
  • [*] SXEYMYF7 Fixing the bad changes in history (unfortunately, by rebooting).
  • [*] O4DNWMPD Cleaunp and proofreading of libpijul::record

Change contents

  • replacement in libpijul/src/record.rs at line 314
    [3.160][3.72:126](),[3.45425][3.72:126]()
    workers.push(std::thread::spawn(move || {
    [3.160]
    [3.126]
    let cl = move || {
  • replacement in libpijul/src/record.rs at line 344
    [3.1424][3.46611:46627](),[2.16526][3.46611:46627](),[3.46611][3.46611:46627]()
    }))
    [2.16526]
    [3.46627]
    };
    workers.push(std::thread::spawn(cl.clone()))
  • edit in libpijul/src/record.rs at line 545
    [10.1]
    [2.16624]
    pub fn record_single_thread<T, W: WorkingCopyRead + Clone, C: ChangeStore + Clone>(
    &mut self,
    txn: ArcTxn<T>,
    diff_algorithm: diff::Algorithm,
    stop_early: bool,
    diff_separator: &regex::bytes::Regex,
    channel: ChannelRef<T>,
    working_copy: &W,
    changes: &C,
    prefix: &str,
    ) -> Result<(), RecordError<C::Error, W::Error, T>>
    where
    T: ChannelMutTxnT + TreeTxnT,
    <W as WorkingCopyRead>::Error: 'static,
    {
    info!("Starting to record");
    let now = std::time::Instant::now();
    let mut stack = vec![(RecordItem::root(), components(prefix))];
    while let Some((mut item, mut components)) = stack.pop() {
    debug!("stack.pop() = Some({:?})", item);
    // Check for moves and file conflicts.
    let vertex: Option<Position<Option<ChangeId>>> =
    self.recorded_inodes.lock().get(&item.inode).cloned();
    let mut root_vertices = Vec::new();
    let vertex = if let Some(vertex) = vertex {
    vertex
    } else if item.inode == Inode::ROOT {
    debug!("TAKING LOCK {}", line!());
    let txn = txn.read();
    debug!("TAKEN");
    let channel = channel.r.read();
    // Test for a "root" vertex below the null one.
    let f0 = EdgeFlags::FOLDER | EdgeFlags::BLOCK;
    let f1 = f0 | EdgeFlags::PSEUDO;
    self.recorded_inodes
    .lock()
    .insert(Inode::ROOT, Position::ROOT.to_option());
    let mut has_nonempty_root = false;
    for e in iter_adjacent(&*txn, txn.graph(&*channel), Vertex::ROOT, f0, f1)? {
    let e = e?;
    let child = txn.find_block(txn.graph(&*channel), e.dest()).unwrap();
    if child.start == child.end {
    // This is the "new" format, with multiple
    // roots, and `grandchild` is one of the
    // roots.
    let grandchild =
    iter_adjacent(&*txn, txn.graph(&*channel), *child, f0, f1)?
    .next()
    .unwrap()?
    .dest();
    root_vertices.push(grandchild);
    self.delete_obsolete_children(
    &*txn,
    txn.graph(&channel),
    working_copy,
    changes,
    &item.full_path,
    grandchild,
    )?;
    } else {
    // Single-root repository, we need to follow
    // the root's children.
    has_nonempty_root = true
    }
    }
    debug!("has_nonempty_root: {:?}", has_nonempty_root);
    debug!("root_vertices: {:?}", root_vertices);
    if has_nonempty_root && !root_vertices.is_empty() {
    // This repository is mixed between "zero" roots,
    // and new-style-roots.
    root_vertices.push(Position::ROOT)
    }
    Position::ROOT.to_option()
    } else if let Some(vertex) = get_inodes_::<_, C, W>(&txn, &channel, &item.inode)? {
    {
    let txn = txn.read();
    let channel = channel.r.read();
    let graph = txn.graph(&*channel);
    self.delete_obsolete_children(
    &*txn,
    &graph,
    working_copy,
    changes,
    &item.full_path,
    vertex,
    )?;
    }
    let rec = self.recorded();
    let new_papa = {
    let mut recorded = self.recorded_inodes.lock();
    recorded.insert(item.inode, vertex.to_option());
    recorded.get(&item.papa).cloned()
    };
    rec.lock().record_existing_file(
    &txn,
    diff_algorithm,
    stop_early,
    &diff_separator,
    &channel,
    working_copy.clone(),
    changes,
    &item,
    new_papa,
    vertex,
    )?;
    vertex.to_option()
    } else {
    let rec = self.recorded();
    debug!("TAKING LOCK {}", line!());
    let mut rec = rec.lock();
    match rec.add_file(working_copy, item.clone()) {
    Ok(Some(vertex)) => {
    // Path addition (maybe just a single directory).
    self.recorded_inodes.lock().insert(item.inode, vertex);
    vertex
    }
    _ => continue,
    }
    };
    if root_vertices.is_empty() {
    // Move on to the next step.
    debug!("TAKING LOCK {}", line!());
    let txn = txn.read();
    let channel = channel.r.read();
    self.push_children::<_, _, C>(
    &*txn,
    &*channel,
    working_copy,
    &mut item,
    &mut components,
    vertex,
    &mut stack,
    prefix,
    changes,
    )?;
    } else {
    for vertex in root_vertices {
    let txn = txn.read();
    let channel = channel.r.read();
    if !vertex.change.is_root() {
    let mut r = self.new_root.lock();
    let age = txn
    .get_changeset(txn.changes(&*channel), &vertex.change)?
    .unwrap();
    if let Some((_, a)) = *r {
    if a < (*age).into() {
    *r = Some((vertex.to_option(), (*age).into()))
    }
    } else {
    *r = Some((vertex.to_option(), (*age).into()))
    }
    }
    item.v_papa = vertex.to_option();
    self.push_children::<_, _, C>(
    &*txn,
    &*channel,
    working_copy,
    &mut item,
    &mut components,
    vertex.to_option(),
    &mut stack,
    prefix,
    changes,
    )?;
    }
    }
    }
    crate::TIMERS.lock().unwrap().record += now.elapsed();
    info!("record done");
    Ok(())
    }