Fork channel

Create a new channel as a copy of main.

Rename channel

Rename main to:

Delete channel

Delete main? This cannot be undone.

replication.rs
use crate::repository::*;
use ::replication::Update;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use pijul_core::changestore::ChangeStore;
use pijul_core::fs::FsErrorC;
use pijul_core::output::{FileError, OutputError};
use pijul_core::pristine::sanakirja::MutTxn0;
use pijul_core::pristine::sanakirja::SanakirjaError;
use pijul_core::pristine::{ForkError, TreeErr, TxnErr};
use pijul_core::{
    ApplyError, ArcTxn, Base32, ChannelMutTxnT, MutTxnT, MutTxnTExt, TxnT, TxnTExt, UnrecordError,
};
use serde_derive::*;
use std::pin::Pin;
use std::process::Stdio;
use thiserror::*;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use tracing::*;

/// Replication handler state.
///
/// The struct is `Clone`; `update` clones it so the returned future owns
/// its own copy of every field.
#[derive(Clone)]
pub struct H {
    /// CI configuration: `url` lists the CI endpoints that are contacted to
    /// trigger and follow jobs, `filesystem` optionally names a directory
    /// where job logs are stored.
    ci: crate::config_file::CiConfig,
    /// Jobs configuration (not read in this part of the file).
    jobs: crate::config::Jobs,
    /// Per-repository locks, used to obtain a repository's pristine and
    /// change store.
    locks: RepositoryLocks,
    /// Database handle from which connections are obtained with `get()`.
    db: crate::config::Db,
    /// Semaphore shared between builders (not read in this part of the file).
    builders: std::sync::Arc<tokio::sync::Semaphore>,
    /// HTTP client used to talk to the CI endpoints.
    client: reqwest::Client,
}

impl H {
    /// Assembles a handler from the services it depends on.
    ///
    /// Every argument is stored as-is in the field of the same name; the
    /// HTTP client is not supplied by the caller but created here with
    /// `reqwest::Client::new()`.
    pub fn new(
        ci: crate::config_file::CiConfig,
        jobs: crate::config::Jobs,
        locks: RepositoryLocks,
        db: crate::config::Db,
        builders: std::sync::Arc<tokio::sync::Semaphore>,
    ) -> Self {
        let client = reqwest::Client::new();
        Self {
            ci,
            jobs,
            locks,
            db,
            builders,
            client,
        }
    }
}

/// Errors produced while handling a replication update.
///
/// Every variant except `Join` transparently wraps an underlying error and
/// can be created from it with `?` through the generated `From` impl.
#[derive(Debug, Error)]
pub enum Error {
    /// Error from the Sanakirja storage backend.
    #[error(transparent)]
    Sk(#[from] SanakirjaError),
    /// I/O error.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// Transaction error wrapping a Sanakirja error.
    #[error(transparent)]
    Txn(#[from] TxnErr<SanakirjaError>),
    /// Tree error wrapping a Sanakirja error.
    #[error(transparent)]
    Tree(#[from] TreeErr<SanakirjaError>),
    /// Error while outputting a file's contents (see `get_file`).
    #[error(transparent)]
    File(#[from] FileError<crate::repository::changestore::Error, MutTxn0>),
    /// Error while resolving a path in a channel.
    #[error(transparent)]
    Fs(#[from] FsErrorC<crate::repository::changestore::Error, MutTxn0>),
    /// Error while forking a channel.
    #[error(transparent)]
    Fork(#[from] ForkError<SanakirjaError>),
    /// Error while unrecording a change.
    #[error(transparent)]
    Unrec(#[from] UnrecordError<crate::repository::changestore::Error, std::io::Error, MutTxn0>),
    /// Error while applying a change.
    #[error(transparent)]
    Apply(#[from] ApplyError<crate::repository::changestore::Error, MutTxn0>),
    /// Error while outputting a repository.
    #[error(transparent)]
    Output(#[from] OutputError<crate::repository::changestore::Error, MutTxn0, std::io::Error>),
    /// A task started with `spawn_blocking` could not be joined.
    #[error("Join error")]
    Join,
    /// Database query error.
    #[error(transparent)]
    Diesel(#[from] diesel::result::Error),
    /// File contents were not valid UTF-8.
    #[error(transparent)]
    Utf8(#[from] std::string::FromUtf8Error),
    /// HTTP client error.
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),
}

impl ::replication::Handler for H {
    type Error = Error;
    type F =
        Pin<Box<dyn futures::Future<Output = Result<Option<Self::Error>, Self::Error>> + Send>>;

    fn update(&self, is_source: bool, update: replication::Update) -> Self::F {
        let s = self.clone();
        Box::pin(async move {
            match update {
                Update::Change { .. } => {}
                Update::Apply {
                    repo,
                    channel,
                    hash,
                } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    match tokio::task::spawn_blocking(move || {
                        let pri = repo_.pristine.blocking_write();
                        let txn = pri.arc_txn_begin()?;
                        let channel_ = format!("{}_{}", repo, channel);
                        let mut txn_ = txn.write();
                        let channel_ = txn_.open_or_create_channel(&channel_)?;
                        let mut channel__ = channel_.write();
                        let result = txn_.apply_change(&repo_.changes, &mut *channel__, &hash);
                        match result {
                            Err(pijul_core::ApplyError::LocalChange(
                                pijul_core::LocalApplyError::ChangeAlreadyOnChannel { .. },
                            )) => {
                                error!(
                                    "ignored error, apply {:?} to {:?}:{:?} = {:?}",
                                    hash, repo, channel, result
                                );
                                return Ok(None);
                            }
                            Err(pijul_core::ApplyError::LocalChange(
                                pijul_core::LocalApplyError::DependencyMissing { hash },
                            )) => {
                                error!(
                                    "apply {:?} to {:?}:{:?} = {:?}",
                                    hash, repo, channel, result
                                );
                                return Ok(Some(
                                    pijul_core::ApplyError::LocalChange(
                                        pijul_core::LocalApplyError::DependencyMissing { hash },
                                    )
                                    .into(),
                                ));
                            }
                            Ok(_) => {
                                debug!("apply {:?} to {:?}:{:?}", hash, repo, channel);
                            }
                            Err(e) => {
                                error!("apply {:?} to {:?}:{:?} = {:?}", hash, repo, channel, e);
                                return Err(e.into());
                            }
                        }
                        txn_.touch_channel(&mut *channel__, None);
                        std::mem::drop(channel__);
                        std::mem::drop(txn_);
                        txn.commit()?;
                        Ok::<_, Error>(None)
                    })
                    .await
                    {
                        Ok(Ok(Some(x))) => return Ok(Some(x)),
                        Ok(Ok(None)) => {}
                        Ok(Err(e)) => return Err(e.into()),
                        Err(_) => return Err(Error::Join),
                    }
                }
                Update::Eof { repo, channel } => {
                    if cfg!(feature = "jobs") {
                        if let Some((state, deployment)) =
                            get_config_state(&s.locks, repo, channel).await?
                        {
                            debug!("{:?} {:?}", state, deployment);
                            let mut db = s.db.get().await.unwrap();
                            use crate::db::jobs::dsl as jobs;
                            let id = diesel::insert_into(jobs::jobs)
                                .values((
                                    jobs::repo.eq(repo),
                                    jobs::repo_state.eq(state.to_base32()),
                                ))
                                .returning(jobs::id)
                                .get_result::<uuid::Uuid>(&mut db)
                                .await
                                .unwrap();

                            use crate::db::repositories::dsl as repositories;
                            use crate::db::users::dsl as users;

                            let (owner, repo) = repositories::repositories
                                .find(repo)
                                .inner_join(users::users)
                                .select((users::login, repositories::name))
                                .get_result::<(String, String)>(&mut db)
                                .await
                                .unwrap();
                            let mut targets = std::collections::HashMap::new();
                            targets.insert(deployment.clone(), format!(".#{}", deployment));
                            let body = serde_json::to_string(&ci::Trigger {
                                id,
                                owner,
                                repo,
                                state: state.to_base32(),
                                targets,
                            })
                            .unwrap();
                            debug!("{:?} {}", s.ci.url, body);
                            for (n, ci) in s.ci.url.iter().enumerate() {
                                let res = s
                                    .client
                                    .post(ci.clone() + "/trigger")
                                    .header("Content-Type", "application/json")
                                    .body(body.clone())
                                    .send()
                                    .await
                                    .unwrap();
                                debug!("{:?}", res);

                                if let reqwest::StatusCode::OK = res.status() {
                                    tokio::spawn(async move {
                                        sync_job(&mut db, s.client, &s.ci, n, id).await;
                                    });
                                    break;
                                }
                            }
                        }
                    }
                }
                Update::Unrecord {
                    repo,
                    channel,
                    hash,
                    ..
                } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    match tokio::task::spawn_blocking(move || {
                        let pri = repo_.pristine.blocking_write();
                        let mut txn = pri.mut_txn_begin()?;
                        let channel = format!("{}_{}", repo, channel);
                        let mut channel_ = txn.open_or_create_channel(&channel)?;
                        let result = txn.unrecord(
                            &repo_.changes,
                            &mut channel_,
                            &hash,
                            0,
                            &pijul_core::working_copy::sink(),
                        );
                        match result {
                            Err(pijul_core::UnrecordError::ChangeNotInChannel { .. }) | Ok(_) => {
                                debug!("unrecord {:?} to {:?}:{:?}", hash, repo, channel);
                            }
                            Err(e) => {
                                error!("unrecord {:?} to {:?}:{:?} = {:?}", hash, repo, channel, e);
                                return Err(e.into());
                            }
                        }

                        let mut in_other_channels = false;
                        for ch in txn.channels("")? {
                            if txn.get_revchanges(&ch, &hash.into())?.is_some() {
                                in_other_channels = true;
                                break;
                            }
                        }
                        txn.touch_channel(&mut *channel_.write(), None);
                        txn.commit()?;
                        Ok::<_, Error>(in_other_channels)
                    })
                    .await
                    {
                        Ok(Ok(false)) => {
                            let mut p = crate::repository::nest_changes_path(&s.locks.config, repo);
                            pijul_core::changestore::filesystem::push_filename(&mut p, &hash);
                            if let Ok(meta) = tokio::fs::metadata(&p).await {
                                std::fs::remove_file(&p).unwrap_or(());
                                std::fs::remove_dir(p.parent().unwrap()).unwrap_or(());
                                if is_source {
                                    crate::repository::free_used_storage(
                                        &mut *s.db.get().await.unwrap(),
                                        repo,
                                        meta.len(),
                                    )
                                    .await
                                    .unwrap();
                                }
                            }
                        }
                        Ok(Ok(_)) => {}
                        Ok(Err(e)) => return Err(e.into()),
                        Err(_) => return Err(Error::Join),
                    }
                }
                Update::NewChannel { repo, channel } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    let pri = repo_.pristine.write().await;
                    let mut txn = pri.mut_txn_begin()?;
                    let channel = format!("{}_{}", repo, channel);
                    txn.open_or_create_channel(&channel)?;
                    txn.commit()?;
                }
                Update::Fork { repo, channel, new } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    let pri = repo_.pristine.write().await;
                    let mut txn = pri.mut_txn_begin()?;
                    let channel = format!("{}_{}", repo, channel);
                    let chan = txn.open_or_create_channel(&channel)?;
                    let new = format!("{}_{}", repo, new);
                    match txn.fork(&chan, &new) {
                        Ok(_) => txn.commit()?,
                        Err(ForkError::ChannelNameExists(_)) => {}
                        Err(e) => return Err(e.into()),
                    }
                }
                Update::Prune { repo, channel } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    let pri = repo_.pristine.write().await;
                    let mut txn = pri.mut_txn_begin()?;
                    let channel = format!("{}_{}", repo, channel);
                    txn.drop_channel(&channel)?;
                    txn.commit()?;
                }
                Update::Rename { repo, channel, new } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    let pri = repo_.pristine.write().await;
                    let mut txn = pri.mut_txn_begin()?;
                    let channel = format!("{}_{}", repo, channel);
                    if let Some(mut c) = txn.load_channel(&channel)? {
                        let new = format!("{}_{}", repo, new);
                        debug!("rename {:?} to {:?}", channel, new);
                        txn.rename_channel(&mut c, &new)?;
                        txn.commit()?;
                    } else {
                        debug!("not rename");
                    }
                }
            }
            Ok(None)
        })
    }
}

/// The part of a repository's `pijul.toml` that `get_config_state` reads.
#[derive(Debug, Deserialize)]
struct Config {
    /// Deployment name; when absent, `get_config_state` reports no
    /// configuration for the channel.
    deployment: Option<String>,
}

fn get_file<C: ChangeStore<Error = crate::repository::changestore::Error>>(
    txn: &ArcTxn<pijul_core::pristine::sanakirja::MutTxn0>,
    channel: &pijul_core::ChannelRef<pijul_core::pristine::sanakirja::MutTxn0>,
    changes: &C,
    path: &str,
) -> Result<Option<String>, Error> {
    let (pos, is_dir) = txn.read().follow_oldest_path(changes, channel, path)?;
    if is_dir {
        return Ok(None);
    }
    let mut out = crate::repository::RawVertexBuf { out: Vec::new() };

    use pijul_core::ChannelTxnT;
    let mut graph = {
        let txn = txn.read();
        pijul_core::alive::retrieve(&*txn, txn.graph(&*channel.read()), pos, false)?
    };

    let mut forward = Vec::new();
    pijul_core::alive::output_graph(changes, &txn, &channel, &mut out, &mut graph, &mut forward)
        .map_err(|x| Error::File(x))?;
    debug!("{:?}", out);
    Ok(Some(String::from_utf8(out.out)?))
}

pub async fn get_config_state(
    locks: &RepositoryLocks,
    repo: uuid::Uuid,
    channel: String,
) -> Result<Option<(pijul_core::Merkle, String)>, Error> {
    debug!("get_config_state");
    let repo_ = locks.get(&repo).await.unwrap();
    tokio::task::spawn_blocking(move || {
        let pri = repo_.pristine.blocking_write();
        let txn_ = pri.arc_txn_begin()?;
        let channel = format!("{}_{}", repo, channel);
        if let Some(channel) = txn_.read().load_channel(&channel)? {
            if let Some(config) = get_file(&txn_, &channel, &repo_.changes, "pijul.toml")? {
                debug!("config = {:?}", config);

                if let Ok(parsed) = toml::from_str::<Config>(&config) {
                    if let Some(dep) = parsed.deployment {
                        if let Some(Ok((_, (_, state)))) =
                            txn_.read().reverse_log(&channel.read(), None)?.next()
                        {
                            return Ok(Some((state.into(), dep)));
                        }
                    }
                }
            };
        }
        Ok(None)
    })
    .await
    .unwrap()
}

/// Follows CI job `job` on the endpoint `ci.url[ci_n]` until it reports a
/// status, then realises and runs the job's results.
///
/// Once per second:
///
/// 1. `GET {url}/status/{job}`. On `200 OK` the job's `status` and `ended`
///    columns are updated in the database.
/// 2. If `ci.filesystem` is set, the parts of `{url}/stdout/{job}` and
///    `{url}/stderr/{job}` past the current length of the local
///    `{job}.stdout` / `{job}.stderr` files are requested and appended to
///    those files.
/// 3. If step 1 produced a status, `nix-store -r <path>` is run for every
///    result, then `<path>/bin/<script>` for every result, with their output
///    handled by `ci::dump_cmd`; the loop then ends.
///
/// # Panics
///
/// Panics on any HTTP, database, file or process failure; the function has
/// no error channel.
pub async fn sync_job(
    db: &mut diesel_async::AsyncPgConnection,
    client: reqwest::Client,
    ci: &crate::config_file::CiConfig,
    ci_n: usize,
    job: uuid::Uuid,
) {
    // Local log files, opened in append mode so their length tells how much
    // of the remote output has already been copied.
    let mut files = if let Some(ref f) = ci.filesystem {
        let stdout = OpenOptions::new()
            .append(true)
            .create(true)
            .open(&f.join(&format!("{}.stdout", job)))
            .await
            .unwrap();
        let stderr = OpenOptions::new()
            .append(true)
            .create(true)
            .open(&f.join(&format!("{}.stderr", job)))
            .await
            .unwrap();
        Some((stdout, stderr))
    } else {
        None
    };

    loop {
        debug!("Querying status");
        let status = client
            .get(format!("{}/status/{}", ci.url[ci_n], job))
            .send()
            .await
            .unwrap();
        let status = if let reqwest::StatusCode::OK = status.status() {
            let status: ci::Status = status.json().await.unwrap();
            debug!("STATUS {:?}", status);
            use crate::db::jobs::dsl as jobs;
            diesel::update(jobs::jobs.find(job))
                .set((
                    jobs::status.eq(status.code),
                    jobs::ended.eq(status.finished),
                ))
                .execute(db)
                .await
                .unwrap();
            Some(status)
        } else {
            None
        };

        if let Some((ref mut stdout, ref mut stderr)) = files {
            let stdout_len = stdout.metadata().await.unwrap().len();
            let stderr_len = stderr.metadata().await.unwrap().len();
            // NOTE(review): `bytes=N-*` is not the standard open-ended form
            // (`bytes=N-`); presumably the CI server accepts it. Confirm
            // against the server before changing it.
            let (stdout_, stderr_) = tokio::join!(
                client
                    .get(format!("{}/stdout/{}", ci.url[ci_n], job))
                    .header("Range", format!("bytes={stdout_len}-*"))
                    .send(),
                client
                    .get(format!("{}/stderr/{}", ci.url[ci_n], job))
                    .header("Range", format!("bytes={stderr_len}-*"))
                    .send()
            );
            let stdout_ = stdout_.unwrap();
            let stderr_ = stderr_.unwrap();
            debug!("{:?} {:?}", stdout_, stderr_);
            // `write` may accept only part of the buffer; `write_all` keeps
            // going until every byte has been handed over.
            let bytes = stdout_.bytes().await.unwrap();
            stdout.write_all(&bytes).await.unwrap();
            let bytes = stderr_.bytes().await.unwrap();
            stderr.write_all(&bytes).await.unwrap();
            // Wait for the writes to complete so that the next length probe
            // and the commands below see the appended data.
            stdout.flush().await.unwrap();
            stderr.flush().await.unwrap();
        }

        if let Some(status) = status {
            let (mut status_tx, _status_rx) = tokio::sync::watch::channel(None);
            let (_kill_tx, mut kill_rx) = tokio::sync::oneshot::channel();
            // First realise every result path, then run the scripts.
            for (_script, s) in status.results.iter() {
                let mut cmd = tokio::process::Command::new("nix-store");
                cmd.arg("-r").arg(&s);
                let cmd = cmd
                    .stderr(Stdio::piped())
                    .stdout(Stdio::piped())
                    .stdin(Stdio::null())
                    .spawn()
                    .unwrap();
                ci::dump_cmd(cmd, &mut files, &mut kill_rx, &mut status_tx)
                    .await
                    .unwrap();
            }
            for (script, s) in status.results.iter() {
                debug!("launching {:?} {:?}", s, script);
                let cmd = tokio::process::Command::new(format!("{}/bin/{script}", s.display()))
                    .stderr(Stdio::piped())
                    .stdout(Stdio::piped())
                    .stdin(Stdio::null())
                    .spawn()
                    .unwrap();
                ci::dump_cmd(cmd, &mut files, &mut kill_rx, &mut status_tx)
                    .await
                    .unwrap();
            }
            break;
        }
        tokio::time::sleep(std::time::Duration::from_secs(1)).await
    }
}