jobs.rs
use crate::permissions::Perm;
use crate::{Config, get_user_login};
use axum::response::IntoResponse;
use axum::{
Router,
extract::ws::{WebSocket, WebSocketUpgrade},
extract::{Json, Path, State},
response::{Redirect, Response},
routing::{any, get, post},
};
use axum_extra::extract::SignedCookieJar;
use diesel::{
BoolExpressionMethods, ExpressionMethods, NullableExpressionMethods, OptionalExtension,
QueryDsl, Queryable, QueryableByName, Selectable, SelectableHelper,
};
use diesel_async::RunQueryDsl;
use futures::StreamExt;
use inotify::WatchMask;
use pijul_core::Base32;
use serde_derive::*;
use std::collections::hash_map::Entry;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, BufReader};
use tracing::*;
pub fn router() -> Router<Config> {
Router::new()
.route("/{owner}/{repo}", get(list_jobs))
.route("/{owner}/{repo}/{job_id}", get(job))
.route("/{owner}/{repo}/{job_id}/retry", post(retry))
.route("/{owner}/{repo}/{job_id}/kill", post(kill))
.route("/{owner}/{repo}/{job_id}/ws", any(job_ws_handler))
.route("/{owner}/{repo}/ws", any(jobs_ws_handler))
}
#[derive(Debug, Deserialize)]
pub struct JobPath {
owner: String,
repo: String,
job_id: uuid::Uuid,
}
#[derive(Debug, Deserialize)]
pub struct JobsPath {
owner: String,
repo: String,
}
#[derive(Debug, Selectable, Queryable, QueryableByName, Serialize, Deserialize)]
#[diesel(check_for_backend(diesel::pg::Pg))]
#[diesel(table_name = crate::db::jobs::dsl)]
struct Job {
id: uuid::Uuid,
started: chrono::DateTime<chrono::Utc>,
ended: Option<chrono::DateTime<chrono::Utc>>,
status: Option<i32>,
}
#[derive(Debug, Serialize)]
pub struct Jobs {
login: Option<String>,
jobs: Vec<Job>,
}
pub async fn list_jobs(
State(config): State<Config>,
jar: SignedCookieJar,
Path(path): Path<JobsPath>,
) -> Result<Json<Jobs>, crate::Error> {
let (uid, login) = if let Some((a, b)) = get_user_login(&jar, &config).await? {
(Some(a), Some(b))
} else {
(None, None)
};
let mut db = config.db.get().await?;
use crate::db::jobs::dsl as jobs;
use crate::db::repositories::dsl as repos;
use crate::db::users::dsl as users;
Ok(Json(Jobs {
login,
jobs: repos::repositories
.inner_join(jobs::jobs)
.inner_join(users::users)
.filter(users::login.eq(path.owner))
.filter(repos::name.eq(path.repo))
.filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(
uid.unwrap_or(uuid::Uuid::nil()),
repos::id,
Perm::READ_JOBS.bits()
)))
.select(Job::as_select())
.order_by(jobs::started.desc())
.get_results::<Job>(&mut db)
.await?,
}))
}
#[derive(Debug, Serialize)]
pub struct Job_ {
login: Option<String>,
#[serde(flatten)]
job: Job,
}
pub async fn job(
State(config): State<Config>,
jar: SignedCookieJar,
Path(path): Path<JobPath>,
) -> Result<Json<Job_>, crate::Error> {
let (uid, login) = if let Some((a, b)) = get_user_login(&jar, &config).await? {
(Some(a), Some(b))
} else {
(None, None)
};
let mut db = config.db.get().await?;
use crate::db::jobs::dsl as jobs;
use crate::db::repositories::dsl as repos;
use crate::db::users::dsl as users;
if let Some(job) = repos::repositories
.inner_join(jobs::jobs)
.inner_join(users::users)
.filter(users::login.eq(path.owner))
.filter(repos::name.eq(path.repo))
.filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(
uid.unwrap_or(uuid::Uuid::nil()),
repos::id,
Perm::READ_JOBS.bits()
)))
.filter(jobs::id.eq(path.job_id))
.select(Job::as_select())
.order_by(jobs::started.desc())
.get_result::<Job>(&mut db)
.await
.optional()?
{
Ok(Json(Job_ { login, job }))
} else {
Err(crate::Error::NotFound)
}
}
pub async fn retry(
State(config): State<Config>,
jar: SignedCookieJar,
Path(path): Path<JobPath>,
) -> Result<Redirect, crate::Error> {
let (uid, login) = if let Some((a, b)) = get_user_login(&jar, &config).await? {
(Some(a), Some(b))
} else {
(None, None)
};
let mut db = config.db.get().await?;
use crate::db::jobs::dsl as jobs;
use crate::db::repositories::dsl as repos;
use crate::db::users::dsl as users;
if let Some(repo) = repos::repositories
.inner_join(jobs::jobs)
.inner_join(users::users)
.filter(users::login.eq(&path.owner))
.filter(repos::name.eq(&path.repo))
.filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(
uid.unwrap_or(uuid::Uuid::nil()),
repos::id,
Perm::WRITE_JOBS.bits()
)))
.filter(jobs::id.eq(path.job_id))
.select(repos::id)
.get_result::<uuid::Uuid>(&mut db)
.await
.optional()?
{
let mut db = config.db.get().await.unwrap();
use crate::db::jobs::dsl as jobs;
let (Some(state), channel) = jobs::jobs
.find(&path.job_id)
.select((jobs::repo_state, jobs::channel))
.get_result::<(Option<String>, String)>(&mut db)
.await
.unwrap()
else {
return Err(crate::Error::NotFound);
};
let id = diesel::insert_into(jobs::jobs)
.values((jobs::repo.eq(repo), jobs::repo_state.eq(&state)))
.returning(jobs::id)
.get_result::<uuid::Uuid>(&mut db)
.await
.unwrap();
use crate::db::repositories::dsl as repositories;
use crate::db::users::dsl as users;
if let Some((state, deployment)) =
crate::replication::get_config_state(&config.repo_locks, repo, channel)
.await
.unwrap()
{
let (owner, repo) = repositories::repositories
.find(repo)
.inner_join(users::users)
.select((users::login, repositories::name))
.get_result::<(String, String)>(&mut db)
.await
.unwrap();
let mut targets = std::collections::HashMap::new();
targets.insert(deployment.clone(), format!(".#{}", deployment));
let body = serde_json::to_string(&ci::Trigger {
id,
owner,
repo,
state: state.to_base32(),
targets,
})
.unwrap();
debug!("{:?} {}", config.ci.url, body);
tokio::spawn(async move {
let client = reqwest::Client::new();
for (n, ci_url) in config.ci.url.iter().enumerate() {
let res = client
.post(ci_url.clone() + "/trigger")
.header("Content-Type", "application/json")
.body(body.clone())
.send()
.await
.unwrap();
if let reqwest::StatusCode::OK = res.status() {
crate::replication::sync_job(&mut db, client, &config.ci, n, id).await;
break;
}
}
});
}
Ok(Redirect::to(&format!("/{}/{}/jobs", path.owner, path.repo)))
} else {
Err(crate::Error::NotFound)
}
}
pub async fn kill(
State(config): State<Config>,
jar: SignedCookieJar,
Path(path): Path<JobPath>,
) -> Result<Redirect, crate::Error> {
let (uid, login) = if let Some((a, b)) = get_user_login(&jar, &config).await? {
(Some(a), Some(b))
} else {
(None, None)
};
let mut db = config.db.get().await?;
use crate::db::jobs::dsl as jobs;
use crate::db::repositories::dsl as repos;
use crate::db::users::dsl as users;
if let Some(repo) = repos::repositories
.inner_join(jobs::jobs)
.inner_join(users::users)
.filter(users::login.eq(&path.owner))
.filter(repos::name.eq(&path.repo))
.filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(
uid.unwrap_or(uuid::Uuid::nil()),
repos::id,
Perm::WRITE_JOBS.bits()
)))
.filter(jobs::id.eq(path.job_id))
.select(repos::id)
.get_result::<uuid::Uuid>(&mut db)
.await
.optional()?
{
let client = reqwest::Client::new();
for cl in config.ci.url.iter() {
let res = client
.post(format!("{}/kill/{}", cl, path.job_id))
.header("Content-Type", "application/json")
.send()
.await
.unwrap();
}
Ok(Redirect::to(&format!("/{}/{}/jobs", path.owner, path.repo)))
} else {
Err(crate::Error::NotFound)
}
}
pub async fn job_ws_handler(
State(config): State<Config>,
Path(path): Path<JobPath>,
jar: SignedCookieJar,
ws: WebSocketUpgrade,
) -> Response {
use crate::db::repositories::dsl as repos;
use crate::db::users::dsl as users;
let (uid, login) = if let Ok(Some((a, b))) = get_user_login(&jar, &config).await {
(Some(a), Some(b))
} else {
(None, None)
};
let mut db = config.db.get().await.unwrap();
if repos::repositories
.inner_join(users::users)
.filter(users::login.eq(path.owner))
.filter(repos::name.eq(path.repo))
.filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(
uid.unwrap_or(uuid::Uuid::nil()),
repos::id,
Perm::READ_JOBS.bits()
)))
.select(repos::id)
.get_result::<uuid::Uuid>(&mut db)
.await
.optional()
.unwrap()
.is_none()
{
return crate::Error::NeedsAuth.into_response();
}
ws.on_upgrade(move |socket| job_handle_socket(config, path.job_id, socket))
}
pub async fn jobs_ws_handler(
State(config): State<Config>,
Path(path): Path<JobPath>,
jar: SignedCookieJar,
ws: WebSocketUpgrade,
) -> Response {
use crate::db::repositories::dsl as repos;
use crate::db::users::dsl as users;
let (uid, login) = if let Some((a, b)) = get_user_login(&jar, &config).await.unwrap() {
(Some(a), Some(b))
} else {
(None, None)
};
let mut db = config.db.get().await.unwrap();
let Some(repo_id) = repos::repositories
.inner_join(users::users)
.filter(users::login.eq(path.owner))
.filter(repos::name.eq(path.repo))
.filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(
uid.unwrap_or(uuid::Uuid::nil()),
repos::id,
Perm::READ_JOBS.bits()
)))
.select(repos::id)
.get_result::<uuid::Uuid>(&mut db)
.await
.optional()
.unwrap()
else {
return crate::Error::NeedsAuth.into_response();
};
ws.on_upgrade(move |socket| jobs_handle_socket(config, repo_id, socket))
}
#[derive(Debug, Deserialize, Serialize)]
enum JobMsg<'a> {
State {
stdout: usize,
stderr: usize,
},
Chunk {
channel: i32,
offset: usize,
content: &'a str,
},
Status {
ended: chrono::DateTime<chrono::Utc>,
status: Option<i32>,
},
Heartbeat,
}
async fn job_handle_socket(config: Config, id: uuid::Uuid, mut socket: WebSocket) {
let mut remote_stdout = 0;
let mut remote_stderr = 0;
send_all(
&config,
id,
&mut remote_stdout,
&mut remote_stderr,
&mut socket,
)
.await
.unwrap();
let Some(mut status) = config
.jobs
.lock()
.unwrap()
.get(&id)
.map(|(_, _, w)| w.clone())
else {
debug!("closing");
return;
};
let mut status_ok = true;
let mut notify_ok = true;
let mut notify_buffer = [0; 1024];
let mut notify = if let Some(ref path) = config.ci.filesystem {
let inotify = inotify::Inotify::init().expect("Error while initializing inotify instance");
let mut w = inotify.watches();
w.add(&path.join(&format!("{}.stdout", id)), WatchMask::MODIFY)
.unwrap();
w.add(&path.join(&format!("{}.stderr", id)), WatchMask::MODIFY)
.unwrap();
inotify.into_event_stream(&mut notify_buffer).unwrap()
} else {
inotify::Inotify::init()
.unwrap()
.into_event_stream(&mut notify_buffer)
.unwrap()
};
let mut stdout = String::new();
let mut stderr = String::new();
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
while notify_ok || status_ok {
debug!("waiting job");
tokio::select! {
_ = interval.tick() => {
socket.send(serde_json::to_string(&JobMsg::Heartbeat).unwrap().into()).await.unwrap_or(());
}
n = notify.next(), if notify_ok => {
debug!("{:?}", n);
if n.is_none() {
notify_ok = false;
continue
}
if let Some((a, b)) = send_remaining(&config, id, &mut remote_stdout, &mut remote_stderr, &mut stdout, &mut stderr)
.await
.unwrap()
{
socket.send(a.into()).await.unwrap_or(());
socket.send(b.into()).await.unwrap_or(());
}
}
x = status.changed(), if status_ok => {
debug!("status {:?}", x);
if x.is_err() {
status_ok = false
}
let status = status.borrow_and_update().clone();
if let Some((a, b)) = send_remaining(&config, id, &mut remote_stdout, &mut remote_stderr, &mut stdout, &mut stderr)
.await
.unwrap()
{
socket.send(a.into()).await.unwrap_or(());
socket.send(b.into()).await.unwrap_or(());
}
debug!("status {:?}", status);
if let Some((ended, status)) = status {
socket.send(serde_json::to_string(&JobMsg::Status {
ended,
status
}).unwrap().into()).await.unwrap_or(());
} else {
debug!("Nothing to send");
}
}
else =>{
socket.send(serde_json::to_string(&JobMsg::Status {
ended: chrono::DateTime::UNIX_EPOCH,
status: None,
}).unwrap().into()).await.unwrap_or(());
break
}
}
}
}
async fn send_all(
config: &Config,
id: uuid::Uuid,
remote_stdout: &mut usize,
remote_stderr: &mut usize,
socket: &mut WebSocket,
) -> Result<(), crate::Error> {
use crate::db::jobs::dsl as jobs;
if let Some((ended, status)) = jobs::jobs
.find(id)
.select((jobs::ended, jobs::status))
.get_result::<(Option<chrono::DateTime<chrono::Utc>>, Option<i32>)>(
&mut config.db.get().await.unwrap(),
)
.await
.optional()
.unwrap()
{
if let Some(ended) = ended {
socket
.send(
serde_json::to_string(&JobMsg::Status { ended, status })
.unwrap()
.into(),
)
.await
.unwrap();
}
let Some(ref path) = config.ci.filesystem else {
return Ok(());
};
let mut outf = BufReader::new(
tokio::fs::File::open(&path.join(&format!("{}.stdout", id)))
.await
.unwrap(),
);
outf.seek(std::io::SeekFrom::Start(*remote_stdout as u64))
.await?;
let mut buf = String::with_capacity(8192);
while let Ok(n) = outf.read_line(&mut buf).await {
*remote_stdout += n;
if buf.len() >= 4096 || n == 0 {
socket
.send(
serde_json::to_string(&JobMsg::Chunk {
channel: 0,
offset: 0,
content: &buf,
})
.unwrap()
.into(),
)
.await
.unwrap();
buf.clear()
}
if n == 0 {
break;
}
}
buf.clear();
let mut errf = BufReader::new(
tokio::fs::File::open(&path.join(&format!("{}.stderr", id)))
.await
.unwrap(),
);
errf.seek(std::io::SeekFrom::Start(*remote_stderr as u64))
.await?;
while let Ok(n) = errf.read_line(&mut buf).await {
*remote_stderr += n;
if buf.len() >= 4096 || n == 0 {
socket
.send(
serde_json::to_string(&JobMsg::Chunk {
channel: 1,
offset: 0,
content: &buf,
})
.unwrap()
.into(),
)
.await
.unwrap();
buf.clear()
}
if n == 0 {
break;
}
}
}
Ok(())
}
async fn send_remaining(
config: &Config,
id: uuid::Uuid,
remote_stdout: &mut usize,
remote_stderr: &mut usize,
stdout: &mut String,
stderr: &mut String,
) -> Result<Option<(String, String)>, crate::Error> {
if let Some(ref path) = config.ci.filesystem {
let mut outf = tokio::fs::File::open(&path.join(&format!("{}.stdout", id)))
.await
.unwrap();
let mut errf = tokio::fs::File::open(&path.join(&format!("{}.stderr", id)))
.await
.unwrap();
outf.seek(std::io::SeekFrom::Start(*remote_stdout as u64))
.await?;
errf.seek(std::io::SeekFrom::Start(*remote_stderr as u64))
.await?;
stdout.clear();
stderr.clear();
outf.read_to_string(stdout).await?;
errf.read_to_string(stderr).await?;
*remote_stdout += stdout.len();
*remote_stderr += stderr.len();
Ok(Some((
serde_json::to_string(&JobMsg::Chunk {
channel: 0,
offset: 0,
content: &stdout,
})
.unwrap(),
serde_json::to_string(&JobMsg::Chunk {
channel: 1,
offset: 0,
content: &stderr,
})
.unwrap(),
)))
} else {
Ok(None)
}
}
#[derive(Debug, Deserialize, Serialize)]
enum JobsMsg {
Jobs(Vec<Job>),
Heartbeat,
}
// Update jobs page list.
async fn jobs_handle_socket(config: Config, repo_id: uuid::Uuid, mut socket: WebSocket) {
use crate::db::jobs::dsl as jobs;
let mut latest = {
let mut latest = config.jobs_latest.lock().await;
match latest.entry(repo_id) {
Entry::Vacant(e) => {
let last = jobs::jobs
.filter(jobs::repo.eq(repo_id))
.select(jobs::id)
.order_by(jobs::started.desc())
.first::<uuid::Uuid>(&mut config.db.get().await.unwrap())
.await
.unwrap();
let (sender, receiver) = tokio::sync::watch::channel(last);
e.insert(sender);
receiver
}
Entry::Occupied(e) => e.get().subscribe(),
}
};
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
debug!("waiting job");
tokio::select! {
_ = interval.tick() => {
socket.send(serde_json::to_string(&JobsMsg::Heartbeat).unwrap().into()).await.unwrap_or(());
}
x = latest.changed() => {
debug!("status {:?}", x);
let mut db = config.db.get().await.unwrap();
let latest = latest.borrow_and_update().clone();
let jobs = jobs::jobs
.filter(jobs::repo.eq(repo_id))
.select(Job::as_select())
.order_by(jobs::started.desc())
.get_results::<Job>(&mut db)
.await.unwrap();
socket.send(serde_json::to_string(&JobsMsg::Jobs(jobs)).unwrap().into()).await.unwrap_or(());
break
}
}
}
}