use std::collections::{BTreeSet, HashSet};
use std::io::Write;
use std::path::PathBuf;
use super::{make_changelist, parse_changelist};
use anyhow::bail;
use clap::{Parser, ValueHint};
use lazy_static::lazy_static;
use libpijul::changestore::ChangeStore;
use libpijul::pristine::sanakirja::MutTxn;
use libpijul::*;
use log::debug;
use regex::Regex;
use pijul_interaction::{ProgressBar, Spinner, APPLY_MESSAGE, OUTPUT_MESSAGE};
use pijul_remote::{self as remote, PushDelta, RemoteDelta, RemoteRepo, CS};
use pijul_repository::Repository;
#[derive(Parser, Debug)]
pub struct Remote {
#[clap(subcommand)]
subcmd: Option<SubRemote>,
#[clap(long = "repository", value_hint = ValueHint::DirPath)]
repo_path: Option<PathBuf>,
}
#[derive(Parser, Debug)]
pub enum SubRemote {
#[clap(name = "default")]
Default { remote: String },
#[clap(name = "delete")]
Delete { remote: String },
}
impl Remote {
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path)?;
debug!("{:?}", repo.config);
let mut stdout = std::io::stdout();
match self.subcmd {
None => {
let txn = repo.pristine.txn_begin()?;
for r in txn.iter_remotes(&libpijul::pristine::RemoteId::nil())? {
let r = r?;
writeln!(stdout, " {}: {}", r.id(), r.lock().path.as_str())?;
}
}
Some(SubRemote::Default { remote }) => {
let mut repo = repo;
repo.config.default_remote = Some(remote);
repo.update_config()?;
}
Some(SubRemote::Delete { remote }) => {
let remote =
if let Some(r) = libpijul::pristine::RemoteId::from_base32(remote.as_bytes()) {
r
} else {
bail!("Could not parse identifier: {:?}", remote)
};
let mut txn = repo.pristine.mut_txn_begin()?;
if !txn.drop_named_remote(remote)? {
bail!("Remote not found: {:?}", remote)
} else {
txn.commit()?;
}
}
}
Ok(())
}
}
#[derive(Parser, Debug)]
pub struct Push {
#[clap(long = "repository", value_hint = ValueHint::DirPath)]
repo_path: Option<PathBuf>,
#[clap(long = "from-channel")]
from_channel: Option<String>,
#[clap(long = "all", short = 'a', conflicts_with = "changes")]
all: bool,
#[clap(long = "force-cache", short = 'f')]
force_cache: bool,
#[clap(short = 'k')]
no_cert_check: bool,
#[clap(long = "path", value_hint = ValueHint::AnyPath)]
path: Vec<String>,
to: Option<String>,
#[clap(long = "to-channel")]
to_channel: Option<String>,
#[clap(last = true)]
changes: Vec<String>,
}
#[derive(Parser, Debug)]
pub struct Pull {
#[clap(long = "repository", value_hint = ValueHint::DirPath)]
repo_path: Option<PathBuf>,
#[clap(long = "to-channel")]
to_channel: Option<String>,
#[clap(long = "all", short = 'a', conflicts_with = "changes")]
all: bool,
#[clap(long = "force-cache", short = 'f')]
force_cache: bool,
#[clap(short = 'k')]
no_cert_check: bool,
#[clap(long = "full")]
full: bool, #[clap(long = "path", value_hint = ValueHint::AnyPath)]
path: Vec<String>,
from: Option<String>,
#[clap(long = "from-channel")]
from_channel: Option<String>,
#[clap(last = true)]
changes: Vec<String>, }
lazy_static! {
static ref CHANNEL: Regex = Regex::new(r#"([^:]*)(:(.*))?"#).unwrap();
}
impl Push {
async fn to_upload(
&self,
txn: &mut MutTxn<()>,
channel: &mut ChannelRef<MutTxn<()>>,
repo: &Repository,
remote: &mut RemoteRepo,
) -> Result<PushDelta, anyhow::Error> {
let remote_delta = remote
.update_changelist_pushpull(
txn,
&self.path,
channel,
Some(self.force_cache),
repo,
self.changes.as_slice(),
false,
)
.await?;
if let RemoteRepo::LocalChannel(ref remote_channel) = remote {
remote_delta.to_local_channel_push(
remote_channel,
txn,
self.path.as_slice(),
channel,
repo,
)
} else {
remote_delta.to_remote_push(txn, self.path.as_slice(), channel, repo)
}
}
pub async fn run(self) -> Result<(), anyhow::Error> {
let mut stderr = std::io::stderr();
let repo = Repository::find_root(self.repo_path.clone())?;
debug!("{:?}", repo.config);
let txn = repo.pristine.arc_txn_begin()?;
let cur = txn
.read()
.current_channel()
.unwrap_or(libpijul::DEFAULT_CHANNEL)
.to_string();
let channel_name = if let Some(ref c) = self.from_channel {
c
} else {
cur.as_str()
};
let remote_name = if let Some(ref rem) = self.to {
rem
} else if let Some(ref def) = repo.config.default_remote {
def
} else {
bail!("Missing remote");
};
let mut push_channel = None;
let remote_channel = if let Some(ref c) = self.to_channel {
let c = CHANNEL.captures(c).unwrap();
push_channel = c.get(3).map(|x| x.as_str());
let c = c.get(1).unwrap().as_str();
if c.is_empty() {
channel_name
} else {
c
}
} else {
channel_name
};
debug!("remote_channel = {:?} {:?}", remote_channel, push_channel);
let mut remote = remote::repository(
&repo,
Some(&repo.path),
None,
&remote_name,
remote_channel,
self.no_cert_check,
true,
)
.await?;
let mut channel = txn.write().open_or_create_channel(&channel_name)?;
let PushDelta {
to_upload,
remote_unrecs,
unknown_changes,
..
} = self
.to_upload(&mut *txn.write(), &mut channel, &repo, &mut remote)
.await?;
debug!("to_upload = {:?}", to_upload);
if to_upload.is_empty() {
writeln!(stderr, "Nothing to push")?;
txn.commit()?;
return Ok(());
}
notify_remote_unrecords(&repo, remote_unrecs.as_slice());
notify_unknown_changes(unknown_changes.as_slice());
let to_upload = if !self.changes.is_empty() {
let mut u: Vec<CS> = Vec::new();
let mut not_found = Vec::new();
let txn = txn.read();
for change in self.changes.iter() {
match txn.hash_from_prefix(change) {
Ok((hash, _)) => {
if to_upload.contains(&CS::Change(hash)) {
u.push(CS::Change(hash));
}
}
Err(_) => {
if !not_found.contains(change) {
not_found.push(change.to_string());
}
}
}
}
u.sort_by(|a, b| match (a, b) {
(CS::Change(a), CS::Change(b)) => {
let na = txn.get_revchanges(&channel, a).unwrap().unwrap();
let nb = txn.get_revchanges(&channel, b).unwrap().unwrap();
na.cmp(&nb)
}
(CS::State(a), CS::State(b)) => {
let na = txn
.channel_has_state(txn.states(&*channel.read()), &a.into())
.unwrap()
.unwrap();
let nb = txn
.channel_has_state(txn.states(&*channel.read()), &b.into())
.unwrap()
.unwrap();
na.cmp(&nb)
}
_ => unreachable!(),
});
if !not_found.is_empty() {
bail!("Changes not found: {:?}", not_found)
}
check_deps(&repo.changes, &to_upload, &u)?;
u
} else if self.all {
to_upload
} else {
let mut o = make_changelist(&repo.changes, &to_upload, "push")?;
loop {
let d = parse_changelist(&edit::edit_bytes(&o[..])?, &to_upload);
let comp = complete_deps(&repo.changes, Some(&to_upload), &d)?;
if comp.len() == d.len() {
break comp;
}
o = make_changelist(&repo.changes, &comp, "push")?
}
};
debug!("to_upload = {:?}", to_upload);
if to_upload.is_empty() {
writeln!(stderr, "Nothing to push")?;
txn.commit()?;
return Ok(());
}
remote
.upload_changes(
&mut *txn.write(),
repo.changes_dir.clone(),
push_channel,
&to_upload,
)
.await?;
txn.commit()?;
remote.finish().await?;
Ok(())
}
}
impl Pull {
async fn to_download(
&self,
txn: &mut MutTxn<()>,
channel: &mut ChannelRef<MutTxn<()>>,
repo: &mut Repository,
remote: &mut RemoteRepo,
) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
let force_cache = if self.force_cache {
Some(self.force_cache)
} else {
None
};
let delta = remote
.update_changelist_pushpull(
txn,
&self.path,
channel,
force_cache,
repo,
self.changes.as_slice(),
true,
)
.await?;
let to_download = remote
.pull(
repo,
txn,
channel,
delta.to_download.as_slice(),
&delta.inodes,
false,
)
.await?;
Ok(RemoteDelta {
to_download,
..delta
})
}
pub async fn run(self) -> Result<(), anyhow::Error> {
let mut repo = Repository::find_root(self.repo_path.clone())?;
let txn = repo.pristine.arc_txn_begin()?;
let cur = txn
.read()
.current_channel()
.unwrap_or(libpijul::DEFAULT_CHANNEL)
.to_string();
let channel_name = if let Some(ref c) = self.to_channel {
c
} else {
cur.as_str()
};
let is_current_channel = channel_name == cur;
let mut channel = txn.write().open_or_create_channel(&channel_name)?;
debug!("{:?}", repo.config);
let remote_name = if let Some(ref rem) = self.from {
rem
} else if let Some(ref def) = repo.config.default_remote {
def
} else {
bail!("Missing remote")
};
let from_channel = if let Some(ref c) = self.from_channel {
c
} else {
libpijul::DEFAULT_CHANNEL
};
let mut remote = remote::repository(
&repo,
Some(&repo.path),
None,
&remote_name,
from_channel,
self.no_cert_check,
true,
)
.await?;
debug!("downloading");
let RemoteDelta {
inodes,
remote_ref,
mut to_download,
remote_unrecs,
..
} = self
.to_download(&mut *txn.write(), &mut channel, &mut repo, &mut remote)
.await?;
let hash = super::pending(txn.clone(), &mut channel, &mut repo)?;
if let Some(ref r) = remote_ref {
remote.update_identities(&mut repo, r).await?;
}
notify_remote_unrecords(&repo, remote_unrecs.as_slice());
if to_download.is_empty() {
let mut stderr = std::io::stderr();
writeln!(stderr, "Nothing to pull")?;
if let Some(ref h) = hash {
txn.write().unrecord(&repo.changes, &mut channel, h, 0)?;
}
txn.commit()?;
return Ok(());
}
if self.changes.is_empty() {
if !self.all {
let mut o = make_changelist(&repo.changes, &to_download, "pull")?;
to_download = loop {
let d = parse_changelist(&edit::edit_bytes(&o[..])?, &to_download);
let comp = complete_deps(&repo.changes, Some(&to_download), &d)?;
if comp.len() == d.len() {
break comp;
}
o = make_changelist(&repo.changes, &comp, "pull")?
};
}
} else {
to_download = complete_deps(&repo.changes, None, &to_download)?;
}
{
let mut ws = libpijul::ApplyWorkspace::new();
debug!("to_download = {:#?}", to_download);
let apply_bar = ProgressBar::new(to_download.len() as u64, APPLY_MESSAGE)?;
let mut channel = channel.write();
let mut txn = txn.write();
for h in to_download.iter().rev() {
match h {
CS::Change(h) => {
txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;
}
CS::State(s) => {
if let Some(n) = txn.channel_has_state(&channel.states, &s.into())? {
txn.put_tags(&mut channel.tags, n.into(), s)?;
} else {
bail!(
"Cannot add tag {}: channel {:?} does not have that state",
s.to_base32(),
channel.name
)
}
}
}
apply_bar.inc(1);
}
}
debug!("completing changes");
remote
.complete_changes(&repo, &*txn.read(), &mut channel, &to_download, self.full)
.await?;
remote.finish().await?;
debug!("inodes = {:?}", inodes);
debug!("to_download: {:?}", to_download.len());
let mut touched = HashSet::new();
let txn_ = txn.read();
for d in to_download.iter() {
debug!("to_download {:?}", d);
match d {
CS::Change(d) => {
if let Some(int) = txn_.get_internal(&d.into())? {
for inode in txn_.iter_rev_touched(int)? {
let (int_, inode) = inode?;
if int_ < int {
continue;
} else if int_ > int {
break;
}
let ext = libpijul::pristine::Position {
change: txn_.get_external(&inode.change)?.unwrap().into(),
pos: inode.pos,
};
if inodes.is_empty() || inodes.contains(&ext) {
touched.insert(*inode);
}
}
}
}
CS::State(_) => {
}
}
}
std::mem::drop(txn_);
if is_current_channel {
let mut touched_paths = BTreeSet::new();
{
let txn_ = txn.read();
for &i in touched.iter() {
if let Some((path, _)) =
libpijul::fs::find_path(&repo.changes, &*txn_, &*channel.read(), false, i)?
{
touched_paths.insert(path);
} else {
touched_paths.clear();
break;
}
}
}
if touched_paths.is_empty() {
touched_paths.insert(String::from(""));
}
let mut last: Option<&str> = None;
let mut conflicts = Vec::new();
let _output_spinner = Spinner::new(OUTPUT_MESSAGE);
for path in touched_paths.iter() {
match last {
Some(last_path) => {
if last_path.len() < path.len() {
let (pre_last, post_last) = path.split_at(last_path.len());
if pre_last == last_path && post_last.starts_with("/") {
continue;
}
}
}
_ => (),
}
debug!("path = {:?}", path);
conflicts.extend(
libpijul::output::output_repository_no_pending(
&repo.working_copy,
&repo.changes,
&txn,
&channel,
path,
true,
None,
std::thread::available_parallelism()?.get(),
0,
)?
.into_iter(),
);
last = Some(path)
}
super::print_conflicts(&conflicts)?;
}
if let Some(h) = hash {
txn.write().unrecord(&repo.changes, &mut channel, &h, 0)?;
repo.changes.del_change(&h)?;
}
txn.commit()?;
Ok(())
}
}
fn complete_deps<C: ChangeStore>(
c: &C,
original: Option<&[CS]>,
now: &[CS],
) -> Result<Vec<CS>, anyhow::Error> {
debug!("complete deps {:?} {:?}", original, now);
let original_: Option<HashSet<_>> = original.map(|original| original.iter().collect());
let now_: HashSet<_> = now.iter().cloned().collect();
let mut result = Vec::with_capacity(original.unwrap_or(now).len());
let mut result_h = HashSet::with_capacity(original.unwrap_or(now).len());
let mut stack: Vec<_> = now.iter().rev().cloned().collect();
while let Some(h) = stack.pop() {
stack.push(h);
let l0 = stack.len();
let hh = if let CS::Change(h) = h {
h
} else {
stack.pop();
result.push(h);
continue;
};
for d in c.get_dependencies(&hh)? {
let is_missing =
now_.get(&CS::Change(d)).is_none() && result_h.get(&CS::Change(d)).is_none();
debug!("complete_deps {:?} {:?}", d, is_missing);
let is_missing = if let Some(ref original) = original_ {
original.get(&CS::Change(d)).is_some() && is_missing
} else {
is_missing
};
if is_missing {
stack.push(CS::Change(d));
}
}
if stack.len() == l0 {
stack.pop();
debug!("all deps, push");
if result_h.insert(h) {
result.push(h);
}
}
}
debug!("result {:?}", result);
Ok(result)
}
fn check_deps<C: ChangeStore>(c: &C, original: &[CS], now: &[CS]) -> Result<(), anyhow::Error> {
let original_: HashSet<_> = original.iter().collect();
let now_: HashSet<_> = now.iter().collect();
for n in now {
let n = if let CS::Change(n) = n { n } else { continue };
for d in c.get_dependencies(n)? {
if original_.get(&CS::Change(d)).is_some() && now_.get(&CS::Change(d)).is_none() {
bail!("Missing dependency: {:?}", n)
}
}
}
Ok(())
}
fn notify_remote_unrecords(repo: &Repository, remote_unrecs: &[(u64, pijul_remote::CS)]) {
use std::fmt::Write;
if !remote_unrecs.is_empty() {
let mut s = format!(
"# The following changes have been unrecorded in the remote.\n\
# This buffer is only being used to inform you of the remote change;\n\
# your push will continue when it is closed.\n"
);
for (_, hash) in remote_unrecs {
let header = match hash {
CS::Change(hash) => repo.changes.get_header(hash).unwrap(),
CS::State(hash) => repo.changes.get_tag_header(hash).unwrap(),
};
s.push_str("#\n");
writeln!(&mut s, "# {}", header.message).unwrap();
writeln!(&mut s, "# {}", header.timestamp).unwrap();
match hash {
CS::Change(hash) => {
writeln!(&mut s, "# {}", hash.to_base32()).unwrap();
}
CS::State(hash) => {
writeln!(&mut s, "# {}", hash.to_base32()).unwrap();
}
}
}
if let Err(e) = edit::edit(s.as_str()) {
log::error!(
"Notification of remote unrecords experienced an error: {}",
e
);
}
}
}
fn notify_unknown_changes(unknown_changes: &[pijul_remote::CS]) {
use std::fmt::Write;
if unknown_changes.is_empty() {
return;
} else {
let mut s = format!(
"# The following changes are new in the remote\n# (and are not yet known to your local copy):\n#\n"
);
let rest_len = unknown_changes.len().saturating_sub(5);
for hash in unknown_changes.iter().take(5) {
let hash = match hash {
CS::Change(hash) => hash.to_base32(),
CS::State(hash) => hash.to_base32(),
};
writeln!(&mut s, "# {}", hash).expect("Infallible write to String");
}
if rest_len > 0 {
let plural = if rest_len == 1 { "" } else { "s" };
writeln!(&mut s, "# ... plus {} more change{}", rest_len, plural)
.expect("Infallible write to String");
}
if let Err(e) = edit::edit(s.as_str()) {
log::error!(
"Notification of unknown changes experienced an error: {}",
e
);
}
}
}