use clap::*;
use elpe::extract::*;
use elpe::*;
use serde_derive::*;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tonic::codegen::tokio_stream::StreamExt;
use tracing::*;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
/// Generated gRPC/protobuf code for the `elpe` package, pulled in through
/// `tonic::include_proto!`. The request/reply messages and the
/// `elpe_server::Elpe` service trait used in this file live here.
pub mod proto {
tonic::include_proto!("elpe");
}
/// State behind the gRPC service implementation.
pub struct Elpe {
/// Client used to locate the store directory and to fetch release
/// metadata, package indices and packages.
deb_client: elpe::Client,
/// Sending half of the build queue: each message is a build request paired
/// with the one-shot sender on which the build outcome (output path or an
/// error string) is reported back.
sender: tokio::sync::mpsc::UnboundedSender<(
crate::container::BuildRequest,
tokio::sync::oneshot::Sender<Result<PathBuf, String>>,
)>,
}
#[tonic::async_trait]
impl proto::elpe_server::Elpe for Elpe {
async fn add_path(
&self,
request: tonic::Request<tonic::Streaming<proto::AddPathRequest>>,
) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
let mut r = request.into_inner();
let mut current_file = None;
let ref store = self.deb_client.store_path();
let tmp_dir = tempfile::tempdir_in(store)?;
let mut hasher = blake3::Hasher::new();
loop {
trace!("waiting for next in stream");
let Some(r) = r.next().await else { break };
let r = r.unwrap();
match r.request {
Some(proto::add_path_request::Request::File(f)) => {
hasher.update(b"\0f");
hasher.update(f.name.as_bytes());
hasher.update(b"\0");
let p = tmp_dir.path().join(&f.name);
tokio::fs::create_dir_all(p.parent().unwrap()).await?;
current_file = Some(tokio::fs::File::create(&p).await?)
}
Some(proto::add_path_request::Request::Directory(d)) => {
hasher.update(b"\0d");
hasher.update(d.name.as_bytes());
hasher.update(b"\0");
let p = tmp_dir.path().join(&d.name);
tokio::fs::create_dir_all(&p).await.unwrap();
}
Some(proto::add_path_request::Request::Contents(c)) => {
hasher.update(&c.content);
current_file.as_mut().unwrap().write_all(&c.content).await?;
}
None => break,
}
}
let path = store.join(data_encoding::HEXLOWER.encode(hasher.finalize().as_bytes()));
use tokio::io::ErrorKind;
let new = tmp_dir.into_path();
match tokio::fs::rename(&new, &path).await {
Ok(()) => (),
Err(e) if e.kind() == ErrorKind::DirectoryNotEmpty => (),
Err(e) => {
tokio::fs::remove_dir_all(&new).await?;
return Err(e.into());
}
}
info!("add_path extracted to {:?}", path);
Ok(tonic::Response::new(proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Ok(
proto::DerivationResult {
destdir: vec![path.to_str().unwrap().to_string()],
paths: Vec::new(),
path_patterns: Vec::new(),
},
)),
}))
}
async fn derivation(
&self,
request: tonic::Request<proto::DerivationRequest>,
) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
debug!("derivation request");
let r = request.into_inner();
let (tx, rx) = tokio::sync::oneshot::channel();
self.sender
.send((
crate::container::BuildRequest {
name: r.name,
paths: r.paths,
script: r.builder,
target: r.target,
output_hash: r.output_hash,
},
tx,
))
.unwrap();
Ok(tonic::Response::new(match rx.await.unwrap() {
Ok(out) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Ok(
proto::DerivationResult {
destdir: vec![out.to_str().unwrap().to_string()],
paths: Vec::new(),
path_patterns: Vec::new(),
},
)),
},
Err(e) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Error(e)),
},
}))
}
async fn ubuntu_release(
&self,
request: tonic::Request<proto::UbuntuReleaseRequest>,
) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
debug!("ubuntu release {:?}", request);
let r = request.into_inner();
let h = self.deb_client.in_release(r.release).await.unwrap();
let arch = match r.arch {
0 => "amd64",
1 => "aarch64",
_ => unreachable!(),
};
let p = self
.deb_client
.packages(&h, &r.repository, arch)
.await
.unwrap();
Ok(tonic::Response::new(proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Ok(
proto::DerivationResult {
destdir: vec![p.to_str().unwrap().to_string()],
paths: Vec::new(),
path_patterns: Vec::new(),
},
)),
}))
}
async fn ubuntu_package(
&self,
request: tonic::Request<proto::UbuntuPackageRequest>,
) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
debug!("request {:?}", request);
let r = request.into_inner();
let index: Result<Vec<_>, _> = r
.index
.into_iter()
.map(|index| deb::Index::open(&index))
.collect();
let index = index.unwrap();
let p = download_extract_deps(&index, &self.deb_client, &r.name)
.await
.unwrap();
info!("path {:?} {:#?}", r.name, p);
Ok(tonic::Response::new(proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Ok(
proto::DerivationResult {
destdir: p
.result
.iter()
.rev()
.map(|x| x.to_str().unwrap().to_string())
.collect(),
paths: p.paths.into_iter().filter_map(Arc::into_inner).collect(),
path_patterns: Vec::new(),
},
)),
}))
}
}
/// Server configuration, deserialized from the TOML file given on the
/// command line.
#[derive(Deserialize)]
pub struct Config {
/// Passed to `Client::new` as its first argument.
/// NOTE(review): presumably the PGP/GnuPG home used to verify downloads — confirm.
pgp_home: PathBuf,
/// Store directory; handed both to the container service and to the client.
store_path: PathBuf,
/// Passed to `Client::new` as its third argument.
/// NOTE(review): presumably the package index location/URL — confirm.
package_index: String,
/// User the container service is started for and to which the process
/// drops privileges before serving requests.
user: String,
}
// Command-line arguments.
// Plain `//` comments are used on purpose: clap turns `///` doc comments into
// help text, which would change the program's `--help` output.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
// Path of the TOML configuration file (`-c` / `--config`), read at startup.
#[arg(short, long)]
config: PathBuf,
}
/// Program entry point.
///
/// Sets up tracing, loads the TOML configuration, starts the container
/// service, then drops privileges and serves the gRPC API on `0.0.0.0:50051`
/// until either the server or the request forwarder stops.
///
/// # Panics
///
/// Panics with a descriptive message if the configuration cannot be read or
/// parsed, if the runtime cannot be created, or if dropping privileges fails.
fn main() {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| String::new().into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();

    let args = Args::parse();
    let raw_config = std::fs::read_to_string(&args.config)
        .unwrap_or_else(|e| panic!("cannot read config file {:?}: {}", args.config, e));
    let config: Config = toml::from_str(&raw_config)
        .unwrap_or_else(|e| panic!("cannot parse config file {:?}: {}", args.config, e));

    // Started before the runtime exists and before privileges are dropped.
    // NOTE(review): presumably it needs the original privileges — confirm.
    let container_channel = crate::container::serve(&config.user, &config.store_path);

    let rt = tokio::runtime::Runtime::new().expect("failed to create the tokio runtime");
    rt.block_on(async move {
        privdrop::PrivDrop::default()
            .user(&config.user)
            .apply()
            .expect("failed to drop privileges");

        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<(
            crate::container::BuildRequest,
            tokio::sync::oneshot::Sender<Result<PathBuf, String>>,
        )>();
        let forwarder = tokio::spawn(crate::container::forward(receiver, container_channel));

        let elpe = Elpe {
            deb_client: Client::new(&config.pgp_home, &config.store_path, &config.package_index),
            sender,
        };
        let addr = "0.0.0.0:50051"
            .parse()
            .expect("hard-coded listen address is valid");

        // Whichever side finishes first ends the process. Report why instead
        // of exiting silently.
        tokio::select! {
            served = tonic::transport::Server::builder()
                .add_service(proto::elpe_server::ElpeServer::new(elpe))
                .serve(addr)
            => {
                if let Err(e) = served {
                    error!("gRPC server failed: {:?}", e);
                }
            }
            joined = forwarder => {
                match joined {
                    Ok(_) => warn!("container forwarder exited, shutting down"),
                    Err(e) => error!("container forwarder task failed: {}", e),
                }
            }
        }
    })
}