use std::cell::RefCell;
use std::borrow::Borrow;
use std::collections::hash_map::Entry;
use std::collections::HashSet;
use std::future::Future;
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::task::Poll;

use anyhow::{bail, Context};
use async_trait::async_trait;
use futures::{Sink, Stream};
use futures_util::future::{FusedFuture, LocalBoxFuture};
use futures_util::stream::{FuturesUnordered, Once};
use futures_util::{FutureExt, stream, StreamExt};
use futures_util::task::LocalSpawnExt;
use futures::{pin_mut, Stream};
use futures_util::stream::{FuturesUnordered, Once};
use futures_util::{stream, StreamExt, TryStreamExt};
use lazy_static::lazy_static;
use libpijul::changestore::{ChangeStore, filesystem};

use crate::context::DownloadContext;
use libpijul::changestore::{filesystem, ChangeStore};
use libpijul::pristine::{
    sanakirja::MutTxn, Base32, ChangeId, ChannelRef, GraphIter, Hash, Merkle, MutTxnT, RemoteRef,
    TxnT,
};
use libpijul::{DOT_DIR, HashMap};
use libpijul::pristine::{ChangePosition, Position};
use libpijul::{ApplyWorkspace, HashMap, DOT_DIR};
use libpijul::{ChannelTxnT, DepsTxnT, GraphTxnT, MutTxnTExt, TxnTExt};
use log::{debug, info};
use regex::Regex;
use tokio::sync::mpsc;

use pijul_config::*;
use pijul_identity::Complete;
use pijul_repository::*;

pub mod ssh;
use ssh::*;

pub mod local;
use local::*;

pub mod http;
use http::*;

mod context;

mod context;

use pijul_interaction::{
    ProgressBar, Spinner, APPLY_MESSAGE, COMPLETE_MESSAGE, DOWNLOAD_MESSAGE, UPLOAD_MESSAGE,
};

/// Version number of the remote protocol.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// it is exchanged with remotes by the transport modules — confirm.
pub const PROTOCOL_VERSION: usize = 3;

/// The other side of a push or pull.
pub enum RemoteRepo {
    /// A repository on the local filesystem, accessed through its own pristine.
    Local(Local),
    /// A repository reached through an established SSH connection.
    Ssh(Ssh),
    /// A repository reached over HTTP or HTTPS.
    Http(Http),
    /// Another channel of the repository we are already in; the payload is
    /// the name of that channel.
    LocalChannel(String),
    /// No remote at all. `name` and `repo_name` treat this variant as
    /// unreachable, so it must not be queried for a name.
    None,
}

/// An item that can be uploaded or downloaded: either a change or a state.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CS {
    /// A change, identified by its hash.
    Change(Hash),
    /// A channel state, identified by its Merkle hash. In this file, states
    /// are transferred when they are tagged.
    State(Merkle),
}

/// Cache-entry view of a [`CS`]: both accessors forward to the wrapped
/// change hash or state.
impl filesystem::CacheEntry for CS {
    /// Textual hash of the wrapped change or state.
    fn hash(&self) -> String {
        match *self {
            Self::Change(ref change) => change.hash(),
            Self::State(ref state) => state.hash(),
        }
    }

    /// File extension associated with the wrapped change or state.
    fn ext(&self) -> &str {
        match *self {
            Self::Change(ref change) => change.ext(),
            Self::State(ref state) => state.ext(),
        }
    }
}

/// Resolve `name` to a [`RemoteRepo`].
///
/// If `name` matches the name of a remote configured in `repo`, that
/// configuration is used (and `self_path` and `user` are not consulted).
/// Otherwise `name` is interpreted directly by [`unknown_remote`].
pub async fn repository(
    repo: &Repository,
    self_path: Option<&Path>,
    // User name in case it isn't provided in the `name` argument already.
    user: Option<&str>,
    name: &str,
    channel: &str,
    no_cert_check: bool,
    with_path: bool,
) -> Result<RemoteRepo, anyhow::Error> {
    let configured = repo
        .config
        .remotes
        .iter()
        .find(|remote| remote.name() == name);
    match configured {
        Some(remote) => remote.to_remote(channel, no_cert_check, with_path).await,
        None => {
            unknown_remote(self_path, user, name, channel, no_cert_check, with_path).await
        }
    }
}

/// Associate a generated key with a remote identity. Patches authored
/// by unproven keys will only display the key as the author.
///
/// * `identity` - the identity whose key is proven; its configured user name
///   is used when contacting the remote.
/// * `origin` - remote to prove against; defaults to the origin stored in the
///   identity's author configuration.
/// * `no_cert_check` - forwarded to the remote constructors.
///
/// The remote is resolved through the enclosing repository's configuration
/// when one is found, and directly from its name otherwise.
///
/// # Errors
///
/// Fails if the progress line cannot be written to stderr, if the remote
/// cannot be resolved, if the identity has no credentials, if the credentials
/// cannot be decrypted, or if the remote rejects the proof.
pub async fn prove(
    identity: &Complete,
    origin: Option<&str>,
    no_cert_check: bool,
) -> Result<(), anyhow::Error> {
    let remote = origin.unwrap_or(&identity.config.author.origin);
    let mut stderr = std::io::stderr();
    writeln!(
        stderr,
        "Linking identity `{}` with {}@{}",
        &identity.name, &identity.config.author.username, remote
    )?;

    let mut remote = if let Ok(repo) = Repository::find_root(None) {
        repository(
            &repo,
            None,
            Some(&identity.config.author.username),
            remote,
            libpijul::DEFAULT_CHANNEL,
            no_cert_check,
            false,
        )
        .await?
    } else {
        unknown_remote(
            None,
            Some(&identity.config.author.username),
            remote,
            libpijul::DEFAULT_CHANNEL,
            no_cert_check,
            false,
        )
        .await?
    };

    // An identity without credentials is reported as an error rather than
    // aborting the whole process with a panic.
    let credentials = identity.credentials.clone().with_context(|| {
        format!(
            "Identity `{}` has no credentials to prove with",
            identity.name
        )
    })?;
    let (key, _password) = credentials.decrypt(&identity.name)?;
    remote.prove(key).await?;

    Ok(())
}

/// Conversion of a remote description into a usable [`RemoteRepo`].
#[async_trait]
pub trait ToRemote {
    /// Build the [`RemoteRepo`] described by `self`, targeting `channel`.
    ///
    /// In the implementation for `RemoteConfig`, `no_cert_check` disables
    /// certificate validation on the HTTP client and `with_path` is
    /// forwarded to `ssh_remote`.
    async fn to_remote(
        &self,
        channel: &str,
        no_cert_check: bool,
        with_path: bool,
    ) -> Result<RemoteRepo, anyhow::Error>;
}

#[async_trait]
impl ToRemote for RemoteConfig {
    async fn to_remote(
        &self,
        channel: &str,
        no_cert_check: bool,
        with_path: bool,
    ) -> Result<RemoteRepo, anyhow::Error> {
        match self {
            RemoteConfig::Ssh { ssh, .. } => {
                if let Some(mut sshr) = ssh_remote(None, ssh, with_path) {
                    debug!("unknown_remote, ssh = {:?}", ssh);
                    if let Some(c) = sshr.connect(ssh, channel).await? {
                        return Ok(RemoteRepo::Ssh(c));
                    }
                }
                bail!("Remote not found: {:?}", ssh)
            }
            RemoteConfig::Http {
                http,
                headers,
                name,
            } => {
                let mut h = Vec::new();
                for (k, v) in headers.iter() {
                    match v {
                        RemoteHttpHeader::String(s) => {
                            h.push((k.clone(), s.clone()));
                        }
                        RemoteHttpHeader::Shell(shell) => {
                            h.push((k.clone(), shell_cmd(&shell.shell)?));
                        }
                    }
                }
                return Ok(RemoteRepo::Http(Http {
                    url: http.parse().unwrap(),
                    channel: channel.to_string(),
                    client: reqwest::ClientBuilder::new()
                        .danger_accept_invalid_certs(no_cert_check)
                        .build()?,
                    headers: h,
                    name: name.to_string(),
                }));
            }
        }
    }
}

/// Resolve a remote that is not listed in the repository configuration.
///
/// `name` is tried, in order, as:
///
/// 1. a URL: `http`/`https` gives [`RemoteRepo::Http`], `ssh` gives
///    [`RemoteRepo::Ssh`], and any other scheme is an error;
/// 2. a filesystem path: the same directory as `self_path` gives
///    [`RemoteRepo::LocalChannel`], and a directory whose pristine can be
///    opened gives [`RemoteRepo::Local`];
/// 3. an SSH remote, as parsed by `ssh_remote` with `user` and `with_path`.
///
/// # Errors
///
/// Fails when none of the interpretations yields a remote, when the URL
/// scheme is unsupported, or when opening the pristine fails for a reason
/// other than a missing file.
pub async fn unknown_remote(
    self_path: Option<&Path>,
    user: Option<&str>,
    name: &str,
    channel: &str,
    no_cert_check: bool,
    with_path: bool,
) -> Result<RemoteRepo, anyhow::Error> {
    // 1. Anything that parses as a URL is decided by its scheme alone.
    if let Ok(url) = url::Url::parse(name) {
        if matches!(url.scheme(), "http" | "https") {
            debug!("unknown_remote, http = {:?}", name);
            let client = reqwest::ClientBuilder::new()
                .danger_accept_invalid_certs(no_cert_check)
                .build()?;
            return Ok(RemoteRepo::Http(Http {
                url,
                channel: channel.to_string(),
                client,
                headers: Vec::new(),
                name: name.to_string(),
            }));
        }
        if url.scheme() != "ssh" {
            bail!("Remote scheme not supported: {:?}", url.scheme());
        }
        if let Some(mut ssh) = ssh_remote(user, name, with_path) {
            debug!("unknown_remote, ssh = {:?}", ssh);
            if let Some(c) = ssh.connect(name, channel).await? {
                return Ok(RemoteRepo::Ssh(c));
            }
        }
        bail!("Remote not found: {:?}", name);
    }

    // 2. An existing path on the local filesystem.
    if let Ok(root) = std::fs::canonicalize(name) {
        if let Some(own_path) = self_path {
            // Pointing at ourselves means "another channel of this repository".
            if std::fs::canonicalize(own_path)? == root {
                return Ok(RemoteRepo::LocalChannel(channel.to_string()));
            }
        }

        let dot_dir = root.join(DOT_DIR);
        let changes_dir = dot_dir.join(CHANGES_DIR);
        let pristine_dir = dot_dir.join(PRISTINE_DIR);
        debug!("dot_dir = {:?}", pristine_dir);
        match libpijul::pristine::sanakirja::Pristine::new(&pristine_dir.join("db")) {
            Err(libpijul::pristine::sanakirja::SanakirjaError::Sanakirja(
                sanakirja::Error::IO(e),
            )) if e.kind() == std::io::ErrorKind::NotFound => {
                // Not a repository: fall through to the SSH interpretation.
                debug!("repo not found")
            }
            Err(e) => return Err(e.into()),
            Ok(pristine) => {
                debug!("pristine done");
                return Ok(RemoteRepo::Local(Local {
                    root: Path::new(name).to_path_buf(),
                    channel: channel.to_string(),
                    changes_dir,
                    pristine: Arc::new(pristine),
                    name: name.to_string(),
                }));
            }
        }
    }

    // 3. Last resort: let the SSH layer try to make sense of `name`.
    if let Some(mut ssh) = ssh_remote(user, name, with_path) {
        debug!("unknown_remote, ssh = {:?}", ssh);
        if let Some(c) = ssh.connect(name, channel).await? {
            return Ok(RemoteRepo::Ssh(c));
        }
    }
    bail!("Remote not found: {:?}", name)
}

/// Collect the graph positions designated by `path` in `channel`.
///
/// Each entry of `path` is resolved with `follow_oldest_path`; the resulting
/// position and all of its graph descendants are added to the returned set.
/// An empty `path` yields an empty set.
///
/// Extracting this saves a little bit of duplication.
///
/// # Errors
///
/// Fails if a path cannot be followed or is ambiguous, or if iterating over
/// the descendants of a position fails.
pub fn get_local_inodes(
    txn: &mut MutTxn<()>,
    channel: &ChannelRef<MutTxn<()>>,
    repo: &Repository,
    path: &[String],
) -> Result<HashSet<Position<ChangeId>>, anyhow::Error> {
    let mut paths = HashSet::new();
    for path in path.iter() {
        let (p, ambiguous) = txn.follow_oldest_path(&repo.changes, &channel, path)?;
        if ambiguous {
            bail!("Ambiguous path: {:?}", path)
        }
        paths.insert(p);
        // Propagate iteration errors to the caller instead of panicking on them.
        for descendant in libpijul::fs::iter_graph_descendants(txn, &channel.read(), p)? {
            paths.insert(descendant?);
        }
    }
    Ok(paths)
}

/// Embellished [`RemoteDelta`] that has information specific
/// to a push operation. We want to know what our options are
/// for changes to upload, whether the remote has unrecorded relevant changes,
/// and whether the remote has changes we don't know about, since those might
/// affect whether or not we actually want to go through with the push.
pub struct PushDelta {
    /// Changes and tagged states that are candidates for upload, in the
    /// reverse of the order in which the channel's reverse log yields them.
    pub to_upload: Vec<CS>,
    /// Entries from our cached view of the remote that the remote no longer
    /// has (see [`RemoteDelta`]). The `u64` is presumably the entry's
    /// position in the cached remote log — confirm.
    pub remote_unrecs: Vec<(u64, CS)>,
    /// Changes and tagged states present on the remote that are not known to
    /// the channel we are pushing from.
    pub unknown_changes: Vec<CS>,
}

/// For a [`RemoteRepo`] that's Local, Ssh, or Http
/// (anything other than a LocalChannel),
/// [`RemoteDelta`] contains data about the difference between
/// the "actual" state of the remote ('theirs') and the last version of it
/// that we cached ('ours'). The dichotomy is the last point at which the two
/// were the same. `remote_unrecs` is a list of changes which used to be
/// present in the remote, AND were present in the current channel we're
/// pulling to or pushing from. The significance of that is that if we knew
/// about a certain change but did not pull it, the user won't be notified
/// if it's unrecorded in the remote.
///
/// If the remote we're pulling from or pushing to is a LocalChannel,
/// (meaning it's just a different channel of the repo we're already in), then
/// `ours_ge_dichotomy_set`, `theirs_ge_dichotomy_set`, `theirs_ge_dichotomy`
/// and `remote_unrecs` will be empty since they have no meaning. If we're
/// pulling from a LocalChannel, there's no cache to have diverged from, and
/// if we're pushing to a LocalChannel, we're not going to suddenly be
/// surprised by the presence of unknown changes.
///
/// This struct will be created by both a push and pull operation since both
/// need to update the changelist and will at least try to update the local
/// remote cache. For a push, this later gets turned into [`PushDelta`].
pub struct RemoteDelta<T: MutTxnTExt + TxnTExt> {
    /// Positions (with external change hashes) that the operation is
    /// restricted to; empty when no path restriction applies.
    pub inodes: HashSet<Position<Hash>>,
    /// Changes and tagged states to fetch from the remote.
    pub to_download: Vec<CS>,
    /// Handle on the local cache of the remote; `None` for a LocalChannel or
    /// when the remote has no id yet.
    pub remote_ref: Option<RemoteRef<T>>,
    /// Items of our cached remote log at or after the dichotomy.
    pub ours_ge_dichotomy_set: HashSet<CS>,
    /// Items of the actual remote log at or after the dichotomy.
    pub theirs_ge_dichotomy_set: HashSet<CS>,
    // Keep the Vec representation around as well so that the notification
    // for unknown changes shows the hashes in order.
    /// Ordered form of the remote log at or after the dichotomy: position,
    /// change hash, state, and whether that state is tagged.
    pub theirs_ge_dichotomy: Vec<(u64, Hash, Merkle, bool)>,
    /// Cached entries that the remote no longer has but the current channel
    /// still does (see the type-level documentation).
    pub remote_unrecs: Vec<(u64, CS)>,
}

impl RemoteDelta<MutTxn<()>> {
    /// Make a [`PushDelta`] from a [`RemoteDelta`]
    /// when the remote is a [`RemoteRepo::LocalChannel`].
    pub fn to_local_channel_push(
        self,
        remote_channel: &str,
        txn: &mut MutTxn<()>,
        path: &[String],
        channel: &ChannelRef<MutTxn<()>>,
        repo: &Repository,
    ) -> Result<PushDelta, anyhow::Error> {
        let mut to_upload = Vec::new();
        let inodes = get_local_inodes(txn, channel, repo, path)?;

        for x in txn.reverse_log(&*channel.read(), None)? {
            let (_, (h, _)) = x?;
            if let Some(channel) = txn.load_channel(remote_channel)? {
                let channel = channel.read();
                let h_int = txn.get_internal(h)?.unwrap();
                if txn.get_changeset(txn.changes(&channel), h_int)?.is_none() {
                    if inodes.is_empty() {
                        to_upload.push(CS::Change(h.into()))
                    } else {
                        for p in inodes.iter() {
                            if txn.get_touched_files(p, Some(h_int))?.is_some() {
                                to_upload.push(CS::Change(h.into()));
                                break;
                            }
                        }
                    }
                }
            }
        }
        assert!(self.ours_ge_dichotomy_set.is_empty());
        assert!(self.theirs_ge_dichotomy_set.is_empty());
        let d = PushDelta {
            to_upload: to_upload.into_iter().rev().collect(),
            remote_unrecs: self.remote_unrecs,
            unknown_changes: Vec::new(),
        };
        assert!(d.remote_unrecs.is_empty());
        Ok(d)
    }

    /// Make a [`PushDelta`] from a [`RemoteDelta`] when the remote
    /// is not a LocalChannel.
    pub fn to_remote_push(
        self,
        txn: &mut MutTxn<()>,
        path: &[String],
        channel: &ChannelRef<MutTxn<()>>,
        repo: &Repository,
    ) -> Result<PushDelta, anyhow::Error> {
        let mut to_upload = Vec::new();
        let inodes = get_local_inodes(txn, channel, repo, path)?;
        if let Some(ref remote_ref) = self.remote_ref {
            let mut tags: HashSet<Merkle> = HashSet::new();
            for x in txn.rev_iter_tags(&channel.read().tags, None)? {
                let (n, m) = x?;
                debug!("rev_iter_tags {:?} {:?}", n, m);
                // First, if the remote has exactly the same first n tags, break.
                if let Some((_, p)) = txn.get_remote_tag(&remote_ref.lock().tags, (*n).into())? {
                    if p.b == m.b {
                        debug!("the remote has tag {:?}", p.a);
                        break;
                    }
                    if p.a != m.a {
                        // What to do here?  It is possible that state
                        // `n` is a different state than `m.a` in the
                        // remote, and is also tagged.
                    }
                } else {
                    tags.insert(m.a.into());
                }
            }
            debug!("tags = {:?}", tags);
            for x in txn.reverse_log(&*channel.read(), None)? {
                let (_, (h, m)) = x?;
                let h_unrecorded = self
                    .remote_unrecs
                    .iter()
                    .any(|(_, hh)| hh == &CS::Change(h.into()));
                if !h_unrecorded {
                    if txn.remote_has_state(remote_ref, &m)?.is_some() {
                        debug!("remote_has_state: {:?}", m);
                        break;
                    }
                }
                let h_int = txn.get_internal(h)?.unwrap();
                let h_deser = Hash::from(h);
                // For elements that are in the uncached remote changes (theirs_ge_dichotomy),
                // don't put those in to_upload since the remote we're pushing to
                // already has those changes.
                if (!txn.remote_has_change(remote_ref, &h)? || h_unrecorded)
                    && !self.theirs_ge_dichotomy_set.contains(&CS::Change(h_deser))
                {
                    if inodes.is_empty() {
                        if tags.remove(&m.into()) {
                            to_upload.push(CS::State(m.into()));
                        }
                        to_upload.push(CS::Change(h_deser));
                    } else {
                        for p in inodes.iter() {
                            if txn.get_touched_files(p, Some(h_int))?.is_some() {
                                to_upload.push(CS::Change(h_deser));
                                if tags.remove(&m.into()) {
                                    to_upload.push(CS::State(m.into()));
                                }
                                break;
                            }
                        }
                    }
                }
            }
            for t in tags.iter() {
                if let Some(n) = txn.remote_has_state(&remote_ref, &t.into())? {
                    if !txn.is_tagged(&remote_ref.lock().tags, n)? {
                        to_upload.push(CS::State(*t));
                    }
                } else {
                    debug!("the remote doesn't have state {:?}", t);
                }
            }
        }

        // { h | h \in theirs_ge_dichotomy /\ ~(h \in ours_ge_dichotomy) }
        // The set of their changes >= dichotomy that aren't
        // already known to our set of changes after the dichotomy.
        let mut unknown_changes = Vec::new();
        for (_, h, m, is_tag) in self.theirs_ge_dichotomy.iter() {
            let h_is_known = txn.get_revchanges(&channel, h).unwrap().is_some();
            let change = CS::Change(*h);
            if !(self.ours_ge_dichotomy_set.contains(&change) || h_is_known) {
                unknown_changes.push(change)
            }
            if *is_tag {
                let m_is_known = if let Some(n) = txn
                    .channel_has_state(txn.states(&*channel.read()), &m.into())
                    .unwrap()
                {
                    txn.is_tagged(txn.tags(&*channel.read()), n.into()).unwrap()
                } else {
                    false
                };
                if !m_is_known {
                    unknown_changes.push(CS::State(*m))
                }
            }
        }

        Ok(PushDelta {
            to_upload: to_upload.into_iter().rev().collect(),
            remote_unrecs: self.remote_unrecs,
            unknown_changes,
        })
    }
}

/// Create a [`RemoteDelta`] for a [`RemoteRepo::LocalChannel`].
/// Since this case doesn't have a local remote cache to worry about,
/// mainly just calculates the `to_download` list of changes.
pub fn update_changelist_local_channel(
    remote_channel: &str,
    txn: &mut MutTxn<()>,
    path: &[String],
    current_channel: &ChannelRef<MutTxn<()>>,
    repo: &Repository,
    specific_changes: &[String],
) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
    if !specific_changes.is_empty() {
        let mut to_download = Vec::new();
        for h in specific_changes {
            let h = txn.hash_from_prefix(h)?.0;
            if txn.get_revchanges(current_channel, &h)?.is_none() {
                to_download.push(CS::Change(h));
            }
        }
        Ok(RemoteDelta {
            inodes: HashSet::new(),
            to_download,
            remote_ref: None,
            ours_ge_dichotomy_set: HashSet::new(),
            theirs_ge_dichotomy: Vec::new(),
            theirs_ge_dichotomy_set: HashSet::new(),
            remote_unrecs: Vec::new(),
        })
    } else {
        let mut inodes = HashSet::new();
        let inodes_ = get_local_inodes(txn, current_channel, repo, path)?;
        let mut to_download = Vec::new();
        inodes.extend(inodes_.iter().map(|x| libpijul::pristine::Position {
            change: txn.get_external(&x.change).unwrap().unwrap().into(),
            pos: x.pos,
        }));
        if let Some(remote_channel) = txn.load_channel(remote_channel)? {
            let remote_channel = remote_channel.read();
            for x in txn.reverse_log(&remote_channel, None)? {
                let (_, (h, m)) = x?;
                if txn
                    .channel_has_state(txn.states(&*current_channel.read()), &m)?
                    .is_some()
                {
                    break;
                }
                let h_int = txn.get_internal(h)?.unwrap();
                if txn
                    .get_changeset(txn.changes(&*current_channel.read()), h_int)?
                    .is_none()
                {
                    if inodes_.is_empty()
                        || inodes_.iter().any(|&inode| {
                            txn.get_rev_touched_files(h_int, Some(&inode))
                                .unwrap()
                                .is_some()
                        })
                    {
                        to_download.push(CS::Change(h.into()));
                    }
                }
            }
        }
        Ok(RemoteDelta {
            inodes,
            to_download,
            remote_ref: None,
            ours_ge_dichotomy_set: HashSet::new(),
            theirs_ge_dichotomy: Vec::new(),
            theirs_ge_dichotomy_set: HashSet::new(),
            remote_unrecs: Vec::new(),
        })
    }
}

impl RemoteRepo {
    /// Name of this remote, as stored in the underlying `Ssh`, `Local` or
    /// `Http` value; `None` for a local channel.
    ///
    /// Callers in this file use it as the name of the local remote cache.
    /// Must not be called on [`RemoteRepo::None`].
    fn name(&self) -> Option<&str> {
        match self {
            RemoteRepo::Ssh(ssh) => Some(ssh.name.as_str()),
            RemoteRepo::Local(local) => Some(local.name.as_str()),
            RemoteRepo::Http(http) => Some(http.name.as_str()),
            RemoteRepo::LocalChannel(_) => None,
            RemoteRepo::None => unreachable!(),
        }
    }

    /// Short, human-oriented name of the remote repository, if one can be
    /// derived.
    ///
    /// * SSH: the part of the remote name after its last `:` or `/` (the
    ///   whole name when it contains neither).
    /// * Local: the last component of the repository root, if any.
    /// * HTTP: the trimmed file name of the URL path when it is non-empty,
    ///   otherwise the URL host.
    /// * Local channel: `None`.
    ///
    /// # Errors
    ///
    /// Fails if the last component of a local root is not valid Unicode.
    ///
    /// Must not be called on [`RemoteRepo::None`].
    pub fn repo_name(&self) -> Result<Option<String>, anyhow::Error> {
        let repo_name = match self {
            RemoteRepo::Ssh(ssh) => {
                let tail = match ssh.name.rfind(|c: char| c == ':' || c == '/') {
                    // Both separators are one byte wide, so `sep + 1` is a
                    // valid character boundary.
                    Some(sep) => &ssh.name[sep + 1..],
                    None => ssh.name.as_str(),
                };
                Some(tail.to_string())
            }
            RemoteRepo::Local(local) => match local.root.file_name() {
                Some(file) => Some(
                    file.to_str()
                        .context("failed to decode local repository name")?
                        .to_string(),
                ),
                None => None,
            },
            RemoteRepo::Http(http) => {
                let from_path = libpijul::path::file_name(http.url.path())
                    .map(|segment| segment.trim())
                    .filter(|segment| !segment.is_empty());
                match from_path {
                    Some(segment) => Some(segment.to_string()),
                    None => http.url.host().map(|host| host.to_string()),
                }
            }
            RemoteRepo::LocalChannel(_) => None,
            RemoteRepo::None => unreachable!(),
        };
        Ok(repo_name)
    }

    /// Finish the session with the remote.
    ///
    /// Only SSH remotes have something to do here (their own `finish` is
    /// awaited); every other kind of remote returns `Ok(())` immediately.
    pub async fn finish(&mut self) -> Result<(), anyhow::Error> {
        match self {
            RemoteRepo::Ssh(ssh) => ssh.finish().await?,
            RemoteRepo::Local(_)
            | RemoteRepo::Http(_)
            | RemoteRepo::LocalChannel(_)
            | RemoteRepo::None => {}
        }
        Ok(())
    }

    /// Refresh the local cache of this remote's changelist.
    ///
    /// Finds the dichotomy point `n` between the cached and the actual
    /// remote log, deletes the cached changes and tags at positions `>= n`,
    /// then downloads the remote changelist from `n` on, restricted to `path`.
    ///
    /// Returns `Ok(None)` when the remote has no id or no name (a local
    /// channel). Otherwise returns the positions reported by the download
    /// together with the handle on the remote cache.
    ///
    /// # Errors
    ///
    /// Fails if the remote cannot be queried or if any read from or write to
    /// the transaction fails, including errors while iterating over the
    /// cached entries.
    pub async fn update_changelist<T: MutTxnTExt + TxnTExt + 'static>(
        &mut self,
        txn: &mut T,
        path: &[String],
    ) -> Result<Option<(HashSet<Position<Hash>>, RemoteRef<T>)>, anyhow::Error> {
        debug!("update_changelist");
        let id = if let Some(id) = self.get_id(txn).await? {
            id
        } else {
            return Ok(None);
        };
        let mut remote = if let Some(name) = self.name() {
            txn.open_or_create_remote(id, name)?
        } else {
            return Ok(None);
        };
        let n = self.dichotomy_changelist(txn, &remote.lock()).await?;
        debug!("update changelist {:?}", n);

        // Collect the keys first: the cursor borrows the table we are about
        // to delete from. Cursor errors are propagated, not unwrapped.
        let mut stale_changes: Vec<u64> = Vec::new();
        for entry in txn.iter_remote(&remote.lock().remote, n)? {
            debug!("filter_map {:?}", entry);
            let k: u64 = (*entry?.0).into();
            if k >= n {
                stale_changes.push(k);
            }
        }
        for k in stale_changes {
            debug!("deleting {:?}", k);
            txn.del_remote(&mut remote, k)?;
        }

        let mut stale_tags: Vec<u64> = Vec::new();
        for entry in txn.iter_tags(&remote.lock().tags, n)? {
            debug!("filter_map {:?}", entry);
            let k: u64 = (*entry?.0).into();
            if k >= n {
                stale_tags.push(k);
            }
        }
        for k in stale_tags {
            debug!("deleting {:?}", k);
            txn.del_tags(&mut remote.lock().tags, k)?;
        }

        debug!("deleted");
        let paths = self.download_changelist(txn, &mut remote, n, path).await?;
        Ok(Some((paths, remote)))
    }

    /// Build a [`RemoteDelta`] for a remote that has no id, and therefore no
    /// local cache to compare against.
    ///
    /// The whole remote changelist (from position 0, restricted to `path`) is
    /// downloaded. Every change absent from `current_channel` is scheduled
    /// for download, as is every tagged state that `current_channel` either
    /// lacks or has not tagged. No remote cache is opened (`remote_ref` is
    /// `None`) and there are no remote unrecords.
    async fn update_changelist_pushpull_from_scratch(
        &mut self,
        txn: &mut MutTxn<()>,
        path: &[String],
        current_channel: &ChannelRef<MutTxn<()>>,
    ) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
        debug!("no id, starting from scratch");
        let (inodes, theirs_ge_dichotomy) = self.download_changelist_nocache(0, path).await?;
        let mut to_download = Vec::new();
        let mut theirs_ge_dichotomy_set = HashSet::new();
        for (_, hash, state, is_tag) in &theirs_ge_dichotomy {
            let change = CS::Change(*hash);
            theirs_ge_dichotomy_set.insert(change);
            if txn.get_revchanges(current_channel, hash)?.is_none() {
                to_download.push(change);
            }
            if !*is_tag {
                continue;
            }
            // A tag is only skipped when we have the state AND it is tagged.
            let channel = current_channel.read();
            let already_tagged =
                match txn.channel_has_state(txn.states(&*channel), &state.into())? {
                    Some(n) => txn.is_tagged(txn.tags(&*channel), n.into())?,
                    None => false,
                };
            if !already_tagged {
                to_download.push(CS::State(*state));
            }
        }
        Ok(RemoteDelta {
            inodes,
            remote_ref: None,
            to_download,
            ours_ge_dichotomy_set: HashSet::new(),
            theirs_ge_dichotomy,
            theirs_ge_dichotomy_set,
            remote_unrecs: Vec::new(),
        })
    }

    /// Creates a [`RemoteDelta`].
    ///
    /// IF:
    ///    the RemoteRepo is a [`RemoteRepo::LocalChannel`], delegate to
    ///    the simpler method [`update_changelist_local_channel`], returning the
    ///    `to_download` list of changes.
    ///
    /// ELSE:
    ///    calculate the `to_download` list of changes. Additionally, if there are
    ///    no remote unrecords, update the local remote cache. If there are remote unrecords,
    ///    calculate and return information about the difference between our cached version
    ///    of the remote, and their version of the remote.
    pub async fn update_changelist_pushpull(
        &mut self,
        txn: &mut MutTxn<()>,
        path: &[String],
        current_channel: &ChannelRef<MutTxn<()>>,
        force_cache: bool,
        repo: &Repository,
        specific_changes: &[String],
        is_pull: bool,
    ) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
        debug!("update_changelist_pushpull");
        if let RemoteRepo::LocalChannel(c) = self {
            return update_changelist_local_channel(
                c,
                txn,
                path,
                current_channel,
                repo,
                specific_changes,
            );
        }

        let id = if let Some(id) = self.get_id(txn).await? {
            debug!("id = {:?}", id);
            id
        } else {
            return self
                .update_changelist_pushpull_from_scratch(txn, path, current_channel)
                .await;
        };
        let mut remote_ref = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
        let dichotomy_n = self.dichotomy_changelist(txn, &remote_ref.lock()).await?;
        let ours_ge_dichotomy: Vec<(u64, CS)> = txn
            .iter_remote(&remote_ref.lock().remote, dichotomy_n)?
            .filter_map(|k| {
                debug!("filter_map {:?}", k);
                match k.unwrap() {
                    (k, libpijul::pristine::Pair { a: hash, .. }) => {
                        let (k, hash) = (u64::from(*k), Hash::from(*hash));
                        if k >= dichotomy_n {
                            Some((k, CS::Change(hash)))
                        } else {
                            None
                        }
                    }
                }
            })
            .collect();
        let (inodes, theirs_ge_dichotomy) =
            self.download_changelist_nocache(dichotomy_n, path).await?;
        debug!("theirs_ge_dichotomy = {:?}", theirs_ge_dichotomy);
        let ours_ge_dichotomy_set = ours_ge_dichotomy
            .iter()
            .map(|(_, h)| h)
            .copied()
            .collect::<HashSet<CS>>();
        let mut theirs_ge_dichotomy_set = HashSet::new();
        for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
            theirs_ge_dichotomy_set.insert(CS::Change(*h));
            if *is_tag {
                theirs_ge_dichotomy_set.insert(CS::State(*m));
            }
        }

        // remote_unrecs = {x: (u64, Hash) | x \in ours_ge_dichot /\ ~(x \in theirs_ge_dichot) /\ x \in current_channel }
        let remote_unrecs = remote_unrecs(
            txn,
            current_channel,
            &ours_ge_dichotomy,
            &theirs_ge_dichotomy_set,
        )?;
        let should_cache = force_cache || remote_unrecs.is_empty();
        debug!(
            "should_cache = {:?} {:?} {:?}",
            force_cache, remote_unrecs, should_cache
        );
        if should_cache {
            use libpijul::ChannelMutTxnT;
            for (k, t) in ours_ge_dichotomy.iter().copied() {
                match t {
                    CS::State(_) => txn.del_tags(&mut remote_ref.lock().tags, k)?,
                    CS::Change(_) => {
                        txn.del_remote(&mut remote_ref, k)?;
                    }
                }
            }
            for (n, h, m, is_tag) in theirs_ge_dichotomy.iter().copied() {
                debug!("theirs: {:?} {:?} {:?}", n, h, m);
                txn.put_remote(&mut remote_ref, n, (h, m))?;
                if is_tag {
                    txn.put_tags(&mut remote_ref.lock().tags, n, &m)?;
                }
            }
        }
        if !specific_changes.is_empty() {
            // Here, the user only wanted to push/pull specific changes
            let to_download = specific_changes
                .iter()
                .map(|h| {
                    if is_pull {
                        {
                            if let Ok(t) = txn.state_from_prefix(&remote_ref.lock().states, h) {
                                return Ok(CS::State(t.0));
                            }
                        }
                        Ok(CS::Change(txn.hash_from_prefix_remote(&remote_ref, h)?))
                    } else {
                        if let Ok(t) = txn.state_from_prefix(&current_channel.read().states, h) {
                            Ok(CS::State(t.0))
                        } else {
                            Ok(CS::Change(txn.hash_from_prefix(h)?.0))
                        }
                    }
                })
                .collect::<Result<Vec<_>, anyhow::Error>>();
            Ok(RemoteDelta {
                inodes,
                remote_ref: Some(remote_ref),
                to_download: to_download?,
                ours_ge_dichotomy_set,
                theirs_ge_dichotomy,
                theirs_ge_dichotomy_set,
                remote_unrecs,
            })
        } else {
            let mut to_download: Vec<CS> = Vec::new();
            let mut to_download_ = HashSet::new();
            for x in txn.iter_rev_remote(&remote_ref.lock().remote, None)? {
                let (_, p) = x?;
                let h: Hash = p.a.into();
                if txn
                    .channel_has_state(txn.states(&current_channel.read()), &p.b)
                    .unwrap()
                    .is_some()
                {
                    break;
                }
                if txn.get_revchanges(&current_channel, &h).unwrap().is_none() {
                    let h = CS::Change(h);
                    if to_download_.insert(h.clone()) {
                        to_download.push(h);
                    }
                }
            }

            // The patches in theirs_ge_dichotomy are unknown to us,
            // download them.
            for (n, h, m, is_tag) in theirs_ge_dichotomy.iter() {
                debug!(
                    "update_changelist_pushpull line {}, {:?} {:?}",
                    line!(),
                    n,
                    h
                );
                // In all cases, add this new change/state/tag to `to_download`.
                let ch = CS::Change(*h);
                if txn.get_revchanges(&current_channel, h).unwrap().is_none() {
                    if to_download_.insert(ch.clone()) {
                        to_download.push(ch.clone());
                    }
                    if *is_tag {
                        to_download.push(CS::State(*m));
                    }
                } else if *is_tag {
                    let has_tag = if let Some(n) =
                        txn.channel_has_state(txn.states(&current_channel.read()), &m.into())?
                    {
                        txn.is_tagged(txn.tags(&current_channel.read()), n.into())?
                    } else {
                        false
                    };
                    if !has_tag {
                        to_download.push(CS::State(*m));
                    }
                }
                // Additionally, if there are no remote unrecords
                // (i.e. if `should_cache`), cache.
                if should_cache && ours_ge_dichotomy_set.get(&ch).is_none() {
                    use libpijul::ChannelMutTxnT;
                    txn.put_remote(&mut remote_ref, *n, (*h, *m))?;
                    if *is_tag {
                        let mut rem = remote_ref.lock();
                        txn.put_tags(&mut rem.tags, *n, m)?;
                    }
                }
            }
            Ok(RemoteDelta {
                inodes,
                remote_ref: Some(remote_ref),
                to_download,
                ours_ge_dichotomy_set,
                theirs_ge_dichotomy,
                theirs_ge_dichotomy_set,
                remote_unrecs,
            })
        }
    }

    /// Fetch the remote's changelist entries that come after position `from`
    /// without writing anything to the local cache of the remote.
    ///
    /// Every entry handed back by the backend is collected into a vector of
    /// `(number, change hash, state, flag)` tuples, in delivery order. The
    /// function returns that vector together with the set of positions the
    /// backend returned for `paths`. A `LocalChannel` remote yields an empty
    /// set and no entries.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) when called on `RemoteRepo::None`.
    pub async fn download_changelist_nocache(
        &mut self,
        from: u64,
        paths: &[String],
    ) -> Result<(HashSet<Position<Hash>>, Vec<(u64, Hash, Merkle, bool)>), anyhow::Error> {
        let mut entries = Vec::new();
        // Callback given to the backend: log the hash and record the entry.
        let record = |acc: &mut Vec<(u64, Hash, Merkle, bool)>, num, hash, state, flag| {
            debug!("no cache: {:?}", hash);
            acc.push((num, hash, state, flag));
            Ok(())
        };
        let positions = match *self {
            RemoteRepo::Local(ref mut local_repo) => {
                local_repo.download_changelist(record, &mut entries, from, paths)?
            }
            RemoteRepo::Ssh(ref mut ssh_remote) => {
                ssh_remote
                    .download_changelist(record, &mut entries, from, paths)
                    .await?
            }
            RemoteRepo::Http(ref http_remote) => {
                http_remote
                    .download_changelist(record, &mut entries, from, paths)
                    .await?
            }
            RemoteRepo::LocalChannel(_) => HashSet::new(),
            RemoteRepo::None => unreachable!(),
        };
        Ok((positions, entries))
    }

    /// Uses a binary search to find the integer identifier of the last point
    /// at which our locally cached version of the remote was the same as the 'actual'
    /// state of the remote.
    ///
    /// Both the state and the tag state are compared at each probe: a
    /// position only counts as "in common" when the remote reports the same
    /// pair as the local cache.
    ///
    /// Return values, as implemented below:
    /// * `0` when the local cache of the remote is empty;
    /// * `b + 1` (one past the last cached position) when the remote's state
    ///   at the last cached position matches the cache;
    /// * otherwise the value of `a` reached by the search, or `a + 1` when
    ///   the probe lands exactly on `a` and matches.
    ///
    /// # Panics
    ///
    /// Panics if `get_remote_state` returns `None` for a probed position
    /// (the result is unwrapped).
    async fn dichotomy_changelist<T: MutTxnT + TxnTExt>(
        &mut self,
        txn: &T,
        remote: &libpijul::pristine::Remote<T>,
    ) -> Result<u64, anyhow::Error> {
        // `a` is the lower bound of the search, `b` the upper bound
        // (initially the last position present in the local cache).
        let mut a = 0;
        let (mut b, state): (_, Merkle) = if let Some((u, v)) = txn.last_remote(&remote.remote)? {
            debug!("dichotomy_changelist: {:?} {:?}", u, v);
            (u, (&v.b).into())
        } else {
            debug!("the local copy of the remote has no changes");
            return Ok(0);
        };
        // Last tag state known locally, or the zero Merkle if no tag is cached.
        let last_statet = if let Some((_, _, v)) = txn.last_remote_tag(&remote.tags)? {
            v.into()
        } else {
            Merkle::zero()
        };
        debug!("last_state: {:?} {:?}", state, last_statet);
        // Fast path: ask the remote for its state at our last cached position.
        if let Some((_, s, st)) = self.get_state(txn, Some(b)).await? {
            debug!("remote last_state: {:?} {:?}", s, st);
            if s == state && st == last_statet {
                // The local list is already up to date.
                return Ok(b + 1);
            }
        }
        // Else, find the last state we have in common with the
        // remote, it might be older than the last known state (if
        // changes were unrecorded on the remote).
        while a < b {
            let mid = (a + b) / 2;
            // `mid` is re-bound to the position returned by the cache lookup,
            // which may differ from the arithmetic midpoint (presumably the
            // first cached entry at or after it; confirm against
            // `get_remote_state`).
            let (mid, state) = {
                let (a, b) = txn.get_remote_state(&remote.remote, mid)?.unwrap();
                (a, b.b)
            };
            let statet = if let Some((_, b)) = txn.get_remote_tag(&remote.tags, mid)? {
                // There's still a tag at position >= mid in the
                // sequence.
                b.b.into()
            } else {
                // No tag at or after mid, the last tag state,
                // `last_statet`, is the right answer in that case.
                last_statet
            };

            let remote_state = self.get_state(txn, Some(mid)).await?;
            debug!("dichotomy {:?} {:?} {:?}", mid, state, remote_state);
            if let Some((_, remote_state, remote_statet)) = remote_state {
                if remote_state == state && remote_statet == statet {
                    // Cache and remote agree at `mid`: move the lower bound
                    // up, or stop if it cannot move any further.
                    if a == mid {
                        return Ok(a + 1);
                    } else {
                        a = mid;
                        continue;
                    }
                }
            }
            // Cache and remote disagree at `mid` (or the remote has no state
            // there): move the upper bound down, or stop if it is stuck.
            if b == mid {
                break;
            } else {
                b = mid
            }
        }
        Ok(a)
    }

    /// Ask the remote for its state at position `mid`.
    ///
    /// The call is forwarded to the backend matching the kind of remote. For
    /// a `LocalChannel` remote the channel is loaded from `txn`; if no such
    /// channel exists, `Ok(None)` is returned.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) when called on `RemoteRepo::None`.
    async fn get_state<T: libpijul::TxnTExt>(
        &mut self,
        txn: &T,
        mid: Option<u64>,
    ) -> Result<Option<(u64, Merkle, Merkle)>, anyhow::Error> {
        match *self {
            RemoteRepo::Local(ref mut local_repo) => local_repo.get_state(mid),
            RemoteRepo::Ssh(ref mut ssh_remote) => ssh_remote.get_state(mid).await,
            RemoteRepo::Http(ref mut http_remote) => http_remote.get_state(mid).await,
            RemoteRepo::LocalChannel(ref name) => match txn.load_channel(name)? {
                Some(loaded) => local::get_state(txn, &loaded, mid),
                None => Ok(None),
            },
            RemoteRepo::None => unreachable!(),
        }
    }

    /// Retrieve the identifier of the remote.
    ///
    /// `Ok(None)` is a legitimate answer: a remote may use it to signal that
    /// no cache should be stored for it, which is what Nest channels do, for
    /// example.
    ///
    /// For a `LocalChannel` remote, the identifier is read from the channel
    /// loaded through `txn`; a missing channel is reported as an error.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) when called on `RemoteRepo::None`.
    async fn get_id<T: libpijul::TxnTExt + 'static>(
        &mut self,
        txn: &T,
    ) -> Result<Option<libpijul::pristine::RemoteId>, anyhow::Error> {
        match *self {
            RemoteRepo::Local(ref local_repo) => Ok(Some(local_repo.get_id()?)),
            RemoteRepo::Ssh(ref mut ssh_remote) => ssh_remote.get_id().await,
            RemoteRepo::Http(ref http_remote) => http_remote.get_id().await,
            RemoteRepo::LocalChannel(ref name) => {
                let Some(loaded) = txn.load_channel(name)? else {
                    return Err(anyhow::anyhow!(
                        "Unable to retrieve RemoteId for LocalChannel remote"
                    ));
                };
                Ok(txn.id(&*loaded.read()).cloned())
            }
            RemoteRepo::None => unreachable!(),
        }
    }

    /// Write an archive of the remote repository to `w`.
    ///
    /// * `prefix`: forwarded to the tarball writer (local remote) or to the
    ///   SSH/HTTP backend.
    /// * `state`: when `Some((state, extra))`, the local backend archives
    ///   through `archive_with_state` using that state and the extra changes;
    ///   when `None`, the channel is archived as is.
    /// * `umask`: only used by the local backend (passed to the tarball
    ///   writer); it is not forwarded to the SSH or HTTP backends.
    ///
    /// For a local remote, the returned number is the count of conflicts
    /// reported while archiving. For SSH and HTTP remotes, it is whatever the
    /// backend returns.
    ///
    /// # Errors
    ///
    /// Besides backend and transaction errors, returns an error if the
    /// channel of a local remote does not exist (this used to panic).
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) on `RemoteRepo::LocalChannel` and
    /// `RemoteRepo::None`.
    pub async fn archive<W: std::io::Write + Send + 'static>(
        &mut self,
        prefix: Option<String>,
        state: Option<(Merkle, &[Hash])>,
        umask: u16,
        w: W,
    ) -> Result<u64, anyhow::Error> {
        match *self {
            RemoteRepo::Local(ref mut l) => {
                debug!("archiving local repo");
                let changes = libpijul::changestore::filesystem::FileSystem::from_root(
                    &l.root,
                    pijul_repository::max_files()?,
                );
                let mut tarball = libpijul::output::Tarball::new(w, prefix, umask);

                // Both archive flavours need a transaction and the channel,
                // so set them up once instead of in each branch.
                let txn = l.pristine.arc_txn_begin()?;
                let loaded = {
                    let txn = txn.read();
                    txn.load_channel(&l.channel)?
                };
                // A missing channel is a recoverable condition, not a bug:
                // report it instead of unwrapping.
                let channel = loaded
                    .ok_or_else(|| anyhow::anyhow!("Channel {:?} not found", l.channel))?;

                let conflicts = if let Some((state, extra)) = state {
                    txn.archive_with_state(&changes, &channel, &state, extra, &mut tarball, 0)?
                } else {
                    txn.archive(&changes, &channel, &mut tarball)?
                };
                Ok(conflicts.len() as u64)
            }
            RemoteRepo::Ssh(ref mut s) => s.archive(prefix, state, w).await,
            RemoteRepo::Http(ref mut h) => h.archive(prefix, state, w).await,
            RemoteRepo::LocalChannel(_) => unreachable!(),
            RemoteRepo::None => unreachable!(),
        }
    }

    /// Download the remote's changelist starting at `from` and store every
    /// received entry in the local cache `remote`.
    ///
    /// For each entry `(n, h, m, is_tag)` delivered by the backend, the pair
    /// `(h, m)` is recorded at position `n` with `put_remote`; when `is_tag`
    /// is set, `m` is additionally recorded in the remote's tag table.
    ///
    /// Returns the set of positions handed back by the backend for `paths`.
    /// A `LocalChannel` remote has nothing to download and yields an empty
    /// set.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) when called on `RemoteRepo::None`.
    async fn download_changelist<T: MutTxnTExt>(
        &mut self,
        txn: &mut T,
        remote: &mut RemoteRef<T>,
        from: u64,
        paths: &[String],
    ) -> Result<HashSet<Position<Hash>>, anyhow::Error> {
        // Callback invoked by the backend for every changelist entry. The
        // transaction and the remote travel together as the callback's state.
        let f = |a: &mut (&mut T, &mut RemoteRef<T>), n, h, m, is_tag| {
            let (ref mut txn, ref mut remote) = *a;
            txn.put_remote(remote, n, (h, m))?;
            if is_tag {
                txn.put_tags(&mut remote.lock().tags, n, &m.into())?;
            }
            Ok(())
        };
        match *self {
            RemoteRepo::Local(ref mut l) => {
                l.download_changelist(f, &mut (txn, remote), from, paths)
            }
            RemoteRepo::Ssh(ref mut s) => {
                s.download_changelist(f, &mut (txn, remote), from, paths)
                    .await
            }
            RemoteRepo::Http(ref h) => {
                h.download_changelist(f, &mut (txn, remote), from, paths)
                    .await
            }
            RemoteRepo::LocalChannel(_) => Ok(HashSet::new()),
            RemoteRepo::None => unreachable!(),
        }
    }

    /// Upload `changes` to this remote, reporting progress on a new upload
    /// progress bar sized to the number of changes.
    ///
    /// * Local, SSH and HTTP remotes forward the call to their backend,
    ///   together with `changes_dir` and the optional target `to_channel`.
    /// * A `LocalChannel` remote opens (or creates) the channel in `txn` and
    ///   applies the upload through a change store rooted at `changes_dir`;
    ///   `to_channel` is not used in that case.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) when called on `RemoteRepo::None`.
    pub async fn upload_changes<T: MutTxnTExt + 'static>(
        &mut self,
        txn: &mut T,
        changes_dir: &Path,
        to_channel: Option<&str>,
        changes: &[CS],
    ) -> Result<(), anyhow::Error> {
        let bar = ProgressBar::new(changes.len() as u64, UPLOAD_MESSAGE)?;

        match *self {
            RemoteRepo::Local(ref mut local_repo) => {
                local_repo.upload_changes(bar, &changes_dir, to_channel, changes)?;
            }
            RemoteRepo::Ssh(ref mut ssh_remote) => {
                ssh_remote
                    .upload_changes(bar, &changes_dir, to_channel, changes)
                    .await?;
            }
            RemoteRepo::Http(ref http_remote) => {
                http_remote
                    .upload_changes(bar, &changes_dir, to_channel, changes)
                    .await?;
            }
            RemoteRepo::LocalChannel(ref name) => {
                let mut target = txn.open_or_create_channel(name)?;
                let store = libpijul::changestore::filesystem::FileSystem::from_changes(
                    changes_dir.to_path_buf(),
                    pijul_repository::max_files()?,
                );
                local::upload_changes(bar, &store, txn, &mut target, changes)?;
            }
            RemoteRepo::None => unreachable!(),
        }

        Ok(())
    }

    /// Start (and possibly complete) the download of a change.
    async fn download_changes(
    async fn download_changes(
        &mut self,
        progress_bar: ProgressBar,
        mut hashes: mpsc::UnboundedReceiver<CS>,
        mut send: mpsc::Sender<(CS, bool)>,
        path: &Path,
        full: bool,
    ) -> Result<bool, anyhow::Error> {
        debug!("download_changes");
        match *self {
            RemoteRepo::Local(ref mut l) => {
                l.download_changes(progress_bar, &mut hashes, &mut send, path)
                    .await?
            }
            RemoteRepo::Ssh(ref mut s) => {
                s.download_changes(progress_bar, &mut hashes, &mut send, path, full)
                    .await?
            }
            RemoteRepo::Http(ref mut h) => {
                h.download_changes(progress_bar, &mut hashes, &mut send, path, full)
                    .await?
            }
            RemoteRepo::LocalChannel(_) => {
                while let Some(c) = hashes.recv().await {
                    send.send((c, true)).await?;
                }
            }
            RemoteRepo::None => unreachable!(),
        }
        Ok(true)
    }

    /// Download identities from the remote into the repository's
    /// `identities` directory (located under `DOT_DIR` in `repo.path`) and
    /// store the revision returned by the backend on `remote`.
    ///
    /// A `LocalChannel` remote downloads nothing and records revision `0`.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) when called on `RemoteRepo::None`.
    pub async fn update_identities<T: MutTxnTExt + TxnTExt + GraphIter>(
        &mut self,
        repo: &mut Repository,
        remote: &RemoteRef<T>,
    ) -> Result<(), anyhow::Error> {
        debug!("Downloading identities");
        let id_path = repo.path.join(DOT_DIR).join("identities");
        let rev = None;
        let revision = match *self {
            RemoteRepo::Local(ref mut local_repo) => {
                local_repo.update_identities(rev, id_path).await?
            }
            RemoteRepo::Ssh(ref mut ssh_remote) => {
                ssh_remote.update_identities(rev, id_path).await?
            }
            RemoteRepo::Http(ref mut http_remote) => {
                http_remote.update_identities(rev, id_path).await?
            }
            RemoteRepo::LocalChannel(_) => 0,
            RemoteRepo::None => unreachable!(),
        };
        remote.set_id_revision(revision);
        Ok(())
    }

    /// Run the backend's `prove` step with the secret key `key`.
    ///
    /// Only SSH and HTTP remotes have such a step; for `Local` and
    /// `LocalChannel` remotes this is a no-op that returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) when called on `RemoteRepo::None`.
    pub async fn prove(&mut self, key: libpijul::key::SKey) -> Result<(), anyhow::Error> {
        match *self {
            RemoteRepo::Local(_) | RemoteRepo::LocalChannel(_) => Ok(()),
            RemoteRepo::Ssh(ref mut ssh_remote) => ssh_remote.prove(key).await,
            RemoteRepo::Http(ref mut http_remote) => http_remote.prove(key).await,
            RemoteRepo::None => unreachable!(),
        }
    }

    pub async fn pull<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
        &mut self,
        repo: &mut Repository,
        txn: &mut T,
        channel: &mut ChannelRef<T>,
        to_apply: &[CS],
        inodes: &HashSet<Position<Hash>>,
        do_apply: bool,
    ) -> Result<Vec<CS>, anyhow::Error> {
        let apply_len = to_apply.len() as u64;
        let download_bar = ProgressBar::new(apply_len, DOWNLOAD_MESSAGE)?;
        let apply_bar = if do_apply {
            Some(ProgressBar::new(apply_len, APPLY_MESSAGE)?)
        } else {
            None
        };

        let changes_dir = repo.changes_dir.clone();
        
        self.download_changes_with(download_bar, &changes_dir, false, |cx| async move {
            
            
            for h in to_apply {
                
        let mut to_apply_inodes = HashSet::new();
        {
            let to_apply_inodes = &mut to_apply_inodes;
            self.download_changes_with(download_bar, &changes_dir, false, |cx| async move {
                let stream = download_changes_rec(&cx, repo, to_apply);
                pin_mut!(stream);
use tokio::sync::{mpsc, watch};
use tokio::task::LocalSet;
use crate::context::DownloadContext;
        })
        // let t = tokio::spawn(async move {
        //     let mut buf = PathBuf::new();
        //
        //     if waiting == 0 {
        //         return Ok(());
        //     }
        //     let mut ready = Vec::new();
        //     while let Some((hash, follow)) = recv_signal.recv().await {
        //         if let CS::Change(hash) = hash {
        //             waiting -= 1;
        //             if follow {
        //                 use libpijul::changestore::ChangeStore;
        //                 let mut needs_dep = false;
        //                 for dep in changes.get_dependencies(&hash)? {
        //                     let dep: libpijul::pristine::Hash = dep;
        //
        //                     let dep_path = fmt_filename(&mut buf, &changes_dir, &dep);
        //                     let has_dep = std::fs::metadata(&dep_path).is_ok();
        //
        //                     if !has_dep {
        //                         needs_dep = true;
        //                         if asked.insert(CS::Change(dep)) {
        //                             progress_bar.inc(1);
        //                             send_hash.send(CS::Change(dep))?;
        //                             waiting += 1
        //                         }
        //                     }
        //                 }
        //
        //                 if !needs_dep {
        //                     send_ready.send(CS::Change(hash)).await?;
        //                 } else {
        //                     ready.push(CS::Change(hash))
        //                 }
        //             } else {
        //                 send_ready.send(CS::Change(hash)).await?;
        //             }
        //         }
        //         if waiting == 0 {
        //             break;
        //         }
        //     }
        //     info!("waiting loop done");
        //     for r in ready {
        //         send_ready.send(r).await?;
        //     }
        //     std::mem::drop(recv_signal);
        //     Ok(())
        // });
        // Ok(t)
            
            
            state.futs.push(async move {
                wait_for(&state, items).await;
                Ok(())
            }.boxed_local());
            
            while let Some(v) = state.futs.next().await {
                v?;

                for mut r in v {
                    let _ = r.changed().await;
                for item in items {
                    v.push(go(state, item));
                if !follow {
                    return Ok(());
                }
                
                wait_for(state, state.repo.changes.get_dependencies(&hash)?.into_iter().map(CS::Change)).await;
                
                Ok(())
            }
            
            async fn wait_for<'a>(state: &'a State<'a>, items: impl IntoIterator<Item=CS>) {
                let mut v = Vec::new();
                let CS::Change(hash) = cs else {
                    return Ok(());
                };
            
            async fn go2<'a>(state: &'a State<'a>, cs: CS) -> anyhow::Result<()> {
                let follow = state.cx.download(cs).await?;
                    state.futs.push(async move {
                        let _tx = tx;
                        go2(state, cs).await?;
                        state.sender.send(Ok(cs)).await?;
                        Ok(())
                    }.boxed_local());

                    rx
                }).clone()
        items: impl IntoIterator<Item = CS> + 'a,
    ) -> impl Stream<Item=anyhow::Result<CS>> + 'a {
        try_stream(|sender| async move {
            struct State<'a> {
                cx: DownloadContext<'a>,
                repo: &'a Repository,
                sender: mpsc::Sender<anyhow::Result<CS>>,
                futs: FuturesUnordered<LocalBoxFuture<'a, anyhow::Result<()>>>,
                barriers: RefCell<HashMap<CS, watch::Receiver<()>>>,
            }
            
            let state = State {
                cx,
                repo,
                sender,
                futs: Default::default(),
                barriers: RefCell::new(Default::default()),
            };
            
            fn go<'a>(state: &'a State<'a>, cs: CS) -> watch::Receiver<()> {
                state.barriers.borrow_mut().entry(cs).or_insert_with(|| {
                    let (tx, mut rx) = watch::channel(());
                    rx.mark_unchanged();
        cx: DownloadContext<'a>,
        repo: &'a Repository,
    async fn download_changes_rec<'a>(
        // let mut waiting = 0;
        // let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
        //
        // let mut asked = HashSet::new();
        // for h in to_apply {
        //     debug!("to_apply {:?}", h);
        //
        //     asked.insert(*h);
        //     hash_send.send(*h)?;
        //     waiting += 1;
        // }
        //
        // let u = self
        //     .download_changes_rec(
        //         repo,
        //         hash_send,
        //         recv,
        //         send_ready,
        //         download_bar,
        //         waiting,
        //         asked,
        //     )
        //     .await?;
        //
        // let mut ws = libpijul::ApplyWorkspace::new();
        // let mut to_apply_inodes = HashSet::new();
        // while let Some(h) = recv_ready.recv().await {
        //     debug!("to_apply: {:?}", h);
        //     let touches_inodes = inodes.is_empty()
        //         || {
        //             debug!("inodes = {:?}", inodes);
        //             use libpijul::changestore::ChangeStore;
        //             if let CS::Change(ref h) = h {
        //                 let changes = repo.changes.get_changes(h)?;
        //                 changes.iter().any(|c| {
        //                     c.iter().any(|c| {
        //                         let inode = c.inode();
        //                         debug!("inode = {:?}", inode);
        //                         inodes.contains(&Position {
        //                             change: inode.change.unwrap_or(*h),
        //                             pos: inode.pos,
        //                         })
        //                     })
        //                 })
        //             } else {
        //                 false
        //             }
        //         }
        //         || { inodes.iter().any(|i| CS::Change(i.change) == h) };
        //
        //     if touches_inodes {
        //         to_apply_inodes.insert(h);
        //     } else {
        //         continue;
        //     }
        //
        //     if let Some(apply_bar) = apply_bar.clone() {
        //         info!("Applying {:?}", h);
        //         apply_bar.inc(1);
        //         debug!("apply");
        //         if let CS::Change(h) = h {
        //             let mut channel = channel.write();
        //             txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;
        //         }
        //         debug!("applied");
        //     } else {
        //         debug!("not applying {:?}", h)
        //     }
        // }
        //
        // let mut result = Vec::with_capacity(to_apply_inodes.len());
        // for h in to_apply {
        //     if to_apply_inodes.contains(&h) {
        //         result.push(*h)
        //     }
        // }
        //
        // debug!("finished");
        // debug!("waiting for spawned process");
        // *self = t.await??;
        // u.await??;
        // Ok(result)
        todo!()
            
            Ok(())
        }).await?;

/// Build a fallible stream from a producer future.
///
/// `op` receives a sender and returns a future that pushes `Ok`/`Err` items
/// through it. If that future itself finishes with `Err(e)`, the error is
/// forwarded as a final `Err(e)` item, so the consumer observes it through
/// the stream.
fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>
where
    C: FnOnce(mpsc::Sender<Result<T, E>>) -> F,
    F: Future<Output = Result<(), E>>,
{
    stream(|sender| {
        let fut = op(sender.clone());

        async move {
            if let Err(e) = fut.await {
                // `Sender::send` is an async fn: without `.await` the future
                // is dropped and the error is never delivered. A send
                // failure only means the receiver is gone, so it is ignored.
                let _ = sender.send(Err(e)).await;
            }
        }
    })
}

/// Turn a producer future into a stream.
///
/// `op` is given the sending half of a channel of capacity 1 and returns a
/// future; the resulting stream drives that future and yields every item
/// received on the channel. The stream ends once the future has completed
/// and the channel reports that it is closed and empty.
fn stream<C, F, T>(op: C) -> impl Stream<Item = T>
where
    C: FnOnce(mpsc::Sender<T>) -> F,
    F: Future<Output = ()>
{
    // Pairs the producer future (wrapped as a one-shot stream) with the
    // receiving half of the channel it writes to.
    struct Impl<F, T> {
        fut: Once<F>,
        rx: mpsc::Receiver<T>,
    }

    impl<F, T> Stream for Impl<F, T>
    where
        F: Future<Output=()>,
    {
        type Item = T;

        fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
            // SAFETY: `fut` is only ever reached through this pinned
            // projection and is never moved out of `this`; `Impl` has no
            // `Drop` impl that could move it either. `rx` is re-pinned with
            // the safe `Pin::new`, which requires it to be `Unpin`.
            let (fut, mut rx) = unsafe {
                let this = self.get_unchecked_mut();
                (Pin::new_unchecked(&mut this.fut), Pin::new(&mut this.rx))
            };

            // Both halves are polled on every call: polling the producer
            // lets it make progress (and registers its waker), polling the
            // receiver picks up whatever it has sent.
            match (fut.poll_next(cx), rx.poll_recv(cx)) {
                // An item is available: yield it, whatever the producer state.
                (_, Poll::Ready(Some(v))) => Poll::Ready(Some(v)),
                // Producer finished and the channel is closed: end of stream.
                (Poll::Ready(_), Poll::Ready(None)) => Poll::Ready(None),
                _ => Poll::Pending,
            }
        }
    }

    let (tx, rx) = mpsc::channel(1);

    Impl {
        fut: stream::once(op(tx)),
        rx,
    }
}
                let mut ws = ApplyWorkspace::new();

                while let Some(h) = stream.try_next().await? {
                    debug!("to_apply: {:?}", h);

                    let mut touches_inodes = inodes.is_empty();

                    debug!("inodes = {:?}", inodes);

                    if !touches_inodes {
                        if let CS::Change(ref h) = h {
                            let changes = repo.changes.get_changes(h)?;

                            touches_inodes |= changes.iter().any(|c| {
                                c.iter().any(|c| {
                                    let inode = c.inode();
                                    debug!("inode = {:?}", inode);
                                    inodes.contains(&Position {
                                        change: inode.change.unwrap_or(*h),
                                        pos: inode.pos,
                                    })
                                })
                            })
                        }
                    }

                    if !touches_inodes {
                        touches_inodes |= inodes.iter().any(|i| CS::Change(i.change) == h);
                    }

                    if !touches_inodes {
                        continue;
                    }

                    to_apply_inodes.insert(h);

                    if let Some(apply_bar) = &apply_bar {
                        info!("Applying {:?}", h);
                        apply_bar.inc(1);
                        debug!("apply");

                        if let CS::Change(ref h) = h {
                            let mut channel = channel.write();
                            txn.apply_change_rec_ws(&repo.changes, &mut channel, h, &mut ws)?;
                        }

                        debug!("applied");
                    } else {
                        debug!("not applying {:?}", h)
                    }
                }

                Ok(())
            })
            .await?;
        }

        let mut result = Vec::with_capacity(to_apply_inodes.len());
        for h in to_apply {
            if to_apply_inodes.contains(&h) {
                result.push(*h)
            }
        }

        debug!("finished");
        Ok(result)
    }

    /// Download the changes listed in `tag` and apply them to `channel`.
    ///
    /// The hashes in `tag` are fed to `download_changes_rec` as
    /// `CS::Change` items; every `CS::Change` item produced by the resulting
    /// stream is applied to `channel` with `apply_change_rec_ws`, and every
    /// produced item is remembered. Once the download is over,
    /// `complete_changes` is run on the remembered items with `full` set to
    /// `false`.
    pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
        &mut self,
        repo: &Repository,
        txn: &mut T,
        channel: &ChannelRef<T>,
        tag: &[Hash],
    ) -> Result<(), anyhow::Error> {
        let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;

        // Items yielded by the download stream, in the order they arrived.
        let mut hashes = Vec::new();

        {
            // Reborrow so that the `async move` closure below captures
            // references only, leaving `txn` and `hashes` usable afterwards.
            let txn = &mut *txn;
            let hashes = &mut hashes;

            self.download_changes_with(download_bar, &repo.changes_dir, false, |cx| async move {
                let stream = download_changes_rec(&cx, repo, tag.iter().cloned().map(CS::Change));
                pin_mut!(stream);

                // Workspace shared by all applications in this loop.
                let mut ws = ApplyWorkspace::new();

                while let Some(cs) = stream.try_next().await? {
                    if let CS::Change(hash) = cs {
                        let mut channel = channel.write();
                        txn.apply_change_rec_ws(&repo.changes, &mut channel, &hash, &mut ws)?;
                    }

                    hashes.push(cs);
                }

                Ok(())
            })
            .await?;
        }

        self.complete_changes(repo, txn, channel, &hashes, false)
            .await?;
        Ok(())
    }

    /// Clone the remote up to (and including) the change that produces
    /// `state`.
    ///
    /// The remote changelist is refreshed, then walked from position 0,
    /// collecting changes until an entry whose state equals `state` is met.
    /// The collected changes are pulled and applied, identities are updated,
    /// and finally `complete_changes` is run on the pulled changes.
    ///
    /// Returns `Ok(())` without doing anything when the remote has no id.
    ///
    /// # Errors
    ///
    /// Fails with "State not found" if no cached entry of the remote has the
    /// requested state.
    pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
        &mut self,
        repo: &mut Repository,
        txn: &mut T,
        channel: &mut ChannelRef<T>,
        state: Merkle,
    ) -> Result<(), anyhow::Error> {
        let Some(id) = self.get_id(txn).await? else {
            return Ok(());
        };
        self.update_changelist(txn, &[]).await?;
        let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();

        // Collect every change up to the one that yields `state`.
        let mut to_pull = Vec::new();
        let mut reached_state = false;
        for entry in txn.iter_remote(&remote.lock().remote, 0)? {
            let (n, p) = entry?;
            debug!("{:?} {:?}", n, p);
            to_pull.push(CS::Change(p.a.into()));
            if p.b == state {
                reached_state = true;
                break;
            }
        }
        if !reached_state {
            bail!("State not found: {:?}", state);
        }

        self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true)
            .await?;
        self.update_identities(repo, &remote).await?;

        self.complete_changes(repo, txn, channel, &to_pull, false)
            .await?;
        Ok(())
    }

    /// Download the contents of those `changes` whose contents are missing
    /// from the local change store.
    ///
    /// Only `CS::Change` items are considered. A change is skipped when the
    /// change store already has its contents. Otherwise:
    /// * with `full` set, the change is always downloaded;
    /// * without `full`, the change is downloaded only if it is known to the
    ///   pristine (has an internal id) and the graph of `local_channel`
    ///   still has an alive-parent edge on one of its vertices.
    ///
    /// All downloads are started first and awaited afterwards.
    pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(
        &mut self,
        repo: &pijul_repository::Repository,
        txn: &T,
        local_channel: &ChannelRef<T>,
        changes: &[CS],
        full: bool,
    ) -> Result<(), anyhow::Error> {
        debug!("complete changes {:?}", changes);

        let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;
        // Bound to a name so that the spinner lives until the end of the
        // function.
        let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;

        // NOTE(review): the third argument is `true` here and `false` at the
        // other visible call sites; its meaning is defined by
        // `download_changes_with`, which is not visible in this chunk.
        self.download_changes_with(download_bar, &repo.changes_dir, true, |cx| async move {
            // Pending downloads, awaited after the selection loop.
            let mut waiting = Vec::new();

            for c in changes {
                let CS::Change(c) = c else { continue };

                let sc = c.into();

                if repo
                    .changes
                    .has_contents(*c, txn.get_internal(&sc)?.cloned())
                {
                    debug!("has contents {:?}", c);
                    continue;
                }

                if full {
                    waiting.push(cx.download(CS::Change(*c))?);
                    continue;
                }

                let Some(&change) = txn.get_internal(&sc)? else {
                    debug!("could not find internal for {:?}", sc);
                    continue;
                };

                // Check if at least one non-empty vertex from c is still alive.
                let v = libpijul::pristine::Vertex {
                    change,
                    start: ChangePosition(0u64.into()),
                    end: ChangePosition(0u64.into()),
                };

                let channel = local_channel.read();
                let graph = txn.graph(&channel);

                // Iterate from the first possible vertex of `change` and stop
                // as soon as the iteration moves past that change.
                for x in txn.iter_graph(graph, Some(&v))? {
                    let (v, e) = x?;
                    if v.change > change {
                        break;
                    } else if e.flag().is_alive_parent() {
                        waiting.push(cx.download(CS::Change(*c))?);
                        break;
                    }
                }
            }

            for w in waiting {
                w.await?;
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    /// Clone a whole remote channel into `local_channel`.
    ///
    /// The remote changelist is refreshed (restricted to `path`), every
    /// change listed in the cached remote is pulled and applied, identities
    /// are updated, and `complete_changes` is run on the pulled changes.
    ///
    /// # Errors
    ///
    /// Fails with "Channel not found" when updating the changelist yields
    /// nothing.
    pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
        &mut self,
        repo: &mut Repository,
        txn: &mut T,
        local_channel: &mut ChannelRef<T>,
        path: &[String],
    ) -> Result<(), anyhow::Error> {
        let Some((inodes, remote_changes)) = self.update_changelist(txn, path).await? else {
            bail!("Channel not found")
        };

        // Every change recorded in the cached remote is pullable.
        let mut pullable = Vec::new();
        {
            let cached = remote_changes.lock();
            for entry in txn.iter_remote(&cached.remote, 0)? {
                let (_, pair) = entry?;
                pullable.push(CS::Change(pair.a.into()));
            }
        }

        self.pull(repo, txn, local_channel, &pullable, &inodes, true)
            .await?;
        self.update_identities(repo, &remote_changes).await?;

        self.complete_changes(repo, txn, local_channel, &pullable, false)
            .await?;
        Ok(())
    }
}

lazy_static! {
    // A changelist entry: `<num>.<hash>.<merkle>`, optionally followed by a
    // trailing `.` (captured as `tag`). The pattern is not anchored, so it
    // matches anywhere in the line.
    static ref CHANGELIST_LINE: Regex = Regex::new(
        r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#
    )
    .unwrap();
    // A position entry: `<hash>.<num>`. Not anchored either.
    static ref PATHS_LINE: Regex =
        Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap();
}

/// One parsed line of a remote response, as produced by `parse_line`.
enum ListLine {
    /// A changelist entry (a line matching `CHANGELIST_LINE`).
    Change {
        /// The leading decimal number of the entry.
        n: u64,
        /// The change hash, decoded from base32.
        h: Hash,
        /// The Merkle value, decoded from base32.
        m: Merkle,
        /// `true` when the entry carries the optional trailing `.`.
        tag: bool,
    },
    /// A position entry (a line matching `PATHS_LINE`): a change hash plus a
    /// position number.
    Position(Position<Hash>),
    /// The text following an `error:` prefix.
    Error(String),
}

fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {
    debug!("data = {:?}", data);
    if let Some(caps) = CHANGELIST_LINE.captures(data) {
        if let (Some(h), Some(m)) = (
            Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),
            Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),
        ) {
            return Ok(ListLine::Change {
                n: caps.name("num").unwrap().as_str().parse().unwrap(),
                h,
                m,
                tag: caps.name("tag").is_some(),
            });
        }
    }
    if data.starts_with("error:") {
        return Ok(ListLine::Error(data.split_at(6).1.to_string()));
    }
    if let Some(caps) = PATHS_LINE.captures(data) {
        return Ok(ListLine::Position(Position {
            change: Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()).unwrap(),
            pos: ChangePosition(
                caps.name("num")
                    .unwrap()
                    .as_str()
                    .parse::<u64>()
                    .unwrap()
                    .into(),
            ),
        }));
    }
    debug!("offending line: {:?}", data);
    bail!("Protocol error")
}

/// Returns the entries of `ours_ge_dichotomy` (our cached view of the
/// remote) that the remote no longer has but that are present in
/// `current_channel`.
///
/// An entry is kept when both hold:
/// * it is absent from `theirs_ge_dichotomy_set` (the remote's current
///   set), and
/// * the current channel has it: for a change, the channel contains the
///   change; for a state, the channel has that state and it is tagged.
///
/// Entries are returned in the order they appear in `ours_ge_dichotomy`.
fn remote_unrecs<T: TxnTExt + ChannelTxnT>(
    txn: &T,
    current_channel: &ChannelRef<T>,
    ours_ge_dichotomy: &[(u64, CS)],
    theirs_ge_dichotomy_set: &HashSet<CS>,
) -> Result<Vec<(u64, CS)>, anyhow::Error> {
    let mut unrecorded = Vec::new();

    for (n, cs) in ours_ge_dichotomy {
        debug!("ours_ge_dichotomy: {:?} {:?}", n, cs);

        // Still on the remote: nothing was unrecorded there.
        if theirs_ge_dichotomy_set.contains(cs) {
            debug!("still present");
            continue;
        }

        let in_current_channel = match cs {
            CS::Change(hash) => txn.get_revchanges(&current_channel, &hash)?.is_some(),
            CS::State(state) => {
                let ch = current_channel.read();
                match txn.channel_has_state(txn.states(&*ch), &state.into())? {
                    Some(n) => txn.is_tagged(txn.tags(&*ch), n.into())?,
                    None => false,
                }
            }
        };

        // Entries the current channel never had are of no interest.
        if in_current_channel {
            unrecorded.push((*n, *cs))
        }
    }

    Ok(unrecorded)
}

/// Fallible variant of [`stream`]: builds a stream of `Result<T, E>` from a
/// producer future that may itself fail.
///
/// `op` receives a sender and returns a future that pushes items through
/// it. If that future resolves to `Err(e)`, the error is forwarded through
/// the same channel so the consumer sees it as the next item of the stream.
fn try_stream<C, F, T, E>(op: C) -> impl Stream<Item = Result<T, E>>
where
    C: FnOnce(mpsc::Sender<Result<T, E>>) -> F,
    F: Future<Output = Result<(), E>>,
{
    stream(|sender| {
        let fut = op(sender.clone());

        async move {
            if let Err(e) = fut.await {
                // `send` is async: it has to be awaited for the error to
                // actually be queued. A failed send only means the receiver
                // is gone, in which case nobody is left to report to.
                let _ = sender.send(Err(e)).await;
            }
        }
    })
}

/// Builds a stream from a producer future.
///
/// `op` is handed the sending half of a bounded channel (capacity 1) and
/// returns a future. Polling the returned stream drives that future and
/// yields whatever it sends. The stream ends once the future has finished
/// and the channel reports that it is closed and empty.
fn stream<C, F, T>(op: C) -> impl Stream<Item = T>
where
    C: FnOnce(mpsc::Sender<T>) -> F,
    F: Future<Output = ()>,
{
    struct Driven<F, T> {
        producer: Once<F>,
        items: mpsc::Receiver<T>,
    }

    impl<F, T> Stream for Driven<F, T>
    where
        F: Future<Output = ()>,
    {
        type Item = T;

        fn poll_next(
            self: Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            // SAFETY: nothing is moved out of `this`. `producer` is only
            // ever reached through this pinned projection and `Driven` has
            // no `Drop` impl, so the pinning guarantee of the `Once<F>`
            // field is upheld. The receiver is used through a plain `&mut`.
            let this = unsafe { self.get_unchecked_mut() };
            let producer = unsafe { Pin::new_unchecked(&mut this.producer) };

            // Drive the producer first so that anything it sends during
            // this poll is visible to the receiver right away.
            let producer_state = producer.poll_next(cx);

            match this.items.poll_recv(cx) {
                Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
                Poll::Ready(None) if producer_state.is_ready() => Poll::Ready(None),
                _ => Poll::Pending,
            }
        }
    }

    let (tx, rx) = mpsc::channel(1);

    Driven {
        producer: stream::once(op(tx)),
        items: rx,
    }
}

/// Downloads `items` and, transitively, the dependencies of the downloaded
/// changes, yielding each item once it is considered complete.
///
/// * A `CS` that is not a change is yielded as soon as its download ends.
/// * A change is yielded once it has no dependency still recorded as
///   pending. Completing a change may in turn complete the changes that
///   were waiting on it; those are yielded right after it.
///
/// Downloads run concurrently through a `FuturesUnordered`, so the output
/// order is not the input order.
///
/// # Errors
///
/// The stream yields an error if a download fails, if the dependencies of
/// a change cannot be read from `repo.changes`, or if the consumer side of
/// the stream has gone away.
fn download_changes_rec<'a, I>(
    cx: &'a DownloadContext,
    repo: &'a Repository,
    items: I,
) -> impl Stream<Item = anyhow::Result<CS>> + 'a
where
    I: IntoIterator + 'a,
    I::Item: Borrow<CS>,
{
    try_stream(move |sender| async move {
        let mut tasks = FuturesUnordered::new();

        // Bookkeeping:
        // * `pending_deps[c]`: dependencies of `c` that were not yet
        //   fetched when `c` finished downloading and have not completed
        //   since;
        // * `rev_deps[d]`: changes waiting on `d`. The mere presence of a
        //   key also marks `d` as already scheduled for download;
        // * `fetched`: changes whose download has finished.
        let mut pending_deps = HashMap::<Hash, HashSet<Hash>>::new();
        let mut rev_deps = HashMap::<Hash, Vec<Hash>>::new();
        let mut fetched = HashSet::new();

        let make_download_job = |cs| async move {
            match cx.download(cs) {
                Ok(v) => (cs, v.await),
                Err(e) => (cs, Err(e)),
            }
        };

        for item in items {
            let item = *item.borrow();
            tasks.push(make_download_job(item));

            if let CS::Change(hash) = item {
                rev_deps.insert(hash, Vec::new());
            }
        }

        while let Some((cs, res)) = tasks.next().await {
            debug!("{:?} finished downloading (result: {:?})", cs, &res);

            // NOTE(review): `follow` is the boolean produced by the
            // download; it decides whether dependencies are walked. Its
            // exact meaning is defined by `DownloadContext::download`.
            let follow = res?;

            let CS::Change(hash) = cs else {
                debug!("it is not a change, we're done");
                sender.send(Ok(cs)).await?;
                continue;
            };

            fetched.insert(hash);

            // first, populate our dependencies

            if follow {
                info!("{:?}", hash);
                let unmet = match pending_deps.entry(hash) {
                    Entry::Occupied(_) => unreachable!(),
                    Entry::Vacant(v) => v.insert(Default::default()),
                };

                for dep in repo.changes.get_dependencies(&hash)? {
                    // Schedule the dependency the first time it is seen.
                    let dependents = rev_deps.entry(dep).or_insert_with(|| {
                        tasks.push(make_download_job(CS::Change(dep)));
                        Default::default()
                    });

                    // Register `hash` as waiting on `dep`, so that it is
                    // re-examined when `dep` completes. Only done when
                    // `dep` is actually recorded as pending, which keeps
                    // `rev_deps` and `pending_deps` in sync.
                    if !fetched.contains(&dep) && unmet.insert(dep) {
                        dependents.push(hash);
                    }
                }
            }

            // then, send completed changes (including the changes that
            // were waiting on this one) to our caller

            let mut to_check = vec![hash];

            while let Some(hash) = to_check.pop() {
                if let Some(unmet) = pending_deps.get(&hash) {
                    if !unmet.is_empty() {
                        continue;
                    }
                }

                // this change has no pending dependencies => it is complete
                sender.send(Ok(CS::Change(hash))).await?;

                // other changes may be complete if this was the last
                // missing dependency, propagate
                for dependent in rev_deps.get(&hash).into_iter().flatten() {
                    if let Some(unmet) = pending_deps.get_mut(dependent) {
                        // `hash` is the dependency that just completed.
                        if unmet.remove(&hash) {
                            to_check.push(*dependent);
                        }
                    }
                }
            }
        }

        Ok(())
    })
}