Fork channel

Create a new channel as a copy of main.

Rename channel

Rename main to:

Delete channel

Delete main? This cannot be undone.

jobs.rs
use crate::permissions::Perm;
use crate::{Config, get_user_login};
use axum::response::IntoResponse;
use axum::{
    Router,
    extract::ws::{WebSocket, WebSocketUpgrade},
    extract::{Json, Path, State},
    response::{Redirect, Response},
    routing::{any, get, post},
};
use axum_extra::extract::SignedCookieJar;
use diesel::{
    BoolExpressionMethods, ExpressionMethods, NullableExpressionMethods, OptionalExtension,
    QueryDsl, Queryable, QueryableByName, Selectable, SelectableHelper,
};
use diesel_async::RunQueryDsl;
use futures::StreamExt;
use inotify::WatchMask;
use pijul_core::Base32;
use serde_derive::*;
use std::collections::hash_map::Entry;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, BufReader};
use tracing::*;

/// Builds the router exposing the job endpoints.
///
/// All paths are relative to wherever the caller mounts this router. The
/// routes are registered one at a time, in the same order as listed below:
///
/// * `GET  /{owner}/{repo}`                 – job list ([`list_jobs`])
/// * `GET  /{owner}/{repo}/{job_id}`        – single job ([`job`])
/// * `POST /{owner}/{repo}/{job_id}/retry`  – re-run a job ([`retry`])
/// * `POST /{owner}/{repo}/{job_id}/kill`   – stop a job ([`kill`])
/// * `ANY  /{owner}/{repo}/{job_id}/ws`     – live output of one job
/// * `ANY  /{owner}/{repo}/ws`              – live updates of the job list
pub fn router() -> Router<Config> {
    let routes: Router<Config> = Router::new();

    // Plain JSON endpoints.
    let routes = routes.route("/{owner}/{repo}", get(list_jobs));
    let routes = routes.route("/{owner}/{repo}/{job_id}", get(job));

    // Actions on an existing job.
    let routes = routes.route("/{owner}/{repo}/{job_id}/retry", post(retry));
    let routes = routes.route("/{owner}/{repo}/{job_id}/kill", post(kill));

    // WebSocket endpoints.
    let routes = routes.route("/{owner}/{repo}/{job_id}/ws", any(job_ws_handler));
    routes.route("/{owner}/{repo}/ws", any(jobs_ws_handler))
}

/// Path parameters of the per-job routes (`/{owner}/{repo}/{job_id}/...`).
#[derive(Debug, Deserialize)]
pub struct JobPath {
    /// Login of the repository owner; handlers compare it with `users::login`.
    owner: String,
    /// Repository name; handlers compare it with `repositories::name`.
    repo: String,
    /// Identifier of the job; handlers compare it with `jobs::id`.
    job_id: uuid::Uuid,
}

/// Path parameters of the repository-level routes (`/{owner}/{repo}`).
#[derive(Debug, Deserialize)]
pub struct JobsPath {
    /// Login of the repository owner; handlers compare it with `users::login`.
    owner: String,
    /// Repository name; handlers compare it with `repositories::name`.
    repo: String,
}

/// A row of the `jobs` table, restricted to the columns exposed to clients.
///
/// Loaded through diesel (`Job::as_select()`) and serialized to JSON both by
/// the HTTP handlers and by the job-list websocket.
#[derive(Debug, Selectable, Queryable, QueryableByName, Serialize, Deserialize)]
#[diesel(check_for_backend(diesel::pg::Pg))]
#[diesel(table_name = crate::db::jobs::dsl)]
struct Job {
    /// Primary key of the job.
    id: uuid::Uuid,
    /// Start time of the job (used to order job lists, newest first).
    started: chrono::DateTime<chrono::Utc>,
    /// End time; `None` presumably means the job is still running — TODO confirm.
    ended: Option<chrono::DateTime<chrono::Utc>>,
    /// Presumably the exit status reported by the CI runner — TODO confirm.
    status: Option<i32>,
}

/// JSON body returned by [`list_jobs`].
#[derive(Debug, Serialize)]
pub struct Jobs {
    /// Login of the authenticated caller, or `None` for anonymous requests.
    login: Option<String>,
    /// Jobs of the repository, ordered by start time, newest first.
    jobs: Vec<Job>,
}

/// Returns the jobs of `owner/repo` as JSON, most recently started first.
///
/// Jobs are only returned when the caller owns the repository or passes the
/// `has_permissions!` check for `Perm::READ_JOBS`; otherwise the list is
/// empty. The caller's login (if authenticated) is echoed in the response.
///
/// # Errors
///
/// Propagates failures from the login lookup, the connection pool and the
/// database query.
pub async fn list_jobs(
    State(config): State<Config>,
    jar: SignedCookieJar,
    Path(path): Path<JobsPath>,
) -> Result<Json<Jobs>, crate::Error> {
    use crate::db::jobs::dsl as jobs;
    use crate::db::repositories::dsl as repos;
    use crate::db::users::dsl as users;

    // Anonymous visitors have neither a user id nor a login.
    let (uid, login) = match get_user_login(&jar, &config).await? {
        Some((id, name)) => (Some(id), Some(name)),
        None => (None, None),
    };
    let mut db = config.db.get().await?;

    // Owner of the repository, or holder of the READ_JOBS permission.
    let visible_to_caller = repos::owner.nullable().eq(uid).or(crate::has_permissions!(
        uid.unwrap_or(uuid::Uuid::nil()),
        repos::id,
        Perm::READ_JOBS.bits()
    ));

    let rows = repos::repositories
        .inner_join(jobs::jobs)
        .inner_join(users::users)
        .filter(users::login.eq(path.owner))
        .filter(repos::name.eq(path.repo))
        .filter(visible_to_caller)
        .select(Job::as_select())
        .order_by(jobs::started.desc())
        .get_results::<Job>(&mut db)
        .await?;

    Ok(Json(Jobs { login, jobs: rows }))
}

/// JSON body returned by [`job`]: the job's own fields plus the caller's login.
#[derive(Debug, Serialize)]
pub struct Job_ {
    /// Login of the authenticated caller, or `None` for anonymous requests.
    login: Option<String>,
    /// The job itself; `flatten` puts its fields at the top level of the
    /// JSON object rather than under a `job` key.
    #[serde(flatten)]
    job: Job,
}

pub async fn job(
    State(config): State<Config>,
    jar: SignedCookieJar,
    Path(path): Path<JobPath>,
) -> Result<Json<Job_>, crate::Error> {
    let (uid, login) = if let Some((a, b)) = get_user_login(&jar, &config).await? {
        (Some(a), Some(b))
    } else {
        (None, None)
    };
    let mut db = config.db.get().await?;

    use crate::db::jobs::dsl as jobs;
    use crate::db::repositories::dsl as repos;
    use crate::db::users::dsl as users;

    if let Some(job) = repos::repositories
        .inner_join(jobs::jobs)
        .inner_join(users::users)
        .filter(users::login.eq(path.owner))
        .filter(repos::name.eq(path.repo))
        .filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(
            uid.unwrap_or(uuid::Uuid::nil()),
            repos::id,
            Perm::READ_JOBS.bits()
        )))
        .filter(jobs::id.eq(path.job_id))
        .select(Job::as_select())
        .order_by(jobs::started.desc())
        .get_result::<Job>(&mut db)
        .await
        .optional()?
    {
        Ok(Json(Job_ { login, job }))
    } else {
        Err(crate::Error::NotFound)
    }
}

pub async fn retry(
    State(config): State<Config>,
    jar: SignedCookieJar,
    Path(path): Path<JobPath>,
) -> Result<Redirect, crate::Error> {
    let (uid, login) = if let Some((a, b)) = get_user_login(&jar, &config).await? {
        (Some(a), Some(b))
    } else {
        (None, None)
    };
    let mut db = config.db.get().await?;

    use crate::db::jobs::dsl as jobs;
    use crate::db::repositories::dsl as repos;
    use crate::db::users::dsl as users;

    if let Some(repo) = repos::repositories
        .inner_join(jobs::jobs)
        .inner_join(users::users)
        .filter(users::login.eq(&path.owner))
        .filter(repos::name.eq(&path.repo))
        .filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(
            uid.unwrap_or(uuid::Uuid::nil()),
            repos::id,
            Perm::WRITE_JOBS.bits()
        )))
        .filter(jobs::id.eq(path.job_id))
        .select(repos::id)
        .get_result::<uuid::Uuid>(&mut db)
        .await
        .optional()?
    {
        let mut db = config.db.get().await.unwrap();
        use crate::db::jobs::dsl as jobs;
        let (Some(state), channel) = jobs::jobs
            .find(&path.job_id)
            .select((jobs::repo_state, jobs::channel))
            .get_result::<(Option<String>, String)>(&mut db)
            .await
            .unwrap()
        else {
            return Err(crate::Error::NotFound);
        };

        let id = diesel::insert_into(jobs::jobs)
            .values((jobs::repo.eq(repo), jobs::repo_state.eq(&state)))
            .returning(jobs::id)
            .get_result::<uuid::Uuid>(&mut db)
            .await
            .unwrap();

        use crate::db::repositories::dsl as repositories;
        use crate::db::users::dsl as users;

        if let Some((state, deployment)) =
            crate::replication::get_config_state(&config.repo_locks, repo, channel)
                .await
                .unwrap()
        {
            let (owner, repo) = repositories::repositories
                .find(repo)
                .inner_join(users::users)
                .select((users::login, repositories::name))
                .get_result::<(String, String)>(&mut db)
                .await
                .unwrap();
            let mut targets = std::collections::HashMap::new();
            targets.insert(deployment.clone(), format!(".#{}", deployment));
            let body = serde_json::to_string(&ci::Trigger {
                id,
                owner,
                repo,
                state: state.to_base32(),
                targets,
            })
            .unwrap();
            debug!("{:?} {}", config.ci.url, body);
            tokio::spawn(async move {
                let client = reqwest::Client::new();
                for (n, ci_url) in config.ci.url.iter().enumerate() {
                    let res = client
                        .post(ci_url.clone() + "/trigger")
                        .header("Content-Type", "application/json")
                        .body(body.clone())
                        .send()
                        .await
                        .unwrap();
                    if let reqwest::StatusCode::OK = res.status() {
                        crate::replication::sync_job(&mut db, client, &config.ci, n, id).await;
                        break;
                    }
                }
            });
        }
        Ok(Redirect::to(&format!("/{}/{}/jobs", path.owner, path.repo)))
    } else {
        Err(crate::Error::NotFound)
    }
}

pub async fn kill(
    State(config): State<Config>,
    jar: SignedCookieJar,
    Path(path): Path<JobPath>,
) -> Result<Redirect, crate::Error> {
    let (uid, login) = if let Some((a, b)) = get_user_login(&jar, &config).await? {
        (Some(a), Some(b))
    } else {
        (None, None)
    };
    let mut db = config.db.get().await?;

    use crate::db::jobs::dsl as jobs;
    use crate::db::repositories::dsl as repos;
    use crate::db::users::dsl as users;

    if let Some(repo) = repos::repositories
        .inner_join(jobs::jobs)
        .inner_join(users::users)
        .filter(users::login.eq(&path.owner))
        .filter(repos::name.eq(&path.repo))
        .filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(
            uid.unwrap_or(uuid::Uuid::nil()),
            repos::id,
            Perm::WRITE_JOBS.bits()
        )))
        .filter(jobs::id.eq(path.job_id))
        .select(repos::id)
        .get_result::<uuid::Uuid>(&mut db)
        .await
        .optional()?
    {
        let client = reqwest::Client::new();

        for cl in config.ci.url.iter() {
            let res = client
                .post(format!("{}/kill/{}", cl, path.job_id))
                .header("Content-Type", "application/json")
                .send()
                .await
                .unwrap();
        }
        Ok(Redirect::to(&format!("/{}/{}/jobs", path.owner, path.repo)))
    } else {
        Err(crate::Error::NotFound)
    }
}

pub async fn job_ws_handler(
    State(config): State<Config>,
    Path(path): Path<JobPath>,
    jar: SignedCookieJar,
    ws: WebSocketUpgrade,
) -> Response {
    use crate::db::repositories::dsl as repos;
    use crate::db::users::dsl as users;
    let (uid, login) = if let Ok(Some((a, b))) = get_user_login(&jar, &config).await {
        (Some(a), Some(b))
    } else {
        (None, None)
    };
    let mut db = config.db.get().await.unwrap();
    if repos::repositories
        .inner_join(users::users)
        .filter(users::login.eq(path.owner))
        .filter(repos::name.eq(path.repo))
        .filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(
            uid.unwrap_or(uuid::Uuid::nil()),
            repos::id,
            Perm::READ_JOBS.bits()
        )))
        .select(repos::id)
        .get_result::<uuid::Uuid>(&mut db)
        .await
        .optional()
        .unwrap()
        .is_none()
    {
        return crate::Error::NeedsAuth.into_response();
    }

    ws.on_upgrade(move |socket| job_handle_socket(config, path.job_id, socket))
}

pub async fn jobs_ws_handler(
    State(config): State<Config>,
    Path(path): Path<JobPath>,
    jar: SignedCookieJar,
    ws: WebSocketUpgrade,
) -> Response {
    use crate::db::repositories::dsl as repos;
    use crate::db::users::dsl as users;
    let (uid, login) = if let Some((a, b)) = get_user_login(&jar, &config).await.unwrap() {
        (Some(a), Some(b))
    } else {
        (None, None)
    };
    let mut db = config.db.get().await.unwrap();
    let Some(repo_id) = repos::repositories
        .inner_join(users::users)
        .filter(users::login.eq(path.owner))
        .filter(repos::name.eq(path.repo))
        .filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(
            uid.unwrap_or(uuid::Uuid::nil()),
            repos::id,
            Perm::READ_JOBS.bits()
        )))
        .select(repos::id)
        .get_result::<uuid::Uuid>(&mut db)
        .await
        .optional()
        .unwrap()
    else {
        return crate::Error::NeedsAuth.into_response();
    };

    ws.on_upgrade(move |socket| jobs_handle_socket(config, repo_id, socket))
}

/// Messages sent as JSON text frames on the per-job websocket.
///
/// No serde representation attribute is set, so serde's default (externally
/// tagged) enum encoding is used.
#[derive(Debug, Deserialize, Serialize)]
enum JobMsg<'a> {
    /// Byte counts for both output streams. Not constructed anywhere in this
    /// file — presumably reserved for a client-to-server message; TODO confirm.
    State {
        stdout: usize,
        stderr: usize,
    },
    /// A piece of job output.
    Chunk {
        /// `0` for the `.stdout` file, `1` for the `.stderr` file.
        channel: i32,
        /// Always sent as `0` by the code in this file.
        offset: usize,
        /// The text read from the output file.
        content: &'a str,
    },
    /// The job's end time and status, as stored in the `jobs` table or
    /// published on the job's status channel.
    Status {
        ended: chrono::DateTime<chrono::Utc>,
        status: Option<i32>,
    },
    /// Sent on every tick of a 30-second interval.
    Heartbeat,
}

async fn job_handle_socket(config: Config, id: uuid::Uuid, mut socket: WebSocket) {
    let mut remote_stdout = 0;
    let mut remote_stderr = 0;
    send_all(
        &config,
        id,
        &mut remote_stdout,
        &mut remote_stderr,
        &mut socket,
    )
    .await
    .unwrap();

    let Some(mut status) = config
        .jobs
        .lock()
        .unwrap()
        .get(&id)
        .map(|(_, _, w)| w.clone())
    else {
        debug!("closing");
        return;
    };
    let mut status_ok = true;
    let mut notify_ok = true;

    let mut notify_buffer = [0; 1024];
    let mut notify = if let Some(ref path) = config.ci.filesystem {
        let inotify = inotify::Inotify::init().expect("Error while initializing inotify instance");
        let mut w = inotify.watches();
        w.add(&path.join(&format!("{}.stdout", id)), WatchMask::MODIFY)
            .unwrap();
        w.add(&path.join(&format!("{}.stderr", id)), WatchMask::MODIFY)
            .unwrap();
        inotify.into_event_stream(&mut notify_buffer).unwrap()
    } else {
        inotify::Inotify::init()
            .unwrap()
            .into_event_stream(&mut notify_buffer)
            .unwrap()
    };

    let mut stdout = String::new();
    let mut stderr = String::new();
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));

    while notify_ok || status_ok {
        debug!("waiting job");
        tokio::select! {
            _ = interval.tick() => {
                socket.send(serde_json::to_string(&JobMsg::Heartbeat).unwrap().into()).await.unwrap_or(());
            }
            n = notify.next(), if notify_ok => {
                debug!("{:?}", n);
                if n.is_none() {
                    notify_ok = false;
                    continue
                }
                if let Some((a, b)) = send_remaining(&config, id, &mut remote_stdout, &mut remote_stderr, &mut stdout, &mut stderr)
                    .await
                    .unwrap()
                {
                    socket.send(a.into()).await.unwrap_or(());
                    socket.send(b.into()).await.unwrap_or(());
                }
            }
            x = status.changed(), if status_ok => {
                debug!("status {:?}", x);
                if x.is_err() {
                    status_ok = false
                }
                let status = status.borrow_and_update().clone();
                if let Some((a, b)) = send_remaining(&config, id, &mut remote_stdout, &mut remote_stderr, &mut stdout, &mut stderr)
                    .await
                    .unwrap()
                {
                    socket.send(a.into()).await.unwrap_or(());
                    socket.send(b.into()).await.unwrap_or(());
                }
                debug!("status {:?}", status);
                if let Some((ended, status)) = status {
                    socket.send(serde_json::to_string(&JobMsg::Status {
                        ended,
                        status
                    }).unwrap().into()).await.unwrap_or(());
                } else {
                    debug!("Nothing to send");
                }
            }
            else =>{
                socket.send(serde_json::to_string(&JobMsg::Status {
                    ended: chrono::DateTime::UNIX_EPOCH,
                    status: None,
                }).unwrap().into()).await.unwrap_or(());
                break
            }

        }
    }
}

async fn send_all(
    config: &Config,
    id: uuid::Uuid,
    remote_stdout: &mut usize,
    remote_stderr: &mut usize,
    socket: &mut WebSocket,
) -> Result<(), crate::Error> {
    use crate::db::jobs::dsl as jobs;
    if let Some((ended, status)) = jobs::jobs
        .find(id)
        .select((jobs::ended, jobs::status))
        .get_result::<(Option<chrono::DateTime<chrono::Utc>>, Option<i32>)>(
            &mut config.db.get().await.unwrap(),
        )
        .await
        .optional()
        .unwrap()
    {
        if let Some(ended) = ended {
            socket
                .send(
                    serde_json::to_string(&JobMsg::Status { ended, status })
                        .unwrap()
                        .into(),
                )
                .await
                .unwrap();
        }
        let Some(ref path) = config.ci.filesystem else {
            return Ok(());
        };

        let mut outf = BufReader::new(
            tokio::fs::File::open(&path.join(&format!("{}.stdout", id)))
                .await
                .unwrap(),
        );
        outf.seek(std::io::SeekFrom::Start(*remote_stdout as u64))
            .await?;

        let mut buf = String::with_capacity(8192);
        while let Ok(n) = outf.read_line(&mut buf).await {
            *remote_stdout += n;
            if buf.len() >= 4096 || n == 0 {
                socket
                    .send(
                        serde_json::to_string(&JobMsg::Chunk {
                            channel: 0,
                            offset: 0,
                            content: &buf,
                        })
                        .unwrap()
                        .into(),
                    )
                    .await
                    .unwrap();
                buf.clear()
            }
            if n == 0 {
                break;
            }
        }
        buf.clear();
        let mut errf = BufReader::new(
            tokio::fs::File::open(&path.join(&format!("{}.stderr", id)))
                .await
                .unwrap(),
        );
        errf.seek(std::io::SeekFrom::Start(*remote_stderr as u64))
            .await?;
        while let Ok(n) = errf.read_line(&mut buf).await {
            *remote_stderr += n;
            if buf.len() >= 4096 || n == 0 {
                socket
                    .send(
                        serde_json::to_string(&JobMsg::Chunk {
                            channel: 1,
                            offset: 0,
                            content: &buf,
                        })
                        .unwrap()
                        .into(),
                    )
                    .await
                    .unwrap();
                buf.clear()
            }
            if n == 0 {
                break;
            }
        }
    }
    Ok(())
}

/// Reads what was appended to the output files of job `id` since the given
/// offsets and returns it as two serialized `JobMsg::Chunk` messages.
///
/// `stdout` and `stderr` are scratch buffers: they are cleared, then filled
/// with everything from byte `*remote_stdout` / `*remote_stderr` to the end
/// of the respective file. Both offsets are advanced by the number of bytes
/// read. The returned pair is `(stdout chunk on channel 0, stderr chunk on
/// channel 1)`; a chunk is returned even when its content is empty.
///
/// Returns `Ok(None)` when `config.ci.filesystem` is not set.
///
/// # Errors
///
/// Returns an error, instead of panicking, when an output file cannot be
/// opened, and as before when seeking or reading fails (including when the
/// appended bytes are not valid UTF-8).
async fn send_remaining(
    config: &Config,
    id: uuid::Uuid,
    remote_stdout: &mut usize,
    remote_stderr: &mut usize,
    stdout: &mut String,
    stderr: &mut String,
) -> Result<Option<(String, String)>, crate::Error> {
    let Some(ref path) = config.ci.filesystem else {
        return Ok(None);
    };

    let mut outf = tokio::fs::File::open(&path.join(&format!("{}.stdout", id))).await?;
    let mut errf = tokio::fs::File::open(&path.join(&format!("{}.stderr", id))).await?;
    outf.seek(std::io::SeekFrom::Start(*remote_stdout as u64))
        .await?;
    errf.seek(std::io::SeekFrom::Start(*remote_stderr as u64))
        .await?;

    // `read_to_string` appends, so the scratch buffers must start empty.
    stdout.clear();
    stderr.clear();
    outf.read_to_string(stdout).await?;
    errf.read_to_string(stderr).await?;
    *remote_stdout += stdout.len();
    *remote_stderr += stderr.len();

    let out_msg = serde_json::to_string(&JobMsg::Chunk {
        channel: 0,
        offset: 0,
        content: stdout.as_str(),
    })
    .expect("serializing a JobMsg cannot fail");
    let err_msg = serde_json::to_string(&JobMsg::Chunk {
        channel: 1,
        offset: 0,
        content: stderr.as_str(),
    })
    .expect("serializing a JobMsg cannot fail");

    Ok(Some((out_msg, err_msg)))
}

/// Messages sent as JSON text frames on the job-list websocket.
#[derive(Debug, Deserialize, Serialize)]
enum JobsMsg {
    /// The repository's jobs, ordered by start time, newest first.
    Jobs(Vec<Job>),
    /// Sent on every tick of a 30-second interval.
    Heartbeat,
}

// Update jobs page list.
async fn jobs_handle_socket(config: Config, repo_id: uuid::Uuid, mut socket: WebSocket) {
    use crate::db::jobs::dsl as jobs;
    let mut latest = {
        let mut latest = config.jobs_latest.lock().await;
        match latest.entry(repo_id) {
            Entry::Vacant(e) => {
                let last = jobs::jobs
                    .filter(jobs::repo.eq(repo_id))
                    .select(jobs::id)
                    .order_by(jobs::started.desc())
                    .first::<uuid::Uuid>(&mut config.db.get().await.unwrap())
                    .await
                    .unwrap();

                let (sender, receiver) = tokio::sync::watch::channel(last);
                e.insert(sender);
                receiver
            }
            Entry::Occupied(e) => e.get().subscribe(),
        }
    };

    let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
    loop {
        debug!("waiting job");
        tokio::select! {
            _ = interval.tick() => {
                socket.send(serde_json::to_string(&JobsMsg::Heartbeat).unwrap().into()).await.unwrap_or(());
            }
            x = latest.changed() => {
                debug!("status {:?}", x);
                let mut db = config.db.get().await.unwrap();
                let latest = latest.borrow_and_update().clone();
                let jobs = jobs::jobs
                    .filter(jobs::repo.eq(repo_id))
                    .select(Job::as_select())
                    .order_by(jobs::started.desc())
                    .get_results::<Job>(&mut db)
                    .await.unwrap();
                socket.send(serde_json::to_string(&JobsMsg::Jobs(jobs)).unwrap().into()).await.unwrap_or(());
                break
            }
        }
    }
}