DO2Y5TY5JQISUHCVNPI2FXO7WWZVJQ3LGPWF4DNADMGZRIO6PT2QC
2GQCLJZGIXMTKDVMYIIQJDOR5EXGBZS5FKH2S4DTN25WKKBUMQQQC
FGVTXN52IVYIDIBPFOOUE7QQAZMPJQRADGKXW3DENIC6EVS4AYFAC
3OW3YNZXF6DR2EI5XS7PDHUUZMARDU2JUJFJKIEWXQNSMCIBVAOQC
YD7QFAD7XC35U5N4DOS3B4MBJL4KWSHVEZ6OLO5RYXOUPFQNPQPAC
ILZ44DEYAPNWZRHHEML2GPNSMGP4QXXI4GCL4T24R7A4LKGRV23AC
JZADJIA3P3EOKPBGEKEXJVGWHNF2SIHYNNMB3XFNPBU4BTVGM3YQC
TA5VXGFGDBLENEH4SRHD7FOUXPXRKIHN7DIBUREW3DZHMDVJNNCAC
5MRZLKBHOFFUAJELWL34XILG2XVWPUEHOFPDXBK5ASCO6H26MBFAC
CQ3FIUY4HDRAI3EQWJ7D5VHLC4EBO2LHWVRHX56XA6LRGEKP63EAC
L2VH4BYK3IULLGBHXMZJWKRKDQY43QEMQRTXFJCNRDE7PODLXWTAC
L4JXJHWXYNCL4QGJXNKKTOKKTAXKKXBJUUY7HFZGEUZ5A2V5H34QC
SXEYMYF7P4RZMZ46WPL4IZUTSQ2ATBWYZX7QNVMS3SGOYXYOHAGQC
UDHP4ZVBQZT2VBURB2MDCU2IZDNMCAFSIUKWRBDQ5BWMFKSN2LYQC
3WO4H2MMMQTYKCPBWYE67IRAEX7DFA2XAOMIKICA6BYDN23K6DQQC
BNPSVXIC72C3WT33YKCH766OBLLNCS7POX6U6JXZSQQPJF2M22MQC
MU5GSJAW65PEG3BRYUKZ7O37BPHW3MOX3S5E2RFOXKGUOJEEDQ5AC
RM225IDQR36MNBMN7OT2R562M4FUD6L34RY7A3WDBBETZIIDKZ5AC
XWETQ4DE4KL2GQWSEBE5NENLTSLIDF7RIWOCCVWJFQOVQJE5P33AC
Q7CAYX5N2GFOGMZL3VXVWORMAPWEOECXE22BLXK7Q4WEPS4CE2SAC
KTTKF3RWYAK2YSH2DYYW5QVG4KSNGWUBJBFHKE24OJ7LFCBF5FEAC
367UBQ6KNAKUEWG32R4QRJ6H7IE7NAZFOPTC3ZOE4Z6E44RV3ISQC
ISCWVXO6UN37V2QMR75ZWS7H5E7WYZCGR6TX3EDFOFRVCAPMIVUAC
2D7P2VKJASU7QDQZHGCLBIT6G2V5WUFYLWTCEVVEI2EZHGM6XYRAC
TKEVOH7HXON7SOBGXTUDHAHO2U2GPTQRNESP6ERKUQAS526OZIRAC
WLUID7NANDWTN5GOECNEKFTLZF3MUVS7K26YWLYLSGJ56G63NV4QC
IVLLXQ5ZWZDKHO4TNQG3TPXN34H6Y2WXPAGSO4PWCYNSKUZWOEJQC
K4CVMIUKNWBZ676IKSR5MYKTCDPPCRGWVAGYU772CE2B3AGAP4KQC
7HOBLRD43W2R5OVOYZVDO5LYPBZS7OSLDS7FH4NVNMBL3AZGMLSQC
2V33SO6IVPKBRPIJ3U3WYD6QKGN36BJVPP3M2YUDN7GTX3YH5UCAC
3ZAS64J675LPNHM2X32RH45B4A2LGK7NAIFDGQBQLDDSZVWEBTIQC
A3RM526Y7LUXNYW4TL56YKQ5GVOK2R5D7JJVTSQ6TT5MEXIR6YAAC
EUZFFJSOWV4PXDFFPDAFBHFUUMOFEU6ST7JH57YYRRR2SEOXLN6QC
I24UEJQLCH2SOXA4UHIYWTRDCHSOPU7AFTRUOTX7HZIAV4AZKYEQC
YN63NUZO4LVJ7XPMURDULTXBVJKW5MVCTZ24R7Z52QMHO3HPDUVQC
27RZYCM3XP72CW7FYGE373YAFD6EEZOZ4YAHEKV6JE5L6Z5N3JNAC
76PCXGML77EZWTRI5E6KHLVRAFTJ2AB5YRN5EKOYNAPKTWY2KCGAC
X6YFD4WVMUYJCR5IYPJH6UKYVWSA7DKBRVJ6XQFXHOE2TRYUTAHAC
5SLOJYHGPMZVCOE3IS7ICNMJJYX3RBT6CDG5MAV6T4CJIOW7YZ6QC
CCLLB7OIFNFYJZTG3UCI7536TOCWSCSXR67VELSB466R24WLJSDAC
GNMZNKB46GTPTWBR452FITHPBCMYPSDLV5VZQSY7BX6OJHWTWTZAC
I52XSRUH5RVHQBFWVMAQPTUSPAJ4KNVID2RMI3UGCVKFLYUO6WZAC
OIOMXESDNMLOTMNYCZZBYSBAQTYPAXXMUHTLA2AYCMNHZMPSLX2AC
AI73GKAO5QBPR6YGW7H5UNZYAEGYGIHAFO6DM2DWCPMVYLHE547QC
FBXYP7QM7SG6P2JDJVQPPCRKJE3GVYXNQ5GVV4GRDUNG6Q4ZRDJQC
IQ4FCHPZYGTZHCQHUIRCMUI5LCHIDSJCM2AZXGRJARWLCPPLXZOQC
VBMXB443FGZL6DLT6KAP2ICFCCQNXCUMDEUL67HB4CNKFMBBNSSAC
GYXIF25T2BCTCWCQI5DQOF3F4HBWDJUDJUMZ2WLHVBIOYATJTFAQC
KI2AFWOSN3PTGBGYQ7UKHFOZERZWEUWQ4AQNADG5S4QDJ53ESXFAC
SLJ3OHD4F6GJGZ3SV2D7DMR3PXYHPSI64X77KZ3RJ24EGEX6ZNQAC
AAXP2534BWX2ZUDZZHUMLYDBMGFGUH32CNRA3KOLER3JKOIJUZLAC
QL6K2ZM35B3NIXEMMCJWUSFXOBQHAGXRDMO7ID5DCKTJH4QJVY7QC
L3RCAPPKPURGFWF4TKDVIJRRMPJDMQAZ6T5CITGMS7KP5ROZ7IWAC
5OGOE4VWS5AIG4U2UYLLIGA3HY6UB7SNQOSESHNXBLET3VQXFBZAC
C267PHOH3QJBSBEWQB3J7PPOOXIUKM3DIIZIPLHPU4D5OXRCGLZAC
AEPEFS7O3YT7CRRFYQVJWUXUUSRGJ6K6XZQVK62B6N74UXOIFWYAC
5HF7C67M4DZMYTCIG32XEQQ662AHQMIHTHUK7TAVSO52XLMFBZPAC
4H2XTVJ2BNXDNHQ3RQTMOG3I4NRGZT7JDLC2GRINS56TIYTYTO4QC
BAUL3WR2ACY2HCJIM7K6HJOJ3UXDJISGLMDCSPH3WMPGJPL5AR4QC
44BN7FWSIXKG75IJUTCXLJE7VANNQFPRHQXTPLQHFU7AKGLSPQRAC
XSRTXUAS3DXJA42TZESMETFVTKU2OBUDGDE4N5F2CVWI4CLOUJ4AC
3KRGVQFUWFHPOGZOXVTJYNCM4XBRVYITAEOVPKBSAZ5GZIUO5KVQC
Y6EVFMTA6FOH3OQH6QCSWMI3F6SYZT2FSHO6GF4M3ICENDCWFM4QC
M5FK3ABTKBDG6HHW32G7UKRJEJQKD2U7BPXNZ3HVHBKULWVV6CTQC
L5IUD2DSLEK4SYPF6PLNO7C3TZEFYFHNM42HGEHY5VWW5MHD7CXAC
WZVCLZKY34KQBQU6YBGJLQCDADBQ67LQVDNRVCMQVY3O3C3EIWSQC
7ZROQSSN2M3LW6ASYMM6DPR5AERWV4K4TKWKEBKTCEJPMIJAHHXQC
ZRUPLBBTT4S6S7A3LOAHG4ONYEGPA5CFO4L2XBCNFKK45MWX3BDAC
FDEVV5NGUMTEULP25EFYFZEVICWYLGV7XMED25PNKD36DL4NA46AC
N3X5YP7PV2XVQKRRWSRCGJG34HZPLV4BGBLZGJG55KGIB7ORJ77QC
FXEDPLRI7PXLDXV634ZA6D5Q3ZWG3ESTKJTMRPJ4MAHI7PKU3M6AC
IIV3EL2XYI2X7HZWKXEXQFAE3R3KC2Q7SGOT3Q332HSENMYVF32QC
5BRU2RRWOQBMS2V3RQM7PRFR5UILYZ73GISHAKJA6KIZGC5M2MFAC
ZDK3GNDBWXJ2OXFDYB72ZCEBGLBF4MKE5K3PVHDZATHJ7HJIDPRQC
RIAA2QKFQM2BIDU3KIUXI5VYIUKTIHD2BURROGY6UXNJ7KRPY4IQC
VO5OQW4W2656DIYYRNZ3PO7TQ4JOKQ3GVWE5ALUTYVMX3WMXJOYQC
libpijul::changestore::filesystem::push_filename(
final_path,
&hashes[*current],
);
final_path.set_extension("change");
debug!("moving {:?} to {:?}", path, final_path);
std::fs::create_dir_all(&final_path.parent().unwrap())?;
let r = std::fs::rename(&path, &final_path);
libpijul::changestore::filesystem::pop_filename(final_path);
r?;
match hashes[*current] {
CS::Change(ref h) => {
libpijul::changestore::filesystem::push_filename(final_path, h);
debug!("moving {:?} to {:?}", path, final_path);
std::fs::create_dir_all(&final_path.parent().unwrap())?;
let r = std::fs::rename(&path, &final_path);
libpijul::changestore::filesystem::pop_filename(final_path);
r?;
}
CS::State(h) => {
libpijul::changestore::filesystem::push_tag_filename(
final_path, &h,
);
debug!("moving {:?} to {:?}", path, final_path);
std::fs::create_dir_all(&final_path.parent().unwrap())?;
let r = std::fs::rename(&path, &final_path);
libpijul::changestore::filesystem::pop_filename(final_path);
r?;
}
}
libpijul::changestore::filesystem::push_filename(&mut local, &c);
let mut change_file = std::fs::File::open(&local)?;
let change_len = change_file.metadata()?.len();
let mut change = thrussh::CryptoVec::new_zeroed(change_len as usize);
use std::io::Read;
change_file.read_exact(&mut change[..])?;
self.c
.data(format!("apply {} {} {}\n", to_channel, c.to_base32(), change_len).as_bytes())
.await?;
self.c.data(&change[..]).await?;
libpijul::changestore::filesystem::pop_filename(&mut local);
match c {
CS::Change(c) => {
libpijul::changestore::filesystem::push_filename(&mut local, &c);
let mut change_file = std::fs::File::open(&local)?;
let change_len = change_file.metadata()?.len();
let mut change = thrussh::CryptoVec::new_zeroed(change_len as usize);
use std::io::Read;
change_file.read_exact(&mut change[..])?;
self.c
.data(
format!("apply {} {} {}\n", to_channel, c.to_base32(), change_len)
.as_bytes(),
)
.await?;
self.c.data(&change[..]).await?;
libpijul::changestore::filesystem::pop_filename(&mut local);
}
CS::State(c) => {
libpijul::changestore::filesystem::push_tag_filename(&mut local, &c);
let mut tag_file = libpijul::tag::OpenTagFile::open(&local, &c)?;
let mut v = Vec::new();
tag_file.short(&mut v)?;
self.c
.data(
format!("tagup {} {} {}\n", c.to_base32(), to_channel, v.len())
.as_bytes(),
)
.await?;
self.c.data(&v[..]).await?;
libpijul::changestore::filesystem::pop_filename(&mut local);
}
}
c: &mut tokio::sync::mpsc::UnboundedReceiver<libpijul::pristine::Hash>,
sender: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
c: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
sender: &mut tokio::sync::mpsc::Sender<CS>,
c: &mut tokio::sync::mpsc::UnboundedReceiver<libpijul::pristine::Hash>,
sender: Option<&mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>>,
c: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
sender: Option<&mut tokio::sync::mpsc::Sender<CS>>,
if full {
self.c
.data(format!("change {}\n", h.to_base32()).as_bytes())
.await?;
} else {
self.c
.data(format!("partial {}\n", h.to_base32()).as_bytes())
.await?;
match h {
CS::Change(h) if full => {
self.c
.data(format!("change {}\n", h.to_base32()).as_bytes())
.await?;
}
CS::Change(h) => {
self.c
.data(format!("partial {}\n", h.to_base32()).as_bytes())
.await?;
}
CS::State(h) => {
self.c
.data(format!("tag {}\n", h.to_base32()).as_bytes())
.await?;
}
let mut tags: HashSet<Merkle> = HashSet::new();
for x in txn.rev_iter_tags(&channel.read().tags, None)? {
let (n, m) = x?;
debug!("rev_iter_tags {:?} {:?}", n, m);
// First, if the remote has exactly the same first n tags, break.
if let Some((_, p)) = txn.get_remote_tag(&remote_ref.lock().tags, (*n).into())? {
if p.b == m.b {
debug!("the remote has tag {:?}", p.a);
break;
}
if p.a != m.a {
// What to do here? It is possible that state
// `n` is a different state than `m.a` in the
// remote, and is also tagged.
}
} else {
tags.insert(m.a.into());
}
}
debug!("tags = {:?}", tags);
if txn.remote_has_state(remote_ref, &m)? {
break;
let h_unrecorded = self
.remote_unrecs
.iter()
.any(|(_, hh)| hh == &CS::Change(h.into()));
if !h_unrecorded {
if txn.remote_has_state(remote_ref, &m)?.is_some() {
debug!("remote_has_state: {:?}", m);
break;
}
let unknown_changes = self
.theirs_ge_dichotomy
.iter()
.filter_map(|(_, h, _, _)| {
if self.ours_ge_dichotomy_set.contains(h)
|| txn.get_revchanges(&channel, h).unwrap().is_some()
let mut unknown_changes = Vec::new();
for (_, h, m, is_tag) in self.theirs_ge_dichotomy.iter() {
let h_is_known = txn.get_revchanges(&channel, h).unwrap().is_some();
let change = CS::Change(*h);
if !(self.ours_ge_dichotomy_set.contains(&change) || h_is_known) {
unknown_changes.push(change)
}
if *is_tag {
let m_is_known = if let Some(n) = txn
.channel_has_state(txn.states(&*channel.read()), &m.into())
.unwrap()
let n = self
.dichotomy_changelist(txn, &remote.lock().remote)
.await?;
let n = self.dichotomy_changelist(txn, &remote.lock()).await?;
}
async fn update_changelist_pushpull_from_scratch(
&mut self,
txn: &mut MutTxn<()>,
path: &[String],
current_channel: &ChannelRef<MutTxn<()>>,
) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
debug!("no id, starting from scratch");
let (inodes, theirs_ge_dichotomy) = self.download_changelist_nocache(0, path).await?;
let mut theirs_ge_dichotomy_set = HashSet::new();
let mut to_download = Vec::new();
for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
theirs_ge_dichotomy_set.insert(CS::Change(*h));
if txn.get_revchanges(current_channel, h)?.is_none() {
to_download.push(CS::Change(*h));
}
if *is_tag {
let ch = current_channel.read();
if let Some(n) = txn.channel_has_state(txn.states(&*ch), &m.into())? {
if !txn.is_tagged(txn.tags(&*ch), n.into())? {
to_download.push(CS::State(*m));
}
} else {
to_download.push(CS::State(*m));
}
}
}
Ok(RemoteDelta {
inodes,
remote_ref: None,
to_download,
ours_ge_dichotomy_set: HashSet::new(),
theirs_ge_dichotomy,
theirs_ge_dichotomy_set,
remote_unrecs: Vec::new(),
})
debug!("no id, starting from scratch");
let (inodes, theirs_ge_dichotomy) = self.download_changelist_nocache(0, path).await?;
let mut theirs_ge_dichotomy_set = HashSet::new();
let mut to_download = Vec::new();
let mut tags = HashSet::new();
for (_, h, m, is_tagged) in theirs_ge_dichotomy.iter() {
theirs_ge_dichotomy_set.insert(*h);
if txn.get_revchanges(current_channel, h)?.is_none() {
to_download.push(*h);
if *is_tagged {
tags.insert(*m);
}
}
}
return Ok(RemoteDelta {
inodes,
remote_ref: None,
to_download,
tags,
ours_ge_dichotomy_set: HashSet::new(),
theirs_ge_dichotomy,
theirs_ge_dichotomy_set,
remote_unrecs: Vec::new(),
});
};
let mut remote_ref = if let Some(name) = self.name() {
txn.open_or_create_remote(id, name).unwrap()
} else {
unreachable!()
return self
.update_changelist_pushpull_from_scratch(txn, path, current_channel)
.await;
let dichotomy_n = self
.dichotomy_changelist(txn, &remote_ref.lock().remote)
.await?;
let ours_ge_dichotomy: Vec<(u64, Hash)> = txn
let mut remote_ref = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
let dichotomy_n = self.dichotomy_changelist(txn, &remote_ref.lock()).await?;
let ours_ge_dichotomy: Vec<(u64, CS)> = txn
.collect::<HashSet<Hash>>();
// remote_unrecs = {x: (u64, Hash) | x \in ours_ge_dichot /\ ~(x \in theirs_ge_dichot) /\ x \in current_channel }
let mut remote_unrecs = Vec::new();
for (n, hash) in &ours_ge_dichotomy {
if theirs_ge_dichotomy_set.contains(hash) {
// If this change is still present in the remote, skip
continue;
} else if txn.get_revchanges(¤t_channel, &hash)?.is_none() {
// If this unrecord wasn't in our current channel, skip
continue;
} else {
remote_unrecs.push((*n, *hash))
.collect::<HashSet<CS>>();
let mut theirs_ge_dichotomy_set = HashSet::new();
for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
theirs_ge_dichotomy_set.insert(CS::Change(*h));
if *is_tag {
theirs_ge_dichotomy_set.insert(CS::State(*m));
let should_cache = force_cache.unwrap_or_else(|| remote_unrecs.is_empty());
// remote_unrecs = {x: (u64, Hash) | x \in ours_ge_dichot /\ ~(x \in theirs_ge_dichot) /\ x \in current_channel }
let remote_unrecs = remote_unrecs(
txn,
current_channel,
&ours_ge_dichotomy,
&theirs_ge_dichotomy_set,
)?;
let should_cache = if let Some(true) = force_cache {
true
} else {
remote_unrecs.is_empty()
};
debug!(
"should_cache = {:?} {:?} {:?}",
force_cache, remote_unrecs, should_cache
);
for (k, _) in ours_ge_dichotomy.iter().copied() {
txn.del_remote(&mut remote_ref, k)?;
use libpijul::ChannelMutTxnT;
for (k, t) in ours_ge_dichotomy.iter().copied() {
match t {
CS::State(_) => txn.del_tags(&mut remote_ref.lock().tags, k)?,
CS::Change(_) => {
txn.del_remote(&mut remote_ref, k)?;
}
}
for (n, h, m, t) in theirs_ge_dichotomy.iter().copied() {
txn.put_remote(&mut remote_ref, n, (h, m, t))?;
for (n, h, m, is_tag) in theirs_ge_dichotomy.iter().copied() {
debug!("theirs: {:?} {:?} {:?}", n, h, m);
txn.put_remote(&mut remote_ref, n, (h, m))?;
if is_tag {
txn.put_tags(&mut remote_ref.lock().tags, n, &m)?;
}
let state_cond = |txn: &MutTxn<()>, merkle: &libpijul::pristine::SerializedMerkle| {
txn.channel_has_state(txn.states(&*current_channel.read()), merkle)
.map(|x| x.is_some())
};
let change_cond = |txn: &MutTxn<()>, hash: &Hash| {
txn.get_revchanges(¤t_channel, hash)
.unwrap()
.is_none()
};
// IF:
// The user only wanted to push/pull specific changes
// ELIF:
// The user specified no changes and there were no remote unrecords
// effecting the current channel means we can auto-cache
// the local remote cache
// ELSE:
// The user specified no changes but there were remote unrecords
// effecting the current channel meaning we can't auto-cache
// the local remote cache.
} else if should_cache {
let mut to_download: Vec<Hash> = Vec::new();
let mut tags = HashSet::new();
{
let rem = remote_ref.lock();
for thing in txn.iter_remote(&rem.remote, 0)? {
let (n, libpijul::pristine::Pair { a: hash, b: merkle }) = thing?;
if state_cond(txn, &merkle)? {
break;
} else if change_cond(txn, &hash.into()) {
to_download.push(Hash::from(hash));
if txn.is_tagged(&rem.tags, (*n).into())? {
tags.insert(merkle.into());
}
} else {
let mut to_download: Vec<CS> = Vec::new();
for (n, h, m, is_tag) in theirs_ge_dichotomy.iter() {
// In all cases, add this new change/state/tag to `to_download`.
let ch = CS::Change(*h);
if txn.get_revchanges(¤t_channel, h).unwrap().is_none() {
to_download.push(ch.clone());
if *is_tag {
to_download.push(CS::State(*m));
}
}
Ok(RemoteDelta {
inodes,
remote_ref: Some(remote_ref),
to_download,
tags,
ours_ge_dichotomy_set,
theirs_ge_dichotomy,
theirs_ge_dichotomy_set,
remote_unrecs,
})
} else {
let mut to_download: Vec<Hash> = Vec::new();
for thing in txn.iter_remote(&remote_ref.lock().remote, 0)? {
let (n, libpijul::pristine::Pair { a: hash, b: merkle }) = thing?;
if u64::from(*n) < dichotomy_n {
if state_cond(txn, &merkle)? {
continue;
} else if change_cond(txn, &hash.into()) {
to_download.push(Hash::from(hash));
} else if *is_tag {
let has_tag = if let Some(n) =
txn.channel_has_state(txn.states(¤t_channel.read()), &m.into())?
{
txn.is_tagged(txn.tags(¤t_channel.read()), n.into())?
} else {
false
};
if !has_tag {
to_download.push(CS::State(*m));
}
let mut tags = HashSet::new();
for (_, hash, merkle, t) in &theirs_ge_dichotomy {
if state_cond(txn, &merkle.into())? {
continue;
} else if change_cond(txn, &hash) {
to_download.push(Hash::from(*hash));
if *t {
tags.insert(*merkle);
// Additionally, if there are no remote unrecords
// (i.e. if `should_cache`), cache.
if should_cache && ours_ge_dichotomy_set.get(&ch).is_none() {
use libpijul::ChannelMutTxnT;
txn.put_remote(&mut remote_ref, *n, (*h, *m))?;
if *is_tag {
let mut rem = remote_ref.lock();
txn.put_tags(&mut rem.tags, *n, m)?;
if let Some((_, s)) = self.get_state(txn, Some(b)).await? {
if s == state {
let last_statet = if let Some((_, _, v)) = txn.last_remote_tag(&remote.tags)? {
v.into()
} else {
Merkle::zero()
};
debug!("last_state: {:?} {:?}", state, last_statet);
if let Some((_, s, st)) = self.get_state(txn, Some(b)).await? {
debug!("remote last_state: {:?} {:?}", s, st);
if s == state && st == last_statet {
let (mid, state) = txn.get_remote_state(remote, mid)?.unwrap();
let (mid, state) = {
let (a, b) = txn.get_remote_state(&remote.remote, mid)?.unwrap();
(a, b.b)
};
let statet = if let Some((_, b)) = txn.get_remote_tag(&remote.tags, mid)? {
// There's still a tag at position >= mid in the
// sequence.
b.b.into()
} else {
// No tag at or after mid, the last state, `statet`,
// is the right answer in that case.
last_statet
};
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<libpijul::pristine::Hash>,
send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
send: &mut tokio::sync::mpsc::Sender<CS>,
libpijul::changestore::filesystem::push_filename(&mut change_path_, h);
if std::fs::metadata(&change_path_).is_err() {
hash_send.send(*h)?;
to_download.insert(*h);
if let CS::Change(h) = h {
libpijul::changestore::filesystem::push_filename(&mut change_path_, h);
if std::fs::metadata(&change_path_).is_err() {
hash_send.send(CS::Change(*h))?;
to_download.insert(CS::Change(*h));
}
libpijul::changestore::filesystem::pop_filename(&mut change_path_);
let changes = repo.changes.get_changes(h)?;
changes.iter().any(|c| {
c.iter().any(|c| {
let inode = c.inode();
debug!("inode = {:?}", inode);
if let Some(h) = inode.change {
inodes.contains(&Position {
change: h,
pos: inode.pos,
})
} else {
false
}
if let CS::Change(h) = h {
let changes = repo.changes.get_changes(h)?;
changes.iter().any(|c| {
c.iter().any(|c| {
let inode = c.inode();
debug!("inode = {:?}", inode);
if let Some(h) = inode.change {
inodes.contains(&Position {
change: h,
pos: inode.pos,
})
} else {
false
}
})
let mut channel = channel.write();
txn.apply_change_ws(&repo.changes, &mut channel, h, &mut ws)?;
if let CS::Change(h) = h {
let mut channel = channel.write();
txn.apply_change_ws(&repo.changes, &mut channel, h, &mut ws)?;
}
libpijul::changestore::filesystem::push_filename(&mut change_path, &hash);
std::fs::create_dir_all(change_path.parent().unwrap())?;
use libpijul::changestore::ChangeStore;
hashes.push(hash);
for dep in repo.changes.get_dependencies(&hash)? {
let dep: libpijul::pristine::Hash = dep;
send_hash.send(dep)?;
if let CS::Change(hash) = hash {
libpijul::changestore::filesystem::push_filename(&mut change_path, &hash);
std::fs::create_dir_all(change_path.parent().unwrap())?;
use libpijul::changestore::ChangeStore;
hashes.push(CS::Change(hash));
for dep in repo.changes.get_dependencies(&hash)? {
let dep: libpijul::pristine::Hash = dep;
send_hash.send(CS::Change(dep))?;
}
libpijul::changestore::filesystem::pop_filename(&mut change_path);
}
/// Compare the remote set (theirs_ge_dichotomy) with our current
/// version of that (ours_ge_dichotomy) and return the changes in our
/// current version that are not in the remote anymore.
fn remote_unrecs<T: TxnTExt + ChannelTxnT>(
txn: &T,
current_channel: &ChannelRef<T>,
ours_ge_dichotomy: &[(u64, CS)],
theirs_ge_dichotomy_set: &HashSet<CS>,
) -> Result<Vec<(u64, CS)>, anyhow::Error> {
let mut remote_unrecs = Vec::new();
for (n, hash) in ours_ge_dichotomy {
debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);
if theirs_ge_dichotomy_set.contains(hash) {
// If this change is still present in the remote, skip
debug!("still present");
continue;
} else {
let has_it = match hash {
CS::Change(hash) => txn.get_revchanges(¤t_channel, &hash)?.is_some(),
CS::State(state) => {
let ch = current_channel.read();
if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {
txn.is_tagged(txn.tags(&*ch), n.into())?
} else {
false
}
}
};
if has_it {
remote_unrecs.push((*n, *hash))
} else {
// If this unrecord wasn't in our current channel, skip
continue;
}
}
}
Ok(remote_unrecs)
) -> Result<Option<(u64, Merkle)>, anyhow::Error> {
if let Some(mid) = mid {
Ok(txn.get_changes(&channel, mid)?.map(|(_, m)| (mid, m)))
) -> Result<Option<(u64, Merkle, Merkle)>, anyhow::Error> {
if let Some(x) = txn.reverse_log(&*channel.read(), mid)?.next() {
let (n, (_, m)) = x?;
if let Some(m2) = txn
.rev_iter_tags(txn.tags(&*channel.read()), Some(n.into()))?
.next()
{
let (_, m2) = m2?;
Ok(Some((n, m.into(), m2.b.into())))
} else {
Ok(Some((n, m.into(), Merkle::zero())))
}
Ok(txn.reverse_log(&*channel.read(), None)?.next().map(|n| {
let (n, (_, m)) = n.unwrap();
(n, m.into())
}))
Ok(None)
for x in remote_txn.log(&*remote_channel.read(), from)? {
let rem = remote_channel.read();
let tags: Vec<u64> = remote_txn
.iter_tags(remote_txn.tags(&*rem), from)?
.map(|k| (*k.unwrap().0).into())
.collect();
let mut tagsi = 0;
for x in remote_txn.log(&*rem, from)? {
libpijul::changestore::filesystem::push_filename(&mut local, &c);
libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);
match c {
CS::Change(c) => {
libpijul::changestore::filesystem::push_filename(&mut local, &c);
libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);
}
CS::State(c) => {
libpijul::changestore::filesystem::push_tag_filename(&mut local, &c);
libpijul::changestore::filesystem::push_tag_filename(&mut self.changes_dir, &c);
}
}
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<libpijul::pristine::Hash>,
send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
send: &mut tokio::sync::mpsc::Sender<CS>,
libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);
libpijul::changestore::filesystem::push_filename(&mut path, &c);
super::PROGRESS.borrow_mut().unwrap()[pro_n].incr();
if std::fs::metadata(&path).is_ok() {
debug!("metadata {:?} ok", path);
if let CS::Change(c) = c {
libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);
libpijul::changestore::filesystem::push_filename(&mut path, &c);
super::PROGRESS.borrow_mut().unwrap()[pro_n].incr();
if std::fs::metadata(&path).is_ok() {
debug!("metadata {:?} ok", path);
libpijul::changestore::filesystem::pop_filename(&mut path);
continue;
}
std::fs::create_dir_all(&path.parent().unwrap())?;
if std::fs::hard_link(&self.changes_dir, &path).is_err() {
std::fs::copy(&self.changes_dir, &path)?;
}
debug!("hard link done");
libpijul::changestore::filesystem::pop_filename(&mut self.changes_dir);
txn.apply_change_ws(store, &mut *channel, c, &mut ws)?;
match c {
CS::Change(c) => {
txn.apply_change_ws(store, &mut *channel, c, &mut ws)?;
}
CS::State(c) => {
if let Some(n) = txn.channel_has_state(txn.states(&*channel), &c.into())? {
let tags = txn.tags_mut(&mut *channel);
txn.put_tags(tags, n.into(), c)?;
} else {
bail!(
"Cannot add tag {}: channel {:?} does not have that state",
c.to_base32(),
txn.name(&*channel)
)
}
}
}
c: libpijul::pristine::Hash,
) -> Result<libpijul::pristine::Hash, anyhow::Error> {
libpijul::changestore::filesystem::push_filename(&mut path, &c);
c: CS,
) -> Result<CS, anyhow::Error> {
let (req, c32) = match c {
CS::Change(c) => {
libpijul::changestore::filesystem::push_filename(&mut path, &c);
("change", c.to_base32())
}
CS::State(c) => {
libpijul::changestore::filesystem::push_tag_filename(&mut path, &c);
if std::fs::metadata(&path).is_ok() {
bail!("Tag already downloaded: {}", c.to_base32())
}
("tag", c.to_base32())
}
};
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<libpijul::pristine::Hash>,
send: &mut tokio::sync::mpsc::Sender<libpijul::pristine::Hash>,
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
send: &mut tokio::sync::mpsc::Sender<CS>,
let c = c.to_base32();
to_channel.push(("apply", &c));
let base32;
let body = match c {
CS::Change(c) => {
libpijul::changestore::filesystem::push_filename(&mut local, &c);
let change = std::fs::read(&local)?;
base32 = c.to_base32();
to_channel.push(("apply", &base32));
change
}
CS::State(c) => {
libpijul::changestore::filesystem::push_tag_filename(&mut local, &c);
let mut tag_file = libpijul::tag::OpenTagFile::open(&local, &c)?;
let mut v = Vec::new();
tag_file.short(&mut v)?;
base32 = c.to_base32();
to_channel.push(("tagup", &base32));
v
}
};
libpijul::changestore::filesystem::pop_filename(&mut local);
u.sort_by(|a, b| {
let na = txn.get_revchanges(&channel, a).unwrap().unwrap();
let nb = txn.get_revchanges(&channel, b).unwrap().unwrap();
na.cmp(&nb)
u.sort_by(|a, b| match (a, b) {
(CS::Change(a), CS::Change(b)) => {
let na = txn.get_revchanges(&channel, a).unwrap().unwrap();
let nb = txn.get_revchanges(&channel, b).unwrap().unwrap();
na.cmp(&nb)
}
(CS::State(a), CS::State(b)) => {
let na = txn
.channel_has_state(txn.states(&*channel.read()), &a.into())
.unwrap()
.unwrap();
let nb = txn
.channel_has_state(txn.states(&*channel.read()), &b.into())
.unwrap()
.unwrap();
na.cmp(&nb)
}
_ => unreachable!(),
txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;
match h {
CS::Change(h) => {
txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;
}
CS::State(s) => {
if let Some(n) = txn.channel_has_state(&channel.states, &s.into())? {
txn.put_tags(&mut channel.tags, n.into(), s)?;
} else {
bail!(
"Cannot add tag {}: channel {:?} does not have that state",
s.to_base32(),
channel.name
)
}
}
}
if let Some(int) = txn_.get_internal(&d.into())? {
for inode in txn_.iter_rev_touched(int)? {
let (int_, inode) = inode?;
if int_ < int {
continue;
} else if int_ > int {
break;
}
let ext = libpijul::pristine::Position {
change: txn_.get_external(&inode.change)?.unwrap().into(),
pos: inode.pos,
};
if inodes.is_empty() || inodes.contains(&ext) {
touched.insert(*inode);
match d {
CS::Change(d) => {
if let Some(int) = txn_.get_internal(&d.into())? {
for inode in txn_.iter_rev_touched(int)? {
let (int_, inode) = inode?;
if int_ < int {
continue;
} else if int_ > int {
break;
}
let ext = libpijul::pristine::Position {
change: txn_.get_external(&inode.change)?.unwrap().into(),
pos: inode.pos,
};
if inodes.is_empty() || inodes.contains(&ext) {
touched.insert(*inode);
}
}
for d in c.get_dependencies(&h)? {
if original.get(&d).is_some() && now_.get(&d).is_none() {
let hh = if let CS::Change(h) = h {
h
} else {
stack.pop();
result.push(h);
continue;
};
for d in c.get_dependencies(&hh)? {
if original.get(&CS::Change(d)).is_some() && now_.get(&CS::Change(d)).is_none() {
fn check_deps<C: ChangeStore>(
c: &C,
original: &[libpijul::Hash],
now: &[libpijul::Hash],
) -> Result<(), anyhow::Error> {
fn check_deps<C: ChangeStore>(c: &C, original: &[CS], now: &[CS]) -> Result<(), anyhow::Error> {
writeln!(&mut s, "# {}", header.message).expect("Infallible write to String");
writeln!(&mut s, "# {}", header.timestamp).expect("Infallible write to String");
writeln!(&mut s, "# {}", hash.to_base32()).expect("Infallible write to String");
writeln!(&mut s, "# {}", header.message).unwrap();
writeln!(&mut s, "# {}", header.timestamp).unwrap();
match hash {
CS::Change(hash) => {
writeln!(&mut s, "# {}", hash.to_base32()).unwrap();
}
CS::State(hash) => {
writeln!(&mut s, "# {}", hash.to_base32()).unwrap();
}
}
writeln!(&mut s, "# {}", hash.to_base32()).expect("Infallible write to String");
let hash = match hash {
CS::Change(hash) => hash.to_base32(),
CS::State(hash) => hash.to_base32(),
};
writeln!(&mut s, "# {}", hash).expect("Infallible write to String");
writeln!(o, "{} {}", n, m.to_base32())?;
let m2 = if let Some(x) = txn
.rev_iter_tags(txn.tags(&*channel.read()), Some(n))?
.next()
{
x?.1.b.into()
} else {
Merkle::zero()
};
writeln!(o, "{} {} {}", n, m.to_base32(), m2.to_base32())?;
} else if let Some(x) = txn.read().reverse_log(&*channel.read(), None)?.next() {
let (n, (_, m)) = x?;
let m: Merkle = m.into();
writeln!(o, "{} {}", n, m.to_base32())?
writeln!(o, "-")?;
let txn = txn.read();
if let Some(x) = txn.reverse_log(&*channel.read(), None)?.next() {
let (n, (_, m)) = x?;
let m: Merkle = m.into();
let m2 = if let Some(x) = txn
.rev_iter_tags(txn.tags(&*channel.read()), Some(n))?
.next()
{
x?.1.b.into()
} else {
Merkle::zero()
};
writeln!(o, "{} {} {}", n, m.to_base32(), m2.to_base32())?
} else {
writeln!(o, "-")?;
}
let mut tag_path = repo.changes_dir.clone();
libpijul::changestore::filesystem::push_tag_filename(&mut tag_path, &state);
let mut tag = libpijul::tag::OpenTagFile::open(&tag_path, &state)?;
let mut buf = Vec::new();
tag.short(&mut buf)?;
o.write_u64::<BigEndian>(buf.len() as u64)?;
o.write_all(&buf)?;
o.flush()?;
}
} else if let Some(cap) = TAGUP.captures(&buf) {
if let Some(state) = Merkle::from_base32(cap[1].as_bytes()) {
let header = bincode::deserialize(&buf)?;
let header = libpijul::tag::read_short(std::io::Cursor::new(&buf[..]), &m)?;
let temp_path = tag_path.with_extension("tmp");
std::fs::create_dir_all(temp_path.parent().unwrap())?;
let mut w = std::fs::File::create(&temp_path)?;
writeln!(v, "{}\n", p.to_base32()).unwrap();
let deps = changes.get_dependencies(&p)?;
if !deps.is_empty() {
write!(v, " Dependencies:").unwrap();
for d in deps {
write!(v, " {}", d.to_base32()).unwrap();
let header = match p {
CS::Change(p) => {
writeln!(v, "{}\n", p.to_base32()).unwrap();
let deps = changes.get_dependencies(&p)?;
if !deps.is_empty() {
write!(v, " Dependencies:").unwrap();
for d in deps {
write!(v, " {}", d.to_base32()).unwrap();
}
writeln!(v).unwrap();
}
changes.get_header(&p)?
}
CS::State(p) => {
writeln!(v, "t{}\n", p.to_base32()).unwrap();
changes.get_tag_header(&p)?
.filter_map(|l| libpijul::Hash::from_base32(l.as_bytes()))
.filter_map(|l| {
::log::debug!(
"l = {:?} {:?}",
l,
libpijul::Merkle::from_base32(l.as_bytes())
);
if l.starts_with("t") {
libpijul::Merkle::from_base32(&l.as_bytes()[1..]).map(crate::remote::CS::State)
} else {
libpijul::Hash::from_base32(l.as_bytes()).map(crate::remote::CS::Change)
}
})
pub fn short<W: std::io::Write>(&mut self, mut w: W) -> Result<(), TagError> {
let mut header_buf = vec![0u8; (self.header.channel - self.header.header) as usize];
self.file.seek(SeekFrom::Start(self.header.header))?;
self.file.read_exact(&mut header_buf)?;
debug!("header_buf = {:?}", header_buf);
let mut off = FileHeader {
version: VERSION,
header: 0,
channel: 0,
unhashed: 0,
total: 0,
offsets: DbOffsets::default(),
state: self.header.state.clone(),
};
off.header = bincode::serialized_size(&off)?;
off.channel = off.header + header_buf.len() as u64;
off.total = off.channel;
let mut off_buf = Vec::with_capacity(off.header as usize);
bincode::serialize_into(&mut off_buf, &off)?;
w.write_all(&off_buf)?;
w.write_all(&header_buf)?;
Ok(())
}
}
pub fn read_short<R: std::io::Read + std::io::Seek>(mut file: R, expected: &Merkle) -> Result<crate::change::ChangeHeader, TagError> {
let mut off = [0u8; std::mem::size_of::<FileHeader>() as usize];
file.seek(SeekFrom::Start(0))?;
file.read_exact(&mut off)?;
let header: FileHeader = bincode::deserialize(&off).map_err(TagError::BincodeDe)?;
debug!("header = {:?}", header);
if &header.state == expected {
file.seek(SeekFrom::Start(header.header))?;
Ok(bincode::deserialize_from(file).map_err(TagError::BincodeDe)?)
} else {
Err(TagError::WrongHash {
expected: *expected,
got: header.state,
})
}
let tags = copy::<L64, Pair<SerializedMerkle, SerializedMerkle>, UP<L64, Pair<SerializedMerkle, SerializedMerkle>>, _>(
debug!("copying tags");
let tags = copy::<L64, Pair<SerializedMerkle, SerializedMerkle>, P<L64, Pair<SerializedMerkle, SerializedMerkle>>, _>(
fn state_from_prefix(
&self,
channel: &Self::Channel,
s: &str,
) -> Result<(Merkle, L64), super::HashPrefixError<Self::GraphError>> {
let h: SerializedMerkle = if let Some(ref h) = Merkle::from_prefix(s) {
h.into()
} else {
return Err(super::HashPrefixError::Parse(s.to_string()));
};
let mut result = None;
debug!("h = {:?}", h);
for x in btree::iter(&self.txn, &channel.states, Some((&h, None)))
.map_err(|e| super::HashPrefixError::Txn(e.into()))?
{
let (e, i) = x.map_err(|e| super::HashPrefixError::Txn(e.into()))?;
debug!("{:?} {:?}", e, i);
if e < &h {
continue;
} else {
let e: Merkle = e.into();
let b32 = e.to_base32();
debug!("{:?}", b32);
let (b32, _) = b32.split_at(s.len().min(b32.len()));
if b32 != s {
break;
} else if result.is_none() {
result = Some((e, *i))
} else {
return Err(super::HashPrefixError::Ambiguous(s.to_string()));
}
}
}
if let Some(result) = result {
Ok(result)
} else {
Err(super::HashPrefixError::NotFound(s.to_string()))
}
}
sanakirja_cursor!(remotetags, L64, Pair<SerializedMerkle, SerializedMerkle>);
sanakirja_rev_cursor!(remotetags, L64, Pair<SerializedMerkle, SerializedMerkle>);
type RemotetagsCursor = ::sanakirja::btree::cursor::Cursor<
L64,
Pair<SerializedMerkle, SerializedMerkle>,
UP<L64, Pair<SerializedMerkle, SerializedMerkle>>,
>;
}
fn get_remote_tag(
&self,
remote: &Self::Tags,
n: u64,
) -> Result<Option<(u64, &Pair<SerializedMerkle, SerializedMerkle>)>, TxnErr<Self::GraphError>>
{
let n = n.into();
if let Some(x) = btree::rev_iter(&self.txn, remote, Some((&n, None)))?.next() {
let (&k, m) = x?;
Ok(Some((k.into(), m)))
} else {
Ok(None)
}
impl<'a, T: ChannelTxnT> Iterator for crate::pristine::RevCursor<T, &'a T, T::TagsCursor, L64, ()>
{
type Item = Result<&'a L64, TxnErr<T::GraphError>>;
fn next(&mut self) -> Option<Self::Item> {
match self.txn.cursor_tags_prev(&mut self.cursor) {
Ok(Some(x)) => Some(Ok(x)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
}
impl<'a, T: ChannelTxnT>
crate::pristine::Cursor<T, &'a T, T::TagsCursor, L64, ()>
{
pub fn prev(&mut self) -> Option<Result<u64, TxnErr<T::GraphError>>> {
match self.txn.cursor_tags_prev(&mut self.cursor) {
Ok(Some(x)) => Some(Ok((*x).into())),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
}
impl<'a, T: ChannelTxnT> Iterator for crate::pristine::Cursor<T, &'a T, T::TagsCursor, L64, ()>
{
type Item = Result<u64, TxnErr<T::GraphError>>;
fn next(&mut self) -> Option<Self::Item> {
match self.txn.cursor_tags_next(&mut self.cursor) {
Ok(Some(x)) => Some(Ok((*x).into())),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
}