Fork channel

Create a new channel as a copy of main.

Rename channel

Rename main to:

Delete channel

Delete main? This cannot be undone.

main.rs
use axum::{
    Json, Router,
    extract::{Path, State},
    http::StatusCode,
    response::{IntoResponse, Response},
    routing::{get, post},
};
// use ci::Message;
use clap::Parser;
use serde_derive::*;
// use std::io::Read;
use axum_extra::TypedHeader;
use axum_extra::headers::Range;
use ci::*;
use pijul_core::MutTxnT;
use std::collections::HashMap;
use std::net::{Ipv6Addr, ToSocketAddrs};
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use tempfile;
use thiserror::*;
use tokio::fs::OpenOptions;
use tracing::*;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[derive(Serialize, Deserialize)]
struct ConfigFile {
    port: u16,
    nest_url: String,
    repo_path: PathBuf,
    ci_path: PathBuf,
}

#[derive(Debug, Parser)]
pub struct App {
    #[arg(short, long)]
    config: PathBuf,
}

#[derive(Clone)]
pub struct Config {
    config: Arc<ConfigFile>,
    jobs: Arc<
        std::sync::Mutex<
            std::collections::HashMap<
                uuid::Uuid,
                (
                    tokio::sync::oneshot::Sender<()>,
                    tokio::sync::watch::Receiver<std::option::Option<i32>>,
                ),
            >,
        >,
    >,
}

#[tokio::main]
async fn main() {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "ci=debug".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();
    pijul_interaction::set_context(pijul_interaction::InteractiveContext::NotInteractive);
    let matches = App::parse();
    let conf: ConfigFile =
        toml::from_str(&std::fs::read_to_string(&matches.config).unwrap()).unwrap();
    let addr = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0);
    let addr = (addr, conf.port).to_socket_addrs().unwrap().next().unwrap();
    let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
    let config = Config {
        config: Arc::new(conf),
        jobs: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
    };
    let app = Router::new()
        .route("/trigger", post(trigger))
        .route("/stdout/{job}", get(stdout))
        .route("/stderr/{job}", get(stderr))
        .route("/status/{job}", get(status))
        .with_state(config.clone());
    axum::serve(listener, app).await.unwrap();
}

async fn status(
    State(config): State<Config>,
    Path(job): Path<uuid::Uuid>,
) -> Result<Json<Status>, Error> {
    let s =
        tokio::fs::read_to_string(&config.config.ci_path.join(&format!("{}.status", job))).await?;
    Ok(Json(serde_json::from_str(&s)?))
}

use axum_range::KnownSize;
use axum_range::Ranged;

async fn stdout(
    State(config): State<Config>,
    Path(job): Path<uuid::Uuid>,
    range: Option<TypedHeader<Range>>,
) -> Response {
    debug!("stdout {:?} {:?}", job, range);
    let Ok(mut file) =
        tokio::fs::File::open(&config.config.ci_path.join(&format!("{}.stdout", job))).await
    else {
        return StatusCode::NOT_FOUND.into_response();
    };
    if let Some(TypedHeader(range)) = range {
        let body = KnownSize::file(file).await.unwrap();
        Ranged::new(Some(range), body).into_response()
    } else {
        use tokio::io::AsyncReadExt;
        let mut v = Vec::new();
        file.read_to_end(&mut v).await.unwrap();
        v.into_response()
    }
}

async fn stderr(
    State(config): State<Config>,
    Path(job): Path<uuid::Uuid>,
    range: Option<TypedHeader<Range>>,
) -> Response {
    debug!("stderr {:?} {:?}", job, range);
    let Ok(mut file) =
        tokio::fs::File::open(&config.config.ci_path.join(&format!("{}.stderr", job))).await
    else {
        return StatusCode::NOT_FOUND.into_response();
    };
    if let Some(TypedHeader(range)) = range {
        let body = KnownSize::file(file).await.unwrap();
        Ranged::new(Some(range), body).into_response()
    } else {
        use tokio::io::AsyncReadExt;
        let mut v = Vec::new();
        file.read_to_end(&mut v).await.unwrap();
        v.into_response()
    }
}

#[derive(Error, Debug)]
pub enum Error {
    #[error("Lock")]
    Lock,
    #[error("Forbidden")]
    Forbidden,
    #[error("Not found")]
    NotFound,
    #[error(transparent)]
    Output(
        #[from]
        pijul_core::output::OutputError<
            <C as pijul_core::changestore::ChangeStore>::Error,
            pijul_core::pristine::sanakirja::MutTxn0,
            std::io::Error,
        >,
    ),
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),
    #[error(transparent)]
    Sanakirja(#[from] pijul_core::pristine::sanakirja::SanakirjaError),
    #[error(transparent)]
    IO(#[from] std::io::Error),
    #[error(transparent)]
    Json(#[from] serde_json::Error),
}

type C = pijul_core::changestore::filesystem::FileSystem;

impl IntoResponse for Error {
    fn into_response(self) -> Response {
        debug!("response {:?}", self);
        match self {
            Error::Forbidden => (StatusCode::FORBIDDEN, "{}").into_response(),
            Error::NotFound => (StatusCode::NOT_FOUND, "{}").into_response(),
            _ => (StatusCode::INTERNAL_SERVER_ERROR, "{}").into_response(),
        }
    }
}

// #[derive(Debug, Clone)]
// struct RepoPath {
//     path: PathBuf,
//     remove_dir: bool,
//     remove_dot: bool,
// }

#[axum::debug_handler]
pub async fn trigger(
    State(config): State<Config>,
    Json(t): Json<Trigger>,
) -> Result<StatusCode, Error> {
    tokio::spawn(async move {
        info!("trigger: {:?}", t);
        let t0 = std::time::Instant::now();
        let mut remote = pijul_remote::RemoteRepo::Http(pijul_remote::http::Http {
            url: format!("{}/{}/{}", config.config.nest_url, t.owner, t.repo)
                .parse()
                .unwrap(),
            channel: "main".to_string(),
            client: reqwest::ClientBuilder::new().build()?,
            headers: vec![],
            name: t.state.clone(),
        });

        let path = tempfile::tempdir().unwrap();
        let pijul_config = pijul_config::Config::load(None, Vec::new()).unwrap();
        let repo = tokio::sync::Mutex::new(
            pijul_repository::Repository::init(&pijul_config, Some(&path.path()), None, None)
                .unwrap(),
        );
        {
            let mut repo = repo.lock().await;
            let txn = repo.pristine.arc_txn_begin()?;
            let mut channel = txn.write().open_or_create_channel("main")?;

            let h = t.state.parse().unwrap();
            remote
                .clone_state(&mut repo, &txn, &mut channel, h)
                .await
                .unwrap();

            pijul_core::output::output_repository_no_pending(
                &repo.working_copy,
                &repo.changes,
                &txn,
                &channel,
                "",
                true,
                None,
                1, // std::thread::available_parallelism()?.get(),
                0,
            )?;

            remote.finish().await.unwrap();
        }
        info!("cloned in {:?}", t0.elapsed());

        std::fs::remove_dir_all(&path.path().join(".pijul"))?;

        let (status_tx, status_rx) = tokio::sync::watch::channel(None);
        let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();
        if let Some((k, _)) = config
            .jobs
            .lock()
            .unwrap()
            .insert(t.id, (kill_tx, status_rx))
        {
            k.send(()).unwrap_or(());
        }
        let result = build(
            Some(&config.config.ci_path),
            path,
            t.id,
            &t.targets,
            kill_rx,
            status_tx,
        )
        .await;
        config.jobs.lock().unwrap().remove(&t.id);

        let (code, results) = result?;
        debug!("code = {:?} {:?}", code, results);
        tokio::fs::write(
            config.config.ci_path.join(&format!("{}.status", t.id)),
            serde_json::to_string(&Status {
                code,
                results,
                finished: chrono::Utc::now(),
            })
            .unwrap(),
        )
        .await?;
        Ok::<(), Error>(())
    });
    Ok(StatusCode::OK)
}

async fn build(
    ci: Option<&std::path::Path>,
    path: tempfile::TempDir,
    id: uuid::Uuid,
    targets: &HashMap<String, String>,
    mut kill_rx: tokio::sync::oneshot::Receiver<()>,
    mut status_tx: tokio::sync::watch::Sender<Option<i32>>,
) -> Result<(Option<i32>, HashMap<String, PathBuf>), Error> {
    let mut files = if let Some(ref ci_path) = ci {
        tokio::fs::create_dir_all(ci_path).await?;
        Some((
            OpenOptions::new()
                .append(true)
                .create(true)
                .open(&ci_path.join(&format!("{}.stdout", id)))
                .await
                .unwrap(),
            OpenOptions::new()
                .append(true)
                .create(true)
                .open(&ci_path.join(&format!("{}.stderr", id)))
                .await
                .unwrap(),
        ))
    } else {
        None
    };

    let mut cmd = tokio::process::Command::new("nix");

    cmd.arg("build").arg("-L").arg("--log-format").arg("raw");
    for (_, target) in targets {
        cmd.arg(target);
    }
    let cmd = cmd
        .current_dir(&path)
        .stderr(Stdio::piped())
        .stdout(Stdio::piped())
        .stdin(Stdio::null())
        .spawn()
        .unwrap();
    let code = dump_cmd(cmd, &mut files, &mut kill_rx, &mut status_tx).await?;

    if code == Some(0) {
        let mut cmd = tokio::process::Command::new("attic");
        cmd.arg("push").arg("coturnix");
        let mut result = HashMap::new();
        for (n, (t, _)) in targets.iter().enumerate() {
            if n == 0 {
                result.insert(
                    t.clone(),
                    tokio::fs::read_link(path.path().join("result"))
                        .await
                        .unwrap(),
                );
                cmd.arg("result");
            } else {
                result.insert(
                    t.clone(),
                    tokio::fs::read_link(path.path().join(format!("result-{n}")))
                        .await
                        .unwrap(),
                );
                cmd.arg(format!("result-{n}"));
            }
        }
        let cmd = cmd
            .current_dir(&path)
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .stdin(Stdio::null())
            .spawn()
            .unwrap();
        Ok((
            dump_cmd(cmd, &mut files, &mut kill_rx, &mut status_tx).await?,
            result,
        ))
    } else {
        Ok((code, HashMap::new()))
    }
}