Removing locks when deploying
Dependencies
- [2]
YYJ76Q7VInitial attempt at CI/CD - [3]
OA6YWBAYcreate log files - [4]
NRBML6UEUpdating ssh + debug - [5]
D4SBRY6KInitial patch
Change contents
- replacement in api/src/replication.rs at line 152
let pri = repo_.pristine.blocking_write();let txn = pri.arc_txn_begin()?;let channel = format!("{}_{}", repo, channel);let channel_ = {let mut txn_ = txn.write();txn_.open_or_create_channel(&channel)?};s.deploy(&txn, &channel_, &repo_.changes, repo)?;if let Some((temp, depl)) = {let pri = repo_.pristine.blocking_write();let txn = pri.arc_txn_begin()?;let channel = format!("{}_{}", repo, channel);let channel_ = {let mut txn_ = txn.write();txn_.open_or_create_channel(&channel)?};let changes = repo_.changes.clone();s.output_for_deployment(&txn, &channel_, &changes)?} {tokio::spawn(async move {use crate::db::jobs::dsl as jobs;let id = diesel::insert_into(jobs::jobs).values((jobs::repo.eq(repo),)).returning(jobs::id).get_result::<uuid::Uuid>(&mut s.db.get().await.unwrap()).await.unwrap();let permit = s.builders.acquire().await.unwrap();{s.deploy(id, temp, depl).await?;}std::mem::drop(permit);Ok::<_, Error>(())});} - replacement in api/src/replication.rs at line 329
fn deploy<fn output_for_deployment< - replacement in api/src/replication.rs at line 336
repo: uuid::Uuid,) -> Result<(), Error> {) -> Result<Option<(tempfile::TempDir, String)>, Error> { - edit in api/src/replication.rs at line 343
let db = self.db.clone(); - replacement in api/src/replication.rs at line 346
let jobs = self.jobs.clone();let builders = self.builders.clone();let ci = self.ci.clone();let tmp_dir = tempfile::tempdir()?;let wc =libpijul::working_copy::filesystem::FileSystem::from_root(tmp_dir.path());libpijul::output::output_repository_no_pending(&wc, &changes, &txn, &channel, "", true, None, 1, 0,)?;return Ok(Some((tmp_dir, depl)));}}}Ok(None)} - replacement in api/src/replication.rs at line 359[2.23216]→[2.23216:23650](∅→∅),[2.23650]→[4.442:516](∅→∅),[4.516]→[2.23687:23688](∅→∅),[2.23687]→[2.23687:23688](∅→∅),[2.23688]→[4.517:636](∅→∅),[4.636]→[2.23748:24271](∅→∅),[2.23748]→[2.23748:24271](∅→∅),[2.24271]→[4.637:706](∅→∅),[4.706]→[2.24307:24526](∅→∅),[2.24307]→[2.24307:24526](∅→∅),[2.24526]→[4.707:819](∅→∅)
tokio::spawn(async move {let permit = builders.acquire().await.unwrap();use crate::db::jobs::dsl as jobs;let id = diesel::insert_into(jobs::jobs).values((jobs::repo.eq(repo),)).returning(jobs::id).get_result::<uuid::Uuid>(&mut db.get().await.unwrap()).await.unwrap();let mut tmp_dir = tempfile::tempdir()?;tmp_dir.disable_cleanup(true);let wc = libpijul::working_copy::filesystem::FileSystem::from_root(tmp_dir.path(),);tokio::task::spawn_blocking(move || {libpijul::output::output_repository_no_pending(&wc, &changes, &txn, &channel, "", true, None, 1, 0,)?;Ok::<_, Error>(())}).await.unwrap().unwrap();use std::process::Stdio;let (status_tx, status_rx) = tokio::sync::watch::channel(None);let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();let p = tmp_dir.path().join(depl);debug!("launching {:?}", p);async fn deploy(&self,id: uuid::Uuid,tmp_dir: tempfile::TempDir,depl: String,) -> Result<(), Error> {use std::process::Stdio;let (status_tx, status_rx) = tokio::sync::watch::channel(None);let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();let p = tmp_dir.path().join(depl);debug!("launching {:?}", p); - replacement in api/src/replication.rs at line 371[4.820]→[4.820:890](∅→∅),[4.890]→[2.24620:24831](∅→∅),[2.24620]→[2.24620:24831](∅→∅),[2.24831]→[4.891:967](∅→∅)
let mut cmd = tokio::process::Command::new(p).current_dir(tmp_dir.path()).stderr(Stdio::piped()).stdout(Stdio::piped()).stdin(Stdio::null()).spawn().unwrap();let mut cmd = tokio::process::Command::new(p).current_dir(tmp_dir.path()).stderr(Stdio::piped()).stdout(Stdio::piped()).stdin(Stdio::null()).spawn().unwrap(); - replacement in api/src/replication.rs at line 379
use tokio::io::AsyncBufReadExt;let stdout = tokio::io::BufReader::new(cmd.stdout.take().unwrap());let mut stdout = stdout.lines();let mut stdout_ok = true;let stderr = tokio::io::BufReader::new(cmd.stderr.take().unwrap());let mut stderr = stderr.lines();use tokio::io::AsyncBufReadExt;let stdout = tokio::io::BufReader::new(cmd.stdout.take().unwrap());let mut stdout = stdout.lines();let mut stdout_ok = true;let stderr = tokio::io::BufReader::new(cmd.stderr.take().unwrap());let mut stderr = stderr.lines(); - replacement in api/src/replication.rs at line 386
jobs.lock().unwrap().insert(id, (kill_tx, status_tx.clone(), status_rx));self.jobs.lock().unwrap().insert(id, (kill_tx, status_tx.clone(), status_rx)); - replacement in api/src/replication.rs at line 391[2.25433]→[2.25433:25812](∅→∅),[2.25812]→[3.0:347](∅→∅),[3.347]→[4.968:1058](∅→∅),[4.1058]→[3.392:624](∅→∅),[3.392]→[3.392:624](∅→∅),[3.624]→[4.1059:1149](∅→∅),[4.1149]→[3.669:793](∅→∅),[3.669]→[3.669:793](∅→∅),[3.793]→[2.26613:27608](∅→∅),[2.26613]→[2.26613:27608](∅→∅)
let mut stderr_ok = true;let mut buf_stdout = String::new();let mut last_stdout = std::time::UNIX_EPOCH;let mut buf_stderr = String::new();let mut last_stderr = std::time::UNIX_EPOCH;let bound = std::time::Duration::from_secs(1);let mut files = if let Some(ref path) = ci.filesystem {Some((OpenOptions::new().append(true).create(true).open(&path.join(&format!("{}.stdout", id))).await.unwrap(),OpenOptions::new().append(true).create(true).open(&path.join(&format!("{}.stderr", id))).await.unwrap(),))} else {None};while stdout_ok || stderr_ok {debug!("stdout || stderr {:?} {:?}",buf_stdout.len(),buf_stderr.len());tokio::select! {line = stdout.next_line(), if stdout_ok => {let n = if let Some(line) = line? {buf_stdout.push_str(&line);buf_stdout.push('\n');line.len()} else {0};if last_stdout.elapsed().unwrap() >= bound || n == 0 {debug!("sending stdout to db {:?} {:?}", buf_stdout.len(), buf_stderr.len());let mut stderr_ok = true;let mut buf_stdout = String::new();let mut last_stdout = std::time::UNIX_EPOCH;let mut buf_stderr = String::new();let mut last_stderr = std::time::UNIX_EPOCH;let bound = std::time::Duration::from_secs(1);let mut files = if let Some(ref path) = self.ci.filesystem {Some((OpenOptions::new().append(true).create(true).open(&path.join(&format!("{}.stdout", id))).await.unwrap(),OpenOptions::new().append(true).create(true).open(&path.join(&format!("{}.stderr", id))).await.unwrap(),))} else {None};while stdout_ok || stderr_ok {debug!("stdout || stderr {:?} {:?}",buf_stdout.len(),buf_stderr.len());tokio::select! {line = stdout.next_line(), if stdout_ok => {let n = if let Some(line) = line? {buf_stdout.push_str(&line);buf_stdout.push('\n');line.len()} else {0};if last_stdout.elapsed().unwrap() >= bound || n == 0 {debug!("sending stdout to db {:?} {:?}", buf_stdout.len(), buf_stderr.len()); - replacement in api/src/replication.rs at line 433[2.27609]→[2.27609:27784](∅→∅),[2.27784]→[4.1150:1217](∅→∅),[4.1217]→[2.27784:29115](∅→∅),[2.27784]→[2.27784:29115](∅→∅),[2.29115]→[4.1218:1285](∅→∅),[4.1285]→[2.29115:29617](∅→∅),[2.29115]→[2.29115:29617](∅→∅)
if let Some((ref mut stdout, _)) = files {stdout.write_all(buf_stdout.as_bytes()).await?;stdout.flush().await?;}buf_stdout.clear();debug!("stdout/stderr {:?} {:?}", buf_stdout.len(), buf_stderr.len());last_stdout = std::time::SystemTime::now();}if n == 0 {stdout_ok = false}}line = stderr.next_line(), if stderr_ok => {let n = if let Some(line) = line ?{buf_stderr.push_str(&line);buf_stderr.push('\n');line.len()} else {0};if last_stderr.elapsed().unwrap() >= bound || n == 0 {debug!("sending stderr to db {:?}", buf_stderr.len());if let Some((_, ref mut stderr)) = files {stderr.write_all(buf_stderr.as_bytes()).await?;stderr.flush().await?;}buf_stderr.clear();last_stderr = std::time::SystemTime::now();}debug!("{:?}", buf_stderr.len());if n == 0 {stderr_ok = false}}}if let Some((ref mut stdout, _)) = files {stdout.write_all(buf_stdout.as_bytes()).await?;stdout.flush().await?; - replacement in api/src/replication.rs at line 437
let status = tokio::select! {status = cmd.wait() => {status?.code()}_ = kill_rx => {cmd.kill().await?;None}};debug!("process exited with {:?}", status);debug!("stderr {}", buf_stderr);debug!("stdout {}", buf_stdout);let now = chrono::Utc::now();diesel::update(jobs::jobs.find(id)).set((jobs::status.eq(status), jobs::ended.eq(&now))).execute(&mut db.get().await.unwrap()).await?;status_tx.send(Some((now, status))).unwrap();jobs.lock().unwrap().remove(&id);std::mem::drop(permit);Ok::<_, Error>(())});buf_stdout.clear();debug!("stdout/stderr {:?} {:?}", buf_stdout.len(), buf_stderr.len());last_stdout = std::time::SystemTime::now();}if n == 0 {stdout_ok = false}}line = stderr.next_line(), if stderr_ok => {let n = if let Some(line) = line ?{buf_stderr.push_str(&line);buf_stderr.push('\n');line.len()} else {0};if last_stderr.elapsed().unwrap() >= bound || n == 0 {debug!("sending stderr to db {:?}", buf_stderr.len());if let Some((_, ref mut stderr)) = files {stderr.write_all(buf_stderr.as_bytes()).await?;stderr.flush().await?;}buf_stderr.clear();last_stderr = std::time::SystemTime::now();}debug!("{:?}", buf_stderr.len());if n == 0 {stderr_ok = false} - edit in api/src/replication.rs at line 468
} else {debug!("No pijul.toml"); - replacement in api/src/replication.rs at line 469
Ok(())let status = tokio::select! {status = cmd.wait() => {status?.code()}_ = kill_rx => {cmd.kill().await?;None}};debug!("process exited with {:?}", status);debug!("stderr {}", buf_stderr);debug!("stdout {}", buf_stdout);let now = chrono::Utc::now();use crate::db::jobs::dsl as jobs;diesel::update(jobs::jobs.find(id)).set((jobs::status.eq(status), jobs::ended.eq(&now))).execute(&mut self.db.get().await.unwrap()).await?;status_tx.send(Some((now, status))).unwrap();self.jobs.lock().unwrap().remove(&id);Ok::<_, Error>(())