use std::collections::HashSet;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{bail, Context};
use async_trait::async_trait;
use lazy_static::lazy_static;
use libpijul::pristine::{
sanakirja::MutTxn, Base32, ChangeId, ChannelRef, GraphIter, Hash, Merkle, MutTxnT, RemoteRef,
TxnT,
};
use libpijul::DOT_DIR;
use libpijul::{ChannelTxnT, DepsTxnT, GraphTxnT, MutTxnTExt, TxnTExt};
use log::{debug, info};
use pijul_config::*;
use pijul_identity::Complete;
use pijul_repository::*;
pub mod ssh;
use ssh::*;
pub mod local;
use local::*;
pub mod http;
use http::*;
use pijul_interaction::{
ProgressBar, Spinner, APPLY_MESSAGE, COMPLETE_MESSAGE, DOWNLOAD_MESSAGE, UPLOAD_MESSAGE,
};
pub const PROTOCOL_VERSION: usize = 3;
pub enum RemoteRepo {
Local(Local),
Ssh(Ssh),
Http(Http),
LocalChannel(String),
None,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CS {
Change(Hash),
State(Merkle),
}
pub async fn repository(
repo: &Repository,
self_path: Option<&Path>,
user: Option<&str>,
name: &str,
channel: &str,
no_cert_check: bool,
with_path: bool,
) -> Result<RemoteRepo, anyhow::Error> {
if let Some(name) = repo.config.remotes.iter().find(|e| e.name() == name) {
name.to_remote(channel, no_cert_check, with_path).await
} else {
unknown_remote(self_path, user, name, channel, no_cert_check, with_path).await
}
}
pub async fn prove(
identity: &Complete,
origin: Option<&str>,
no_cert_check: bool,
) -> Result<(), anyhow::Error> {
let remote = origin.unwrap_or(&identity.config.author.origin);
let mut stderr = std::io::stderr();
writeln!(
stderr,
"Linking identity `{}` with {}@{}",
&identity.name, &identity.config.author.username, remote
)?;
let mut remote = if let Ok(repo) = Repository::find_root(None) {
repository(
&repo,
None,
Some(&identity.config.author.username),
&remote,
libpijul::DEFAULT_CHANNEL,
no_cert_check,
false,
)
.await?
} else {
unknown_remote(
None,
Some(&identity.config.author.username),
&remote,
libpijul::DEFAULT_CHANNEL,
no_cert_check,
false,
)
.await?
};
let (key, _password) = identity
.credentials
.clone()
.unwrap()
.decrypt(&identity.name)?;
remote.prove(key).await?;
Ok(())
}
#[async_trait]
pub trait ToRemote {
async fn to_remote(
&self,
channel: &str,
no_cert_check: bool,
with_path: bool,
) -> Result<RemoteRepo, anyhow::Error>;
}
#[async_trait]
impl ToRemote for RemoteConfig {
async fn to_remote(
&self,
channel: &str,
no_cert_check: bool,
with_path: bool,
) -> Result<RemoteRepo, anyhow::Error> {
match self {
RemoteConfig::Ssh { ssh, .. } => {
if let Some(mut sshr) = ssh_remote(None, ssh, with_path) {
debug!("unknown_remote, ssh = {:?}", ssh);
if let Some(c) = sshr.connect(ssh, channel).await? {
return Ok(RemoteRepo::Ssh(c));
}
}
bail!("Remote not found: {:?}", ssh)
}
RemoteConfig::Http {
http,
headers,
name,
} => {
let mut h = Vec::new();
for (k, v) in headers.iter() {
match v {
RemoteHttpHeader::String(s) => {
h.push((k.clone(), s.clone()));
}
RemoteHttpHeader::Shell(shell) => {
h.push((k.clone(), shell_cmd(&shell.shell)?));
}
}
}
return Ok(RemoteRepo::Http(Http {
url: http.parse().unwrap(),
channel: channel.to_string(),
client: reqwest::ClientBuilder::new()
.danger_accept_invalid_certs(no_cert_check)
.build()?,
headers: h,
name: name.to_string(),
}));
}
}
}
}
pub async fn unknown_remote(
self_path: Option<&Path>,
user: Option<&str>,
name: &str,
channel: &str,
no_cert_check: bool,
with_path: bool,
) -> Result<RemoteRepo, anyhow::Error> {
if let Ok(url) = url::Url::parse(name) {
let scheme = url.scheme();
if scheme == "http" || scheme == "https" {
debug!("unknown_remote, http = {:?}", name);
return Ok(RemoteRepo::Http(Http {
url,
channel: channel.to_string(),
client: reqwest::ClientBuilder::new()
.danger_accept_invalid_certs(no_cert_check)
.build()?,
headers: Vec::new(),
name: name.to_string(),
}));
} else if scheme == "ssh" {
if let Some(mut ssh) = ssh_remote(user, name, with_path) {
debug!("unknown_remote, ssh = {:?}", ssh);
if let Some(c) = ssh.connect(name, channel).await? {
return Ok(RemoteRepo::Ssh(c));
}
}
bail!("Remote not found: {:?}", name)
} else {
bail!("Remote scheme not supported: {:?}", scheme)
}
}
if let Ok(root) = std::fs::canonicalize(name) {
if let Some(path) = self_path {
let path = std::fs::canonicalize(path)?;
if path == root {
return Ok(RemoteRepo::LocalChannel(channel.to_string()));
}
}
let mut dot_dir = root.join(DOT_DIR);
let changes_dir = dot_dir.join(CHANGES_DIR);
dot_dir.push(PRISTINE_DIR);
debug!("dot_dir = {:?}", dot_dir);
match libpijul::pristine::sanakirja::Pristine::new(&dot_dir.join("db")) {
Ok(pristine) => {
debug!("pristine done");
return Ok(RemoteRepo::Local(Local {
root: Path::new(name).to_path_buf(),
channel: channel.to_string(),
changes_dir,
pristine: Arc::new(pristine),
name: name.to_string(),
}));
}
Err(libpijul::pristine::sanakirja::SanakirjaError::Sanakirja(
sanakirja::Error::IO(e),
)) if e.kind() == std::io::ErrorKind::NotFound => {
debug!("repo not found")
}
Err(e) => return Err(e.into()),
}
}
if let Some(mut ssh) = ssh_remote(user, name, with_path) {
debug!("unknown_remote, ssh = {:?}", ssh);
if let Some(c) = ssh.connect(name, channel).await? {
return Ok(RemoteRepo::Ssh(c));
}
}
bail!("Remote not found: {:?}", name)
}
pub fn get_local_inodes(
txn: &mut MutTxn<()>,
channel: &ChannelRef<MutTxn<()>>,
repo: &Repository,
path: &[String],
) -> Result<HashSet<Position<ChangeId>>, anyhow::Error> {
let mut paths = HashSet::new();
for path in path.iter() {
let (p, ambiguous) = txn.follow_oldest_path(&repo.changes, &channel, path)?;
if ambiguous {
bail!("Ambiguous path: {:?}", path)
}
paths.insert(p);
paths.extend(
libpijul::fs::iter_graph_descendants(txn, &channel.read(), p)?.map(|x| x.unwrap()),
);
}
Ok(paths)
}
pub struct PushDelta {
pub to_upload: Vec<CS>,
pub remote_unrecs: Vec<(u64, CS)>,
pub unknown_changes: Vec<CS>,
}
pub struct RemoteDelta<T: MutTxnTExt + TxnTExt> {
pub inodes: HashSet<Position<Hash>>,
pub to_download: Vec<CS>,
pub remote_ref: Option<RemoteRef<T>>,
pub ours_ge_dichotomy_set: HashSet<CS>,
pub theirs_ge_dichotomy_set: HashSet<CS>,
pub theirs_ge_dichotomy: Vec<(u64, Hash, Merkle, bool)>,
pub remote_unrecs: Vec<(u64, CS)>,
}
impl RemoteDelta<MutTxn<()>> {
pub fn to_local_channel_push(
self,
remote_channel: &str,
txn: &mut MutTxn<()>,
path: &[String],
channel: &ChannelRef<MutTxn<()>>,
repo: &Repository,
) -> Result<PushDelta, anyhow::Error> {
let mut to_upload = Vec::new();
let inodes = get_local_inodes(txn, channel, repo, path)?;
for x in txn.reverse_log(&*channel.read(), None)? {
let (_, (h, _)) = x?;
if let Some(channel) = txn.load_channel(remote_channel)? {
let channel = channel.read();
let h_int = txn.get_internal(h)?.unwrap();
if txn.get_changeset(txn.changes(&channel), h_int)?.is_none() {
if inodes.is_empty() {
to_upload.push(CS::Change(h.into()))
} else {
for p in inodes.iter() {
if txn.get_touched_files(p, Some(h_int))?.is_some() {
to_upload.push(CS::Change(h.into()));
break;
}
}
}
}
}
}
assert!(self.ours_ge_dichotomy_set.is_empty());
assert!(self.theirs_ge_dichotomy_set.is_empty());
let d = PushDelta {
to_upload: to_upload.into_iter().rev().collect(),
remote_unrecs: self.remote_unrecs,
unknown_changes: Vec::new(),
};
assert!(d.remote_unrecs.is_empty());
Ok(d)
}
pub fn to_remote_push(
self,
txn: &mut MutTxn<()>,
path: &[String],
channel: &ChannelRef<MutTxn<()>>,
repo: &Repository,
) -> Result<PushDelta, anyhow::Error> {
let mut to_upload = Vec::new();
let inodes = get_local_inodes(txn, channel, repo, path)?;
if let Some(ref remote_ref) = self.remote_ref {
let mut tags: HashSet<Merkle> = HashSet::new();
for x in txn.rev_iter_tags(&channel.read().tags, None)? {
let (n, m) = x?;
debug!("rev_iter_tags {:?} {:?}", n, m);
if let Some((_, p)) = txn.get_remote_tag(&remote_ref.lock().tags, (*n).into())? {
if p.b == m.b {
debug!("the remote has tag {:?}", p.a);
break;
}
if p.a != m.a {
}
} else {
tags.insert(m.a.into());
}
}
debug!("tags = {:?}", tags);
for x in txn.reverse_log(&*channel.read(), None)? {
let (_, (h, m)) = x?;
let h_unrecorded = self
.remote_unrecs
.iter()
.any(|(_, hh)| hh == &CS::Change(h.into()));
if !h_unrecorded {
if txn.remote_has_state(remote_ref, &m)?.is_some() {
debug!("remote_has_state: {:?}", m);
break;
}
}
let h_int = txn.get_internal(h)?.unwrap();
let h_deser = Hash::from(h);
if (!txn.remote_has_change(remote_ref, &h)? || h_unrecorded)
&& !self.theirs_ge_dichotomy_set.contains(&CS::Change(h_deser))
{
if inodes.is_empty() {
if tags.remove(&m.into()) {
to_upload.push(CS::State(m.into()));
}
to_upload.push(CS::Change(h_deser));
} else {
for p in inodes.iter() {
if txn.get_touched_files(p, Some(h_int))?.is_some() {
to_upload.push(CS::Change(h_deser));
if tags.remove(&m.into()) {
to_upload.push(CS::State(m.into()));
}
break;
}
}
}
}
}
for t in tags.iter() {
if let Some(n) = txn.remote_has_state(&remote_ref, &t.into())? {
if !txn.is_tagged(&remote_ref.lock().tags, n)? {
to_upload.push(CS::State(*t));
}
} else {
debug!("the remote doesn't have state {:?}", t);
}
}
}
let mut unknown_changes = Vec::new();
for (_, h, m, is_tag) in self.theirs_ge_dichotomy.iter() {
let h_is_known = txn.get_revchanges(&channel, h).unwrap().is_some();
let change = CS::Change(*h);
if !(self.ours_ge_dichotomy_set.contains(&change) || h_is_known) {
unknown_changes.push(change)
}
if *is_tag {
let m_is_known = if let Some(n) = txn
.channel_has_state(txn.states(&*channel.read()), &m.into())
.unwrap()
{
txn.is_tagged(txn.tags(&*channel.read()), n.into()).unwrap()
} else {
false
};
if !m_is_known {
unknown_changes.push(CS::State(*m))
}
}
}
Ok(PushDelta {
to_upload: to_upload.into_iter().rev().collect(),
remote_unrecs: self.remote_unrecs,
unknown_changes,
})
}
}
pub fn update_changelist_local_channel(
remote_channel: &str,
txn: &mut MutTxn<()>,
path: &[String],
current_channel: &ChannelRef<MutTxn<()>>,
repo: &Repository,
specific_changes: &[String],
) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
if !specific_changes.is_empty() {
let mut to_download = Vec::new();
for h in specific_changes {
let h = txn.hash_from_prefix(h)?.0;
if txn.get_revchanges(current_channel, &h)?.is_none() {
to_download.push(CS::Change(h));
}
}
Ok(RemoteDelta {
inodes: HashSet::new(),
to_download,
remote_ref: None,
ours_ge_dichotomy_set: HashSet::new(),
theirs_ge_dichotomy: Vec::new(),
theirs_ge_dichotomy_set: HashSet::new(),
remote_unrecs: Vec::new(),
})
} else {
let mut inodes = HashSet::new();
let inodes_ = get_local_inodes(txn, current_channel, repo, path)?;
let mut to_download = Vec::new();
inodes.extend(inodes_.iter().map(|x| libpijul::pristine::Position {
change: txn.get_external(&x.change).unwrap().unwrap().into(),
pos: x.pos,
}));
if let Some(remote_channel) = txn.load_channel(remote_channel)? {
let remote_channel = remote_channel.read();
for x in txn.reverse_log(&remote_channel, None)? {
let (_, (h, m)) = x?;
if txn
.channel_has_state(txn.states(&*current_channel.read()), &m)?
.is_some()
{
break;
}
let h_int = txn.get_internal(h)?.unwrap();
if txn
.get_changeset(txn.changes(&*current_channel.read()), h_int)?
.is_none()
{
if inodes_.is_empty()
|| inodes_.iter().any(|&inode| {
txn.get_rev_touched_files(h_int, Some(&inode))
.unwrap()
.is_some()
})
{
to_download.push(CS::Change(h.into()));
}
}
}
}
Ok(RemoteDelta {
inodes,
to_download,
remote_ref: None,
ours_ge_dichotomy_set: HashSet::new(),
theirs_ge_dichotomy: Vec::new(),
theirs_ge_dichotomy_set: HashSet::new(),
remote_unrecs: Vec::new(),
})
}
}
impl RemoteRepo {
fn name(&self) -> Option<&str> {
match *self {
RemoteRepo::Ssh(ref s) => Some(s.name.as_str()),
RemoteRepo::Local(ref l) => Some(l.name.as_str()),
RemoteRepo::Http(ref h) => Some(h.name.as_str()),
RemoteRepo::LocalChannel(_) => None,
RemoteRepo::None => unreachable!(),
}
}
pub fn repo_name(&self) -> Result<Option<String>, anyhow::Error> {
match *self {
RemoteRepo::Ssh(ref s) => {
if let Some(sep) = s.name.rfind(|c| c == ':' || c == '/') {
Ok(Some(s.name.split_at(sep + 1).1.to_string()))
} else {
Ok(Some(s.name.as_str().to_string()))
}
}
RemoteRepo::Local(ref l) => {
if let Some(file) = l.root.file_name() {
Ok(Some(
file.to_str()
.context("failed to decode local repository name")?
.to_string(),
))
} else {
Ok(None)
}
}
RemoteRepo::Http(ref h) => {
if let Some(name) = libpijul::path::file_name(h.url.path()) {
if !name.trim().is_empty() {
return Ok(Some(name.trim().to_string()));
}
}
Ok(h.url.host().map(|h| h.to_string()))
}
RemoteRepo::LocalChannel(_) => Ok(None),
RemoteRepo::None => unreachable!(),
}
}
pub async fn finish(&mut self) -> Result<(), anyhow::Error> {
if let RemoteRepo::Ssh(s) = self {
s.finish().await?
}
Ok(())
}
pub async fn update_changelist<T: MutTxnTExt + TxnTExt + 'static>(
&mut self,
txn: &mut T,
path: &[String],
) -> Result<Option<(HashSet<Position<Hash>>, RemoteRef<T>)>, anyhow::Error> {
debug!("update_changelist");
let id = if let Some(id) = self.get_id(txn).await? {
id
} else {
return Ok(None);
};
let mut remote = if let Some(name) = self.name() {
txn.open_or_create_remote(id, name)?
} else {
return Ok(None);
};
let n = self.dichotomy_changelist(txn, &remote.lock()).await?;
debug!("update changelist {:?}", n);
let v: Vec<_> = txn
.iter_remote(&remote.lock().remote, n)?
.filter_map(|k| {
debug!("filter_map {:?}", k);
let k = (*k.unwrap().0).into();
if k >= n {
Some(k)
} else {
None
}
})
.collect();
for k in v {
debug!("deleting {:?}", k);
txn.del_remote(&mut remote, k)?;
}
let v: Vec<_> = txn
.iter_tags(&remote.lock().tags, n)?
.filter_map(|k| {
debug!("filter_map {:?}", k);
let k = (*k.unwrap().0).into();
if k >= n {
Some(k)
} else {
None
}
})
.collect();
for k in v {
debug!("deleting {:?}", k);
txn.del_tags(&mut remote.lock().tags, k)?;
}
debug!("deleted");
let paths = self.download_changelist(txn, &mut remote, n, path).await?;
Ok(Some((paths, remote)))
}
async fn update_changelist_pushpull_from_scratch(
&mut self,
txn: &mut MutTxn<()>,
path: &[String],
current_channel: &ChannelRef<MutTxn<()>>,
) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
debug!("no id, starting from scratch");
let (inodes, theirs_ge_dichotomy) = self.download_changelist_nocache(0, path).await?;
let mut theirs_ge_dichotomy_set = HashSet::new();
let mut to_download = Vec::new();
for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
theirs_ge_dichotomy_set.insert(CS::Change(*h));
if txn.get_revchanges(current_channel, h)?.is_none() {
to_download.push(CS::Change(*h));
}
if *is_tag {
let ch = current_channel.read();
if let Some(n) = txn.channel_has_state(txn.states(&*ch), &m.into())? {
if !txn.is_tagged(txn.tags(&*ch), n.into())? {
to_download.push(CS::State(*m));
}
} else {
to_download.push(CS::State(*m));
}
}
}
Ok(RemoteDelta {
inodes,
remote_ref: None,
to_download,
ours_ge_dichotomy_set: HashSet::new(),
theirs_ge_dichotomy,
theirs_ge_dichotomy_set,
remote_unrecs: Vec::new(),
})
}
pub async fn update_changelist_pushpull(
&mut self,
txn: &mut MutTxn<()>,
path: &[String],
current_channel: &ChannelRef<MutTxn<()>>,
force_cache: Option<bool>,
repo: &Repository,
specific_changes: &[String],
is_pull: bool,
) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
debug!("update_changelist_pushpull");
if let RemoteRepo::LocalChannel(c) = self {
return update_changelist_local_channel(
c,
txn,
path,
current_channel,
repo,
specific_changes,
);
}
let id = if let Some(id) = self.get_id(txn).await? {
debug!("id = {:?}", id);
id
} else {
return self
.update_changelist_pushpull_from_scratch(txn, path, current_channel)
.await;
};
let mut remote_ref = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
let dichotomy_n = self.dichotomy_changelist(txn, &remote_ref.lock()).await?;
let ours_ge_dichotomy: Vec<(u64, CS)> = txn
.iter_remote(&remote_ref.lock().remote, dichotomy_n)?
.filter_map(|k| {
debug!("filter_map {:?}", k);
match k.unwrap() {
(k, libpijul::pristine::Pair { a: hash, .. }) => {
let (k, hash) = (u64::from(*k), Hash::from(*hash));
if k >= dichotomy_n {
Some((k, CS::Change(hash)))
} else {
None
}
}
}
})
.collect();
let (inodes, theirs_ge_dichotomy) =
self.download_changelist_nocache(dichotomy_n, path).await?;
debug!("theirs_ge_dichotomy = {:?}", theirs_ge_dichotomy);
let ours_ge_dichotomy_set = ours_ge_dichotomy
.iter()
.map(|(_, h)| h)
.copied()
.collect::<HashSet<CS>>();
let mut theirs_ge_dichotomy_set = HashSet::new();
for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
theirs_ge_dichotomy_set.insert(CS::Change(*h));
if *is_tag {
theirs_ge_dichotomy_set.insert(CS::State(*m));
}
}
let remote_unrecs = remote_unrecs(
txn,
current_channel,
&ours_ge_dichotomy,
&theirs_ge_dichotomy_set,
)?;
let should_cache = if let Some(true) = force_cache {
true
} else {
remote_unrecs.is_empty()
};
debug!(
"should_cache = {:?} {:?} {:?}",
force_cache, remote_unrecs, should_cache
);
if should_cache {
use libpijul::ChannelMutTxnT;
for (k, t) in ours_ge_dichotomy.iter().copied() {
match t {
CS::State(_) => txn.del_tags(&mut remote_ref.lock().tags, k)?,
CS::Change(_) => {
txn.del_remote(&mut remote_ref, k)?;
}
}
}
for (n, h, m, is_tag) in theirs_ge_dichotomy.iter().copied() {
debug!("theirs: {:?} {:?} {:?}", n, h, m);
txn.put_remote(&mut remote_ref, n, (h, m))?;
if is_tag {
txn.put_tags(&mut remote_ref.lock().tags, n, &m)?;
}
}
}
if !specific_changes.is_empty() {
let to_download = specific_changes
.iter()
.map(|h| {
if is_pull {
{
if let Ok(t) = txn.state_from_prefix(&remote_ref.lock().states, h) {
return Ok(CS::State(t.0));
}
}
Ok(CS::Change(txn.hash_from_prefix_remote(&remote_ref, h)?))
} else {
if let Ok(t) = txn.state_from_prefix(¤t_channel.read().states, h) {
Ok(CS::State(t.0))
} else {
Ok(CS::Change(txn.hash_from_prefix(h)?.0))
}
}
})
.collect::<Result<Vec<_>, anyhow::Error>>();
Ok(RemoteDelta {
inodes,
remote_ref: Some(remote_ref),
to_download: to_download?,
ours_ge_dichotomy_set,
theirs_ge_dichotomy,
theirs_ge_dichotomy_set,
remote_unrecs,
})
} else {
let mut to_download: Vec<CS> = Vec::new();
let mut to_download_ = HashSet::new();
for x in txn.iter_rev_remote(&remote_ref.lock().remote, None)? {
let (_, p) = x?;
let h: Hash = p.a.into();
if txn
.channel_has_state(txn.states(¤t_channel.read()), &p.b)
.unwrap()
.is_some()
{
break;
}
if txn.get_revchanges(¤t_channel, &h).unwrap().is_none() {
let h = CS::Change(h);
if to_download_.insert(h.clone()) {
to_download.push(h);
}
}
}
for (n, h, m, is_tag) in theirs_ge_dichotomy.iter() {
debug!(
"update_changelist_pushpull line {}, {:?} {:?}",
line!(),
n,
h
);
let ch = CS::Change(*h);
if txn.get_revchanges(¤t_channel, h).unwrap().is_none() {
if to_download_.insert(ch.clone()) {
to_download.push(ch.clone());
}
if *is_tag {
to_download.push(CS::State(*m));
}
} else if *is_tag {
let has_tag = if let Some(n) =
txn.channel_has_state(txn.states(¤t_channel.read()), &m.into())?
{
txn.is_tagged(txn.tags(¤t_channel.read()), n.into())?
} else {
false
};
if !has_tag {
to_download.push(CS::State(*m));
}
}
if should_cache && ours_ge_dichotomy_set.get(&ch).is_none() {
use libpijul::ChannelMutTxnT;
txn.put_remote(&mut remote_ref, *n, (*h, *m))?;
if *is_tag {
let mut rem = remote_ref.lock();
txn.put_tags(&mut rem.tags, *n, m)?;
}
}
}
Ok(RemoteDelta {
inodes,
remote_ref: Some(remote_ref),
to_download,
ours_ge_dichotomy_set,
theirs_ge_dichotomy,
theirs_ge_dichotomy_set,
remote_unrecs,
})
}
}
pub async fn download_changelist_nocache(
&mut self,
from: u64,
paths: &[String],
) -> Result<(HashSet<Position<Hash>>, Vec<(u64, Hash, Merkle, bool)>), anyhow::Error> {
let mut v = Vec::new();
let f = |v: &mut Vec<(u64, Hash, Merkle, bool)>, n, h, m, m2| {
debug!("no cache: {:?}", h);
Ok(v.push((n, h, m, m2)))
};
let r = match *self {
RemoteRepo::Local(ref mut l) => l.download_changelist(f, &mut v, from, paths)?,
RemoteRepo::Ssh(ref mut s) => s.download_changelist(f, &mut v, from, paths).await?,
RemoteRepo::Http(ref h) => h.download_changelist(f, &mut v, from, paths).await?,
RemoteRepo::LocalChannel(_) => HashSet::new(),
RemoteRepo::None => unreachable!(),
};
Ok((r, v))
}
async fn dichotomy_changelist<T: MutTxnT + TxnTExt>(
&mut self,
txn: &T,
remote: &libpijul::pristine::Remote<T>,
) -> Result<u64, anyhow::Error> {
let mut a = 0;
let (mut b, state): (_, Merkle) = if let Some((u, v)) = txn.last_remote(&remote.remote)? {
debug!("dichotomy_changelist: {:?} {:?}", u, v);
(u, (&v.b).into())
} else {
debug!("the local copy of the remote has no changes");
return Ok(0);
};
let last_statet = if let Some((_, _, v)) = txn.last_remote_tag(&remote.tags)? {
v.into()
} else {
Merkle::zero()
};
debug!("last_state: {:?} {:?}", state, last_statet);
if let Some((_, s, st)) = self.get_state(txn, Some(b)).await? {
debug!("remote last_state: {:?} {:?}", s, st);
if s == state && st == last_statet {
return Ok(b + 1);
}
}
while a < b {
let mid = (a + b) / 2;
let (mid, state) = {
let (a, b) = txn.get_remote_state(&remote.remote, mid)?.unwrap();
(a, b.b)
};
let statet = if let Some((_, b)) = txn.get_remote_tag(&remote.tags, mid)? {
b.b.into()
} else {
last_statet
};
let remote_state = self.get_state(txn, Some(mid)).await?;
debug!("dichotomy {:?} {:?} {:?}", mid, state, remote_state);
if let Some((_, remote_state, remote_statet)) = remote_state {
if remote_state == state && remote_statet == statet {
if a == mid {
return Ok(a + 1);
} else {
a = mid;
continue;
}
}
}
if b == mid {
break;
} else {
b = mid
}
}
Ok(a)
}
async fn get_state<T: libpijul::TxnTExt>(
&mut self,
txn: &T,
mid: Option<u64>,
) -> Result<Option<(u64, Merkle, Merkle)>, anyhow::Error> {
match *self {
RemoteRepo::Local(ref mut l) => l.get_state(mid),
RemoteRepo::Ssh(ref mut s) => s.get_state(mid).await,
RemoteRepo::Http(ref mut h) => h.get_state(mid).await,
RemoteRepo::LocalChannel(ref channel) => {
if let Some(channel) = txn.load_channel(&channel)? {
local::get_state(txn, &channel, mid)
} else {
Ok(None)
}
}
RemoteRepo::None => unreachable!(),
}
}
async fn get_id<T: libpijul::TxnTExt + 'static>(
&mut self,
txn: &T,
) -> Result<Option<libpijul::pristine::RemoteId>, anyhow::Error> {
match *self {
RemoteRepo::Local(ref l) => Ok(Some(l.get_id()?)),
RemoteRepo::Ssh(ref mut s) => s.get_id().await,
RemoteRepo::Http(ref h) => h.get_id().await,
RemoteRepo::LocalChannel(ref channel) => {
if let Some(channel) = txn.load_channel(&channel)? {
Ok(txn.id(&*channel.read()).cloned())
} else {
Err(anyhow::anyhow!(
"Unable to retrieve RemoteId for LocalChannel remote"
))
}
}
RemoteRepo::None => unreachable!(),
}
}
pub async fn archive<W: std::io::Write + Send + 'static>(
&mut self,
prefix: Option<String>,
state: Option<(Merkle, &[Hash])>,
umask: u16,
w: W,
) -> Result<u64, anyhow::Error> {
match *self {
RemoteRepo::Local(ref mut l) => {
debug!("archiving local repo");
let changes = libpijul::changestore::filesystem::FileSystem::from_root(
&l.root,
pijul_repository::max_files()?,
);
let mut tarball = libpijul::output::Tarball::new(w, prefix, umask);
let conflicts = if let Some((state, extra)) = state {
let txn = l.pristine.arc_txn_begin()?;
let channel = {
let txn = txn.read();
txn.load_channel(&l.channel)?.unwrap()
};
txn.archive_with_state(&changes, &channel, &state, extra, &mut tarball, 0)?
} else {
let txn = l.pristine.arc_txn_begin()?;
let channel = {
let txn = txn.read();
txn.load_channel(&l.channel)?.unwrap()
};
txn.archive(&changes, &channel, &mut tarball)?
};
Ok(conflicts.len() as u64)
}
RemoteRepo::Ssh(ref mut s) => s.archive(prefix, state, w).await,
RemoteRepo::Http(ref mut h) => h.archive(prefix, state, w).await,
RemoteRepo::LocalChannel(_) => unreachable!(),
RemoteRepo::None => unreachable!(),
}
}
async fn download_changelist<T: MutTxnTExt>(
&mut self,
txn: &mut T,
remote: &mut RemoteRef<T>,
from: u64,
paths: &[String],
) -> Result<HashSet<Position<Hash>>, anyhow::Error> {
let f = |a: &mut (&mut T, &mut RemoteRef<T>), n, h, m, is_tag| {
let (ref mut txn, ref mut remote) = *a;
txn.put_remote(remote, n, (h, m))?;
if is_tag {
txn.put_tags(&mut remote.lock().tags, n, &m.into())?;
}
Ok(())
};
match *self {
RemoteRepo::Local(ref mut l) => {
l.download_changelist(f, &mut (txn, remote), from, paths)
}
RemoteRepo::Ssh(ref mut s) => {
s.download_changelist(f, &mut (txn, remote), from, paths)
.await
}
RemoteRepo::Http(ref h) => {
h.download_changelist(f, &mut (txn, remote), from, paths)
.await
}
RemoteRepo::LocalChannel(_) => Ok(HashSet::new()),
RemoteRepo::None => unreachable!(),
}
}
pub async fn upload_changes<T: MutTxnTExt + 'static>(
&mut self,
txn: &mut T,
local: PathBuf,
to_channel: Option<&str>,
changes: &[CS],
) -> Result<(), anyhow::Error> {
let upload_bar = ProgressBar::new(changes.len() as u64, UPLOAD_MESSAGE)?;
match self {
RemoteRepo::Local(ref mut l) => {
l.upload_changes(upload_bar, local, to_channel, changes)?
}
RemoteRepo::Ssh(ref mut s) => {
s.upload_changes(upload_bar, local, to_channel, changes)
.await?
}
RemoteRepo::Http(ref h) => {
h.upload_changes(upload_bar, local, to_channel, changes)
.await?
}
RemoteRepo::LocalChannel(ref channel) => {
let mut channel = txn.open_or_create_channel(channel)?;
let store = libpijul::changestore::filesystem::FileSystem::from_changes(
local,
pijul_repository::max_files()?,
);
local::upload_changes(upload_bar, &store, txn, &mut channel, changes)?
}
RemoteRepo::None => unreachable!(),
}
Ok(())
}
pub async fn download_changes(
&mut self,
progress_bar: ProgressBar,
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
send: &mut tokio::sync::mpsc::Sender<(CS, bool)>,
path: &mut PathBuf,
full: bool,
) -> Result<bool, anyhow::Error> {
debug!("download_changes");
match *self {
RemoteRepo::Local(ref mut l) => {
l.download_changes(progress_bar, hashes, send, path).await?
}
RemoteRepo::Ssh(ref mut s) => {
s.download_changes(progress_bar, hashes, send, path, full)
.await?
}
RemoteRepo::Http(ref mut h) => {
h.download_changes(progress_bar, hashes, send, path, full)
.await?
}
RemoteRepo::LocalChannel(_) => {
while let Some(c) = hashes.recv().await {
send.send((c, true)).await?;
}
}
RemoteRepo::None => unreachable!(),
}
Ok(true)
}
pub async fn update_identities<T: MutTxnTExt + TxnTExt + GraphIter>(
&mut self,
repo: &mut Repository,
remote: &RemoteRef<T>,
) -> Result<(), anyhow::Error> {
debug!("Downloading identities");
let mut id_path = repo.path.clone();
id_path.push(DOT_DIR);
id_path.push("identities");
let rev = None;
let r = match *self {
RemoteRepo::Local(ref mut l) => l.update_identities(rev, id_path).await?,
RemoteRepo::Ssh(ref mut s) => s.update_identities(rev, id_path).await?,
RemoteRepo::Http(ref mut h) => h.update_identities(rev, id_path).await?,
RemoteRepo::LocalChannel(_) => 0,
RemoteRepo::None => unreachable!(),
};
remote.set_id_revision(r);
Ok(())
}
pub async fn prove(&mut self, key: libpijul::key::SKey) -> Result<(), anyhow::Error> {
match *self {
RemoteRepo::Ssh(ref mut s) => s.prove(key).await,
RemoteRepo::Http(ref mut h) => h.prove(key).await,
RemoteRepo::None => unreachable!(),
_ => Ok(()),
}
}
pub async fn pull<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
&mut self,
repo: &mut Repository,
txn: &mut T,
channel: &mut ChannelRef<T>,
to_apply: &[CS],
inodes: &HashSet<Position<Hash>>,
do_apply: bool,
) -> Result<Vec<CS>, anyhow::Error> {
let apply_len = to_apply.len() as u64;
let download_bar = ProgressBar::new(apply_len, DOWNLOAD_MESSAGE)?;
let apply_bar = if do_apply {
Some(ProgressBar::new(apply_len, APPLY_MESSAGE)?)
} else {
None
};
let (mut send, recv) = tokio::sync::mpsc::channel(100);
let mut self_ = std::mem::replace(self, RemoteRepo::None);
let (hash_send, mut hash_recv) = tokio::sync::mpsc::unbounded_channel();
let mut change_path_ = repo.path.clone();
change_path_.push(DOT_DIR);
change_path_.push("changes");
let cloned_download_bar = download_bar.clone();
let t = tokio::spawn(async move {
self_
.download_changes(
cloned_download_bar,
&mut hash_recv,
&mut send,
&mut change_path_,
false,
)
.await?;
Ok::<_, anyhow::Error>(self_)
});
let mut change_path_ = repo.changes_dir.clone();
let mut waiting = 0;
let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
let mut asked = HashSet::new();
for h in to_apply {
debug!("to_apply {:?}", h);
match h {
CS::Change(h) => {
libpijul::changestore::filesystem::push_filename(&mut change_path_, h);
}
CS::State(h) => {
libpijul::changestore::filesystem::push_tag_filename(&mut change_path_, h);
}
}
asked.insert(*h);
hash_send.send(*h)?;
waiting += 1;
libpijul::changestore::filesystem::pop_filename(&mut change_path_);
}
let u = self
.download_changes_rec(
repo,
hash_send,
recv,
send_ready,
download_bar,
waiting,
asked,
)
.await?;
let mut ws = libpijul::ApplyWorkspace::new();
let mut to_apply_inodes = HashSet::new();
while let Some(h) = recv_ready.recv().await {
debug!("to_apply: {:?}", h);
let touches_inodes = inodes.is_empty()
|| {
debug!("inodes = {:?}", inodes);
use libpijul::changestore::ChangeStore;
if let CS::Change(ref h) = h {
let changes = repo.changes.get_changes(h)?;
changes.iter().any(|c| {
c.iter().any(|c| {
let inode = c.inode();
debug!("inode = {:?}", inode);
inodes.contains(&Position {
change: inode.change.unwrap_or(*h),
pos: inode.pos,
})
})
})
} else {
false
}
}
|| { inodes.iter().any(|i| CS::Change(i.change) == h) };
if touches_inodes {
to_apply_inodes.insert(h);
} else {
continue;
}
if let Some(apply_bar) = apply_bar.clone() {
info!("Applying {:?}", h);
apply_bar.inc(1);
debug!("apply");
if let CS::Change(h) = h {
let mut channel = channel.write();
txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;
}
debug!("applied");
} else {
debug!("not applying {:?}", h)
}
}
let mut result = Vec::with_capacity(to_apply_inodes.len());
for h in to_apply {
if to_apply_inodes.contains(&h) {
result.push(*h)
}
}
debug!("finished");
debug!("waiting for spawned process");
*self = t.await??;
u.await??;
Ok(result)
}
async fn download_changes_rec(
&mut self,
repo: &mut Repository,
send_hash: tokio::sync::mpsc::UnboundedSender<CS>,
mut recv_signal: tokio::sync::mpsc::Receiver<(CS, bool)>,
send_ready: tokio::sync::mpsc::Sender<CS>,
progress_bar: ProgressBar,
mut waiting: usize,
mut asked: HashSet<CS>,
) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
let mut change_path = repo.changes_dir.clone();
let mut dep_path = repo.changes_dir.clone();
let changes = repo.changes.clone();
let t = tokio::spawn(async move {
if waiting == 0 {
return Ok(());
}
let mut ready = Vec::new();
while let Some((hash, follow)) = recv_signal.recv().await {
debug!("received {:?} {:?}", hash, follow);
if let CS::Change(hash) = hash {
waiting -= 1;
if follow {
libpijul::changestore::filesystem::push_filename(&mut change_path, &hash);
std::fs::create_dir_all(change_path.parent().unwrap())?;
use libpijul::changestore::ChangeStore;
let mut needs_dep = false;
for dep in changes.get_dependencies(&hash)? {
let dep: libpijul::pristine::Hash = dep;
libpijul::changestore::filesystem::push_filename(&mut dep_path, &dep);
let has_dep = std::fs::metadata(&dep_path).is_ok();
libpijul::changestore::filesystem::pop_filename(&mut dep_path);
if !has_dep {
needs_dep = true;
if asked.insert(CS::Change(dep)) {
progress_bar.inc(1);
send_hash.send(CS::Change(dep))?;
waiting += 1
}
}
}
libpijul::changestore::filesystem::pop_filename(&mut change_path);
if !needs_dep {
send_ready.send(CS::Change(hash)).await?;
} else {
ready.push(CS::Change(hash))
}
} else {
send_ready.send(CS::Change(hash)).await?;
}
}
if waiting == 0 {
break;
}
}
info!("waiting loop done");
for r in ready {
send_ready.send(r).await?;
}
std::mem::drop(recv_signal);
Ok(())
});
Ok(t)
}
pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
&mut self,
repo: &mut Repository,
txn: &mut T,
channel: &mut ChannelRef<T>,
tag: &[Hash],
) -> Result<(), anyhow::Error> {
let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();
let (mut send_signal, recv_signal) = tokio::sync::mpsc::channel(100);
let mut self_ = std::mem::replace(self, RemoteRepo::None);
let mut change_path_ = repo.changes_dir.clone();
let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;
let cloned_download_bar = download_bar.clone();
let t = tokio::spawn(async move {
self_
.download_changes(
cloned_download_bar,
&mut recv_hash,
&mut send_signal,
&mut change_path_,
false,
)
.await?;
Ok(self_)
});
let mut waiting = 0;
let mut asked = HashSet::new();
for &h in tag.iter() {
waiting += 1;
send_hash.send(CS::Change(h))?;
asked.insert(CS::Change(h));
}
let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
let u = self
.download_changes_rec(
repo,
send_hash,
recv_signal,
send_ready,
download_bar,
waiting,
asked,
)
.await?;
let mut hashes = Vec::new();
let mut ws = libpijul::ApplyWorkspace::new();
{
let mut channel_ = channel.write();
while let Some(hash) = recv_ready.recv().await {
if let CS::Change(ref hash) = hash {
txn.apply_change_rec_ws(&repo.changes, &mut channel_, hash, &mut ws)?;
}
hashes.push(hash);
}
}
let r: Result<_, anyhow::Error> = t.await?;
*self = r?;
u.await??;
self.complete_changes(repo, txn, channel, &hashes, false)
.await?;
Ok(())
}
pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
&mut self,
repo: &mut Repository,
txn: &mut T,
channel: &mut ChannelRef<T>,
state: Merkle,
) -> Result<(), anyhow::Error> {
let id = if let Some(id) = self.get_id(txn).await? {
id
} else {
return Ok(());
};
self.update_changelist(txn, &[]).await?;
let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
let mut to_pull = Vec::new();
let mut found = false;
for x in txn.iter_remote(&remote.lock().remote, 0)? {
let (n, p) = x?;
debug!("{:?} {:?}", n, p);
to_pull.push(CS::Change(p.a.into()));
if p.b == state {
found = true;
break;
}
}
if !found {
bail!("State not found: {:?}", state)
}
self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true)
.await?;
self.update_identities(repo, &remote).await?;
self.complete_changes(repo, txn, channel, &to_pull, false)
.await?;
Ok(())
}
pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(
&mut self,
repo: &pijul_repository::Repository,
txn: &T,
local_channel: &mut ChannelRef<T>,
changes: &[CS],
full: bool,
) -> Result<(), anyhow::Error> {
debug!("complete changes {:?}", changes);
use libpijul::changestore::ChangeStore;
let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();
let (mut send_sig, mut recv_sig) = tokio::sync::mpsc::channel(100);
let mut self_ = std::mem::replace(self, RemoteRepo::None);
let mut changes_dir = repo.changes_dir.clone();
let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;
let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;
let t: tokio::task::JoinHandle<Result<RemoteRepo, anyhow::Error>> =
tokio::spawn(async move {
self_
.download_changes(
download_bar,
&mut recv_hash,
&mut send_sig,
&mut changes_dir,
true,
)
.await?;
Ok::<_, anyhow::Error>(self_)
});
for c in changes {
let c = if let CS::Change(c) = c { c } else { continue };
let sc = c.into();
if repo
.changes
.has_contents(*c, txn.get_internal(&sc)?.cloned())
{
debug!("has contents {:?}", c);
continue;
}
if full {
debug!("sending send_hash");
send_hash.send(CS::Change(*c))?;
debug!("sent");
continue;
}
let change = if let Some(&i) = txn.get_internal(&sc)? {
i
} else {
debug!("could not find internal for {:?}", sc);
continue;
};
let v = libpijul::pristine::Vertex {
change,
start: libpijul::pristine::ChangePosition(0u64.into()),
end: libpijul::pristine::ChangePosition(0u64.into()),
};
let channel = local_channel.read();
let graph = txn.graph(&channel);
for x in txn.iter_graph(graph, Some(&v))? {
let (v, e) = x?;
if v.change > change {
break;
} else if e.flag().is_alive_parent() {
send_hash.send(CS::Change(*c))?;
break;
}
}
}
debug!("dropping send_hash");
std::mem::drop(send_hash);
while recv_sig.recv().await.is_some() {}
*self = t.await??;
Ok(())
}
pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
&mut self,
repo: &mut Repository,
txn: &mut T,
local_channel: &mut ChannelRef<T>,
path: &[String],
) -> Result<(), anyhow::Error> {
let (inodes, remote_changes) = if let Some(x) = self.update_changelist(txn, path).await? {
x
} else {
bail!("Channel not found")
};
let mut pullable = Vec::new();
{
let rem = remote_changes.lock();
for x in txn.iter_remote(&rem.remote, 0)? {
let (_, p) = x?;
pullable.push(CS::Change(p.a.into()))
}
}
self.pull(repo, txn, local_channel, &pullable, &inodes, true)
.await?;
self.update_identities(repo, &remote_changes).await?;
self.complete_changes(repo, txn, local_channel, &pullable, false)
.await?;
Ok(())
}
}
use libpijul::pristine::{ChangePosition, Position};
use regex::Regex;
lazy_static! {
static ref CHANGELIST_LINE: Regex = Regex::new(
r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#
)
.unwrap();
static ref PATHS_LINE: Regex =
Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap();
}
enum ListLine {
Change {
n: u64,
h: Hash,
m: Merkle,
tag: bool,
},
Position(Position<Hash>),
Error(String),
}
fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {
debug!("data = {:?}", data);
if let Some(caps) = CHANGELIST_LINE.captures(data) {
if let (Some(h), Some(m)) = (
Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),
Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),
) {
return Ok(ListLine::Change {
n: caps.name("num").unwrap().as_str().parse().unwrap(),
h,
m,
tag: caps.name("tag").is_some(),
});
}
}
if data.starts_with("error:") {
return Ok(ListLine::Error(data.split_at(6).1.to_string()));
}
if let Some(caps) = PATHS_LINE.captures(data) {
return Ok(ListLine::Position(Position {
change: Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()).unwrap(),
pos: ChangePosition(
caps.name("num")
.unwrap()
.as_str()
.parse::<u64>()
.unwrap()
.into(),
),
}));
}
debug!("offending line: {:?}", data);
bail!("Protocol error")
}
fn remote_unrecs<T: TxnTExt + ChannelTxnT>(
txn: &T,
current_channel: &ChannelRef<T>,
ours_ge_dichotomy: &[(u64, CS)],
theirs_ge_dichotomy_set: &HashSet<CS>,
) -> Result<Vec<(u64, CS)>, anyhow::Error> {
let mut remote_unrecs = Vec::new();
for (n, hash) in ours_ge_dichotomy {
debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);
if theirs_ge_dichotomy_set.contains(hash) {
debug!("still present");
continue;
} else {
let has_it = match hash {
CS::Change(hash) => txn.get_revchanges(¤t_channel, &hash)?.is_some(),
CS::State(state) => {
let ch = current_channel.read();
if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {
txn.is_tagged(txn.tags(&*ch), n.into())?
} else {
false
}
}
};
if has_it {
remote_unrecs.push((*n, *hash))
} else {
continue;
}
}
}
Ok(remote_unrecs)
}