pijul nest
guest [sign in]

Fork channel

Create a new channel as a copy of main.

Rename channel

Rename main to:

Delete channel

Delete main? This cannot be undone.

server.rs
use elpe::extract::*;
use elpe::*;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::PollSender;
use tonic::codegen::tokio_stream::StreamExt;
use tracing::*;

/// State shared by all handlers of the `elpe` gRPC service.
pub struct Elpe {
    /// Client used by the handlers for the store path, HTTP downloads and
    /// Ubuntu release/package indices.
    deb_client: elpe::Client,
    /// Sending half of the build-request queue. Each entry pairs a build
    /// request with the channel on which that build's messages are sent back.
    sender: tokio::sync::mpsc::UnboundedSender<(
        crate::container::BuildRequest,
        tokio::sync::mpsc::Sender<crate::container::Msg>,
    )>,
    /// Handle of the `crate::container::forward` task spawned by `new`.
    /// It is `Some` until `serve` takes it.
    t: Option<tokio::task::JoinHandle<Result<(), elpe::Error>>>,
}

/// Rust types and service traits generated by tonic for the `elpe`
/// protobuf package.
pub mod proto {
    tonic::include_proto!("elpe");
}

impl Elpe {
    pub fn new(
        deb_client: elpe::Client,
        container_channel: elpe::container::ContainerChannel,
    ) -> Self {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<(
            crate::container::BuildRequest,
            tokio::sync::mpsc::Sender<crate::container::Msg>,
        )>();

        let t = tokio::spawn(crate::container::forward(receiver, container_channel));

        Elpe {
            deb_client,
            sender,
            t: Some(t),
        }
    }

    pub async fn serve(mut self, addr: std::net::SocketAddr) {
        let t = self.t.take().unwrap();
        tokio::select! {
        _ =         tonic::transport::Server::builder()
            .add_service(proto::elpe_server::ElpeServer::new(self))
            .serve(addr)
            => {}
        _ = t => {}
           }
    }
}

use std::pin::Pin;

/// Boxed, sendable stream of `DerivationReply` items (or gRPC errors), used
/// as the response type of the server-streaming RPCs below.
type ResponseStream =
    Pin<Box<dyn tokio_stream::Stream<Item = Result<proto::DerivationReply, tonic::Status>> + Send>>;

#[tonic::async_trait]
impl proto::elpe_server::Elpe for Elpe {
    /// Describes the platform this server runs on.
    ///
    /// * `endianness` is `0` on big-endian hosts and `1` otherwise.
    /// * `pointer_width` is `size_of::<usize>()`, i.e. expressed in bytes.
    /// * `arch` is `0` for `x86_64` and `1` for `aarch64`.
    ///
    /// # Errors
    ///
    /// Returns `Status::unimplemented` on any other host architecture rather
    /// than panicking inside the request handler.
    async fn handshake(
        &self,
        request: tonic::Request<proto::Empty>,
    ) -> Result<tonic::Response<proto::PlatformReply>, tonic::Status> {
        let arch = match std::env::consts::ARCH {
            "x86_64" => 0,
            "aarch64" => 1,
            other => {
                return Err(tonic::Status::unimplemented(format!(
                    "unsupported host architecture: {other}"
                )))
            }
        };
        let endianness = if cfg!(target_endian = "big") { 0 } else { 1 };

        Ok(tonic::Response::new(proto::PlatformReply {
            endianness,
            pointer_width: std::mem::size_of::<usize>() as i32,
            arch,
        }))
    }

    async fn add_path(
        &self,
        request: tonic::Request<tonic::Streaming<proto::AddPathRequest>>,
    ) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
        let mut r = request.into_inner();
        debug!("add_path");
        let mut current_file = None;
        let ref store = self.deb_client.store_path();
        debug!("store");
        let tmp_dir = tempfile::tempdir_in(store).unwrap();
        debug!("store {:?}", tmp_dir);

        let mut hasher = blake3::Hasher::new();
        debug!("loop");
        loop {
            trace!("waiting for next in stream");
            let Some(r) = r.next().await else { break };
            let r = r.unwrap();
            match r.request {
                Some(proto::add_path_request::Request::File(f)) => {
                    info!("Adding file {:?}", f.name);
                    hasher.update(b"\0f");
                    hasher.update(f.name.as_bytes());
                    hasher.update(b"\0");
                    let p = tmp_dir.path().join(&f.name);
                    tokio::fs::create_dir_all(p.parent().unwrap()).await?;
                    current_file = Some(tokio::fs::File::create(&p).await?)
                }
                Some(proto::add_path_request::Request::Directory(d)) => {
                    info!("Adding file {:?}", d.name);
                    hasher.update(b"\0d");
                    hasher.update(d.name.as_bytes());
                    hasher.update(b"\0");
                    let p = tmp_dir.path().join(&d.name);
                    tokio::fs::create_dir_all(&p).await.unwrap();
                }
                Some(proto::add_path_request::Request::Contents(c)) => {
                    hasher.update(&c.content);
                    current_file.as_mut().unwrap().write_all(&c.content).await?;
                }
                None => break,
            }
        }
        debug!("loop done");
        let path = store.join(data_encoding::HEXLOWER.encode(hasher.finalize().as_bytes()));
        use tokio::io::ErrorKind;
        let new = tmp_dir.into_path();
        match tokio::fs::rename(&new, &path).await {
            Ok(()) => (),
            Err(e) if e.kind() == ErrorKind::DirectoryNotEmpty => (),
            Err(e) => {
                tokio::fs::remove_dir_all(&new).await?;
                return Err(e.into());
            }
        }
        info!("add_path extracted to {:?}", path);
        Ok(tonic::Response::new(proto::DerivationReply {
            result: Some(proto::derivation_reply::Result::Ok(
                proto::DerivationResult {
                    destdir: vec![path.to_str().unwrap().to_string()],
                    paths: Vec::new(),
                    path_patterns: Vec::new(),
                },
            )),
        }))
    }

    type DerivationStream = ResponseStream;

    async fn derivation(
        &self,
        request: tonic::Request<proto::DerivationRequest>,
    ) -> Result<tonic::Response<ResponseStream>, tonic::Status> {
        debug!("derivation request");
        let now = std::time::Instant::now();
        let r = request.into_inner();
        let (tx, rx) = tokio::sync::mpsc::channel(200);
        debug!("derivation request: {:?} {:?}", r.name, r.paths);
        self.sender
            .send((
                crate::container::BuildRequest {
                    name: r.name.clone(),
                    paths: r.paths,
                    script: r.builder,
                    target: r.target,
                    output_hash: r.output_hash,
                },
                tx,
            ))
            .unwrap();
        use crate::container::Msg;
        let output_stream = ReceiverStream::new(rx).map(|x| {
            Ok(match x {
                Msg::Ok(out) => proto::DerivationReply {
                    result: Some(proto::derivation_reply::Result::Ok(
                        proto::DerivationResult {
                            destdir: out
                                .into_iter()
                                .map(|x| x.to_str().unwrap().to_string())
                                .collect(),
                            paths: Vec::new(),
                            path_patterns: Vec::new(),
                        },
                    )),
                },
                Msg::Error(p) => proto::DerivationReply {
                    result: Some(proto::derivation_reply::Result::Error(p)),
                },
                Msg::Stdout(p) => proto::DerivationReply {
                    result: Some(proto::derivation_reply::Result::Stdout(p)),
                },
                Msg::Stderr(p) => proto::DerivationReply {
                    result: Some(proto::derivation_reply::Result::Stderr(p)),
                },
            })
        });
        info!("request {:?}: {:?}", r.name, now.elapsed());
        Ok(tonic::Response::new(Box::pin(output_stream)))
    }

    /// Downloads `url` through the client, checked against the given hash,
    /// and returns the resulting path.
    ///
    /// `hash_algorithm` selects the digest: `0` is BLAKE3 (32 bytes), `1` is
    /// SHA-256 (32 bytes) and `2` is SHA-512 (64 bytes).
    ///
    /// # Errors
    ///
    /// * `invalid_argument` if the algorithm id is unknown or the hash does
    ///   not have the length required by the algorithm.
    /// * `internal` if the download fails or the resulting path is not valid
    ///   UTF-8.
    async fn add_url(
        &self,
        request: tonic::Request<proto::AddUrlRequest>,
    ) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
        /// Copies `bytes` into a fixed-size digest, rejecting any other length.
        fn digest<const N: usize>(bytes: &[u8]) -> Result<[u8; N], tonic::Status> {
            <[u8; N]>::try_from(bytes).map_err(|_| {
                tonic::Status::invalid_argument(format!(
                    "expected a {}-byte hash, got {} bytes",
                    N,
                    bytes.len()
                ))
            })
        }

        let r = request.into_inner();
        debug!("add_url request {:?}", r);

        // The hash comes straight from the client, so validate it rather
        // than letting a wrong length or algorithm id panic the handler.
        let expected = match r.hash_algorithm {
            0 => Hash::Blake3(digest(&r.hash)?),
            1 => Hash::Sha256(digest(&r.hash)?),
            2 => Hash::Sha512(digest(&r.hash)?),
            other => {
                return Err(tonic::Status::invalid_argument(format!(
                    "unknown hash algorithm {other}"
                )))
            }
        };

        let p = self
            .deb_client
            .http_download(&r.url, expected)
            .await
            .map_err(|e| {
                tonic::Status::internal(format!("download of {:?} failed: {:?}", r.url, e))
            })?;

        let destdir = p
            .to_str()
            .ok_or_else(|| tonic::Status::internal("downloaded path is not valid UTF-8"))?
            .to_string();
        Ok(tonic::Response::new(proto::DerivationReply {
            result: Some(proto::derivation_reply::Result::Ok(
                proto::DerivationResult {
                    destdir: vec![destdir],
                    paths: Vec::new(),
                    path_patterns: Vec::new(),
                },
            )),
        }))
    }

    /// Fetches the release file of `release` and then the package index of
    /// `repository` for the requested architecture, returning the path of
    /// that index.
    ///
    /// `arch` is `0` for `amd64` and `1` for `aarch64`.
    ///
    /// # Errors
    ///
    /// * `invalid_argument` if the architecture id is unknown.
    /// * `internal` if either fetch fails or the resulting path is not valid
    ///   UTF-8.
    async fn ubuntu_release(
        &self,
        request: tonic::Request<proto::UbuntuReleaseRequest>,
    ) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
        debug!("ubuntu release {:?}", request);
        let now = std::time::Instant::now();
        let r = request.into_inner();

        let arch = match r.arch {
            0 => "amd64",
            // NOTE(review): Debian/Ubuntu archives usually call this
            // architecture `arm64`; confirm what `packages` expects.
            1 => "aarch64",
            other => {
                return Err(tonic::Status::invalid_argument(format!(
                    "unknown architecture {other}"
                )))
            }
        };

        let h = self
            .deb_client
            .in_release(r.release.clone())
            .await
            .map_err(|e| {
                tonic::Status::internal(format!(
                    "fetching release {:?} failed: {:?}",
                    r.release, e
                ))
            })?;
        let p = self
            .deb_client
            .packages(&h, &r.repository, arch)
            .await
            .map_err(|e| {
                tonic::Status::internal(format!(
                    "fetching package index of {:?} failed: {:?}",
                    r.repository, e
                ))
            })?;
        info!(
            "ubuntu release request {:?}: {:?}",
            r.release,
            now.elapsed()
        );

        let destdir = p
            .to_str()
            .ok_or_else(|| tonic::Status::internal("package index path is not valid UTF-8"))?
            .to_string();
        Ok(tonic::Response::new(proto::DerivationReply {
            result: Some(proto::derivation_reply::Result::Ok(
                proto::DerivationResult {
                    destdir: vec![destdir],
                    paths: Vec::new(),
                    path_patterns: Vec::new(),
                },
            )),
        }))
    }

    type UbuntuPackageStream = ResponseStream;

    async fn ubuntu_package(
        &self,
        request: tonic::Request<proto::UbuntuPackageRequest>,
    ) -> Result<tonic::Response<Self::UbuntuPackageStream>, tonic::Status> {
        let now = std::time::Instant::now();
        let r = request.into_inner();
        let index: Result<Vec<_>, _> = r
            .index
            .into_iter()
            .map(|index| deb::Index::open(&index))
            .collect();
        let index = index.unwrap();

        let link_extra: Vec<_> = r
            .link_extra
            .into_iter()
            .map(|l| {
                (
                    regex::Regex::new(&l.pkg).unwrap(),
                    regex::Regex::new(&l.dep).unwrap(),
                )
            })
            .collect();

        let (tx, rx) = tokio::sync::mpsc::channel(200);
        use crate::extract::Msg;
        let output_stream = ReceiverStream::new(rx).map(|x| {
            Ok(match x {
                Msg::Downloading(p) => proto::DerivationReply {
                    result: Some(proto::derivation_reply::Result::Loading(p)),
                },
                Msg::Ok(p) => proto::DerivationReply {
                    result: Some(proto::derivation_reply::Result::Ok(
                        proto::DerivationResult {
                            destdir: p
                                .result
                                .iter()
                                .rev()
                                .map(|x| x.to_str().unwrap().to_string())
                                .collect(),
                            paths: p.paths.into_iter().filter_map(Arc::into_inner).collect(),
                            path_patterns: Vec::new(),
                        },
                    )),
                },
                Msg::Error(e) => proto::DerivationReply {
                    result: Some(proto::derivation_reply::Result::Error(e.to_string())),
                },
            })
        });

        match download_extract_deps(
            &index,
            &self.deb_client,
            &r.name,
            &link_extra,
            PollSender::new(tx.clone()),
        )
        .await
        {
            Ok(p) => {
                info!("path {:?} {:#?}", r.name, p);
                tx.send(Msg::Ok(p)).await.unwrap();
            }
            Err(e) => tx.send(Msg::Error(e)).await.unwrap(),
        }

        info!("ubuntu package request {:?}: {:?}", r.name, now.elapsed());

        Ok(tonic::Response::new(Box::pin(output_stream)))
    }
}