main.rs
use axum::{
Json, Router,
extract::{Path, State},
http::StatusCode,
response::{IntoResponse, Response},
routing::{get, post},
};
// use ci::Message;
use clap::Parser;
use serde_derive::*;
// use std::io::Read;
use axum_extra::TypedHeader;
use axum_extra::headers::Range;
use ci::*;
use pijul_core::MutTxnT;
use std::collections::HashMap;
use std::net::{Ipv6Addr, ToSocketAddrs};
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use tempfile;
use thiserror::*;
use tokio::fs::OpenOptions;
use tracing::*;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[derive(Serialize, Deserialize)]
struct ConfigFile {
port: u16,
nest_url: String,
repo_path: PathBuf,
ci_path: PathBuf,
}
#[derive(Debug, Parser)]
pub struct App {
#[arg(short, long)]
config: PathBuf,
}
#[derive(Clone)]
pub struct Config {
config: Arc<ConfigFile>,
jobs: Arc<
std::sync::Mutex<
std::collections::HashMap<
uuid::Uuid,
(
tokio::sync::oneshot::Sender<()>,
tokio::sync::watch::Receiver<std::option::Option<i32>>,
),
>,
>,
>,
}
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "ci=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
pijul_interaction::set_context(pijul_interaction::InteractiveContext::NotInteractive);
let matches = App::parse();
let conf: ConfigFile =
toml::from_str(&std::fs::read_to_string(&matches.config).unwrap()).unwrap();
let addr = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0);
let addr = (addr, conf.port).to_socket_addrs().unwrap().next().unwrap();
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
let config = Config {
config: Arc::new(conf),
jobs: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
};
let app = Router::new()
.route("/trigger", post(trigger))
.route("/stdout/{job}", get(stdout))
.route("/stderr/{job}", get(stderr))
.route("/status/{job}", get(status))
.with_state(config.clone());
axum::serve(listener, app).await.unwrap();
}
async fn status(
State(config): State<Config>,
Path(job): Path<uuid::Uuid>,
) -> Result<Json<Status>, Error> {
let s =
tokio::fs::read_to_string(&config.config.ci_path.join(&format!("{}.status", job))).await?;
Ok(Json(serde_json::from_str(&s)?))
}
use axum_range::KnownSize;
use axum_range::Ranged;
async fn stdout(
State(config): State<Config>,
Path(job): Path<uuid::Uuid>,
range: Option<TypedHeader<Range>>,
) -> Response {
debug!("stdout {:?} {:?}", job, range);
let Ok(mut file) =
tokio::fs::File::open(&config.config.ci_path.join(&format!("{}.stdout", job))).await
else {
return StatusCode::NOT_FOUND.into_response();
};
if let Some(TypedHeader(range)) = range {
let body = KnownSize::file(file).await.unwrap();
Ranged::new(Some(range), body).into_response()
} else {
use tokio::io::AsyncReadExt;
let mut v = Vec::new();
file.read_to_end(&mut v).await.unwrap();
v.into_response()
}
}
async fn stderr(
State(config): State<Config>,
Path(job): Path<uuid::Uuid>,
range: Option<TypedHeader<Range>>,
) -> Response {
debug!("stderr {:?} {:?}", job, range);
let Ok(mut file) =
tokio::fs::File::open(&config.config.ci_path.join(&format!("{}.stderr", job))).await
else {
return StatusCode::NOT_FOUND.into_response();
};
if let Some(TypedHeader(range)) = range {
let body = KnownSize::file(file).await.unwrap();
Ranged::new(Some(range), body).into_response()
} else {
use tokio::io::AsyncReadExt;
let mut v = Vec::new();
file.read_to_end(&mut v).await.unwrap();
v.into_response()
}
}
#[derive(Error, Debug)]
pub enum Error {
#[error("Lock")]
Lock,
#[error("Forbidden")]
Forbidden,
#[error("Not found")]
NotFound,
#[error(transparent)]
Output(
#[from]
pijul_core::output::OutputError<
<C as pijul_core::changestore::ChangeStore>::Error,
pijul_core::pristine::sanakirja::MutTxn0,
std::io::Error,
>,
),
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
#[error(transparent)]
Sanakirja(#[from] pijul_core::pristine::sanakirja::SanakirjaError),
#[error(transparent)]
IO(#[from] std::io::Error),
#[error(transparent)]
Json(#[from] serde_json::Error),
}
type C = pijul_core::changestore::filesystem::FileSystem;
impl IntoResponse for Error {
fn into_response(self) -> Response {
debug!("response {:?}", self);
match self {
Error::Forbidden => (StatusCode::FORBIDDEN, "{}").into_response(),
Error::NotFound => (StatusCode::NOT_FOUND, "{}").into_response(),
_ => (StatusCode::INTERNAL_SERVER_ERROR, "{}").into_response(),
}
}
}
// #[derive(Debug, Clone)]
// struct RepoPath {
// path: PathBuf,
// remove_dir: bool,
// remove_dot: bool,
// }
#[axum::debug_handler]
pub async fn trigger(
State(config): State<Config>,
Json(t): Json<Trigger>,
) -> Result<StatusCode, Error> {
tokio::spawn(async move {
info!("trigger: {:?}", t);
let t0 = std::time::Instant::now();
let mut remote = pijul_remote::RemoteRepo::Http(pijul_remote::http::Http {
url: format!("{}/{}/{}", config.config.nest_url, t.owner, t.repo)
.parse()
.unwrap(),
channel: "main".to_string(),
client: reqwest::ClientBuilder::new().build()?,
headers: vec![],
name: t.state.clone(),
});
let path = tempfile::tempdir().unwrap();
let pijul_config = pijul_config::Config::load(None, Vec::new()).unwrap();
let repo = tokio::sync::Mutex::new(
pijul_repository::Repository::init(&pijul_config, Some(&path.path()), None, None)
.unwrap(),
);
{
let mut repo = repo.lock().await;
let txn = repo.pristine.arc_txn_begin()?;
let mut channel = txn.write().open_or_create_channel("main")?;
let h = t.state.parse().unwrap();
remote
.clone_state(&mut repo, &txn, &mut channel, h)
.await
.unwrap();
pijul_core::output::output_repository_no_pending(
&repo.working_copy,
&repo.changes,
&txn,
&channel,
"",
true,
None,
1, // std::thread::available_parallelism()?.get(),
0,
)?;
remote.finish().await.unwrap();
}
info!("cloned in {:?}", t0.elapsed());
std::fs::remove_dir_all(&path.path().join(".pijul"))?;
let (status_tx, status_rx) = tokio::sync::watch::channel(None);
let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();
if let Some((k, _)) = config
.jobs
.lock()
.unwrap()
.insert(t.id, (kill_tx, status_rx))
{
k.send(()).unwrap_or(());
}
let result = build(
Some(&config.config.ci_path),
path,
t.id,
&t.targets,
kill_rx,
status_tx,
)
.await;
config.jobs.lock().unwrap().remove(&t.id);
let (code, results) = result?;
debug!("code = {:?} {:?}", code, results);
tokio::fs::write(
config.config.ci_path.join(&format!("{}.status", t.id)),
serde_json::to_string(&Status {
code,
results,
finished: chrono::Utc::now(),
})
.unwrap(),
)
.await?;
Ok::<(), Error>(())
});
Ok(StatusCode::OK)
}
async fn build(
ci: Option<&std::path::Path>,
path: tempfile::TempDir,
id: uuid::Uuid,
targets: &HashMap<String, String>,
mut kill_rx: tokio::sync::oneshot::Receiver<()>,
mut status_tx: tokio::sync::watch::Sender<Option<i32>>,
) -> Result<(Option<i32>, HashMap<String, PathBuf>), Error> {
let mut files = if let Some(ref ci_path) = ci {
tokio::fs::create_dir_all(ci_path).await?;
Some((
OpenOptions::new()
.append(true)
.create(true)
.open(&ci_path.join(&format!("{}.stdout", id)))
.await
.unwrap(),
OpenOptions::new()
.append(true)
.create(true)
.open(&ci_path.join(&format!("{}.stderr", id)))
.await
.unwrap(),
))
} else {
None
};
let mut cmd = tokio::process::Command::new("nix");
cmd.arg("build").arg("-L").arg("--log-format").arg("raw");
for (_, target) in targets {
cmd.arg(target);
}
let cmd = cmd
.current_dir(&path)
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.stdin(Stdio::null())
.spawn()
.unwrap();
let code = dump_cmd(cmd, &mut files, &mut kill_rx, &mut status_tx).await?;
if code == Some(0) {
let mut cmd = tokio::process::Command::new("attic");
cmd.arg("push").arg("coturnix");
let mut result = HashMap::new();
for (n, (t, _)) in targets.iter().enumerate() {
if n == 0 {
result.insert(
t.clone(),
tokio::fs::read_link(path.path().join("result"))
.await
.unwrap(),
);
cmd.arg("result");
} else {
result.insert(
t.clone(),
tokio::fs::read_link(path.path().join(format!("result-{n}")))
.await
.unwrap(),
);
cmd.arg(format!("result-{n}"));
}
}
let cmd = cmd
.current_dir(&path)
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.stdin(Stdio::null())
.spawn()
.unwrap();
Ok((
dump_cmd(cmd, &mut files, &mut kill_rx, &mut status_tx).await?,
result,
))
} else {
Ok((code, HashMap::new()))
}
}