server.rs
use elpe::extract::*;
use elpe::*;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::PollSender;
use tonic::codegen::tokio_stream::StreamExt;
use tracing::*;
/// State of the `elpe` gRPC service.
///
/// Bundles the client used by the request handlers with the sending half of the
/// channel over which build requests are handed to the task spawned in `Elpe::new`.
pub struct Elpe {
/// Client providing the store path, HTTP downloads and Ubuntu release/package lookups.
deb_client: elpe::Client,
/// Queue of build requests. Each request travels together with the `Sender` whose
/// receiving half feeds the response stream of the `derivation` RPC.
sender: tokio::sync::mpsc::UnboundedSender<(
crate::container::BuildRequest,
tokio::sync::mpsc::Sender<crate::container::Msg>,
)>,
/// Handle of the spawned `crate::container::forward` task; `Some` until `serve` takes it.
t: Option<tokio::task::JoinHandle<Result<(), elpe::Error>>>,
}
/// Generated protobuf/gRPC items for the `elpe` package, pulled in with
/// `tonic::include_proto!`.
pub mod proto {
tonic::include_proto!("elpe");
}
impl Elpe {
    /// Creates the service and spawns the task that forwards build requests.
    ///
    /// An unbounded channel is created; its receiving half is passed, together with
    /// `container_channel`, to `crate::container::forward`, which is spawned with
    /// `tokio::spawn`. The sending half is kept for the `derivation` handler.
    ///
    /// # Panics
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn new(
        deb_client: elpe::Client,
        container_channel: elpe::container::ContainerChannel,
    ) -> Self {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<(
            crate::container::BuildRequest,
            tokio::sync::mpsc::Sender<crate::container::Msg>,
        )>();
        let forwarder = tokio::spawn(crate::container::forward(receiver, container_channel));
        Elpe {
            deb_client,
            sender,
            t: Some(forwarder),
        }
    }

    /// Serves the gRPC API on `addr` until either the server or the forwarder task stops.
    ///
    /// Whichever of the two finishes first ends this function; the reason is logged
    /// instead of being silently dropped, since the signature cannot return it.
    pub async fn serve(mut self, addr: std::net::SocketAddr) {
        // `new` is the only constructor and always stores the handle; `serve` consumes
        // `self`, so the handle cannot have been taken before.
        let forwarder = self
            .t
            .take()
            .expect("forwarder handle is set by `Elpe::new` and taken only once");
        tokio::select! {
            served = tonic::transport::Server::builder()
                .add_service(proto::elpe_server::ElpeServer::new(self))
                .serve(addr) => {
                if let Err(e) = served {
                    error!("gRPC server on {} failed: {:?}", addr, e);
                }
            }
            joined = forwarder => {
                match joined {
                    Ok(Ok(())) => info!("container forwarder finished"),
                    Ok(Err(e)) => error!("container forwarder failed: {:?}", e),
                    Err(e) => error!("container forwarder task aborted: {:?}", e),
                }
            }
        }
    }
}
use std::pin::Pin;
/// Boxed, pinned, `Send` stream of `DerivationReply` results, used as the response
/// stream type of the server-streaming RPCs implemented in this file.
type ResponseStream =
Pin<Box<dyn tokio_stream::Stream<Item = Result<proto::DerivationReply, tonic::Status>> + Send>>;
#[tonic::async_trait]
impl proto::elpe_server::Elpe for Elpe {
/// Reports the platform the server is running on.
///
/// The reply contains the endianness (`0` for big endian, `1` otherwise), the size of
/// `usize` in bytes, and an architecture code (`0` for `x86_64`, `1` for `aarch64`).
///
/// # Errors
/// Returns an `Unimplemented` status when the server architecture has no code assigned,
/// rather than panicking inside the request handler.
async fn handshake(
    &self,
    _request: tonic::Request<proto::Empty>,
) -> Result<tonic::Response<proto::PlatformReply>, tonic::Status> {
    let arch = match std::env::consts::ARCH {
        "x86_64" => 0,
        "aarch64" => 1,
        other => {
            return Err(tonic::Status::unimplemented(format!(
                "unsupported server architecture: {}",
                other
            )))
        }
    };
    let endianness = if cfg!(target_endian = "big") { 0 } else { 1 };
    Ok(tonic::Response::new(proto::PlatformReply {
        endianness,
        pointer_width: std::mem::size_of::<usize>() as i32,
        arch,
    }))
}
async fn add_path(
&self,
request: tonic::Request<tonic::Streaming<proto::AddPathRequest>>,
) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
let mut r = request.into_inner();
debug!("add_path");
let mut current_file = None;
let ref store = self.deb_client.store_path();
debug!("store");
let tmp_dir = tempfile::tempdir_in(store).unwrap();
debug!("store {:?}", tmp_dir);
let mut hasher = blake3::Hasher::new();
debug!("loop");
loop {
trace!("waiting for next in stream");
let Some(r) = r.next().await else { break };
let r = r.unwrap();
match r.request {
Some(proto::add_path_request::Request::File(f)) => {
info!("Adding file {:?}", f.name);
hasher.update(b"\0f");
hasher.update(f.name.as_bytes());
hasher.update(b"\0");
let p = tmp_dir.path().join(&f.name);
tokio::fs::create_dir_all(p.parent().unwrap()).await?;
current_file = Some(tokio::fs::File::create(&p).await?)
}
Some(proto::add_path_request::Request::Directory(d)) => {
info!("Adding file {:?}", d.name);
hasher.update(b"\0d");
hasher.update(d.name.as_bytes());
hasher.update(b"\0");
let p = tmp_dir.path().join(&d.name);
tokio::fs::create_dir_all(&p).await.unwrap();
}
Some(proto::add_path_request::Request::Contents(c)) => {
hasher.update(&c.content);
current_file.as_mut().unwrap().write_all(&c.content).await?;
}
None => break,
}
}
debug!("loop done");
let path = store.join(data_encoding::HEXLOWER.encode(hasher.finalize().as_bytes()));
use tokio::io::ErrorKind;
let new = tmp_dir.into_path();
match tokio::fs::rename(&new, &path).await {
Ok(()) => (),
Err(e) if e.kind() == ErrorKind::DirectoryNotEmpty => (),
Err(e) => {
tokio::fs::remove_dir_all(&new).await?;
return Err(e.into());
}
}
info!("add_path extracted to {:?}", path);
Ok(tonic::Response::new(proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Ok(
proto::DerivationResult {
destdir: vec![path.to_str().unwrap().to_string()],
paths: Vec::new(),
path_patterns: Vec::new(),
},
)),
}))
}
type DerivationStream = ResponseStream;
async fn derivation(
&self,
request: tonic::Request<proto::DerivationRequest>,
) -> Result<tonic::Response<ResponseStream>, tonic::Status> {
debug!("derivation request");
let now = std::time::Instant::now();
let r = request.into_inner();
let (tx, rx) = tokio::sync::mpsc::channel(200);
debug!("derivation request: {:?} {:?}", r.name, r.paths);
self.sender
.send((
crate::container::BuildRequest {
name: r.name.clone(),
paths: r.paths,
script: r.builder,
target: r.target,
output_hash: r.output_hash,
},
tx,
))
.unwrap();
use crate::container::Msg;
let output_stream = ReceiverStream::new(rx).map(|x| {
Ok(match x {
Msg::Ok(out) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Ok(
proto::DerivationResult {
destdir: out
.into_iter()
.map(|x| x.to_str().unwrap().to_string())
.collect(),
paths: Vec::new(),
path_patterns: Vec::new(),
},
)),
},
Msg::Error(p) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Error(p)),
},
Msg::Stdout(p) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Stdout(p)),
},
Msg::Stderr(p) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Stderr(p)),
},
})
});
info!("request {:?}: {:?}", r.name, now.elapsed());
Ok(tonic::Response::new(Box::pin(output_stream)))
}
/// Downloads `url` through the client, checking it against the supplied hash, and
/// returns the resulting path.
///
/// `hash_algorithm` selects the digest: `0` = BLAKE3 (32 bytes), `1` = SHA-256
/// (32 bytes), `2` = SHA-512 (64 bytes).
///
/// # Errors
/// * `InvalidArgument` if the algorithm code is unknown or the hash has the wrong length
///   (both come straight from the client and previously caused a panic).
/// * `Internal` if the download fails or the resulting path is not valid UTF-8.
async fn add_url(
    &self,
    request: tonic::Request<proto::AddUrlRequest>,
) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
    /// Converts the client-supplied digest into a fixed-size array, rejecting any
    /// other length.
    fn digest<const N: usize>(bytes: &[u8]) -> Result<[u8; N], tonic::Status> {
        <[u8; N]>::try_from(bytes).map_err(|_| {
            tonic::Status::invalid_argument(format!(
                "expected a {}-byte hash, got {} bytes",
                N,
                bytes.len()
            ))
        })
    }

    let r = request.into_inner();
    debug!("add_url request {:?}", r);
    let hash = match r.hash_algorithm {
        0 => Hash::Blake3(digest(&r.hash)?),
        1 => Hash::Sha256(digest(&r.hash)?),
        2 => Hash::Sha512(digest(&r.hash)?),
        other => {
            return Err(tonic::Status::invalid_argument(format!(
                "unknown hash algorithm {}",
                other
            )))
        }
    };
    let p = self
        .deb_client
        .http_download(&r.url, hash)
        .await
        .map_err(|e| {
            tonic::Status::internal(format!("downloading {:?} failed: {:?}", r.url, e))
        })?;
    let destdir = p
        .to_str()
        .ok_or_else(|| tonic::Status::internal("download path is not valid UTF-8"))?
        .to_string();
    Ok(tonic::Response::new(proto::DerivationReply {
        result: Some(proto::derivation_reply::Result::Ok(
            proto::DerivationResult {
                destdir: vec![destdir],
                paths: Vec::new(),
                path_patterns: Vec::new(),
            },
        )),
    }))
}
/// Fetches the release metadata for `release` and the package index of `repository`
/// for the requested architecture, returning the path of that index.
///
/// `arch` is a numeric code: `0` selects `"amd64"`, `1` selects `"aarch64"`.
///
/// # Errors
/// * `InvalidArgument` if the architecture code is unknown (checked before any other
///   work is done; previously this panicked after the release had been fetched).
/// * `Internal` if fetching the release or the package index fails, or if the resulting
///   path is not valid UTF-8.
async fn ubuntu_release(
    &self,
    request: tonic::Request<proto::UbuntuReleaseRequest>,
) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
    debug!("ubuntu release {:?}", request);
    let now = std::time::Instant::now();
    let r = request.into_inner();
    // NOTE(review): Debian/Ubuntu archives call this architecture `arm64`; confirm that
    // `packages` really expects `aarch64` here.
    let arch = match r.arch {
        0 => "amd64",
        1 => "aarch64",
        other => {
            return Err(tonic::Status::invalid_argument(format!(
                "unknown architecture code {}",
                other
            )))
        }
    };
    let h = self
        .deb_client
        .in_release(r.release.clone())
        .await
        .map_err(|e| {
            tonic::Status::internal(format!(
                "fetching release {:?} failed: {:?}",
                r.release, e
            ))
        })?;
    let p = self
        .deb_client
        .packages(&h, &r.repository, arch)
        .await
        .map_err(|e| {
            tonic::Status::internal(format!(
                "fetching package index of {:?} failed: {:?}",
                r.repository, e
            ))
        })?;
    info!(
        "ubuntu release request {:?}: {:?}",
        r.release,
        now.elapsed()
    );
    let destdir = p
        .to_str()
        .ok_or_else(|| tonic::Status::internal("package index path is not valid UTF-8"))?
        .to_string();
    Ok(tonic::Response::new(proto::DerivationReply {
        result: Some(proto::derivation_reply::Result::Ok(
            proto::DerivationResult {
                destdir: vec![destdir],
                paths: Vec::new(),
                path_patterns: Vec::new(),
            },
        )),
    }))
}
type UbuntuPackageStream = ResponseStream;
async fn ubuntu_package(
&self,
request: tonic::Request<proto::UbuntuPackageRequest>,
) -> Result<tonic::Response<Self::UbuntuPackageStream>, tonic::Status> {
let now = std::time::Instant::now();
let r = request.into_inner();
let index: Result<Vec<_>, _> = r
.index
.into_iter()
.map(|index| deb::Index::open(&index))
.collect();
let index = index.unwrap();
let link_extra: Vec<_> = r
.link_extra
.into_iter()
.map(|l| {
(
regex::Regex::new(&l.pkg).unwrap(),
regex::Regex::new(&l.dep).unwrap(),
)
})
.collect();
let (tx, rx) = tokio::sync::mpsc::channel(200);
use crate::extract::Msg;
let output_stream = ReceiverStream::new(rx).map(|x| {
Ok(match x {
Msg::Downloading(p) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Loading(p)),
},
Msg::Ok(p) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Ok(
proto::DerivationResult {
destdir: p
.result
.iter()
.rev()
.map(|x| x.to_str().unwrap().to_string())
.collect(),
paths: p.paths.into_iter().filter_map(Arc::into_inner).collect(),
path_patterns: Vec::new(),
},
)),
},
Msg::Error(e) => proto::DerivationReply {
result: Some(proto::derivation_reply::Result::Error(e.to_string())),
},
})
});
match download_extract_deps(
&index,
&self.deb_client,
&r.name,
&link_extra,
PollSender::new(tx.clone()),
)
.await
{
Ok(p) => {
info!("path {:?} {:#?}", r.name, p);
tx.send(Msg::Ok(p)).await.unwrap();
}
Err(e) => tx.send(Msg::Error(e)).await.unwrap(),
}
info!("ubuntu package request {:?}: {:?}", r.name, now.elapsed());
Ok(tonic::Response::new(Box::pin(output_stream)))
}
}