Fork channel

Create a new channel as a copy of main.

Rename channel

Rename main to:

Delete channel

Delete main? This cannot be undone.

lib.rs
use serde::*;
use std::collections::HashMap;
use tokio::io::AsyncWriteExt;
use tracing::*;

/// Serializable record made of an optional integer code, a UTC timestamp and
/// a map of named paths.
///
/// NOTE(review): by its field names this presumably describes a finished
/// command run — confirm against the code that constructs it.
#[derive(Debug, Serialize, Deserialize)]
pub struct Status {
    /// Optional integer code. Presumably a process exit code, with `None`
    /// meaning no code was available — TODO confirm with the producer.
    pub code: Option<i32>,
    /// UTC timestamp; by its name, the moment the work finished — confirm
    /// where it is set.
    pub finished: chrono::DateTime<chrono::Utc>,
    /// Filesystem paths keyed by string. NOTE(review): what the keys denote
    /// is not visible in this file.
    pub results: HashMap<String, std::path::PathBuf>,
}

/// Serializable record identified by a UUID and carrying owner, repo and
/// state strings plus an optional string-to-string map of targets.
///
/// NOTE(review): presumably a request to start work for a repository —
/// confirm against the code that produces and consumes it.
#[derive(Debug, Serialize, Deserialize)]
pub struct Trigger {
    /// Unique identifier of this record.
    pub id: uuid::Uuid,
    /// Owner name. NOTE(review): presumably the repository owner — confirm.
    pub owner: String,
    /// Repository name (as suggested by the field name — confirm).
    pub repo: String,
    /// Free-form state string; its valid values are not visible in this file.
    pub state: String,
    /// String-to-string map of targets. Because of `#[serde(default)]`, a
    /// missing `targets` field deserializes to an empty map instead of
    /// failing.
    #[serde(default)]
    pub targets: HashMap<String, String>,
}

/// Streams the output of a child process into files and reports its exit code.
///
/// The child's stdout and stderr are read line by line. Each line is appended
/// (terminated with `\n`) to a per-stream buffer. A buffer is written to the
/// matching file of `files` (first element for stdout, second for stderr) and
/// flushed when a line arrives at least one second after the previous write
/// of that stream, when the very first line arrives, or when the stream
/// reaches end-of-file. When `files` is `None` the output is read and
/// discarded.
///
/// Reading continues until both streams are closed or `kill_rx` completes.
/// The result of the receiver is ignored, so a dropped sender is treated the
/// same way as an explicit kill request. On a kill request the child is
/// killed, any output still buffered is written out, and the child's exit
/// status is collected.
///
/// # Parameters
/// * `cmd` - the child process; its stdout and stderr must have been captured.
/// * `files` - optional `(stdout_file, stderr_file)` pair receiving the output.
/// * `kill_rx` - completing this receiver kills the child.
/// * `status_tx` - receives the exit code once the child has been waited on.
///
/// # Returns
/// `Ok(code)` with the value of `ExitStatus::code()` after the child exited,
/// or `Ok(None)` when a kill request arrives after both streams were closed
/// but before the child exited (nothing is sent on `status_tx` in that case).
///
/// # Errors
/// Returns any I/O error raised while reading the pipes, writing or flushing
/// the files, killing the child or waiting for it. Returns an error of kind
/// `InvalidInput` when stdout or stderr of the child was not captured.
pub async fn dump_cmd(
    mut cmd: tokio::process::Child,
    files: &mut Option<(tokio::fs::File, tokio::fs::File)>,
    mut kill_rx: &mut tokio::sync::oneshot::Receiver<()>,
    status_tx: &mut tokio::sync::watch::Sender<Option<i32>>,
) -> Result<Option<i32>, std::io::Error> {
    use tokio::io::AsyncBufReadExt;

    let bound = std::time::Duration::from_secs(1);
    let mut stdout_ok = true;
    let mut stderr_ok = true;
    let mut killed = false;
    let mut buf_stdout = String::new();
    let mut buf_stderr = String::new();
    // Monotonic time of the last write per stream. `None` means "never
    // written", which makes the first line go out immediately. A monotonic
    // clock cannot fail when the wall clock is adjusted.
    let mut last_stdout: Option<std::time::Instant> = None;
    let mut last_stderr: Option<std::time::Instant> = None;

    let not_piped = |name: &str| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("child {} was not captured", name),
        )
    };
    let stdout = cmd.stdout.take().ok_or_else(|| not_piped("stdout"))?;
    let mut stdout = tokio::io::BufReader::new(stdout).lines();
    let stderr = cmd.stderr.take().ok_or_else(|| not_piped("stderr"))?;
    let mut stderr = tokio::io::BufReader::new(stderr).lines();

    while stdout_ok || stderr_ok {
        debug!(
            "stdout || stderr {:?} {:?}",
            buf_stdout.len(),
            buf_stderr.len()
        );
        tokio::select! {
            line = stdout.next_line(), if stdout_ok => {
                debug!("out {:?}", line);
                let eof = match line? {
                    Some(line) => {
                        buf_stdout.push_str(&line);
                        buf_stdout.push('\n');
                        false
                    }
                    None => true,
                };
                if eof || last_stdout.map_or(true, |t| t.elapsed() >= bound) {
                    debug!("sending stdout to db {:?} {:?}", buf_stdout.len(), buf_stderr.len());
                    dump_cmd_flush(files.as_mut().map(|(out, _)| out), &mut buf_stdout).await?;
                    debug!("stdout/stderr {:?} {:?}", buf_stdout.len(), buf_stderr.len());
                    last_stdout = Some(std::time::Instant::now());
                }
                if eof {
                    stdout_ok = false
                }
            }
            line = stderr.next_line(), if stderr_ok => {
                debug!("err {:?}", line);
                let eof = match line? {
                    Some(line) => {
                        buf_stderr.push_str(&line);
                        buf_stderr.push('\n');
                        false
                    }
                    None => true,
                };
                if eof || last_stderr.map_or(true, |t| t.elapsed() >= bound) {
                    debug!("sending stderr to db {:?}", buf_stderr.len());
                    dump_cmd_flush(files.as_mut().map(|(_, err)| err), &mut buf_stderr).await?;
                    last_stderr = Some(std::time::Instant::now());
                }
                debug!("{:?}", buf_stderr.len());
                if eof {
                    stderr_ok = false
                }
            }
            _ = &mut kill_rx => {
                cmd.kill().await?;
                // Remember that the receiver has completed: a oneshot
                // receiver must not be polled again afterwards.
                killed = true;
                break
            }
        }
    }

    // After a kill the buffers may still hold lines that arrived within the
    // one-second window; write them out instead of dropping them.
    if !buf_stdout.is_empty() {
        dump_cmd_flush(files.as_mut().map(|(out, _)| out), &mut buf_stdout).await?;
    }
    if !buf_stderr.is_empty() {
        dump_cmd_flush(files.as_mut().map(|(_, err)| err), &mut buf_stderr).await?;
    }

    let code = if killed {
        // `kill()` has already awaited the child's exit; this only fetches
        // the recorded status and does not touch `kill_rx` again.
        cmd.wait().await?.code()
    } else {
        tokio::select! {
            s = cmd.wait() => s?.code(),
            _ = &mut kill_rx => {
                cmd.kill().await?;
                return Ok(None);
            }
        }
    };

    // Sending fails only when every receiver is gone; the exit code is still
    // returned to the caller, so this is not treated as an error.
    if status_tx.send(code).is_err() {
        debug!("no receiver left for exit status {:?}", code);
    }
    Ok(code)
}

/// Writes `buf` to `file` (when there is one), flushes the file and empties
/// `buf`. Without a file the buffered text is simply discarded.
async fn dump_cmd_flush(
    file: Option<&mut tokio::fs::File>,
    buf: &mut String,
) -> Result<(), std::io::Error> {
    if let Some(file) = file {
        file.write_all(buf.as_bytes()).await?;
        file.flush().await?;
    }
    buf.clear();
    Ok(())
}