use std::borrow::Borrow;
use std::collections::hash_map::Entry;
use std::collections::HashSet;
use std::future::Future;
use std::io::Write;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use anyhow::{bail, Context};
use async_trait::async_trait;
use futures::{pin_mut, Stream};
use futures_util::stream::{FuturesUnordered, Once};
use futures_util::{stream, StreamExt, TryStreamExt};
use lazy_static::lazy_static;
use crate::context::DownloadContext;
use libpijul::changestore::{filesystem, ChangeStore};
use libpijul::pristine::{
    sanakirja::MutTxn, Base32, ChangeId, ChannelRef, GraphIter, Hash, Merkle, MutTxnT, RemoteRef,
    TxnT,
};
use libpijul::pristine::{ChangePosition, Position};
use libpijul::{ApplyWorkspace, HashMap, DOT_DIR};
use libpijul::{ChannelTxnT, DepsTxnT, GraphTxnT, MutTxnTExt, TxnTExt};
use log::{debug, info};
use regex::Regex;
use tokio::sync::mpsc;
use pijul_config::*;
use pijul_identity::Complete;
use pijul_repository::*;
pub mod ssh;
use ssh::*;
pub mod local;
use local::*;
pub mod http;
use http::*;
mod context;
use pijul_interaction::{
ProgressBar, Spinner, APPLY_MESSAGE, COMPLETE_MESSAGE, DOWNLOAD_MESSAGE, UPLOAD_MESSAGE,
};
pub const PROTOCOL_VERSION: usize = 3;
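/// A connection to a remote repository, over any of the supported
/// transports: a repository on the local filesystem, SSH, HTTP(S), or
/// another channel of the same repository (`LocalChannel`).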
pub enum RemoteRepo {
Local(Local),
Ssh(Ssh),
Http(Http),
LocalChannel(String),
None,
}
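/// An element of a changelist exchanged with a remote: either a change,
/// identified by its hash, or a tag, identified by the channel state
/// (Merkle) it records.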
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CS {
Change(Hash),
State(Merkle),
}
impl filesystem::CacheEntry for CS {
fn hash(&self) -> String {
match self {
CS::Change(v) => v.hash(),
CS::State(v) => v.hash(),
}
}
fn ext(&self) -> &str {
match self {
CS::Change(v) => v.ext(),
CS::State(v) => v.ext(),
}
}
}
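/// Open the remote called `name`: if it matches one of the remotes
/// configured in `repo`, use that configuration, otherwise interpret
/// `name` as a URL or a local path.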
pub async fn repository(
repo: &Repository,
self_path: Option<&Path>,
user: Option<&str>,
name: &str,
channel: &str,
no_cert_check: bool,
with_path: bool,
) -> Result<RemoteRepo, anyhow::Error> {
if let Some(name) = repo.config.remotes.iter().find(|e| e.name() == name) {
name.to_remote(channel, no_cert_check, with_path).await
} else {
unknown_remote(self_path, user, name, channel, no_cert_check, with_path).await
}
}
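/// Prove an identity to a remote (its configured origin by default) by
/// authenticating with the identity's secret key.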
pub async fn prove(
identity: &Complete,
origin: Option<&str>,
no_cert_check: bool,
) -> Result<(), anyhow::Error> {
let remote = origin.unwrap_or(&identity.config.author.origin);
let mut stderr = std::io::stderr();
writeln!(
stderr,
"Linking identity `{}` with {}@{}",
&identity.name, &identity.config.author.username, remote
)?;
let mut remote = if let Ok(repo) = Repository::find_root(None) {
repository(
&repo,
None,
Some(&identity.config.author.username),
&remote,
libpijul::DEFAULT_CHANNEL,
no_cert_check,
false,
)
.await?
} else {
unknown_remote(
None,
Some(&identity.config.author.username),
&remote,
libpijul::DEFAULT_CHANNEL,
no_cert_check,
false,
)
.await?
};
let (key, _password) = identity
.credentials
.clone()
.unwrap()
.decrypt(&identity.name)?;
remote.prove(key).await?;
Ok(())
}
#[async_trait]
pub trait ToRemote {
async fn to_remote(
&self,
channel: &str,
no_cert_check: bool,
with_path: bool,
) -> Result<RemoteRepo, anyhow::Error>;
}
#[async_trait]
impl ToRemote for RemoteConfig {
async fn to_remote(
&self,
channel: &str,
no_cert_check: bool,
with_path: bool,
) -> Result<RemoteRepo, anyhow::Error> {
match self {
RemoteConfig::Ssh { ssh, .. } => {
if let Some(mut sshr) = ssh_remote(None, ssh, with_path) {
debug!("unknown_remote, ssh = {:?}", ssh);
if let Some(c) = sshr.connect(ssh, channel).await? {
return Ok(RemoteRepo::Ssh(c));
}
}
bail!("Remote not found: {:?}", ssh)
}
RemoteConfig::Http {
http,
headers,
name,
} => {
let mut h = Vec::new();
for (k, v) in headers.iter() {
match v {
RemoteHttpHeader::String(s) => {
h.push((k.clone(), s.clone()));
}
RemoteHttpHeader::Shell(shell) => {
h.push((k.clone(), shell_cmd(&shell.shell)?));
}
}
}
return Ok(RemoteRepo::Http(Http {
                    url: http
                        .parse()
                        .with_context(|| format!("invalid remote URL {:?}", http))?,
channel: channel.to_string(),
client: reqwest::ClientBuilder::new()
.danger_accept_invalid_certs(no_cert_check)
.build()?,
headers: h,
name: name.to_string(),
}));
}
}
}
}
pub async fn unknown_remote(
self_path: Option<&Path>,
user: Option<&str>,
name: &str,
channel: &str,
no_cert_check: bool,
with_path: bool,
) -> Result<RemoteRepo, anyhow::Error> {
if let Ok(url) = url::Url::parse(name) {
let scheme = url.scheme();
if scheme == "http" || scheme == "https" {
debug!("unknown_remote, http = {:?}", name);
return Ok(RemoteRepo::Http(Http {
url,
channel: channel.to_string(),
client: reqwest::ClientBuilder::new()
.danger_accept_invalid_certs(no_cert_check)
.build()?,
headers: Vec::new(),
name: name.to_string(),
}));
} else if scheme == "ssh" {
if let Some(mut ssh) = ssh_remote(user, name, with_path) {
debug!("unknown_remote, ssh = {:?}", ssh);
if let Some(c) = ssh.connect(name, channel).await? {
return Ok(RemoteRepo::Ssh(c));
}
}
bail!("Remote not found: {:?}", name)
} else {
bail!("Remote scheme not supported: {:?}", scheme)
}
}
if let Ok(root) = std::fs::canonicalize(name) {
if let Some(path) = self_path {
let path = std::fs::canonicalize(path)?;
if path == root {
return Ok(RemoteRepo::LocalChannel(channel.to_string()));
}
}
let mut dot_dir = root.join(DOT_DIR);
let changes_dir = dot_dir.join(CHANGES_DIR);
dot_dir.push(PRISTINE_DIR);
debug!("dot_dir = {:?}", dot_dir);
match libpijul::pristine::sanakirja::Pristine::new(&dot_dir.join("db")) {
Ok(pristine) => {
debug!("pristine done");
return Ok(RemoteRepo::Local(Local {
root: Path::new(name).to_path_buf(),
channel: channel.to_string(),
changes_dir,
pristine: Arc::new(pristine),
name: name.to_string(),
}));
}
Err(libpijul::pristine::sanakirja::SanakirjaError::Sanakirja(
sanakirja::Error::IO(e),
)) if e.kind() == std::io::ErrorKind::NotFound => {
debug!("repo not found")
}
Err(e) => return Err(e.into()),
}
}
if let Some(mut ssh) = ssh_remote(user, name, with_path) {
debug!("unknown_remote, ssh = {:?}", ssh);
if let Some(c) = ssh.connect(name, channel).await? {
return Ok(RemoteRepo::Ssh(c));
}
}
bail!("Remote not found: {:?}", name)
}
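/// Resolve each element of `path` to its position in the channel, and
/// collect those positions together with all of their descendants, so
/// that changes can later be filtered by the files they touch.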
pub fn get_local_inodes(
txn: &mut MutTxn<()>,
channel: &ChannelRef<MutTxn<()>>,
repo: &Repository,
path: &[String],
) -> Result<HashSet<Position<ChangeId>>, anyhow::Error> {
let mut paths = HashSet::new();
for path in path.iter() {
let (p, ambiguous) = txn.follow_oldest_path(&repo.changes, &channel, path)?;
if ambiguous {
bail!("Ambiguous path: {:?}", path)
}
paths.insert(p);
paths.extend(
libpijul::fs::iter_graph_descendants(txn, &channel.read(), p)?.map(|x| x.unwrap()),
);
}
Ok(paths)
}
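/// What a push needs to know after comparing a local channel with a
/// remote: the changes to upload, the remote entries that were
/// unrecorded on the remote side, and the remote changes unknown to the
/// local channel.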
pub struct PushDelta {
pub to_upload: Vec<CS>,
pub remote_unrecs: Vec<(u64, CS)>,
pub unknown_changes: Vec<CS>,
}
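/// The result of comparing our cached copy of a remote changelist with
/// the remote's actual changelist, starting from the position where the
/// two diverge (found by `dichotomy_changelist`).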
pub struct RemoteDelta<T: MutTxnTExt + TxnTExt> {
pub inodes: HashSet<Position<Hash>>,
pub to_download: Vec<CS>,
pub remote_ref: Option<RemoteRef<T>>,
pub ours_ge_dichotomy_set: HashSet<CS>,
pub theirs_ge_dichotomy_set: HashSet<CS>,
pub theirs_ge_dichotomy: Vec<(u64, Hash, Merkle, bool)>,
pub remote_unrecs: Vec<(u64, CS)>,
}
impl RemoteDelta<MutTxn<()>> {
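    /// Create a `PushDelta` for a push to another channel of the same
    /// repository, i.e. the changes of the current channel (optionally
    /// restricted to `path`) that `remote_channel` does not have yet.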
pub fn to_local_channel_push(
self,
remote_channel: &str,
txn: &mut MutTxn<()>,
path: &[String],
channel: &ChannelRef<MutTxn<()>>,
repo: &Repository,
) -> Result<PushDelta, anyhow::Error> {
let mut to_upload = Vec::new();
let inodes = get_local_inodes(txn, channel, repo, path)?;
for x in txn.reverse_log(&*channel.read(), None)? {
let (_, (h, _)) = x?;
if let Some(channel) = txn.load_channel(remote_channel)? {
let channel = channel.read();
let h_int = txn.get_internal(h)?.unwrap();
if txn.get_changeset(txn.changes(&channel), h_int)?.is_none() {
if inodes.is_empty() {
to_upload.push(CS::Change(h.into()))
} else {
for p in inodes.iter() {
if txn.get_touched_files(p, Some(h_int))?.is_some() {
to_upload.push(CS::Change(h.into()));
break;
}
}
}
}
}
}
assert!(self.ours_ge_dichotomy_set.is_empty());
assert!(self.theirs_ge_dichotomy_set.is_empty());
let d = PushDelta {
to_upload: to_upload.into_iter().rev().collect(),
remote_unrecs: self.remote_unrecs,
unknown_changes: Vec::new(),
};
assert!(d.remote_unrecs.is_empty());
Ok(d)
}
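    /// Create a `PushDelta` for a push to a remote repository, including
    /// the tags to upload and the remote changes that are unknown to the
    /// local channel.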
pub fn to_remote_push(
self,
txn: &mut MutTxn<()>,
path: &[String],
channel: &ChannelRef<MutTxn<()>>,
repo: &Repository,
) -> Result<PushDelta, anyhow::Error> {
let mut to_upload = Vec::new();
let inodes = get_local_inodes(txn, channel, repo, path)?;
if let Some(ref remote_ref) = self.remote_ref {
let mut tags: HashSet<Merkle> = HashSet::new();
for x in txn.rev_iter_tags(&channel.read().tags, None)? {
let (n, m) = x?;
debug!("rev_iter_tags {:?} {:?}", n, m);
if let Some((_, p)) = txn.get_remote_tag(&remote_ref.lock().tags, (*n).into())? {
if p.b == m.b {
debug!("the remote has tag {:?}", p.a);
break;
}
} else {
tags.insert(m.a.into());
}
}
debug!("tags = {:?}", tags);
for x in txn.reverse_log(&*channel.read(), None)? {
let (_, (h, m)) = x?;
let h_unrecorded = self
.remote_unrecs
.iter()
.any(|(_, hh)| hh == &CS::Change(h.into()));
if !h_unrecorded {
if txn.remote_has_state(remote_ref, &m)?.is_some() {
debug!("remote_has_state: {:?}", m);
break;
}
}
let h_int = txn.get_internal(h)?.unwrap();
let h_deser = Hash::from(h);
if (!txn.remote_has_change(remote_ref, &h)? || h_unrecorded)
&& !self.theirs_ge_dichotomy_set.contains(&CS::Change(h_deser))
{
if inodes.is_empty() {
if tags.remove(&m.into()) {
to_upload.push(CS::State(m.into()));
}
to_upload.push(CS::Change(h_deser));
} else {
for p in inodes.iter() {
if txn.get_touched_files(p, Some(h_int))?.is_some() {
to_upload.push(CS::Change(h_deser));
if tags.remove(&m.into()) {
to_upload.push(CS::State(m.into()));
}
break;
}
}
}
}
}
for t in tags.iter() {
if let Some(n) = txn.remote_has_state(&remote_ref, &t.into())? {
if !txn.is_tagged(&remote_ref.lock().tags, n)? {
to_upload.push(CS::State(*t));
}
} else {
debug!("the remote doesn't have state {:?}", t);
}
}
}
let mut unknown_changes = Vec::new();
for (_, h, m, is_tag) in self.theirs_ge_dichotomy.iter() {
let h_is_known = txn.get_revchanges(&channel, h).unwrap().is_some();
let change = CS::Change(*h);
if !(self.ours_ge_dichotomy_set.contains(&change) || h_is_known) {
unknown_changes.push(change)
}
if *is_tag {
let m_is_known = if let Some(n) = txn
.channel_has_state(txn.states(&*channel.read()), &m.into())
.unwrap()
{
txn.is_tagged(txn.tags(&*channel.read()), n.into()).unwrap()
} else {
false
};
if !m_is_known {
unknown_changes.push(CS::State(*m))
}
}
}
Ok(PushDelta {
to_upload: to_upload.into_iter().rev().collect(),
remote_unrecs: self.remote_unrecs,
unknown_changes,
})
}
}
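/// Get the `RemoteDelta` for a pull from another channel of the same
/// repository. No remote cache is involved, so only `inodes` and
/// `to_download` are filled in.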
pub fn update_changelist_local_channel(
remote_channel: &str,
txn: &mut MutTxn<()>,
path: &[String],
current_channel: &ChannelRef<MutTxn<()>>,
repo: &Repository,
specific_changes: &[String],
) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
if !specific_changes.is_empty() {
let mut to_download = Vec::new();
for h in specific_changes {
let h = txn.hash_from_prefix(h)?.0;
if txn.get_revchanges(current_channel, &h)?.is_none() {
to_download.push(CS::Change(h));
}
}
Ok(RemoteDelta {
inodes: HashSet::new(),
to_download,
remote_ref: None,
ours_ge_dichotomy_set: HashSet::new(),
theirs_ge_dichotomy: Vec::new(),
theirs_ge_dichotomy_set: HashSet::new(),
remote_unrecs: Vec::new(),
})
} else {
let mut inodes = HashSet::new();
let inodes_ = get_local_inodes(txn, current_channel, repo, path)?;
let mut to_download = Vec::new();
inodes.extend(inodes_.iter().map(|x| libpijul::pristine::Position {
change: txn.get_external(&x.change).unwrap().unwrap().into(),
pos: x.pos,
}));
if let Some(remote_channel) = txn.load_channel(remote_channel)? {
let remote_channel = remote_channel.read();
for x in txn.reverse_log(&remote_channel, None)? {
let (_, (h, m)) = x?;
if txn
.channel_has_state(txn.states(&*current_channel.read()), &m)?
.is_some()
{
break;
}
let h_int = txn.get_internal(h)?.unwrap();
if txn
.get_changeset(txn.changes(&*current_channel.read()), h_int)?
.is_none()
{
if inodes_.is_empty()
|| inodes_.iter().any(|&inode| {
txn.get_rev_touched_files(h_int, Some(&inode))
.unwrap()
.is_some()
})
{
to_download.push(CS::Change(h.into()));
}
}
}
}
Ok(RemoteDelta {
inodes,
to_download,
remote_ref: None,
ours_ge_dichotomy_set: HashSet::new(),
theirs_ge_dichotomy: Vec::new(),
theirs_ge_dichotomy_set: HashSet::new(),
remote_unrecs: Vec::new(),
})
}
}
impl RemoteRepo {
fn name(&self) -> Option<&str> {
match *self {
RemoteRepo::Ssh(ref s) => Some(s.name.as_str()),
RemoteRepo::Local(ref l) => Some(l.name.as_str()),
RemoteRepo::Http(ref h) => Some(h.name.as_str()),
RemoteRepo::LocalChannel(_) => None,
RemoteRepo::None => unreachable!(),
}
}
pub fn repo_name(&self) -> Result<Option<String>, anyhow::Error> {
match *self {
RemoteRepo::Ssh(ref s) => {
if let Some(sep) = s.name.rfind(|c| c == ':' || c == '/') {
Ok(Some(s.name.split_at(sep + 1).1.to_string()))
} else {
Ok(Some(s.name.as_str().to_string()))
}
}
RemoteRepo::Local(ref l) => {
if let Some(file) = l.root.file_name() {
Ok(Some(
file.to_str()
.context("failed to decode local repository name")?
.to_string(),
))
} else {
Ok(None)
}
}
RemoteRepo::Http(ref h) => {
if let Some(name) = libpijul::path::file_name(h.url.path()) {
if !name.trim().is_empty() {
return Ok(Some(name.trim().to_string()));
}
}
Ok(h.url.host().map(|h| h.to_string()))
}
RemoteRepo::LocalChannel(_) => Ok(None),
RemoteRepo::None => unreachable!(),
}
}
pub async fn finish(&mut self) -> Result<(), anyhow::Error> {
if let RemoteRepo::Ssh(s) = self {
s.finish().await?
}
Ok(())
}
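    /// Update the locally cached changelist of this remote: find the
    /// first position where the cache and the remote diverge, delete the
    /// stale cached entries and tags from there on, and download the
    /// remote's changelist again from that position.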
pub async fn update_changelist<T: MutTxnTExt + TxnTExt + 'static>(
&mut self,
txn: &mut T,
path: &[String],
) -> Result<Option<(HashSet<Position<Hash>>, RemoteRef<T>)>, anyhow::Error> {
debug!("update_changelist");
let id = if let Some(id) = self.get_id(txn).await? {
id
} else {
return Ok(None);
};
let mut remote = if let Some(name) = self.name() {
txn.open_or_create_remote(id, name)?
} else {
return Ok(None);
};
let n = self.dichotomy_changelist(txn, &remote.lock()).await?;
debug!("update changelist {:?}", n);
let v: Vec<_> = txn
.iter_remote(&remote.lock().remote, n)?
.filter_map(|k| {
debug!("filter_map {:?}", k);
let k = (*k.unwrap().0).into();
if k >= n {
Some(k)
} else {
None
}
})
.collect();
for k in v {
debug!("deleting {:?}", k);
txn.del_remote(&mut remote, k)?;
}
let v: Vec<_> = txn
.iter_tags(&remote.lock().tags, n)?
.filter_map(|k| {
debug!("filter_map {:?}", k);
let k = (*k.unwrap().0).into();
if k >= n {
Some(k)
} else {
None
}
})
.collect();
for k in v {
debug!("deleting {:?}", k);
txn.del_tags(&mut remote.lock().tags, k)?;
}
debug!("deleted");
let paths = self.download_changelist(txn, &mut remote, n, path).await?;
Ok(Some((paths, remote)))
}
async fn update_changelist_pushpull_from_scratch(
&mut self,
txn: &mut MutTxn<()>,
path: &[String],
current_channel: &ChannelRef<MutTxn<()>>,
) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
debug!("no id, starting from scratch");
let (inodes, theirs_ge_dichotomy) = self.download_changelist_nocache(0, path).await?;
let mut theirs_ge_dichotomy_set = HashSet::new();
let mut to_download = Vec::new();
for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
theirs_ge_dichotomy_set.insert(CS::Change(*h));
if txn.get_revchanges(current_channel, h)?.is_none() {
to_download.push(CS::Change(*h));
}
if *is_tag {
let ch = current_channel.read();
if let Some(n) = txn.channel_has_state(txn.states(&*ch), &m.into())? {
if !txn.is_tagged(txn.tags(&*ch), n.into())? {
to_download.push(CS::State(*m));
}
} else {
to_download.push(CS::State(*m));
}
}
}
Ok(RemoteDelta {
inodes,
remote_ref: None,
to_download,
ours_ge_dichotomy_set: HashSet::new(),
theirs_ge_dichotomy,
theirs_ge_dichotomy_set,
remote_unrecs: Vec::new(),
})
}
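    /// Compare the current channel with this remote before a push or a
    /// pull, updating the cached changelist when it is safe to do so
    /// (`force_cache` is set or the remote has no unrecorded entries),
    /// and compute what needs to be downloaded.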
pub async fn update_changelist_pushpull(
&mut self,
txn: &mut MutTxn<()>,
path: &[String],
current_channel: &ChannelRef<MutTxn<()>>,
force_cache: bool,
repo: &Repository,
specific_changes: &[String],
is_pull: bool,
) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
debug!("update_changelist_pushpull");
if let RemoteRepo::LocalChannel(c) = self {
return update_changelist_local_channel(
c,
txn,
path,
current_channel,
repo,
specific_changes,
);
}
let id = if let Some(id) = self.get_id(txn).await? {
debug!("id = {:?}", id);
id
} else {
return self
.update_changelist_pushpull_from_scratch(txn, path, current_channel)
.await;
};
let mut remote_ref = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
let dichotomy_n = self.dichotomy_changelist(txn, &remote_ref.lock()).await?;
let ours_ge_dichotomy: Vec<(u64, CS)> = txn
.iter_remote(&remote_ref.lock().remote, dichotomy_n)?
.filter_map(|k| {
debug!("filter_map {:?}", k);
match k.unwrap() {
(k, libpijul::pristine::Pair { a: hash, .. }) => {
let (k, hash) = (u64::from(*k), Hash::from(*hash));
if k >= dichotomy_n {
Some((k, CS::Change(hash)))
} else {
None
}
}
}
})
.collect();
let (inodes, theirs_ge_dichotomy) =
self.download_changelist_nocache(dichotomy_n, path).await?;
debug!("theirs_ge_dichotomy = {:?}", theirs_ge_dichotomy);
let ours_ge_dichotomy_set = ours_ge_dichotomy
.iter()
.map(|(_, h)| h)
.copied()
.collect::<HashSet<CS>>();
let mut theirs_ge_dichotomy_set = HashSet::new();
for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
theirs_ge_dichotomy_set.insert(CS::Change(*h));
if *is_tag {
theirs_ge_dichotomy_set.insert(CS::State(*m));
}
}
let remote_unrecs = remote_unrecs(
txn,
current_channel,
&ours_ge_dichotomy,
&theirs_ge_dichotomy_set,
)?;
let should_cache = force_cache || remote_unrecs.is_empty();
debug!(
"should_cache = {:?} {:?} {:?}",
force_cache, remote_unrecs, should_cache
);
if should_cache {
use libpijul::ChannelMutTxnT;
for (k, t) in ours_ge_dichotomy.iter().copied() {
match t {
CS::State(_) => txn.del_tags(&mut remote_ref.lock().tags, k)?,
CS::Change(_) => {
txn.del_remote(&mut remote_ref, k)?;
}
}
}
for (n, h, m, is_tag) in theirs_ge_dichotomy.iter().copied() {
debug!("theirs: {:?} {:?} {:?}", n, h, m);
txn.put_remote(&mut remote_ref, n, (h, m))?;
if is_tag {
txn.put_tags(&mut remote_ref.lock().tags, n, &m)?;
}
}
}
if !specific_changes.is_empty() {
let to_download = specific_changes
.iter()
.map(|h| {
if is_pull {
{
if let Ok(t) = txn.state_from_prefix(&remote_ref.lock().states, h) {
return Ok(CS::State(t.0));
}
}
Ok(CS::Change(txn.hash_from_prefix_remote(&remote_ref, h)?))
} else {
                    if let Ok(t) = txn.state_from_prefix(&current_channel.read().states, h) {
Ok(CS::State(t.0))
} else {
Ok(CS::Change(txn.hash_from_prefix(h)?.0))
}
}
})
.collect::<Result<Vec<_>, anyhow::Error>>();
Ok(RemoteDelta {
inodes,
remote_ref: Some(remote_ref),
to_download: to_download?,
ours_ge_dichotomy_set,
theirs_ge_dichotomy,
theirs_ge_dichotomy_set,
remote_unrecs,
})
} else {
let mut to_download: Vec<CS> = Vec::new();
let mut to_download_ = HashSet::new();
for x in txn.iter_rev_remote(&remote_ref.lock().remote, None)? {
let (_, p) = x?;
let h: Hash = p.a.into();
if txn
                .channel_has_state(txn.states(&current_channel.read()), &p.b)
.unwrap()
.is_some()
{
break;
}
            if txn.get_revchanges(&current_channel, &h).unwrap().is_none() {
let h = CS::Change(h);
if to_download_.insert(h.clone()) {
to_download.push(h);
}
}
}
for (n, h, m, is_tag) in theirs_ge_dichotomy.iter() {
debug!(
"update_changelist_pushpull line {}, {:?} {:?}",
line!(),
n,
h
);
let ch = CS::Change(*h);
                if txn.get_revchanges(&current_channel, h).unwrap().is_none() {
if to_download_.insert(ch.clone()) {
to_download.push(ch.clone());
}
if *is_tag {
to_download.push(CS::State(*m));
}
} else if *is_tag {
let has_tag = if let Some(n) =
                        txn.channel_has_state(txn.states(&current_channel.read()), &m.into())?
{
                        txn.is_tagged(txn.tags(&current_channel.read()), n.into())?
} else {
false
};
if !has_tag {
to_download.push(CS::State(*m));
}
}
if should_cache && ours_ge_dichotomy_set.get(&ch).is_none() {
use libpijul::ChannelMutTxnT;
txn.put_remote(&mut remote_ref, *n, (*h, *m))?;
if *is_tag {
let mut rem = remote_ref.lock();
txn.put_tags(&mut rem.tags, *n, m)?;
}
}
}
Ok(RemoteDelta {
inodes,
remote_ref: Some(remote_ref),
to_download,
ours_ge_dichotomy_set,
theirs_ge_dichotomy,
theirs_ge_dichotomy_set,
remote_unrecs,
})
}
}
pub async fn download_changelist_nocache(
&mut self,
from: u64,
paths: &[String],
) -> Result<(HashSet<Position<Hash>>, Vec<(u64, Hash, Merkle, bool)>), anyhow::Error> {
let mut v = Vec::new();
        let f = |v: &mut Vec<(u64, Hash, Merkle, bool)>, n, h, m, is_tag| {
            debug!("no cache: {:?}", h);
            Ok(v.push((n, h, m, is_tag)))
        };
let r = match *self {
RemoteRepo::Local(ref mut l) => l.download_changelist(f, &mut v, from, paths)?,
RemoteRepo::Ssh(ref mut s) => s.download_changelist(f, &mut v, from, paths).await?,
RemoteRepo::Http(ref h) => h.download_changelist(f, &mut v, from, paths).await?,
RemoteRepo::LocalChannel(_) => HashSet::new(),
RemoteRepo::None => unreachable!(),
};
Ok((r, v))
}
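    /// Binary search for the first position at which our cached copy of
    /// the remote changelist (including its tags) no longer matches the
    /// state reported by the remote.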
async fn dichotomy_changelist<T: MutTxnT + TxnTExt>(
&mut self,
txn: &T,
remote: &libpijul::pristine::Remote<T>,
) -> Result<u64, anyhow::Error> {
let mut a = 0;
let (mut b, state): (_, Merkle) = if let Some((u, v)) = txn.last_remote(&remote.remote)? {
debug!("dichotomy_changelist: {:?} {:?}", u, v);
(u, (&v.b).into())
} else {
debug!("the local copy of the remote has no changes");
return Ok(0);
};
let last_statet = if let Some((_, _, v)) = txn.last_remote_tag(&remote.tags)? {
v.into()
} else {
Merkle::zero()
};
debug!("last_state: {:?} {:?}", state, last_statet);
if let Some((_, s, st)) = self.get_state(txn, Some(b)).await? {
debug!("remote last_state: {:?} {:?}", s, st);
if s == state && st == last_statet {
return Ok(b + 1);
}
}
while a < b {
let mid = (a + b) / 2;
let (mid, state) = {
let (a, b) = txn.get_remote_state(&remote.remote, mid)?.unwrap();
(a, b.b)
};
let statet = if let Some((_, b)) = txn.get_remote_tag(&remote.tags, mid)? {
b.b.into()
} else {
last_statet
};
let remote_state = self.get_state(txn, Some(mid)).await?;
debug!("dichotomy {:?} {:?} {:?}", mid, state, remote_state);
if let Some((_, remote_state, remote_statet)) = remote_state {
if remote_state == state && remote_statet == statet {
if a == mid {
return Ok(a + 1);
} else {
a = mid;
continue;
}
}
}
if b == mid {
break;
} else {
b = mid
}
}
Ok(a)
}
async fn get_state<T: libpijul::TxnTExt>(
&mut self,
txn: &T,
mid: Option<u64>,
) -> Result<Option<(u64, Merkle, Merkle)>, anyhow::Error> {
match *self {
RemoteRepo::Local(ref mut l) => l.get_state(mid),
RemoteRepo::Ssh(ref mut s) => s.get_state(mid).await,
RemoteRepo::Http(ref mut h) => h.get_state(mid).await,
RemoteRepo::LocalChannel(ref channel) => {
if let Some(channel) = txn.load_channel(&channel)? {
local::get_state(txn, &channel, mid)
} else {
Ok(None)
}
}
RemoteRepo::None => unreachable!(),
}
}
async fn get_id<T: libpijul::TxnTExt + 'static>(
&mut self,
txn: &T,
) -> Result<Option<libpijul::pristine::RemoteId>, anyhow::Error> {
match *self {
RemoteRepo::Local(ref l) => Ok(Some(l.get_id()?)),
RemoteRepo::Ssh(ref mut s) => s.get_id().await,
RemoteRepo::Http(ref h) => h.get_id().await,
RemoteRepo::LocalChannel(ref channel) => {
if let Some(channel) = txn.load_channel(&channel)? {
Ok(txn.id(&*channel.read()).cloned())
} else {
Err(anyhow::anyhow!(
"Unable to retrieve RemoteId for LocalChannel remote"
))
}
}
RemoteRepo::None => unreachable!(),
}
}
pub async fn archive<W: std::io::Write + Send + 'static>(
&mut self,
prefix: Option<String>,
state: Option<(Merkle, &[Hash])>,
umask: u16,
w: W,
) -> Result<u64, anyhow::Error> {
match *self {
RemoteRepo::Local(ref mut l) => {
debug!("archiving local repo");
let changes = libpijul::changestore::filesystem::FileSystem::from_root(
&l.root,
pijul_repository::max_files()?,
);
let mut tarball = libpijul::output::Tarball::new(w, prefix, umask);
let conflicts = if let Some((state, extra)) = state {
let txn = l.pristine.arc_txn_begin()?;
let channel = {
let txn = txn.read();
txn.load_channel(&l.channel)?.unwrap()
};
txn.archive_with_state(&changes, &channel, &state, extra, &mut tarball, 0)?
} else {
let txn = l.pristine.arc_txn_begin()?;
let channel = {
let txn = txn.read();
txn.load_channel(&l.channel)?.unwrap()
};
txn.archive(&changes, &channel, &mut tarball)?
};
Ok(conflicts.len() as u64)
}
RemoteRepo::Ssh(ref mut s) => s.archive(prefix, state, w).await,
RemoteRepo::Http(ref mut h) => h.archive(prefix, state, w).await,
RemoteRepo::LocalChannel(_) => unreachable!(),
RemoteRepo::None => unreachable!(),
}
}
async fn download_changelist<T: MutTxnTExt>(
&mut self,
txn: &mut T,
remote: &mut RemoteRef<T>,
from: u64,
paths: &[String],
) -> Result<HashSet<Position<Hash>>, anyhow::Error> {
let f = |a: &mut (&mut T, &mut RemoteRef<T>), n, h, m, is_tag| {
let (ref mut txn, ref mut remote) = *a;
txn.put_remote(remote, n, (h, m))?;
if is_tag {
txn.put_tags(&mut remote.lock().tags, n, &m.into())?;
}
Ok(())
};
match *self {
RemoteRepo::Local(ref mut l) => {
l.download_changelist(f, &mut (txn, remote), from, paths)
}
RemoteRepo::Ssh(ref mut s) => {
s.download_changelist(f, &mut (txn, remote), from, paths)
.await
}
RemoteRepo::Http(ref h) => {
h.download_changelist(f, &mut (txn, remote), from, paths)
.await
}
RemoteRepo::LocalChannel(_) => Ok(HashSet::new()),
RemoteRepo::None => unreachable!(),
}
}
pub async fn upload_changes<T: MutTxnTExt + 'static>(
&mut self,
txn: &mut T,
changes_dir: &Path,
to_channel: Option<&str>,
changes: &[CS],
) -> Result<(), anyhow::Error> {
let upload_bar = ProgressBar::new(changes.len() as u64, UPLOAD_MESSAGE)?;
match self {
RemoteRepo::Local(ref mut l) => {
l.upload_changes(upload_bar, &changes_dir, to_channel, changes)?
}
RemoteRepo::Ssh(ref mut s) => {
s.upload_changes(upload_bar, &changes_dir, to_channel, changes)
.await?
}
RemoteRepo::Http(ref h) => {
h.upload_changes(upload_bar, &changes_dir, to_channel, changes)
.await?
}
RemoteRepo::LocalChannel(ref channel) => {
let mut channel = txn.open_or_create_channel(channel)?;
let store = libpijul::changestore::filesystem::FileSystem::from_changes(
changes_dir.to_path_buf(),
pijul_repository::max_files()?,
);
local::upload_changes(upload_bar, &store, txn, &mut channel, changes)?
}
RemoteRepo::None => unreachable!(),
}
Ok(())
}
    async fn download_changes(
&mut self,
progress_bar: ProgressBar,
mut hashes: mpsc::UnboundedReceiver<CS>,
mut send: mpsc::Sender<(CS, bool)>,
path: &Path,
full: bool,
) -> Result<bool, anyhow::Error> {
debug!("download_changes");
match *self {
RemoteRepo::Local(ref mut l) => {
l.download_changes(progress_bar, &mut hashes, &mut send, path)
.await?
}
RemoteRepo::Ssh(ref mut s) => {
s.download_changes(progress_bar, &mut hashes, &mut send, path, full)
.await?
}
RemoteRepo::Http(ref mut h) => {
h.download_changes(progress_bar, &mut hashes, &mut send, path, full)
.await?
}
RemoteRepo::LocalChannel(_) => {
while let Some(c) = hashes.recv().await {
send.send((c, true)).await?;
}
}
RemoteRepo::None => unreachable!(),
}
Ok(true)
}
pub async fn update_identities<T: MutTxnTExt + TxnTExt + GraphIter>(
&mut self,
repo: &mut Repository,
remote: &RemoteRef<T>,
) -> Result<(), anyhow::Error> {
debug!("Downloading identities");
let mut id_path = repo.path.clone();
id_path.push(DOT_DIR);
id_path.push("identities");
let rev = None;
let r = match *self {
RemoteRepo::Local(ref mut l) => l.update_identities(rev, id_path).await?,
RemoteRepo::Ssh(ref mut s) => s.update_identities(rev, id_path).await?,
RemoteRepo::Http(ref mut h) => h.update_identities(rev, id_path).await?,
RemoteRepo::LocalChannel(_) => 0,
RemoteRepo::None => unreachable!(),
};
remote.set_id_revision(r);
Ok(())
}
pub async fn prove(&mut self, key: libpijul::key::SKey) -> Result<(), anyhow::Error> {
match *self {
RemoteRepo::Ssh(ref mut s) => s.prove(key).await,
RemoteRepo::Http(ref mut h) => h.prove(key).await,
RemoteRepo::None => unreachable!(),
_ => Ok(()),
}
}
pub async fn pull<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
&mut self,
repo: &mut Repository,
txn: &mut T,
channel: &mut ChannelRef<T>,
to_apply: &[CS],
inodes: &HashSet<Position<Hash>>,
do_apply: bool,
) -> Result<Vec<CS>, anyhow::Error> {
let apply_len = to_apply.len() as u64;
let download_bar = ProgressBar::new(apply_len, DOWNLOAD_MESSAGE)?;
let apply_bar = if do_apply {
Some(ProgressBar::new(apply_len, APPLY_MESSAGE)?)
} else {
None
};
let changes_dir = repo.changes_dir.clone();
let mut to_apply_inodes = HashSet::new();
{
let to_apply_inodes = &mut to_apply_inodes;
self.download_changes_with(download_bar, &changes_dir, false, |cx| async move {
let stream = download_changes_rec(&cx, repo, to_apply);
pin_mut!(stream);
let mut ws = ApplyWorkspace::new();
while let Some(h) = stream.try_next().await? {
debug!("to_apply: {:?}", h);
let mut touches_inodes = inodes.is_empty();
debug!("inodes = {:?}", inodes);
if !touches_inodes {
if let CS::Change(ref h) = h {
let changes = repo.changes.get_changes(h)?;
touches_inodes |= changes.iter().any(|c| {
c.iter().any(|c| {
let inode = c.inode();
debug!("inode = {:?}", inode);
inodes.contains(&Position {
change: inode.change.unwrap_or(*h),
pos: inode.pos,
})
})
})
}
}
if !touches_inodes {
touches_inodes |= inodes.iter().any(|i| CS::Change(i.change) == h);
}
if !touches_inodes {
continue;
}
to_apply_inodes.insert(h);
if let Some(apply_bar) = &apply_bar {
info!("Applying {:?}", h);
apply_bar.inc(1);
debug!("apply");
if let CS::Change(ref h) = h {
let mut channel = channel.write();
txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;
}
debug!("applied");
} else {
debug!("not applying {:?}", h)
}
}
Ok(())
})
.await?;
}
let mut result = Vec::with_capacity(to_apply_inodes.len());
for h in to_apply {
            if to_apply_inodes.contains(h) {
result.push(*h)
}
}
debug!("finished");
Ok(result)
}
pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
&mut self,
repo: &Repository,
txn: &mut T,
channel: &ChannelRef<T>,
tag: &[Hash],
) -> Result<(), anyhow::Error> {
let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;
let mut hashes = Vec::new();
{
let txn = &mut *txn;
let hashes = &mut hashes;
self.download_changes_with(download_bar, &repo.changes_dir, false, |cx| async move {
let stream = download_changes_rec(&cx, repo, tag.iter().cloned().map(CS::Change));
pin_mut!(stream);
let mut ws = ApplyWorkspace::new();
while let Some(cs) = stream.try_next().await? {
if let CS::Change(hash) = cs {
let mut channel = channel.write();
txn.apply_change_rec_ws(&repo.changes, &mut channel, &hash, &mut ws)?;
}
hashes.push(cs);
}
Ok(())
})
.await?;
}
self.complete_changes(repo, txn, channel, &hashes, false)
.await?;
Ok(())
}
pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
&mut self,
repo: &mut Repository,
txn: &mut T,
channel: &mut ChannelRef<T>,
state: Merkle,
) -> Result<(), anyhow::Error> {
let id = if let Some(id) = self.get_id(txn).await? {
id
} else {
return Ok(());
};
self.update_changelist(txn, &[]).await?;
let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
let mut to_pull = Vec::new();
let mut found = false;
for x in txn.iter_remote(&remote.lock().remote, 0)? {
let (n, p) = x?;
debug!("{:?} {:?}", n, p);
to_pull.push(CS::Change(p.a.into()));
if p.b == state {
found = true;
break;
}
}
if !found {
bail!("State not found: {:?}", state)
}
self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true)
.await?;
self.update_identities(repo, &remote).await?;
self.complete_changes(repo, txn, channel, &to_pull, false)
.await?;
Ok(())
}
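    /// Download the full contents of the given changes when only their
    /// metadata is present locally. Unless `full` is set, contents are
    /// only fetched for changes that are still alive in `local_channel`.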
pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(
&mut self,
repo: &pijul_repository::Repository,
txn: &T,
local_channel: &ChannelRef<T>,
changes: &[CS],
full: bool,
) -> Result<(), anyhow::Error> {
debug!("complete changes {:?}", changes);
let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;
let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;
self.download_changes_with(download_bar, &repo.changes_dir, true, |cx| async move {
let mut waiting = Vec::new();
for c in changes {
let CS::Change(c) = c else { continue };
let sc = c.into();
if repo
.changes
.has_contents(*c, txn.get_internal(&sc)?.cloned())
{
debug!("has contents {:?}", c);
continue;
}
if full {
waiting.push(cx.download(CS::Change(*c))?);
continue;
}
let Some(&change) = txn.get_internal(&sc)? else {
debug!("could not find internal for {:?}", sc);
continue;
};
let v = libpijul::pristine::Vertex {
change,
start: ChangePosition(0u64.into()),
end: ChangePosition(0u64.into()),
};
let channel = local_channel.read();
let graph = txn.graph(&channel);
for x in txn.iter_graph(graph, Some(&v))? {
let (v, e) = x?;
if v.change > change {
break;
} else if e.flag().is_alive_parent() {
waiting.push(cx.download(CS::Change(*c))?);
break;
}
}
}
for w in waiting {
w.await?;
}
Ok(())
})
.await?;
Ok(())
}
pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
&mut self,
repo: &mut Repository,
txn: &mut T,
local_channel: &mut ChannelRef<T>,
path: &[String],
) -> Result<(), anyhow::Error> {
let (inodes, remote_changes) = if let Some(x) = self.update_changelist(txn, path).await? {
x
} else {
bail!("Channel not found")
};
let mut pullable = Vec::new();
{
let rem = remote_changes.lock();
for x in txn.iter_remote(&rem.remote, 0)? {
let (_, p) = x?;
pullable.push(CS::Change(p.a.into()))
}
}
self.pull(repo, txn, local_channel, &pullable, &inodes, true)
.await?;
self.update_identities(repo, &remote_changes).await?;
self.complete_changes(repo, txn, local_channel, &pullable, false)
.await?;
Ok(())
}
}
lazy_static! {
static ref CHANGELIST_LINE: Regex = Regex::new(
r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#
)
.unwrap();
static ref PATHS_LINE: Regex =
Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap();
}
enum ListLine {
Change {
n: u64,
h: Hash,
m: Merkle,
tag: bool,
},
Position(Position<Hash>),
Error(String),
}
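/// Parse one line of a remote changelist. Changelist entries have the
/// form `<num>.<hash>.<merkle>`, with a trailing `.` marking a tag;
/// touched paths have the form `<hash>.<num>`; lines starting with
/// `error:` report a protocol error.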
fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {
debug!("data = {:?}", data);
if let Some(caps) = CHANGELIST_LINE.captures(data) {
if let (Some(h), Some(m)) = (
Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),
Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),
) {
return Ok(ListLine::Change {
n: caps.name("num").unwrap().as_str().parse().unwrap(),
h,
m,
tag: caps.name("tag").is_some(),
});
}
}
if data.starts_with("error:") {
return Ok(ListLine::Error(data.split_at(6).1.to_string()));
}
if let Some(caps) = PATHS_LINE.captures(data) {
return Ok(ListLine::Position(Position {
change: Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()).unwrap(),
pos: ChangePosition(
caps.name("num")
.unwrap()
.as_str()
.parse::<u64>()
.unwrap()
.into(),
),
}));
}
debug!("offending line: {:?}", data);
bail!("Protocol error")
}
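/// Find the cached entries (at or after the dichotomy position) that the
/// remote no longer lists but that are still present on the current
/// channel: these changes were unrecorded on the remote, and pushing or
/// re-caching has to take them into account.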
fn remote_unrecs<T: TxnTExt + ChannelTxnT>(
txn: &T,
current_channel: &ChannelRef<T>,
ours_ge_dichotomy: &[(u64, CS)],
theirs_ge_dichotomy_set: &HashSet<CS>,
) -> Result<Vec<(u64, CS)>, anyhow::Error> {
let mut remote_unrecs = Vec::new();
for (n, hash) in ours_ge_dichotomy {
debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);
if theirs_ge_dichotomy_set.contains(hash) {
debug!("still present");
continue;
} else {
let has_it = match hash {
                CS::Change(hash) => txn.get_revchanges(&current_channel, &hash)?.is_some(),
CS::State(state) => {
let ch = current_channel.read();
if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {
txn.is_tagged(txn.tags(&*ch), n.into())?
} else {
false
}
}
};
if has_it {
remote_unrecs.push((*n, *hash))
} else {
continue;
}
}
}
Ok(remote_unrecs)
}
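/// Turn an async closure writing into an `mpsc` channel into a `Stream`
/// of `Result`s, forwarding an error returned by the closure as an item.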
fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>
where
C: FnOnce(mpsc::Sender<Result<T, E>>) -> F,
F: Future<Output = Result<(), E>>,
{
stream(|sender| {
let fut = op(sender.clone());
async move {
if let Err(e) = fut.await {
let _ = sender.send(Err(e));
}
}
})
}
fn stream<C, F, T>(op: C) -> impl Stream<Item = T>
where
C: FnOnce(mpsc::Sender<T>) -> F,
F: Future<Output = ()>,
{
struct Impl<F, T> {
fut: Once<F>,
rx: mpsc::Receiver<T>,
}
impl<F, T> Stream for Impl<F, T>
where
F: Future<Output = ()>,
{
type Item = T;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let (fut, mut rx) = unsafe {
let this = self.get_unchecked_mut();
(Pin::new_unchecked(&mut this.fut), Pin::new(&mut this.rx))
};
match (fut.poll_next(cx), rx.poll_recv(cx)) {
(_, Poll::Ready(Some(v))) => Poll::Ready(Some(v)),
(Poll::Ready(_), Poll::Ready(None)) => Poll::Ready(None),
_ => Poll::Pending,
}
}
}
let (tx, rx) = mpsc::channel(1);
Impl {
fut: stream::once(op(tx)),
rx,
}
}
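/// Stream the given changes and tags from the remote. For changes whose
/// download asks for their dependencies to be followed, the dependencies
/// are downloaded too, and a change is only yielded once all of its
/// dependencies have been yielded.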
fn download_changes_rec<'a, I>(
cx: &'a DownloadContext,
repo: &'a Repository,
items: I,
) -> impl Stream<Item = anyhow::Result<CS>> + 'a
where
I: IntoIterator + 'a,
I::Item: Borrow<CS>,
{
try_stream(move |sender| async move {
let mut tasks = FuturesUnordered::new();
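        // `pending_deps` maps a change to the dependencies it is still
        // waiting for; `rev_deps` maps a change to the changes waiting on
        // it, so that finishing a download can unblock its dependents.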
let mut pending_deps = HashMap::<Hash, HashSet<Hash>>::new();
let mut rev_deps = HashMap::<Hash, Vec<Hash>>::new();
let mut fetched = HashSet::new();
let make_download_job = |cs| async move {
match cx.download(cs) {
Ok(v) => (cs, v.await),
Err(e) => (cs, Err(e)),
}
};
for item in items {
let item = *item.borrow();
tasks.push(make_download_job(item));
if let CS::Change(hash) = item {
rev_deps.insert(hash, Vec::new());
}
}
while let Some((cs, res)) = tasks.next().await {
debug!("{:?} finished downloading (result: {:?})", cs, &res);
let follow = res?;
let CS::Change(hash) = cs else {
debug!("it is not a change, we're done");
sender.send(Ok(cs)).await?;
continue;
};
fetched.insert(hash);
if follow {
info!("{:?}", hash);
let pending_deps = match pending_deps.entry(hash) {
Entry::Occupied(_) => unreachable!(),
Entry::Vacant(v) => v.insert(Default::default()),
};
                for dep in repo.changes.get_dependencies(&hash)? {
                    if !fetched.contains(&dep) {
                        pending_deps.insert(dep);
                        rev_deps
                            .entry(dep)
                            .or_insert_with(|| {
                                tasks.push(make_download_job(CS::Change(dep)));
                                Default::default()
                            })
                            .push(hash);
                    }
                }
}
let mut to_check = vec![hash];
while let Some(hash) = to_check.pop() {
if let Some(pending_deps) = pending_deps.get(&hash) {
if !pending_deps.is_empty() {
continue;
}
}
sender.send(Ok(CS::Change(hash))).await?;
for dep in rev_deps.get(&hash).into_iter().flatten() {
if let Some(p) = pending_deps.get_mut(dep) {
                        assert!(p.remove(&hash));
to_check.push(*dep);
}
}
}
}
Ok(())
})
}