Removing locks when deploying

pmeunier
Apr 25, 2026, 5:37 PM
W4LOGDLDBZIUUNACCXWWUZGVEGWAXYAZI3C66HYHURFQ3CWZ7XKAC

Dependencies

Change contents

  • replacement in api/src/replication.rs at line 152
    [2.20400][2.20400:20859]()
    let pri = repo_.pristine.blocking_write();
    let txn = pri.arc_txn_begin()?;
    let channel = format!("{}_{}", repo, channel);
    let channel_ = {
    let mut txn_ = txn.write();
    txn_.open_or_create_channel(&channel)?
    };
    s.deploy(&txn, &channel_, &repo_.changes, repo)?;
    [2.20400]
    [2.20859]
    if let Some((temp, depl)) = {
    let pri = repo_.pristine.blocking_write();
    let txn = pri.arc_txn_begin()?;
    let channel = format!("{}_{}", repo, channel);
    let channel_ = {
    let mut txn_ = txn.write();
    txn_.open_or_create_channel(&channel)?
    };
    let changes = repo_.changes.clone();
    s.output_for_deployment(&txn, &channel_, &changes)?
    } {
    tokio::spawn(async move {
    use crate::db::jobs::dsl as jobs;
    let id = diesel::insert_into(jobs::jobs)
    .values((jobs::repo.eq(repo),))
    .returning(jobs::id)
    .get_result::<uuid::Uuid>(&mut s.db.get().await.unwrap())
    .await
    .unwrap();
    let permit = s.builders.acquire().await.unwrap();
    {
    s.deploy(id, temp, depl).await?;
    }
    std::mem::drop(permit);
    Ok::<_, Error>(())
    });
    }
  • replacement in api/src/replication.rs at line 329
    [2.22273][2.22273:22288]()
    fn deploy<
    [2.22273]
    [2.22288]
    fn output_for_deployment<
  • replacement in api/src/replication.rs at line 336
    [2.22566][2.22566:22621]()
    repo: uuid::Uuid,
    ) -> Result<(), Error> {
    [2.22566]
    [2.22621]
    ) -> Result<Option<(tempfile::TempDir, String)>, Error> {
  • edit in api/src/replication.rs at line 343
    [2.22870][2.22870:22916]()
    let db = self.db.clone();
  • replacement in api/src/replication.rs at line 346
    [2.23061][2.23061:23215]()
    let jobs = self.jobs.clone();
    let builders = self.builders.clone();
    let ci = self.ci.clone();
    [2.23061]
    [2.23215]
    let tmp_dir = tempfile::tempdir()?;
    let wc =
    libpijul::working_copy::filesystem::FileSystem::from_root(tmp_dir.path());
    libpijul::output::output_repository_no_pending(
    &wc, &changes, &txn, &channel, "", true, None, 1, 0,
    )?;
    return Ok(Some((tmp_dir, depl)));
    }
    }
    }
    Ok(None)
    }
  • replacement in api/src/replication.rs at line 359
    [2.23216][2.23216:23650](),[2.23650][4.442:516](),[4.516][2.23687:23688](),[2.23687][2.23687:23688](),[2.23688][4.517:636](),[4.636][2.23748:24271](),[2.23748][2.23748:24271](),[2.24271][4.637:706](),[4.706][2.24307:24526](),[2.24307][2.24307:24526](),[2.24526][4.707:819]()
    tokio::spawn(async move {
    let permit = builders.acquire().await.unwrap();
    use crate::db::jobs::dsl as jobs;
    let id = diesel::insert_into(jobs::jobs)
    .values((jobs::repo.eq(repo),))
    .returning(jobs::id)
    .get_result::<uuid::Uuid>(&mut db.get().await.unwrap())
    .await
    .unwrap();
    let mut tmp_dir = tempfile::tempdir()?;
    tmp_dir.disable_cleanup(true);
    let wc = libpijul::working_copy::filesystem::FileSystem::from_root(
    tmp_dir.path(),
    );
    tokio::task::spawn_blocking(move || {
    libpijul::output::output_repository_no_pending(
    &wc, &changes, &txn, &channel, "", true, None, 1, 0,
    )?;
    Ok::<_, Error>(())
    })
    .await
    .unwrap()
    .unwrap();
    use std::process::Stdio;
    let (status_tx, status_rx) = tokio::sync::watch::channel(None);
    let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();
    let p = tmp_dir.path().join(depl);
    debug!("launching {:?}", p);
    [2.23216]
    [4.819]
    async fn deploy(
    &self,
    id: uuid::Uuid,
    tmp_dir: tempfile::TempDir,
    depl: String,
    ) -> Result<(), Error> {
    use std::process::Stdio;
    let (status_tx, status_rx) = tokio::sync::watch::channel(None);
    let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();
    let p = tmp_dir.path().join(depl);
    debug!("launching {:?}", p);
  • replacement in api/src/replication.rs at line 371
    [4.820][4.820:890](),[4.890][2.24620:24831](),[2.24620][2.24620:24831](),[2.24831][4.891:967]()
    let mut cmd = tokio::process::Command::new(p)
    .current_dir(tmp_dir.path())
    .stderr(Stdio::piped())
    .stdout(Stdio::piped())
    .stdin(Stdio::null())
    .spawn()
    .unwrap();
    [4.820]
    [2.24870]
    let mut cmd = tokio::process::Command::new(p)
    .current_dir(tmp_dir.path())
    .stderr(Stdio::piped())
    .stdout(Stdio::piped())
    .stdin(Stdio::null())
    .spawn()
    .unwrap();
  • replacement in api/src/replication.rs at line 379
    [2.24871][2.24871:25275]()
    use tokio::io::AsyncBufReadExt;
    let stdout = tokio::io::BufReader::new(cmd.stdout.take().unwrap());
    let mut stdout = stdout.lines();
    let mut stdout_ok = true;
    let stderr = tokio::io::BufReader::new(cmd.stderr.take().unwrap());
    let mut stderr = stderr.lines();
    [2.24871]
    [2.25275]
    use tokio::io::AsyncBufReadExt;
    let stdout = tokio::io::BufReader::new(cmd.stdout.take().unwrap());
    let mut stdout = stdout.lines();
    let mut stdout_ok = true;
    let stderr = tokio::io::BufReader::new(cmd.stderr.take().unwrap());
    let mut stderr = stderr.lines();
  • replacement in api/src/replication.rs at line 386
    [2.25276][2.25276:25432]()
    jobs.lock()
    .unwrap()
    .insert(id, (kill_tx, status_tx.clone(), status_rx));
    [2.25276]
    [2.25432]
    self.jobs
    .lock()
    .unwrap()
    .insert(id, (kill_tx, status_tx.clone(), status_rx));
  • replacement in api/src/replication.rs at line 391
    [2.25433][2.25433:25812](),[2.25812][3.0:347](),[3.347][4.968:1058](),[4.1058][3.392:624](),[3.392][3.392:624](),[3.624][4.1059:1149](),[4.1149][3.669:793](),[3.669][3.669:793](),[3.793][2.26613:27608](),[2.26613][2.26613:27608]()
    let mut stderr_ok = true;
    let mut buf_stdout = String::new();
    let mut last_stdout = std::time::UNIX_EPOCH;
    let mut buf_stderr = String::new();
    let mut last_stderr = std::time::UNIX_EPOCH;
    let bound = std::time::Duration::from_secs(1);
    let mut files = if let Some(ref path) = ci.filesystem {
    Some((
    OpenOptions::new()
    .append(true)
    .create(true)
    .open(&path.join(&format!("{}.stdout", id)))
    .await
    .unwrap(),
    OpenOptions::new()
    .append(true)
    .create(true)
    .open(&path.join(&format!("{}.stderr", id)))
    .await
    .unwrap(),
    ))
    } else {
    None
    };
    while stdout_ok || stderr_ok {
    debug!(
    "stdout || stderr {:?} {:?}",
    buf_stdout.len(),
    buf_stderr.len()
    );
    tokio::select! {
    line = stdout.next_line(), if stdout_ok => {
    let n = if let Some(line) = line? {
    buf_stdout.push_str(&line);
    buf_stdout.push('\n');
    line.len()
    } else {
    0
    };
    if last_stdout.elapsed().unwrap() >= bound || n == 0 {
    debug!("sending stdout to db {:?} {:?}", buf_stdout.len(), buf_stderr.len());
    [2.25433]
    [2.27608]
    let mut stderr_ok = true;
    let mut buf_stdout = String::new();
    let mut last_stdout = std::time::UNIX_EPOCH;
    let mut buf_stderr = String::new();
    let mut last_stderr = std::time::UNIX_EPOCH;
    let bound = std::time::Duration::from_secs(1);
    let mut files = if let Some(ref path) = self.ci.filesystem {
    Some((
    OpenOptions::new()
    .append(true)
    .create(true)
    .open(&path.join(&format!("{}.stdout", id)))
    .await
    .unwrap(),
    OpenOptions::new()
    .append(true)
    .create(true)
    .open(&path.join(&format!("{}.stderr", id)))
    .await
    .unwrap(),
    ))
    } else {
    None
    };
    while stdout_ok || stderr_ok {
    debug!(
    "stdout || stderr {:?} {:?}",
    buf_stdout.len(),
    buf_stderr.len()
    );
    tokio::select! {
    line = stdout.next_line(), if stdout_ok => {
    let n = if let Some(line) = line? {
    buf_stdout.push_str(&line);
    buf_stdout.push('\n');
    line.len()
    } else {
    0
    };
    if last_stdout.elapsed().unwrap() >= bound || n == 0 {
    debug!("sending stdout to db {:?} {:?}", buf_stdout.len(), buf_stderr.len());
  • replacement in api/src/replication.rs at line 433
    [2.27609][2.27609:27784](),[2.27784][4.1150:1217](),[4.1217][2.27784:29115](),[2.27784][2.27784:29115](),[2.29115][4.1218:1285](),[4.1285][2.29115:29617](),[2.29115][2.29115:29617]()
    if let Some((ref mut stdout, _)) = files {
    stdout.write_all(buf_stdout.as_bytes()).await?;
    stdout.flush().await?;
    }
    buf_stdout.clear();
    debug!("stdout/stderr {:?} {:?}", buf_stdout.len(), buf_stderr.len());
    last_stdout = std::time::SystemTime::now();
    }
    if n == 0 {
    stdout_ok = false
    }
    }
    line = stderr.next_line(), if stderr_ok => {
    let n = if let Some(line) = line ?{
    buf_stderr.push_str(&line);
    buf_stderr.push('\n');
    line.len()
    } else {
    0
    };
    if last_stderr.elapsed().unwrap() >= bound || n == 0 {
    debug!("sending stderr to db {:?}", buf_stderr.len());
    if let Some((_, ref mut stderr)) = files {
    stderr.write_all(buf_stderr.as_bytes()).await?;
    stderr.flush().await?;
    }
    buf_stderr.clear();
    last_stderr = std::time::SystemTime::now();
    }
    debug!("{:?}", buf_stderr.len());
    if n == 0 {
    stderr_ok = false
    }
    }
    }
    [2.27609]
    [2.29617]
    if let Some((ref mut stdout, _)) = files {
    stdout.write_all(buf_stdout.as_bytes()).await?;
    stdout.flush().await?;
  • replacement in api/src/replication.rs at line 437
    [2.29643][2.29643:30743]()
    let status = tokio::select! {
    status = cmd.wait() => {
    status?.code()
    }
    _ = kill_rx => {
    cmd.kill().await?;
    None
    }
    };
    debug!("process exited with {:?}", status);
    debug!("stderr {}", buf_stderr);
    debug!("stdout {}", buf_stdout);
    let now = chrono::Utc::now();
    diesel::update(jobs::jobs.find(id))
    .set((jobs::status.eq(status), jobs::ended.eq(&now)))
    .execute(&mut db.get().await.unwrap())
    .await?;
    status_tx.send(Some((now, status))).unwrap();
    jobs.lock().unwrap().remove(&id);
    std::mem::drop(permit);
    Ok::<_, Error>(())
    });
    [2.29643]
    [2.30743]
    buf_stdout.clear();
    debug!("stdout/stderr {:?} {:?}", buf_stdout.len(), buf_stderr.len());
    last_stdout = std::time::SystemTime::now();
    }
    if n == 0 {
    stdout_ok = false
    }
    }
    line = stderr.next_line(), if stderr_ok => {
    let n = if let Some(line) = line ?{
    buf_stderr.push_str(&line);
    buf_stderr.push('\n');
    line.len()
    } else {
    0
    };
    if last_stderr.elapsed().unwrap() >= bound || n == 0 {
    debug!("sending stderr to db {:?}", buf_stderr.len());
    if let Some((_, ref mut stderr)) = files {
    stderr.write_all(buf_stderr.as_bytes()).await?;
    stderr.flush().await?;
    }
    buf_stderr.clear();
    last_stderr = std::time::SystemTime::now();
    }
    debug!("{:?}", buf_stderr.len());
    if n == 0 {
    stderr_ok = false
    }
  • edit in api/src/replication.rs at line 468
    [2.30775][2.30775:30829]()
    } else {
    debug!("No pijul.toml");
  • replacement in api/src/replication.rs at line 469
    [2.30839][2.30839:30854]()
    Ok(())
    [2.30839]
    [2.30854]
    let status = tokio::select! {
    status = cmd.wait() => {
    status?.code()
    }
    _ = kill_rx => {
    cmd.kill().await?;
    None
    }
    };
    debug!("process exited with {:?}", status);
    debug!("stderr {}", buf_stderr);
    debug!("stdout {}", buf_stdout);
    let now = chrono::Utc::now();
    use crate::db::jobs::dsl as jobs;
    diesel::update(jobs::jobs.find(id))
    .set((jobs::status.eq(status), jobs::ended.eq(&now)))
    .execute(&mut self.db.get().await.unwrap())
    .await?;
    status_tx.send(Some((now, status))).unwrap();
    self.jobs.lock().unwrap().remove(&id);
    Ok::<_, Error>(())