replication.rs
use crate::repository::*;
use ::replication::Update;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use pijul_core::changestore::ChangeStore;
use pijul_core::fs::FsErrorC;
use pijul_core::output::{FileError, OutputError};
use pijul_core::pristine::sanakirja::MutTxn0;
use pijul_core::pristine::sanakirja::SanakirjaError;
use pijul_core::pristine::{ForkError, TreeErr, TxnErr};
use pijul_core::{
ApplyError, ArcTxn, Base32, ChannelMutTxnT, MutTxnT, MutTxnTExt, TxnT, TxnTExt, UnrecordError,
};
use serde_derive::*;
use std::pin::Pin;
use std::process::Stdio;
use thiserror::*;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use tracing::*;
/// State shared by the replication handler.
///
/// `update` clones this value once per incoming update so the returned future
/// owns everything it needs.
#[derive(Clone)]
pub struct H {
/// CI settings. The handler posts job triggers to each entry of `ci.url`, and
/// `sync_job` reads `ci.filesystem` to decide where job logs are mirrored.
ci: crate::config_file::CiConfig,
/// Stored by `new`; not read by any code visible in this file.
jobs: crate::config::Jobs,
/// Per-repository handles, looked up by repository id before touching a
/// pristine or its change store.
locks: RepositoryLocks,
/// Database handle; `db.get().await` yields the connection used for the
/// `jobs`, `repositories` and `users` queries.
db: crate::config::Db,
/// Stored by `new`; not read by any code visible in this file.
builders: std::sync::Arc<tokio::sync::Semaphore>,
/// HTTP client used to talk to the CI endpoints.
client: reqwest::Client,
}
impl H {
    /// Builds a handler from its shared components.
    ///
    /// Every argument is stored as-is; the only thing created here is a fresh
    /// `reqwest::Client` for outgoing CI requests.
    pub fn new(
        ci: crate::config_file::CiConfig,
        jobs: crate::config::Jobs,
        locks: RepositoryLocks,
        db: crate::config::Db,
        builders: std::sync::Arc<tokio::sync::Semaphore>,
    ) -> Self {
        let client = reqwest::Client::new();
        Self {
            ci,
            jobs,
            locks,
            db,
            builders,
            client,
        }
    }
}
/// Errors produced while handling a replication update.
///
/// Every variant except `Join` transparently wraps an underlying error and
/// can be created from it with `?` (via `#[from]`).
#[derive(Debug, Error)]
pub enum Error {
/// Sanakirja (pristine storage backend) error.
#[error(transparent)]
Sk(#[from] SanakirjaError),
/// Standard I/O error.
#[error(transparent)]
Io(#[from] std::io::Error),
/// Transaction-level error from the pristine.
#[error(transparent)]
Txn(#[from] TxnErr<SanakirjaError>),
/// Error from the pristine's tree tables.
#[error(transparent)]
Tree(#[from] TreeErr<SanakirjaError>),
/// Failure while outputting a single file's content (see `get_file`).
#[error(transparent)]
File(#[from] FileError<crate::repository::changestore::Error, MutTxn0>),
/// Failure while resolving a path inside a channel.
#[error(transparent)]
Fs(#[from] FsErrorC<crate::repository::changestore::Error, MutTxn0>),
/// Failure while forking a channel.
#[error(transparent)]
Fork(#[from] ForkError<SanakirjaError>),
/// Failure while unrecording a change from a channel.
#[error(transparent)]
Unrec(#[from] UnrecordError<crate::repository::changestore::Error, std::io::Error, MutTxn0>),
/// Failure while applying a change to a channel.
#[error(transparent)]
Apply(#[from] ApplyError<crate::repository::changestore::Error, MutTxn0>),
/// Failure while outputting a repository state.
#[error(transparent)]
Output(#[from] OutputError<crate::repository::changestore::Error, MutTxn0, std::io::Error>),
/// A `spawn_blocking` task could not be joined; the underlying join error
/// is discarded.
#[error("Join error")]
Join,
/// Database query error.
#[error(transparent)]
Diesel(#[from] diesel::result::Error),
/// Bytes that were expected to be UTF-8 text were not (see `get_file`).
#[error(transparent)]
Utf8(#[from] std::string::FromUtf8Error),
/// HTTP client error.
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
}
impl ::replication::Handler for H {
type Error = Error;
type F =
Pin<Box<dyn futures::Future<Output = Result<Option<Self::Error>, Self::Error>> + Send>>;
fn update(&self, is_source: bool, update: replication::Update) -> Self::F {
let s = self.clone();
Box::pin(async move {
match update {
Update::Change { .. } => {}
Update::Apply {
repo,
channel,
hash,
} => {
let repo_ = s.locks.get(&repo).await.unwrap();
match tokio::task::spawn_blocking(move || {
let pri = repo_.pristine.blocking_write();
let txn = pri.arc_txn_begin()?;
let channel_ = format!("{}_{}", repo, channel);
let mut txn_ = txn.write();
let channel_ = txn_.open_or_create_channel(&channel_)?;
let mut channel__ = channel_.write();
let result = txn_.apply_change(&repo_.changes, &mut *channel__, &hash);
match result {
Err(pijul_core::ApplyError::LocalChange(
pijul_core::LocalApplyError::ChangeAlreadyOnChannel { .. },
)) => {
error!(
"ignored error, apply {:?} to {:?}:{:?} = {:?}",
hash, repo, channel, result
);
return Ok(None);
}
Err(pijul_core::ApplyError::LocalChange(
pijul_core::LocalApplyError::DependencyMissing { hash },
)) => {
error!(
"apply {:?} to {:?}:{:?} = {:?}",
hash, repo, channel, result
);
return Ok(Some(
pijul_core::ApplyError::LocalChange(
pijul_core::LocalApplyError::DependencyMissing { hash },
)
.into(),
));
}
Ok(_) => {
debug!("apply {:?} to {:?}:{:?}", hash, repo, channel);
}
Err(e) => {
error!("apply {:?} to {:?}:{:?} = {:?}", hash, repo, channel, e);
return Err(e.into());
}
}
txn_.touch_channel(&mut *channel__, None);
std::mem::drop(channel__);
std::mem::drop(txn_);
txn.commit()?;
Ok::<_, Error>(None)
})
.await
{
Ok(Ok(Some(x))) => return Ok(Some(x)),
Ok(Ok(None)) => {}
Ok(Err(e)) => return Err(e.into()),
Err(_) => return Err(Error::Join),
}
}
Update::Eof { repo, channel } => {
if cfg!(feature = "jobs") {
if let Some((state, deployment)) =
get_config_state(&s.locks, repo, channel).await?
{
debug!("{:?} {:?}", state, deployment);
let mut db = s.db.get().await.unwrap();
use crate::db::jobs::dsl as jobs;
let id = diesel::insert_into(jobs::jobs)
.values((
jobs::repo.eq(repo),
jobs::repo_state.eq(state.to_base32()),
))
.returning(jobs::id)
.get_result::<uuid::Uuid>(&mut db)
.await
.unwrap();
use crate::db::repositories::dsl as repositories;
use crate::db::users::dsl as users;
let (owner, repo) = repositories::repositories
.find(repo)
.inner_join(users::users)
.select((users::login, repositories::name))
.get_result::<(String, String)>(&mut db)
.await
.unwrap();
let mut targets = std::collections::HashMap::new();
targets.insert(deployment.clone(), format!(".#{}", deployment));
let body = serde_json::to_string(&ci::Trigger {
id,
owner,
repo,
state: state.to_base32(),
targets,
})
.unwrap();
debug!("{:?} {}", s.ci.url, body);
for (n, ci) in s.ci.url.iter().enumerate() {
let res = s
.client
.post(ci.clone() + "/trigger")
.header("Content-Type", "application/json")
.body(body.clone())
.send()
.await
.unwrap();
debug!("{:?}", res);
if let reqwest::StatusCode::OK = res.status() {
tokio::spawn(async move {
sync_job(&mut db, s.client, &s.ci, n, id).await;
});
break;
}
}
}
}
}
Update::Unrecord {
repo,
channel,
hash,
..
} => {
let repo_ = s.locks.get(&repo).await.unwrap();
match tokio::task::spawn_blocking(move || {
let pri = repo_.pristine.blocking_write();
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
let mut channel_ = txn.open_or_create_channel(&channel)?;
let result = txn.unrecord(
&repo_.changes,
&mut channel_,
&hash,
0,
&pijul_core::working_copy::sink(),
);
match result {
Err(pijul_core::UnrecordError::ChangeNotInChannel { .. }) | Ok(_) => {
debug!("unrecord {:?} to {:?}:{:?}", hash, repo, channel);
}
Err(e) => {
error!("unrecord {:?} to {:?}:{:?} = {:?}", hash, repo, channel, e);
return Err(e.into());
}
}
let mut in_other_channels = false;
for ch in txn.channels("")? {
if txn.get_revchanges(&ch, &hash.into())?.is_some() {
in_other_channels = true;
break;
}
}
txn.touch_channel(&mut *channel_.write(), None);
txn.commit()?;
Ok::<_, Error>(in_other_channels)
})
.await
{
Ok(Ok(false)) => {
let mut p = crate::repository::nest_changes_path(&s.locks.config, repo);
pijul_core::changestore::filesystem::push_filename(&mut p, &hash);
if let Ok(meta) = tokio::fs::metadata(&p).await {
std::fs::remove_file(&p).unwrap_or(());
std::fs::remove_dir(p.parent().unwrap()).unwrap_or(());
if is_source {
crate::repository::free_used_storage(
&mut *s.db.get().await.unwrap(),
repo,
meta.len(),
)
.await
.unwrap();
}
}
}
Ok(Ok(_)) => {}
Ok(Err(e)) => return Err(e.into()),
Err(_) => return Err(Error::Join),
}
}
Update::NewChannel { repo, channel } => {
let repo_ = s.locks.get(&repo).await.unwrap();
let pri = repo_.pristine.write().await;
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
txn.open_or_create_channel(&channel)?;
txn.commit()?;
}
Update::Fork { repo, channel, new } => {
let repo_ = s.locks.get(&repo).await.unwrap();
let pri = repo_.pristine.write().await;
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
let chan = txn.open_or_create_channel(&channel)?;
let new = format!("{}_{}", repo, new);
match txn.fork(&chan, &new) {
Ok(_) => txn.commit()?,
Err(ForkError::ChannelNameExists(_)) => {}
Err(e) => return Err(e.into()),
}
}
Update::Prune { repo, channel } => {
let repo_ = s.locks.get(&repo).await.unwrap();
let pri = repo_.pristine.write().await;
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
txn.drop_channel(&channel)?;
txn.commit()?;
}
Update::Rename { repo, channel, new } => {
let repo_ = s.locks.get(&repo).await.unwrap();
let pri = repo_.pristine.write().await;
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
if let Some(mut c) = txn.load_channel(&channel)? {
let new = format!("{}_{}", repo, new);
debug!("rename {:?} to {:?}", channel, new);
txn.rename_channel(&mut c, &new)?;
txn.commit()?;
} else {
debug!("not rename");
}
}
}
Ok(None)
})
}
}
/// The part of a repository's `pijul.toml` that `get_config_state` reads.
#[derive(Debug, Deserialize)]
struct Config {
/// Deployment name. When present, the `Eof` handler uses it to build a CI
/// target of the form `.#<deployment>`.
deployment: Option<String>,
}
fn get_file<C: ChangeStore<Error = crate::repository::changestore::Error>>(
txn: &ArcTxn<pijul_core::pristine::sanakirja::MutTxn0>,
channel: &pijul_core::ChannelRef<pijul_core::pristine::sanakirja::MutTxn0>,
changes: &C,
path: &str,
) -> Result<Option<String>, Error> {
let (pos, is_dir) = txn.read().follow_oldest_path(changes, channel, path)?;
if is_dir {
return Ok(None);
}
let mut out = crate::repository::RawVertexBuf { out: Vec::new() };
use pijul_core::ChannelTxnT;
let mut graph = {
let txn = txn.read();
pijul_core::alive::retrieve(&*txn, txn.graph(&*channel.read()), pos, false)?
};
let mut forward = Vec::new();
pijul_core::alive::output_graph(changes, &txn, &channel, &mut out, &mut graph, &mut forward)
.map_err(|x| Error::File(x))?;
debug!("{:?}", out);
Ok(Some(String::from_utf8(out.out)?))
}
pub async fn get_config_state(
locks: &RepositoryLocks,
repo: uuid::Uuid,
channel: String,
) -> Result<Option<(pijul_core::Merkle, String)>, Error> {
debug!("get_config_state");
let repo_ = locks.get(&repo).await.unwrap();
tokio::task::spawn_blocking(move || {
let pri = repo_.pristine.blocking_write();
let txn_ = pri.arc_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
if let Some(channel) = txn_.read().load_channel(&channel)? {
if let Some(config) = get_file(&txn_, &channel, &repo_.changes, "pijul.toml")? {
debug!("config = {:?}", config);
if let Ok(parsed) = toml::from_str::<Config>(&config) {
if let Some(dep) = parsed.deployment {
if let Some(Ok((_, (_, state)))) =
txn_.read().reverse_log(&channel.read(), None)?.next()
{
return Ok(Some((state.into(), dep)));
}
}
}
};
}
Ok(None)
})
.await
.unwrap()
}
/// Follows CI job `job` on endpoint `ci.url[ci_n]` until a status is
/// available, then realises and runs the job's results locally.
///
/// Once per second the loop:
/// 1. fetches `/status/{job}`; on `200 OK` the job's `status` and `ended`
///    columns are updated in the database;
/// 2. if `ci.filesystem` is set, appends whatever the endpoint returns for
///    `/stdout/{job}` and `/stderr/{job}` (requested from the current local
///    file length onwards) to `{job}.stdout` / `{job}.stderr` in that
///    directory;
/// 3. if a status was obtained, runs `nix-store -r` on every result path,
///    then runs `{path}/bin/{script}` for every result, piping their output
///    through `ci::dump_cmd`, and returns.
///
/// # Panics
/// Panics on any I/O, HTTP, JSON, database or process-spawning failure, and
/// if `ci_n` is out of bounds for `ci.url`.
pub async fn sync_job(
    db: &mut diesel_async::AsyncPgConnection,
    client: reqwest::Client,
    ci: &crate::config_file::CiConfig,
    ci_n: usize,
    job: uuid::Uuid,
) {
    // Local mirrors of the job's logs, opened in append mode so that the
    // current file length is the offset of the next byte to fetch.
    let mut files = if let Some(ref f) = ci.filesystem {
        let stdout = OpenOptions::new()
            .append(true)
            .create(true)
            .open(&f.join(&format!("{}.stdout", job)))
            .await
            .unwrap();
        let stderr = OpenOptions::new()
            .append(true)
            .create(true)
            .open(&f.join(&format!("{}.stderr", job)))
            .await
            .unwrap();
        Some((stdout, stderr))
    } else {
        None
    };
    loop {
        debug!("Querying status");
        let status = client
            .get(format!("{}/status/{}", ci.url[ci_n], job))
            .send()
            .await
            .unwrap();
        let status = if let reqwest::StatusCode::OK = status.status() {
            let status: ci::Status = status.json().await.unwrap();
            debug!("STATUS {:?}", status);
            use crate::db::jobs::dsl as jobs;
            diesel::update(jobs::jobs.find(job))
                .set((
                    jobs::status.eq(status.code),
                    jobs::ended.eq(status.finished),
                ))
                .execute(db)
                .await
                .unwrap();
            Some(status)
        } else {
            None
        };
        if let Some((ref mut stdout, ref mut stderr)) = files {
            let stdout_len = stdout.metadata().await.unwrap().len();
            let stderr_len = stderr.metadata().await.unwrap().len();
            // NOTE(review): `bytes=N-*` is not the standard open-ended range
            // syntax (`bytes=N-`); kept as-is because the CI endpoint's parser
            // is not visible here — confirm against the server.
            let (stdout_, stderr_) = tokio::join!(
                client
                    .get(format!("{}/stdout/{}", ci.url[ci_n], job))
                    .header("Range", format!("bytes={stdout_len}-*"))
                    .send(),
                client
                    .get(format!("{}/stderr/{}", ci.url[ci_n], job))
                    .header("Range", format!("bytes={stderr_len}-*"))
                    .send()
            );
            let stdout_ = stdout_.unwrap();
            let stderr_ = stderr_.unwrap();
            debug!("{:?} {:?}", stdout_, stderr_);
            // NOTE(review): the body is appended whatever the response status
            // is — confirm the endpoint returns an empty body when there is no
            // new output.
            //
            // `write_all` is required here: a plain `write` may stop after a
            // partial write, which would drop log bytes and shift the offset
            // used for the next range request.
            let bytes = stdout_.bytes().await.unwrap();
            stdout.write_all(&bytes).await.unwrap();
            let bytes = stderr_.bytes().await.unwrap();
            stderr.write_all(&bytes).await.unwrap();
        }
        if let Some(status) = status {
            // The receiving half of the status channel and the sending half of
            // the kill channel are bound but never used in this function.
            let (mut status_tx, _status_rx) = tokio::sync::watch::channel(None);
            let (_kill_tx, mut kill_rx) = tokio::sync::oneshot::channel();
            // First pass: realise every result path with `nix-store -r`.
            for (_, s) in status.results.iter() {
                let mut cmd = tokio::process::Command::new("nix-store");
                cmd.arg("-r").arg(s);
                let cmd = cmd
                    .stderr(Stdio::piped())
                    .stdout(Stdio::piped())
                    .stdin(Stdio::null())
                    .spawn()
                    .unwrap();
                ci::dump_cmd(cmd, &mut files, &mut kill_rx, &mut status_tx)
                    .await
                    .unwrap();
            }
            // Second pass: run each result's script from its `bin` directory.
            for (script, s) in status.results.iter() {
                debug!("launching {:?} {:?}", s, script);
                let cmd = tokio::process::Command::new(format!("{}/bin/{script}", s.display()))
                    .stderr(Stdio::piped())
                    .stdout(Stdio::piped())
                    .stdin(Stdio::null())
                    .spawn()
                    .unwrap();
                ci::dump_cmd(cmd, &mut files, &mut kill_rx, &mut status_tx)
                    .await
                    .unwrap();
            }
            break;
        }
        tokio::time::sleep(std::time::Duration::from_secs(1)).await
    }
}