// lib.rs
use serde::*;
use std::collections::HashMap;
use tokio::io::AsyncWriteExt;
use tracing::*;
/// Serializable status record: an optional integer code, a UTC completion
/// timestamp and a map of named result paths.
// NOTE(review): `code` presumably carries a process exit code such as the one
// returned by `dump_cmd` — confirm against the code that builds this struct.
#[derive(Debug, Serialize, Deserialize)]
pub struct Status {
/// Optional integer code; `None` when no code is available.
pub code: Option<i32>,
/// UTC timestamp stored with the record (named `finished`).
pub finished: chrono::DateTime<chrono::Utc>,
/// Map from a string key to a filesystem path.
// NOTE(review): the key semantics are not visible in this file — verify against the producer.
pub results: HashMap<String, std::path::PathBuf>,
}
/// Serializable trigger record identified by a UUID and described by the
/// `owner`, `repo` and `state` strings, plus an optional map of targets.
#[derive(Debug, Serialize, Deserialize)]
pub struct Trigger {
/// Unique identifier of this trigger.
pub id: uuid::Uuid,
/// Owner name, stored as a plain string.
pub owner: String,
/// Repository name, stored as a plain string.
pub repo: String,
/// State, stored as a free-form string.
// NOTE(review): the set of valid values is not visible in this file — confirm with the producer.
pub state: String,
/// String-to-string map of targets; deserializes to an empty map when the
/// field is absent from the input.
#[serde(default)]
pub targets: HashMap<String, String>,
}
/// Streams a child's stdout and stderr into optional log files, then waits
/// for the child to exit.
///
/// Lines are read from both pipes concurrently and accumulated in one buffer
/// per stream. A stream's buffer is written to its file and flushed when a
/// line arrives and at least one second has passed since that stream's
/// previous flush (or it has never been flushed), and once more when the
/// stream reaches end-of-file. When `files` is `None` the buffers are simply
/// cleared at those points.
///
/// # Parameters
/// * `cmd` - the spawned child; its stdout and stderr must be piped.
/// * `files` - optional `(stdout_file, stderr_file)` pair receiving the output.
/// * `kill_rx` - when this receiver completes (a value is sent or the sender
///   is dropped) the child is killed.
/// * `status_tx` - receives the exit code once the child has been waited for.
///
/// # Returns
/// The child's exit code as reported by `ExitStatus::code()`. If the kill
/// signal arrives only after both pipes have closed, `Ok(None)` is returned
/// and nothing is sent on `status_tx`.
///
/// # Errors
/// Returns an `std::io::Error` if stdout or stderr of `cmd` is not piped, or
/// if reading a line, writing to a file, killing or waiting for the child
/// fails. Lines buffered but not yet flushed at that point are discarded, as
/// are lines still buffered when a kill signal interrupts the read loop.
pub async fn dump_cmd(
    mut cmd: tokio::process::Child,
    files: &mut Option<(tokio::fs::File, tokio::fs::File)>,
    kill_rx: &mut tokio::sync::oneshot::Receiver<()>,
    status_tx: &mut tokio::sync::watch::Sender<Option<i32>>,
) -> Result<Option<i32>, std::io::Error> {
    use tokio::io::AsyncBufReadExt;

    // Minimum interval between two time-triggered flushes of the same stream.
    let bound = std::time::Duration::from_secs(1);

    // A child spawned without piped output is reported as an error, not a panic.
    let not_piped = |stream: &str| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("child {} is not piped", stream),
        )
    };
    let child_stdout = cmd.stdout.take().ok_or_else(|| not_piped("stdout"))?;
    let child_stderr = cmd.stderr.take().ok_or_else(|| not_piped("stderr"))?;
    let mut stdout_lines = tokio::io::BufReader::new(child_stdout).lines();
    let mut stderr_lines = tokio::io::BufReader::new(child_stderr).lines();

    let mut stdout_ok = true;
    let mut stderr_ok = true;
    let mut buf_stdout = String::new();
    let mut buf_stderr = String::new();
    // `None` means "never flushed", so the first line is flushed immediately.
    // `Instant` is monotonic; unlike `SystemTime` it cannot fail on clock steps.
    let mut last_stdout: Option<std::time::Instant> = None;
    let mut last_stderr: Option<std::time::Instant> = None;
    // Set once `kill_rx` has completed inside the loop; it must not be polled again.
    let mut killed = false;

    while stdout_ok || stderr_ok {
        debug!(
            "stdout || stderr {:?} {:?}",
            buf_stdout.len(),
            buf_stderr.len()
        );
        tokio::select! {
            line = stdout_lines.next_line(), if stdout_ok => {
                debug!("out {:?}", line);
                let eof = match line? {
                    Some(text) => {
                        buf_stdout.push_str(&text);
                        buf_stdout.push('\n');
                        false
                    }
                    None => true,
                };
                if eof || last_stdout.map_or(true, |t| t.elapsed() >= bound) {
                    debug!("sending stdout to db {:?} {:?}", buf_stdout.len(), buf_stderr.len());
                    if let Some((out_file, _)) = files.as_mut() {
                        out_file.write_all(buf_stdout.as_bytes()).await?;
                        out_file.flush().await?;
                    }
                    buf_stdout.clear();
                    debug!("stdout/stderr {:?} {:?}", buf_stdout.len(), buf_stderr.len());
                    last_stdout = Some(std::time::Instant::now());
                }
                if eof {
                    stdout_ok = false;
                }
            }
            line = stderr_lines.next_line(), if stderr_ok => {
                debug!("err {:?}", line);
                let eof = match line? {
                    Some(text) => {
                        buf_stderr.push_str(&text);
                        buf_stderr.push('\n');
                        false
                    }
                    None => true,
                };
                if eof || last_stderr.map_or(true, |t| t.elapsed() >= bound) {
                    debug!("sending stderr to db {:?}", buf_stderr.len());
                    if let Some((_, err_file)) = files.as_mut() {
                        err_file.write_all(buf_stderr.as_bytes()).await?;
                        err_file.flush().await?;
                    }
                    buf_stderr.clear();
                    last_stderr = Some(std::time::Instant::now());
                }
                debug!("{:?}", buf_stderr.len());
                if eof {
                    stderr_ok = false;
                }
            }
            _ = &mut *kill_rx => {
                cmd.kill().await?;
                killed = true;
                break;
            }
        }
    }

    let code = if killed {
        // The oneshot receiver has already completed; polling it again can
        // panic, so only collect the exit status of the killed child here.
        cmd.wait().await?.code()
    } else {
        tokio::select! {
            status = cmd.wait() => status?.code(),
            _ = &mut *kill_rx => {
                cmd.kill().await?;
                return Ok(None)
            }
        }
    };

    // The code is also returned to the caller, so a watch channel without any
    // remaining receivers is not a reason to panic.
    if status_tx.send(code).is_err() {
        debug!("status receiver dropped, exit code {:?} not broadcast", code);
    }
    Ok(code)
}