pub mod git;
#[cfg(test)]
mod test;
#[doc(inline)]
pub use pijul::Hash as ChangeHash;
use crate::encoding::Encoding;
use crate::identity::SKey;
use crate::prelude::*;
use crate::{diff, encoding, file, to_record, PijulConfig};
use libpijul::pristine::sanakirja;
use libpijul::ChannelRef;
use pijul::change::{
self, get_change_contents, Author, BaseHunk, ChangeHeader, Hunk, Local,
LocalChange,
};
use pijul::changestore::{ChangeStore, FileMetadata};
use pijul::{
working_copy, ChannelMutTxnT, ChannelTxnT, HashSet, MutTxnT, MutTxnTExt,
TxnT, TxnTExt,
};
use pijul_remote::{PushDelta, RemoteDelta, RemoteRepo};
use tokio::task;
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{cmp, fs};
use canonical_path::CanonicalPathBuf;
use derivative::Derivative;
use itertools::Itertools;
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
/// File name `.ignore`.
/// NOTE(review): presumably the ignore file consulted for untracked files —
/// not referenced in the visible part of this module; confirm.
pub const IGNORE_FILE: &str = ".ignore";
/// Path reported for root hunks (`AddRoot` / `DelRoot`), which are not tied to
/// a particular file (see `pijul_change_to_diff`).
pub const ROOT_FILE: &str = ".";
/// Directory whose presence inside a directory is checked by [`is_pijul`].
pub const PIJUL_DIR: &str = ".pijul";
/// Directory whose presence inside a directory is checked by [`is_git`].
pub const GIT_DIR: &str = ".git";
/// NOTE(review): presumably the maximum length of base64-encoded contents that
/// is still displayed — not used in the visible part of this module; confirm.
pub const MAX_LEN_BASE64_DISPLAY: usize = 4096;
/// Maximum number of log entries loaded into [`State::short_log`] by
/// `get_state`.
pub const STATUS_LOG_LIMIT: usize = 10;
/// Snapshot of a repository as computed by `get_state` and sent to the
/// consumer of [`MsgOut`].
#[derive(Clone, Debug)]
pub struct State {
/// Last component of the repository path.
pub dir_name: String,
/// Name of the current channel.
pub channel: String,
/// Names of all channels except the current one.
pub other_channels: BTreeSet<String>,
/// Files found in the working copy that are not tracked.
pub untracked_files: BTreeSet<file::Path>,
/// Unrecorded changes, grouped by file.
pub changed_files: ChangedFiles,
/// The most recent log entries of the current channel, at most
/// [`STATUS_LOG_LIMIT`] of them.
pub short_log: Log,
/// Remotes read from the Pijul configuration.
pub remotes: Remotes,
}
/// Remote names taken from the Pijul configuration (filled in `get_state`).
#[derive(Clone, Debug, Default)]
pub struct Remotes {
/// The configuration's `default_remote`, if any.
pub default: Option<String>,
/// Names of the configured SSH and HTTP remotes.
pub other: Vec<String>,
}
/// Three logs that together describe how the local repository and a remote
/// differ. The logs are addressed as one concatenated sequence by the methods
/// of this type, in the order `local_records`, `remote_records`,
/// `remote_unrecords`.
#[derive(Clone, Debug, Default)]
pub struct RecordDichotomy {
/// NOTE(review): presumably changes recorded locally that the remote lacks —
/// confirm against the code that fills this in.
pub local_records: Log,
/// NOTE(review): presumably changes present on the remote but not locally —
/// confirm.
pub remote_records: Log,
/// NOTE(review): presumably changes that were unrecorded on the remote —
/// confirm.
pub remote_unrecords: Log,
}
impl RecordDichotomy {
pub fn is_empty(&self) -> bool {
let Self {
local_records,
remote_records,
remote_unrecords,
} = self;
remote_records.is_empty()
&& remote_unrecords.is_empty()
&& local_records.is_empty()
}
pub fn len(&self) -> usize {
let Self {
local_records,
remote_records,
remote_unrecords,
} = self;
remote_records.len() + remote_unrecords.len() + local_records.len()
}
pub fn get(&self, ix: usize) -> Option<&LogEntry> {
let Self {
local_records,
remote_records,
remote_unrecords,
} = self;
if ix < local_records.len() {
return local_records.get(ix);
}
let ix = ix - local_records.len();
if ix < remote_records.len() {
return remote_records.get(ix);
}
let ix = ix - remote_records.len();
remote_unrecords.get(ix)
}
}
/// Requests handled by the repository worker (see `update`). Each request is
/// answered with one [`MsgOut`].
#[derive(Clone, Derivative, strum::Display)]
#[derivative(Debug)]
pub enum MsgIn {
/// Recompute the repository [`State`]; answered with [`MsgOut::Refreshed`].
RefreshChangedAndUntrackedFiles,
/// Start tracking `path`; answered with [`MsgOut::AddedUntrackedFile`].
AddUntrackedFile {
path: String,
/// When set, the path is added as a prefix together with what is below it.
recursive: bool,
},
/// Stop tracking `path`; answered with [`MsgOut::RmedAddedFile`].
RmAddedFile {
path: String,
},
/// Record a new change and recompute the state; answered with
/// [`MsgOut::Refreshed`].
Record {
/// Message stored in the change header.
msg: String,
/// Optional description stored in the change header.
desc: Option<String>,
/// Secret key used to sign the hash of the change (hidden from `Debug`).
#[derivative(Debug = "ignore")]
sk: Arc<SKey>,
/// Value stored under the `"key"` entry of the change author.
id_pk: String,
/// Selection of files and changes to include in the record.
to_record: to_record::State,
},
/// Load the per-file diffs of a recorded change; answered with
/// [`MsgOut::GotChangeDiffs`].
GetChangeDiffs {
hash: ChangeHash,
},
/// Make the named channel current; answered with [`MsgOut::Refreshed`].
SwitchToChannel(String),
/// Fork the current channel under the given name; answered with
/// [`MsgOut::Refreshed`].
ForkChannel(String),
/// Load the whole log of the current channel; answered with
/// [`MsgOut::LoadedEntireLog`].
LoadEntireLog,
/// Load the whole log of the named channel; answered with
/// [`MsgOut::LoadedOtherChannelLog`].
LoadOtherChannelLog(String),
/// Push `channel` to `remote`; answered with [`MsgOut::Pushed`].
Push {
remote: String,
channel: String,
},
/// Pull into `channel` from `remote`; answered with [`MsgOut::Pulled`].
Pull {
remote: String,
channel: String,
},
/// Reset the file at `path` and recompute the state; answered with
/// [`MsgOut::Refreshed`].
ResetFile {
path: String,
},
/// Compare the repository with a channel of a remote; answered with
/// [`MsgOut::ComparedRemote`].
CompareRemote {
remote: String,
remote_channel: String,
},
}
/// Messages sent by the repository worker (see `manage` and `update`).
#[derive(Debug, strum::Display)]
pub enum MsgOut {
/// The repository was loaded and its initial state computed.
Init(State),
/// Loading failed; no requests will be processed.
InitFailed {
/// Display text of the error.
err: String,
/// `true` when the repository itself could not be loaded, `false` when the
/// repository loaded but computing its state failed.
switch_to_project_picker: bool,
},
/// The worker's message loop has ended.
RepoTaskExited,
/// Result of recomputing the state after a request.
Refreshed {
state: anyhow::Result<State>,
/// Set by the requests after which `update` asks the receiver to invalidate
/// logs (refresh, record, channel switch); fork and file reset send `false`.
invalidate_logs: bool,
},
/// Answer to [`MsgIn::AddUntrackedFile`].
AddedUntrackedFile {
result: anyhow::Result<()>,
path: String,
},
/// Answer to [`MsgIn::RmAddedFile`].
RmedAddedFile {
result: anyhow::Result<()>,
path: String,
},
/// Answer to [`MsgIn::GetChangeDiffs`].
GotChangeDiffs {
hash: ChangeHash,
diffs: anyhow::Result<ChangedFiles>,
},
/// Answer to [`MsgIn::LoadEntireLog`].
LoadedEntireLog(anyhow::Result<Log>),
/// Answer to [`MsgIn::LoadOtherChannelLog`].
LoadedOtherChannelLog {
channel: String,
log: anyhow::Result<Log>,
},
/// Answer to [`MsgIn::Push`].
Pushed {
result: Result<(), PushError>,
remote: String,
channel: String,
},
/// Answer to [`MsgIn::Pull`].
Pulled {
result: Result<(), PullError>,
remote: String,
channel: String,
},
/// Answer to [`MsgIn::CompareRemote`].
ComparedRemote {
result: anyhow::Result<ComparedRemote>,
remote: String,
remote_channel: String,
},
}
/// Successful payload of [`MsgOut::ComparedRemote`].
#[derive(Debug)]
pub struct ComparedRemote {
/// Logs describing the difference to the remote.
pub record_dichotomy: RecordDichotomy,
/// Name of the remote.
/// NOTE(review): presumably the remote the comparison was made against — the
/// producing code is outside the visible part of this module; confirm.
pub remote: String,
/// Name of the channel on the remote (same caveat as `remote`).
pub remote_channel: String,
}
/// Failure modes of `push`.
#[derive(Debug, Error)]
pub enum PushError {
/// There were no changes to upload.
#[error("Nothing to push")]
Empty,
/// Any other failure.
#[error("{0}")]
Other(#[from] anyhow::Error),
}
/// Failure modes of `pull`.
#[derive(Debug, Error)]
pub enum PullError {
/// There were no changes to download.
#[error("Nothing to pull")]
Empty,
/// Any other failure.
#[error("{0}")]
Other(#[from] anyhow::Error),
}
/// State owned by the repository worker and threaded through `update`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct InternalState {
/// Handle to the opened Pijul repository; `Debug` output goes through
/// `fmt_pijul_repository`.
#[derivative(Debug(format_with = "fmt_pijul_repository"))]
pub repo: pijul::Repository,
}
/// Diffs grouped by the file they apply to.
pub type ChangedFiles = BTreeMap<file::Path, ChangedFile>;
/// The set of diffs of a single file, ordered by `ChangedFileDiff`'s `Ord`.
pub type ChangedFile = BTreeSet<ChangedFileDiff>;
/// One change to a file, converted from a Pijul hunk by
/// `pijul_change_to_diff`. Each variant corresponds to one hunk kind.
#[derive(Clone, Debug, PartialEq, Eq, strum::Display, Hash)]
pub enum ChangedFileDiff {
/// File move whose `add` atom is a new vertex; the diff is keyed by the new
/// name.
Move {
/// Path the file had before the move.
old_path: String,
},
/// File move whose `add` atom is an edge map; the diff is keyed by the old
/// path.
MoveEdge,
/// File deletion.
Del {
/// Decoded contents of the deleted file, when the hunk carries any.
contents: Option<Contents>,
},
/// File un-deletion.
Undel,
/// File addition.
Add {
/// Decoded contents of the added file, when the hunk carries any.
contents: Option<Contents>,
},
SolveNameConflict,
UnsolveNameConflict,
/// Edit within a file.
Edit {
/// Line number taken from the hunk's local position.
line: usize,
/// `true` when the change is an edge map whose first edge is flagged as
/// deleted.
deleted: bool,
/// Decoded contents of the change.
contents: Contents,
},
/// Replacement of content within a file.
Replacement {
/// Line number taken from the hunk's local position.
line: usize,
/// Decoded contents of the hunk's `change` atom.
change_contents: Contents,
/// Decoded contents of the hunk's `replacement` atom.
replacement_contents: Contents,
},
SolveOrderConflict,
UnsolveOrderConflict,
/// Converted from the `ResurrectZombies` hunk.
ResurrectZombines,
/// Root addition; reported under [`ROOT_FILE`].
AddRoot,
/// Root deletion; reported under [`ROOT_FILE`].
DelRoot,
}
/// Partial ordering that always agrees with the `Ord` implementation.
impl PartialOrd for ChangedFileDiff {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        // Delegate so that the two orderings can never disagree.
        Some(Ord::cmp(self, other))
    }
}
impl Ord for ChangedFileDiff {
fn cmp(&self, other: &Self) -> cmp::Ordering {
diff_line(self)
.zip(diff_line(other))
.map(|(left, right)| left.cmp(&right))
.unwrap_or(cmp::Ordering::Equal)
}
}
fn diff_line(diff: &ChangedFileDiff) -> Option<usize> {
match diff {
ChangedFileDiff::Move { .. } => None,
ChangedFileDiff::MoveEdge => None,
ChangedFileDiff::Del { .. } => None,
ChangedFileDiff::Undel => None,
ChangedFileDiff::Add { .. } => None,
ChangedFileDiff::SolveNameConflict => None,
ChangedFileDiff::UnsolveNameConflict => None,
ChangedFileDiff::Edit { line, .. } => Some(*line),
ChangedFileDiff::Replacement { line, .. } => Some(*line),
ChangedFileDiff::SolveOrderConflict => None,
ChangedFileDiff::UnsolveOrderConflict => None,
ChangedFileDiff::ResurrectZombines => None,
ChangedFileDiff::AddRoot => None,
ChangedFileDiff::DelRoot => None,
}
}
/// Contents of a change as produced by `try_decode_contents`.
/// NOTE(review): the variant descriptions below are inferred from the variant
/// names and payload types — the decoder is outside the visible part of this
/// module; confirm.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Contents {
/// Presumably text decoded with the detected encoding.
Decoded(String),
/// Presumably a base64 rendering of contents short enough to display.
ShortBase64(String),
/// Presumably the raw bytes, kept when no encoding could be determined.
UnknownEncoding(Vec<u8>),
}
/// A Pijul change whose hunks are [`DiffHunk`]s; the type of both unrecorded
/// diffs (`get_diff`) and stored changes (`get_change_diffs`).
pub type Diff = LocalChange<DiffHunk, Author>;
/// A single hunk of a [`Diff`].
pub type DiffHunk = Hunk<Option<ChangeHash>, Local>;
/// A list of log entries, in the order returned by `get_log`.
pub type Log = Vec<LogEntry>;
/// Summary of one recorded change, built by `mk_log_entry`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogEntry {
/// Hash identifying the change.
pub hash: ChangeHash,
/// Message from the change header.
pub message: String,
/// Optional description from the change header.
pub description: Option<String>,
/// Timestamp from the change header.
pub timestamp: Timestamp,
/// Paths touched by the change's hunks, sorted and without duplicates.
pub file_paths: Vec<String>,
}
#[derive(Debug, Error)]
pub enum LoadError {
#[error("The path to repository {0:?} doesn't exit")]
DoesntExist(String),
#[error("The path {0:?} doesn't contain a Pijul repository")]
NotPijulRepo(String),
#[error("The path to repository {0:?} is not accessible: {1}")]
Inaccessible(String, std::io::Error),
}
/// Spawns the repository worker thread for the repository at `path`.
///
/// The worker loads the repository and reports [`MsgOut::Init`] or
/// [`MsgOut::InitFailed`]. After a successful load it handles every [`MsgIn`]
/// received on `msg_in_rx` with `update`, one at a time, until the channel is
/// closed, and finally sends [`MsgOut::RepoTaskExited`]. Failures to send on
/// `msg_out_tx` are ignored.
///
/// # Panics
///
/// Panics if the Tokio runtime cannot be built.
pub fn manage(
    path: PathBuf,
    mut msg_in_rx: mpsc::UnboundedReceiver<MsgIn>,
    msg_out_tx: mpsc::UnboundedSender<MsgOut>,
) -> std::thread::JoinHandle<()> {
    // Switch the interaction context to non-interactive unless it already is.
    let already_non_interactive = matches!(
        pijul_interaction::get_context(),
        Ok(pijul_interaction::InteractiveContext::NotInteractive)
    );
    if !already_non_interactive {
        pijul_interaction::set_context(
            pijul_interaction::InteractiveContext::NotInteractive,
        );
    }

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    std::thread::spawn(move || {
        let (mut internal_state, state) = match load(&path) {
            Ok(loaded) => loaded,
            Err(err) => {
                // The repository itself could not be opened.
                let _ = msg_out_tx.send(MsgOut::InitFailed {
                    err: err.to_string(),
                    switch_to_project_picker: true,
                });
                return;
            }
        };
        let state = match state {
            Ok(state) => state,
            Err(err) => {
                // The repository opened, but its state could not be computed.
                let _ = msg_out_tx.send(MsgOut::InitFailed {
                    err: err.to_string(),
                    switch_to_project_picker: false,
                });
                return;
            }
        };

        let _ = msg_out_tx.send(MsgOut::Init(state));

        let local = task::LocalSet::new();
        let worker_tx = msg_out_tx.clone();
        local.spawn_local(async move {
            while let Some(msg) = msg_in_rx.recv().await {
                info!("Repo received msg {msg}");
                internal_state =
                    update(internal_state, msg, &worker_tx).await;
            }
        });
        rt.block_on(local);

        let _ = msg_out_tx.send(MsgOut::RepoTaskExited);
    })
}
/// Opens the Pijul repository that contains `path` and computes its state.
///
/// # Errors
///
/// Returns a [`LoadError`] when the path does not exist, cannot be checked, or
/// no repository root is found for it. A failure to compute the state is not a
/// `LoadError`; it is returned as the inner `anyhow::Result` next to the
/// opened repository.
pub fn load(
    path: &Path,
) -> Result<(InternalState, anyhow::Result<State>), LoadError> {
    let shown_path = || path.to_string_lossy().to_string();

    let exists = std::fs::exists(path)
        .map_err(|e| LoadError::Inaccessible(shown_path(), e))?;
    if !exists {
        return Err(LoadError::DoesntExist(shown_path()));
    }

    let repo = match pijul::Repository::find_root(Some(path)) {
        Ok(repo) => repo,
        Err(_) => return Err(LoadError::NotPijulRepo(shown_path())),
    };

    let internal_state = InternalState { repo };
    let state = get_state(&internal_state);
    Ok((internal_state, state))
}
/// Hashes `bytes` with Pijul's default pristine hasher.
pub fn hash_bytes(bytes: &[u8]) -> ChangeHash {
    let mut digest = pijul::pristine::Hasher::default();
    digest.update(bytes);
    digest.finish()
}
/// Renders a change hash in its base32 text form.
pub fn hash_to_string(hash: &ChangeHash) -> String {
    <ChangeHash as pijul::Base32>::to_base32(hash)
}
/// Returns `true` when `dir` contains a [`PIJUL_DIR`] entry.
pub fn is_pijul(dir: &Path) -> bool {
    let marker = dir.join(PIJUL_DIR);
    marker.exists()
}
/// Returns `true` when `dir` contains a [`GIT_DIR`] entry.
pub fn is_git(dir: &Path) -> bool {
    let marker = dir.join(GIT_DIR);
    marker.exists()
}
/// Handles one request and sends the matching answer on `msg_out_tx`.
///
/// `internal_state` is taken by value so that it can be moved into
/// `spawn_blocking` closures and handed back afterwards; the (possibly
/// modified) state is returned for the next request. Failures to send the
/// answer are ignored.
///
/// # Panics
///
/// Panics if a `spawn_blocking` task cannot be joined (each join result is
/// unwrapped).
async fn update(
mut internal_state: InternalState,
msg_in: MsgIn,
msg_out_tx: &mpsc::UnboundedSender<MsgOut>,
) -> InternalState {
match msg_in {
// Recompute the whole state on a blocking thread.
MsgIn::RefreshChangedAndUntrackedFiles => {
let state: anyhow::Result<State>;
(internal_state, state) = spawn_blocking(move || {
let state = get_state(&internal_state);
(internal_state, state)
})
.await
.unwrap();
let _ = msg_out_tx.send(MsgOut::Refreshed {
state,
invalidate_logs: true,
});
}
MsgIn::AddUntrackedFile { path, recursive } => {
// The clone goes into the blocking task; `path` itself is echoed back in
// the answer.
let path_clone = path.clone();
let result: anyhow::Result<()>;
(internal_state, result) = spawn_blocking(move || {
let result = add(&mut internal_state, &path_clone, recursive);
(internal_state, result)
})
.await
.unwrap();
let _ =
msg_out_tx.send(MsgOut::AddedUntrackedFile { result, path });
}
MsgIn::RmAddedFile { path } => {
let path_clone = path.clone();
let result: anyhow::Result<()>;
(internal_state, result) = spawn_blocking(move || {
let result = rm(&mut internal_state, &path_clone);
(internal_state, result)
})
.await
.unwrap();
let _ = msg_out_tx.send(MsgOut::RmedAddedFile { result, path });
}
MsgIn::Record {
msg,
desc,
sk,
id_pk,
to_record,
} => {
let state: anyhow::Result<State>;
(internal_state, state) = spawn_blocking(move || {
// Record first; the state is only recomputed when recording succeeded.
let state = |internal_state: &InternalState| {
record(internal_state, msg, desc, sk, id_pk, to_record)?;
get_state(internal_state)
};
let state = state(&internal_state);
(internal_state, state)
})
.await
.unwrap();
let _ = msg_out_tx.send(MsgOut::Refreshed {
state,
invalidate_logs: true,
});
}
MsgIn::GetChangeDiffs { hash } => {
let diffs: anyhow::Result<ChangedFiles>;
(internal_state, diffs) = spawn_blocking(move || {
let diffs =
get_change_diffs(&internal_state, hash).context(format!(
"Getting change with the hash {}",
hash_to_string(&hash)
));
(internal_state, diffs)
})
.await
.unwrap();
let _ = msg_out_tx.send(MsgOut::GotChangeDiffs { hash, diffs });
}
MsgIn::SwitchToChannel(name) => {
let state: anyhow::Result<State>;
(internal_state, state) = spawn_blocking(move || {
let state = |internal_state: &mut InternalState| {
switch_to_channel(internal_state, name)?;
get_state(internal_state)
};
let state = state(&mut internal_state);
(internal_state, state)
})
.await
.unwrap();
let _ = msg_out_tx.send(MsgOut::Refreshed {
state,
invalidate_logs: true,
});
}
MsgIn::ForkChannel(name) => {
let state: anyhow::Result<State>;
(internal_state, state) = spawn_blocking(move || {
let state = |internal_state: &mut InternalState| {
fork_channel(internal_state, name)?;
get_state(internal_state)
};
let state = state(&mut internal_state);
(internal_state, state)
})
.await
.unwrap();
let _ = msg_out_tx.send(MsgOut::Refreshed {
state,
invalidate_logs: false,
});
}
MsgIn::LoadEntireLog => {
let log: anyhow::Result<Log>;
(internal_state, log) = spawn_blocking(move || {
// No channel, offset or limit: the full log of the current channel.
let log = get_log(&internal_state.repo, None, None, None);
(internal_state, log)
})
.await
.unwrap();
let _ = msg_out_tx.send(MsgOut::LoadedEntireLog(log));
}
MsgIn::LoadOtherChannelLog(channel) => {
// The channel name is moved into the blocking task and handed back so it
// can be echoed in the answer.
let channel_returned: String;
let log: anyhow::Result<Log>;
(internal_state, channel_returned, log) =
spawn_blocking(move || {
let log = get_log(
&internal_state.repo,
Some(&channel),
None,
None,
);
(internal_state, channel, log)
})
.await
.unwrap();
let _ = msg_out_tx.send(MsgOut::LoadedOtherChannelLog {
channel: channel_returned,
log,
});
}
// Push and pull are awaited directly instead of going through
// `spawn_blocking`.
MsgIn::Push { remote, channel } => {
let result = push(&internal_state.path, &remote, &channel).await;
let _ = msg_out_tx.send(MsgOut::Pushed {
remote,
channel,
result,
});
}
MsgIn::Pull { remote, channel } => {
let result = pull(&internal_state.path, &remote, &channel).await;
let _ = msg_out_tx.send(MsgOut::Pulled {
remote,
channel,
result,
});
}
MsgIn::ResetFile { path } => {
let state: anyhow::Result<State>;
(internal_state, state) = spawn_blocking(move || {
let state = |internal_state: &mut InternalState| {
reset_file(internal_state, path)?;
get_state(internal_state)
};
let state = state(&mut internal_state);
(internal_state, state)
})
.await
.unwrap();
let _ = msg_out_tx.send(MsgOut::Refreshed {
state,
invalidate_logs: false,
});
}
MsgIn::CompareRemote {
remote,
remote_channel,
} => {
let result =
compare_remote(&mut internal_state, &remote, &remote_channel)
.await;
let _ = msg_out_tx.send(MsgOut::ComparedRemote {
result,
remote,
remote_channel,
});
}
}
internal_state
}
/// Initializes a new Pijul repository at `path`, creates the default channel
/// and makes it the current one.
///
/// Returns `path` on success.
///
/// # Errors
///
/// Fails when the Pijul configuration cannot be loaded, the repository cannot
/// be initialized, or the initial transaction fails.
pub async fn init(path: PathBuf) -> Result<PathBuf, anyhow::Error> {
    let config =
        PijulConfig::load(None, vec![]).context("Loading Pijul config")?;
    let repo = pijul::Repository::init(&config, Some(&path), None, None)
        .with_context(|| {
            format!("Initializing a new Pijul repo at {}", path.display())
        })?;

    let default_channel = libpijul::DEFAULT_CHANNEL.to_string();
    let mut txn = repo.pristine.mut_txn_begin()?;
    txn.open_or_create_channel(&default_channel)
        .context("Creating a default channel")?;
    txn.set_current_channel(&default_channel)
        .context("Setting the current channel to the default channel")?;
    txn.commit().context("Commit")?;

    Ok(path)
}
/// Computes the public [`State`] snapshot of the repository.
///
/// Gathers, in this order: directory name, current channel, other channels,
/// the unrecorded diff, untracked files, changed files, the short log (at most
/// [`STATUS_LOG_LIMIT`] entries of the current channel) and the configured
/// remotes.
///
/// # Errors
///
/// Returns the first failure, with context naming the step that failed.
fn get_state(state: &InternalState) -> anyhow::Result<State> {
    let repo = &state.repo;

    let name = dir_name(repo).to_string();
    let channel = current_channel(repo).context("getting current channel")?;
    let others = other_channels(repo).context("getting other channels")?;
    let diff = get_diff(repo).context("getting diff")?;
    let untracked = untracked_files(repo).context("getting untracked files")?;
    let changed = changed_files(repo, &diff).context("getting changed files")?;
    let short_log = get_log(repo, Some(&channel), None, Some(STATUS_LOG_LIMIT))
        .context("getting log")?;

    let config = PijulConfig::load(Some(&repo.path), vec![])
        .context("getting Pijul config")?;
    let mut remote_names = Vec::new();
    for remote in config.remotes {
        // Both kinds of remote are represented by their name only.
        let remote_name = match remote {
            pijul_config::remote::RemoteConfig::Ssh { name, .. }
            | pijul_config::remote::RemoteConfig::Http { name, .. } => name,
        };
        remote_names.push(remote_name);
    }

    Ok(State {
        dir_name: name,
        channel,
        other_channels: others,
        untracked_files: untracked,
        changed_files: changed,
        short_log,
        remotes: Remotes {
            default: config.default_remote,
            other: remote_names,
        },
    })
}
/// Returns the last component of the repository path, lossily converted to
/// text.
///
/// # Panics
///
/// Panics if the repository path has no components.
fn dir_name(repo: &pijul::Repository) -> Cow<'_, str> {
    let last = repo.path.components().next_back().unwrap();
    last.as_os_str().to_string_lossy()
}
/// Returns the name of the current channel, falling back to Pijul's default
/// channel name when the transaction does not provide one.
///
/// # Errors
///
/// Fails when a read transaction cannot be started.
fn current_channel(repo: &pijul::Repository) -> anyhow::Result<String> {
    let txn = repo.pristine.txn_begin()?;
    let name = txn.current_channel().unwrap_or(pijul::DEFAULT_CHANNEL);
    Ok(name.to_owned())
}
/// Returns the names of all channels except the current one.
///
/// # Errors
///
/// Fails when a read transaction cannot be started or the channels cannot be
/// listed.
fn other_channels(
    repo: &pijul::Repository,
) -> anyhow::Result<BTreeSet<String>> {
    let txn = repo.pristine.txn_begin()?;
    let current = txn.current_channel().unwrap_or(pijul::DEFAULT_CHANNEL);

    let mut names = BTreeSet::new();
    for channel_ref in txn.channels("")? {
        let channel = channel_ref.read();
        let name = txn.name(&*channel);
        if name != current {
            names.insert(name.to_string());
        }
    }
    Ok(names)
}
/// Makes `name` the current channel.
///
/// Does nothing when `name` already is the current channel. When the current
/// channel has unrecorded changes, the switch is refused: an error is logged
/// and `Ok(())` is returned without changing anything.
///
/// # Errors
///
/// Fails when the current channel cannot be determined, the transaction cannot
/// be started or committed, or the check for unrecorded changes fails.
fn switch_to_channel(
    state: &mut InternalState,
    name: String,
) -> anyhow::Result<()> {
    let repo = &state.repo;
    let current_channel = current_channel(repo)?;
    // Nothing to do, so don't even open a transaction.
    if name == current_channel {
        return Ok(());
    }

    let txn = repo.pristine.arc_txn_begin()?;
    let channel = {
        let txn = txn.read();
        txn.load_channel(&current_channel)?
    };
    if let Some(channel) = channel
        && has_unrecorded_changes(txn.clone(), channel.clone(), repo)?
    {
        error!("Cannot change channel, as there are unrecorded changes.");
        return Ok(());
    }

    txn.write().set_current_channel(&name)?;
    txn.commit().context("Commit")?;
    Ok(())
}
/// Forks the current channel into a new channel called `name` (surrounding
/// whitespace is trimmed from the name). The current channel is not changed.
///
/// # Errors
///
/// Fails when the current channel cannot be determined or loaded, or when the
/// fork or the commit fails.
fn fork_channel(state: &mut InternalState, name: String) -> anyhow::Result<()> {
    let repo = &state.repo;
    let current_channel = current_channel(repo)?;
    let mut txn = repo.pristine.mut_txn_begin()?;
    let channel = txn
        .load_channel(&current_channel)?
        .context("Loading current channel")?;
    let _fork = txn.fork(&channel, name.trim())?;
    txn.commit().context("Commit")?;
    Ok(())
}
/// Error returned by `new_channel`.
#[derive(Debug)]
pub enum NewChannelError {
/// A channel with the given name already exists.
AlreadyExists(String),
}
/// Creates a new channel called `name` and makes it the current channel.
///
/// When the log of the previously current channel has a first entry, that
/// change is applied to the new channel.
///
/// Currently unused (hence `dead_code`).
///
/// # Errors
///
/// Returns [`NewChannelError::AlreadyExists`] when a channel with that name
/// already exists.
///
/// # Panics
///
/// Every other failure (transaction, channel loading, change application,
/// commit) is unwrapped and panics.
#[allow(dead_code)] fn new_channel(
repo: &mut pijul::Repository,
name: String,
) -> Result<(), NewChannelError> {
use libpijul::{GraphTxnT, MutTxnTExt};
let mut txn = repo.pristine.mut_txn_begin().unwrap();
if txn.load_channel(&name).unwrap().is_some() {
return Err(NewChannelError::AlreadyExists(name));
}
let new = txn.open_or_create_channel(&name).unwrap();
let current = txn.current_channel().unwrap();
let channel = txn.load_channel(current).unwrap().unwrap();
let ch = channel.read();
// Take the first entry of the current channel's change log starting at
// position 0, if there is one, and resolve it to its external hash.
let h = if let Some(Ok((_k, v))) =
libpijul::pristine::changeid_log(&txn, &ch, 0u64.into())
.unwrap()
.next()
{
Some(txn.get_external(&v.a).unwrap().unwrap().into())
} else {
None
};
if let Some(h) = h {
let mut new = new.write();
txn.apply_change(&repo.changes, &mut new, &h).unwrap();
}
txn.set_current_channel(&name).unwrap();
txn.commit().unwrap();
Ok(())
}
/// Records the working-copy changes selected by `to_record` as a new change on
/// the current channel.
///
/// * `msg`, `desc` - message and optional description for the change header.
/// * `sk` - secret key used to sign the hash of the change; the signature is
///   stored in the change's unhashed section.
/// * `id_pk` - stored under the `"key"` entry of the change's author.
/// * `to_record` - which files / individual changes to include.
///
/// # Errors
///
/// Fails with "Nothing to record" when there are no recorded actions or when
/// `to_record.overall` excludes everything; fails when an extra dependency
/// from the configuration is not on the current channel; and on any
/// transaction, change-store or I/O failure.
///
/// # Panics
///
/// Panics if signing the change hash fails (the signature is unwrapped).
pub(crate) fn record(
state: &InternalState,
msg: String,
desc: Option<String>,
sk: Arc<SKey>,
id_pk: String,
to_record: to_record::State,
) -> anyhow::Result<()> {
let repo = &state.repo;
// The unrecorded diff is needed below to map hunks to files when only a
// part of the changes is picked.
let diff = get_diff(repo)?;
let channel = current_channel(repo)?;
let txn = repo.pristine.arc_txn_begin()?;
let channel = txn
.read()
.load_channel(&channel)?
.context("Loading current channel")?;
// Resolve the extra dependencies from the configuration; each of them must
// be present on the current channel.
let mut extra = Vec::new();
let config = PijulConfig::load(Some(&repo.path), vec![])?;
for h in config.extra_dependencies.iter() {
let (h, c) = txn.read().hash_from_prefix(h)?;
if txn
.read()
.get_changeset(txn.read().changes(&*channel.read()), &c)?
.is_none()
{
bail!(
"Change {:?} (from .pijul/config) is not on channel {:?}",
h,
channel.read().name
)
}
extra.push(h)
}
let authors =
vec![Author(BTreeMap::from_iter([("key".to_string(), id_pk)]))];
let timestamp = Timestamp::now();
let header = ChangeHeader {
message: msg,
authors,
description: desc,
timestamp,
};
txn.write().apply_root_change_if_needed(
&repo.changes,
&channel,
rand::rng(),
)?;
// Record the whole working copy (empty prefix).
let mut state = pijul::RecordBuilder::new();
state.record(
txn.clone(),
pijul::Algorithm::default(),
false,
&pijul::DEFAULT_SEPARATOR,
channel.clone(),
&repo.working_copy,
&repo.changes,
"",
1, )?;
let rec = state.finish();
if rec.actions.is_empty() {
bail!("Nothing to record");
}
{
// The write guard is held until the end of this block and released before
// the commit below.
let mut txn_ = txn.write();
let actions = rec
.actions
.into_iter()
.map(|rec| {
let res = rec.globalize(&*txn_)?;
Ok(res)
})
.collect::<anyhow::Result<Vec<_>>>()?;
let contents = Arc::into_inner(rec.contents)
.context(
"Failed to get unique strong reference on change contents",
)?
.into_inner();
let mut change = LocalChange::make_change(
&*txn_,
&channel,
actions,
contents,
header,
vec![],
)?;
// Apply the user's selection to the hunks of the change.
match to_record.overall {
to_record::PickSet::Exclude => {
bail!("Nothing to record")
}
to_record::PickSet::Partial => {
change.changes.retain(|hunk| {
// Hunks that cannot be converted to a diff are kept.
if let Ok((file, diff)) =
pijul_change_to_diff(repo, &diff, hunk)
{
match to_record::determine_file(&to_record, &file) {
to_record::PickSet::Exclude => {
return false;
}
to_record::PickSet::Partial => {
let diff_id = diff::id_parts_hash(&diff);
match to_record::determine_change(
&file, diff_id, &to_record,
) {
to_record::Pick::Exclude => {
return false;
}
to_record::Pick::Include => {}
}
}
to_record::PickSet::Include => {}
}
}
true
});
}
to_record::PickSet::Include => {}
}
// Add the configured extra dependencies that are not already present.
let current: HashSet<_> = change.dependencies.iter().cloned().collect();
for dep in extra.into_iter() {
if !current.contains(&dep) {
change.dependencies.push(dep)
}
}
#[allow(clippy::result_large_err)]
let hash = repo.changes.save_change(&mut change, |change, hash| {
// Sign the hash of the change and store the signature in the unhashed
// part of the change.
change.unhashed = Some(serde_json::json!({
"signature": sk.sign_raw(&hash.to_bytes()).unwrap(),
}));
Ok::<_, pijul::changestore::filesystem::Error>(())
})?;
txn_.apply_local_change(&channel, &change, &hash, &rec.updatables)?;
let mut path = repo.path.join(pijul::DOT_DIR);
path.push("identities");
std::fs::create_dir_all(&path)?;
// Milliseconds since the Unix epoch of the oldest change; the current time
// is used when that value is 0.
let mut oldest = rec
.oldest_change
.duration_since(std::time::SystemTime::UNIX_EPOCH)?
.as_millis() as u64;
if oldest == 0 {
oldest = std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)?
.as_millis() as u64;
}
// The value is rounded down to a whole second.
txn_.touch_channel(&mut *channel.write(), Some((oldest / 1000) * 1000));
}
txn.commit().context("Commit")?;
Ok(())
}
fn get_diff(repo: &pijul::Repository) -> anyhow::Result<Diff> {
let txn = repo.pristine.arc_txn_begin()?;
let cur_channel = txn
.read()
.current_channel()
.unwrap_or(pijul::DEFAULT_CHANNEL)
.to_string();
let channel = txn.write().open_or_create_channel(&cur_channel)?;
let mut state = pijul::RecordBuilder::new();
state.record(
txn.clone(),
pijul::Algorithm::default(),
false,
&pijul::DEFAULT_SEPARATOR,
channel.clone(),
&repo.working_copy,
&repo.changes,
"",
std::thread::available_parallelism()?.get(),
)?;
let rec = state.finish();
let actions: Vec<_> = {
let txn_ = txn.read();
rec.actions
.into_iter()
.map(|rec| {
let res = rec.globalize(&*txn_)?;
Ok(res)
})
.collect::<anyhow::Result<Vec<_>>>()?
};
let contents = std::sync::Arc::into_inner(rec.contents)
.context("Failed to get unique strong reference on change contents")?
.into_inner();
let mut change = LocalChange::make_change(
&*txn.read(),
&channel,
actions,
contents,
ChangeHeader::default(),
Vec::new(),
)?;
let (dependencies, extra_known) = {
let txn_ = txn.read();
pijul::change::dependencies(
&*txn_,
&*channel.read(),
change.changes.iter(),
)?
};
change.dependencies = dependencies;
change.extra_known = extra_known;
Ok(change)
}
pub fn add(
repo: &mut pijul::Repository,
file_path_str: &str,
recursive: bool,
) -> anyhow::Result<()> {
let txn = repo.pristine.arc_txn_begin()?;
let repo_path = CanonicalPathBuf::canonicalize(&repo.path)?;
let full_path = {
let mut p = repo.path.clone();
p.push(file_path_str);
p
};
let path = CanonicalPathBuf::canonicalize(full_path)?;
let meta = std::fs::metadata(&path)?;
if !working_copy::filesystem::filter_ignore(
repo_path.as_ref(),
path.as_ref(),
meta.is_dir(),
) {
bail!("Cannot add ignored file \"{file_path_str}\"");
}
if recursive {
let threads = std::thread::available_parallelism()?.get();
use libpijul::working_copy::filesystem::*;
let (full, _) = get_prefix(Some(repo_path.as_ref()), path.as_path())?;
let full = CanonicalPathBuf::new(&full)?;
repo.working_copy.add_prefix_rec(
&txn,
repo_path.clone(),
full.clone(),
false,
threads,
0,
)?
} else {
let mut txn = txn.write();
let file_path = PathBuf::from(file_path_str);
let path_str = path_slash::PathExt::to_slash_lossy(file_path.as_path());
if !txn.is_tracked(&path_str)? {
if let Err(e) = txn.add(&path_str, meta.is_dir(), 0) {
bail!("Failed to track file with: {e}");
}
} else {
bail!("Won't add file \"{path_str}\" as it's already tracked");
}
}
txn.commit().context("Commit")?;
Ok(())
}
fn rm(repo: &mut pijul::Repository, file_path_str: &str) -> anyhow::Result<()> {
info!("Removing tracked {file_path_str}");
let mut txn = repo.pristine.mut_txn_begin()?;
let file_path = PathBuf::from(file_path_str);
let path_str = path_slash::PathExt::to_slash_lossy(file_path.as_path());
if txn.is_tracked(&path_str)? {
txn.remove_file(&path_str)?;
}
txn.commit().context("Commit")?;
Ok(())
}
/// Loads the stored change identified by `hash` and converts its hunks into
/// per-file diffs.
///
/// # Errors
///
/// Fails when the change cannot be read from the change store or one of its
/// hunks cannot be converted.
fn get_change_diffs(
    internal_state: &InternalState,
    hash: ChangeHash,
) -> anyhow::Result<ChangedFiles> {
    let repo = &internal_state.repo;
    let stored_change = repo.changes.get_change(&hash)?;
    changed_files(repo, &stored_change)
}
fn changed_files(
repo: &pijul::Repository,
diff: &Diff,
) -> anyhow::Result<ChangedFiles> {
let mut changes: ChangedFiles = BTreeMap::new();
for change in diff.changes.iter() {
let (file, diff) = pijul_change_to_diff(repo, diff, change)?;
changes.entry(file).or_default().insert(diff);
}
Ok(changes)
}
fn pijul_change_to_diff(
repo: &pijul::Repository,
diff: &Diff,
hunk: &DiffHunk,
) -> anyhow::Result<(file::Path, ChangedFileDiff)> {
let (path, diff) = match hunk {
BaseHunk::FileMove {
del: _,
add,
path: old_path,
} => match add {
change::Atom::NewVertex(add) => {
let FileMetadata {
basename: new_path,
metadata: _,
..
} = FileMetadata::read(
&diff.contents[add.start.0.into()..add.end.0.into()],
);
(
new_path.to_string(),
ChangedFileDiff::Move {
old_path: old_path.clone(),
},
)
}
change::Atom::EdgeMap(_edge_map) => {
(old_path.clone(), ChangedFileDiff::MoveEdge)
}
},
BaseHunk::FileDel {
del: _,
contents,
path,
encoding: _,
} => {
let contents = if let Some(contents) = contents.as_ref() {
let raw_contents = get_change_contents(
&repo.changes,
contents,
&diff.contents,
)?;
let encoding = encoding::detect(&raw_contents);
Some(try_decode_contents(raw_contents, &encoding))
} else {
None
};
(path.clone(), ChangedFileDiff::Del { contents })
}
BaseHunk::FileUndel {
undel: _,
contents: _,
path,
encoding: _,
} => (path.clone(), ChangedFileDiff::Undel),
BaseHunk::FileAdd {
path,
add_name: _,
add_inode: _,
contents,
encoding: _,
} => {
let contents = if let Some(contents) = contents {
let raw_contents = get_change_contents(
&repo.changes,
contents,
&diff.contents,
)?;
let encoding = encoding::detect(&raw_contents);
Some(try_decode_contents(raw_contents, &encoding))
} else {
None
};
(path.clone(), ChangedFileDiff::Add { contents })
}
BaseHunk::SolveNameConflict { name: _, path } => {
(path.clone(), ChangedFileDiff::SolveNameConflict)
}
BaseHunk::UnsolveNameConflict { name: _, path } => {
(path.clone(), ChangedFileDiff::UnsolveNameConflict)
}
BaseHunk::Edit {
local,
change,
encoding: _,
} => {
let line = local.line;
let deleted = if let change::Atom::EdgeMap(map) = change {
map.edges
.first()
.map(|edge| edge.flag.is_deleted())
.unwrap_or_default()
} else {
false
};
let raw_contents =
get_change_contents(&repo.changes, change, &diff.contents)?;
let encoding = encoding::detect(&raw_contents);
let contents = try_decode_contents(raw_contents, &encoding);
let diff = ChangedFileDiff::Edit {
line,
deleted,
contents,
};
(local.path.clone(), diff)
}
BaseHunk::Replacement {
change,
replacement,
local,
encoding: _e,
} => {
let line = local.line;
let raw_change_contents =
get_change_contents(&repo.changes, change, &diff.contents)?;
let encoding = encoding::detect(&raw_change_contents);
let change_contents =
try_decode_contents(raw_change_contents, &encoding);
let raw_replacement_contents = get_change_contents(
&repo.changes,
replacement,
&diff.contents,
)?;
let encoding = encoding::detect(&raw_replacement_contents);
let replacement_contents =
try_decode_contents(raw_replacement_contents, &encoding);
let diff = ChangedFileDiff::Replacement {
line,
change_contents,
replacement_contents,
};
(local.path.clone(), diff)
}
BaseHunk::SolveOrderConflict { change: _, local } => {
(local.path.clone(), ChangedFileDiff::SolveOrderConflict)
}
BaseHunk::UnsolveOrderConflict { change: _, local } => {
(local.path.clone(), ChangedFileDiff::UnsolveOrderConflict)
}
BaseHunk::ResurrectZombies {
change: _,
local,
encoding: _,
} => (local.path.clone(), ChangedFileDiff::ResurrectZombines),
BaseHunk::AddRoot { name: _, inode: _ } => {
(ROOT_FILE.to_string(), ChangedFileDiff::AddRoot)
}
BaseHunk::DelRoot { name: _, inode: _ } => {
(ROOT_FILE.to_string(), ChangedFileDiff::DelRoot)
}
};
let is_dir = fs::metadata(repo.path.join(&path))
.map(|metadata| metadata.is_dir())
.unwrap_or_default();
let path = file::Path { raw: path, is_dir };
Ok((path, diff))
}
/// Lists the files found by walking the working copy that are not tracked.
///
/// Paths that are reported by the walk but whose metadata can no longer be
/// read because they do not exist are skipped.
///
/// # Errors
///
/// Fails when the repository path cannot be canonicalized, the transaction
/// cannot be started, the walk fails, the tracking check fails for a reported
/// path, or a file's metadata cannot be read for a reason other than it not
/// existing.
///
/// # Panics
///
/// Panics if the tracking check fails inside the walk's callback (that result
/// is unwrapped).
fn untracked_files(
    repo: &pijul::Repository,
) -> anyhow::Result<BTreeSet<file::Path>> {
    use path_slash::PathExt;

    let root = CanonicalPathBuf::canonicalize(&repo.path)?;
    let walk_txn = repo.pristine.arc_txn_begin()?;
    let threads = std::thread::available_parallelism()?.get();
    let check_txn = walk_txn.clone();

    let entries = repo.working_copy.iterate_prefix_rec(
        root.clone(),
        root.clone(),
        false,
        threads,
        move |path, _| {
            let slash_path = path.to_slash_lossy();
            slash_path.is_empty()
                || walk_txn.read().is_tracked(&slash_path).unwrap()
        },
    )?;

    entries
        .filter_map(move |entry| {
            let (path, _) = match entry {
                Ok(found) => found,
                Err(e) => return Some(Err(anyhow::Error::from(e))),
            };
            let slash_path = path.to_slash_lossy();
            match check_txn.read().is_tracked(&slash_path) {
                Ok(true) => return None,
                Ok(false) => {}
                Err(e) => return Some(Err(anyhow::Error::from(e))),
            }
            let raw = slash_path.into_owned();
            match fs::metadata(repo.path.join(&raw)) {
                Ok(metadata) => Some(Ok(file::Path {
                    raw,
                    is_dir: metadata.is_dir(),
                })),
                // The file vanished since the walk saw it: skip it.
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
                Err(e) => Some(Err(anyhow::Error::from(e))),
            }
        })
        .collect()
}
/// Reads the log of a channel, newest entry first.
///
/// * `channel` - channel to read; the current channel (or Pijul's default
///   channel name) when `None`.
/// * `offset` - number of leading entries to skip; `0` when `None`.
/// * `limit` - maximum number of entries to return; unlimited when `None`.
///
/// # Errors
///
/// Fails when the channel does not exist or the log, a change ID or a log
/// entry cannot be read.
pub(crate) fn get_log(
    repo: &pijul::Repository,
    channel: Option<&str>,
    offset: Option<usize>,
    limit: Option<usize>,
) -> anyhow::Result<Vec<LogEntry>> {
    let mut to_skip = offset.unwrap_or(0);
    let mut remaining = limit.unwrap_or(usize::MAX);

    let txn = repo.pristine.txn_begin()?;
    let channel_name = channel
        .unwrap_or_else(|| {
            txn.current_channel().unwrap_or(pijul::DEFAULT_CHANNEL)
        })
        .to_string();
    let channel = txn
        .load_channel(&channel_name)?
        .context("Loading current channel")?;
    let inodes = get_inodes(&txn, &repo.path, &[]).context("getting inodes")?;
    let reverse_log = txn.reverse_log(&channel.read(), None)?;

    let mut entries = vec![];
    for item in reverse_log {
        let (_, (hash, _mrk)) = item?;
        let cid = pijul::GraphTxnT::get_internal(&txn, hash)?
            .context("Getting log change ID")?;

        // Without any inode filter every change matches; otherwise the change
        // must touch at least one of the filtered positions.
        let mut matches_filter = inodes.is_empty();
        for (_, position) in inodes.iter() {
            let Some(position) = position else {
                continue;
            };
            matches_filter = pijul::DepsTxnT::get_touched_files(
                &txn,
                position,
                Some(cid),
            )? == Some(cid);
            if matches_filter {
                break;
            }
        }
        if !matches_filter {
            continue;
        }

        if remaining == 0 {
            break;
        }
        if to_skip > 0 {
            to_skip -= 1;
            continue;
        }
        entries.push(mk_log_entry(repo, ChangeHash::from(hash))?);
        remaining -= 1;
    }
    Ok(entries)
}
/// Builds the [`LogEntry`] of the change identified by `hash` from its header
/// and its hunks. The file paths are sorted and free of duplicates.
///
/// # Errors
///
/// Fails when the header or the hunks cannot be read from the change store.
fn mk_log_entry(
    repo: &pijul::Repository,
    hash: ChangeHash,
) -> anyhow::Result<LogEntry> {
    let header = repo.changes.get_header(&hash)?;
    let hunks = repo.changes.get_changes(&hash)?;

    let mut file_paths: Vec<String> = hunks
        .into_iter()
        .map(|hunk| hunk.path().to_string())
        .collect();
    file_paths.sort_unstable();
    file_paths.dedup();

    Ok(LogEntry {
        hash,
        message: header.message,
        description: header.description,
        timestamp: header.timestamp,
        file_paths,
    })
}
/// Pushes the changes of the local channel `channel_name` to the channel of
/// the same name on the remote `remote_name`.
///
/// The repository is opened afresh from `repo_path`.
///
/// # Errors
///
/// Returns [`PushError::Empty`] when there is nothing to upload (the
/// transaction is still committed), and [`PushError::Other`] for any other
/// failure.
#[allow(
    clippy::await_holding_lock,
    reason = "imposed by sanakirja API for txn"
)]
async fn push(
    repo_path: &Path,
    remote_name: &str,
    channel_name: &str,
) -> Result<(), PushError> {
    let repo = pijul::Repository::find_root(Some(repo_path))?;
    let txn = repo.pristine.arc_txn_begin().context("Begin pijul txn")?;
    let config = PijulConfig::load(Some(repo_path), vec![])?;

    // The remote channel carries the same name as the local one.
    let remote_channel = &channel_name;
    let no_cert_check = false;
    let mut remote = pijul_remote::repository(
        &config,
        Some(&repo.path),
        None,
        remote_name,
        remote_channel,
        no_cert_check,
        true,
    )
    .await?;

    let mut channel = txn
        .write()
        .open_or_create_channel(channel_name)
        .context("Open or create channel")?;
    let PushDelta {
        to_upload: changes_to_push,
        remote_unrecs: _,
        unknown_changes: _,
        ..
    } = to_upload(&mut txn.write(), &mut channel, &repo, &mut remote).await?;
    debug!("to_upload = {:?}", changes_to_push);

    if changes_to_push.is_empty() {
        txn.commit().context("Commit")?;
        return Err(PushError::Empty);
    }

    let push_channel = None;
    remote
        .upload_changes(
            &mut *txn.write(),
            repo.changes_dir.clone(),
            push_channel,
            &changes_to_push,
        )
        .await?;
    txn.commit().context("Commit")?;
    remote.finish().await?;
    Ok(())
}
/// Compute the [`PushDelta`] between the local `channel` and `remote`.
///
/// The remote change list is refreshed first (with no path filter and no
/// specific changes requested), then converted into a push delta. A remote
/// that is a local channel uses `to_local_channel_push`; every other kind of
/// remote uses `to_remote_push`.
///
/// # Errors
///
/// Propagates failures from refreshing the change list or from building the
/// delta.
async fn to_upload<T: sanakirja::RawMutTxnT + 'static>(
    txn: &mut sanakirja::MutTxn<T>,
    channel: &mut ChannelRef<sanakirja::MutTxn<T>>,
    repo: &pijul::Repository,
    remote: &mut RemoteRepo,
) -> Result<PushDelta, anyhow::Error> {
    let path = &[];
    let changes = &[];
    let delta = remote
        .update_changelist_pushpull(
            txn,
            path,
            channel,
            // `force_cache` is explicitly disabled.
            Some(false),
            repo,
            changes,
            false,
        )
        .await?;

    match &*remote {
        RemoteRepo::LocalChannel(local_channel) => delta
            .to_local_channel_push(local_channel, txn, path, channel, repo),
        _ => delta.to_remote_push(txn, path, channel, repo),
    }
}
/// Pull changes from the remote `remote_name` into the local channel
/// `channel_name`, then output the touched paths to the working copy.
///
/// The remote side always uses `libpijul::DEFAULT_CHANNEL`, regardless of
/// `channel_name`.
///
/// Unrecorded working-copy changes are first recorded as a temporary change
/// (see the call to `pending`); that change is unrecorded again before the
/// transaction is committed.
///
/// # Errors
///
/// Returns `PullError::Empty` when the remote has nothing to download (the
/// transaction is still committed). Other failures are converted into a
/// `PullError`.
#[allow(
clippy::await_holding_lock,
reason = "imposed by sanakirja API for txn"
)]
async fn pull(
repo_path: &Path,
remote_name: &str,
channel_name: &str,
) -> Result<(), PullError> {
let mut repo = pijul::Repository::find_root(Some(repo_path))?;
let txn = repo.pristine.arc_txn_begin().context("Begin txn")?;
let mut channel = txn
.write()
.open_or_create_channel(channel_name)
.context("Open or create channel")?;
let config = PijulConfig::load(Some(repo_path), vec![])?;
// The remote channel is fixed to the default channel.
let from_channel = libpijul::DEFAULT_CHANNEL;
let no_cert_check = false;
let mut remote = pijul_remote::repository(
&config,
Some(&repo.path),
None,
remote_name,
from_channel,
no_cert_check,
true,
)
.await?;
// Work out what has to be downloaded; `remote_unrecs` is ignored here.
let RemoteDelta {
inodes,
remote_ref,
to_download,
remote_unrecs: _,
..
} = to_download(&mut txn.write(), &mut channel, &mut repo, &mut remote)
.await?;
// `hash` is `Some` when the working copy had unrecorded changes that were
// saved as a temporary change.
let hash = pending(txn.clone(), &channel, &mut repo)?;
if let Some(ref r) = remote_ref {
remote.update_identities(&mut repo, r).await?;
}
// Nothing to pull: undo the temporary change, commit and report `Empty`.
if to_download.is_empty() {
if let Some(ref h) = hash {
txn.write()
.unrecord(&repo.changes, &channel, h, 0, &repo.working_copy)
.context("Unrecord")?;
}
txn.commit().context("Commit")?;
return Err(PullError::Empty);
}
// Apply the downloaded items in reverse order. The channel and txn write
// guards are scoped to this block so they are released before the awaits
// below.
{
let mut ws = libpijul::ApplyWorkspace::new();
debug!("to_download = {:#?}", to_download);
let mut channel = channel.write();
let mut txn = txn.write();
for h in to_download.iter().rev() {
match h {
pijul_remote::CS::Change(h) => {
txn.apply_change_rec_ws(
&repo.changes,
&mut channel,
h,
&mut ws,
)
.context("Apply change")?;
}
// A state entry is stored as a tag, but only if the channel already
// contains that state.
pijul_remote::CS::State(s) => {
if let Some(n) = txn
.channel_has_state(&channel.states, &s.into())
.context("Check if channel has state")?
{
txn.put_tags(&mut channel.tags, n.into(), s)
.context("Put tags")?;
} else {
return Err(anyhow!(
"Cannot add tag {}: channel {:?} does not have that state",
libpijul::Base32::to_base32(s),
channel.name
))?;
}
}
}
}
}
let full = false;
remote
.complete_changes(&repo, &*txn.read(), &mut channel, &to_download, full)
.await?;
remote.finish().await?;
debug!("inodes = {:?}", inodes);
debug!("to_download: {:?}", to_download.len());
// Collect the inodes touched by the downloaded changes.
let mut touched = HashSet::new();
let txn_ = txn.read();
for d in to_download.iter() {
debug!("to_download {:?}", d);
match d {
pijul_remote::CS::Change(d) => {
use pijul::GraphTxnT;
if let Some(int) =
txn_.get_internal(&d.into()).context("Get internal")?
{
use pijul::DepsTxnT;
for inode in txn_
.iter_rev_touched(int)
.context("Iter rev touched")?
{
let (int_, inode) = inode.context("Get change node")?;
// Only entries belonging to exactly this change are considered:
// smaller ids are skipped, and the first larger id ends the scan.
if int_ < int {
continue;
} else if int_ > int {
break;
}
use pijul::GraphTxnT;
// NOTE(review): the `unwrap` panics if the change has no external hash;
// presumably this cannot happen for a stored change, confirm.
let ext = libpijul::pristine::Position {
change: txn_
.get_external(&inode.change)
.context("Get external change")?
.unwrap()
.into(),
pos: inode.pos,
};
// An empty `inodes` set means no filtering is applied.
if inodes.is_empty() || inodes.contains(&ext) {
touched.insert(*inode);
}
}
}
}
pijul_remote::CS::State(_) => {
}
}
}
// Release the read guard before the txn is used again below.
std::mem::drop(txn_);
// Always true here, so the output step below always runs.
let is_current_channel = true;
if is_current_channel {
let mut touched_paths = BTreeSet::new();
{
let txn_ = txn.read();
for &i in touched.iter() {
if let Some((path, _)) = libpijul::fs::find_path(
&repo.changes,
&*txn_,
&*channel.read(),
false,
i,
)
.context("Find change path")?
{
touched_paths.insert(path.join("/"));
} else {
// If any touched inode has no path, discard the collected paths; the
// empty prefix is used instead (see below).
touched_paths.clear();
break;
}
}
}
if touched_paths.is_empty() {
touched_paths.insert(String::from(""));
}
let mut last: Option<&str> = None;
// NOTE(review): `conflicts` is filled below but never read afterwards in
// this function.
let mut conflicts = Vec::new();
// `touched_paths` is a `BTreeSet`, so paths are visited in sorted order.
for path in touched_paths.iter() {
// Skip a path that extends the last output path by a `/` component;
// presumably outputting the parent already covered it.
if let Some(last_path) = last {
if last_path.len() < path.len() {
let (pre_last, post_last) = path.split_at(last_path.len());
if pre_last == last_path && post_last.starts_with("/") {
continue;
}
}
}
debug!("path = {:?}", path);
conflicts.extend(
libpijul::output::output_repository_no_pending(
&repo.working_copy,
&repo.changes,
&txn,
&channel,
path,
true,
None,
std::thread::available_parallelism()
.context("Get available parallelism")?
.get(),
0,
)
.context("Output repo no pending")?
.into_iter(),
);
last = Some(path)
}
}
// Remove the temporary change again, both from the channel and from the
// change store.
if let Some(h) = hash {
txn.write()
.unrecord(&repo.changes, &channel, &h, 0, &repo.working_copy)
.context("Unrecord")?;
repo.changes.del_change(&h).context("Delete change")?;
}
txn.commit().context("Commit")?;
Ok(())
}
/// Re-output `path` from the current channel of the repository in `state`.
///
/// Loads the current channel, calls
/// `libpijul::output::output_repository_no_pending` for `path` and commits
/// the transaction. NOTE(review): this presumably overwrites the working-copy
/// content of `path` with the recorded state; confirm against libpijul.
///
/// # Errors
///
/// Fails when the current channel cannot be determined or loaded, when the
/// output step fails, or when the transaction cannot be committed.
fn reset_file(state: &mut InternalState, path: String) -> anyhow::Result<()> {
    let repo = &state.repo;
    let current_channel = current_channel(repo)?;
    let txn = repo.pristine.arc_txn_begin()?;
    let channel = txn
        .read()
        .load_channel(&current_channel)?
        .context("Loading current channel")?;
    // The conflicts reported by the output step are not surfaced to the
    // caller.
    let _conflicts = libpijul::output::output_repository_no_pending(
        &repo.working_copy,
        &repo.changes,
        &txn,
        &channel,
        &path,
        true,
        None,
        std::thread::available_parallelism()?.get(),
        0,
    )?;
    txn.commit().context("Commit")?;
    Ok(())
}
#[allow(
clippy::await_holding_lock,
reason = "imposed by sanakirja API for txn"
)]
async fn compare_remote(
internal_state: &mut InternalState,
remote_name: &str,
remote_channel_name: &str,
) -> anyhow::Result<ComparedRemote> {
let txn = internal_state
.repo
.pristine
.arc_txn_begin()
.context("Begin txn")?;
let mut channel = txn
.write()
.open_or_create_channel(¤t_channel(&internal_state.repo)?)
.context("Loading current channel")?;
let config = PijulConfig::load(Some(&internal_state.repo.path), vec![])?;
let no_cert_check = false;
let mut remote = pijul_remote::repository(
&config,
Some(&internal_state.repo.path),
None,
remote_name,
remote_channel_name,
no_cert_check,
true,
)
.await?;
let PushDelta {
to_upload,
remote_unrecs: _,
unknown_changes: _,
..
} = to_upload(
&mut txn.write(),
&mut channel,
&internal_state.repo,
&mut remote,
)
.await?;
let RemoteDelta {
inodes: _,
remote_ref: _,
to_download,
remote_unrecs,
..
} = to_download(
&mut txn.write(),
&mut channel,
&mut internal_state.repo,
&mut remote,
)
.await?;
let remote_records = to_download
.into_iter()
.filter_map(|cs| match cs {
pijul_remote::CS::Change(hash) => Some(hash),
pijul_remote::CS::State(_) => None,
})
.map(|hash| mk_log_entry(&internal_state.repo, hash))
.sorted_by(|a, b| match (a, b) {
(Ok(a), Ok(b)) => Ord::cmp(&a.timestamp, &b.timestamp),
_ => cmp::Ordering::Equal,
})
.collect::<anyhow::Result<Log>>()?;
let remote_unrecords = remote_unrecs
.into_iter()
.filter_map(|(_, cs)| match cs {
pijul_remote::CS::Change(hash) => Some(hash),
pijul_remote::CS::State(_) => None,
})
.map(|hash| mk_log_entry(&internal_state.repo, hash))
.sorted_by(|a, b| match (a, b) {
(Ok(a), Ok(b)) => Ord::cmp(&a.timestamp, &b.timestamp),
_ => cmp::Ordering::Equal,
})
.collect::<anyhow::Result<Log>>()?;
let local_records = to_upload
.into_iter()
.filter_map(|cs| match cs {
pijul_remote::CS::Change(hash) => Some(hash),
pijul_remote::CS::State(_) => None,
})
.map(|hash| mk_log_entry(&internal_state.repo, hash))
.sorted_by(|a, b| match (a, b) {
(Ok(a), Ok(b)) => Ord::cmp(&a.timestamp, &b.timestamp).reverse(),
_ => cmp::Ordering::Equal,
})
.collect::<anyhow::Result<Log>>()?;
let record_dichotomy = RecordDichotomy {
remote_records,
remote_unrecords,
local_records,
};
Ok(ComparedRemote {
record_dichotomy,
remote: remote_name.to_string(),
remote_channel: remote_channel_name.to_string(),
})
}
/// Compute the [`RemoteDelta`] of `remote` relative to the local `channel`
/// and fetch the changes it lists.
///
/// The remote change list is refreshed first (no path filter, no specific
/// changes, no `force_cache` setting). The listed changes are then pulled,
/// and the returned delta carries the result of that pull in its
/// `to_download` field; all other fields are taken from the refreshed list.
///
/// # Errors
///
/// Propagates failures from refreshing the change list or from the pull.
async fn to_download<T: sanakirja::RawMutTxnT + 'static>(
    txn: &mut sanakirja::MutTxn<T>,
    channel: &mut ChannelRef<sanakirja::MutTxn<T>>,
    repo: &mut pijul::Repository,
    remote: &mut RemoteRepo,
) -> Result<RemoteDelta<sanakirja::MutTxn<T>>, anyhow::Error> {
    let no_path = &[];
    let no_changes = &[];
    let changelist = remote
        .update_changelist_pushpull(
            txn, no_path, channel, None, repo, no_changes, true,
        )
        .await?;

    let fetched = remote
        .pull(
            repo,
            txn,
            channel,
            changelist.to_download.as_slice(),
            &changelist.inodes,
            false,
        )
        .await?;

    Ok(RemoteDelta {
        to_download: fetched,
        ..changelist
    })
}
fn pending<T: pijul::MutTxnTExt + pijul::TxnT + Send + Sync + 'static>(
txn: pijul::ArcTxn<T>,
channel: &pijul::ChannelRef<T>,
repo: &mut pijul_repository::Repository,
) -> anyhow::Result<Option<libpijul::Hash>> {
use libpijul::changestore::ChangeStore;
let mut builder = pijul::record::Builder::new();
builder.record(
txn.clone(),
libpijul::Algorithm::default(),
false,
&libpijul::DEFAULT_SEPARATOR,
channel.clone(),
&repo.working_copy,
&repo.changes,
"",
std::thread::available_parallelism()?.get(),
)?;
let recorded = builder.finish();
if recorded.actions.is_empty() {
return Ok(None);
}
let mut txn = txn.write();
let actions = recorded
.actions
.into_iter()
.map(|rec| rec.globalize(&*txn).unwrap())
.collect();
let contents = if let Ok(c) = std::sync::Arc::try_unwrap(recorded.contents)
{
c.into_inner()
} else {
unreachable!()
};
let mut pending_change = libpijul::change::Change::make_change(
&*txn,
channel,
actions,
contents,
libpijul::change::ChangeHeader::default(),
Vec::new(),
)?;
let (dependencies, extra_known) = libpijul::change::dependencies(
&*txn,
&*channel.read(),
pending_change.changes.iter(),
)?;
pending_change.dependencies = dependencies;
pending_change.extra_known = extra_known;
let hash = repo
.changes
.save_change(&mut pending_change, |_, _| Ok::<_, anyhow::Error>(()))
.unwrap();
txn.apply_local_change(
channel,
&pending_change,
&hash,
&recorded.updatables,
)?;
Ok(Some(hash))
}
fn get_inodes(
txn: &sanakirja::Txn,
repo_path: &Path,
pats: &[String],
) -> anyhow::Result<
Vec<(
pijul::Inode,
Option<pijul::pristine::Position<pijul::ChangeId>>,
)>,
> {
let mut inodes = Vec::new();
for pat in pats {
let canon_path = match Path::new(pat).canonicalize() {
Err(e) if matches!(e.kind(), std::io::ErrorKind::NotFound) => {
bail!("Path {pat:?} not found {e:?}")
}
Err(e) => {
bail!("Path {pat:?} error: {e:?}")
}
Ok(p) => p,
};
match canon_path.strip_prefix(repo_path).map(|p| p.to_str()) {
Err(e) => {
bail!("Strip path prefix failed with {e:?}")
}
Ok(None) => {
bail!("Invalid UTF8 {pat:?}")
}
Ok(Some(s)) => {
let inode = pijul::fs::find_inode(txn, s)
.with_context(|| format!("find inode {s:?}"))?;
let inode_position =
pijul::TreeTxnT::get_inodes(txn, &inode, None)
.with_context(|| format!("get inode position {s:?}"))?;
inodes.push((inode, inode_position.cloned()))
}
};
}
Ok(inodes)
}
/// Debug-format a [`pijul::Repository`], showing only its `path` and
/// `changes_dir` fields and marking the output as non-exhaustive.
fn fmt_pijul_repository(
    value: &pijul::Repository,
    f: &mut std::fmt::Formatter,
) -> Result<(), std::fmt::Error> {
    let mut repr = f.debug_struct("Repository");
    repr.field("path", &value.path);
    repr.field("changes_dir", &value.changes_dir);
    repr.finish_non_exhaustive()
}
/// Lets an [`InternalState`] be used wherever a shared reference to its
/// repository is expected.
impl Deref for InternalState {
    type Target = pijul::Repository;

    fn deref(&self) -> &Self::Target {
        let Self { repo, .. } = self;
        repo
    }
}
/// Lets an [`InternalState`] be used wherever a mutable reference to its
/// repository is expected.
impl DerefMut for InternalState {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { repo, .. } = self;
        repo
    }
}
/// Turn the raw bytes of a file into displayable [`Contents`] according to
/// its detected `encoding`.
///
/// - Text is decoded with the given text encoding.
/// - Data of `Other` encoding that is at most [`MAX_LEN_BASE64_DISPLAY`]
///   bytes long is rendered as base64 prefixed with `b`.
/// - Everything else (longer `Other` data, images, audio, video, fonts) is
///   returned undecoded as `Contents::UnknownEncoding`.
fn try_decode_contents(raw: Vec<u8>, encoding: &Encoding) -> Contents {
    match encoding {
        Encoding::Text(text_encoding) => {
            Contents::Decoded(text_encoding.decode(&raw).to_string())
        }
        Encoding::Other if raw.len() <= MAX_LEN_BASE64_DISPLAY => {
            let base64 = data_encoding::BASE64.encode(&raw);
            Contents::ShortBase64(format!("b{}", base64))
        }
        Encoding::Other
        | Encoding::Image
        | Encoding::Audio
        | Encoding::Video
        | Encoding::Font => Contents::UnknownEncoding(raw),
    }
}
/// Tell whether the working copy of `repo` differs from what is recorded on
/// `channel`.
///
/// Runs a record pass over the whole working copy (empty prefix) and reports
/// whether it produced any action. Nothing is saved or applied.
///
/// # Errors
///
/// Fails when the available parallelism cannot be determined or when the
/// record pass fails.
fn has_unrecorded_changes<T: sanakirja::RawMutTxnT + 'static + Sync>(
    txn: pijul::ArcTxn<sanakirja::MutTxn<T>>,
    channel: pijul::ChannelRef<sanakirja::MutTxn<T>>,
    repo: &pijul::Repository,
) -> anyhow::Result<bool> {
    let worker_count = std::thread::available_parallelism()?.get();

    let mut builder = libpijul::RecordBuilder::new();
    builder.record(
        txn,
        libpijul::Algorithm::default(),
        false,
        &libpijul::DEFAULT_SEPARATOR,
        channel,
        &repo.working_copy,
        &repo.changes,
        "",
        worker_count,
    )?;

    let recorded = builder.finish();
    debug!("actions = {:?}", recorded.actions);
    let is_dirty = !recorded.actions.is_empty();
    Ok(is_dirty)
}