// Calls `repo_.pristine.blocking_write()`, begins a transaction on the
// result, opens (or creates) the channel named "<repo>_<channel>" through the
// transaction's write handle, and then passes the transaction, the channel,
// `repo_.changes` and `repo` to `s.deploy`. Each fallible step propagates its
// error to the caller with `?`.
// NOTE(review): the leading `W4LOG…` token is not Rust; it looks like a change
// identifier left over from a VCS export — confirm and remove before compiling.
W4LOGDLDBZIUUNACCXWWUZGVEGWAXYAZI3C66HYHURFQ3CWZ7XKAC let pri = repo_.pristine.blocking_write();let txn = pri.arc_txn_begin()?;let channel = format!("{}_{}", repo, channel);let channel_ = {let mut txn_ = txn.write();txn_.open_or_create_channel(&channel)?};s.deploy(&txn, &channel_, &repo_.changes, repo)?;
// Deployment trigger.
//
// The block used as the `if let` scrutinee calls
// `repo_.pristine.blocking_write()`, begins a transaction, opens or creates the
// channel "<repo>_<channel>", clones `repo_.changes`, and calls
// `s.output_for_deployment`; `pri`, `txn`, `channel_` and `changes` are locals
// scoped to that block. Errors from these steps propagate with `?`.
//
// When the call yields `Some((temp, depl))`, a tokio task is spawned that:
//   1. inserts a row into `jobs` with `repo` and reads back the generated uuid,
//   2. acquires a permit from `s.builders`,
//   3. awaits `s.deploy(id, temp, depl)`,
//   4. drops the permit explicitly.
//
// NOTE(review): the handle returned by `tokio::spawn` is discarded, so an `Err`
// produced by `s.deploy(...).await?` is never observed here. The `unwrap()`s on
// the pool checkout, the insert and the permit acquisition panic inside the
// task rather than returning an error — confirm that is intended.
if let Some((temp, depl)) = {let pri = repo_.pristine.blocking_write();let txn = pri.arc_txn_begin()?;let channel = format!("{}_{}", repo, channel);let channel_ = {let mut txn_ = txn.write();txn_.open_or_create_channel(&channel)?};let changes = repo_.changes.clone();s.output_for_deployment(&txn, &channel_, &changes)?} {tokio::spawn(async move {use crate::db::jobs::dsl as jobs;let id = diesel::insert_into(jobs::jobs).values((jobs::repo.eq(repo),)).returning(jobs::id).get_result::<uuid::Uuid>(&mut s.db.get().await.unwrap()).await.unwrap();let permit = s.builders.acquire().await.unwrap();{s.deploy(id, temp, depl).await?;}std::mem::drop(permit);Ok::<_, Error>(())});}
// Clone the `jobs`, `builders` and `ci` fields of `self` into locals.
// NOTE(review): presumably done so an `async move` task can take ownership of
// them without borrowing `self` — confirm against the code that follows.
let jobs = self.jobs.clone();let builders = self.builders.clone();let ci = self.ci.clone();
let tmp_dir = tempfile::tempdir()?;let wc =libpijul::working_copy::filesystem::FileSystem::from_root(tmp_dir.path());libpijul::output::output_repository_no_pending(&wc, &changes, &txn, &channel, "", true, None, 1, 0,)?;return Ok(Some((tmp_dir, depl)));}}}Ok(None)}
tokio::spawn(async move {let permit = builders.acquire().await.unwrap();use crate::db::jobs::dsl as jobs;let id = diesel::insert_into(jobs::jobs).values((jobs::repo.eq(repo),)).returning(jobs::id).get_result::<uuid::Uuid>(&mut db.get().await.unwrap()).await.unwrap();let mut tmp_dir = tempfile::tempdir()?;tmp_dir.disable_cleanup(true);let wc = libpijul::working_copy::filesystem::FileSystem::from_root(tmp_dir.path(),);tokio::task::spawn_blocking(move || {libpijul::output::output_repository_no_pending(&wc, &changes, &txn, &channel, "", true, None, 1, 0,)?;Ok::<_, Error>(())}).await.unwrap().unwrap();use std::process::Stdio;let (status_tx, status_rx) = tokio::sync::watch::channel(None);let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();let p = tmp_dir.path().join(depl);debug!("launching {:?}", p);
async fn deploy(&self,id: uuid::Uuid,tmp_dir: tempfile::TempDir,depl: String,) -> Result<(), Error> {use std::process::Stdio;let (status_tx, status_rx) = tokio::sync::watch::channel(None);let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();let p = tmp_dir.path().join(depl);debug!("launching {:?}", p);
let mut cmd = tokio::process::Command::new(p).current_dir(tmp_dir.path()).stderr(Stdio::piped()).stdout(Stdio::piped()).stdin(Stdio::null()).spawn().unwrap();
let mut cmd = tokio::process::Command::new(p).current_dir(tmp_dir.path()).stderr(Stdio::piped()).stdout(Stdio::piped()).stdin(Stdio::null()).spawn().unwrap();
use tokio::io::AsyncBufReadExt;let stdout = tokio::io::BufReader::new(cmd.stdout.take().unwrap());let mut stdout = stdout.lines();let mut stdout_ok = true;let stderr = tokio::io::BufReader::new(cmd.stderr.take().unwrap());let mut stderr = stderr.lines();
use tokio::io::AsyncBufReadExt;let stdout = tokio::io::BufReader::new(cmd.stdout.take().unwrap());let mut stdout = stdout.lines();let mut stdout_ok = true;let stderr = tokio::io::BufReader::new(cmd.stderr.take().unwrap());let mut stderr = stderr.lines();
let mut stderr_ok = true;let mut buf_stdout = String::new();let mut last_stdout = std::time::UNIX_EPOCH;let mut buf_stderr = String::new();let mut last_stderr = std::time::UNIX_EPOCH;let bound = std::time::Duration::from_secs(1);let mut files = if let Some(ref path) = ci.filesystem {Some((OpenOptions::new().append(true).create(true).open(&path.join(&format!("{}.stdout", id))).await.unwrap(),OpenOptions::new().append(true).create(true).open(&path.join(&format!("{}.stderr", id))).await.unwrap(),))} else {None};while stdout_ok || stderr_ok {debug!("stdout || stderr {:?} {:?}",buf_stdout.len(),buf_stderr.len());tokio::select! {line = stdout.next_line(), if stdout_ok => {let n = if let Some(line) = line? {buf_stdout.push_str(&line);buf_stdout.push('\n');line.len()} else {0};if last_stdout.elapsed().unwrap() >= bound || n == 0 {debug!("sending stdout to db {:?} {:?}", buf_stdout.len(), buf_stderr.len());
let mut stderr_ok = true;let mut buf_stdout = String::new();let mut last_stdout = std::time::UNIX_EPOCH;let mut buf_stderr = String::new();let mut last_stderr = std::time::UNIX_EPOCH;let bound = std::time::Duration::from_secs(1);let mut files = if let Some(ref path) = self.ci.filesystem {Some((OpenOptions::new().append(true).create(true).open(&path.join(&format!("{}.stdout", id))).await.unwrap(),OpenOptions::new().append(true).create(true).open(&path.join(&format!("{}.stderr", id))).await.unwrap(),))} else {None};while stdout_ok || stderr_ok {debug!("stdout || stderr {:?} {:?}",buf_stdout.len(),buf_stderr.len());tokio::select! {line = stdout.next_line(), if stdout_ok => {let n = if let Some(line) = line? {buf_stdout.push_str(&line);buf_stdout.push('\n');line.len()} else {0};if last_stdout.elapsed().unwrap() >= bound || n == 0 {debug!("sending stdout to db {:?} {:?}", buf_stdout.len(), buf_stderr.len());
if let Some((ref mut stdout, _)) = files {stdout.write_all(buf_stdout.as_bytes()).await?;stdout.flush().await?;}buf_stdout.clear();debug!("stdout/stderr {:?} {:?}", buf_stdout.len(), buf_stderr.len());last_stdout = std::time::SystemTime::now();}if n == 0 {stdout_ok = false}}line = stderr.next_line(), if stderr_ok => {let n = if let Some(line) = line ?{buf_stderr.push_str(&line);buf_stderr.push('\n');line.len()} else {0};if last_stderr.elapsed().unwrap() >= bound || n == 0 {debug!("sending stderr to db {:?}", buf_stderr.len());if let Some((_, ref mut stderr)) = files {stderr.write_all(buf_stderr.as_bytes()).await?;stderr.flush().await?;}buf_stderr.clear();last_stderr = std::time::SystemTime::now();}debug!("{:?}", buf_stderr.len());if n == 0 {stderr_ok = false}}}
if let Some((ref mut stdout, _)) = files {stdout.write_all(buf_stdout.as_bytes()).await?;stdout.flush().await?;
let status = tokio::select! {status = cmd.wait() => {status?.code()}_ = kill_rx => {cmd.kill().await?;None}};debug!("process exited with {:?}", status);debug!("stderr {}", buf_stderr);debug!("stdout {}", buf_stdout);let now = chrono::Utc::now();diesel::update(jobs::jobs.find(id)).set((jobs::status.eq(status), jobs::ended.eq(&now))).execute(&mut db.get().await.unwrap()).await?;status_tx.send(Some((now, status))).unwrap();jobs.lock().unwrap().remove(&id);std::mem::drop(permit);Ok::<_, Error>(())});
buf_stdout.clear();debug!("stdout/stderr {:?} {:?}", buf_stdout.len(), buf_stderr.len());last_stdout = std::time::SystemTime::now();}if n == 0 {stdout_ok = false}}line = stderr.next_line(), if stderr_ok => {let n = if let Some(line) = line ?{buf_stderr.push_str(&line);buf_stderr.push('\n');line.len()} else {0};if last_stderr.elapsed().unwrap() >= bound || n == 0 {debug!("sending stderr to db {:?}", buf_stderr.len());if let Some((_, ref mut stderr)) = files {stderr.write_all(buf_stderr.as_bytes()).await?;stderr.flush().await?;}buf_stderr.clear();last_stderr = std::time::SystemTime::now();}debug!("{:?}", buf_stderr.len());if n == 0 {stderr_ok = false}
Ok(())
let status = tokio::select! {status = cmd.wait() => {status?.code()}_ = kill_rx => {cmd.kill().await?;None}};debug!("process exited with {:?}", status);debug!("stderr {}", buf_stderr);debug!("stdout {}", buf_stdout);let now = chrono::Utc::now();use crate::db::jobs::dsl as jobs;diesel::update(jobs::jobs.find(id)).set((jobs::status.eq(status), jobs::ended.eq(&now))).execute(&mut self.db.get().await.unwrap()).await?;status_tx.send(Some((now, status))).unwrap();self.jobs.lock().unwrap().remove(&id);Ok::<_, Error>(())