with import <nixpkgs> {};
pkgs.mkShell {
name = "pijul";
nativeBuildInputs = with pkgs; [
clang
pkg-config
];
buildInputs = with pkgs; [
rustc
rustfmt
llvmPackages.libclang
libsodium
openssl
xxHash
zstd
libgit2
] ++ (stdenv.lib.optionals stdenv.isDarwin [
CoreServices
Security
]);
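# clang-sys (used by bindgen) locates libclang through this variable.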
LIBCLANG_PATH = "${llvmPackages.libclang}/lib";
}
#![recursion_limit = "256"]
extern crate proc_macro;
extern crate proc_macro2;
#[macro_use]
extern crate quote;
use proc_macro::TokenStream;
use proc_macro2::*;
use std::iter::FromIterator;
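// Capitalize the first letter of `name`; used to build type identifiers
// (e.g. the associated type generated by `table!(foo)`, or `FooCursor`).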
fn name_capital(name: &str) -> String {
name.chars()
.enumerate()
.map(|(i, s)| {
if i == 0 {
s.to_uppercase().nth(0).unwrap()
} else {
s
}
})
.collect()
}
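// `table!(foo)` declares a hidden associated type named after the
// capitalized table name (here, `type Foo;`).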
#[proc_macro]
pub fn table(input: proc_macro::TokenStream) -> TokenStream {
let input = proc_macro2::TokenStream::from(input);
let mut input_iter = input.into_iter();
let name = match input_iter.next() {
Some(TokenTree::Ident(id)) => id.to_string(),
_ => panic!("txn_table: first argument not an identifier"),
};
assert!(input_iter.next().is_none());
let name_capital = syn::Ident::new(&name_capital(&name), Span::call_site());
proc_macro::TokenStream::from(quote! {
#[doc(hidden)]
type #name_capital;
})
}
#[proc_macro]
pub fn sanakirja_table_get(input: proc_macro::TokenStream) -> TokenStream {
let input = proc_macro2::TokenStream::from(input);
let mut input_iter = input.into_iter();
let name = match input_iter.next() {
Some(TokenTree::Ident(id)) => id.to_string(),
_ => panic!("txn_table: first argument not an identifier"),
};
let name_get = syn::Ident::new(&format!("get_{}", name), Span::call_site());
let name = syn::Ident::new(&name, Span::call_site());
let key = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let value = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let pre_ = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let pre = if !pre_.is_empty() {
quote! {
let (key, value) = #pre_;
}
} else {
quote! {}
};
let post_ = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let post = if post_.is_empty() {
quote! { self.txn.get(&self.#name, key, value) }
} else {
quote! { self.txn.get(&self.#name, key, value) . #post_ }
};
proc_macro::TokenStream::from(quote! {
#[doc(hidden)]
fn #name_get <'txn> (&'txn self, key: #key, value: Option<#value>) -> Option<#value> {
use ::sanakirja::Transaction;
#pre
#post
}
})
}
#[proc_macro]
pub fn sanakirja_get(input: proc_macro::TokenStream) -> TokenStream {
let input = proc_macro2::TokenStream::from(input);
let mut input_iter = input.into_iter();
let name = match input_iter.next() {
Some(TokenTree::Ident(id)) => id.to_string(),
_ => panic!("txn_table: first argument not an identifier"),
};
let name_capital = syn::Ident::new(&name_capital(&name), Span::call_site());
let name_get = syn::Ident::new(&format!("get_{}", name), Span::call_site());
let key = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let value = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
assert!(input_iter.next().is_none());
proc_macro::TokenStream::from(quote! {
#[doc(hidden)]
fn #name_get(&self, db: &Self::#name_capital, key: #key, value: Option<#value>) -> Option<#value> {
use ::sanakirja::Transaction;
self.txn.get(db, key, value)
}
})
}
#[proc_macro]
pub fn table_get(input: proc_macro::TokenStream) -> TokenStream {
let input = proc_macro2::TokenStream::from(input);
let mut input_iter = input.into_iter();
let name = match input_iter.next() {
Some(TokenTree::Ident(id)) => id.to_string(),
_ => panic!("txn_table: first argument not an identifier"),
};
let name_get = syn::Ident::new(&format!("get_{}", name), Span::call_site());
let key = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let value = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
assert!(input_iter.next().is_none());
proc_macro::TokenStream::from(quote! {
#[doc(hidden)]
fn #name_get<'txn>(&'txn self, key: #key, value: Option<#value>) -> Option<#value>;
})
}
#[proc_macro]
pub fn get(input: proc_macro::TokenStream) -> TokenStream {
let input = proc_macro2::TokenStream::from(input);
let mut input_iter = input.into_iter();
let name = match input_iter.next() {
Some(TokenTree::Ident(id)) => id.to_string(),
_ => panic!("txn_table: first argument not an identifier"),
};
let name_capital = syn::Ident::new(&name_capital(&name), Span::call_site());
let name_get = syn::Ident::new(&format!("get_{}", name), Span::call_site());
let key = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let value = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
assert!(input_iter.next().is_none());
proc_macro::TokenStream::from(quote! {
#[doc(hidden)]
fn #name_get<'txn>(&'txn self, db: &Self::#name_capital, key: #key, value: Option<#value>) -> Option<#value>;
})
}
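// Collect the tokens of the next macro argument: skip a single leading
// comma, then gather tokens until the next comma or the end of the input.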
fn next(input_iter: &mut proc_macro2::token_stream::IntoIter) -> Vec<TokenTree> {
let mut result = Vec::new();
let mut is_first = true;
loop {
match input_iter.next() {
Some(TokenTree::Punct(p)) => {
if p.as_char() == ',' {
if !is_first {
return result;
}
} else {
result.push(TokenTree::Punct(p))
}
}
Some(e) => result.push(e),
None => return result,
}
is_first = false
}
}
#[proc_macro]
pub fn cursor(input: proc_macro::TokenStream) -> TokenStream {
cursor_(input, false, false, false)
}
#[proc_macro]
pub fn cursor_ref(input: proc_macro::TokenStream) -> TokenStream {
cursor_(input, false, false, true)
}
#[proc_macro]
pub fn iter(input: proc_macro::TokenStream) -> TokenStream {
cursor_(input, false, true, false)
}
#[proc_macro]
pub fn rev_cursor(input: proc_macro::TokenStream) -> TokenStream {
cursor_(input, true, false, false)
}
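// Generate the cursor-related trait items for a table: a forward cursor by
// default, a reverse cursor when `rev` is set, plus an `iter_*` method when
// `iter` is set and a `cursor_*_ref` variant (taking the transaction by
// `Deref`) when `borrow` is set.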
fn cursor_(input: proc_macro::TokenStream, rev: bool, iter: bool, borrow: bool) -> TokenStream {
let input = proc_macro2::TokenStream::from(input);
let mut input_iter = input.into_iter();
let name = match input_iter.next() {
Some(TokenTree::Ident(id)) => id.to_string(),
_ => panic!("txn_table: first argument not an identifier"),
};
let capital = name_capital(&name);
let cursor_name = syn::Ident::new(&format!("{}Cursor", capital,), Span::call_site());
let name_capital = syn::Ident::new(&name_capital(&name), Span::call_site());
let name_iter = syn::Ident::new(&format!("iter_{}", name), Span::call_site());
let name_next = syn::Ident::new(&format!("cursor_{}_next", name), Span::call_site());
let name_prev = syn::Ident::new(&format!("cursor_{}_prev", name), Span::call_site());
let name_cursor = syn::Ident::new(
&format!("{}cursor_{}", if rev { "rev_" } else { "" }, name),
Span::call_site(),
);
let name_cursor_ref = syn::Ident::new(
&format!("{}cursor_{}_ref", if rev { "rev_" } else { "" }, name),
Span::call_site(),
);
let key = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let value = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let cursor_type = if rev {
quote! {
crate::pristine::RevCursor<Self, &'txn Self, Self::#cursor_name, #key, #value>
}
} else {
quote! {
crate::pristine::Cursor<Self, &'txn Self, Self::#cursor_name, #key, #value>
}
};
let def = if rev {
quote! {}
} else {
quote! {
#[doc(hidden)]
type #cursor_name;
#[doc(hidden)]
fn #name_next <'txn> (
&'txn self,
cursor: &mut Self::#cursor_name,
) -> Option<(#key, #value)>;
#[doc(hidden)]
fn #name_prev <'txn> (
&'txn self,
cursor: &mut Self::#cursor_name,
) -> Option<(#key, #value)>;
}
};
let borrow = if borrow {
quote! {
#[doc(hidden)]
fn #name_cursor_ref<RT: std::ops::Deref<Target = Self>>(
txn: RT,
db: &Self::#name_capital,
pos: Option<(#key, Option<#value>)>,
) -> crate::pristine::Cursor<Self, RT, Self::#cursor_name, #key, #value>;
}
} else {
quote! {}
};
let iter = if !iter {
quote! {}
} else {
quote! {
#[doc(hidden)]
fn #name_iter <'txn> (
&'txn self,
k: #key,
v: Option<#value>
) -> #cursor_type;
}
};
assert!(input_iter.next().is_none());
proc_macro::TokenStream::from(quote! {
#def
#[doc(hidden)]
fn #name_cursor<'txn>(
&'txn self,
db: &Self::#name_capital,
pos: Option<(#key, Option<#value>)>,
) -> #cursor_type;
#borrow
#iter
})
}
#[proc_macro]
pub fn sanakirja_cursor(input: proc_macro::TokenStream) -> TokenStream {
sanakirja_cursor_(input, false, false, false)
}
#[proc_macro]
pub fn sanakirja_cursor_ref(input: proc_macro::TokenStream) -> TokenStream {
sanakirja_cursor_(input, false, false, true)
}
#[proc_macro]
pub fn sanakirja_iter(input: proc_macro::TokenStream) -> TokenStream {
sanakirja_cursor_(input, false, true, false)
}
#[proc_macro]
pub fn sanakirja_rev_cursor(input: proc_macro::TokenStream) -> TokenStream {
sanakirja_cursor_(input, true, false, false)
}
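// Like `cursor_`, but generates the sanakirja-backed method bodies (wrapping
// `set_cursors`, `next` and `prev`) instead of bare trait declarations.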
fn sanakirja_cursor_(
input: proc_macro::TokenStream,
rev: bool,
iter: bool,
borrow: bool,
) -> TokenStream {
let input = proc_macro2::TokenStream::from(input);
let mut input_iter = input.into_iter();
let name = match input_iter.next() {
Some(TokenTree::Ident(id)) => id.to_string(),
_ => panic!("txn_table: first argument not an identifier"),
};
let cursor_name = syn::Ident::new(
&format!("{}Cursor", name_capital(&name),),
Span::call_site(),
);
let name_capital = syn::Ident::new(&name_capital(&name), Span::call_site());
let name_next = syn::Ident::new(&format!("cursor_{}_next", name), Span::call_site());
let name_prev = syn::Ident::new(&format!("cursor_{}_prev", name), Span::call_site());
let name_cursor = syn::Ident::new(
&format!("{}cursor_{}", if rev { "rev_" } else { "" }, name),
Span::call_site(),
);
let name_cursor_ref = syn::Ident::new(
&format!("{}cursor_{}_ref", if rev { "rev_" } else { "" }, name),
Span::call_site(),
);
let name_iter = syn::Ident::new(
&format!("{}iter_{}", if rev { "rev_" } else { "" }, name),
Span::call_site(),
);
let name = syn::Ident::new(&name, Span::call_site());
let key = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let value = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let pre = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let post = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let pre_init = if !pre.is_empty() {
quote! { let pos = #pre; }
} else {
quote! {}
};
let post = if !post.is_empty() {
quote! { . #post }
} else {
quote! {}
};
let iter = if iter {
quote! {
#[doc(hidden)]
fn #name_iter <'txn> (
&'txn self,
k: #key,
v: Option<#value>
) -> super::Cursor<Self, &'txn Self, Self::#cursor_name, #key, #value> {
self.#name_cursor(&self.#name, Some((k, v)))
}
}
} else {
quote! {}
};
let borrow = if borrow {
quote! {
#[doc(hidden)]
fn #name_cursor_ref <RT: std::ops::Deref<Target = Self>> (
txn: RT,
db: &Self::#name_capital,
pos: Option<(#key, Option<#value>)>,
) -> super::Cursor<Self, RT, Self::#cursor_name, #key, #value> {
#pre_init
let mut cursor = txn.txn.set_cursors(&db, pos).0;
super::Cursor {
cursor,
txn,
marker: std::marker::PhantomData,
}
}
}
} else {
quote! {}
};
let result = proc_macro::TokenStream::from(if rev {
quote! {
#[doc(hidden)]
fn #name_cursor<'txn>(
&'txn self,
db: &Self::#name_capital,
pos: Option<(#key, Option<#value>)>,
) -> super::RevCursor<Self, &'txn Self, Self::#cursor_name, #key, #value> {
#pre_init
let mut cursor = if pos.is_some() {
self.txn.set_cursors(&db, pos).0
} else {
self.txn.set_cursors_last(&db)
};
super::RevCursor {
cursor,
txn: self,
marker: std::marker::PhantomData,
}
}
}
} else {
quote! {
#[doc(hidden)]
type #cursor_name = ::sanakirja::Cursor;
#[doc(hidden)]
fn #name_cursor<'txn>(
&'txn self,
db: &Self::#name_capital,
pos: Option<(#key, Option<#value>)>,
) -> super::Cursor<Self, &'txn Self, Self::#cursor_name, #key, #value> {
#pre_init
let mut cursor = self.txn.set_cursors(&db, pos).0;
super::Cursor {
cursor,
txn: self,
marker: std::marker::PhantomData,
}
}
#borrow
#[doc(hidden)]
fn #name_next <'txn> (
&'txn self,
cursor: &mut Self::#cursor_name,
) -> Option<(#key, #value)> {
(unsafe { ::sanakirja::next(&self.txn, cursor) })
#post
}
#[doc(hidden)]
fn #name_prev <'txn> (
&'txn self,
cursor: &mut Self::#cursor_name,
) -> Option<(#key, #value)> {
(unsafe { ::sanakirja::prev(&self.txn, cursor) })
#post
}
#iter
}
});
result
}
#[proc_macro]
pub fn initialized_cursor(input: proc_macro::TokenStream) -> TokenStream {
initialized_cursor_(input, false)
}
#[proc_macro]
pub fn initialized_rev_cursor(input: proc_macro::TokenStream) -> TokenStream {
initialized_cursor_(input, true)
}
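// Implement `Iterator` for the generated cursor types: forward cursors also
// get a `prev` method, reverse cursors iterate using `cursor_*_prev`.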
fn initialized_cursor_(input: proc_macro::TokenStream, rev: bool) -> TokenStream {
let input = proc_macro2::TokenStream::from(input);
let mut input_iter = input.into_iter();
let name = match input_iter.next() {
Some(TokenTree::Ident(id)) => id.to_string(),
_ => panic!("txn_table: first argument not an identifier"),
};
let cursor_name = syn::Ident::new(
&format!("{}Cursor", name_capital(&name),),
Span::call_site(),
);
let name_next = syn::Ident::new(&format!("cursor_{}_next", name), Span::call_site());
let name_prev = syn::Ident::new(&format!("cursor_{}_prev", name), Span::call_site());
let key = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let value = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
assert!(input_iter.next().is_none());
if rev {
proc_macro::TokenStream::from(quote! {
impl<T: TxnT, RT: std::ops::Deref<Target = T>> Iterator for crate::pristine::RevCursor<T, RT, T::#cursor_name, #key, #value>
{
type Item = (#key, #value);
fn next(&mut self) -> Option<(#key, #value)> {
self.txn.#name_prev(&mut self.cursor)
}
}
})
} else {
proc_macro::TokenStream::from(quote! {
impl<T: TxnT, RT: std::ops::Deref<Target = T>>
crate::pristine::Cursor<T, RT, T::#cursor_name, #key, #value>
{
pub fn prev(&mut self) -> Option<(#key, #value)> {
self.txn.#name_prev(&mut self.cursor)
}
}
impl<T: TxnT, RT: std::ops::Deref<Target = T>> Iterator for crate::pristine::Cursor<T, RT, T::#cursor_name, #key, #value>
{
type Item = (#key, #value);
fn next(&mut self) -> Option<(#key, #value)> {
self.txn.#name_next(&mut self.cursor)
}
}
})
}
}
#[proc_macro]
pub fn put_del(input: proc_macro::TokenStream) -> TokenStream {
let input = proc_macro2::TokenStream::from(input);
let mut input_iter = input.into_iter();
let name = match input_iter.next() {
Some(TokenTree::Ident(id)) => id.to_string(),
_ => panic!("txn_table: first argument not an identifier"),
};
let put = syn::Ident::new(&format!("put_{}", name), Span::call_site());
let del = syn::Ident::new(&format!("del_{}", name), Span::call_site());
let key = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let value = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
assert!(input_iter.next().is_none());
proc_macro::TokenStream::from(quote! {
#[doc(hidden)]
fn #put(
&mut self,
k: #key,
e: #value,
) -> Result<bool, anyhow::Error>;
#[doc(hidden)]
fn #del(
&mut self,
k: #key,
e: Option<#value>,
) -> Result<bool, anyhow::Error>;
})
}
#[proc_macro]
pub fn sanakirja_put_del(input: proc_macro::TokenStream) -> TokenStream {
let input = proc_macro2::TokenStream::from(input);
let mut input_iter = input.into_iter();
let name = match input_iter.next() {
Some(TokenTree::Ident(id)) => id.to_string(),
_ => panic!("txn_table: first argument not an identifier"),
};
let put = syn::Ident::new(&format!("put_{}", name), Span::call_site());
let del = syn::Ident::new(&format!("del_{}", name), Span::call_site());
let name = syn::Ident::new(&name, Span::call_site());
let key = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let value = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let pre_key = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
let pre_value = proc_macro2::TokenStream::from_iter(next(&mut input_iter).into_iter());
assert!(input_iter.next().is_none());
if pre_key.is_empty() {
proc_macro::TokenStream::from(quote! {
#[doc(hidden)]
fn #put(
&mut self,
k: #key,
v: #value,
) -> Result<bool, anyhow::Error> {
Ok(self.txn.put(&mut self.rng, &mut self.#name, k, v)?)
}
#[doc(hidden)]
fn #del(
&mut self,
k: #key,
v: Option<#value>,
) -> Result<bool, anyhow::Error> {
Ok(self.txn.del(&mut self.rng, &mut self.#name, k, v)?)
}
})
} else {
proc_macro::TokenStream::from(quote! {
#[doc(hidden)]
fn #put(
&mut self,
k: #key,
v: #value,
) -> Result<bool, anyhow::Error> {
let k = #pre_key;
let v = #pre_value;
Ok(self.txn.put(&mut self.rng, &mut self.#name, k, v)?)
}
#[doc(hidden)]
fn #del(
&mut self,
k: #key,
v: Option<#value>,
) -> Result<bool, anyhow::Error> {
let k = #pre_key;
let v = v.map(|v| #pre_value);
Ok(self.txn.del(&mut self.rng, &mut self.#name, k, v)?)
}
})
}
}
[package]
name = "pijul-macros"
description = "Macros used to write libpijul."
version = "0.1.0"
authors = ["Pierre-Étienne Meunier <pmeunier@mailbox.org>"]
edition = "2018"
repository = "https://nest.pijul.com/pijul/pijul"
license = "GPL-2.0"
include = [ "Cargo.toml", "src/lib.rs" ]
[lib]
proc-macro = true
[dependencies]
syn = "1.0"
quote = "1.0"
proc-macro2 = "1.0"
regex = "1.4"
use crate::{config, current_dir, Error};
use libpijul::DOT_DIR;
use std::io::Write;
use std::path::PathBuf;
pub struct Repository {
pub pristine: libpijul::pristine::sanakirja::Pristine,
pub changes: libpijul::changestore::filesystem::FileSystem,
pub working_copy: libpijul::working_copy::filesystem::FileSystem,
pub config: config::Config,
pub path: PathBuf,
pub changes_dir: PathBuf,
}
pub const PRISTINE_DIR: &'static str = "pristine";
pub const CHANGES_DIR: &'static str = "changes";
pub const CONFIG_FILE: &'static str = "config";
impl Repository {
pub fn save_config(&self) -> Result<(), anyhow::Error> {
let config = toml::to_string(&self.config)?;
let mut file = std::fs::File::create(&self.path.join(DOT_DIR).join(CONFIG_FILE))?;
file.write_all(config.as_bytes())?;
Ok(())
}
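// Walk up the directory tree from `cur` (or the current directory) until a
// directory containing `dot_dir` is found.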
fn find_root_(cur: Option<PathBuf>, dot_dir: &str) -> Result<PathBuf, anyhow::Error> {
let mut cur = if let Some(cur) = cur {
cur
} else {
current_dir()?
};
cur.push(dot_dir);
loop {
debug!("{:?}", cur);
if std::fs::metadata(&cur).is_err() {
cur.pop();
if cur.pop() {
cur.push(DOT_DIR);
} else {
return Err(Error::NoRepoRoot.into());
}
} else {
break;
}
}
Ok(cur)
}
pub fn find_root(cur: Option<PathBuf>) -> Result<Self, anyhow::Error> {
Self::find_root_with_dot_dir(cur, DOT_DIR)
}
pub fn find_root_with_dot_dir(
cur: Option<PathBuf>,
dot_dir: &str,
) -> Result<Self, anyhow::Error> {
let cur = Self::find_root_(cur, dot_dir)?;
let mut pristine_dir = cur.clone();
pristine_dir.push(PRISTINE_DIR);
let mut changes_dir = cur.clone();
changes_dir.push(CHANGES_DIR);
let mut working_copy_dir = cur.clone();
working_copy_dir.pop();
let config_path = cur.join(CONFIG_FILE);
let config = if let Ok(config) = std::fs::read(&config_path) {
if let Ok(toml) = toml::from_slice(&config) {
toml
} else {
return Err((crate::Error::CouldNotReadConfig { path: config_path }).into());
}
} else {
config::Config::default()
};
Ok(Repository {
pristine: libpijul::pristine::sanakirja::Pristine::new(&pristine_dir)?,
working_copy: libpijul::working_copy::filesystem::FileSystem::from_root(
&working_copy_dir,
),
changes: libpijul::changestore::filesystem::FileSystem::from_root(&working_copy_dir),
config,
path: working_copy_dir,
changes_dir,
})
}
pub fn init(path: Option<std::path::PathBuf>) -> Result<Self, anyhow::Error> {
let cur = if let Some(path) = path {
path
} else {
current_dir()?
};
let mut pristine_dir = cur.clone();
pristine_dir.push(DOT_DIR);
pristine_dir.push(PRISTINE_DIR);
if std::fs::metadata(&pristine_dir).is_err() {
std::fs::create_dir_all(&pristine_dir)?;
let mut changes_dir = cur.clone();
changes_dir.push(DOT_DIR);
changes_dir.push(CHANGES_DIR);
Ok(Repository {
pristine: libpijul::pristine::sanakirja::Pristine::new(&pristine_dir)?,
working_copy: libpijul::working_copy::filesystem::FileSystem::from_root(&cur),
changes: libpijul::changestore::filesystem::FileSystem::from_root(&cur),
config: config::Config::default(),
path: cur,
changes_dir,
})
} else {
Err(Error::AlreadyInARepo.into())
}
}
}
use super::{parse_line, RemoteRef};
use crate::repository::Repository;
use crate::Error;
use byteorder::{BigEndian, ReadBytesExt};
use libpijul::pristine::{Base32, ChannelRef, Hash, Merkle, MutTxnT};
use libpijul::MutTxnTExt;
use regex::Regex;
use std::borrow::Cow;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use thrussh::client::Session;
pub struct Ssh {
pub h: thrussh::client::Handle,
pub c: thrussh::client::Channel,
pub channel: String,
pub remote_cmd: String,
pub path: String,
pub is_running: bool,
pub name: String,
}
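// Matches remote addresses of the form `[ssh://][user@]host[:port]/path` or
// `[user@]host:path`; `host` may be a bracketed IPv6 address.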
lazy_static! {
static ref ADDRESS: Regex = Regex::new(
r#"(ssh://)?((?P<user>[^@]+)@)?((?P<host>(\[([^\]]+)\])|([^:/]+)))((:(?P<port>\d+)/)|:|/)(?P<path>.+)"#
)
.unwrap();
}
#[derive(Debug)]
pub struct Remote<'a> {
host: &'a str,
port: u16,
user: Cow<'a, str>,
path: &'a str,
addr: String,
resolved: std::net::SocketAddr,
}
pub fn ssh_remote<'a>(addr: &'a str) -> Option<Remote<'a>> {
let cap = if let Some(cap) = ADDRESS.captures(addr) {
cap
} else {
return None;
};
debug!("ssh_remote: {:?}", cap);
let user = if let Some(u) = cap.name("user") {
Cow::Borrowed(u.as_str())
} else {
Cow::Owned(whoami::username())
};
let host = cap.name("host").unwrap().as_str();
let port: u16 = cap
.name("port")
.map(|x| x.as_str().parse().unwrap())
.unwrap_or(22);
let path = cap.name("path").unwrap().as_str();
let addr = format!("{}:{}", host, port);
use std::net::ToSocketAddrs;
if let Ok(mut res) = addr.to_socket_addrs() {
let resolved = res.next().unwrap();
Some(Remote {
host,
port,
user,
path,
addr,
resolved,
})
} else {
None
}
}
impl<'a> Remote<'a> {
pub async fn connect(&self, name: &str, channel: &str) -> Result<Ssh, anyhow::Error> {
let mut home = dirs_next::home_dir().unwrap();
home.push(".ssh");
home.push("known_hosts");
let client = SshClient {
addr: self.addr.clone(),
known_hosts: home,
last_window_adjustment: SystemTime::now(),
};
let config = Arc::new(thrussh::client::Config::default());
use std::net::ToSocketAddrs;
debug!("client: {:?}", client.addr);
let addr = client.addr.to_socket_addrs()?.next().unwrap();
let mut h = thrussh::client::connect(config, &addr, client).await?;
let mut key_path = dirs_next::home_dir().unwrap().join(".ssh");
// First try agent auth
let authenticated = self.auth_agent(&mut h).await.unwrap_or(false)
|| self.auth_pk(&mut h, &mut key_path).await
|| self.auth_password(&mut h).await?;
if !authenticated {
return Err(Error::NotAuthenticated.into());
}
let c = h.channel_open_session().await?;
let remote_cmd = if let Ok(cmd) = std::env::var("REMOTE_PIJUL") {
cmd
} else {
"pijul".to_string()
};
Ok(Ssh {
h,
c,
channel: channel.to_string(),
remote_cmd,
path: self.path.to_string(),
is_running: false,
name: name.to_string(),
})
}
async fn auth_agent(&self, h: &mut thrussh::client::Handle) -> Result<bool, anyhow::Error> {
let mut authenticated = false;
let mut agent = thrussh_keys::agent::client::AgentClient::connect_env().await?;
let identities = agent.request_identities().await?;
debug!("identities = {:?}", identities);
let mut agent = Some(agent);
for key in identities {
debug!("Trying key {:?}", key);
let fingerprint = key.fingerprint();
if let Some(a) = agent.take() {
debug!("authenticate future");
match h.authenticate_future(self.user.as_ref(), key, a).await {
Ok((a, auth)) => {
if !auth {
writeln!(
std::io::stderr(),
"Key {:?} (with agent) rejected",
fingerprint
)?
}
debug!("auth");
authenticated = auth;
agent = Some(a);
}
Err(e) => {
debug!("not auth {:?}", e);
if let Ok(thrussh_keys::Error::AgentFailure) = e.downcast() {
writeln!(std::io::stderr(), "Failed to sign with agent")?;
}
}
}
}
if authenticated {
return Ok(true);
}
}
Ok(false)
}
async fn auth_pk(&self, h: &mut thrussh::client::Handle, key_path: &mut PathBuf) -> bool {
let mut authenticated = false;
for k in &["id_ed25519", "id_rsa"] {
key_path.push(k);
let k = if let Some(k) = load_secret_key(&key_path, k) {
k
} else {
key_path.pop();
continue;
};
if let Ok(auth) = h
.authenticate_publickey(self.user.as_ref(), Arc::new(k))
.await
{
authenticated = auth
}
key_path.pop();
if authenticated {
return true;
}
}
false
}
async fn auth_password(&self, h: &mut thrussh::client::Handle) -> Result<bool, anyhow::Error> {
let pass = rpassword::read_password_from_tty(Some(&format!(
"Password for {}@{}: ",
self.user, self.host
)))?;
h.authenticate_password(self.user.to_string(), &pass).await
}
}
pub fn load_secret_key(key_path: &Path, k: &str) -> Option<thrussh_keys::key::KeyPair> {
match thrussh_keys::load_secret_key(&key_path, None) {
Ok(k) => Some(k),
Err(e) => {
if let Ok(thrussh_keys::Error::KeyIsEncrypted) = e.downcast() {
let pass = if let Ok(pass) =
rpassword::read_password_from_tty(Some(&format!("Password for key {:?}: ", k)))
{
pass
} else {
return None;
};
if pass.is_empty() {
return None;
}
if let Ok(k) = thrussh_keys::load_secret_key(&key_path, Some(pass.as_bytes())) {
return Some(k);
}
}
None
}
}
}
pub struct SshClient {
addr: String,
known_hosts: PathBuf,
last_window_adjustment: SystemTime,
}
impl thrussh::client::Handler for SshClient {
type FutureBool = futures::future::Ready<Result<(Self, bool), anyhow::Error>>;
type FutureUnit = futures::future::Ready<Result<(Self, Session), anyhow::Error>>;
fn finished_bool(self, b: bool) -> Self::FutureBool {
futures::future::ready(Ok((self, b)))
}
fn finished(self, session: Session) -> Self::FutureUnit {
futures::future::ready(Ok((self, session)))
}
fn check_server_key(
self,
server_public_key: &thrussh_keys::key::PublicKey,
) -> Self::FutureBool {
let mut it = self.addr.split(':');
let addr = it.next().unwrap();
let port = it.next().unwrap_or("22").parse().unwrap();
match thrussh_keys::check_known_hosts_path(addr, port, server_public_key, &self.known_hosts)
{
Ok(e) => {
if e {
futures::future::ready(Ok((self, true)))
} else {
match learn(addr, port, server_public_key) {
Ok(x) => futures::future::ready(Ok((self, x))),
Err(e) => futures::future::ready(Err(e)),
}
}
}
Err(e) => {
error!("Key changed for {:?}", self.addr);
futures::future::ready(Err(e))
}
}
}
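// Adjust the SSH flow-control window based on how quickly the remote fills
// it: double it when adjustments come less than 2s apart, halve it when they
// are more than 8s apart, and stop adjusting once it reaches 10MB.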
fn adjust_window(&mut self, _channel: thrussh::ChannelId, target: u32) -> u32 {
let elapsed = self.last_window_adjustment.elapsed().unwrap();
self.last_window_adjustment = SystemTime::now();
if target >= 10_000_000 {
return target;
}
if elapsed < Duration::from_secs(2) {
target * 2
} else if elapsed > Duration::from_secs(8) {
target / 2
} else {
target
}
}
}
fn learn(addr: &str, port: u16, pk: &thrussh_keys::key::PublicKey) -> Result<bool, anyhow::Error> {
if port == 22 {
print!(
"Unknown key for {:?}, fingerprint {:?}. Learn it (y/N)? ",
addr,
pk.fingerprint()
);
} else {
print!(
"Unknown key for {:?}:{}, fingerprint {:?}. Learn it (y/N)? ",
addr,
port,
pk.fingerprint()
);
}
std::io::stdout().flush()?;
let mut buffer = String::new();
std::io::stdin().read_line(&mut buffer)?;
let buffer = buffer.trim();
if buffer == "Y" || buffer == "y" {
thrussh_keys::learn_known_hosts(addr, port, pk)?;
Ok(true)
} else {
Ok(false)
}
}
impl Ssh {
pub async fn finish(&mut self) -> Result<(), anyhow::Error> {
self.c.eof().await?;
while let Some(msg) = self.c.wait().await {
debug!("msg = {:?}", msg);
match msg {
thrussh::ChannelMsg::Data { .. } => {}
thrussh::ChannelMsg::ExtendedData { data, ext } => {
debug!("{:?} {:?}", ext, std::str::from_utf8(&data[..]));
if let Ok(data) = std::str::from_utf8(&data) {
writeln!(std::io::stderr(), "{}", data)?;
}
}
thrussh::ChannelMsg::WindowAdjusted { .. } => {}
thrussh::ChannelMsg::Eof => {}
thrussh::ChannelMsg::ExitStatus { exit_status } => {
if exit_status != 0 {
return Err((Error::RemoteExit {
status: exit_status,
})
.into());
}
}
msg => error!("wrong message {:?}", msg),
}
}
Ok(())
}
pub async fn get_state(
&mut self,
mid: Option<u64>,
) -> Result<Option<(u64, Merkle)>, anyhow::Error> {
self.run_protocol().await?;
if let Some(mid) = mid {
self.c
.data(format!("state {} {}\n", self.channel, mid).as_bytes())
.await?;
} else {
self.c
.data(format!("state {}\n", self.channel).as_bytes())
.await?;
}
while let Some(msg) = self.c.wait().await {
match msg {
thrussh::ChannelMsg::Data { data } => {
// If we can't parse `data` (for example if the
// remote returns the standard "-\n"), this
// returns None.
let mut s = std::str::from_utf8(&data)?.split(' ');
debug!("s = {:?}", s);
if let (Some(n), Some(m)) = (s.next(), s.next()) {
let n = n.parse().unwrap();
return Ok(Some((n, Merkle::from_base32(m.trim().as_bytes()).unwrap())));
} else {
break;
}
}
thrussh::ChannelMsg::ExtendedData { data, ext } => {
if ext == 1 {
debug!("{:?}", std::str::from_utf8(&data))
}
}
thrussh::ChannelMsg::Eof => {}
thrussh::ChannelMsg::ExitStatus { exit_status } => {
if exit_status != 0 {
return Err((Error::RemoteExit {
status: exit_status,
})
.into());
}
}
msg => panic!("wrong message {:?}", msg),
}
}
Ok(None)
}
pub async fn archive<W: std::io::Write>(
&mut self,
prefix: Option<String>,
state: Option<(Merkle, &[Hash])>,
mut w: W,
) -> Result<u64, anyhow::Error> {
self.run_protocol().await?;
if let Some((ref state, ref extra)) = state {
let mut cmd = format!("archive {} {}", self.channel, state.to_base32(),);
for e in extra.iter() {
cmd.push_str(&format!(" {}", e.to_base32()));
}
if let Some(ref p) = prefix {
cmd.push_str(" :");
cmd.push_str(p)
}
cmd.push('\n');
self.c.data(cmd.as_bytes()).await?;
} else {
self.c
.data(
format!(
"archive {}{}{}\n",
self.channel,
if prefix.is_some() { " :" } else { "" },
prefix.unwrap_or(String::new())
)
.as_bytes(),
)
.await?;
}
let mut len = 0;
let mut conflicts = 0;
let mut len_n = 0;
while let Some(msg) = self.c.wait().await {
match msg {
thrussh::ChannelMsg::Data { data } => {
let mut off = 0;
while len_n < 16 && off < data.len() {
if len_n < 8 {
len = (len << 8) | (data[off] as u64);
} else {
conflicts = (conflicts << 8) | (data[off] as u64);
}
len_n += 1;
off += 1;
}
if len_n >= 16 {
w.write_all(&data[off..])?;
len -= (data.len() - off) as u64;
if len == 0 {
break;
}
}
}
thrussh::ChannelMsg::ExtendedData { data, ext } => {
if ext == 1 {
debug!("{:?}", std::str::from_utf8(&data))
}
}
thrussh::ChannelMsg::Eof => {}
thrussh::ChannelMsg::ExitStatus { exit_status } => {
if exit_status != 0 {
return Err((Error::RemoteExit {
status: exit_status,
})
.into());
}
}
msg => panic!("wrong message {:?}", msg),
}
}
Ok(conflicts)
}
pub async fn run_protocol(&mut self) -> Result<(), anyhow::Error> {
if !self.is_running {
self.is_running = true;
debug!("run_protocol");
self.c
.exec(
true,
format!(
"{} protocol --version {} --repository {}",
self.remote_cmd,
crate::PROTOCOL_VERSION,
self.path
),
)
.await?;
while let Some(msg) = self.c.wait().await {
debug!("msg = {:?}", msg);
match msg {
thrussh::ChannelMsg::Success => break,
thrussh::ChannelMsg::WindowAdjusted { .. } => {}
thrussh::ChannelMsg::Eof => {}
thrussh::ChannelMsg::ExitStatus { exit_status } => {
if exit_status != 0 {
return Err((Error::RemoteExit {
status: exit_status,
})
.into());
}
}
_ => {}
}
}
debug!("run_protocol done");
}
Ok(())
}
pub async fn download_changelist<T: MutTxnT>(
&mut self,
txn: &mut T,
remote: &mut RemoteRef<T>,
from: u64,
paths: &[String],
) -> Result<(), anyhow::Error> {
self.run_protocol().await?;
debug!("download_changelist");
let mut command = Vec::new();
write!(command, "changelist {} {}", self.channel, from).unwrap();
for p in paths {
write!(command, " {}", p).unwrap()
}
command.push(b'\n');
self.c.data(&command[..]).await?;
debug!("waiting ssh");
'msg: while let Some(msg) = self.c.wait().await {
debug!("msg = {:?}", msg);
match msg {
thrussh::ChannelMsg::Data { data } => {
if &data[..] == b"\n" {
debug!("log done");
break;
} else if let Ok(data) = std::str::from_utf8(&data) {
for l in data.lines() {
if !l.is_empty() {
debug!("line = {:?}", l);
let (n, h, m) = parse_line(l)?;
txn.put_remote(remote, n, (h, m))?;
} else {
break 'msg;
}
}
}
}
thrussh::ChannelMsg::ExtendedData { data, ext } => {
debug!("{:?} {:?}", ext, std::str::from_utf8(&data[..]));
/*return Err((crate::Error::Remote {
msg: std::str::from_utf8(&data[..]).unwrap().to_string()
}).into())*/
}
thrussh::ChannelMsg::WindowAdjusted { .. } => {}
thrussh::ChannelMsg::Eof => {}
thrussh::ChannelMsg::ExitStatus { exit_status } => {
if exit_status != 0 {
return Err((Error::RemoteExit {
status: exit_status,
})
.into());
}
}
msg => panic!("wrong message {:?}", msg),
}
}
debug!("no msg");
Ok(())
}
pub async fn upload_changes(
&mut self,
mut local: PathBuf,
to_channel: Option<&str>,
changes: &[Hash],
) -> Result<(), anyhow::Error> {
self.run_protocol().await?;
debug!("upload_changes");
for c in changes {
libpijul::changestore::filesystem::push_filename(&mut local, &c);
let mut change_file = std::fs::File::open(&local)?;
let change_len = change_file.metadata()?.len();
let mut change = cryptovec::CryptoVec::new_zeroed(change_len as usize);
use std::io::Read;
change_file.read_exact(&mut change[..])?;
let to_channel = if let Some(t) = to_channel {
t
} else {
self.channel.as_str()
};
self.c
.data(format!("apply {} {} {}\n", to_channel, c.to_base32(), change_len).as_bytes())
.await?;
self.c.data(&change[..]).await?;
libpijul::changestore::filesystem::pop_filename(&mut local);
}
Ok(())
}
pub async fn start_change_download(
&mut self,
c: libpijul::pristine::Hash,
full: bool,
) -> Result<(), anyhow::Error> {
self.run_protocol().await?;
debug!("download_change {:?}", full);
if full {
self.c
.data(format!("change {}\n", c.to_base32()).as_bytes())
.await?;
} else {
self.c
.data(format!("partial {}\n", c.to_base32()).as_bytes())
.await?;
}
Ok(())
}
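// Receive the change files previously requested from the remote. Each file
// is prefixed with its length as a big-endian u64; every completed file is
// written into `changes_dir` and its hash is signalled on `send`.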
pub async fn wait_downloads(
&mut self,
changes_dir: &Path,
hashes: &[libpijul::pristine::Hash],
send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
) -> Result<(), anyhow::Error> {
debug!("wait_downloads");
if !self.is_running {
return Ok(());
}
let mut remaining_len = 0;
let mut current: usize = 0;
let mut path = changes_dir.to_path_buf();
libpijul::changestore::filesystem::push_filename(&mut path, &hashes[current]);
std::fs::create_dir_all(&path.parent().unwrap())?;
path.set_extension("");
let mut file = std::fs::File::create(&path)?;
'outer: while let Some(msg) = self.c.wait().await {
match msg {
thrussh::ChannelMsg::Data { data } => {
debug!("data = {:?}", &data[..]);
let mut p = 0;
while p < data.len() {
if remaining_len == 0 {
remaining_len = (&data[p..]).read_u64::<BigEndian>().unwrap() as usize;
p += 8;
debug!("remaining_len = {:?}", remaining_len);
}
if data.len() >= p + remaining_len {
file.write_all(&data[p..p + remaining_len])?;
// We have enough data to finish this file:
// flush it, rename it to its final name and
// move on to the next one.
p += remaining_len;
remaining_len = 0;
file.flush()?;
let mut final_path = path.clone();
final_path.set_extension("change");
debug!("moving {:?} to {:?}", path, final_path);
std::fs::rename(&path, &final_path)?;
debug!("sending");
send.send(hashes[current].clone()).await.unwrap();
debug!("sent");
current += 1;
if current < hashes.len() {
// If we're still waiting for
// another change.
libpijul::changestore::filesystem::pop_filename(&mut path);
libpijul::changestore::filesystem::push_filename(
&mut path,
&hashes[current],
);
std::fs::create_dir_all(&path.parent().unwrap())?;
path.set_extension("");
file = std::fs::File::create(&path)?;
} else {
// Else, just finish.
break 'outer;
}
} else {
// not enough data, we need more.
file.write_all(&data[p..])?;
remaining_len -= data.len() - p;
break;
}
}
}
thrussh::ChannelMsg::ExitStatus { exit_status } => {
debug!("exit: {:?}", exit_status);
if exit_status != 0 {
error!("Remote command returned {:?}", exit_status)
}
self.is_running = false;
return Ok(());
}
msg => {
debug!("{:?}", msg);
}
}
}
debug!("done waiting for downloads");
Ok(())
}
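// Ask the remote to dump the channel, load the dump into `channel`, then
// download the changes it references (only the alive ones when `lazy`) and
// output the working copy.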
pub async fn clone_channel<T: MutTxnTExt>(
&mut self,
repo: &mut Repository,
txn: &mut T,
channel: &mut ChannelRef<T>,
lazy: bool,
) -> Result<(), anyhow::Error> {
self.run_protocol().await?;
self.c
.data(format!("channel {}\n", self.channel).as_bytes())
.await?;
let from_dump_alive = {
let mut from_dump =
libpijul::pristine::channel_dump::ChannelFromDump::new(txn, channel.clone());
while let Some(msg) = self.c.wait().await {
match msg {
thrussh::ChannelMsg::Data { data } => {
debug!("data = {:?}", &data[..]);
if from_dump.read(&data)? {
break;
}
}
thrussh::ChannelMsg::ExtendedData { data, ext } => {
debug!("data = {:?}, ext = {:?}", &data[..], ext);
}
thrussh::ChannelMsg::ExitStatus { exit_status } => {
if exit_status != 0 {
error!("Remote command returned {:?}", exit_status)
}
self.is_running = false;
break;
}
msg => {
debug!("msg = {:?}", msg);
}
}
}
from_dump.alive
};
let channel_ = channel.borrow();
debug!("cloned, now downloading changes");
let mut hashes = Vec::new();
if lazy {
for &ch in from_dump_alive.iter() {
let h = txn.get_external(ch).unwrap();
self.c
.data(format!("change {}\n", h.to_base32()).as_bytes())
.await?;
hashes.push(h);
}
} else {
for (_, (ch, _)) in txn.changeid_log(&channel_, 0) {
let h = txn.get_external(ch).unwrap();
self.c
.data(format!("change {}\n", h.to_base32()).as_bytes())
.await?;
hashes.push(h);
}
}
std::mem::drop(channel_);
debug!("hashes = {:?}", hashes);
let (mut send, recv) = tokio::sync::mpsc::channel(100);
self.wait_downloads(&repo.changes_dir, &hashes, &mut send)
.await?;
txn.output_repository_no_pending(&mut repo.working_copy, &repo.changes, channel, "", true)?;
std::mem::drop(recv);
Ok(())
}
}
use crate::repository::*;
use crate::Error;
use libpijul::pristine::{Base32, ChannelRef, Hash, Merkle, MutTxnT, RemoteRef};
use libpijul::DOT_DIR;
use libpijul::{MutTxnTExt, TxnTExt};
use std::io::Write;
use std::path::{Path, PathBuf};
pub mod ssh;
use ssh::*;
pub mod local;
use local::*;
pub enum RemoteRepo {
Local(Local),
Ssh(Ssh),
Http(Http),
None,
}
pub struct Http {
pub url: String,
pub channel: String,
pub client: reqwest::Client,
pub name: String,
}
impl Repository {
pub async fn remote(
&self,
name: &str,
channel: &str,
no_cert_check: bool,
) -> Result<RemoteRepo, anyhow::Error> {
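// If `name` is a remote alias defined in the repository configuration,
// use the address it resolves to; otherwise treat `name` itself as the
// address.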
if let Some(name) = self.config.remotes.get(name) {
unknown_remote(name, channel, no_cert_check).await
} else {
unknown_remote(name, channel, no_cert_check).await
}
}
}
pub async fn unknown_remote(
name: &str,
channel: &str,
no_cert_check: bool,
) -> Result<RemoteRepo, anyhow::Error> {
if name.starts_with("http://") || name.starts_with("https://") {
debug!("unknown_remote, http = {:?}", name);
Ok(RemoteRepo::Http(Http {
url: name.to_string(),
channel: channel.to_string(),
client: reqwest::ClientBuilder::new()
.danger_accept_invalid_certs(no_cert_check)
.build()?,
name: name.to_string(),
}))
} else if name.starts_with("ssh://") {
if let Some(ssh) = ssh_remote(name) {
debug!("unknown_remote, ssh = {:?}", ssh);
Ok(RemoteRepo::Ssh(ssh.connect(name, channel).await?))
} else {
Err((Error::RemoteNotFound {
remote: name.to_string(),
})
.into())
}
} else {
let mut dot_dir = Path::new(name).join(DOT_DIR);
let changes_dir = dot_dir.join(CHANGES_DIR);
dot_dir.push(PRISTINE_DIR);
debug!("dot_dir = {:?}", dot_dir);
if let Ok(pristine) = libpijul::pristine::sanakirja::Pristine::new(&dot_dir) {
debug!("pristine done");
Ok(RemoteRepo::Local(Local {
channel: channel.to_string(),
changes_dir,
pristine,
root: Path::new(name).to_path_buf(),
name: name.to_string(),
}))
} else if let Some(ssh) = ssh_remote(name) {
debug!("unknown_remote, ssh = {:?}", ssh);
Ok(RemoteRepo::Ssh(ssh.connect(name, channel).await?))
} else {
Err((Error::RemoteNotFound {
remote: name.to_string(),
})
.into())
}
}
}
impl RemoteRepo {
fn name(&self) -> &str {
match *self {
RemoteRepo::Ssh(ref s) => s.name.as_str(),
RemoteRepo::Local(ref l) => l.name.as_str(),
RemoteRepo::Http(ref h) => h.name.as_str(),
RemoteRepo::None => unreachable!(),
}
}
pub fn repo_name(&self) -> Option<String> {
match *self {
RemoteRepo::Ssh(ref s) => {
if let Some(sep) = s.name.rfind(|c| c == ':' || c == '/') {
Some(s.name.split_at(sep + 1).1.to_string())
} else {
Some(s.name.as_str().to_string())
}
}
RemoteRepo::Local(ref l) => {
if let Some(file) = l.root.file_name() {
Some(file.to_str().unwrap().to_string())
} else {
None
}
}
RemoteRepo::Http(ref h) => {
let url = reqwest::Url::parse(&h.url).unwrap();
if let Some(name) = libpijul::path::file_name(url.path()) {
Some(name.to_string())
} else {
url.host().map(|h| h.to_string())
}
}
RemoteRepo::None => unreachable!(),
}
}
pub async fn finish(&mut self) -> Result<(), anyhow::Error> {
match *self {
RemoteRepo::Ssh(ref mut s) => s.finish().await?,
_ => {}
}
Ok(())
}
pub async fn update_changelist<T: MutTxnT>(
&mut self,
txn: &mut T,
path: &[String],
) -> Result<RemoteRef<T>, anyhow::Error> {
debug!("update_changelist");
let name = self.name();
let mut remote = txn.open_or_create_remote(name).unwrap();
let n = self
.dichotomy_changelist(txn, &remote.borrow().remote)
.await?;
debug!("update changelist {:?}", n);
let v: Vec<_> = txn
.iter_remote(&remote.borrow().remote, n)
.filter_map(|(k, _)| if k >= n { Some(k) } else { None })
.collect();
for k in v {
debug!("deleting {:?}", k);
txn.del_remote(&mut remote, k)?;
}
self.download_changelist(txn, &mut remote, n, path).await?;
Ok(remote)
}
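// Binary search for the position where the local copy of the remote
// changelist stops agreeing with the remote, so that only the diverging
// tail has to be re-downloaded.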
async fn dichotomy_changelist<T: MutTxnT>(
&mut self,
txn: &T,
remote: &T::Remote,
) -> Result<u64, anyhow::Error> {
let mut a = 0;
let (mut b, (_, state)) = if let Some(last) = txn.last_remote(remote) {
last
} else {
debug!("the local copy of the remote has no changes");
return Ok(0);
};
if let Some((_, s)) = self.get_state(Some(b)).await? {
if s == state {
// The local list is already up to date.
return Ok(b + 1);
}
}
// Else, find the last state we have in common with the
// remote; it might be older than the last known state (if
// changes were unrecorded on the remote).
while a < b {
let mid = (a + b) / 2;
let (mid, (_, state)) = txn.get_remote_state(remote, mid).unwrap();
let remote_state = self.get_state(Some(mid)).await?;
debug!("dichotomy {:?} {:?} {:?}", mid, state, remote_state);
if let Some((_, remote_state)) = remote_state {
if remote_state == state {
if a == mid {
return Ok(a + 1);
} else {
a = mid;
continue;
}
}
}
if b == mid {
break;
} else {
b = mid
}
}
Ok(a)
}
async fn get_state(
&mut self,
mid: Option<u64>,
) -> Result<Option<(u64, Merkle)>, anyhow::Error> {
match *self {
RemoteRepo::Local(ref mut l) => l.get_state(mid),
RemoteRepo::Ssh(ref mut s) => s.get_state(mid).await,
RemoteRepo::Http(ref h) => {
debug!("get_state {:?}", h.url);
let url = format!("{}/{}", h.url, DOT_DIR);
let q = if let Some(mid) = mid {
[
("state", format!("{}", mid)),
("channel", h.channel.clone()),
]
} else {
[("state", String::new()), ("channel", h.channel.clone())]
};
let res = h.client.get(&url).query(&q).send().await?;
if !res.status().is_success() {
return Err((crate::Error::Http {
status: res.status(),
})
.into());
}
let resp = res.bytes().await?;
let resp = std::str::from_utf8(&resp)?;
debug!("resp = {:?}", resp);
let mut s = resp.split(' ');
if let (Some(n), Some(m)) = (
s.next().and_then(|s| s.parse().ok()),
s.next().and_then(|m| Merkle::from_base32(m.as_bytes())),
) {
Ok(Some((n, m)))
} else {
Ok(None)
}
}
RemoteRepo::None => unreachable!(),
}
}
pub async fn archive<W: std::io::Write>(
&mut self,
prefix: Option<String>,
state: Option<(Merkle, &[Hash])>,
mut w: W,
) -> Result<u64, anyhow::Error> {
match *self {
RemoteRepo::Local(ref mut l) => {
use libpijul::pristine::TxnT;
debug!("archiving local repo");
let changes = libpijul::changestore::filesystem::FileSystem::from_root(&l.root);
let mut tarball = libpijul::output::Tarball::new(w, prefix);
let conflicts = if let Some((state, extra)) = state {
let mut txn = l.pristine.mut_txn_begin();
let mut channel = txn.load_channel(&l.channel).unwrap();
txn.archive_with_state(&changes, &mut channel, state, extra, &mut tarball)?
} else {
let txn = l.pristine.txn_begin()?;
let channel = txn.load_channel(&l.channel).unwrap();
txn.archive(&changes, &channel, &mut tarball)?
};
Ok(conflicts.len() as u64)
}
RemoteRepo::Ssh(ref mut s) => s.archive(prefix, state, w).await,
RemoteRepo::Http(ref h) => {
let url = h.url.clone() + "/" + DOT_DIR;
let res = h.client.get(&url).query(&[("channel", &h.channel)]);
let res = if let Some((ref state, ref extra)) = state {
let mut q = vec![("archive".to_string(), state.to_base32())];
if let Some(pre) = prefix {
q.push(("outputPrefix".to_string(), pre));
}
for e in extra.iter() {
q.push(("change".to_string(), e.to_base32()))
}
res.query(&q)
} else {
res
};
let res = res.send().await?;
if !res.status().is_success() {
return Err((crate::Error::Http {
status: res.status(),
})
.into());
}
use futures_util::StreamExt;
let mut stream = res.bytes_stream();
let mut conflicts = 0;
let mut n = 0;
while let Some(item) = stream.next().await {
let item = item?;
let mut off = 0;
while n < 8 && off < item.len() {
conflicts = (conflicts << 8) | (item[off] as u64);
off += 1;
n += 1
}
w.write_all(&item[off..])?;
}
Ok(conflicts as u64)
}
RemoteRepo::None => unreachable!(),
}
}
async fn download_changelist<T: MutTxnT>(
&mut self,
txn: &mut T,
remote: &mut RemoteRef<T>,
from: u64,
paths: &[String],
) -> Result<(), anyhow::Error> {
match *self {
RemoteRepo::Local(ref mut l) => l.download_changelist(txn, remote, from, paths),
RemoteRepo::Ssh(ref mut s) => s.download_changelist(txn, remote, from, paths).await,
RemoteRepo::Http(ref h) => {
let url = h.url.clone() + "/" + DOT_DIR;
let from_ = from.to_string();
let mut query = vec![("changelist", &from_), ("channel", &h.channel)];
for p in paths.iter() {
query.push(("path", p));
}
let res = h.client.get(&url).query(&query).send().await?;
if !res.status().is_success() {
return Err((crate::Error::Http {
status: res.status(),
})
.into());
}
let resp = res.bytes().await?;
if let Ok(data) = std::str::from_utf8(&resp) {
for l in data.lines() {
if !l.is_empty() {
let (n, h, m) = parse_line(l)?;
txn.put_remote(remote, n, (h, m))?;
} else {
break;
}
}
}
Ok(())
}
RemoteRepo::None => unreachable!(),
}
}
pub async fn upload_changes(
&mut self,
mut local: PathBuf,
to_channel: Option<&str>,
changes: &[Hash],
) -> Result<(), anyhow::Error> {
match self {
RemoteRepo::Local(ref mut l) => l.upload_changes(local, to_channel, changes),
RemoteRepo::Ssh(ref mut s) => s.upload_changes(local, to_channel, changes).await,
RemoteRepo::Http(ref h) => {
for c in changes {
libpijul::changestore::filesystem::push_filename(&mut local, &c);
let url = h.url.clone() + "/" + DOT_DIR;
let change = std::fs::read(&local)?;
let mut to_channel = if let Some(ch) = to_channel {
vec![("to_channel", ch)]
} else {
Vec::new()
};
let c = c.to_base32();
to_channel.push(("apply", &c));
debug!("url {:?} {:?}", url, to_channel);
h.client
.post(&url)
.query(&to_channel)
.body(change)
.send()
.await?;
libpijul::changestore::filesystem::pop_filename(&mut local);
}
Ok(())
}
RemoteRepo::None => unreachable!(),
}
}
/// Start (and possibly complete) the download of a change.
pub async fn start_change_download(
&mut self,
c: libpijul::pristine::Hash,
path: &mut PathBuf,
full: bool,
) -> Result<bool, anyhow::Error> {
debug!("start_change_download");
libpijul::changestore::filesystem::push_filename(path, &c);
if std::fs::metadata(&path).is_ok() && !full {
debug!("metadata {:?} ok", path);
libpijul::changestore::filesystem::pop_filename(path);
return Ok(false);
}
std::fs::create_dir_all(&path.parent().unwrap())?;
match *self {
RemoteRepo::Local(ref mut l) => l.start_change_download(c, path).await?,
RemoteRepo::Ssh(ref mut s) => s.start_change_download(c, full).await?,
RemoteRepo::Http(ref h) => {
let mut f = std::fs::File::create(&path)?;
let c32 = c.to_base32();
let url = format!("{}/{}", h.url, DOT_DIR);
let mut res = h.client.get(&url).query(&[("change", c32)]).send().await?;
if !res.status().is_success() {
return Err((crate::Error::Http {
status: res.status(),
})
.into());
}
while let Some(chunk) = res.chunk().await? {
f.write_all(&chunk)?;
}
}
RemoteRepo::None => unreachable!(),
}
libpijul::changestore::filesystem::pop_filename(path);
Ok(true)
}
pub async fn wait_downloads(
&mut self,
changes_dir: &Path,
hashes: &[libpijul::pristine::Hash],
send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
) -> Result<(), anyhow::Error> {
if hashes.is_empty() {
return Ok(());
}
if let RemoteRepo::Ssh(ref mut s) = *self {
s.wait_downloads(changes_dir, hashes, send).await?
} else {
for h in hashes {
send.send(*h).await?
}
}
Ok(())
}
pub async fn pull<T: MutTxnTExt + TxnTExt>(
&mut self,
repo: &mut Repository,
txn: &mut T,
channel: &mut ChannelRef<T>,
to_download: Vec<Hash>,
do_apply: bool,
) -> Result<(), anyhow::Error> {
let (mut send, mut recv) = tokio::sync::mpsc::channel(100);
let mut change_path_ = repo.changes_dir.clone();
let to_download_ = to_download.clone();
let mut self_ = std::mem::replace(self, RemoteRepo::None);
let t = tokio::spawn(async move {
let mut hashes = Vec::new();
for h in to_download_.iter() {
if self_
.start_change_download(*h, &mut change_path_, false)
.await?
{
hashes.push(*h);
}
}
debug!("hashes = {:?}", hashes);
self_
.wait_downloads(&change_path_, &hashes, &mut send)
.await?;
Ok(self_)
});
let mut ws = libpijul::ApplyWorkspace::new();
let mut change_path = repo.changes_dir.clone();
for h in to_download.iter() {
libpijul::changestore::filesystem::push_filename(&mut change_path, &h);
debug!("change_path = {:?}", change_path);
while std::fs::metadata(&change_path).is_err() {
debug!("waiting");
let r = recv.recv().await;
debug!("r = {:?}", r);
if r.is_none() {
break;
}
}
libpijul::changestore::filesystem::pop_filename(&mut change_path);
if do_apply {
println!("Applying {:?}", h.to_base32());
debug!("applying {:?}", h);
txn.apply_change_ws(&repo.changes, channel, *h, &mut ws)?;
} else {
debug!("not applying {:?}", h)
}
}
std::mem::drop(recv);
debug!("waiting for spawned process");
let r: Result<_, anyhow::Error> = t.await?;
debug!("done");
*self = r?;
Ok(())
}
pub async fn clone_tag<T: MutTxnTExt + TxnTExt>(
&mut self,
repo: &mut Repository,
txn: &mut T,
channel: &mut ChannelRef<T>,
tag: &[Hash],
) -> Result<(), anyhow::Error> {
let (mut send_signal, mut recv_signal) = tokio::sync::mpsc::channel(100);
let (mut send_hash, mut recv_hash) = tokio::sync::mpsc::channel(100);
let mut change_path_ = repo.changes_dir.clone();
let mut self_ = std::mem::replace(self, RemoteRepo::None);
let t = tokio::spawn(async move {
let mut hashes = Vec::new();
while let Some(hash) = recv_hash.recv().await {
if self_
.start_change_download(hash, &mut change_path_, false)
.await?
{
hashes.push(hash);
}
}
debug!("hashes = {:?}", hashes);
self_
.wait_downloads(&change_path_, &hashes, &mut send_signal)
.await?;
Ok(self_)
});
for &h in tag.iter() {
send_hash.send(h).await?;
}
let mut change_path = repo.changes_dir.clone();
let mut hashes = Vec::new();
while let Some(hash) = recv_signal.recv().await {
libpijul::changestore::filesystem::push_filename(&mut change_path, &hash);
std::fs::create_dir_all(change_path.parent().unwrap())?;
use libpijul::changestore::ChangeStore;
hashes.push(hash);
for dep in repo.changes.get_dependencies(&hash)? {
let dep: libpijul::pristine::Hash = dep;
send_hash.send(dep).await?;
}
libpijul::changestore::filesystem::pop_filename(&mut change_path);
}
std::mem::drop(recv_signal);
std::mem::drop(send_hash);
let mut ws = libpijul::ApplyWorkspace::new();
while let Some(hash) = hashes.pop() {
txn.apply_change_ws(&repo.changes, channel, hash, &mut ws)?;
}
let r: Result<_, anyhow::Error> = t.await?;
*self = r?;
Ok(())
}
pub async fn clone_state<T: MutTxnTExt + TxnTExt>(
&mut self,
repo: &mut Repository,
txn: &mut T,
channel: &mut ChannelRef<T>,
state: Merkle,
lazy: bool,
) -> Result<(), anyhow::Error> {
self.update_changelist(txn, &[]).await?;
let name = self.name();
let remote = txn.open_or_create_remote(name).unwrap();
if let RemoteRepo::Ssh(ref mut s) = self {
s.clone_channel(repo, txn, channel, lazy).await?;
let mut to_unrecord = Vec::new();
let mut found = false;
for (n, (h, s)) in txn.iter_rev_remote(&remote.borrow().remote, None) {
debug!("{:?} {:?} {:?}", n, h, s);
if s == state {
found = true;
break;
}
to_unrecord.push(h);
}
if !found {
return Err((Error::StateNotFound { state }).into());
}
self.pull(repo, txn, channel, to_unrecord.clone(), false)
.await?;
for unrec in to_unrecord.iter() {
txn.unrecord(&repo.changes, channel, unrec)?;
}
return Ok(());
}
let mut to_pull = Vec::new();
let mut found = false;
for (n, (h, s)) in txn.iter_remote(&remote.borrow().remote, 0) {
debug!("{:?} {:?} {:?}", n, h, s);
to_pull.push(h);
if s == state {
found = true;
break;
}
}
if !found {
return Err((Error::StateNotFound { state }).into());
}
self.pull(repo, txn, channel, to_pull, true).await?;
Ok(())
}
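// Download the full contents of the given changes when they are missing
// locally: all of them if `full`, otherwise only those that still have
// alive vertices in `local_channel`.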
pub async fn complete_changes<T: MutTxnTExt + TxnTExt>(
&mut self,
repo: &crate::repository::Repository,
txn: &T,
local_channel: &mut ChannelRef<T>,
changes: &[Hash],
full: bool,
) -> Result<(), anyhow::Error> {
use libpijul::changestore::ChangeStore;
let (mut send_hash, mut recv_hash) = tokio::sync::mpsc::channel(100);
let (mut send_sig, mut recv_sig) = tokio::sync::mpsc::channel(100);
let mut self_ = std::mem::replace(self, RemoteRepo::None);
let mut changes_dir = repo.changes_dir.clone();
let t = tokio::spawn(async move {
let mut hashes = Vec::new();
while let Some(h) = recv_hash.recv().await {
debug!("downloading full patch: {:?}", h);
if self_
.start_change_download(h, &mut changes_dir, true)
.await?
{
debug!("push");
hashes.push(h);
}
debug!("done");
}
debug!("waiting");
self_
.wait_downloads(&changes_dir, &hashes, &mut send_sig)
.await?;
let result: Result<_, anyhow::Error> = Ok(self_);
result
});
for c in changes {
if repo.changes.has_contents(*c, txn.get_internal(*c)) {
debug!("has contents {:?}", c);
continue;
}
if full {
debug!("sending send_hash");
send_hash.send(*c).await?;
debug!("sent");
continue;
}
let change = if let Some(i) = txn.get_internal(*c) {
i
} else {
continue;
};
// Check if at least one non-empty vertex from c is still alive.
let v = libpijul::pristine::Vertex {
change,
start: libpijul::pristine::ChangePosition(0),
end: libpijul::pristine::ChangePosition(0),
};
let channel = local_channel.borrow();
for (v_, e) in txn.iter_graph(&channel.graph, v, None) {
if v_.change < change {
continue;
} else if v_.change > change {
break;
}
if e.flag.contains(libpijul::pristine::EdgeFlags::PARENT)
&& !e.flag.contains(libpijul::pristine::EdgeFlags::DELETED)
{
// Alive!
debug!("sending alive");
send_hash.send(*c).await?;
debug!("sent");
break;
}
}
}
debug!("dropping send_hash");
std::mem::drop(send_hash);
while recv_sig.recv().await.is_some() {}
*self = t.await??;
Ok(())
}
pub async fn clone_channel<T: MutTxnTExt + TxnTExt>(
&mut self,
repo: &mut Repository,
txn: &mut T,
local_channel: &mut ChannelRef<T>,
lazy: bool,
path: &[String],
) -> Result<(), anyhow::Error> {
if path.is_empty() {
match *self {
RemoteRepo::Ssh(ref mut s) => {
return s.clone_channel(repo, txn, local_channel, lazy).await
}
_ => {}
}
}
let remote_changes = self.update_changelist(txn, path).await?;
let mut pullable = Vec::new();
for (_, (h, _)) in txn.iter_remote(&remote_changes.borrow().remote, 0) {
pullable.push(h)
}
// let pullable = self.pullable(txn, local_channel, path).await?;
self.pull(repo, txn, local_channel, pullable, true).await
}
}
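// Parse one changelist entry of the form `number.hash.state`, where `hash`
// and `state` are base32-encoded.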
fn parse_line(data: &str) -> Result<(u64, Hash, Merkle), anyhow::Error> {
debug!("data = {:?}", data);
let mut it = data.split('.');
let n = if let Some(n) = it.next().and_then(|n| n.parse().ok()) {
n
} else {
return Err((Error::ProtocolError {
line: data.as_bytes().to_vec(),
})
.into());
};
debug!("n = {:?}", n);
let h = if let Some(h) = it.next().and_then(|h| Hash::from_base32(h.as_bytes())) {
h
} else {
return Err((Error::ProtocolError {
line: data.as_bytes().to_vec(),
})
.into());
};
debug!("h = {:?}", h);
let m = if let Some(m) = it.next().and_then(|m| {
debug!("m = {:?}", m);
Merkle::from_base32(m.as_bytes())
}) {
m
} else {
return Err((Error::ProtocolError {
line: data.as_bytes().to_vec(),
})
.into());
};
debug!("m = {:?}", m);
if it.next().is_some() {
return Err((Error::ProtocolError {
line: data.as_bytes().to_vec(),
})
.into());
}
Ok((n, h, m))
}
use super::RemoteRef;
use libpijul::pristine::{Base32, Hash, Merkle, MutTxnT, TxnT};
use libpijul::{MutTxnTExt, TxnTExt};
use std::path::{Path, PathBuf};
pub struct Local {
pub channel: String,
pub root: std::path::PathBuf,
pub changes_dir: std::path::PathBuf,
pub pristine: libpijul::pristine::sanakirja::Pristine,
pub name: String,
}
impl Local {
pub fn get_state(&mut self, mid: Option<u64>) -> Result<Option<(u64, Merkle)>, anyhow::Error> {
let txn = self.pristine.txn_begin()?;
let channel = txn.load_channel(&self.channel).unwrap();
if let Some(mid) = mid {
Ok(txn.get_changes(&channel, mid).map(|(_, m)| (mid, m)))
} else {
Ok(txn
.reverse_log(&channel.borrow(), None)
.next()
.map(|(n, (_, m))| (n, m)))
}
}
pub fn download_changelist<T: MutTxnT>(
&mut self,
txn: &mut T,
remote: &mut RemoteRef<T>,
from: u64,
paths: &[String],
) -> Result<(), anyhow::Error> {
let store = libpijul::changestore::filesystem::FileSystem::from_root(&self.root);
let remote_txn = self.pristine.txn_begin()?;
let remote_channel = if let Some(channel) = remote_txn.load_channel(&self.channel) {
channel
} else {
debug!("no remote channel named {:?}", self.channel);
return Ok(());
};
let mut paths_ = Vec::new();
for s in paths {
if let Ok((p, _ambiguous)) = remote_txn.follow_oldest_path(&store, &remote_channel, s) {
paths_.push(p)
}
}
for (n, (h, m)) in remote_txn.log(&remote_channel.borrow(), from) {
if n >= from {
debug!(
"downloading changelist item {:?} {:?} {:?}",
n,
h.to_base32(),
m.to_base32()
);
let h_int = remote_txn.get_internal(h).unwrap();
if paths_.is_empty()
|| paths_
.iter()
.any(|x| remote_txn.get_touched_files(*x, Some(h_int)).is_some())
{
txn.put_remote(remote, n, (h, m))?;
}
}
}
Ok(())
}
pub fn upload_changes(
&mut self,
mut local: PathBuf,
to_channel: Option<&str>,
changes: &[Hash],
) -> Result<(), anyhow::Error> {
let store = libpijul::changestore::filesystem::FileSystem::from_root(&self.root);
let mut txn = self.pristine.mut_txn_begin();
let mut channel = txn.open_or_create_channel(to_channel.unwrap_or(&self.channel))?;
let mut ws = libpijul::ApplyWorkspace::new();
for c in changes {
libpijul::changestore::filesystem::push_filename(&mut local, &c);
libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);
std::fs::create_dir_all(&self.changes_dir.parent().unwrap())?;
debug!("hard link {:?} {:?}", local, self.changes_dir);
std::fs::hard_link(&local, &self.changes_dir)?;
debug!("hard link done");
libpijul::changestore::filesystem::pop_filename(&mut local);
libpijul::changestore::filesystem::pop_filename(&mut self.changes_dir);
txn.apply_change_ws(&store, &mut channel, *c, &mut ws)?;
}
let mut repo = libpijul::working_copy::filesystem::FileSystem::from_root(&self.root);
txn.output_repository_no_pending(&mut repo, &store, &mut channel, "", true)?;
txn.commit()?;
Ok(())
}
pub async fn start_change_download(
&mut self,
c: libpijul::pristine::Hash,
path: &Path,
) -> Result<(), anyhow::Error> {
libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);
debug!("hard link {:?} {:?}", self.changes_dir, path);
std::fs::hard_link(&self.changes_dir, path)?;
debug!("hard link done");
libpijul::changestore::filesystem::pop_filename(&mut self.changes_dir);
debug!("sent");
Ok(())
}
}
#[macro_use]
extern crate clap;
#[macro_use]
extern crate thiserror;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate log;
#[macro_use]
extern crate lazy_static;
use clap::Clap;
use human_panic::setup_panic;
mod commands;
mod config;
mod remote;
mod repository;
use commands::*;
use std::io::Write;
const DEFAULT_CHANNEL: &'static str = "main";
const PROTOCOL_VERSION: usize = 3;
#[derive(Clap, Debug)]
#[clap(version = crate_version!(), author = crate_authors!())]
pub struct Opts {
#[clap(subcommand)]
pub subcmd: SubCommand,
}
#[derive(Clap, Debug)]
pub enum SubCommand {
#[clap(name = "init")]
Init(Init),
#[clap(name = "clone")]
Clone(Clone),
#[clap(name = "record", alias = "rec")]
Record(Record),
#[clap(name = "diff")]
Diff(Diff),
#[clap(name = "log")]
Log(Log),
#[clap(name = "push")]
Push(Push),
#[clap(name = "pull")]
Pull(Pull),
#[clap(name = "change")]
Change(Change),
#[clap(name = "channel")]
Channel(Channel),
#[clap(name = "protocol", setting = clap::AppSettings::Hidden)]
Protocol(Protocol),
#[cfg(feature = "git")]
#[clap(name = "git")]
Git(Git),
#[clap(name = "mv")]
Mv(Mv),
#[clap(name = "ls")]
Ls(Ls),
#[clap(name = "add")]
Add(Add),
#[clap(name = "remove")]
Remove(Remove),
#[clap(name = "reset")]
Reset(Reset),
#[cfg(debug_assertions)]
#[clap(name = "debug")]
Debug(Debug),
#[clap(name = "fork")]
Fork(Fork),
#[clap(name = "unrecord", alias = "unrec", alias = "un")]
Unrecord(Unrecord),
#[clap(name = "apply")]
Apply(Apply),
#[clap(name = "remote")]
Remote(Remote),
#[clap(name = "archive")]
Archive(Archive),
#[clap(name = "credit")]
Credit(Credit),
#[clap(name = "upgrade", setting = clap::AppSettings::Hidden)]
Upgrade(Upgrade),
}
#[derive(Debug, Error)]
pub enum Error {
#[error("No Pijul repository found")]
NoRepoRoot,
#[error("Cannot access working directory")]
CannotAccessWorkingDirectory,
#[error("Already in a repository")]
AlreadyInARepo,
#[error("No such channel: {}", channel)]
NoSuchChannel { channel: String },
#[error("Protocol error. Is this the correct URL?")]
ProtocolError { line: Vec<u8> },
#[error("Not authenticated")]
NotAuthenticated,
#[error("No change message")]
NoChangeMessage,
#[error("Incorrect remote: {}", name)]
IncorrectRemote { name: String },
#[error("Unknown host key")]
UnknownHostKey,
#[error("Cannot record a binary change interactively. Use -a")]
RecordBinaryChange,
#[error("Remote not found: {:?}", remote)]
RemoteNotFound { remote: String },
#[error("No global config directory")]
NoGlobalConfigDir,
#[error("Could not parse global config")]
CouldNotParseGlobal,
#[error("Cannot dry-reset multiple files")]
CannotDryReset,
#[error("Remote error: {}", msg)]
Remote { msg: String },
#[error("Remote exited with status {}", status)]
RemoteExit { status: u32 },
#[error("Missing remote")]
MissingRemote,
#[error("State not found in remote: {:?}", state)]
StateNotFound { state: libpijul::pristine::Merkle },
#[error("Missing dependencies for change {:?}", h)]
MissingDep { h: libpijul::pristine::Hash },
#[error("Ambiguous path: {:?}", path)]
AmbiguousPath { path: String },
#[error("No prefixes given. Use `.` to record the current directory.")]
NoRecordPrefixes,
#[error("HTTP error: {}", status.as_str())]
Http { status: reqwest::StatusCode },
#[error("Could not parse configuration file at {:?}", path)]
CouldNotReadConfig { path: std::path::PathBuf },
#[error("No current channel")]
NoCurrentChannel,
#[error("Channel not found: {:?}", channel)]
ChannelNotFound { channel: String },
#[error("Cannot reset, because there are unrecorded changes")]
UnrecordedChanges,
#[error("Could not infer repository name")]
CouldNotInferRepositoryName { repo: String },
}
#[tokio::main]
async fn main() {
if !cfg!(debug_assertions) {
setup_panic!();
}
env_logger::init();
let opts: Opts = Opts::parse();
if let Err(e) = run(opts).await {
writeln!(std::io::stderr(), "Error: {}", e).unwrap_or(());
std::process::exit(1);
}
}
async fn run(opts: Opts) -> Result<(), anyhow::Error> {
match opts.subcmd {
SubCommand::Log(l) => l.run(),
SubCommand::Init(init) => init.run(),
SubCommand::Clone(clone) => clone.run().await,
SubCommand::Record(record) => record.run().await,
SubCommand::Diff(diff) => diff.run(),
SubCommand::Push(push) => push.run().await,
SubCommand::Pull(pull) => pull.run().await,
SubCommand::Change(change) => change.run(),
SubCommand::Channel(channel) => channel.run(),
SubCommand::Protocol(protocol) => protocol.run(),
#[cfg(feature = "git")]
SubCommand::Git(git) => git.run(),
SubCommand::Mv(mv) => mv.run(),
SubCommand::Ls(ls) => ls.run(),
SubCommand::Add(add) => add.run(),
SubCommand::Remove(remove) => remove.run(),
SubCommand::Reset(reset) => reset.run(),
#[cfg(debug_assertions)]
SubCommand::Debug(debug) => debug.run(),
SubCommand::Fork(fork) => fork.run(),
SubCommand::Unrecord(unrecord) => unrecord.run(),
SubCommand::Apply(apply) => apply.run(),
SubCommand::Remote(remote) => remote.run(),
SubCommand::Archive(archive) => archive.run().await,
SubCommand::Credit(credit) => credit.run(),
SubCommand::Upgrade(upgrade) => upgrade.run(),
}
}
pub fn current_dir() -> Result<std::path::PathBuf, Error> {
std::env::current_dir().map_err(|_| Error::CannotAccessWorkingDirectory)
}
use std::collections::HashMap;
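/// Global, per-user configuration, loaded from `pijul/config.toml` in the
/// platform configuration directory, falling back to `~/.pijulconfig`.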
#[derive(Debug, Serialize, Deserialize)]
pub struct Global {
pub author: libpijul::change::Author,
}
const CONFIG_DIR: &'static str = "pijul";
impl Global {
pub fn load() -> Result<Global, anyhow::Error> {
if let Some(mut dir) = dirs_next::config_dir() {
dir.push(CONFIG_DIR);
dir.push("config.toml");
let s = std::fs::read(&dir);
let s = match s {
Ok(s) => s,
Err(e) => {
if let Some(mut dir) = dirs_next::home_dir() {
dir.push(".pijulconfig");
std::fs::read(&dir)?
} else {
return Err(e.into());
}
}
};
debug!("s = {:?}", s);
if let Ok(c) = toml::from_slice(&s) {
Ok(c)
} else {
Err((crate::Error::CouldNotReadConfig { path: dir }).into())
}
} else {
Err(crate::Error::NoGlobalConfigDir.into())
}
}
}
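/// Per-repository configuration: current channel, default remote, named
/// remotes and record hooks.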
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Config {
pub current_channel: Option<String>,
pub default_remote: Option<String>,
pub remotes: HashMap<String, String>,
pub hooks: Option<Hooks>,
}
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Hooks {
pub record: Vec<String>,
}
impl Config {
pub fn get_current_channel<'a>(&'a self, alt: Option<&'a String>) -> &'a str {
if let Some(channel) = alt {
channel.as_ref()
} else if let Some(ref channel) = self.current_channel {
channel.as_str()
} else {
crate::DEFAULT_CHANNEL
}
}
}
#[derive(Debug, Serialize, Deserialize)]
struct Remote_ {
ssh: Option<SshRemote>,
local: Option<String>,
url: Option<String>,
}
#[derive(Debug)]
pub enum Remote {
Ssh(SshRemote),
Local { local: String },
Http { url: String },
None,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SshRemote {
pub addr: String,
}
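// `Remote` is (de)serialized through the intermediate `Remote_` struct:
// whichever of its `ssh`, `local` or `url` fields is present determines the
// variant.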
impl<'de> serde::Deserialize<'de> for Remote {
fn deserialize<D>(deserializer: D) -> Result<Remote, D::Error>
where
D: serde::de::Deserializer<'de>,
{
let r = Remote_::deserialize(deserializer)?;
if let Some(ssh) = r.ssh {
Ok(Remote::Ssh(ssh))
} else if let Some(local) = r.local {
Ok(Remote::Local { local })
} else if let Some(url) = r.url {
Ok(Remote::Http { url })
} else {
Ok(Remote::None)
}
}
}
impl serde::Serialize for Remote {
fn serialize<D>(&self, serializer: D) -> Result<D::Ok, D::Error>
where
D: serde::ser::Serializer,
{
let r = match *self {
Remote::Ssh(ref ssh) => Remote_ {
ssh: Some(ssh.clone()),
local: None,
url: None,
},
Remote::Local { ref local } => Remote_ {
local: Some(local.to_string()),
ssh: None,
url: None,
},
Remote::Http { ref url } => Remote_ {
local: None,
ssh: None,
url: Some(url.to_string()),
},
Remote::None => Remote_ {
local: None,
ssh: None,
url: None,
},
};
r.serialize(serializer)
}
}
use crate::repository::Repository;
use libpijul::changestore::ChangeStore;
use libpijul::pristine::{Hash, MutTxnT, TxnT};
use libpijul::{MutTxnTExt, TxnTExt};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
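// `pijul upgrade`: convert a repository from the v3 change format to v4 by
// collecting every change of every channel, moving `.pijul` aside to
// `.pijul.old`, re-initializing the repository and re-applying the
// converted changes.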
#[derive(Clap, Debug)]
pub struct Upgrade {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
}
impl Upgrade {
pub fn run(self) -> Result<(), anyhow::Error> {
let mut channels = HashMap::new();
{
let repo = Repository::find_root(self.repo_path.clone())?;
let txn = repo.pristine.txn_begin()?;
let mut hashes = HashSet::new();
for channel in txn.iter_channels("") {
let channel = channel.borrow();
let name = channel.name();
let e = channels.entry(name.to_string()).or_insert(Vec::new());
hashes.clear();
for (_, (h, _)) in txn.reverse_log(&channel, None) {
if !hashes.insert(h) {
continue;
}
let path = repo.changes.filename(&h);
let change = libpijul::change::v3::LocalChange3::deserialize(
path.to_str().unwrap(),
Some(&h),
)
.unwrap();
e.push((h, change))
}
}
std::fs::rename(repo.path.join(".pijul"), repo.path.join(".pijul.old"))?;
}
let repo2 = Repository::init(self.repo_path)?;
let mut txn2 = repo2.pristine.mut_txn_begin();
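// Translation table from old (v3) hashes to the hashes of the re-saved v4
// changes, passed to `to_v4` when converting each change.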
let mut translations = HashMap::new();
translations.insert(None, None);
translations.insert(Some(Hash::None), Some(Hash::None));
for (channel_name, mut changes) in channels {
let mut channel = txn2.open_or_create_channel(&channel_name)?;
while let Some((old_h, c)) = changes.pop() {
let h = repo2.changes.save_change(&c.to_v4(&translations))?;
translations.insert(Some(old_h), Some(h));
txn2.apply_change(&repo2.changes, &mut channel, h)?;
}
}
txn2.commit()?;
Ok(())
}
}
use crate::repository::Repository;
use libpijul::pristine::{Base32, MutTxnT, TxnT};
use libpijul::MutTxnTExt;
use std::io::Write;
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Unrecord {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(long = "channel")]
channel: Option<String>,
#[clap(
about = "identifier of the change (unambiguous prefixes are accepted)",
multiple = true
)]
change_id: Vec<String>,
}
impl Unrecord {
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path)?;
debug!("{:?}", repo.config);
let channel_name = repo.config.get_current_channel(self.channel.as_ref());
let mut txn = repo.pristine.mut_txn_begin();
let mut stderr = std::io::stderr();
for c in self.change_id.iter() {
let (hash, change_id) = txn.hash_from_prefix(c)?;
if let Some(mut channel) = txn.load_channel(channel_name) {
let mut can_unrecord = true;
let channel_ = channel.borrow();
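// A change can only be unrecorded if no change still present on the
// channel depends on it; `iter_revdep` lists its reverse dependencies.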
for (p, d) in txn.iter_revdep(change_id) {
if p < change_id {
continue;
} else if p > change_id {
break;
}
if txn.get_changeset(&channel_.changes, d, None).is_some() {
if can_unrecord {
writeln!(stderr, "Cannot unrecord change {}, because the following changes depend on it:", c)?
}
can_unrecord = false;
writeln!(stderr, " {}", txn.get_external(d).unwrap().to_base32())?
}
}
std::mem::drop(channel_);
if can_unrecord {
txn.unrecord(&repo.changes, &mut channel, &hash)?;
}
}
}
txn.commit()?;
Ok(())
}
}
use crate::repository::Repository;
use crate::Error;
use libpijul::pristine::MutTxnT;
use libpijul::{MutTxnTExt, TxnTExt};
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Reset {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(long = "channel")]
channel: Option<String>,
#[clap(long = "dry-run")]
dry_run: bool,
#[clap(long = "change")]
change: Option<String>,
files: Vec<PathBuf>,
}
impl Reset {
pub fn run(self) -> Result<(), anyhow::Error> {
let has_repo_path = self.repo_path.is_some();
let mut repo = Repository::find_root(self.repo_path)?;
let mut txn = repo.pristine.mut_txn_begin();
use libpijul::pristine::TxnT;
let channel_name = repo.config.get_current_channel(self.channel.as_ref());
let mut channel = if let Some(channel) = txn.load_channel(&channel_name) {
channel
} else if self.change.is_some() {
txn.open_or_create_channel(&channel_name)?
} else {
return Err((Error::NoSuchChannel {
channel: channel_name.to_string(),
})
.into());
};
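// With --dry-run, write the pristine contents of a single file to stdout
// instead of modifying the working copy.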
if self.dry_run {
if self.files.len() > 1 {
return Err(Error::CannotDryReset.into());
}
let (pos, _ambiguous) = if has_repo_path {
let root = std::fs::canonicalize(repo.path.join(&self.files[0]))?;
let path = root.strip_prefix(&repo.path)?.to_str().unwrap();
txn.follow_oldest_path(&repo.changes, &channel, &path)?
} else {
let path = self.files[0].to_str().unwrap();
txn.follow_oldest_path(&repo.changes, &channel, &path)?
};
txn.output_file(
&repo.changes,
&channel,
pos,
&mut libpijul::vertex_buffer::Writer::new(std::io::stdout()),
)?;
} else {
let current_channel = repo.config.get_current_channel(None);
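// Refuse to switch channels while the current channel still has
// unrecorded changes.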
if self.channel.is_some()
&& self.channel.as_ref().map(|x| x.as_str()) != Some(current_channel)
{
if let Some(mut channel) = txn.load_channel(current_channel) {
let mut state = libpijul::RecordBuilder::new();
txn.record(
&mut state,
libpijul::Algorithm::default(),
&mut channel,
&mut repo.working_copy,
&repo.changes,
"",
)?;
let rec = state.finish();
debug!("actions = {:?}", rec.actions);
if !rec.actions.is_empty() {
return Err(Error::UnrecordedChanges.into());
}
}
}
if self.channel.is_some() {
repo.config.current_channel = self.channel;
repo.save_config()?;
}
if let Some(ch) = self.change {
let (hash, _) = txn.hash_from_prefix(&ch)?;
txn.apply_change_rec(&repo.changes, &mut channel, hash)?
}
if self.files.is_empty() {
txn.output_repository_no_pending(
&mut repo.working_copy,
&repo.changes,
&mut channel,
"",
true,
)?;
} else {
for root in self.files.iter() {
let root = std::fs::canonicalize(&root)?;
let path = root.strip_prefix(&repo.path)?.to_str().unwrap();
txn.output_repository_no_pending(
&mut repo.working_copy,
&repo.changes,
&mut channel,
&path,
true,
)?;
}
}
txn.commit()?
}
Ok(())
}
}
use crate::repository::*;
use crate::Error;
use chrono::Utc;
use libpijul::change::*;
use libpijul::changestore::*;
use libpijul::pristine::{Base32, ChannelRef, MutTxnT, TxnT};
use libpijul::{MutTxnTExt, TxnTExt};
use std::collections::HashMap;
use std::io::Write;
use std::path::{Path, PathBuf};
use thrussh_keys::PublicKeyBase64;
#[derive(Clap, Debug)]
pub struct Record {
#[clap(short = 'a', long = "all")]
pub all: bool,
#[clap(short = 'm', long = "message")]
pub message: Option<String>,
#[clap(long = "author")]
pub author: Option<String>,
#[clap(long = "channel")]
pub channel: Option<String>,
#[clap(long = "repository")]
pub repo_path: Option<PathBuf>,
#[clap(long = "timestamp")]
pub timestamp: Option<i64>,
#[clap(short = 'S')]
pub sign: bool,
#[clap(long = "stdin")]
pub stdin: bool,
#[clap(long = "tag")]
pub tag: bool,
#[clap(long = "amend")]
pub amend: Option<String>,
pub prefixes: Vec<PathBuf>,
}
impl Record {
pub async fn run(self) -> Result<(), anyhow::Error> {
let mut repo = Repository::find_root(self.repo_path.clone())?;
let mut stdout = std::io::stdout();
let mut stderr = std::io::stderr();
if let Some(ref hooks) = repo.config.hooks {
for h in hooks.record.iter() {
let mut proc = std::process::Command::new("bash")
.current_dir(&repo.path)
.args(&["-c", &h])
.spawn()?;
let status = proc.wait()?;
if !status.success() {
writeln!(stderr, "Hook {:?} exited with code {:?}", h, status)?;
std::process::exit(status.code().unwrap_or(1))
}
}
}
let mut txn = repo.pristine.mut_txn_begin();
let mut channel =
txn.open_or_create_channel(repo.config.get_current_channel(self.channel.as_ref()))?;
let sign = self.sign;
let header = if let Some(ref amend) = self.amend {
let (h, _) = txn.hash_from_prefix(amend)?;
let header = repo.changes.get_header(&h)?;
txn.unrecord(&repo.changes, &mut channel, &h)?;
header
} else {
self.header()
};
let result = self.record(
&mut txn,
&mut channel,
&mut repo.working_copy,
&repo.changes,
&repo.path,
header,
)?;
if let Some((mut change, updates, hash)) = result {
let hash = hash.unwrap();
if sign {
let mut key_path = dirs_next::home_dir().unwrap().join(".ssh");
if let Some((pk, signature)) = sign_hash(&mut key_path, hash).await? {
let sig = toml::Value::try_from(vec![Signature {
public_key: pk,
timestamp: change.header.timestamp,
signature: signature,
}])?;
let mut toml = toml::map::Map::new();
toml.insert("signatures".to_string(), sig);
change.unhashed = Some(toml.into());
let hash2 = repo.changes.save_change(&change).unwrap();
assert_eq!(hash2, hash);
}
}
txn.apply_local_change(&mut channel, &change, hash, &updates)?;
writeln!(stdout, "Hash: {}", hash.to_base32())?;
txn.commit()?;
} else {
writeln!(stderr, "Nothing to record")?;
}
Ok(())
}
fn header(&self) -> ChangeHeader {
let authors = if let Some(ref a) = self.author {
vec![libpijul::change::Author {
name: a.clone(),
full_name: None,
email: None,
}]
} else if let Ok(global) = crate::config::Global::load() {
vec![global.author]
} else {
Vec::new()
};
ChangeHeader {
message: self.message.clone().unwrap_or(String::new()),
authors,
description: None,
timestamp: if let Some(t) = self.timestamp {
chrono::DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(t, 0), chrono::Utc)
} else {
Utc::now()
},
}
}
fn fill_relative_prefixes(&mut self) -> Result<(), anyhow::Error> {
let cwd = std::env::current_dir()?;
for p in self.prefixes.iter_mut() {
if p.is_relative() {
*p = cwd.join(&p);
}
}
Ok(())
}
fn record<T: TxnT + TxnTExt + MutTxnTExt, C: ChangeStore>(
mut self,
txn: &mut T,
channel: &mut ChannelRef<T>,
working_copy: &mut libpijul::working_copy::FileSystem,
changes: &C,
repo_path: &Path,
header: ChangeHeader,
) -> Result<
Option<(
Change,
HashMap<usize, libpijul::InodeUpdate>,
Option<libpijul::pristine::Hash>,
)>,
anyhow::Error,
> {
let mut state = libpijul::RecordBuilder::new();
if self.prefixes.is_empty() {
txn.record(
&mut state,
libpijul::Algorithm::default(),
channel,
working_copy,
changes,
"",
)?
} else {
self.fill_relative_prefixes()?;
working_copy.record_prefixes(
txn,
channel,
changes,
&mut state,
repo_path,
&self.prefixes,
)?;
}
let mut rec = state.finish();
if rec.actions.is_empty() {
return Ok(None);
}
let actions = rec
.actions
.into_iter()
.map(|rec| rec.globalize(txn))
.collect();
let change =
LocalChange::make_change(txn, channel, actions, rec.contents, header, Vec::new());
let file_name = |local: &Local, _| -> String { format!("{}:{}", local.path, local.line) };
debug!("has_binary = {:?}", rec.has_binary_files);
let mut change = if self.all {
change
} else if rec.has_binary_files {
return Err(Error::RecordBinaryChange.into());
} else {
let mut o = Vec::new();
change.write(changes, None, file_name, true, &mut o)?;
let mut with_errors: Option<Vec<u8>> = None;
let change = loop {
// If the previous buffer failed to parse, re-edit it (it starts with the
// error banner); otherwise edit the freshly rendered change text.
let mut bytes = if let Some(ref err_buf) = with_errors {
edit::edit_bytes(&err_buf[..])?
} else {
edit::edit_bytes(&o[..])?
};
if bytes.iter().all(|c| (*c as char).is_whitespace()) {
return Ok(None);
}
let mut change = std::io::BufReader::new(std::io::Cursor::new(&bytes));
if let Ok(change) =
Change::read_and_deps(&mut change, &mut rec.updatables, txn, channel)
{
break change;
}
let mut err = SYNTAX_ERROR.as_bytes().to_vec();
err.append(&mut bytes);
with_errors = Some(err)
};
if change.changes.is_empty() {
return Ok(None);
}
change
};
if change.header.message.trim().is_empty() {
return Err(Error::NoChangeMessage.into());
}
let (dependencies, extra_known) = if self.tag {
full_dependencies(txn, channel)
} else {
dependencies(txn, channel, change.changes.iter())
};
change.dependencies = dependencies;
change.extra_known = extra_known;
debug!("saving change");
let hash = changes.save_change(&change).unwrap();
debug!("saved");
Ok(Some((change, rec.updatables, Some(hash))))
}
}
#[derive(Debug, Serialize, Deserialize)]
struct Signature {
public_key: String,
timestamp: chrono::DateTime<chrono::Utc>,
signature: String,
}
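/// Sign a change hash: first try the SSH agent with the public keys
/// `id_ed25519.pub` and `id_rsa.pub` found in `key_path`, then fall back to
/// loading the corresponding secret keys directly. Returns the base64
/// public key and signature.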
async fn sign_hash(
key_path: &mut PathBuf,
hash: libpijul::pristine::Hash,
) -> Result<Option<(String, String)>, anyhow::Error> {
let to_sign = hash.to_bytes();
match thrussh_keys::agent::client::AgentClient::connect_env().await {
Ok(agent) => {
let mut agent = Some(agent);
for k in &["id_ed25519.pub", "id_rsa.pub"] {
key_path.push(k);
if let Ok(key) = thrussh_keys::load_public_key(&key_path) {
debug!("key");
if let Some(a) = agent.take() {
debug!("authenticate future");
if let (_, Ok(sig)) = a.sign_request_base64(&key, &to_sign).await {
key_path.pop();
let key = key.public_key_base64();
return Ok(Some((key, sig)));
}
}
}
key_path.pop();
}
}
Err(e) => {
error!("{:?}", e);
}
}
for k in &["id_ed25519", "id_rsa"] {
key_path.push(k);
if let Some(k) = crate::remote::ssh::load_secret_key(&key_path, k) {
key_path.pop();
let pk = k.public_key_base64();
return Ok(Some((pk, k.sign_detached(&to_sign)?.to_base64())));
} else {
key_path.pop();
}
}
Ok(None)
}
const SYNTAX_ERROR: &'static str = "# Syntax errors, please try again.
# Alternatively, you may delete the entire file (including this
# comment) to abort.
";
use crate::repository::Repository;
use libpijul::changestore::ChangeStore;
use libpijul::pristine::{MutTxnT, TxnT};
use libpijul::{MutTxnTExt, TxnTExt};
use regex::Regex;
use std::collections::HashSet;
use std::io::Write;
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Remote {
#[clap(subcommand)]
subcmd: Option<SubRemote>,
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
}
#[derive(Clap, Debug)]
pub enum SubRemote {
#[clap(name = "delete")]
Delete { remote: String },
#[clap(name = "list")]
List,
}
impl Remote {
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path)?;
debug!("{:?}", repo.config);
let mut stdout = std::io::stdout();
match self.subcmd {
None | Some(SubRemote::List) => {
let txn = repo.pristine.txn_begin()?;
for r in txn.iter_remotes("") {
writeln!(stdout, " {}", r.name())?;
}
}
Some(SubRemote::Delete { remote }) => {
let mut txn = repo.pristine.mut_txn_begin();
if !txn.drop_named_remote(&remote)? {
writeln!(std::io::stderr(), "Remote not found: {:?}", remote)?
} else {
txn.commit()?;
}
}
}
Ok(())
}
}
#[derive(Clap, Debug)]
pub struct Push {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(long = "channel")]
channel: Option<String>,
#[clap(short = 'a')]
all: bool,
#[clap(short = 'k', about = "Do not check certificates")]
no_cert_check: bool,
#[clap(last = true)]
changes: Vec<String>,
#[clap(long = "path")]
path: Option<String>,
to: Option<String>,
#[clap(long = "to-channel")]
to_channel: Option<String>,
}
#[derive(Clap, Debug)]
pub struct Pull {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(long = "channel")]
channel: Option<String>,
#[clap(long = "all", short = 'a')]
all: bool,
#[clap(short = 'k', about = "Do not check certificates")]
no_cert_check: bool,
#[clap(
long = "full",
about = "Download full changes, even when not necessary"
)]
full: bool, // This can't be symmetric with push
#[clap(
last = true,
about = "Pull changes from the local repository, not necessarily from a channel"
)]
changes: Vec<String>, // For local changes only, can't be symmetric.
#[clap(long = "path")]
path: Option<String>,
from: Option<String>,
#[clap(long = "from-channel")]
from_channel: Option<String>,
}
lazy_static! {
static ref CHANNEL: Regex = Regex::new(r#"([^:]*)(:(.*))?"#).unwrap();
}
impl Push {
pub async fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path)?;
debug!("{:?}", repo.config);
let channel_name = repo.config.get_current_channel(self.channel.as_ref());
let remote_name = if let Some(ref rem) = self.to {
rem
} else if let Some(ref def) = repo.config.default_remote {
def
} else {
return Err(crate::Error::MissingRemote.into());
};
let mut push_channel = None;
let remote_channel = if let Some(ref c) = self.to_channel {
let c = CHANNEL.captures(c).unwrap();
push_channel = c.get(3).map(|x| x.as_str());
let c = c.get(1).unwrap().as_str();
if c.is_empty() {
channel_name
} else {
c
}
} else {
channel_name
};
debug!("remote_channel = {:?} {:?}", remote_channel, push_channel);
let mut remote = repo
.remote(&remote_name, remote_channel, self.no_cert_check)
.await?;
let mut txn = repo.pristine.mut_txn_begin();
let mut paths = if let Some(p) = self.path {
vec![p.to_string()]
} else {
vec![]
};
let remote_changes = remote.update_changelist(&mut txn, &paths).await?;
let channel = txn.open_or_create_channel(channel_name)?;
let path = if let Some(path) = paths.pop() {
let (p, ambiguous) = txn.follow_oldest_path(&repo.changes, &channel, &path)?;
if ambiguous {
return Err((crate::Error::AmbiguousPath { path: path.clone() }).into());
}
Some(p)
} else {
None
};
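// Walk our channel log from the most recent change backwards, stopping at
// the first state the remote already has, and collect every change the
// remote is missing (restricted to `path` when one was given).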
let mut to_upload = Vec::new();
for (_, (h, m)) in txn.reverse_log(&channel.borrow(), None) {
if txn.remote_has_state(&remote_changes, m) {
break;
}
let h_int = txn.get_internal(h).unwrap();
if !txn.remote_has_change(&remote_changes, h) {
if let Some(ref p) = path {
if txn.get_touched_files(*p, Some(h_int)).is_some() {
to_upload.push(h)
}
} else {
to_upload.push(h)
}
}
}
if to_upload.is_empty() {
return Ok(());
}
to_upload.reverse();
debug!("to_upload = {:?}", to_upload);
let to_upload = if !self.all {
let o = make_changelist(&repo.changes, &to_upload)?;
let u = parse_changelist(&edit::edit_bytes(&o[..])?);
check_deps(&repo.changes, &to_upload, &u)?;
u
} else {
to_upload
};
debug!("to_upload = {:?}", to_upload);
remote
.upload_changes(repo.changes_dir.clone(), push_channel, &to_upload)
.await?;
txn.commit()?;
remote.finish().await?;
Ok(())
}
}
impl Pull {
pub async fn run(self) -> Result<(), anyhow::Error> {
let mut repo = Repository::find_root(self.repo_path)?;
let mut txn = repo.pristine.mut_txn_begin();
let channel_name = repo.config.get_current_channel(self.channel.as_ref());
let mut channel = txn.open_or_create_channel(channel_name)?;
debug!("{:?}", repo.config);
let remote_name = if let Some(ref rem) = self.from {
rem
} else if let Some(ref def) = repo.config.default_remote {
def
} else {
return Err(crate::Error::MissingRemote.into());
};
let from_channel = if let Some(ref c) = self.from_channel {
c
} else {
crate::DEFAULT_CHANNEL
};
let mut remote = repo
.remote(&remote_name, from_channel, self.no_cert_check)
.await?;
debug!("downloading");
let to_download = if self.changes.is_empty() {
let paths = if let Some(p) = self.path {
vec![p.to_string()]
} else {
vec![]
};
let remote_changes = remote.update_changelist(&mut txn, &paths).await?;
debug!("changelist done");
let mut to_download = Vec::new();
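// Select every change listed by the remote that is not already on the
// local channel, stopping as soon as we reach a state the channel
// already has.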
for (_, (h, m)) in txn.iter_remote(&remote_changes.borrow().remote, 0) {
if txn.channel_has_state(&channel, m) {
break;
} else if txn.get_revchanges(&channel, h).is_none() {
to_download.push(h)
}
}
to_download.reverse();
to_download
} else {
let r: Result<Vec<libpijul::pristine::Hash>, anyhow::Error> = self
.changes
.iter()
.map(|h| Ok(txn.hash_from_prefix(h)?.0))
.collect();
r?
};
if to_download.is_empty() {
return Ok(());
}
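// Record any unrecorded changes of the working copy as a temporary
// change, so that the pull and the subsequent output do not overwrite
// them; the temporary change is unrecorded and deleted again below.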
debug!("recording");
let recorded = txn.record_all(
libpijul::Algorithm::default(),
&mut channel,
&mut repo.working_copy,
&repo.changes,
"",
)?;
let hash = if recorded.actions.is_empty() {
None
} else {
Some(txn.apply_recorded(&mut channel, recorded, &repo.changes)?)
};
remote
.pull(
&mut repo,
&mut txn,
&mut channel,
to_download.clone(),
self.all,
)
.await?;
if !self.all {
let o = make_changelist(&repo.changes, &to_download)?;
let d = parse_changelist(&edit::edit_bytes(&o[..])?);
check_deps(&repo.changes, &to_download, &d)?;
let mut ws = libpijul::ApplyWorkspace::new();
debug!("to_download = {:?}", to_download);
for h in d.iter() {
txn.apply_change_rec_ws(&repo.changes, &mut channel, *h, &mut ws)?;
}
}
debug!("completing changes");
remote
.complete_changes(&repo, &txn, &mut channel, &to_download, self.full)
.await?;
remote.finish().await?;
txn.output_repository_no_pending(
&mut repo.working_copy,
&repo.changes,
&mut channel,
"",
true,
)?;
if let Some(h) = hash {
txn.unrecord(&repo.changes, &mut channel, &h)?;
repo.changes.del_change(&h)?;
}
txn.commit()?;
Ok(())
}
}
/// Make the "changelist", i.e. the list of patches, editable in a
/// text editor.
fn make_changelist<S: ChangeStore>(
changes: &S,
pullable: &[libpijul::pristine::Hash],
) -> Result<Vec<u8>, anyhow::Error> {
use libpijul::pristine::Base32;
let mut v = Vec::new();
writeln!(
v,
"# Please select the changes to pull. The lines that contain just a
# valid hash, and no other character (except possibly a newline), will
# be pulled.\n"
)
.unwrap();
let mut first_p = true;
for p in pullable {
if !first_p {
writeln!(v, "").unwrap();
}
first_p = false;
writeln!(v, "{}\n", p.to_base32()).unwrap();
let change = changes.get_header(&p)?;
write!(v, " Author: [").unwrap();
let mut first = true;
for a in change.authors.iter() {
if !first {
write!(v, ", ").unwrap();
}
first = false;
write!(v, "{}", a).unwrap();
}
writeln!(v, "]").unwrap();
writeln!(v, " Date: {}\n", change.timestamp).unwrap();
for l in change.message.lines() {
writeln!(v, " {}", l).unwrap();
}
}
Ok(v)
}
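/// Parse the edited changelist back into hashes: only lines consisting of
/// a single valid base32 hash are kept.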
fn parse_changelist(o: &[u8]) -> Vec<libpijul::pristine::Hash> {
use libpijul::pristine::Base32;
if let Ok(o) = std::str::from_utf8(o) {
o.lines()
.filter_map(|l| libpijul::pristine::Hash::from_base32(l.as_bytes()))
.collect()
} else {
Vec::new()
}
}
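/// Check that the edited selection is closed under dependencies: every
/// dependency of a selected change must either be selected as well, or not
/// have been part of the original list at all.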
fn check_deps<C: ChangeStore>(
c: &C,
original: &[libpijul::pristine::Hash],
now: &[libpijul::pristine::Hash],
) -> Result<(), anyhow::Error> {
let original_: HashSet<_> = original.iter().collect();
let now_: HashSet<_> = now.iter().collect();
for n in now {
// check that all of `now`'s deps are in now or not in original
for d in c.get_dependencies(n)? {
if original_.get(&d).is_some() && now_.get(&d).is_none() {
return Err((crate::Error::MissingDep { h: *n }).into());
}
}
}
Ok(())
}
use crate::repository::Repository;
use crate::Error;
use byteorder::{BigEndian, WriteBytesExt};
use libpijul::pristine::{Base32, ChannelRef, Hash, MutTxnT};
use libpijul::{MutTxnTExt, TxnTExt};
use regex::Regex;
use std::collections::HashMap;
use std::io::BufWriter;
use std::io::{BufRead, Read, Write};
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Protocol {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(long = "version")]
version: usize,
}
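// One regular expression per request of the line-based protocol spoken
// over SSH: state, changelist, change / partial change, apply, channel
// dump and archive.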
lazy_static! {
static ref STATE: Regex = Regex::new(r#"state\s+(\S+)(\s+([0-9]+)?)\s+"#).unwrap();
static ref CHANGELIST: Regex = Regex::new(r#"changelist\s+(\S+)\s+([0-9]+)(.*)\s+"#).unwrap();
static ref CHANGELIST_PATHS: Regex = Regex::new(r#""((\\")|[^"])+""#).unwrap();
static ref CHANGE: Regex = Regex::new(r#"((change)|(partial))\s+([^ ]*)\s+"#).unwrap();
static ref APPLY: Regex = Regex::new(r#"apply\s+(\S+)\s+(\S+)\s+([0-9]+)\s+"#).unwrap();
static ref CHANNEL: Regex = Regex::new(r#"channel\s+(\S+)\s+"#).unwrap();
static ref ARCHIVE: Regex =
Regex::new(r#"archive\s+(\S+)\s*(( ([^:]+))*)( :(.*))?\n"#).unwrap();
}
fn load_channel<T: MutTxnT>(txn: &T, name: &str) -> Result<ChannelRef<T>, Error> {
if let Some(c) = txn.load_channel(name) {
Ok(c)
} else {
Err((Error::NoSuchChannel {
channel: name.to_string(),
})
.into())
}
}
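// Above this size, a `partial` request sends a change without its contents.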
const PARTIAL_CHANGE_SIZE: u64 = 1 << 20;
impl Protocol {
pub fn run(self) -> Result<(), anyhow::Error> {
let mut repo = Repository::find_root(self.repo_path)?;
let mut txn = repo.pristine.mut_txn_begin();
let mut ws = libpijul::ApplyWorkspace::new();
use libpijul::pristine::TxnT;
let mut buf = String::new();
let mut buf2 = vec![0; 4096 * 10];
let s = std::io::stdin();
let mut s = s.lock();
let o = std::io::stdout();
let o = o.lock();
let mut applied = HashMap::new();
let mut o = BufWriter::new(o);
debug!("reading");
while s.read_line(&mut buf)? > 0 {
debug!("{:?}", buf);
if let Some(cap) = STATE.captures(&buf) {
let channel = load_channel(&txn, &cap[1])?;
let init = if let Some(u) = cap.get(3) {
u.as_str().parse().ok()
} else {
None
};
if let Some(pos) = init {
for (n, (_, m)) in txn.log(&channel.borrow(), pos) {
if n < pos {
continue;
} else if n > pos {
writeln!(o, "-")?;
break;
} else {
writeln!(o, "{} {}", n, m.to_base32())?;
break;
}
}
} else {
if let Some((n, (_, m))) = txn.reverse_log(&channel.borrow(), None).next() {
writeln!(o, "{} {}", n, m.to_base32())?
} else {
writeln!(o, "-")?;
}
}
o.flush()?;
} else if let Some(cap) = CHANGELIST.captures(&buf) {
let channel = load_channel(&txn, &cap[1])?;
let from: u64 = cap[2].parse().unwrap();
let mut paths = Vec::new();
for r in CHANGELIST_PATHS.captures_iter(&cap[3]) {
let s: String = r[0].parse().unwrap();
if let Ok((p, ambiguous)) = txn.follow_oldest_path(&repo.changes, &channel, &s)
{
if ambiguous {
return Err((Error::ProtocolError {
line: buf.as_bytes().to_vec(),
})
.into());
}
paths.push(p)
} else {
return Err((Error::ProtocolError {
line: buf.as_bytes().to_vec(),
})
.into());
}
}
for (n, (h, m)) in txn.log(&channel.borrow(), from) {
let h_int = txn.get_internal(h).unwrap();
if paths.is_empty()
|| paths
.iter()
.any(|x| txn.get_touched_files(*x, Some(h_int)).is_some())
{
writeln!(o, "{}.{}.{}", n, h.to_base32(), m.to_base32())?
}
}
writeln!(o, "")?;
o.flush()?;
} else if let Some(cap) = CHANGE.captures(&buf) {
let h_ = &cap[4];
let h = if let Some(h) = Hash::from_base32(h_.as_bytes()) {
h
} else {
return Err((Error::ProtocolError {
line: buf.as_bytes().to_vec(),
})
.into());
};
libpijul::changestore::filesystem::push_filename(&mut repo.changes_dir, &h);
debug!("repo = {:?}", repo.changes_dir);
let mut f = std::fs::File::open(&repo.changes_dir)?;
let size = std::fs::metadata(&repo.changes_dir)?.len();
let size = if &cap[1] == "change" || size <= PARTIAL_CHANGE_SIZE {
size
} else {
libpijul::change::Change::size_no_contents(&mut f)?
};
o.write_u64::<BigEndian>(size)?;
let mut size = size as usize;
while size > 0 {
if size < buf2.len() {
buf2.truncate(size as usize);
}
let n = f.read(&mut buf2[..])?;
if n == 0 {
break;
}
size -= n;
o.write_all(&buf2[..n])?;
}
o.flush()?;
libpijul::changestore::filesystem::pop_filename(&mut repo.changes_dir);
} else if let Some(cap) = APPLY.captures(&buf) {
let h = if let Some(h) = Hash::from_base32(cap[2].as_bytes()) {
h
} else {
return Err((Error::ProtocolError {
line: buf.as_bytes().to_vec(),
})
.into());
};
let mut path = repo.changes_dir.clone();
libpijul::changestore::filesystem::push_filename(&mut path, &h);
let size: usize = cap[3].parse().unwrap();
buf2.resize(size, 0);
s.read_exact(&mut buf2)?;
std::fs::write(&path, &buf2)?;
libpijul::change::Change::deserialize(&path.to_string_lossy(), Some(&h))?;
let mut channel = load_channel(&txn, &cap[1])?;
txn.apply_change_ws(&repo.changes, &mut channel, h, &mut ws)?;
applied.insert(cap[1].to_string(), channel); // keyed by channel name, so each channel is output once below
} else if let Some(cap) = CHANNEL.captures(&buf) {
let channel = load_channel(&txn, &cap[1])?;
let channel = channel.borrow();
for d in libpijul::pristine::channel_dump::dump_channel(&txn, channel) {
o.write_all(&d)?;
}
o.flush()?;
} else if let Some(cap) = ARCHIVE.captures(&buf) {
let mut w = Vec::new();
let mut tarball = libpijul::output::Tarball::new(
&mut w,
cap.get(6).map(|x| x.as_str().to_string()),
);
let channel = load_channel(&txn, &cap[1])?;
let conflicts = if let Some(caps) = cap.get(2) {
debug!("caps = {:?}", caps.as_str());
let mut hashes = caps.as_str().split(' ').filter(|x| !x.is_empty());
let state: libpijul::pristine::Merkle = hashes.next().unwrap().parse().unwrap();
let extra: Vec<libpijul::pristine::Hash> =
hashes.map(|x| x.parse().unwrap()).collect();
debug!("state = {:?}, extra = {:?}", state, extra);
if txn.current_state(&channel.borrow()) == Some(state) && extra.is_empty() {
txn.archive(&repo.changes, &channel, &mut tarball)?
} else {
use rand::Rng;
let fork_name: String = rand::thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(30)
.collect();
let mut fork = txn.fork(&channel, &fork_name)?;
let conflicts = txn.archive_with_state(
&repo.changes,
&mut fork,
state,
&extra,
&mut tarball,
)?;
txn.drop_channel(&fork_name)?;
conflicts
}
} else {
txn.archive(&repo.changes, &channel, &mut tarball)?
};
std::mem::drop(tarball);
let mut o = std::io::stdout();
o.write_u64::<BigEndian>(w.len() as u64)?;
o.write_u64::<BigEndian>(conflicts.len() as u64)?;
o.write_all(&w)?;
o.flush()?;
} else {
error!("unmatched")
}
buf.clear();
}
let applied_nonempty = !applied.is_empty();
for (_, mut channel) in applied {
txn.output_repository_no_pending(
&mut repo.working_copy,
&repo.changes,
&mut channel,
"",
true,
)?;
}
if applied_nonempty {
txn.commit()?;
}
Ok(())
}
}
mod init;
pub use init::Init;
mod clone;
pub use clone::Clone;
mod pushpull;
pub use pushpull::*;
mod log;
pub use self::log::Log;
mod record;
pub use record::Record;
mod diff;
pub use diff::Diff;
mod change;
pub use change::Change;
mod protocol;
pub use protocol::Protocol;
#[cfg(feature = "git")]
mod git;
#[cfg(feature = "git")]
pub use git::Git;
mod channel;
pub use channel::*;
mod reset;
pub use reset::*;
mod fork;
pub use fork::*;
mod unrecord;
pub use unrecord::*;
mod file_operations;
pub use file_operations::*;
mod apply;
pub use apply::*;
mod archive;
pub use archive::*;
mod credit;
pub use credit::*;
#[cfg(debug_assertions)]
mod debug;
#[cfg(debug_assertions)]
pub use debug::*;
mod upgrade;
pub use upgrade::*;
use crate::repository::Repository;
use crate::Error;
use libpijul::changestore::*;
use libpijul::pristine::Base32;
use libpijul::TxnTExt;
use std::io::Write;
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Log {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(long = "channel")]
channel: Option<String>,
#[clap(long = "hash-only")]
hash_only: bool,
#[clap(long = "state")]
states: bool,
#[clap(long = "description")]
descriptions: bool,
}
impl Log {
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path)?;
let txn = repo.pristine.txn_begin()?;
use libpijul::pristine::TxnT;
let channel_name = repo.config.get_current_channel(self.channel.as_ref());
let channel = if let Some(channel) = txn.load_channel(channel_name) {
channel
} else {
return Err((Error::NoSuchChannel {
channel: channel_name.to_string(),
})
.into());
};
let changes = repo.changes;
let mut stdout = std::io::stdout();
if self.hash_only {
for h in libpijul::change::full_dependencies(&txn, &channel).0 {
writeln!(stdout, "{}", h.to_base32())?
}
} else {
let states = self.states;
for (_, (h, mrk)) in txn.reverse_log(&channel.borrow(), None) {
let change = changes.get_change(&h)?;
writeln!(stdout, "Change {}", h.to_base32())?;
writeln!(stdout, "Author: {:?}", change.header.authors)?;
writeln!(stdout, "Date: {}", change.header.timestamp)?;
if states {
writeln!(stdout, "State: {}", mrk.to_base32())?;
}
writeln!(stdout, "\n {}\n", change.header.message)?;
if self.descriptions {
if let Some(ref descr) = change.header.description {
writeln!(stdout, "\n {}\n", descr)?;
}
}
}
}
Ok(())
}
}
use crate::repository::*;
use libpijul::pristine::MutTxnT;
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Init {
#[clap(long = "channel")]
channel: Option<String>,
path: Option<PathBuf>,
}
impl Init {
pub fn run(self) -> Result<(), anyhow::Error> {
let mut repo = Repository::init(self.path)?;
let mut txn = repo.pristine.mut_txn_begin();
let channel_name = self.channel.unwrap_or(crate::DEFAULT_CHANNEL.to_string());
txn.open_or_create_channel(&channel_name)?;
repo.config.current_channel = Some(channel_name);
repo.save_config()?;
txn.commit()?;
Ok(())
}
}
use crate::repository::*;
use libpijul::pristine::*;
use libpijul::*;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::rc::Rc;
#[derive(Clap, Debug)]
pub struct Git {
#[clap(long = "stats")]
stats: Option<PathBuf>,
pub repo_path: Option<PathBuf>,
#[clap(default_value = "0")]
check: usize,
}
#[derive(Debug, Error)]
pub enum Error {
#[error("Pijul channel changed since last import. Please unrecord channel {} to state {}", channel, state.to_base32())]
MerkleChanged {
channel: String,
state: libpijul::pristine::Merkle,
},
}
struct OpenRepo {
repo: Repository,
stats: Option<std::fs::File>,
n: usize,
check: usize,
current_commit: Option<git2::Oid>,
}
impl Git {
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = if let Ok(repo) = Repository::find_root(self.repo_path.clone()) {
repo
} else {
Repository::init(self.repo_path.clone())?
};
let git = git2::Repository::open(&repo.path)?;
let head = git.head()?;
info!("Loading history…");
let oid = head.target().unwrap();
let mut path_git = repo.path.join(libpijul::DOT_DIR);
path_git.push("git");
std::fs::create_dir_all(&path_git)?;
let mut env_git = ::sanakirja::Env::new(&path_git, 1 << 15)?;
let dag = Dag::dfs(&git, oid, &mut env_git)?;
trace!(target: "dag", "{:?}", dag);
info!("Done");
let mut pristine = repo.path.join(DOT_DIR);
pristine.push(PRISTINE_DIR);
std::fs::create_dir_all(&pristine)?;
let mut repo = OpenRepo {
repo,
stats: self.stats.and_then(|f| std::fs::File::create(f).ok()),
n: 0,
check: self.check,
current_commit: None,
};
import(&git, &mut env_git, &mut repo, &dag)?;
Ok(())
}
}
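/// The Git commit graph to import: forward (children) and backward
/// (parents) edges, plus the roots, i.e. commits with no unimported parent,
/// together with the Pijul state they produced if they were already
/// imported in a previous run.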
#[derive(Debug)]
struct Dag {
children: BTreeMap<git2::Oid, Vec<git2::Oid>>,
parents: BTreeMap<git2::Oid, Vec<git2::Oid>>,
root: Vec<(git2::Oid, Option<libpijul::pristine::Merkle>)>,
}
impl Dag {
/// Load a Git repository in memory. The main reason this is
/// needed is to compute the *backward* relations from a commit to
/// its parents.
fn dfs(
git: &git2::Repository,
oid: git2::Oid,
env_git: &mut ::sanakirja::Env,
) -> Result<Self, anyhow::Error> {
use ::sanakirja::Transaction;
let mut stack = vec![git.find_commit(oid)?];
let mut oids_set = BTreeSet::new();
let mut dag = Dag {
children: BTreeMap::new(),
parents: BTreeMap::new(),
root: Vec::new(),
};
oids_set.insert(oid.clone());
let mut txn_git = ::sanakirja::Env::mut_txn_begin(env_git)?;
let db: ::sanakirja::Db<git2::Oid, libpijul::pristine::Merkle> =
if let Some(db) = txn_git.root(0) {
db
} else {
txn_git.create_db()?
};
let mut state = HashMap::new();
for (commit, merk) in txn_git.iter(&db, None) {
state.insert(commit, merk);
}
debug!("state = {:?}", state);
while let Some(commit) = stack.pop() {
if let Some(state) = state.get(&commit.id()) {
dag.root.push((commit.id(), Some(*state)));
continue;
}
let mut has_parents = false;
for p in commit.parents() {
trace!("parent {:?}", p);
dag.children
.entry(p.id())
.or_insert(Vec::new())
.push(commit.id());
dag.parents
.entry(commit.id())
.or_insert(Vec::new())
.push(p.id());
if oids_set.insert(p.id()) {
stack.push(p);
}
has_parents = true
}
if !has_parents {
dag.root.push((commit.id(), None))
}
}
txn_git.set_root(0, db);
::sanakirja::Commit::commit(txn_git)?;
Ok(dag)
}
fn collect_dead_parents<T: MutTxnTExt>(
&self,
oid: &git2::Oid,
todo: &mut Todo,
txn: &mut T,
) -> Result<(), anyhow::Error> {
if let Some(parents) = self.parents.get(oid) {
debug!("parents {:?}", parents);
for p in parents {
let rc = todo.refs.get_mut(p).unwrap();
*rc -= 1;
if *rc == 0 {
let p_name = format!("{}", p);
debug!("dropping channel {:?}", p_name);
txn.drop_channel(&p_name)?;
}
}
}
Ok(())
}
fn insert_children_in_todo(&self, oid: &git2::Oid, todo: &mut Todo) {
if let Some(c) = self.children.get(&oid) {
for child in c {
debug!("child = {:?}", child);
if todo.next_todo_set.insert(*child) {
todo.next_todo.push(*child);
}
*todo.refs.entry(*oid).or_insert(0) += 1;
}
} else {
debug!("no children")
}
}
}
#[derive(Debug)]
struct Todo {
todo: Vec<git2::Oid>,
todo_set: HashSet<git2::Oid>,
next_todo: Vec<git2::Oid>,
next_todo_set: HashSet<git2::Oid>,
// For each key k, number of items in the union of todo and
// next_todo that have k as a parent. Moreover, all commits that
// were imported are in this map.
refs: HashMap<git2::Oid, usize>,
}
impl Todo {
fn new() -> Self {
Todo {
todo: Vec::new(),
todo_set: HashSet::new(),
next_todo: Vec::new(),
next_todo_set: HashSet::new(),
refs: HashMap::new(),
}
}
fn swap_next(&mut self, todo: Vec<git2::Oid>) {
self.todo = todo;
std::mem::swap(&mut self.todo, &mut self.next_todo);
self.todo_set.clear();
std::mem::swap(&mut self.todo_set, &mut self.next_todo_set);
}
fn insert_next(&mut self, oid: git2::Oid) {
if self.next_todo_set.insert(oid) {
self.next_todo.push(oid)
}
}
fn is_empty(&self) -> bool {
self.todo.is_empty()
}
fn all_processed(&self, parents: &[git2::Oid]) -> bool {
parents.iter().all(|x| self.refs.contains_key(x))
}
}
/// Import the entire Git DAG into Pijul.
fn import(
git: &git2::Repository,
env_git: &mut ::sanakirja::Env,
repo: &mut OpenRepo,
dag: &Dag,
) -> Result<(), anyhow::Error> {
let mut ws = libpijul::ApplyWorkspace::new();
let mut todo = Todo::new();
let txn = repo.repo.pristine.mut_txn_begin();
for &(oid, merkle) in dag.root.iter() {
if let Some(merkle) = merkle {
let oid_ = format!("{}", oid);
let channel = txn.load_channel(&oid_).unwrap();
let (_, (_, merkle_)) = txn
.changeid_rev_log(&channel.borrow(), None)
.next()
.unwrap();
if merkle != merkle_ {
return Err((Error::MerkleChanged {
channel: oid_,
state: merkle,
})
.into());
}
if let Some(children) = dag.children.get(&oid) {
*todo.refs.entry(oid).or_insert(0) += children.len();
for c in children.iter() {
todo.insert_next(*c);
}
}
} else {
todo.insert_next(oid);
if let Some(parents) = dag.parents.get(&oid) {
for p in parents.iter() {
*todo.refs.entry(*p).or_insert(0) += 1;
}
}
}
}
std::mem::drop(txn);
todo.swap_next(Vec::new());
while !todo.is_empty() {
info!("TODO: {:?}", todo);
let mut todo_ = std::mem::replace(&mut todo.todo, Vec::new());
{
let mut txn = repo.repo.pristine.mut_txn_begin();
let mut draining = todo_.drain(..);
while let Some(oid) = draining.next() {
let mut channel = if let Some(parents) = dag.parents.get(&oid) {
// If we don't have all the parents, continue.
if !todo.all_processed(&parents) {
todo.insert_next(oid);
continue;
}
let first_parent = parents.iter().next().unwrap();
let parent_name = format!("{}", first_parent);
let parent_channel = txn.load_channel(&parent_name).unwrap();
let name = format!("{}", oid);
let channel = txn.fork(&parent_channel, &name)?;
channel
} else {
// Create a new channel for this commit.
let name = format!("{}", oid);
let channel = txn.open_or_create_channel(&name)?;
channel
};
let mut stats = Stats::new(oid);
import_commit_parents(
repo,
dag,
&mut txn,
&mut channel,
&oid,
&mut ws,
&mut stats,
)?;
let state = import_commit(git, repo, &mut txn, &mut channel, &oid, &mut stats)?;
save_state(env_git, &oid, state)?;
dag.collect_dead_parents(&oid, &mut todo, &mut txn)?;
dag.insert_children_in_todo(&oid, &mut todo);
if let Some(ref mut f) = repo.stats {
stats.write(repo.n, &repo.repo.path, f)?
}
// Just add the remaining commits to the todo list,
// because we prefer to move each channel as far as
// possible before switching channels.
while let Some(oid) = draining.next() {
todo.insert_next(oid)
}
}
txn.commit()?;
}
todo.swap_next(todo_)
}
Ok(())
}
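/// Record, in the sanakirja database under `.pijul/git`, that Git commit
/// `oid` was imported and produced the Pijul state `state`.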
fn save_state(
git: &mut ::sanakirja::Env,
oid: &git2::Oid,
state: libpijul::pristine::Merkle,
) -> Result<(), anyhow::Error> {
use ::sanakirja::{Commit, Transaction};
let mut txn = ::sanakirja::Env::mut_txn_begin(git)?;
let mut db: ::sanakirja::Db<git2::Oid, libpijul::pristine::Merkle> =
if let Some(db) = txn.root(0) {
db
} else {
txn.create_db()?
};
txn.put(&mut rand::thread_rng(), &mut db, *oid, state)?;
txn.set_root(0, db);
txn.commit()?;
Ok(())
}
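/// Collect the changes present on the parents' channels but missing from
/// `channel`, sorted by their original log position, and decide whether the
/// working copy needs a full output (the case when this commit is not a
/// child of the repository's current commit, or has no parents).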
fn make_apply_plan<T: TxnTExt>(
repo: &OpenRepo,
txn: &T,
channel: &ChannelRef<T>,
dag: &Dag,
oid: &git2::Oid,
) -> (bool, Vec<(libpijul::pristine::Hash, u64)>) {
let mut to_apply = Vec::new();
let mut to_apply_set = BTreeSet::new();
let mut needs_output = false;
if let Some(parents) = dag.parents.get(&oid) {
for p in parents {
// If one of the parents is not the repo's current commit,
// then we're doing either a merge or a checkout of
// another branch. If that is the case, we need to output
// the entire repository to update the
// tree/revtree/inodes/revinodes tables.
if let Some(current_commit) = repo.current_commit {
if current_commit != *p {
needs_output = true
}
}
let p_name = format!("{}", p);
let p_channel = txn.load_channel(&p_name).unwrap();
for (n, (h, _)) in txn.log(&p_channel.borrow(), 0) {
if txn.has_change(&channel, h).is_none() {
if to_apply_set.insert(h) {
to_apply.push((h, n));
}
}
}
}
} else {
needs_output = true
}
// Since we're pulling from multiple channels, the change numbers
// are not necessarily in order (especially since we've
// de-duplicated using `to_apply_set`).
to_apply.sort_by(|a, b| a.1.cmp(&b.1));
(needs_output, to_apply)
}
/// Apply the changes corresponding to a commit's parents to `channel`.
fn import_commit_parents<T: TxnTExt + MutTxnTExt>(
repo: &mut OpenRepo,
dag: &Dag,
txn: &mut T,
channel: &mut ChannelRef<T>,
oid: &git2::Oid,
ws: &mut libpijul::ApplyWorkspace,
stats: &mut Stats,
) -> Result<(), anyhow::Error> {
// Apply all the parent's logs to `channel`
let (needs_output, to_apply) = make_apply_plan(repo, txn, channel, dag, oid);
let parent_application_time = std::time::Instant::now();
for h in to_apply.iter() {
debug!("to_apply {:?}", h)
}
for (h, _) in to_apply.iter() {
info!("applying {:?} to {:?}", h, channel.borrow().name());
txn.apply_change_ws(&repo.repo.changes, channel, *h, ws)?;
if repo.check > 0 && repo.n % repo.check == 0 {
check_alive(&repo.repo.changes, txn, channel, line!())?;
}
}
if repo.check > 0 && repo.n % repo.check == 0 {
check_alive(&repo.repo.changes, txn, channel, line!())?;
}
stats.parent_application_time = if to_apply.is_empty() {
std::time::Duration::from_secs(0)
} else {
parent_application_time.elapsed()
};
if repo.check > 0 && repo.n % repo.check == 0 && !to_apply.is_empty() {
txn.check_channel_log(&channel);
}
debug!(
"last_recorded {:?}, name {:?}",
repo.repo.config.current_channel,
channel.borrow().name()
);
stats.output_time = if !to_apply.is_empty() || needs_output {
debug!("outputting");
let output_time = std::time::Instant::now();
txn.output_repository_no_pending(
&mut repo.repo.working_copy,
&repo.repo.changes,
channel,
"",
false,
)?;
let t = output_time.elapsed();
if repo.check > 0 && repo.n % repo.check == 0 {
check_alive(&repo.repo.changes, txn, channel, line!())?;
}
t
} else {
std::time::Duration::from_secs(0)
};
if repo.check > 0 && repo.n % repo.check == 0 {
check_tree_inodes(txn, &channel.borrow());
}
Ok(())
}
/// Reset to the Git commit specified by `child`, telling Pijul which
/// files were moved in the reset.
fn git_reset<'a, T: TxnTExt + MutTxnTExt>(
git: &'a git2::Repository,
repo: &mut OpenRepo,
txn: &mut T,
channel: &mut ChannelRef<T>,
child: &git2::Oid,
stats: &mut Stats,
) -> Result<(git2::Object<'a>, BTreeSet<PathBuf>), anyhow::Error> {
// Reset the Git branch.
debug!("resetting the git branch to {:?}", child);
let reset_time = std::time::Instant::now();
let object = git.find_object(*child, None)?;
let reset_was_useful = Rc::new(RefCell::new(false));
let mut builder = git2::build::CheckoutBuilder::new();
let repo_path = repo.repo.path.clone();
let reset_was_useful_ = reset_was_useful.clone();
builder
.force()
.remove_untracked(true)
.remove_ignored(true)
.progress(move |file, a, b| {
debug!("Git progress: {:?} {:?} {:?}", file, a, b);
if let Some(file) = file {
let file = repo_path.join(file);
if let Ok(meta) = std::fs::metadata(&file) {
if !meta.file_type().is_symlink() {
*reset_was_useful_.borrow_mut() = true
}
}
}
});
builder.notify(|notif, file, _, _, _| {
info!("Git reset: {:?} {:?}", notif, file);
true
});
git.reset(&object, git2::ResetType::Hard, Some(&mut builder))?;
repo.current_commit = Some(*child);
stats.reset_time = reset_time.elapsed();
debug!("reset done");
let mut prefixes = BTreeSet::new();
{
let commit = object.as_commit().unwrap();
let new_tree = commit.tree().unwrap();
debug!("inspecting commit");
let git_diff_time = std::time::Instant::now();
for parent in commit.parents() {
let old_tree = parent.tree().unwrap();
let mut diff = git
.diff_tree_to_tree(Some(&old_tree), Some(&new_tree), None)
.unwrap();
diff.find_similar(None).unwrap();
let mut moves = Vec::new();
for delta in diff.deltas() {
let old_path = delta.old_file().path().unwrap();
let new_path = delta.new_file().path().unwrap();
match delta.status() {
git2::Delta::Renamed => {
info!(
"mv {:?} {:?}",
old_path.to_string_lossy(),
new_path.to_string_lossy()
);
if let Ok((vertex, _)) = txn.follow_oldest_path(
&repo.repo.changes,
&channel,
&old_path.to_string_lossy(),
) {
if let Some(inode) = txn.get_revinodes(vertex, None) {
if let Some(old_path) = libpijul::fs::inode_filename(txn, inode) {
debug!(
"moving {:?} ({:?}) from {:?} to {:?}",
inode, vertex, old_path, new_path
);
let mut tmp_path = new_path.to_path_buf();
tmp_path.pop();
use rand::Rng;
let s: String = rand::thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(30)
.collect();
tmp_path.push(&s);
if let Err(e) =
txn.move_file(&old_path, &tmp_path.to_string_lossy())
{
error!("{}", e);
} else {
moves.push((tmp_path, new_path));
}
}
}
}
let new_path = new_path.to_path_buf();
prefixes.insert(new_path);
}
git2::Delta::Deleted => {
let old_path = old_path.to_path_buf();
prefixes.insert(old_path);
}
_ => {
if delta.new_file().mode() != git2::FileMode::Link {
debug!("delta old = {:?} new = {:?}", old_path, new_path);
let old_path = old_path.to_path_buf();
let new_path = new_path.to_path_buf();
prefixes.insert(old_path);
prefixes.insert(new_path);
}
}
}
}
debug!("moves = {:?}", moves);
for (a, b) in moves.drain(..) {
if let Err(e) = txn.move_file(&a.to_string_lossy(), &b.to_string_lossy()) {
error!("{}", e);
}
}
}
stats.git_diff_time = git_diff_time.elapsed();
debug!("done inspecting commit");
if prefixes.contains(std::path::Path::new("")) {
prefixes.clear();
}
info!("record prefixes {:?}", prefixes);
}
Ok((object, prefixes))
}
/// Reset to the Git commit specified as `child`, and record the
/// corresponding change in Pijul.
fn import_commit<T: TxnTExt + MutTxnTExt>(
git: &git2::Repository,
repo: &mut OpenRepo,
txn: &mut T,
channel: &mut ChannelRef<T>,
child: &git2::Oid,
stats: &mut Stats,
) -> Result<libpijul::pristine::Merkle, anyhow::Error> {
let (object, prefixes) = git_reset(git, repo, txn, channel, child, stats)?;
for p in prefixes.iter() {
if let Ok(m) = std::fs::metadata(&p) {
if m.is_dir() {
txn.add_dir(p.to_str().unwrap()).unwrap_or(());
} else {
txn.add_file(p.to_str().unwrap()).unwrap_or(());
}
}
}
let commit = object.as_commit().unwrap();
let signature = commit.author();
// Record+Apply
info!("recording on channel {:?}", channel.borrow().name());
let record_time = std::time::Instant::now();
let prefix_vec: Vec<_> = prefixes.into_iter().collect();
let rec = record_apply(
txn,
channel,
&mut repo.repo.working_copy,
&repo.repo.changes,
&repo.repo.path,
&prefix_vec,
libpijul::change::ChangeHeader {
message: commit.message().unwrap().to_string(),
authors: vec![libpijul::change::Author {
name: signature.name().unwrap().to_string(),
email: signature.email().map(|e| e.to_string()),
full_name: None,
}],
description: None,
timestamp: chrono::DateTime::from_utc(
chrono::NaiveDateTime::from_timestamp(signature.when().seconds(), 0),
chrono::Utc,
),
},
);
let (n_actions, hash, state) = match rec {
Ok(x) => x,
Err(e) => match e.downcast() {
Ok(libpijul::Error::ChangeAlreadyOnChannel { hash }) => {
error!("change already on channel: {:?}", hash);
return Ok(txn.current_state(&channel.borrow()).unwrap());
}
Ok(e) => return Err(e.into()),
Err(e) => return Err(e),
},
};
stats.record_time = record_time.elapsed();
if repo.check > 0 && repo.n % repo.check == 0 {
check_alive(&repo.repo.changes, txn, channel, line!())?;
}
stats.n_actions = n_actions;
stats.hash = hash;
if let Some(ref mut cur) = repo.repo.config.current_channel {
cur.clear();
cur.push_str(channel.borrow().name());
} else {
repo.repo.config.current_channel = Some(channel.borrow().name().to_string())
}
repo.repo.save_config()?;
if repo.check > 0 && repo.n % repo.check == 0 {
check_tree_inodes(txn, &channel.borrow());
}
repo.n += 1;
if let Some(state) = state {
Ok(state)
} else {
Ok(txn.current_state(&channel.borrow()).unwrap())
}
}
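/// Record the working copy under `prefixes` as a single change with the
/// given `header`, then apply it to `channel`. Returns the number of
/// recorded actions, the hash of the saved change and the resulting
/// channel state; `(0, None, current state)` when there is nothing to
/// record.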
fn record_apply<T: TxnT + TxnTExt + MutTxnTExt, C: libpijul::changestore::ChangeStore>(
txn: &mut T,
channel: &mut ChannelRef<T>,
working_copy: &mut libpijul::working_copy::FileSystem,
changes: &C,
repo_path: &Path,
prefixes: &[PathBuf],
header: libpijul::change::ChangeHeader,
) -> Result<
(
usize,
Option<libpijul::pristine::Hash>,
Option<libpijul::pristine::Merkle>,
),
anyhow::Error,
> {
let mut state = libpijul::RecordBuilder::new();
working_copy.record_prefixes(txn, channel, changes, &mut state, repo_path, prefixes)?;
let rec = state.finish();
if rec.actions.is_empty() {
return Ok((0, None, txn.current_state(&channel.borrow())));
}
let actions: Vec<_> = rec
.actions
.into_iter()
.map(|rec| rec.globalize(txn))
.collect();
let n = actions.len();
let (dependencies, extra_known) = libpijul::change::dependencies(txn, channel, actions.iter());
let mut change = libpijul::change::LocalChange::make_change(
txn,
channel,
actions,
rec.contents,
header,
Vec::new(),
);
change.dependencies = dependencies;
change.extra_known = extra_known;
debug!("saving change");
let hash = changes.save_change(&change).unwrap();
debug!("saved");
let (_, m) = txn.apply_local_change(channel, &change, hash, &rec.updatables)?;
Ok((n, Some(hash), Some(m)))
}
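/// Per-commit statistics of the Git import (counts, sizes and timings),
/// written out as one CSV line per imported commit by `Stats::write`.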
struct Stats {
child: git2::Oid,
n_changes: usize,
parent_application_time: std::time::Duration,
output_time: std::time::Duration,
reset_time: std::time::Duration,
git_diff_time: std::time::Duration,
record_time: std::time::Duration,
n_actions: usize,
n_files: usize,
n_dirs: usize,
total_size: u64,
changes_size: u64,
pristine_size: u64,
hash: Option<libpijul::pristine::Hash>,
}
impl Stats {
fn new(child: git2::Oid) -> Self {
let z = std::time::Duration::from_secs(0);
Stats {
child,
n_changes: 0,
parent_application_time: z,
output_time: z,
reset_time: z,
git_diff_time: z,
record_time: z,
n_actions: 0,
n_files: 0,
n_dirs: 0,
total_size: 0,
changes_size: 0,
pristine_size: 0,
hash: None,
}
}
fn write(
&mut self,
n: usize,
repo_path: &Path,
f: &mut std::fs::File,
) -> Result<(), anyhow::Error> {
// Count files.
let mut walk = ignore::WalkBuilder::new(&repo_path);
// `WalkBuilder::add_ignore` reports problems by returning `Some(error)`.
if let Some(e) = walk.add_ignore(DOT_DIR) {
debug!("add_ignore {:?}: {:?}", DOT_DIR, e);
}
for f in walk.build() {
let meta = f?.metadata()?;
if meta.is_dir() {
self.n_dirs += 1
} else {
self.n_files += 1;
self.total_size += meta.len();
}
}
let dot_dir = repo_path.join(DOT_DIR);
let pristine_dir = dot_dir.join(PRISTINE_DIR);
let changes_dir = dot_dir.join(CHANGES_DIR);
if let Ok(walk) = std::fs::read_dir(&pristine_dir) {
for f in walk {
let meta = f?.metadata()?;
self.pristine_size += meta.len();
}
}
if let Ok(walk) = std::fs::read_dir(&changes_dir) {
for f in walk {
let meta = f?.metadata()?;
self.changes_size += meta.len();
self.n_changes += 1
}
}
let timers = libpijul::get_timers();
writeln!(
f, "{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}",
self.child,
n,
self.parent_application_time.as_secs_f64(),
timers.alive_output.as_secs_f64(),
timers.alive_retrieve.as_secs_f64(),
timers.alive_graph.as_secs_f64(),
timers.alive_contents.as_secs_f64(),
timers.alive_write.as_secs_f64(),
timers.apply.as_secs_f64(),
timers.record.as_secs_f64(),
timers.repair_context.as_secs_f64(),
timers.check_cyclic_paths.as_secs_f64(),
timers.find_alive.as_secs_f64(),
self.output_time.as_secs_f64(),
self.reset_time.as_secs_f64(),
self.git_diff_time.as_secs_f64(),
self.record_time.as_secs_f64(),
self.n_actions,
self.n_files,
self.n_dirs,
self.total_size,
self.changes_size,
self.pristine_size,
if let Some(ref h) = self.hash { h.to_base32() } else { String::new() },
)?;
libpijul::reset_timers();
Ok(())
}
}
/// Check that each alive vertex in the graph is reachable, and vice-versa.
fn check_alive<T: TxnT, C: libpijul::changestore::ChangeStore>(
changes: &C,
txn: &T,
channel: &ChannelRef<T>,
line: u32,
) -> Result<(), anyhow::Error> {
let (alive, reachable) = txn.check_alive(&channel);
let mut h = BTreeSet::new();
if !alive.is_empty() {
for (k, file) in alive.iter() {
debug!("alive = {:?}, file = {:?}", k, file);
h.insert(file);
}
}
if !reachable.is_empty() {
for (k, file) in reachable.iter() {
debug!("reachable = {:?}, file = {:?}", k, file);
h.insert(file);
}
}
for file in h.iter() {
let file_ = file.unwrap().start_pos();
let mut f = std::fs::File::create(&format!("debug_{:?}", file_))?;
let graph = libpijul::alive::retrieve::retrieve(txn, &channel.borrow(), file_);
graph.debug(changes, txn, &channel.borrow(), false, false, &mut f)?;
let mut f = std::fs::File::create(&format!("debug_all_{:?}", file_))?;
txn.debug_root(channel, file.unwrap(), &mut f)?;
}
if !h.is_empty() {
panic!("alive call line {}", line);
}
Ok(())
}
/// Check that each inode in the inodes table maps to an alive vertex,
/// and that each inode in the tree table is reachable by only one
/// path.
fn check_tree_inodes<T: TxnT>(txn: &T, channel: &Channel<T>) {
// Sanity check
for (inode, vertex) in txn.iter_inodes() {
let mut inode_ = inode;
while !inode_.is_root() {
if let Some(next) = txn.get_revtree(inode_, None) {
inode_ = next.parent_inode;
} else {
panic!("inode = {:?}, inode_ = {:?}", inode, inode_);
}
}
if !txn.is_alive(&channel, vertex.inode_vertex()) {
for e in txn.iter_adjacent(
&channel,
vertex.inode_vertex(),
EdgeFlags::empty(),
EdgeFlags::all(),
) {
error!("{:?} {:?} {:?}", inode, vertex, e)
}
panic!(
"inode {:?}, vertex {:?}, is not alive, {:?}",
inode,
vertex,
txn.tree_path(vertex)
)
}
}
let mut h = HashMap::new();
let id0 = OwnedPathId {
parent_inode: Inode::ROOT,
basename: libpijul::small_string::SmallString::new(),
};
for (id, inode) in txn.iter_tree(id0, None) {
if let Some(inode_) = h.insert(id.clone(), inode) {
panic!("id {:?} maps to two inodes: {:?} {:?}", id, inode, inode_);
}
}
}
use crate::repository::Repository;
use libpijul::pristine::{MutTxnT, TxnT};
use libpijul::MutTxnTExt;
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Fork {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(long = "channel", conflicts_with = "change")]
channel: Option<String>,
#[clap(long = "change", conflicts_with = "channel")]
change: Option<String>,
to: String,
}
impl Fork {
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path)?;
debug!("{:?}", repo.config);
let mut txn = repo.pristine.mut_txn_begin();
if let Some(ref ch) = self.change {
let (hash, _) = txn.hash_from_prefix(ch)?;
let mut channel = txn.open_or_create_channel(&self.to)?;
txn.apply_change_rec(&repo.changes, &mut channel, hash)?
} else {
let channel_name = repo.config.get_current_channel(self.channel.as_ref());
if let Some(channel) = txn.load_channel(channel_name) {
txn.fork(&channel, &self.to)?;
}
}
txn.commit()?;
Ok(())
}
}
use crate::repository::Repository;
use libpijul::pristine::MutTxnT;
use libpijul::{MutTxnTExt, TxnTExt};
use std::io::Write;
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Mv {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
paths: Vec<PathBuf>,
}
impl Mv {
pub fn run(mut self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path.clone())?;
let to = if let Some(to) = self.paths.pop() {
to
} else {
return Ok(());
};
let to = path(&self.repo_path, to);
let is_dir = if let Ok(m) = std::fs::metadata(&to) {
m.is_dir()
} else {
false
};
if !is_dir && self.paths.len() > 1 {
return Ok(());
}
let mut txn = repo.pristine.mut_txn_begin();
for p in self.paths {
debug!("p = {:?}", p);
let source = std::fs::canonicalize(&path(&self.repo_path, p.clone()))?;
let target = if is_dir { to.join(p) } else { to.clone() };
debug!("target = {:?}", target);
std::fs::rename(&source, &target)?;
let target = std::fs::canonicalize(&target)?;
let source = source.strip_prefix(&repo.path)?;
let target = target.strip_prefix(&repo.path)?;
debug!("moving {:?} -> {:?}", source, target);
txn.move_file(&source.to_string_lossy(), &target.to_string_lossy())?
}
txn.commit()?;
Ok(())
}
}
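/// Interpret `path` relative to the repository root given on the
/// command line, if any.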
fn path(root: &Option<PathBuf>, path: PathBuf) -> PathBuf {
if let Some(ref p) = root {
p.join(path)
} else {
path
}
}
#[derive(Clap, Debug)]
pub struct Ls {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
}
impl Ls {
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path.clone())?;
let txn = repo.pristine.txn_begin()?;
let mut stdout = std::io::stdout();
for (_, p) in txn.iter_working_copy() {
writeln!(stdout, "{}", p)?;
}
Ok(())
}
}
#[derive(Clap, Debug)]
pub struct Add {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
paths: Vec<PathBuf>,
}
impl Add {
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path.clone())?;
let mut txn = repo.pristine.mut_txn_begin();
let mut stderr = std::io::stderr();
for path in self.paths.iter() {
debug!("{:?}", path);
if let Some(p) = path.file_name() {
if let Some(p) = p.to_str() {
if p.ends_with("~") || (p.starts_with("#") && p.ends_with("#")) {
continue;
}
}
}
let path = path.canonicalize()?;
let meta = std::fs::metadata(&path)?;
let path = if let Ok(path) = path.strip_prefix(&repo.path) {
path
} else {
continue;
};
let path_str = path.to_str().unwrap();
if !txn.is_tracked(&path_str) {
writeln!(stderr, "Adding {:?}", path)?;
info!("Adding {:?}", path);
txn.add(&path_str, meta.is_dir())?
}
}
txn.commit()?;
Ok(())
}
}
#[derive(Clap, Debug)]
pub struct Remove {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
paths: Vec<PathBuf>,
}
impl Remove {
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path.clone())?;
let mut txn = repo.pristine.mut_txn_begin();
for path in self.paths.iter() {
debug!("{:?}", path);
if let Some(p) = path.file_name() {
if let Some(p) = p.to_str() {
if p.ends_with("~") || (p.starts_with("#") && p.ends_with("#")) {
continue;
}
}
}
let path = path.canonicalize()?;
let path = if let Ok(path) = path.strip_prefix(&repo.path) {
path
} else {
continue;
};
let path_str = path.to_str().unwrap();
if txn.is_tracked(&path_str) {
txn.remove_file(&path_str)?;
}
}
txn.commit()?;
Ok(())
}
}
use crate::repository::*;
use libpijul::change::*;
use libpijul::pristine::MutTxnT;
use libpijul::MutTxnTExt;
use std::collections::BTreeMap;
use std::io::Write;
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Diff {
#[clap(long = "repository")]
pub repo_path: Option<PathBuf>,
#[clap(long = "json")]
pub json: bool,
#[clap(long = "channel")]
pub channel: Option<String>,
#[clap(long = "tag")]
pub tag: bool,
pub prefixes: Vec<PathBuf>,
}
impl Diff {
pub fn run(mut self) -> Result<(), anyhow::Error> {
let mut repo = Repository::find_root(self.repo_path.clone())?;
let mut txn = repo.pristine.mut_txn_begin();
let mut stdout = std::io::stdout();
let mut channel =
txn.open_or_create_channel(repo.config.get_current_channel(self.channel.as_ref()))?;
let mut state = libpijul::RecordBuilder::new();
if self.prefixes.is_empty() {
txn.record(
&mut state,
libpijul::Algorithm::default(),
&mut channel,
&mut repo.working_copy,
&repo.changes,
"",
)?
} else {
self.fill_relative_prefixes()?;
repo.working_copy.record_prefixes(
&mut txn,
&mut channel,
&repo.changes,
&mut state,
&repo.path,
&self.prefixes,
)?;
}
let rec = state.finish();
if rec.actions.is_empty() {
return Ok(());
}
let actions = rec
.actions
.into_iter()
.map(|rec| rec.globalize(&txn))
.collect();
let mut change = LocalChange::make_change(
&txn,
&channel,
actions,
rec.contents,
ChangeHeader::default(),
Vec::new(),
);
let (dependencies, extra_known) = if self.tag {
full_dependencies(&txn, &channel)
} else {
dependencies(&txn, &channel, change.changes.iter())
};
change.dependencies = dependencies;
change.extra_known = extra_known;
if self.json {
let mut changes = BTreeMap::new();
for ch in change.changes.iter() {
changes.entry(ch.path()).or_insert(Vec::new()).push(Status {
operation: match ch {
Record::FileMove { .. } => "file move",
Record::FileDel { .. } => "file del",
Record::FileUndel { .. } => "file undel",
Record::SolveNameConflict { .. } => "solve name conflict",
Record::UnsolveNameConflict { .. } => "unsolve name conflict",
Record::FileAdd { .. } => "file add",
Record::Edit { .. } => "edit",
Record::Replacement { .. } => "replacement",
Record::SolveOrderConflict { .. } => "solve order conflict",
Record::UnsolveOrderConflict { .. } => "unsolve order conflict",
Record::ResurrectZombies { .. } => "resurrect zombies",
},
line: ch.line(),
});
}
serde_json::to_writer_pretty(&mut std::io::stdout(), &changes)?;
writeln!(stdout, "")?;
} else {
change.write(
&repo.changes,
None,
|local: &libpijul::change::Local, _| -> String {
format!("{}:{}", local.path, local.line)
},
true,
&mut std::io::stdout(),
)?
}
Ok(())
}
fn fill_relative_prefixes(&mut self) -> Result<(), anyhow::Error> {
let cwd = std::env::current_dir()?;
for p in self.prefixes.iter_mut() {
if p.is_relative() {
*p = cwd.join(&p);
}
}
Ok(())
}
}
#[derive(Debug, Serialize)]
struct Status {
operation: &'static str,
line: Option<usize>,
}
use crate::repository::Repository;
use crate::Error;
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Debug {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(long = "channel")]
channel: Option<String>,
}
impl Debug {
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path)?;
let txn = repo.pristine.txn_begin()?;
use libpijul::pristine::TxnT;
let channel_name = repo.config.get_current_channel(self.channel.as_ref());
let channel = if let Some(channel) = txn.load_channel(&channel_name) {
channel
} else {
return Err((Error::NoSuchChannel {
channel: channel_name.to_string(),
})
.into());
};
let channel = channel.borrow();
txn.debug(&channel, std::io::stdout())?;
Ok(())
}
}
use crate::repository::Repository;
use crate::Error;
use libpijul::pristine::{ChangeId, Channel, EdgeFlags, TxnT, Vertex};
use libpijul::vertex_buffer::VertexBuffer;
use libpijul::TxnTExt;
use std::collections::HashSet;
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Credit {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(long = "channel")]
channel: Option<String>,
file: PathBuf,
}
impl Credit {
pub fn run(self) -> Result<(), anyhow::Error> {
let has_repo_path = self.repo_path.is_some();
let mut repo = Repository::find_root(self.repo_path)?;
let txn = repo.pristine.txn_begin()?;
let channel_name = repo.config.get_current_channel(self.channel.as_ref());
let channel = if let Some(channel) = txn.load_channel(&channel_name) {
channel
} else {
return Err((Error::NoSuchChannel {
channel: channel_name.to_string(),
})
.into());
};
if self.channel.is_some() {
repo.config.current_channel = self.channel;
repo.save_config()?;
}
let (pos, _ambiguous) = if has_repo_path {
let root = std::fs::canonicalize(repo.path.join(&self.file))?;
let path = root.strip_prefix(&repo.path)?.to_str().unwrap();
txn.follow_oldest_path(&repo.changes, &channel, &path)?
} else {
let path = self.file.to_str().unwrap();
txn.follow_oldest_path(&repo.changes, &channel, &path)?
};
let channel_ = channel.borrow();
txn.output_file(
&repo.changes,
&channel,
pos,
&mut Creditor::new(std::io::stdout(), &txn, &channel_),
)?;
Ok(())
}
}
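/// A `VertexBuffer` that prints each output line prefixed with `> `,
/// preceded by the base32 hashes of the changes that introduced the
/// parent edges of its vertex, giving a per-line attribution of the
/// file.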
pub struct Creditor<'a, W: std::io::Write, T: TxnT> {
w: W,
buf: Vec<u8>,
new_line: bool,
changes: HashSet<ChangeId>,
txn: &'a T,
channel: &'a Channel<T>,
}
impl<'a, W: std::io::Write, T: TxnT> Creditor<'a, W, T> {
pub fn new(w: W, txn: &'a T, channel: &'a Channel<T>) -> Self {
Creditor {
w,
new_line: true,
buf: Vec::new(),
txn,
channel,
changes: HashSet::new(),
}
}
}
impl<'a, W: std::io::Write, T: TxnT> VertexBuffer for Creditor<'a, W, T> {
fn output_line<C: FnOnce(&mut Vec<u8>) -> Result<(), anyhow::Error>>(
&mut self,
v: Vertex<ChangeId>,
c: C,
) -> Result<(), anyhow::Error> {
debug!("outputting vertex {:?}", v);
self.buf.clear();
c(&mut self.buf)?;
use libpijul::pristine::Base32;
if !v.change.is_root() {
self.changes.clear();
for e in self
.txn
.iter_adjacent(self.channel, v, EdgeFlags::PARENT, EdgeFlags::all())
{
self.changes.insert(e.introduced_by);
}
if !self.new_line {
write!(self.w, "\n")?;
}
let mut is_first = true;
for c in self.changes.drain() {
write!(
self.w,
"{}{}",
if is_first { "" } else { ", " },
c.to_base32()
)?;
is_first = false;
}
writeln!(self.w, "")?;
}
let ends_with_newline = self.buf.ends_with(b"\n");
if let Ok(s) = std::str::from_utf8(&self.buf[..]) {
for l in s.lines() {
self.w.write_all(b"> ")?;
self.w.write_all(l.as_bytes())?;
self.w.write_all(b"\n")?;
}
}
if !self.buf.is_empty() {
// empty "lines" (such as at the beginning of a file)
// don't change the status of self.new_line.
self.new_line = ends_with_newline;
}
Ok(())
}
fn output_conflict_marker(&mut self, s: &str) -> Result<(), anyhow::Error> {
if !self.new_line {
self.w.write_all(s.as_bytes())?;
} else {
self.w.write_all(&s.as_bytes()[1..])?;
}
Ok(())
}
}
use crate::repository::*;
use libpijul::pristine::MutTxnT;
use libpijul::MutTxnTExt;
use std::path::PathBuf;
use tempfile::TempDir;
#[derive(Clap, Debug)]
pub struct Clone {
#[clap(long = "lazy", about = "only download changes with alive contents")]
lazy: bool,
#[clap(long = "channel", about = "set the remote channel", default_value = crate::DEFAULT_CHANNEL)]
channel: String,
#[clap(
long = "change",
about = "clone this change and its dependencies",
conflicts_with = "state"
)]
change: Option<String>,
#[clap(long = "state", about = "clone this state", conflicts_with = "change")]
state: Option<String>,
#[clap(long = "path", about = "clone this path", multiple(true))]
partial_paths: Vec<String>,
#[clap(short = 'k', about = "Do not check certificates")]
no_cert_check: bool,
remote: String,
path: Option<PathBuf>,
}
impl Clone {
pub async fn run(self) -> Result<(), anyhow::Error> {
let mut remote =
crate::remote::unknown_remote(&self.remote, &self.channel, self.no_cert_check).await?;
let path = if let Some(path) = self.path {
if path.is_relative() {
let mut p = std::env::current_dir()?;
p.push(path);
p
} else {
path
}
} else if let Some(path) = remote.repo_name() {
let mut p = std::env::current_dir()?;
p.push(path);
p
} else {
return Err((crate::Error::CouldNotInferRepositoryName { repo: self.remote }).into());
};
debug!("path = {:?}", path);
let parent = std::fs::canonicalize(path.parent().unwrap())?;
let temp = TempDir::new_in(&parent)?;
debug!("temp = {:?}", temp.path());
let mut repo = Repository::init(Some(temp.path().to_path_buf()))?;
let mut txn = repo.pristine.mut_txn_begin();
let mut channel = txn.open_or_create_channel(&self.channel)?;
if let Some(ref change) = self.change {
let h = change.parse()?;
remote
.clone_tag(&mut repo, &mut txn, &mut channel, &[h])
.await?
} else if let Some(ref state) = self.state {
let h = state.parse()?;
remote
.clone_state(&mut repo, &mut txn, &mut channel, h, self.lazy)
.await?
} else {
remote
.clone_channel(
&mut repo,
&mut txn,
&mut channel,
self.lazy,
&self.partial_paths,
)
.await?;
}
txn.output_repository_no_pending(
&mut repo.working_copy,
&repo.changes,
&mut channel,
"",
true,
)?;
txn.commit()?;
repo.config.current_channel = Some(self.channel);
repo.save_config()?;
std::fs::rename(&temp.into_path(), &path)?;
Ok(())
}
}
use crate::repository::Repository;
use libpijul::pristine::{MutTxnT, TxnT};
use libpijul::MutTxnTExt;
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Checkout {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
channel: String,
}
impl Checkout {
pub fn run(self) -> Result<(), anyhow::Error> {
let mut repo = Repository::find_root(self.repo_path)?;
debug!("{:?}", repo.config);
let mut txn = repo.pristine.mut_txn_begin();
if let Some(mut channel) = txn.load_channel(&self.channel) {
txn.output_repository_no_pending(
&mut repo.working_copy,
&repo.changes,
&mut channel,
"",
true,
)?;
}
txn.commit()?;
repo.config.current_channel = Some(self.channel);
repo.save_config()?;
Ok(())
}
}
use crate::repository::Repository;
use libpijul::pristine::{MutTxnT, TxnT};
use std::io::Write;
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Channel {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(subcommand)]
subcmd: Option<SubCommand>,
}
#[derive(Clap, Debug)]
pub enum SubCommand {
#[clap(name = "delete")]
Delete { delete: String },
#[clap(name = "rename")]
Rename { from: String, to: Option<String> },
}
impl Channel {
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path)?;
let mut stdout = std::io::stdout();
let current = repo.config.current_channel.as_deref();
match self.subcmd {
None => {
let txn = repo.pristine.txn_begin()?;
for channel in txn.iter_channels("") {
let channel = channel.borrow();
let name = channel.name();
if current == Some(name) {
writeln!(stdout, "* {}", name)?;
} else {
writeln!(stdout, " {}", name)?;
}
}
}
Some(SubCommand::Delete { ref delete }) => {
let mut txn = repo.pristine.mut_txn_begin();
txn.drop_channel(delete)?;
txn.commit()?;
}
Some(SubCommand::Rename { ref from, ref to }) => {
let mut txn = repo.pristine.mut_txn_begin();
let (from, to) = if let Some(to) = to {
(from.as_str(), to.as_str())
} else if let Some(current) = current {
(current, from.as_str())
} else {
return Err(crate::Error::NoCurrentChannel.into());
};
let mut channel = if let Some(channel) = txn.load_channel(from) {
channel
} else {
return Err((crate::Error::ChannelNotFound {
channel: from.to_string(),
})
.into());
};
txn.rename_channel(&mut channel, to)?;
txn.commit()?;
}
}
Ok(())
}
}
use crate::repository::*;
use libpijul::change::Local;
use libpijul::changestore::ChangeStore;
use libpijul::pristine::*;
use libpijul::*;
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Change {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(long = "channel")]
channel: Option<String>,
hash: Option<String>,
}
impl Change {
pub fn run(self) -> Result<(), anyhow::Error> {
let repo = Repository::find_root(self.repo_path.clone())?;
let txn = repo.pristine.txn_begin()?;
let channel_name = repo.config.get_current_channel(self.channel.as_ref());
let channel = txn.load_channel(channel_name).unwrap();
let changes = repo.changes;
let hash = if let Some(hash) = self.hash {
txn.hash_from_prefix(&hash)?.0
} else if let Some((_, (h, _))) = txn.reverse_log(&channel.borrow(), None).next() {
h
} else {
return Ok(());
};
let change = changes.get_change(&hash).unwrap();
let file_name = |l: &Local, inode: Position<Option<Hash>>| -> String {
if txn.get_revchanges(&channel, hash).is_some() {
let inode = Position {
change: inode.change.unwrap_or(hash),
pos: inode.pos,
};
format!(
"{}:{}",
txn.find_youngest_path(&changes, &channel, inode).unwrap().0,
l.line
)
} else {
format!("{}:{}", l.path, l.line)
}
};
let o = std::io::stdout();
let mut o = o.lock();
change.write(&changes, Some(hash), file_name, true, &mut o)?;
Ok(())
}
}
use crate::repository::Repository;
use libpijul::pristine::{Hash, Merkle, TxnT};
use libpijul::{MutTxnTExt, TxnTExt};
use std::io::Write;
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Archive {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(long = "channel")]
channel: Option<String>,
#[clap(long = "remote")]
remote: Option<String>,
#[clap(short = 'k', about = "Do not check certificates")]
no_cert_check: bool,
#[clap(long = "state")]
state: Option<String>,
#[clap(long = "change", multiple = true)]
change: Vec<String>,
#[clap(long = "prefix")]
prefix: Option<String>,
#[clap(short = 'o')]
name: String,
}
impl Archive {
pub async fn run(self) -> Result<(), anyhow::Error> {
let state: Option<Merkle> = if let Some(ref state) = self.state {
Some(state.parse()?)
} else {
None
};
let mut extra: Vec<Hash> = Vec::new();
for h in self.change.iter() {
extra.push(h.parse()?);
}
if let Some(ref rem) = self.remote {
debug!("unknown");
let mut remote = crate::remote::unknown_remote(
rem,
if let Some(ref channel) = self.channel {
channel
} else {
crate::DEFAULT_CHANNEL
},
self.no_cert_check,
)
.await?;
let mut p = std::path::Path::new(&self.name).to_path_buf();
if !self.name.ends_with(".tar.gz") {
p.set_extension("tar.gz");
}
let mut f = std::fs::File::create(&p)?;
remote
.archive(self.prefix, state.map(|x| (x, &extra[..])), &mut f)
.await?;
} else if let Ok(repo) = Repository::find_root(self.repo_path.clone()) {
let channel_name = repo.config.get_current_channel(self.channel.as_ref());
let mut p = std::path::Path::new(&self.name).to_path_buf();
if !self.name.ends_with(".tar.gz") {
p.set_extension("tar.gz");
}
let mut f = std::fs::File::create(&p)?;
let mut tarball = libpijul::output::Tarball::new(&mut f, self.prefix);
let conflicts = if let Some(state) = state {
let mut txn = repo.pristine.mut_txn_begin();
let mut channel = txn.load_channel(&channel_name).unwrap();
txn.archive_with_state(
&repo.changes,
&mut channel,
state,
&extra[..],
&mut tarball,
)?
} else {
let txn = repo.pristine.txn_begin()?;
let channel = txn.load_channel(&channel_name).unwrap();
txn.archive(&repo.changes, &channel, &mut tarball)?
};
if !conflicts.is_empty() {
writeln!(
std::io::stderr(),
"There were {} conflicts",
conflicts.len()
)?
}
}
Ok(())
}
}
use crate::repository::Repository;
use crate::Error;
use libpijul::changestore::ChangeStore;
use libpijul::pristine::MutTxnT;
use libpijul::MutTxnTExt;
use std::collections::HashMap;
use std::path::PathBuf;
#[derive(Clap, Debug)]
pub struct Apply {
#[clap(long = "repository")]
repo_path: Option<PathBuf>,
#[clap(long = "channel")]
channel: Option<String>,
#[clap(long = "deps-only")]
deps_only: bool,
change: Option<String>,
}
impl Apply {
pub fn run(self) -> Result<(), anyhow::Error> {
let mut repo = Repository::find_root(self.repo_path)?;
let mut txn = repo.pristine.mut_txn_begin();
use libpijul::pristine::TxnT;
let channel_name = repo.config.get_current_channel(self.channel.as_ref());
let mut channel = if let Some(channel) = txn.load_channel(&channel_name) {
channel
} else if self.change.is_some() {
txn.open_or_create_channel(&channel_name)?
} else {
return Err((Error::NoSuchChannel {
channel: channel_name.to_string(),
})
.into());
};
let hash = if let Some(ch) = self.change {
if let Ok(h) = txn.hash_from_prefix(&ch) {
h.0
} else {
let change = libpijul::change::Change::deserialize(&ch, None)?;
repo.changes.save_change(&change)?
}
} else {
let mut change = std::io::BufReader::new(std::io::stdin());
let change = libpijul::change::Change::read(&mut change, &mut HashMap::new())?;
repo.changes.save_change(&change)?
};
if self.deps_only {
txn.apply_deps_rec(&repo.changes, &mut channel, hash)?;
} else {
txn.apply_change_rec(&repo.changes, &mut channel, hash)?;
}
txn.output_repository_no_pending(
&mut repo.working_copy,
&repo.changes,
&mut channel,
"",
true,
)?;
txn.commit()?;
Ok(())
}
}
[package]
name = "pijul"
description = "The sound distributed version control system."
version = "1.0.0-alpha.1"
authors = ["Pierre-Étienne Meunier <pe@pijul.org>"]
edition = "2018"
repository = "https://nest.pijul.com/pijul/pijul"
license = "GPL-2.0"
include = [
"Cargo.toml",
"src",
"src/commands",
"src/commands/log.rs",
"src/commands/protocol.rs",
"src/commands/apply.rs",
"src/commands/debug.rs",
"src/commands/checkout.rs",
"src/commands/file_operations.rs",
"src/commands/clone.rs",
"src/commands/git.rs",
"src/commands/record.rs",
"src/commands/change.rs",
"src/commands/diff.rs",
"src/commands/unrecord.rs",
"src/commands/channel.rs",
"src/commands/init.rs",
"src/commands/mod.rs",
"src/commands/archive.rs",
"src/commands/reset.rs",
"src/commands/fork.rs",
"src/commands/pushpull.rs",
"src/config.rs",
"src/repository.rs",
"src/main.rs",
"src/remote",
"src/remote/local.rs",
"src/remote/ssh.rs",
"src/remote/mod.rs",
]
[features]
git = [ "git2", "sanakirja/git2" ]
default = [ ]
[dependencies]
human-panic = "1.0"
clap = "3.0.0-beta.2"
anyhow = "1.0"
thiserror = "1.0"
libpijul = { version = "1.0.0-alpha.1", features = [ "tarball" ] }
chrono = { version = "0.4" }
ignore = "0.4"
env_logger = "0.8"
log = "0.4"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
toml = "0.5"
tokio = { version = "0.2", features = [ "rt-threaded", "macros", "sync" ] }
thrussh = "0.29"
thrussh-keys = "0.18.2"
cryptovec = "0.5"
reqwest = { version = "0.10", features = [ "stream" ] }
byteorder = "1.3"
sanakirja = "0.13.1"
futures = "0.3"
dirs-next = "2.0"
lazy_static = "1.4"
regex = "1.4"
whoami = "0.9"
rpassword = "5.0"
git2 = { version = "0.13", optional = true }
rand = "0.7"
edit = "0.1"
data-encoding = "2.3"
futures-util = "0.3"
tempfile = "3.1"
// org id jgSEtEI/xIjz/bF+vtGtYbEA9bNIeFWLqnZT+M51S64=
use crate::pristine::InodeMetadata;
#[cfg(feature = "ondisk-repos")]
pub mod filesystem;
#[cfg(feature = "ondisk-repos")]
pub use filesystem::FileSystem;
pub mod memory;
pub use memory::Memory;
pub trait WorkingCopy {
fn create_dir_all(&mut self, path: &str) -> Result<(), anyhow::Error>;
fn file_metadata(&self, file: &str) -> Result<InodeMetadata, anyhow::Error>;
fn read_file(&self, file: &str, buffer: &mut Vec<u8>) -> Result<(), anyhow::Error>;
fn modified_time(&self, file: &str) -> Result<std::time::SystemTime, anyhow::Error>;
fn remove_path(&mut self, name: &str) -> Result<(), anyhow::Error>;
fn rename(&mut self, former: &str, new: &str) -> Result<(), anyhow::Error>;
fn set_permissions(&mut self, name: &str, permissions: u16) -> Result<(), anyhow::Error>;
fn write_file<A, F: FnOnce(&mut dyn std::io::Write) -> Result<A, anyhow::Error>>(
&mut self,
file: &str,
writer: F,
) -> Result<A, anyhow::Error>;
}
// org id 0KxFIPg0ga5vhSwltRkYUH0GqkExY80aPF4KPWp42YI=
use super::*;
use crate::pristine::InodeMetadata;
use std::collections::HashMap;
use std::time::SystemTime;
#[derive(Debug)]
pub struct Memory {
pub files: FileTree,
pub last_modified: SystemTime,
}
#[derive(Debug)]
pub struct FileTree {
children: HashMap<String, Inode>,
}
#[derive(Debug)]
enum Inode {
File {
meta: InodeMetadata,
last_modified: SystemTime,
contents: Vec<u8>,
},
Directory {
meta: InodeMetadata,
last_modified: SystemTime,
children: FileTree,
},
}
impl Memory {
pub fn new() -> Self {
Memory {
files: FileTree {
children: HashMap::new(),
},
last_modified: SystemTime::now(),
}
}
pub fn list_files(&self) -> Vec<String> {
let mut result = Vec::new();
let mut current_files = vec![(String::new(), &self.files)];
let mut next_files = Vec::new();
loop {
if current_files.is_empty() {
break;
}
for (path, tree) in current_files.iter() {
for (name, inode) in tree.children.iter() {
let mut path = path.clone();
crate::path::push(&mut path, name);
match inode {
Inode::File { .. } => {
result.push(path);
}
Inode::Directory { ref children, .. } => {
result.push(path.clone());
next_files.push((path, children))
}
}
}
}
std::mem::swap(&mut current_files, &mut next_files);
next_files.clear();
}
result
}
pub fn add_file(&mut self, file: &str, file_contents: Vec<u8>) {
let file_meta = InodeMetadata::new(0o644, false);
let last = SystemTime::now();
self.add_inode(
file,
Inode::File {
meta: file_meta,
last_modified: last,
contents: file_contents,
},
)
}
pub fn add_dir(&mut self, file: &str) {
let file_meta = InodeMetadata::new(0o755, true);
let last = SystemTime::now();
self.add_inode(
file,
Inode::Directory {
meta: file_meta,
last_modified: last,
children: FileTree {
children: HashMap::new(),
},
},
)
}
fn add_inode(&mut self, file: &str, inode: Inode) {
let mut file_tree = &mut self.files;
let last = SystemTime::now();
self.last_modified = last;
let file = file.split('/').filter(|c| !c.is_empty());
let mut p = file.peekable();
while let Some(f) = p.next() {
if p.peek().is_some() {
let entry = file_tree
.children
.entry(f.to_string())
.or_insert(Inode::Directory {
meta: InodeMetadata::new(0o755, true),
children: FileTree {
children: HashMap::new(),
},
last_modified: last,
});
match *entry {
Inode::Directory {
ref mut children, ..
} => file_tree = children,
_ => panic!("Not a directory"),
}
} else {
file_tree.children.insert(f.to_string(), inode);
break;
}
}
}
fn get_file(&self, file: &str) -> Option<&Inode> {
debug!("get_file {:?}", file);
debug!("repo = {:?}", self);
let mut t = Some(&self.files);
let mut inode = None;
let mut it = file.split('/').filter(|c| !c.is_empty());
while let Some(c) = it.next() {
debug!("c = {:?}", c);
inode = t.take().unwrap().children.get(c);
debug!("inode = {:?}", inode);
match inode {
Some(Inode::Directory { ref children, .. }) => t = Some(children),
_ => break,
}
}
inode
}
fn get_file_mut<'a>(&'a mut self, file: &str) -> Option<&'a mut Inode> {
debug!("get_file_mut {:?}", file);
debug!("repo = {:?}", self);
let mut t = Some(&mut self.files);
let mut it = file.split('/').filter(|c| !c.is_empty()).peekable();
self.last_modified = SystemTime::now();
while let Some(c) = it.next() {
debug!("c = {:?}", c);
let inode_ = t.take().unwrap().children.get_mut(c);
debug!("inode = {:?}", inode_);
if it.peek().is_none() {
return inode_;
}
match inode_ {
Some(Inode::Directory {
ref mut children, ..
}) => t = Some(children),
_ => return None,
}
}
None
}
fn remove_path_(&mut self, path: &str) -> Option<Inode> {
debug!("remove_path {:?}", path);
debug!("repo = {:?}", self);
let mut t = Some(&mut self.files);
let mut it = path.split('/').filter(|c| !c.is_empty());
let mut c = it.next().unwrap();
self.last_modified = SystemTime::now();
loop {
debug!("c = {:?}", c);
let next_c = it.next();
let t_ = t.take().unwrap();
let next_c = if let Some(next_c) = next_c {
next_c
} else {
return t_.children.remove(c);
};
let inode = t_.children.get_mut(c);
c = next_c;
debug!("inode = {:?}", inode);
match inode {
Some(Inode::Directory {
ref mut children, ..
}) => t = Some(children),
_ => return None,
}
}
}
}
impl WorkingCopy for Memory {
fn create_dir_all(&mut self, file: &str) -> Result<(), anyhow::Error> {
if self.get_file(file).is_none() {
let last = SystemTime::now();
self.add_inode(
file,
Inode::Directory {
meta: InodeMetadata::new(0o755, true),
children: FileTree {
children: HashMap::new(),
},
last_modified: last,
},
);
}
Ok(())
}
fn file_metadata(&self, file: &str) -> Result<InodeMetadata, anyhow::Error> {
match self.get_file(file) {
Some(Inode::Directory { meta, .. }) => Ok(*meta),
Some(Inode::File { meta, .. }) => Ok(*meta),
None => Err((crate::Error::FileNotFound {
path: file.to_string(),
})
.into()),
}
}
fn read_file(&self, file: &str, buffer: &mut Vec<u8>) -> Result<(), anyhow::Error> {
match self.get_file(file) {
Some(Inode::Directory { .. }) => panic!("Not a file: {:?}", file),
Some(Inode::File { ref contents, .. }) => {
buffer.extend(contents);
Ok(())
}
None => {
return Err((crate::Error::FileNotFound {
path: file.to_string(),
})
.into())
}
}
}
fn modified_time(&self, _file: &str) -> Result<std::time::SystemTime, anyhow::Error> {
Ok(self.last_modified)
}
fn remove_path(&mut self, path: &str) -> Result<(), anyhow::Error> {
self.remove_path_(path);
Ok(())
}
fn rename(&mut self, old: &str, new: &str) -> Result<(), anyhow::Error> {
debug!("rename {:?} to {:?}", old, new);
if let Some(inode) = self.remove_path_(old) {
self.add_inode(new, inode)
}
Ok(())
}
fn set_permissions(&mut self, file: &str, permissions: u16) -> Result<(), anyhow::Error> {
debug!("set_permissions {:?}", file);
match self.get_file_mut(file) {
Some(Inode::File { ref mut meta, .. }) => {
*meta = InodeMetadata::new(permissions as usize, false);
}
Some(Inode::Directory { ref mut meta, .. }) => {
*meta = InodeMetadata::new(permissions as usize, true);
}
None => panic!("file not found: {:?}", file),
}
Ok(())
}
fn write_file<A, F: FnOnce(&mut dyn std::io::Write) -> Result<A, anyhow::Error>>(
&mut self,
file: &str,
writer: F,
) -> Result<A, anyhow::Error> {
match self.get_file_mut(file) {
Some(Inode::File {
ref mut contents, ..
}) => {
contents.clear();
writer(contents)
}
None => {
let mut contents = Vec::new();
let last_modified = SystemTime::now();
let a = writer(&mut contents)?;
self.add_inode(
file,
Inode::File {
meta: InodeMetadata::new(0o644, false),
contents,
last_modified,
},
);
Ok(a)
}
_ => panic!("not a file: {:?}", file),
}
}
}
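// A minimal sketch (for illustration only) of how the in-memory working
// copy is used: directories and files are inserted by path, and
// `list_files` returns every path in the tree. It assumes that
// `crate::path::push` joins components with `/`, as the rest of this
// module does.
#[cfg(test)]
mod memory_sketch {
use super::*;
#[test]
fn add_and_list() {
let mut repo = Memory::new();
repo.add_dir("dir");
repo.add_file("dir/file", b"hello\n".to_vec());
let mut files = repo.list_files();
files.sort();
assert_eq!(files, vec!["dir".to_string(), "dir/file".to_string()]);
}
}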
// org id c2D/NoY1VKCjNo0OezNVrmuG67Szl/Bfhi3G2Z7tcLU=
use super::*;
use crate::pristine::InodeMetadata;
use ignore::WalkBuilder;
use std::path::{Path, PathBuf};
pub struct FileSystem {
root: PathBuf,
}
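/// Resolve `prefix` against the repository root (or the current
/// directory if no root is given), returning both the absolute path
/// (canonicalized when it exists) and the corresponding
/// repository-relative path with `/` separators.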
pub fn get_prefix(
repo_path: Option<&Path>,
prefix: &Path,
) -> Result<(PathBuf, String), anyhow::Error> {
let mut p = String::new();
let repo = if let Some(repo) = repo_path {
std::fs::canonicalize(repo)?
} else {
std::env::current_dir()?
};
debug!("get prefix {:?}", repo);
let prefix_ = if let Ok(prefix_) = std::fs::canonicalize(repo.join(&prefix)) {
prefix_
} else {
repo.join(&prefix)
};
debug!("get prefix {:?}", prefix_);
if let Ok(prefix) = prefix_.strip_prefix(repo) {
for c in prefix.components() {
if !p.is_empty() {
p.push('/');
}
let c: &std::path::Path = c.as_ref();
p.push_str(&c.to_string_lossy())
}
}
Ok((prefix_, p))
}
impl FileSystem {
pub fn from_root<P: AsRef<Path>>(root: P) -> Self {
FileSystem {
root: root.as_ref().to_path_buf(),
}
}
pub fn record_prefixes<
T: crate::MutTxnTExt + crate::TxnTExt,
C: crate::changestore::ChangeStore,
P: AsRef<Path>,
>(
&mut self,
txn: &mut T,
channel: &mut crate::pristine::ChannelRef<T>,
changes: &C,
state: &mut crate::RecordBuilder,
repo_path: &Path,
prefixes: &[P],
) -> Result<(), anyhow::Error> {
for prefix in prefixes.iter() {
if let Err(e) =
self.record_prefix(txn, channel, changes, state, repo_path, prefix.as_ref())
{
eprintln!("{}", e)
}
}
if prefixes.is_empty() {
if let Err(e) =
self.record_prefix(txn, channel, changes, state, repo_path, Path::new(""))
{
eprintln!("{}", e)
}
}
Ok(())
}
pub fn record_prefix<
T: crate::MutTxnTExt + crate::TxnTExt,
C: crate::changestore::ChangeStore,
>(
&mut self,
txn: &mut T,
channel: &mut crate::pristine::ChannelRef<T>,
changes: &C,
state: &mut crate::RecordBuilder,
repo_path: &Path,
prefix: &Path,
) -> Result<(), anyhow::Error> {
debug!("record_prefix {:?}", prefix);
let repo_path_ = std::fs::canonicalize(repo_path)?;
if let Ok((full, prefix)) = get_prefix(Some(&repo_path), prefix) {
debug!("full = {:?}", full);
let meta = std::fs::metadata(&full);
debug!("meta = {:?}", meta);
debug!("{:?}", full.strip_prefix(&repo_path_));
if let Ok(meta) = meta {
if meta.is_dir() {
let mut walk = WalkBuilder::new(&full);
walk.standard_filters(true);
let walk = walk.build();
for entry in walk {
let entry = entry?;
let p = entry.path();
if let Some(p) = p.file_name() {
if let Some(p) = p.to_str() {
if p.ends_with("~") || (p.starts_with("#") && p.ends_with("#")) {
continue;
}
}
}
debug!("entry path = {:?} {:?}", entry.path(), repo_path);
if let Ok(path) = entry.path().strip_prefix(&repo_path_) {
let path_str = path.to_str().unwrap();
if !txn.is_tracked(&path_str) {
eprintln!("Adding {:?}", path);
info!("Adding {:?}", path);
txn.add(path_str, entry.file_type().unwrap().is_dir())?
} else {
debug!("already tracked {:?}", path_str)
}
} else {
debug!("entry = {:?}", entry.path());
}
}
} else if let Ok(path) = full.strip_prefix(&repo_path_) {
let path_str = path.to_str().unwrap();
if !txn.is_tracked(&path_str) {
eprintln!("Adding {:?}", path);
info!("Adding file {:?}", path);
txn.add(path_str, false)?
}
}
}
debug!("recording from prefix {:?}", prefix);
txn.record(
state,
crate::Algorithm::default(),
channel,
self,
changes,
&prefix,
)?;
debug!("recorded");
}
Ok(())
}
}
impl WorkingCopy for FileSystem {
fn create_dir_all(&mut self, file: &str) -> Result<(), anyhow::Error> {
Ok(std::fs::create_dir_all(&self.root.join(file))?)
}
fn file_metadata(&self, file: &str) -> Result<InodeMetadata, anyhow::Error> {
let attr = std::fs::metadata(&self.root.join(file))?;
let permissions = permissions(&attr).unwrap_or(0o755);
debug!("permissions = {:?}", permissions);
Ok(InodeMetadata::new(permissions & 0o777, attr.is_dir()))
}
fn read_file(&self, file: &str, buffer: &mut Vec<u8>) -> Result<(), anyhow::Error> {
use std::io::Read;
let mut f = std::fs::File::open(&self.root.join(file))?;
f.read_to_end(buffer)?;
Ok(())
}
fn modified_time(&self, file: &str) -> Result<std::time::SystemTime, anyhow::Error> {
let attr = std::fs::metadata(&self.root.join(file))?;
Ok(attr.modified()?)
}
fn remove_path(&mut self, path: &str) -> Result<(), anyhow::Error> {
let path = self.root.join(path);
if let Ok(meta) = std::fs::metadata(&path) {
if let Err(e) = if meta.is_dir() {
std::fs::remove_dir_all(&path)
} else {
std::fs::remove_file(&path)
} {
error!("while deleting {:?}: {:?}", path, e);
}
}
Ok(())
}
fn rename(&mut self, former: &str, new: &str) -> Result<(), anyhow::Error> {
let former = self.root.join(former);
let new = self.root.join(new);
if let Some(p) = new.parent() {
std::fs::create_dir_all(p)?
}
if let Err(e) = std::fs::rename(&former, &new) {
error!("while renaming {:?} to {:?}: {:?}", former, new, e)
}
Ok(())
}
#[cfg(not(windows))]
fn set_permissions(&mut self, name: &str, permissions: u16) -> Result<(), anyhow::Error> {
use std::os::unix::fs::PermissionsExt;
let name = self.root.join(name);
debug!("set_permissions: {:?}", name);
let metadata = std::fs::metadata(&name)?;
let mut current = metadata.permissions();
debug!(
"setting mode for {:?} to {:?} (currently {:?})",
name, permissions, current
);
current.set_mode(permissions as u32);
std::fs::set_permissions(name, current)?;
Ok(())
}
#[cfg(windows)]
fn set_permissions(&mut self, _name: &str, _permissions: u16) -> Result<(), anyhow::Error> {
Ok(())
}
fn write_file<A, F: FnOnce(&mut dyn std::io::Write) -> Result<A, anyhow::Error>>(
&mut self,
file: &str,
writer: F,
) -> Result<A, anyhow::Error> {
let path = self.root.join(file);
if let Some(p) = path.parent() {
std::fs::create_dir_all(p)?
}
std::fs::remove_file(&path).unwrap_or(());
let mut file = std::io::BufWriter::new(std::fs::File::create(&path)?);
writer(&mut file)
}
}
#[cfg(not(windows))]
fn permissions(attr: &std::fs::Metadata) -> Option<usize> {
use std::os::unix::fs::PermissionsExt;
Some(attr.permissions().mode() as usize)
}
#[cfg(windows)]
fn permissions(_: &std::fs::Metadata) -> Option<usize> {
None
}
// org id YHNNzZV5am1BYRE9P9yHBV54v3BJ2n5DcyZHps06YG4=
use crate::pristine::*;
pub const START_MARKER: &'static str = "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n";
pub const SEPARATOR: &'static str = "\n================================\n";
pub const END_MARKER: &'static str = "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n";
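// These markers delimit the sides of a conflict in output files. Each
// one starts with a newline so that writers can skip that first byte
// when the previous output already ends a line (see
// `output_conflict_marker` implementations below).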
/// A trait for outputting keys and their contents. It makes it
/// possible to retain more information about conflicts than directly
/// outputting bytes to a `Write` would; the diff algorithm, for
/// example, uses that information.
pub trait VertexBuffer {
fn output_line<F: FnOnce(&mut Vec<u8>) -> Result<(), anyhow::Error>>(
&mut self,
key: Vertex<ChangeId>,
contents: F,
) -> Result<(), anyhow::Error>;
fn output_conflict_marker(&mut self, s: &str) -> Result<(), anyhow::Error>;
fn begin_conflict(&mut self) -> Result<(), anyhow::Error> {
self.output_conflict_marker(START_MARKER)
}
fn begin_zombie_conflict(&mut self) -> Result<(), anyhow::Error> {
self.begin_conflict()
}
fn begin_cyclic_conflict(&mut self) -> Result<(), anyhow::Error> {
self.begin_conflict()
}
fn conflict_next(&mut self) -> Result<(), anyhow::Error> {
self.output_conflict_marker(SEPARATOR)
}
fn end_conflict(&mut self) -> Result<(), anyhow::Error> {
self.output_conflict_marker(END_MARKER)
}
fn end_zombie_conflict(&mut self) -> Result<(), anyhow::Error> {
self.output_conflict_marker(END_MARKER)
}
fn end_cyclic_conflict(&mut self) -> Result<(), anyhow::Error> {
self.output_conflict_marker(END_MARKER)
}
}
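/// A `VertexBuffer` writing to `w` that additionally records, in
/// `conflicts`, the path and line number at which each conflict starts.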
pub(crate) struct ConflictsWriter<'a, 'b, W: std::io::Write> {
pub w: W,
pub lines: usize,
pub new_line: bool,
pub path: &'b str,
pub conflicts: &'a mut Vec<crate::output::Conflict>,
pub buf: Vec<u8>,
}
impl<'a, 'b, W: std::io::Write> ConflictsWriter<'a, 'b, W> {
pub fn new(w: W, path: &'b str, conflicts: &'a mut Vec<crate::output::Conflict>) -> Self {
ConflictsWriter {
w,
new_line: true,
lines: 1,
path,
conflicts,
buf: Vec::new(),
}
}
}
impl<'a, 'b, W: std::io::Write> std::ops::Deref for ConflictsWriter<'a, 'b, W> {
type Target = W;
fn deref(&self) -> &Self::Target {
&self.w
}
}
impl<'a, 'b, W: std::io::Write> std::ops::DerefMut for ConflictsWriter<'a, 'b, W> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.w
}
}
impl<'a, 'b, W: std::io::Write> VertexBuffer for ConflictsWriter<'a, 'b, W> {
fn output_line<C: FnOnce(&mut Vec<u8>) -> Result<(), anyhow::Error>>(
&mut self,
_: Vertex<ChangeId>,
c: C,
) -> Result<(), anyhow::Error> {
self.buf.clear();
c(&mut self.buf)?;
let ends_with_newline = self.buf.ends_with(b"\n");
self.lines += self.buf.iter().filter(|c| **c == b'\n').count();
self.w.write_all(&self.buf)?;
if !self.buf.is_empty() {
// empty "lines" (such as at the beginning of a file)
// don't change the status of self.new_line.
self.new_line = ends_with_newline;
}
Ok(())
}
fn output_conflict_marker(&mut self, s: &str) -> Result<(), anyhow::Error> {
debug!("output_conflict_marker {:?}", self.new_line);
if !self.new_line {
self.lines += 2;
self.w.write_all(s.as_bytes())?;
} else {
self.lines += 1;
debug!("{:?}", &s.as_bytes()[1..]);
self.w.write_all(&s.as_bytes()[1..])?;
}
self.new_line = true;
Ok(())
}
fn begin_conflict(&mut self) -> Result<(), anyhow::Error> {
self.conflicts.push(crate::output::Conflict::Order {
path: self.path.to_string(),
line: self.lines,
});
self.output_conflict_marker(START_MARKER)
}
fn begin_zombie_conflict(&mut self) -> Result<(), anyhow::Error> {
self.conflicts.push(crate::output::Conflict::Zombie {
path: self.path.to_string(),
line: self.lines,
});
self.output_conflict_marker(START_MARKER)
}
fn begin_cyclic_conflict(&mut self) -> Result<(), anyhow::Error> {
self.conflicts.push(crate::output::Conflict::Cyclic {
path: self.path.to_string(),
line: self.lines,
});
self.output_conflict_marker(START_MARKER)
}
}
pub struct Writer<W: std::io::Write> {
w: W,
buf: Vec<u8>,
new_line: bool,
}
impl<W: std::io::Write> Writer<W> {
pub fn new(w: W) -> Self {
Writer {
w,
new_line: true,
buf: Vec::new(),
}
}
pub fn into_inner(self) -> W {
self.w
}
}
impl<W: std::io::Write> std::ops::Deref for Writer<W> {
type Target = W;
fn deref(&self) -> &Self::Target {
&self.w
}
}
impl<W: std::io::Write> std::ops::DerefMut for Writer<W> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.w
}
}
impl<W: std::io::Write> VertexBuffer for Writer<W> {
fn output_line<C: FnOnce(&mut Vec<u8>) -> Result<(), anyhow::Error>>(
&mut self,
_: Vertex<ChangeId>,
c: C,
) -> Result<(), anyhow::Error> {
self.buf.clear();
c(&mut self.buf)?;
let ends_with_newline = self.buf.ends_with(b"\n");
self.w.write_all(&self.buf[..])?;
if !self.buf.is_empty() {
// empty "lines" (such as at the beginning of a file)
// don't change the status of self.new_line.
self.new_line = ends_with_newline;
}
Ok(())
}
fn output_conflict_marker(&mut self, s: &str) -> Result<(), anyhow::Error> {
debug!("output_conflict_marker {:?}", self.new_line);
if !self.new_line {
self.w.write_all(s.as_bytes())?;
} else {
debug!("{:?}", &s.as_bytes()[1..]);
self.w.write_all(&s.as_bytes()[1..])?;
}
Ok(())
}
fn begin_conflict(&mut self) -> Result<(), anyhow::Error> {
self.output_conflict_marker(START_MARKER)
}
fn begin_zombie_conflict(&mut self) -> Result<(), anyhow::Error> {
self.output_conflict_marker(START_MARKER)
}
fn begin_cyclic_conflict(&mut self) -> Result<(), anyhow::Error> {
self.output_conflict_marker(START_MARKER)
}
}
// org id zbhEFFLH9dRwVwwwwfF92Y0wFF4fPrXmDJAX4GwqE9A=
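/// A vector of vectors stored in two flat `Vec`s: `v` holds all the
/// elements contiguously, and `bounds[i]..bounds[i + 1]` delimits the
/// `i`-th inner vector, so `bounds` always starts with 0 and has one
/// more entry than there are inner vectors.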
pub(crate) struct Vector2<A> {
v: Vec<A>,
bounds: Vec<usize>,
}
impl<A> Vector2<A> {
pub(crate) fn new() -> Self {
Vector2 {
v: Vec::new(),
bounds: vec![0],
}
}
pub(crate) fn len(&self) -> usize {
self.bounds.len() - 1
}
pub(crate) fn with_capacities(total: usize, n: usize) -> Self {
let mut bounds = Vec::with_capacity(n);
bounds.push(0);
Vector2 {
v: Vec::with_capacity(total),
bounds,
}
}
pub(crate) fn push_to_last(&mut self, a: A) {
assert!(self.bounds.len() > 1);
*self.bounds.last_mut().unwrap() += 1;
self.v.push(a)
}
pub(crate) fn push(&mut self) {
self.bounds.push(self.v.len())
}
pub(crate) fn last_mut(&mut self) -> Option<&mut [A]> {
if self.bounds.len() >= 2 {
let i = self.bounds.len() - 2;
Some(&mut self.v[self.bounds[i]..self.bounds[i + 1]])
} else {
None
}
}
}
impl<A> std::ops::Index<usize> for Vector2<A> {
type Output = [A];
fn index(&self, i: usize) -> &[A] {
&self.v[self.bounds[i]..self.bounds[i + 1]]
}
}
impl<A> std::ops::IndexMut<usize> for Vector2<A> {
fn index_mut(&mut self, i: usize) -> &mut [A] {
&mut self.v[self.bounds[i]..self.bounds[i + 1]]
}
}
impl<A: std::fmt::Debug> std::fmt::Debug for Vector2<A> {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(fmt, "[")?;
for i in 0..self.bounds.len() - 1 {
if i > 0 {
write!(fmt, ", ")?
}
write!(fmt, "{:?}", &self[i])?
}
write!(fmt, "]")?;
Ok(())
}
}
#[test]
fn test_v2() {
let mut v: Vector2<usize> = Vector2::new();
v.push();
v.push_to_last(0);
v.push_to_last(1);
v.push_to_last(2);
v.push();
v.push_to_last(4);
v.push_to_last(5);
v.push_to_last(6);
assert_eq!(&v[0], &[0, 1, 2][..]);
assert_eq!(&v[1], &[4, 5, 6][..]);
}
#[test]
#[should_panic]
fn test_v2_() {
let w: Vector2<usize> = Vector2::new();
println!("{:?}", &w[0]);
}
// org id hPrt74aGC9MoSCkacm35YMV1UsqOTCIc9O8ApjoChHI=
use crate::change::*;
use crate::changestore::*;
use crate::pristine::*;
use crate::small_string::*;
// org id XWsbPEqsD3paeM/LxVDyIr2Q565JHhSez0T72hx9cHs=
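/// When unrecording a change that added a file, delete the
/// `inodes`/`revinodes` bindings of the inode pointing at the new name
/// vertex (identified by an empty vertex, i.e. `start == end`).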
pub fn undo_file_addition<T: MutTxnT>(
txn: &mut T,
change_id: ChangeId,
new_vertex: &NewVertex<Option<Hash>>,
) -> Result<(), anyhow::Error> {
if new_vertex.start == new_vertex.end {
let pos = Position {
change: change_id,
pos: new_vertex.start,
};
if let Some(inode) = txn.get_revinodes(pos, None) {
let inode = inode.to_owned();
txn.del_revinodes(pos, None)?;
txn.del_inodes(inode, None)?;
}
}
Ok(())
}
// org id aM0a1a29Y251MOlVfR8GafvillHOtUZNFlHypSf5e0c=
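/// When unrecording a change that deleted files, walk its new edges in
/// reverse and, for every FOLDER edge pointing at a name position,
/// restore the corresponding inode bindings (see `restore`).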
pub fn undo_file_deletion<T: MutTxnT, P: ChangeStore>(
txn: &mut T,
changes: &P,
channel: &Channel<T>,
change_id: ChangeId,
newedges: &EdgeMap<Option<Hash>>,
) -> Result<(), anyhow::Error> {
for e in newedges.edges.iter().rev() {
assert!(!e.flag.contains(EdgeFlags::PARENT));
let source = txn.find_block_end(&channel, txn.internal_pos(&e.from, change_id)?)?;
if e.flag.contains(EdgeFlags::FOLDER) && e.to.start_pos() == e.to.end_pos() {
let dest = txn.internal_pos(&e.to.start_pos(), change_id)?;
restore(txn, changes, channel, source, dest)?
}
}
Ok(())
}
// org id jpnOhTUKi2/wrb/uh1iJyGyO0F8zmYxoJlnFRItxZo0=
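/// Restore an inode for `source`/`dest`, recreating inodes for its
/// missing FOLDER ancestors first. The recursion uses an explicit
/// stack: parents are pushed until one with a live inode (or the root)
/// is found, then inodes are recreated on the way back down.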
fn restore<T: MutTxnT, P: ChangeStore>(
txn: &mut T,
changes: &P,
channel: &Channel<T>,
source: Vertex<ChangeId>,
dest: Position<ChangeId>,
) -> Result<(), anyhow::Error> {
let mut stack = vec![(source, dest)];
let mut return_value = None;
while let Some((source, dest)) = stack.pop() {
if let Some(parent_inode) = return_value {
return_value = Some(restore_inode(txn, changes, source, dest, parent_inode)?);
continue;
}
let source_parent = txn
.iter_adjacent(
&channel,
source,
EdgeFlags::PARENT | EdgeFlags::FOLDER,
EdgeFlags::all(),
)
.find(|e| e.flag.contains(EdgeFlags::PARENT | EdgeFlags::FOLDER))
.unwrap()
.dest;
// org id tgKUYrZqx9aEX8jZs1AQbH/VCnJJAtXyCdfty6XGk3I=
if source_parent.change.is_root() {
return_value = Some(restore_inode(txn, changes, source, dest, Inode::ROOT)?)
} else if let Some(inode) = txn.get_revinodes(source_parent, None) {
return_value = Some(restore_inode(txn, changes, source, dest, inode)?)
} else {
let grandparent = find_youngest_parent(txn, channel, source_parent.inode_vertex())?;
stack.push((source, dest));
stack.push((grandparent, source_parent));
}
}
Ok(())
}
// org id 6G9wZJSePFPM3jnKlWo/js7upKaYQwFWH9ep4l2BR8Y=
fn restore_inode<T: MutTxnT, P: ChangeStore>(
txn: &mut T,
changes: &P,
source: Vertex<ChangeId>,
dest: Position<ChangeId>,
parent_inode: Inode,
) -> Result<Inode, anyhow::Error> {
let inode = crate::fs::create_new_inode(txn);
let mut name = Vec::new();
let (_, basename) = changes.get_file_name(|h| txn.get_external(h), source, &mut name)?;
let basename = SmallString::from_str(basename);
let file_id = OwnedPathId {
parent_inode,
basename,
};
txn.put_tree(file_id.as_file_id(), inode)?;
txn.put_revtree(inode, file_id.as_file_id())?;
txn.replace_inodes(inode, dest)?;
txn.replace_revinodes(dest, inode)?;
Ok(inode)
}
// org id RM0rj9DnLmh6LYZN5VK6ADMYoieH0mZJcaFzBEZzf9c=
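/// Follow the FOLDER | PARENT edges of `current`, preferring a live
/// edge if there is one, and otherwise the deleted edge introduced by
/// the change applied most recently to the channel.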
fn find_youngest_parent<T: TxnT>(
txn: &T,
channel: &Channel<T>,
current: Vertex<ChangeId>,
) -> Result<Vertex<ChangeId>, crate::Error> {
let mut next = None;
for e in txn
.iter_adjacent(
channel,
current,
EdgeFlags::FOLDER | EdgeFlags::PARENT,
EdgeFlags::FOLDER | EdgeFlags::PARENT | EdgeFlags::DELETED | EdgeFlags::BLOCK,
)
.filter(|e| e.flag.contains(EdgeFlags::FOLDER | EdgeFlags::PARENT))
{
if e.flag.contains(EdgeFlags::DELETED) {
let age = txn
.get_changeset(&channel.changes, e.introduced_by, None)
.unwrap();
if let Some((ref mut age0, ref mut v)) = next {
if age > *age0 {
*age0 = age;
*v = e.dest
}
} else {
next = Some((age, e.dest))
}
} else {
next = Some((0, e.dest));
break;
}
}
txn.find_block_end(channel, next.unwrap().1)
}
// org id T313j+8glo3jbLVV755vMhVhm9EW2lx+7wowRud9Bfo=
pub fn undo_file_reinsertion<T: MutTxnT>(
txn: &mut T,
change_id: ChangeId,
newedges: &EdgeMap<Option<Hash>>,
) -> Result<(), anyhow::Error> {
for e in newedges.edges.iter() {
assert!(!e.flag.contains(EdgeFlags::PARENT));
// org id HBoHMB0n95+Pap0r6KHw3Kj1HRX0MyKXyd0dYieT2cs=
if e.to.start_pos() == e.to.end_pos() {
let position = txn.internal_pos(&e.to.start_pos(), change_id)?;
if let Some(inode) = txn.get_revinodes(position, None) {
let inode = inode.to_owned();
txn.del_revinodes(position, None)?;
txn.del_inodes(inode, None)?;
}
}
}
Ok(())
}
// org id IuiCX9taX7EZv/Aqz/UZc5r7v/fH7MJn3/QIj46oHwc=
use crate::apply;
use crate::change::*;
use crate::changestore::*;
use crate::missing_context::*;
use crate::pristine::*;
use crate::Error;
use std::collections::{HashMap, HashSet};
mod working_copy;
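/// Unrecord the change `hash` from `channel`, unapplying it from the
/// channel's graph. If no other channel still contains the change, its
/// dependency and hash bindings are also removed from the pristine.
/// Returns `true` when the change remains referenced by another
/// channel.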
pub fn unrecord<T: MutTxnT, P: ChangeStore>(
txn: &mut T,
channel: &mut ChannelRef<T>,
changes: &P,
hash: &Hash,
) -> Result<bool, anyhow::Error> {
let change = changes.get_change(hash)?;
let change_id = if let Some(h) = txn.get_internal(*hash) {
h
} else {
return Ok(false);
};
let unused = unused_in_other_channels(txn, &channel, change_id);
let mut channel = channel.r.borrow_mut();
del_channel_changes(txn, &mut channel, change_id)?;
unapply(txn, &mut channel, changes, change_id, &change)?;
if unused {
assert!(txn.get_revdep(change_id, None).is_none());
while txn.del_dep(change_id, None)? {}
txn.del_external(change_id, None)?;
txn.del_internal(*hash, None)?;
for dep in change.dependencies.iter() {
let dep = txn.get_internal(*dep).unwrap();
txn.del_revdep(dep, Some(change_id))?;
}
Ok(false)
} else {
Ok(true)
}
}
// org id 4kyMdTywL4dWm7Vw3kXLcabNNGbFzSP3FTUptNMLPwg=
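/// Remove `change_id` from the channel's log, failing if it is not on
/// the channel or if another change on the channel depends on it.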
fn del_channel_changes<T: MutTxnT>(
txn: &mut T,
channel: &mut Channel<T>,
change_id: ChangeId,
) -> Result<(), anyhow::Error> {
let timestamp = if let Some(ts) = txn.get_changeset(&channel.changes, change_id, None) {
ts
} else {
return Err((Error::ChangeNotOnChannel { change_id }).into());
};
for (p, d) in txn.iter_revdep(change_id) {
if p < change_id {
continue;
} else if p > change_id {
break;
}
if txn.get_changeset(&channel.changes, d, None).is_some() {
return Err((Error::ChangeIsDependedUpon { change_id }).into());
}
}
txn.del_changes(channel, change_id, timestamp)?;
Ok(())
}
// org id F70s1W/0AmTkJZfg47g3+OXxINmwCgXU7G5j5QZkvh8=
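/// Returns `true` if no channel other than `channel` still contains
/// `change_id`, in which case its metadata can safely be deleted.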
fn unused_in_other_channels<T: TxnT>(
txn: &mut T,
channel: &ChannelRef<T>,
change_id: ChangeId,
) -> bool {
let channel = channel.borrow();
for br in txn.iter_channels("") {
let br = br.borrow();
if br.name != channel.name
&& txn.get_changeset(&br.changes, change_id, None).is_some()
{
return false;
}
}
true
}
// org id UmzPcrZozUPTljK9LrlZa2FIXJwwvHpPmfRQ9uV4qTs=
fn unapply<T: MutTxnT, C: ChangeStore>(
txn: &mut T,
channel: &mut Channel<T>,
changes: &C,
change_id: ChangeId,
change: &Change,
) -> Result<(), anyhow::Error> {
let mut clean_inodes = HashSet::new();
let mut ws = Workspace::new();
for change_ in change.changes.iter().rev().flat_map(|r| r.rev_iter()) {
match *change_ {
&