pijul nest
guest [sign in]

Some better error handling + implements gzipping packets + corrects bug with architecture name in ubuntu repos

Madx
Feb 4, 2026, 3:06 PM
VK2LRA5ED6CWHXT4QZY2VT3KH5CSJTZ6OST2EZLCWQPVEK7LA2CQC

Dependencies

  • [2] UWQB743K First working shell (with ocaml code)
  • [3] 2GMRXGVB Some better error handling + implements gzipping packets + corrects bug with architecture name in ubuntu repos
  • [4] LIMIMBOU Bootstrapping the Rust part
  • [5] ODUDDQRY Adding the OCaml interface
  • [6] AJLMC7UM Forwarding log to frontend

Change contents

  • file deletion: deb.rs~ (----------)
    [2.15][3.0:31](),[3.31][3.32:32]()
    use lazy_static::lazy_static;
    use std::collections::HashMap;
    use std::ffi::OsStr;
    use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
    use thiserror::*;
    use tracing::*;
    mod index;
    pub use index::*;
    /// A parsed .deb file.
    #[derive(Debug)]
    pub struct Deb {
    md5sums: HashMap<String, String>,
    control_file: String,
    _global: Header,
    _control: Header,
    data: Header,
    data_offset: u64,
    }
    /// A "stanza", i.e. a description of a Debian package.
    #[derive(Debug, Default, Clone)]
    pub struct Stanza<'a> {
    #[allow(missing_docs)]
    pub package: &'a str,
    #[allow(missing_docs)]
    pub version: Version<'a>,
    #[allow(missing_docs)]
    pub architecture: &'a str,
    #[allow(missing_docs)]
    pub depends: Vec<Dep<'a>>,
    #[allow(missing_docs)]
    pub sha256: Option<&'a str>,
    #[allow(missing_docs)]
    pub size: Option<usize>,
    /// Path of the .deb file inside an index.
    pub file_name: Option<&'a str>,
    }
    /// A dependency, which is either a "simple" dependency, or a list of
    /// alternatives.
    #[derive(Debug, Clone)]
    pub enum Dep<'a> {
    /// A simple dependency.
    Simple(SimpleDep<'a>),
    /// A dependency that can be supplied by multiple packages. This
    /// is an alternative mechanism to virtual packages.
    Alternatives {
    #[allow(missing_docs)]
    alt: Vec<SimpleDep<'a>>,
    },
    }
    /// A single dependency.
    #[derive(Debug, Clone)]
    pub struct SimpleDep<'a> {
    /// Name of the dependency.
    pub name: &'a str,
    /// Whether this dependency can have any version.
    pub any: bool,
    /// Constraints on dependencies.
    pub constraints: Vec<(&'a str, Version<'a>)>,
    }
    fn parse_control(s: &str) -> IResult<&str, Stanza> {
    let mut st = Stanza::default();
    for l in s.lines() {
    if l.is_empty() {
    let (_, b) = s.split_at(l.as_ptr() as usize - s.as_ptr() as usize);
    return Ok((b, st));
    }
    if let Some(i) = l.find(':') {
    let (key, val) = l.split_at(i);
    let (_, val) = val.split_at(1);
    debug!("key {:?} {:?}", key, val);
    match key.trim() {
    "Package" => st.package = val.trim(),
    "Version" => st.version = parse_version(val.trim()).unwrap().1,
    "Architecture" => st.architecture = val.trim(),
    "SHA256" => st.sha256 = Some(val.trim()),
    "Filename" => st.file_name = Some(val.trim()),
    "Depends" | "Pre-Depends" => {
    let d = parse_deps(val.trim());
    debug!("{:?}", d);
    if let Ok(("", a)) = d {
    st.depends = a
    } else {
    error!("error {:?}", val.trim());
    return IResult::Err(nom::Err::Error(nom::error::make_error(
    s,
    nom::error::ErrorKind::Tag,
    )));
    }
    }
    _ => {}
    }
    }
    }
    Ok(("", st))
    }
    #[test]
    fn test_grub_common() {
    use tracing_subscriber::prelude::*;
    use tracing_subscriber::util::SubscriberInitExt;
    tracing_subscriber::registry()
    .with(
    tracing_subscriber::EnvFilter::try_from_default_env()
    .unwrap_or_else(|_| String::new().into()),
    )
    .with(tracing_subscriber::fmt::layer())
    .try_init()
    .unwrap();
    let (rest, p) = parse_control("Package: grub-common\nArchitecture: amd64\nVersion: 2.12-1ubuntu7\nBuilt-Using: lzo2 (= 2.10-2build3)\nMulti-Arch: foreign\nPriority: optional\nSection: admin\nSource: grub2\nOrigin: Ubuntu\nMaintainer: Ubuntu Developers <ubuntu-devel-discuss@lists.ubuntu.com>\nOriginal-Maintainer: GRUB Maintainers <pkg-grub-devel@alioth-lists.debian.net>\nBugs: https://bugs.launchpad.net/ubuntu/+filebug\nInstalled-Size: 12476\nDepends: libc6 (>= 2.38), libdevmapper1.02.1 (>= 2:1.02.36), libefiboot1t64 (>=38), libefivar1t64 (>= 38), libfreetype6 (>= 2.2.1), libfuse3-3 (>= 3.2.3), liblzma5 (>= 5.1.1alpha+20120614), debconf (>= 0.5) | debconf-2.0, gettext-base, lsb-base (>= 3.0-6), python3, python3-apt\nRecommends: os-prober (>= 1.33)\nSuggests: multiboot-doc, grub-emu, mtools, xorriso (>= 0.5.6.pl00), desktop-base (>= 4.0.6), console-setup\nConflicts: init-select\nBreaks: apport (<< 2.1.1), friendly-recovery (<< 0.2.13), lupin-support (<< 0.55), mdadm (<< 2.6.7-2)\nReplaces: grub-coreboot (<< 2.00-4), grub-efi (<< 1.99-1), grub-efi-amd64 (<< 2.00-4), grub-efi-ia32 (<< 2.00-4), grub-efi-ia64 (<< 2.00-4), grub-ieee1275 (<< 2.00-4), grub-linuxbios (<< 1.96+20080831-1), grub-pc (<< 2.00-4), grub-yeeloong (<< 2.00-4), init-select\nFilename: pool/main/g/grub2/grub-common_2.12-1ubuntu7_amd64.deb\nSize: 2119862\nMD5sum: 86e63081ae4ee34d6a4f408ac6dbf0e4\nSHA1: b98f9f88e5480423e041fd647b438ba2f6e6f5ea\nSHA256: d8110b97b6fb6da40449c74b9cb527f02965dffaae8344399a711617f5a69249\nSHA512: 85c63b8d3e6a870df48533ab525df13abe9ec4861ff4b5577def94f65a2ebf6ac44a54a6cd0eca1c825e1c7aa2bd14e4d4ed53f83d381963ac1dc51daa16482a\nHomepage: https://www.gnu.org/software/grub/\nDescription: GRand Unified Bootloader (common files)\nTask: ubuntu-desktop-minimal, ubuntu-desktop, ubuntu-desktop-raspi, kubuntu-desktop, xubuntu-minimal, xubuntu-desktop, lubuntu-desktop, ubuntustudio-desktop-core, ubuntustudio-desktop, ubuntukylin-desktop,ubuntukylin-desktop-minimal, ubuntu-mate-core, ubuntu-mate-desktop, ubuntu-budgie-desktop-minimal, ubuntu-budgie-desktop, ubuntu-budgie-desktop-raspi, ubuntu-unity-desktop, edubuntu-desktop-gnome-minimal, edubuntu-desktop-gnome-raspi, ubuntucinnamon-desktop-minimal, ubuntucinnamon-desktop-raspi\nDescription-md5: 9c75036dc0a0792fedbc58df208ed227").unwrap();
    assert!(p.depends.iter().any(|x| match x {
    Dep::Simple(s) => s.name == "liblzma5",
    _ => false,
    }));
    assert!(rest.is_empty())
    }
    /// A version of a package.
    #[derive(Debug, Clone)]
    pub struct Version<'a> {
    /// Epoch, i.e. a Debian mechanism to reorder upstream versions if
    /// necessary, for example if the upstream versioning scheme
    /// changes.
    pub epoch: u32,
    /// Upstream version.
    pub upstream_version: Upstream<'a>,
    /// Debian-specific suffix to distinguish between patches.
    pub debian: Option<&'a str>,
    }
    impl<'a> Default for Version<'a> {
    fn default() -> Version<'a> {
    Version {
    epoch: 0,
    upstream_version: Upstream(""),
    debian: None,
    }
    }
    }
    fn parse_constraint<'a>(s: &'a str) -> IResult<&'a str, (&'a str, Version<'a>)> {
    let (s, constraint) = tag("<<")
    .or(tag("<="))
    .or(tag("="))
    .or(tag(">="))
    .or(tag(">>"))
    .parse(s)?;
    debug!("parse_constraint {:?}", constraint);
    let (s, _) = space0(s)?;
    let (s, version) = parse_version(s)?;
    debug!("parse_constraint {:?} {:?}", s, (constraint, &version));
    Ok((s, (constraint, version)))
    }
    fn parse_dep<'a>(s0: &'a str) -> IResult<&'a str, Dep<'a>> {
    let (s, name) = parse_name(s0)?;
    let (s, any) = if s.starts_with(":any") {
    let (_, b) = s.split_at(4);
    (b, true)
    } else {
    (s, false)
    };
    let (s, _) = space0(s)?;
    let mut constraints = Vec::new();
    if let Ok((s, _)) = tag::<_, _, ()>("(").parse(s) {
    let (s, version) = if let Some(i) = s.find(')') {
    let (a, b) = s.split_at(i);
    let (_, b) = b.split_at(1);
    (b, a)
    } else {
    return IResult::Err(nom::Err::Error(nom::error::make_error(
    s,
    nom::error::ErrorKind::Tag,
    )));
    };
    let mut v = version;
    while let Ok((v_, c)) = parse_constraint(v) {
    constraints.push(c);
    let (v_, _) = space0(v_)?;
    if let Ok((v_, _)) = tag::<_, _, ()>(",").parse(v_) {
    let (v_, _) = space0(v_)?;
    v = v_;
    } else {
    break;
    }
    }
    Ok((
    s,
    Dep::Simple(SimpleDep {
    name,
    any,
    constraints,
    }),
    ))
    } else {
    Ok((
    s,
    Dep::Simple(SimpleDep {
    name,
    any,
    constraints,
    }),
    ))
    }
    }
    fn parse_name(s: &str) -> IResult<&str, &str> {
    if let Some(i) = s.find(|c| {
    !((c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '.' || c == '+' || c == '-')
    }) {
    if i > 0 {
    let (a, b) = s.split_at(i);
    return Ok((b, a));
    }
    } else {
    return Ok(("", s));
    }
    error!("ERROR {:?}", s);
    return IResult::Err(nom::Err::Error(nom::error::make_error(
    s,
    nom::error::ErrorKind::Tag,
    )));
    }
    fn parse_deps<'a>(s: &'a str) -> IResult<&'a str, Vec<Dep<'a>>> {
    let mut d = Vec::new();
    let (mut s, _) = space0(s)?;
    let mut alt_is_open = false;
    while let Ok((s_, d_)) = parse_dep(s) {
    let (s_, _) = space0(s_)?;
    if let Ok((s_, _)) = tag::<_, _, ()>("|").parse(s_) {
    let (s_, _) = space0(s_)?;
    s = s_;
    alt_is_open = true;
    if let Dep::Simple(d_) = d_ {
    if let Some(Dep::Alternatives { ref mut alt }) = d.last_mut() {
    alt.push(d_)
    } else {
    d.push(Dep::Alternatives { alt: vec![d_] })
    }
    } else {
    panic!("{:?}", s);
    }
    } else if let Ok((s_, _)) = tag::<_, _, ()>(",").parse(s_) {
    if alt_is_open {
    if let Some(Dep::Alternatives { ref mut alt }) = d.last_mut() {
    if let Dep::Simple(d_) = d_ {
    alt.push(d_)
    } else {
    panic!("{:?}", s);
    }
    }
    } else {
    d.push(d_)
    }
    alt_is_open = false;
    s = s_;
    } else {
    if alt_is_open {
    if let Some(Dep::Alternatives { ref mut alt }) = d.last_mut() {
    if let Dep::Simple(d_) = d_ {
    alt.push(d_)
    } else {
    panic!("{:?}", s);
    }
    }
    } else {
    d.push(d_)
    }
    s = s_;
    break;
    }
    let (s_, _) = space0(s)?;
    s = s_;
    }
    Ok((s, d))
    }
    /// The representation of an upstream version. Will implement `Ord`.
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub struct Upstream<'a>(&'a str);
    impl<'a> std::ops::Deref for Upstream<'a> {
    type Target = str;
    fn deref(&self) -> &str {
    self.0
    }
    }
    impl<'a> PartialOrd for Upstream<'a> {
    /// Following the algorithm described here:
    /// https://www.debian.org/doc/debian-policy/ch-controlfields.html#version
    fn partial_cmp(&self, b: &Upstream<'a>) -> Option<std::cmp::Ordering> {
    fn split_initial(s: &str) -> (&str, &str, &str) {
    // Find `u`, the first digit, and `v > u`, the first non-digit after `u`.
    let mut u = s.len();
    let mut v = s.len();
    for (n, &b) in s.as_bytes().iter().enumerate() {
    if u == s.len() {
    if b >= b'0' && b <= b'9' {
    u = n
    }
    } else if b < b'0' || b > b'9' {
    v = n;
    break;
    }
    }
    let (ab, c) = s.split_at(v);
    let (a, b) = ab.split_at(u);
    (a, b, c)
    }
    let mut a = self.0;
    let mut b = b.0;
    use std::cmp::Ordering;
    loop {
    let (a0, a1, a_) = split_initial(a);
    let (b0, b1, b_) = split_initial(b);
    // Compare a0 / b0 using the modified ASCII ordering described in the Debian manual.
    let mut a0i = a0.as_bytes().iter();
    let mut b0i = b0.as_bytes().iter();
    loop {
    match (a0i.next(), b0i.next()) {
    (None, None) => break,
    (Some(c), Some(d)) if c == d => continue,
    (Some(b'~'), _) => return Some(Ordering::Less),
    (_, Some(b'~')) => return Some(Ordering::Greater),
    (Some(c), Some(d)) if c.is_ascii_alphabetic() && !d.is_ascii_alphabetic() => {
    return Some(Ordering::Less);
    }
    (Some(c), Some(d)) if d.is_ascii_alphabetic() && !c.is_ascii_alphabetic() => {
    return Some(Ordering::Greater);
    }
    (c, d) => return Some(c.cmp(&d)),
    }
    }
    // If we haven't returned, a0 == b0. Compare the digits part.
    let a1 = if a1.is_empty() {
    0
    } else {
    a1.parse::<u64>().unwrap()
    };
    let b1 = if b1.is_empty() {
    0
    } else {
    b1.parse::<u64>().unwrap()
    };
    let cmp1 = a1.cmp(&b1);
    if let Ordering::Equal = cmp1 {
    a = a_;
    b = b_;
    } else {
    return Some(cmp1);
    }
    }
    }
    }
    impl<'a> Ord for Upstream<'a> {
    fn cmp(&self, b: &Upstream<'a>) -> std::cmp::Ordering {
    self.partial_cmp(b).unwrap()
    }
    }
    #[test]
    fn test_upstream_ordering() {
    let mut x = [
    Upstream("~~"),
    Upstream("~~a"),
    Upstream("~"),
    Upstream(""),
    Upstream("a"),
    ];
    let y = x.clone();
    x.sort();
    assert_eq!(x, y,);
    }
    use nom::{
    bytes::tag,
    character::complete::{digit1, newline, space0},
    multi::separated_list1,
    IResult, Parser,
    };
    /// Parse a version in the Debian version format.
    fn parse_version<'a>(s: &'a str) -> IResult<&'a str, Version<'a>> {
    fn epoch(s: &str) -> IResult<&str, u32> {
    let (s, i) = digit1(s)?;
    let (s, _) = tag(":").parse(s)?;
    Ok((s, i.parse().unwrap()))
    }
    let (s, epoch) = epoch(s).unwrap_or((s, 0));
    lazy_static! {
    static ref RE_DASH: regex::Regex =
    regex::Regex::new(r"^[a-z0-9\.~+-]+-([a-z0-9\.~+]+)").unwrap();
    static ref RE: regex::Regex = regex::Regex::new(r"^[a-z0-9\.~+]+").unwrap();
    }
    let (s, v) = if let Some(m) = RE_DASH.find(s) {
    debug!("parse_version RE_DASH {:?}", s);
    let (a, b) = s.split_at(m.end());
    (b, a)
    } else if let Some(m) = RE.find(s) {
    debug!("parse_version RE {:?}", s);
    let (a, b) = s.split_at(m.end());
    (b, a)
    } else {
    debug!("parse_version failed {:?}", s);
    return IResult::Err(nom::Err::Error(nom::error::make_error(
    s,
    nom::error::ErrorKind::Tag,
    )));
    };
    let (upstream_version, debian) = if let Some(i) = v.rfind('-') {
    let (a, b) = v.split_at(i);
    (Upstream(a), Some(b.split_at(1).1))
    } else {
    (Upstream(v), None)
    };
    Ok((
    s,
    Version {
    epoch,
    upstream_version,
    debian,
    },
    ))
    }
    /// Structure of the "headers" in the debian file. No alignment
    /// necessary, the numbers are in decimal. No heap allocation.
    #[derive(Debug)]
    #[repr(C)]
    struct H {
    file_id: [u8; 16],
    timestamp: [u8; 12],
    owner_id: [u8; 6],
    group_id: [u8; 6],
    file_mode: [u8; 8],
    file_size: [u8; 10],
    end_char: [u8; 2],
    }
    /// Parsed version of `H`, still no heap allocation.
    #[allow(dead_code)]
    #[derive(Debug)]
    struct Header {
    file_id: [u8; 16],
    timestamp: u64,
    owner_id: u32,
    group_id: u32,
    file_mode: u32,
    file_size: u64,
    }
    #[allow(missing_docs)]
    /// Possible errors for .deb files.
    #[derive(Debug, Error)]
    pub enum Error {
    #[error("Deb file parse error")]
    ParseError,
    #[error(transparent)]
    Utf8(#[from] std::str::Utf8Error),
    #[error(transparent)]
    Int(#[from] std::num::ParseIntError),
    #[error(transparent)]
    IO(#[from] std::io::Error),
    #[error("Unsupported compression algorithm: {file_id}")]
    UnsupportedCompression { file_id: String },
    #[error("Version parse: {version}")]
    Version { version: String },
    }
    impl Header {
    pub fn file_id(&self) -> &str {
    std::str::from_utf8(&self.file_id).unwrap().trim()
    }
    }
    impl H {
    fn parse(&self) -> Result<Header, Error> {
    std::str::from_utf8(&self.file_id)?;
    Ok(Header {
    file_id: self.file_id,
    timestamp: std::str::from_utf8(&self.timestamp)?.trim().parse()?,
    owner_id: std::str::from_utf8(&self.owner_id)?.trim().parse()?,
    group_id: std::str::from_utf8(&self.group_id)?.trim().parse().unwrap(),
    file_mode: u32::from_str_radix(std::str::from_utf8(&self.file_mode)?.trim(), 8)?,
    file_size: std::str::from_utf8(&self.file_size)?.trim().parse()?,
    })
    }
    }
    fn read_hdr<R: Read>(r: &mut R) -> Result<Header, Error> {
    unsafe {
    assert_eq!(size_of::<H>(), HEADER_SIZE as usize);
    let mut b: H = std::mem::zeroed();
    let pb: *mut H = &mut b;
    r.read_exact(&mut std::slice::from_raw_parts_mut(
    pb as *mut u8,
    size_of::<H>(),
    ))?;
    debug!("{:?}", b);
    b.parse()
    }
    }
    const HEADER_SIZE: u64 = 60;
    impl Deb {
    /// Parse the control part, containing the description (stanza) of this package.
    pub fn parse_control(&self) -> Vec<Stanza> {
    if let Ok(("", st)) = separated_list1(newline, parse_control).parse(&self.control_file) {
    st
    } else {
    Vec::new()
    }
    }
    /// Return the md5sums of each file.
    pub fn md5sums(&self) -> &HashMap<String, String> {
    &self.md5sums
    }
    /// Decompress the data part of the deb file into a directory.
    pub fn decompress<R: BufRead + Seek, P: AsRef<std::path::Path>>(
    &self,
    mut r: R,
    path: P,
    ) -> Result<(), Error> {
    r.seek(SeekFrom::Start(self.data_offset))?;
    debug!("decompress {:?}", self.data.file_id());
    if self.data.file_id().ends_with(".tar.gz") {
    unimplemented!()
    } else if self.data.file_id().ends_with(".tar.zst") {
    let mut ar = tar::Archive::new(
    zstd::stream::read::Decoder::new(r.take(self.data.file_size)).unwrap(),
    );
    for e in ar.entries().unwrap() {
    let mut e = e.unwrap();
    debug!("zstd decompress {:?}", e.path());
    // f(&e.path()?);
    e.unpack_in(&path).unwrap();
    }
    } else if self.data.file_id().ends_with(".tar.xz") {
    let mut ar = tar::Archive::new(xz::read::XzDecoder::new(r.take(self.data.file_size)));
    for e in ar.entries().unwrap() {
    let mut e = e.unwrap();
    debug!("xz decompress {:?}", e.path());
    // f(&e.path()?);
    e.unpack_in(&path).unwrap();
    }
    } else if self.data.file_id().ends_with(".tar") {
    let mut ar = tar::Archive::new(r.take(self.data.file_size));
    for e in ar.entries().unwrap() {
    let mut e = e.unwrap();
    debug!("tar extract {:?}", e.path());
    // f(&e.path()?);
    e.unpack_in(&path).unwrap();
    }
    } else {
    unimplemented!()
    };
    Ok(())
    }
    /// Parse a `.deb` file from a buffered reader.
    pub fn read<R: BufRead + Seek>(mut r: R) -> Result<Deb, Error> {
    let mut b = [0; 8];
    r.read_exact(&mut b)?;
    if &b != b"!<arch>\n" {
    return Err(Error::ParseError);
    }
    let global = read_hdr(&mut r)?;
    trace!("GLOBAL {:?}", global);
    assert_eq!(global.file_size, 4);
    r.seek(SeekFrom::Current(global.file_size as i64))?;
    // r.skip_until(b'c')?; // 'control.…'
    // r.seek(SeekFrom::Current(-1))?;
    let control = read_hdr(&mut r)?;
    trace!("CONTROL {:?}", control);
    // Control files are expected to be short, decompress.
    let mut ctrl = vec![0; control.file_size as usize];
    r.read_exact(&mut ctrl)?;
    // gzip xz zstd none
    let ctrl_tar = if control.file_id().ends_with(".tar.gz") {
    unimplemented!()
    } else if control.file_id().ends_with(".tar.zst") {
    zstd::decode_all(&ctrl[..])?
    } else if control.file_id().ends_with(".tar.xz") {
    let mut decompressor = xz::read::XzDecoder::new(&ctrl[..]);
    let mut result = Vec::new();
    decompressor.read_to_end(&mut result)?;
    result
    } else if control.file_id().ends_with(".tar") {
    ctrl
    } else {
    return Err(Error::UnsupportedCompression {
    file_id: control.file_id().to_string(),
    });
    };
    let mut ctrl_tar = tar::Archive::new(&ctrl_tar[..]);
    let mut control_file = String::new();
    let mut md5sums = HashMap::new();
    for e in ctrl_tar.entries().unwrap() {
    let mut e = e.unwrap();
    if e.path()?.file_name() == Some(OsStr::new("control")) {
    e.read_to_string(&mut control_file)?;
    } else if e.path()?.file_name() == Some(OsStr::new("md5sums")) {
    let mut e = BufReader::new(e);
    let mut s = String::new();
    while e.read_line(&mut s)? > 0 {
    let mut it = s.split(' ').filter(|x| !x.is_empty());
    if let (Some(hash), Some(file)) = (it.next(), it.next()) {
    md5sums.insert(file.trim().to_string(), hash.trim().to_string());
    }
    s.clear();
    }
    }
    }
    if control.file_size % 2 == 1 {
    r.seek(SeekFrom::Current(1))?;
    }
    let data = read_hdr(&mut r)?;
    debug!("data = {:?}", data);
    Ok(Deb {
    md5sums,
    _global: global,
    _control: control,
    control_file,
    data,
    data_offset: r.seek(SeekFrom::Current(0))?,
    })
    }
    }
  • file deletion: lib.rs~ (----------)
    [2.15][3.22535:22566](),[3.22566][3.22567:22567]()
    //! Run bash scripts in containers on Linux, with only deterministic
    //! Debian or Ubuntu packages in scope.
    #![deny(
    missing_docs,
    trivial_casts,
    trivial_numeric_casts,
    unused_import_braces,
    unused_qualifications
    )]
    use futures::SinkExt;
    use futures::StreamExt;
    use std::collections::HashMap;
    use std::path::PathBuf;
    use std::sync::Arc;
    use tempfile::NamedTempFile;
    use thiserror::*;
    use tokio::sync::{Mutex, Semaphore};
    use tracing::*;
    /// Handle Debian packages.
    pub mod deb;
    /// Extract a Debian package and all dependencies into the store.
    pub mod extract;
    /// Recursive iterator on the files/directories in a path.
    pub mod find_files;
    /// Mount and unmount paths.
    pub mod mount;
    /// Run scripts in a chroot inside a new Linux namespace, with the
    /// right directories mounted and symbolically linked to simulate a
    /// Debian/Ubuntu filesystem hierarchy.
    pub mod container;
    mod dummy_sink;
    use dummy_sink::*;
    /// Errors
    #[derive(Debug, Error)]
    pub enum Error {
    /// An error from manipulating a Debian package.
    #[error(transparent)]
    Deb(#[from] deb::Error),
    /// IO error
    #[error(transparent)]
    IO(#[from] std::io::Error),
    /// An error from persisting a temporary file.
    #[error(transparent)]
    Persist(#[from] tempfile::PersistError),
    /// An error from creating a temporary file.
    #[error(transparent)]
    Tempfile(#[from] async_tempfile::Error),
    #[error(transparent)]
    /// An error from `elfedit`, this crate's companion ELF patcher.
    Elf(#[from] elfedit::Error),
    /// HTTP errors
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),
    /// Wrong signature (on an Ubuntu index)
    #[error("Signature error")]
    Signature,
    /// Wrong package hash
    #[error("Hash: expected {expected}, got {got}.")]
    WrongHash {
    /// Expected hash.
    expected: String,
    /// Obtained hash.
    got: String,
    },
    /// Wrong package or file size
    #[error("Wrong size")]
    WrongSize,
    /// Wrong result symlink: Elpe packages have one "input name"
    /// given by the hash of their inputs, and that directory is
    /// symlinked from the "output name", which is the hash of their
    /// output. This makes the outputs verifiable all the way down:
    /// failures to verify result in this error.
    #[error("Wrong result symlink, expected {expected:?}, got {got:?}")]
    WrongResultSymlink {
    /// Expected output.
    expected: PathBuf,
    /// Obtained output.
    got: PathBuf,
    },
    /// The build process returned a status other than 0.
    #[error("Build process returned {status}")]
    BuildReturn {
    /// Return status.
    status: i32,
    },
    /// Package not found
    #[error("Package not found: {pkg}")]
    PackageNotFound {
    /// The package name.
    pkg: String,
    },
    }
    /// A Debian index client.
    #[derive(Clone)]
    pub struct Client {
    c: lazy_init::Lazy<reqwest::Client>,
    pgp_home: PathBuf,
    mirror: String,
    store_path: PathBuf,
    in_release: Arc<Mutex<HashMap<std::borrow::Cow<'static, str>, Arc<InRelease>>>>,
    download_sem: Arc<Semaphore>,
    store_locks: Arc<std::sync::Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>>,
    timeout: std::time::Duration,
    }
    /// Hashes, used for example in URL downloads.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum Hash {
    #[allow(missing_docs)]
    Blake3([u8; 32]),
    #[allow(missing_docs)]
    Sha256([u8; 32]),
    #[allow(missing_docs)]
    Sha512([u8; 64]),
    }
    /// Hasher for each different hash algorithm.
    pub enum Hasher {
    #[allow(missing_docs)]
    Blake3(blake3::Hasher),
    #[allow(missing_docs)]
    Sha256(sha2::Sha256),
    #[allow(missing_docs)]
    Sha512(sha2::Sha512),
    }
    impl Hash {
    /// Create a hasher for this hash algorithm.
    pub fn hasher(&self) -> Hasher {
    match self {
    Hash::Blake3(_) => Hasher::Blake3(blake3::Hasher::new()),
    Hash::Sha256(_) => Hasher::Sha256(sha2::Sha256::new()),
    Hash::Sha512(_) => Hasher::Sha512(sha2::Sha512::new()),
    }
    }
    }
    impl Hasher {
    /// Update the hasher with new bytes.
    pub fn update(&mut self, b: &[u8]) {
    match self {
    Hasher::Blake3(h) => {
    h.update(b);
    }
    Hasher::Sha256(h) => {
    h.update(b);
    }
    Hasher::Sha512(h) => {
    h.update(b);
    }
    }
    }
    /// Create the hash, to be compared with the one supplied by the user.
    pub fn finalize(self) -> Hash {
    match self {
    Hasher::Blake3(h) => Hash::Blake3(h.finalize().into()),
    Hasher::Sha256(h) => Hash::Sha256(h.finalize().into()),
    Hasher::Sha512(h) => Hash::Sha512(h.finalize().into()),
    }
    }
    }
    impl std::ops::Deref for Hash {
    type Target = [u8];
    fn deref(&self) -> &Self::Target {
    match self {
    Hash::Blake3(h) => &h[..],
    Hash::Sha256(h) => &h[..],
    Hash::Sha512(h) => &h[..],
    }
    }
    }
    impl std::ops::DerefMut for Hash {
    fn deref_mut(&mut self) -> &mut Self::Target {
    match self {
    Hash::Blake3(h) => &mut h[..],
    Hash::Sha256(h) => &mut h[..],
    Hash::Sha512(h) => &mut h[..],
    }
    }
    }
    impl Client {
    /// Download a URL.
    pub async fn http_download(&self, url: &str, h: Hash) -> Result<PathBuf, Error> {
    let expected_path = self
    .store_path
    .join(data_encoding::BASE32_DNSSEC.encode(&h));
    if let Ok(mut f) = tokio::fs::File::open(&expected_path).await {
    use tokio::io::AsyncReadExt;
    let mut buf = [0; 4096];
    let mut hasher = h.hasher();
    while let Ok(n) = f.read(&mut buf).await {
    if n == 0 {
    break;
    }
    hasher.update(&buf[..n]);
    }
    let got = hasher.finalize();
    return if got == h {
    Ok(expected_path)
    } else {
    Err(Error::WrongHash {
    expected: data_encoding::HEXLOWER.encode(&h),
    got: data_encoding::HEXLOWER.encode(&got),
    })
    };
    }
    let mut r = self.client().get(url).send().await?;
    let mut t: Option<tokio::task::JoinHandle<Result<tokio::fs::File, tokio::io::Error>>> =
    None;
    let file = NamedTempFile::new_in(&self.store_path)?;
    let mut f = Some(tokio::fs::File::create(&file).await?);
    let mut hasher = h.hasher();
    while let Some(chunk) = r.chunk().await? {
    hasher.update(&chunk);
    if let Some(t) = t.take() {
    f = Some(t.await.unwrap().unwrap())
    }
    let mut f = f.take().unwrap();
    t = Some(tokio::spawn(async move {
    f.write_all(&chunk).await?;
    Ok(f)
    }))
    }
    let got = hasher.finalize();
    if got == h {
    file.persist(&expected_path)?;
    Ok(expected_path)
    } else {
    Err(Error::WrongHash {
    expected: data_encoding::HEXLOWER.encode(&h),
    got: data_encoding::HEXLOWER.encode(&got),
    })
    }
    }
    }
    /// A locked store path, to prevent concurrent writes to the
    /// store. For performance reasons, these are memory locks rather than
    /// filesystem locks.
    pub struct StoreLock {
    locks: Arc<std::sync::Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>>,
    p: PathBuf,
    _lock: tokio::sync::OwnedMutexGuard<()>,
    }
    impl Drop for StoreLock {
    fn drop(&mut self) {
    debug!("store lock: dropping {:?}", self.p);
    self.locks.lock().unwrap().remove(&self.p);
    }
    }
    #[derive(Debug)]
    struct Downloaded {
    path: PathBuf,
    }
    use sha2::Digest;
    use std::process::Stdio;
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
    use tokio::process::Command;
    /// Root file of a package index, containing the signed hashes of the package list.
    #[derive(Debug)]
    pub struct InRelease {
    hashed: HashMap<String, (usize, [u8; 32])>,
    release: std::borrow::Cow<'static, str>,
    }
    struct HashedWriter<W> {
    hasher: sha2::Sha256,
    w: W,
    }
    impl<W: std::io::Write> std::io::Write for HashedWriter<W> {
    fn write(&mut self, b: &[u8]) -> Result<usize, std::io::Error> {
    self.hasher.update(b);
    self.w.write_all(b).unwrap();
    Ok(b.len())
    }
    fn flush(&mut self) -> Result<(), std::io::Error> {
    self.w.flush()
    }
    }
    const MAX_PARALLEL_DOWNLOADS: usize = 20;
    impl Client {
    /// Create a client.
    pub fn new<P: AsRef<std::path::Path>, Q: AsRef<std::path::Path>>(
    pgp_home: P,
    store_path: Q,
    mirror: &str,
    ) -> Self {
    std::fs::create_dir_all(store_path.as_ref()).unwrap();
    Client {
    c: lazy_init::Lazy::new(),
    mirror: mirror.to_string(),
    store_path: store_path.as_ref().to_path_buf(),
    in_release: Arc::new(HashMap::new().into()),
    download_sem: Arc::new(Semaphore::new(MAX_PARALLEL_DOWNLOADS)),
    store_locks: Arc::new(std::sync::Mutex::new(HashMap::new())),
    timeout: std::time::Duration::from_secs(30),
    pgp_home: pgp_home.as_ref().to_path_buf(),
    }
    }
    /// Retrieve the store path this client was created with.
    pub fn store_path(&self) -> &std::path::Path {
    &self.store_path
    }
    fn client(&self) -> &reqwest::Client {
    self.c.get_or_create(|| {
    reqwest::ClientBuilder::new()
    .read_timeout(self.timeout)
    .build()
    .unwrap()
    })
    }
    /// Like `tokio::process::Command::new`, but where `cmd` is taken
    /// from inside the supplied package. This decompresses the
    /// package.
    pub async fn command<R: Into<std::borrow::Cow<'static, str>>, P: AsRef<std::path::Path>>(
    &self,
    release: R,
    pkg: &str,
    cmd: P,
    ) -> Command {
    let h = self.in_release(release.into()).await.unwrap();
    let arch = if cfg!(target_arch = "x86_64") {
    "amd64"
    } else if cfg!(target_arch = "aarch64") {
    "aarch64"
    } else {
    unreachable!()
    };
    let (main, universe) = tokio::join!(
    self.packages(&h, "main", arch),
    self.packages(&h, "universe", arch)
    );
    let main = main.unwrap();
    let universe = universe.unwrap();
    let index = vec![
    deb::Index::open(&main).unwrap(),
    deb::Index::open(&universe).unwrap(),
    ];
    let p = extract::download_extract_deps(&index, self, pkg, &[], DummySink {})
    .await
    .unwrap();
    debug!("running {:?}", p.result.last().unwrap().join(&cmd));
    Command::new(p.result.last().unwrap().join(&cmd))
    }
    /// This locks a store path in order to avoid races to write to
    /// the same store path.
    ///
    /// For performance and portability reasons, this is implemented
    /// using in-memory Mutexes rather than using filesystem
    /// locks. This process is the only one allowed to write to the
    /// store anyway.
    pub async fn lock_store_path<P: AsRef<std::path::Path>>(&self, path: P) -> StoreLock {
    debug!("locking store path {:?}", path.as_ref());
    let p = path.as_ref().to_path_buf();
    let mutex = {
    let mut locks = self.store_locks.lock().unwrap();
    locks
    .entry(p.clone())
    .or_insert_with(|| Arc::new(Mutex::new(())))
    .clone()
    };
    let lock = mutex.lock_owned().await;
    debug!("lock acquired: {:?}", p);
    StoreLock {
    locks: self.store_locks.clone(),
    p,
    _lock: lock,
    }
    }
    /// Download a list of packages.
    pub async fn packages(
    &self,
    in_release: &InRelease,
    component: &str,
    arch: &str,
    ) -> Result<PathBuf, Error> {
    let end = format!("{component}/binary-{arch}/Packages");
    let end_xz = format!("{component}/binary-{arch}/Packages.xz");
    let (size_xz, expected_hash_xz) = in_release.hashed.get(&end_xz).expect(&format!("{} not found in InRelease", end_xz));
    let (size, expected_hash) = in_release.hashed.get(&end).expect(&format!("{} not found in InRelease", &end));
    let expected_xz = data_encoding::BASE32_DNSSEC.encode(&*expected_hash_xz);
    let expected = data_encoding::BASE32_DNSSEC.encode(&*expected_hash);
    std::fs::create_dir_all(&self.store_path)?;
    let expected_path = self.store_path.join(&expected);
    if let Ok(_) = std::fs::metadata(&expected_path) {
    return Ok(expected_path);
    }
    let ref m = self.mirror;
    let ref release = in_release.release;
    let r = {
    let client = self.client();
    let permit = self.download_sem.acquire().await.unwrap();
    let r = client
    .get(&format!(
    "{m}/dists/{release}/{component}/binary-{arch}/Packages.xz"
    ))
    .send()
    .await?;
    drop(permit);
    r
    };
    let file_xz = NamedTempFile::new_in(&self.store_path)?;
    let file = NamedTempFile::new_in(&self.store_path)?;
    let mut xz_dec = xz2::write::XzDecoder::new(HashedWriter {
    w: std::fs::File::create(&file)?,
    hasher: sha2::Sha256::new(),
    });
    let mut f = tokio::fs::File::create(&file_xz).await?;
    let mut r = r.bytes_stream();
    let mut hasher_xz = sha2::Sha256::new();
    warn!("{:?}", expected_hash_xz);
    let mut total = 0;
    while let Some(item) = r.next().await {
    let item = item?;
    total += item.len();
    {
    // we can't use write_all here, since lzma has padding
    // at the end of the file and the xz2 crate rejects
    // the padding.
    use std::io::Write;
    let mut i = 0;
    loop {
    warn!("{:?}", &item[i..]);
    let n = xz_dec.write(&item[i..]);
    warn!("{:?}", n);
    let n = n?;
    if n == 0 {
    break;
    } else {
    i += n
    }
    }
    }
    hasher_xz.update(&item);
    f.write_all(&item).await?;
    }
    if total != *size_xz || xz_dec.total_out() != *size as u64 {
    return Err(Error::WrongSize);
    }
    let w = xz_dec.finish().unwrap();
    let got_xz = &hasher_xz.finalize()[..];
    let got = &w.hasher.finalize()[..];
    warn!("haha: {:?}", expected_hash_xz);
    if got_xz == expected_hash_xz && got == expected_hash {
    let mut expected_path_xz = self.store_path.join(&expected_xz);
    expected_path_xz.set_extension("xz");
    file_xz.persist(&expected_path_xz)?;
    file.persist(&expected_path)?;
    Ok(expected_path)
    } else {
    Err(Error::WrongHash {
    expected: expected_xz,
    got: data_encoding::HEXLOWER.encode(&got),
    })
    }
    }
    /// Download the "InRelease" file of the index, and verify the signatures.
    pub async fn in_release<R: Into<std::borrow::Cow<'static, str>>>(
    &self,
    release: R,
    ) -> Result<Arc<InRelease>, Error> {
    let ref m = self.mirror;
    let release = release.into();
    debug!("downloading in_release {:?}", release);
    let mut ubuntu = self.store_path.join("ubuntu");
    std::fs::create_dir_all(&ubuntu)?;
    ubuntu.push(&*release);
    let mut lock = self.in_release.lock().await;
    if let Some(l) = (&*lock).get(&release) {
    debug!("in_release: lock");
    return Ok(l.clone());
    }
    debug!("open {:?}", ubuntu);
    let f = tokio::fs::File::open(&ubuntu).await;
    if let Ok(f) = f {
    let r = Arc::new(self.read_in_release(release.clone(), f).await?);
    lock.insert(release.clone(), r.clone());
    Ok(r)
    } else {
    debug!("downloading InRelease to {:?}", ubuntu);
    let store_lock = self.lock_store_path(&ubuntu).await;
    let r = self
    .client()
    .get(&format!("{m}/dists/{release}/InRelease"))
    .send()
    .await?;
    let tmp = async_tempfile::TempFile::new_in(ubuntu.parent().unwrap()).await?;
    // Sequoia wants to extract the verification code from
    // their binary and make it available as a lib, but
    // haven't done so as of June 2025.
    let mut com = Command::new("sq")
    .env("HOME", &self.pgp_home)
    .arg("verify")
    .arg("--overwrite")
    .arg(&format!("--output={}", tmp.file_path().to_str().unwrap()))
    .arg("--message")
    .stdin(Stdio::piped())
    .spawn()?;
    let mut i = com.stdin.take().unwrap();
    let mut r = r.bytes_stream();
    // Here we know `sq` doesn't start writing until consuming all
    // its stdin.
    while let Some(item) = r.next().await {
    i.write_all(&item?).await?;
    }
    drop(i);
    let status = com.wait().await?;
    let result = if status.success() {
    self.read_in_release(
    release.clone(),
    tokio::fs::File::open(&tmp.file_path()).await?,
    )
    .await
    } else {
    Err(Error::Signature)
    };
    let r = Arc::new(result?);
    lock.insert(release, r.clone());
    drop(store_lock);
    tokio::fs::rename(&tmp.file_path(), &ubuntu).await?;
    std::mem::forget(tmp);
    Ok(r)
    }
    }
    /// Read an already downloaded InRelease file, without re-checking
    /// the signatures.
    pub async fn read_in_release<R: tokio::io::AsyncRead + Unpin>(
    &self,
    release: std::borrow::Cow<'static, str>,
    o: R,
    ) -> Result<InRelease, Error> {
    let mut b = tokio::io::BufReader::new(o).lines();
    let mut result = HashMap::new();
    let mut recording = false;
    while let Some(l) = b.next_line().await? {
    if recording && l.starts_with(" ") {
    let mut s = l.split(' ').filter(|x| !x.is_empty());
    let hash = s.next().unwrap();
    let mut hash_ = [0; 32];
    data_encoding::HEXLOWER_PERMISSIVE
    .decode_mut(hash.as_bytes(), &mut hash_)
    .unwrap();
    let size = s.next().unwrap();
    let name = s.next().unwrap();
    result.insert(name.to_string(), (size.parse().unwrap(), hash_));
    } else {
    recording = l == "SHA256:";
    }
    }
    Ok(InRelease {
    release,
    hashed: result,
    })
    }
    fn url(&self, file_name: Option<&str>) -> String {
    let ref m = self.mirror;
    let filename = file_name.unwrap();
    format!("{m}/{filename}")
    }
    /// Download the given URL and check the hash, moving the
    /// resulting dir atomically.
    async fn download_url<'a, S: futures::Sink<extract::Msg> + Unpin>(
    &self,
    url: &str,
    sha256: &str,
    mut tx: S,
    ) -> Result<(Downloaded, bool), Error>
    where
    S::Error: std::fmt::Debug,
    {
    let base = url.split('/').last().unwrap();
    let path = self.store_path.join(&format!("{}-{}", sha256, base));
    if tokio::fs::metadata(&path).await.is_ok() {
    debug!("is_ok {:?}", path);
    return Ok((Downloaded { path }, false));
    }
    let lock = self.lock_store_path(path.clone()).await;
    tx.send(extract::Msg::Downloading(url.to_string()))
    .await
    .unwrap();
    info!("download url {:?} {:?}", url, sha256);
    const MAX_ATTEMPTS: usize = 10;
    for i in 1..=MAX_ATTEMPTS {
    match self.try_download(url, sha256, &path).await {
    Ok(()) => break,
    Err(e) if i == MAX_ATTEMPTS => return Err(e),
    Err(e) => {
    error!("attempt {:?} at downloading {:?}: {:?}", i, url, e);
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
    }
    }
    drop(lock); // Ensure lock lives until here.
    Ok((Downloaded { path }, true))
    }
    async fn try_download(
    &self,
    url: &str,
    sha256: &str,
    path: &std::path::Path,
    ) -> Result<(), Error> {
    let r = self.client().get(url).send().await?;
    let mut s = r.bytes_stream();
    tokio::fs::create_dir_all(&self.store_path).await?;
    let mut hasher = sha2::Sha256::new();
    let mut f = async_tempfile::TempFile::new_in(self.store_path.as_path()).await?;
    while let Some(item) = s.next().await {
    let item = item?;
    hasher.update(&item);
    f.write_all(&item).await?;
    }
    f.flush().await?;
    let hash = data_encoding::HEXLOWER_PERMISSIVE.encode(&hasher.finalize());
    if &hash != sha256 {
    return Err(Error::WrongHash {
    expected: sha256.to_string(),
    got: hash,
    });
    }
    tokio::fs::rename(&f.file_path(), &path).await?;
    std::mem::forget(f);
    Ok(())
    }
    }
  • file deletion: main.rs~ (----------)
    [2.15][3.44502:44534](),[3.44534][3.44535:44535]()
    use clap::*;
    use elpe::*;
    use serde_derive::*;
    use std::path::PathBuf;
    use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
    mod server;
    #[derive(Deserialize)]
    pub struct Config {
    pgp_home: PathBuf,
    store_path: PathBuf,
    package_index: String,
    user: String,
    }
    /// Elpe
    #[derive(Parser, Debug)]
    #[command(version, about, long_about = None)]
    struct Args {
    /// Path to the configuration file
    #[arg(short, long)]
    config: PathBuf,
    }
    fn main() {
    tracing_subscriber::registry()
    .with(
    tracing_subscriber::EnvFilter::try_from_default_env()
    .unwrap_or_else(|_| String::new().into()),
    )
    .with(tracing_subscriber::fmt::layer())
    .init();
    let args = Args::parse();
    let config: Config = toml::from_str(&std::fs::read_to_string(&args.config).unwrap()).unwrap();
    let container_channel = crate::container::serve(&config.user, &config.store_path);
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
    privdrop::PrivDrop::default()
    .user(&config.user)
    .apply()
    .unwrap();
    let elpe = server::Elpe::new(
    Client::new(&config.pgp_home, &config.store_path, &config.package_index),
    container_channel,
    );
    let addr = "0.0.0.0:50051".parse().unwrap();
    elpe.serve(addr).await
    })
    }
  • file deletion: server.rs~ (----------)
    [2.15][3.45955:45989](),[3.45989][3.45990:45990]()
    use elpe::extract::*;
    use elpe::*;
    use std::sync::Arc;
    use tokio::io::AsyncWriteExt;
    use tokio_stream::wrappers::ReceiverStream;
    use tokio_util::sync::PollSender;
    use tonic::codegen::tokio_stream::StreamExt;
    use tracing::*;
    pub struct Elpe {
    deb_client: elpe::Client,
    sender: tokio::sync::mpsc::UnboundedSender<(
    crate::container::BuildRequest,
    tokio::sync::mpsc::Sender<crate::container::Msg>,
    )>,
    t: Option<tokio::task::JoinHandle<Result<(), elpe::Error>>>,
    }
    pub mod proto {
    tonic::include_proto!("elpe");
    }
    impl Elpe {
    pub fn new(
    deb_client: elpe::Client,
    container_channel: elpe::container::ContainerChannel,
    ) -> Self {
    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<(
    crate::container::BuildRequest,
    tokio::sync::mpsc::Sender<crate::container::Msg>,
    )>();
    let t = tokio::spawn(crate::container::forward(receiver, container_channel));
    Elpe {
    deb_client,
    sender,
    t: Some(t),
    }
    }
    pub async fn serve(mut self, addr: std::net::SocketAddr) {
    let t = self.t.take().unwrap();
    tokio::select! {
    _ = tonic::transport::Server::builder()
    .add_service(proto::elpe_server::ElpeServer::new(self))
    .serve(addr)
    => {}
    _ = t => {}
    }
    }
    }
    use std::pin::Pin;
    type ResponseStream =
    Pin<Box<dyn tokio_stream::Stream<Item = Result<proto::DerivationReply, tonic::Status>> + Send>>;
    #[tonic::async_trait]
    impl proto::elpe_server::Elpe for Elpe {
    async fn handshake(
    &self,
    request: tonic::Request<proto::Empty>,
    ) -> Result<tonic::Response<proto::PlatformReply>, tonic::Status> {
    Ok(proto::PlatformReply {
    endianness: if cfg!(target_endian = "big") { 0 } else { 1 },
    pointer_width: std::mem::size_of::<usize>() as i32,
    arch: match std::env::consts::ARCH {
    "x86_64" => 0,
    "aarch64" => 1,
    _ => unimplemented!(),
    },
    }
    .into())
    }
    async fn add_path(
    &self,
    request: tonic::Request<tonic::Streaming<proto::AddPathRequest>>,
    ) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
    let mut r = request.into_inner();
    debug!("add_path");
    let mut current_file = None;
    let ref store = self.deb_client.store_path();
    debug!("store");
    let tmp_dir = tempfile::tempdir_in(store).unwrap();
    debug!("store {:?}", tmp_dir);
    let mut hasher = blake3::Hasher::new();
    debug!("loop");
    loop {
    trace!("waiting for next in stream");
    let Some(r) = r.next().await else { break };
    let r = r.unwrap();
    match r.request {
    Some(proto::add_path_request::Request::File(f)) => {
    info!("Adding file {:?}", f.name);
    hasher.update(b"\0f");
    hasher.update(f.name.as_bytes());
    hasher.update(b"\0");
    let p = tmp_dir.path().join(&f.name);
    tokio::fs::create_dir_all(p.parent().unwrap()).await?;
    current_file = Some(tokio::fs::File::create(&p).await?)
    }
    Some(proto::add_path_request::Request::Directory(d)) => {
    info!("Adding file {:?}", d.name);
    hasher.update(b"\0d");
    hasher.update(d.name.as_bytes());
    hasher.update(b"\0");
    let p = tmp_dir.path().join(&d.name);
    tokio::fs::create_dir_all(&p).await.unwrap();
    }
    Some(proto::add_path_request::Request::Contents(c)) => {
    hasher.update(&c.content);
    current_file.as_mut().unwrap().write_all(&c.content).await?;
    }
    None => break,
    }
    }
    debug!("loop done");
    let path = store.join(data_encoding::HEXLOWER.encode(hasher.finalize().as_bytes()));
    use tokio::io::ErrorKind;
    let new = tmp_dir.into_path();
    match tokio::fs::rename(&new, &path).await {
    Ok(()) => (),
    Err(e) if e.kind() == ErrorKind::DirectoryNotEmpty => (),
    Err(e) => {
    tokio::fs::remove_dir_all(&new).await?;
    return Err(e.into());
    }
    }
    info!("add_path extracted to {:?}", path);
    Ok(tonic::Response::new(proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Ok(
    proto::DerivationResult {
    destdir: vec![path.to_str().unwrap().to_string()],
    paths: Vec::new(),
    path_patterns: Vec::new(),
    },
    )),
    }))
    }
    type DerivationStream = ResponseStream;
    async fn derivation(
    &self,
    request: tonic::Request<proto::DerivationRequest>,
    ) -> Result<tonic::Response<ResponseStream>, tonic::Status> {
    debug!("derivation request");
    let now = std::time::Instant::now();
    let r = request.into_inner();
    let (tx, rx) = tokio::sync::mpsc::channel(200);
    debug!("derivation request: {:?} {:?}", r.name, r.paths);
    self.sender
    .send((
    crate::container::BuildRequest {
    name: r.name.clone(),
    paths: r.paths,
    script: r.builder,
    target: r.target,
    output_hash: r.output_hash,
    },
    tx,
    ))
    .unwrap();
    use crate::container::Msg;
    let output_stream = ReceiverStream::new(rx).map(|x| {
    Ok(match x {
    Msg::Ok(out) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Ok(
    proto::DerivationResult {
    destdir: out
    .into_iter()
    .map(|x| x.to_str().unwrap().to_string())
    .collect(),
    paths: Vec::new(),
    path_patterns: Vec::new(),
    },
    )),
    },
    Msg::Error(p) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Error(p)),
    },
    Msg::Stdout(p) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Stdout(p)),
    },
    Msg::Stderr(p) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Stderr(p)),
    },
    })
    });
    info!("request {:?}: {:?}", r.name, now.elapsed());
    Ok(tonic::Response::new(Box::pin(output_stream)))
    }
    async fn add_url(
    &self,
    request: tonic::Request<proto::AddUrlRequest>,
    ) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
    let r = request.into_inner();
    debug!("add_url request {:?}", r);
    let p = self
    .deb_client
    .http_download(
    &r.url,
    match r.hash_algorithm {
    0 => {
    let mut h = [0; 32];
    h.clone_from_slice(&r.hash);
    Hash::Blake3(h)
    }
    1 => {
    let mut h = [0; 32];
    h.clone_from_slice(&r.hash);
    Hash::Sha256(h)
    }
    2 => {
    let mut h = [0; 64];
    h.clone_from_slice(&r.hash);
    Hash::Sha512(h)
    }
    _ => unreachable!(),
    },
    )
    .await
    .unwrap();
    Ok(tonic::Response::new(proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Ok(
    proto::DerivationResult {
    destdir: vec![p.to_str().unwrap().to_string()],
    paths: Vec::new(),
    path_patterns: Vec::new(),
    },
    )),
    }))
    }
    async fn ubuntu_release(
    &self,
    request: tonic::Request<proto::UbuntuReleaseRequest>,
    ) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
    debug!("ubuntu release {:?}", request);
    let now = std::time::Instant::now();
    let r = request.into_inner();
    let h = self.deb_client.in_release(r.release.clone()).await.unwrap();
    let arch = match r.arch {
    0 => "amd64",
    1 => "arm64",
    _ => unreachable!(),
    };
    let p = self
    .deb_client
    .packages(&h, &r.repository, arch)
    .await
    .expect(&format!("{} {}", &r.repository,&arch));
    info!(
    "ubuntu release request {:?}: {:?}",
    r.release,
    now.elapsed()
    );
    Ok(tonic::Response::new(proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Ok(
    proto::DerivationResult {
    destdir: vec![p.to_str().unwrap().to_string()],
    paths: Vec::new(),
    path_patterns: Vec::new(),
    },
    )),
    }))
    }
    type UbuntuPackageStream = ResponseStream;
    async fn ubuntu_package(
    &self,
    request: tonic::Request<proto::UbuntuPackageRequest>,
    ) -> Result<tonic::Response<Self::UbuntuPackageStream>, tonic::Status> {
    let now = std::time::Instant::now();
    let r = request.into_inner();
    let index: Result<Vec<_>, _> = r
    .index
    .into_iter()
    .map(|index| deb::Index::open(&index))
    .collect();
    let index = index.unwrap();
    let link_extra: Vec<_> = r
    .link_extra
    .into_iter()
    .map(|l| {
    (
    regex::Regex::new(&l.pkg).unwrap(),
    regex::Regex::new(&l.dep).unwrap(),
    )
    })
    .collect();
    let (tx, rx) = tokio::sync::mpsc::channel(200);
    use crate::extract::Msg;
    let output_stream = ReceiverStream::new(rx).map(|x| {
    Ok(match x {
    Msg::Downloading(p) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Loading(p)),
    },
    Msg::Ok(p) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Ok(
    proto::DerivationResult {
    destdir: p
    .result
    .iter()
    .rev()
    .map(|x| x.to_str().unwrap().to_string())
    .collect(),
    paths: p.paths.into_iter().filter_map(Arc::into_inner).collect(),
    path_patterns: Vec::new(),
    },
    )),
    },
    Msg::Error(e) => proto::DerivationReply {
    result: Some(proto::derivation_reply::Result::Error(e.to_string())),
    },
    })
    });
    match download_extract_deps(
    &index,
    &self.deb_client,
    &r.name,
    &link_extra,
    PollSender::new(tx.clone()),
    )
    .await
    {
    Ok(p) => {
    info!("path {:?} {:#?}", r.name, p);
    tx.send(Msg::Ok(p)).await.unwrap();
    }
    Err(e) => tx.send(Msg::Error(e)).await.unwrap(),
    }
    info!("ubuntu package request {:?}: {:?}", r.name, now.elapsed());
    Ok(tonic::Response::new(Box::pin(output_stream)))
    }
    }