use lazy_static::lazy_static;
use nom::{
    bytes::tag,
    character::complete::{digit1, newline, space0},
    multi::separated_list1,
    IResult, Parser,
};
use std::collections::HashMap;
use std::ffi::OsStr;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
use thiserror::*;
use tracing::*;

mod index;
pub use index::*;

/// A parsed .deb file.
#[derive(Debug)]
pub struct Deb {
    md5sums: HashMap<String, String>,
    control_file: String,
    _global: Header,
    _control: Header,
    data: Header,
    data_offset: u64,
}

/// A "stanza", i.e. a description of a Debian package.
#[derive(Debug, Default, Clone)]
pub struct Stanza<'a> {
    #[allow(missing_docs)]
    pub package: &'a str,
    #[allow(missing_docs)]
    pub version: Version<'a>,
    #[allow(missing_docs)]
    pub architecture: &'a str,
    #[allow(missing_docs)]
    pub depends: Vec<Dep<'a>>,
    #[allow(missing_docs)]
    pub sha256: Option<&'a str>,
    #[allow(missing_docs)]
    pub size: Option<usize>,
    /// Path of the .deb file inside an index.
    pub file_name: Option<&'a str>,
}

/// A dependency, which is either a "simple" dependency, or a list of
/// alternatives.
#[derive(Debug, Clone)]
pub enum Dep<'a> {
    /// A simple dependency.
    Simple(SimpleDep<'a>),
    /// A dependency that can be supplied by multiple packages. This
    /// is an alternative mechanism to virtual packages.
    Alternatives {
        #[allow(missing_docs)]
        alt: Vec<SimpleDep<'a>>,
    },
}

/// A single dependency.
#[derive(Debug, Clone)]
pub struct SimpleDep<'a> {
    /// Name of the dependency.
    pub name: &'a str,
    /// Whether this dependency can have any version.
    pub any: bool,
    /// Constraints on dependencies.
    pub constraints: Vec<(&'a str, Version<'a>)>,
}

fn parse_control(s: &str) -> IResult<&str, Stanza> {
    let mut st = Stanza::default();
    for l in s.lines() {
        if l.is_empty() {
            // An empty line ends the stanza; return the rest of the input.
            let (_, b) = s.split_at(l.as_ptr() as usize - s.as_ptr() as usize);
            return Ok((b, st));
        }
        if let Some(i) = l.find(':') {
            let (key, val) = l.split_at(i);
            let (_, val) = val.split_at(1);
            debug!("key {:?} {:?}", key, val);
            match key.trim() {
                "Package" => st.package = val.trim(),
                "Version" => st.version = parse_version(val.trim()).unwrap().1,
                "Architecture" => st.architecture = val.trim(),
                "SHA256" => st.sha256 = Some(val.trim()),
                "Filename" => st.file_name = Some(val.trim()),
                "Depends" | "Pre-Depends" => {
                    let d = parse_deps(val.trim());
                    debug!("{:?}", d);
                    if let Ok(("", a)) = d {
                        // Accumulate, so that Depends and Pre-Depends don't
                        // overwrite each other.
                        st.depends.extend(a)
                    } else {
                        error!("error {:?}", val.trim());
                        return Err(nom::Err::Error(nom::error::make_error(
                            s,
                            nom::error::ErrorKind::Tag,
                        )));
                    }
                }
                _ => {}
            }
        }
    }
    Ok(("", st))
}
#[test]
fn test_grub_common() {
    use tracing_subscriber::prelude::*;
    use tracing_subscriber::util::SubscriberInitExt;
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| String::new().into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .try_init()
        .unwrap();
    let (rest, p) = parse_control("Package: grub-common\nArchitecture: amd64\nVersion: 2.12-1ubuntu7\nBuilt-Using: lzo2 (= 2.10-2build3)\nMulti-Arch: foreign\nPriority: optional\nSection: admin\nSource: grub2\nOrigin: Ubuntu\nMaintainer: Ubuntu Developers <ubuntu-devel-discuss@lists.ubuntu.com>\nOriginal-Maintainer: GRUB Maintainers <pkg-grub-devel@alioth-lists.debian.net>\nBugs: https://bugs.launchpad.net/ubuntu/+filebug\nInstalled-Size: 12476\nDepends: libc6 (>= 2.38), libdevmapper1.02.1 (>= 2:1.02.36), libefiboot1t64 (>=38), libefivar1t64 (>= 38), libfreetype6 (>= 2.2.1), libfuse3-3 (>= 3.2.3), liblzma5 (>= 5.1.1alpha+20120614), debconf (>= 0.5) | debconf-2.0, gettext-base, lsb-base (>= 3.0-6), python3, python3-apt\nRecommends: os-prober (>= 1.33)\nSuggests: multiboot-doc, grub-emu, mtools, xorriso (>= 0.5.6.pl00), desktop-base (>= 4.0.6), console-setup\nConflicts: init-select\nBreaks: apport (<< 2.1.1), friendly-recovery (<< 0.2.13), lupin-support (<< 0.55), mdadm (<< 2.6.7-2)\nReplaces: grub-coreboot (<< 2.00-4), grub-efi (<< 1.99-1), grub-efi-amd64 (<< 2.00-4), grub-efi-ia32 (<< 2.00-4), grub-efi-ia64 (<< 2.00-4), grub-ieee1275 (<< 2.00-4), grub-linuxbios (<< 1.96+20080831-1), grub-pc (<< 2.00-4), grub-yeeloong (<< 2.00-4), init-select\nFilename: pool/main/g/grub2/grub-common_2.12-1ubuntu7_amd64.deb\nSize: 2119862\nMD5sum: 86e63081ae4ee34d6a4f408ac6dbf0e4\nSHA1: b98f9f88e5480423e041fd647b438ba2f6e6f5ea\nSHA256: d8110b97b6fb6da40449c74b9cb527f02965dffaae8344399a711617f5a69249\nSHA512: 85c63b8d3e6a870df48533ab525df13abe9ec4861ff4b5577def94f65a2ebf6ac44a54a6cd0eca1c825e1c7aa2bd14e4d4ed53f83d381963ac1dc51daa16482a\nHomepage: https://www.gnu.org/software/grub/\nDescription: GRand Unified Bootloader (common files)\nTask: ubuntu-desktop-minimal, ubuntu-desktop, ubuntu-desktop-raspi, kubuntu-desktop, xubuntu-minimal, xubuntu-desktop, lubuntu-desktop, ubuntustudio-desktop-core, ubuntustudio-desktop, ubuntukylin-desktop,ubuntukylin-desktop-minimal, ubuntu-mate-core, ubuntu-mate-desktop, ubuntu-budgie-desktop-minimal, ubuntu-budgie-desktop, ubuntu-budgie-desktop-raspi, ubuntu-unity-desktop, edubuntu-desktop-gnome-minimal, edubuntu-desktop-gnome-raspi, ubuntucinnamon-desktop-minimal, ubuntucinnamon-desktop-raspi\nDescription-md5: 9c75036dc0a0792fedbc58df208ed227").unwrap();
    assert!(p.depends.iter().any(|x| match x {
        Dep::Simple(s) => s.name == "liblzma5",
        _ => false,
    }));
    assert!(rest.is_empty())
}

/// A version of a package.
#[derive(Debug, Clone)]
pub struct Version<'a> {
    /// Epoch, i.e. a Debian mechanism to reorder upstream versions if
    /// necessary, for example if the upstream versioning scheme
    /// changes.
    pub epoch: u32,
    /// Upstream version.
    pub upstream_version: Upstream<'a>,
    /// Debian-specific suffix to distinguish between patches.
    pub debian: Option<&'a str>,
}

impl<'a> Default for Version<'a> {
    fn default() -> Version<'a> {
        Version {
            epoch: 0,
            upstream_version: Upstream(""),
            debian: None,
        }
    }
}

fn parse_constraint<'a>(s: &'a str) -> IResult<&'a str, (&'a str, Version<'a>)> {
    let (s, constraint) = tag("<<")
        .or(tag("<="))
        .or(tag("="))
        .or(tag(">="))
        .or(tag(">>"))
        .parse(s)?;
    debug!("parse_constraint {:?}", constraint);
    let (s, _) = space0(s)?;
    let (s, version) = parse_version(s)?;
    debug!("parse_constraint {:?} {:?}", s, (constraint, &version));
    Ok((s, (constraint, version)))
}

fn parse_dep<'a>(s0: &'a str) -> IResult<&'a str, Dep<'a>> {
    let (s, name) = parse_name(s0)?;
    let (s, any) = if let Some(rest) = s.strip_prefix(":any") {
        (rest, true)
    } else {
        (s, false)
    };
    let (s, _) = space0(s)?;
    let mut constraints = Vec::new();
    let s = if let Ok((s, _)) = tag::<_, _, ()>("(").parse(s) {
        // The constraints are between parentheses; cut at the closing one.
        let (s, version) = if let Some(i) = s.find(')') {
            let (a, b) = s.split_at(i);
            let (_, b) = b.split_at(1);
            (b, a)
        } else {
            return Err(nom::Err::Error(nom::error::make_error(
                s,
                nom::error::ErrorKind::Tag,
            )));
        };
        let mut v = version;
        while let Ok((v_, c)) = parse_constraint(v) {
            constraints.push(c);
            let (v_, _) = space0(v_)?;
            if let Ok((v_, _)) = tag::<_, _, ()>(",").parse(v_) {
                let (v_, _) = space0(v_)?;
                v = v_;
            } else {
                break;
            }
        }
        s
    } else {
        s
    };
    Ok((
        s,
        Dep::Simple(SimpleDep {
            name,
            any,
            constraints,
        }),
    ))
}
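// Illustrative test (not in the original crate) for the dependency grammar:
// alternatives separated by `|` collapse into a single `Dep::Alternatives`,
// and version constraints are attached to each simple dependency.
#[test]
fn example_parse_deps() {
    let (rest, deps) = parse_deps("debconf (>= 0.5) | debconf-2.0, gettext-base").unwrap();
    assert!(rest.is_empty());
    assert_eq!(deps.len(), 2);
    match &deps[0] {
        Dep::Alternatives { alt } => {
            assert_eq!(alt[0].name, "debconf");
            assert_eq!(alt[0].constraints[0].0, ">=");
            assert_eq!(alt[1].name, "debconf-2.0");
        }
        _ => panic!("expected alternatives"),
    }
    match &deps[1] {
        Dep::Simple(s) => assert_eq!(s.name, "gettext-base"),
        _ => panic!("expected a simple dependency"),
    }
}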
fn parse_name(s: &str) -> IResult<&str, &str> {
    // Package names are lowercase alphanumeric, plus '.', '+' and '-'.
    if let Some(i) = s.find(|c| {
        !((c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '.' || c == '+' || c == '-')
    }) {
        if i > 0 {
            let (a, b) = s.split_at(i);
            return Ok((b, a));
        }
    } else {
        return Ok(("", s));
    }
    error!("ERROR {:?}", s);
    Err(nom::Err::Error(nom::error::make_error(
        s,
        nom::error::ErrorKind::Tag,
    )))
}

fn parse_deps<'a>(s: &'a str) -> IResult<&'a str, Vec<Dep<'a>>> {
    let mut d = Vec::new();
    let (mut s, _) = space0(s)?;
    let mut alt_is_open = false;
    while let Ok((s_, d_)) = parse_dep(s) {
        let (s_, _) = space0(s_)?;
        if let Ok((s_, _)) = tag::<_, _, ()>("|").parse(s_) {
            // An alternative follows: open (or extend) an `Alternatives` group.
            let (s_, _) = space0(s_)?;
            s = s_;
            alt_is_open = true;
            if let Dep::Simple(d_) = d_ {
                if let Some(Dep::Alternatives { ref mut alt }) = d.last_mut() {
                    alt.push(d_)
                } else {
                    d.push(Dep::Alternatives { alt: vec![d_] })
                }
            } else {
                panic!("{:?}", s);
            }
        } else if let Ok((s_, _)) = tag::<_, _, ()>(",").parse(s_) {
            // End of this dependency; close the alternatives group if any.
            if alt_is_open {
                if let Some(Dep::Alternatives { ref mut alt }) = d.last_mut() {
                    if let Dep::Simple(d_) = d_ {
                        alt.push(d_)
                    } else {
                        panic!("{:?}", s);
                    }
                }
            } else {
                d.push(d_)
            }
            alt_is_open = false;
            s = s_;
        } else {
            // Neither `|` nor `,` follows: this was the last dependency.
            if alt_is_open {
                if let Some(Dep::Alternatives { ref mut alt }) = d.last_mut() {
                    if let Dep::Simple(d_) = d_ {
                        alt.push(d_)
                    } else {
                        panic!("{:?}", s);
                    }
                }
            } else {
                d.push(d_)
            }
            s = s_;
            break;
        }
        let (s_, _) = space0(s)?;
        s = s_;
    }
    Ok((s, d))
}

/// The representation of an upstream version. Implements `Ord`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Upstream<'a>(&'a str);

impl<'a> std::ops::Deref for Upstream<'a> {
    type Target = str;
    fn deref(&self) -> &str {
        self.0
    }
}

impl<'a> PartialOrd for Upstream<'a> {
    /// Following the algorithm described here:
    /// <https://www.debian.org/doc/debian-policy/ch-controlfields.html#version>
    fn partial_cmp(&self, b: &Upstream<'a>) -> Option<std::cmp::Ordering> {
        fn split_initial(s: &str) -> (&str, &str, &str) {
            // Find `u`, the first digit, and `v > u`, the first non-digit after `u`.
            let mut u = s.len();
            let mut v = s.len();
            for (n, &b) in s.as_bytes().iter().enumerate() {
                if u == s.len() {
                    if b >= b'0' && b <= b'9' {
                        u = n
                    }
                } else if b < b'0' || b > b'9' {
                    v = n;
                    break;
                }
            }
            let (ab, c) = s.split_at(v);
            let (a, b) = ab.split_at(u);
            (a, b, c)
        }
        let mut a = self.0;
        let mut b = b.0;
        use std::cmp::Ordering;
        loop {
            let (a0, a1, a_) = split_initial(a);
            let (b0, b1, b_) = split_initial(b);
            // Compare a0 / b0 using the modified ASCII ordering described in
            // the Debian manual.
            let mut a0i = a0.as_bytes().iter();
            let mut b0i = b0.as_bytes().iter();
            loop {
                match (a0i.next(), b0i.next()) {
                    (None, None) => break,
                    (Some(c), Some(d)) if c == d => continue,
                    (Some(b'~'), _) => return Some(Ordering::Less),
                    (_, Some(b'~')) => return Some(Ordering::Greater),
                    (Some(c), Some(d)) if c.is_ascii_alphabetic() && !d.is_ascii_alphabetic() => {
                        return Some(Ordering::Less);
                    }
                    (Some(c), Some(d)) if d.is_ascii_alphabetic() && !c.is_ascii_alphabetic() => {
                        return Some(Ordering::Greater);
                    }
                    (c, d) => return Some(c.cmp(&d)),
                }
            }
            // If we haven't returned, a0 == b0. Compare the digit parts.
            let a1 = if a1.is_empty() {
                0
            } else {
                a1.parse::<u64>().unwrap()
            };
            let b1 = if b1.is_empty() {
                0
            } else {
                b1.parse::<u64>().unwrap()
            };
            let cmp1 = a1.cmp(&b1);
            if let Ordering::Equal = cmp1 {
                if a_.is_empty() && b_.is_empty() {
                    // Both inputs are exhausted and equal: stop here,
                    // otherwise this loop would never terminate on equal
                    // versions.
                    return Some(Ordering::Equal);
                }
                a = a_;
                b = b_;
            } else {
                return Some(cmp1);
            }
        }
    }
}

impl<'a> Ord for Upstream<'a> {
    fn cmp(&self, b: &Upstream<'a>) -> std::cmp::Ordering {
        self.partial_cmp(b).unwrap()
    }
}

#[test]
fn test_upstream_ordering() {
    let mut x = [
        Upstream("~~"),
        Upstream("~~a"),
        Upstream("~"),
        Upstream(""),
        Upstream("a"),
    ];
    let y = x.clone();
    x.sort();
    assert_eq!(x, y);
}

/// Parse a version in the Debian version format.
fn parse_version<'a>(s: &'a str) -> IResult<&'a str, Version<'a>> {
    fn epoch(s: &str) -> IResult<&str, u32> {
        let (s, i) = digit1(s)?;
        let (s, _) = tag(":").parse(s)?;
        Ok((s, i.parse().unwrap()))
    }
    let (s, epoch) = epoch(s).unwrap_or((s, 0));

    lazy_static! {
        static ref RE_DASH: regex::Regex =
            regex::Regex::new(r"^[a-z0-9\.~+-]+-([a-z0-9\.~+]+)").unwrap();
        static ref RE: regex::Regex = regex::Regex::new(r"^[a-z0-9\.~+]+").unwrap();
    }
    let (s, v) = if let Some(m) = RE_DASH.find(s) {
        debug!("parse_version RE_DASH {:?}", s);
        let (a, b) = s.split_at(m.end());
        (b, a)
    } else if let Some(m) = RE.find(s) {
        debug!("parse_version RE {:?}", s);
        let (a, b) = s.split_at(m.end());
        (b, a)
    } else {
        debug!("parse_version failed {:?}", s);
        return Err(nom::Err::Error(nom::error::make_error(
            s,
            nom::error::ErrorKind::Tag,
        )));
    };
    let (upstream_version, debian) = if let Some(i) = v.rfind('-') {
        let (a, b) = v.split_at(i);
        (Upstream(a), Some(b.split_at(1).1))
    } else {
        (Upstream(v), None)
    };
    Ok((
        s,
        Version {
            epoch,
            upstream_version,
            debian,
        },
    ))
}
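// Illustrative test (not in the original crate) for the version grammar:
// `epoch:upstream-debian`, where the Debian revision is everything after
// the last `-`.
#[test]
fn example_parse_version() {
    let (rest, v) = parse_version("2:1.02.36-5ubuntu1").unwrap();
    assert!(rest.is_empty());
    assert_eq!(v.epoch, 2);
    assert_eq!(&*v.upstream_version, "1.02.36");
    assert_eq!(v.debian, Some("5ubuntu1"));

    // Without a dash there is no Debian revision, and the epoch defaults to 0.
    let (_, v) = parse_version("2.12").unwrap();
    assert_eq!(v.epoch, 0);
    assert_eq!(v.debian, None);
}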
/// Structure of the "headers" in the debian file. No alignment
/// necessary, the numbers are in decimal. No heap allocation.
#[derive(Debug)]
#[repr(C)]
struct H {
    file_id: [u8; 16],
    timestamp: [u8; 12],
    owner_id: [u8; 6],
    group_id: [u8; 6],
    file_mode: [u8; 8],
    file_size: [u8; 10],
    end_char: [u8; 2],
}

/// Parsed version of `H`, still no heap allocation.
#[allow(dead_code)]
#[derive(Debug)]
struct Header {
    file_id: [u8; 16],
    timestamp: u64,
    owner_id: u32,
    group_id: u32,
    file_mode: u32,
    file_size: u64,
}

/// Possible errors for .deb files.
#[allow(missing_docs)]
#[derive(Debug, Error)]
pub enum Error {
    #[error("Deb file parse error")]
    ParseError,
    #[error(transparent)]
    Utf8(#[from] std::str::Utf8Error),
    #[error(transparent)]
    Int(#[from] std::num::ParseIntError),
    #[error(transparent)]
    IO(#[from] std::io::Error),
    #[error("Unsupported compression algorithm: {file_id}")]
    UnsupportedCompression { file_id: String },
    #[error("Version parse: {version}")]
    Version { version: String },
}

impl Header {
    pub fn file_id(&self) -> &str {
        std::str::from_utf8(&self.file_id).unwrap().trim()
    }
}

impl H {
    fn parse(&self) -> Result<Header, Error> {
        std::str::from_utf8(&self.file_id)?;
        Ok(Header {
            file_id: self.file_id,
            timestamp: std::str::from_utf8(&self.timestamp)?.trim().parse()?,
            owner_id: std::str::from_utf8(&self.owner_id)?.trim().parse()?,
            group_id: std::str::from_utf8(&self.group_id)?.trim().parse()?,
            file_mode: u32::from_str_radix(std::str::from_utf8(&self.file_mode)?.trim(), 8)?,
            file_size: std::str::from_utf8(&self.file_size)?.trim().parse()?,
        })
    }
}

fn read_hdr<R: Read>(r: &mut R) -> Result<Header, Error> {
    unsafe {
        assert_eq!(size_of::<H>(), HEADER_SIZE as usize);
        let mut b: H = std::mem::zeroed();
        let pb: *mut H = &mut b;
        // Read the raw header bytes directly into the `#[repr(C)]` struct.
        r.read_exact(std::slice::from_raw_parts_mut(pb as *mut u8, size_of::<H>()))?;
        debug!("{:?}", b);
        b.parse()
    }
}

const HEADER_SIZE: u64 = 60;

impl Deb {
    /// Parse the control part, containing the description (stanza) of this package.
    pub fn parse_control(&self) -> Vec<Stanza> {
        if let Ok(("", st)) = separated_list1(newline, parse_control).parse(&self.control_file) {
            st
        } else {
            Vec::new()
        }
    }

    /// Return the md5sums of each file.
    pub fn md5sums(&self) -> &HashMap<String, String> {
        &self.md5sums
    }

    /// Decompress the data part of the deb file into a directory.
    pub fn decompress<R: BufRead + Seek, P: AsRef<std::path::Path>>(
        &self,
        mut r: R,
        path: P,
    ) -> Result<(), Error> {
        r.seek(SeekFrom::Start(self.data_offset))?;
        debug!("decompress {:?}", self.data.file_id());
        if self.data.file_id().ends_with(".tar.zst") {
            let mut ar = tar::Archive::new(
                zstd::stream::read::Decoder::new(r.take(self.data.file_size)).unwrap(),
            );
            for e in ar.entries().unwrap() {
                let mut e = e.unwrap();
                debug!("zstd decompress {:?}", e.path());
                e.unpack_in(&path).unwrap();
            }
        } else if self.data.file_id().ends_with(".tar.xz") {
            let mut ar = tar::Archive::new(xz::read::XzDecoder::new(r.take(self.data.file_size)));
            for e in ar.entries().unwrap() {
                let mut e = e.unwrap();
                debug!("xz decompress {:?}", e.path());
                e.unpack_in(&path).unwrap();
            }
        } else if self.data.file_id().ends_with(".tar") {
            let mut ar = tar::Archive::new(r.take(self.data.file_size));
            for e in ar.entries().unwrap() {
                let mut e = e.unwrap();
                debug!("tar extract {:?}", e.path());
                e.unpack_in(&path).unwrap();
            }
        } else {
            // `.tar.gz` (and anything else) isn't handled yet.
            return Err(Error::UnsupportedCompression {
                file_id: self.data.file_id().to_string(),
            });
        }
        Ok(())
    }
    /// Parse a `.deb` file from a buffered reader.
    pub fn read<R: BufRead + Seek>(mut r: R) -> Result<Deb, Error> {
        let mut b = [0; 8];
        r.read_exact(&mut b)?;
        if &b != b"!<arch>\n" {
            return Err(Error::ParseError);
        }
        let global = read_hdr(&mut r)?;
        trace!("GLOBAL {:?}", global);
        // The first member is `debian-binary`, whose contents are 4 bytes
        // ("2.0\n"); skip them.
        assert_eq!(global.file_size, 4);
        r.seek(SeekFrom::Current(global.file_size as i64))?;
        let control = read_hdr(&mut r)?;
        trace!("CONTROL {:?}", control);
        // Control archives are small, so decompress them entirely in memory.
        let mut ctrl = vec![0; control.file_size as usize];
        r.read_exact(&mut ctrl)?;
        // Possible compressions: gzip, xz, zstd or none.
        let ctrl_tar = if control.file_id().ends_with(".tar.zst") {
            zstd::decode_all(&ctrl[..])?
        } else if control.file_id().ends_with(".tar.xz") {
            let mut decompressor = xz::read::XzDecoder::new(&ctrl[..]);
            let mut result = Vec::new();
            decompressor.read_to_end(&mut result)?;
            result
        } else if control.file_id().ends_with(".tar") {
            ctrl
        } else {
            // gzip isn't handled yet.
            return Err(Error::UnsupportedCompression {
                file_id: control.file_id().to_string(),
            });
        };
        let mut ctrl_tar = tar::Archive::new(&ctrl_tar[..]);
        let mut control_file = String::new();
        let mut md5sums = HashMap::new();
        for e in ctrl_tar.entries().unwrap() {
            let mut e = e.unwrap();
            if e.path()?.file_name() == Some(OsStr::new("control")) {
                e.read_to_string(&mut control_file)?;
            } else if e.path()?.file_name() == Some(OsStr::new("md5sums")) {
                let mut e = BufReader::new(e);
                let mut s = String::new();
                while e.read_line(&mut s)? > 0 {
                    let mut it = s.split(' ').filter(|x| !x.is_empty());
                    if let (Some(hash), Some(file)) = (it.next(), it.next()) {
                        md5sums.insert(file.trim().to_string(), hash.trim().to_string());
                    }
                    s.clear();
                }
            }
        }
        // `ar` members are 2-byte aligned; skip the padding byte if any.
        if control.file_size % 2 == 1 {
            r.seek(SeekFrom::Current(1))?;
        }
        let data = read_hdr(&mut r)?;
        debug!("data = {:?}", data);
        Ok(Deb {
            md5sums,
            _global: global,
            _control: control,
            control_file,
            data,
            data_offset: r.stream_position()?,
        })
    }
}
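// Illustrative usage of `Deb::read` and `Deb::decompress` (not part of the
// original crate). The path below is a placeholder; point it at any real
// .deb file to try this out.
#[test]
#[ignore = "needs a local .deb file"]
fn example_read_deb() {
    let f = std::fs::File::open("grub-common_2.12-1ubuntu7_amd64.deb").unwrap();
    let mut r = std::io::BufReader::new(f);
    let deb = Deb::read(&mut r).unwrap();
    // The control stanza gives the package name, version and dependencies.
    for st in deb.parse_control() {
        println!(
            "{} {:?} depends on {} packages",
            st.package,
            st.version,
            st.depends.len()
        );
    }
    // Unpack the data member into a directory.
    deb.decompress(&mut r, "unpacked").unwrap();
}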
//! Run bash scripts in containers on Linux, with only deterministic
//! Debian or Ubuntu packages in scope.
#![deny(
    missing_docs,
    trivial_casts,
    trivial_numeric_casts,
    unused_import_braces,
    unused_qualifications
)]
use futures::SinkExt;
use futures::StreamExt;
use sha2::Digest;
use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use tempfile::NamedTempFile;
use thiserror::*;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::process::Command;
use tokio::sync::{Mutex, Semaphore};
use tracing::*;

/// Handle Debian packages.
pub mod deb;
/// Extract a Debian package and all dependencies into the store.
pub mod extract;
/// Recursive iterator on the files/directories in a path.
pub mod find_files;
/// Mount and unmount paths.
pub mod mount;
/// Run scripts in a chroot inside a new Linux namespace, with the
/// right directories mounted and symbolically linked to simulate a
/// Debian/Ubuntu filesystem hierarchy.
pub mod container;

mod dummy_sink;
use dummy_sink::*;

/// Errors
#[derive(Debug, Error)]
pub enum Error {
    /// An error from manipulating a Debian package.
    #[error(transparent)]
    Deb(#[from] deb::Error),
    /// IO error
    #[error(transparent)]
    IO(#[from] std::io::Error),
    /// An error from persisting a temporary file.
    #[error(transparent)]
    Persist(#[from] tempfile::PersistError),
    /// An error from creating a temporary file.
    #[error(transparent)]
    Tempfile(#[from] async_tempfile::Error),
    /// An error from `elfedit`, this crate's companion ELF patcher.
    #[error(transparent)]
    Elf(#[from] elfedit::Error),
    /// HTTP errors
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),
    /// Wrong signature (on an Ubuntu index)
    #[error("Signature error")]
    Signature,
    /// Wrong package hash
    #[error("Hash: expected {expected}, got {got}.")]
    WrongHash {
        /// Expected hash.
        expected: String,
        /// Obtained hash.
        got: String,
    },
    /// Wrong package or file size
    #[error("Wrong size")]
    WrongSize,
    /// Wrong result symlink: Elpe packages have one "input name"
    /// given by the hash of their inputs, and that directory is
    /// symlinked from the "output name", which is the hash of their
    /// output. This makes the outputs verifiable all the way down:
    /// failures to verify result in this error.
    #[error("Wrong result symlink, expected {expected:?}, got {got:?}")]
    WrongResultSymlink {
        /// Expected output.
        expected: PathBuf,
        /// Obtained output.
        got: PathBuf,
    },
    /// The build process returned a status other than 0.
    #[error("Build process returned {status}")]
    BuildReturn {
        /// Return status.
        status: i32,
    },
    /// Package not found
    #[error("Package not found: {pkg}")]
    PackageNotFound {
        /// The package name.
        pkg: String,
    },
}

/// A Debian index client.
#[derive(Clone)]
pub struct Client {
    c: lazy_init::Lazy<reqwest::Client>,
    pgp_home: PathBuf,
    mirror: String,
    store_path: PathBuf,
    in_release: Arc<Mutex<HashMap<std::borrow::Cow<'static, str>, Arc<InRelease>>>>,
    download_sem: Arc<Semaphore>,
    store_locks: Arc<std::sync::Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>>,
    timeout: std::time::Duration,
}

/// Hashes, used for example in URL downloads.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Hash {
    #[allow(missing_docs)]
    Blake3([u8; 32]),
    #[allow(missing_docs)]
    Sha256([u8; 32]),
    #[allow(missing_docs)]
    Sha512([u8; 64]),
}

/// Hasher for each different hash algorithm.
pub enum Hasher {
    #[allow(missing_docs)]
    Blake3(blake3::Hasher),
    #[allow(missing_docs)]
    Sha256(sha2::Sha256),
    #[allow(missing_docs)]
    Sha512(sha2::Sha512),
}

impl Hash {
    /// Create a hasher for this hash algorithm.
    pub fn hasher(&self) -> Hasher {
        match self {
            Hash::Blake3(_) => Hasher::Blake3(blake3::Hasher::new()),
            Hash::Sha256(_) => Hasher::Sha256(sha2::Sha256::new()),
            Hash::Sha512(_) => Hasher::Sha512(sha2::Sha512::new()),
        }
    }
}

impl Hasher {
    /// Update the hasher with new bytes.
    pub fn update(&mut self, b: &[u8]) {
        match self {
            Hasher::Blake3(h) => {
                h.update(b);
            }
            Hasher::Sha256(h) => {
                h.update(b);
            }
            Hasher::Sha512(h) => {
                h.update(b);
            }
        }
    }

    /// Create the hash, to be compared with the one supplied by the user.
    pub fn finalize(self) -> Hash {
        match self {
            Hasher::Blake3(h) => Hash::Blake3(h.finalize().into()),
            Hasher::Sha256(h) => Hash::Sha256(h.finalize().into()),
            Hasher::Sha512(h) => Hash::Sha512(h.finalize().into()),
        }
    }
}

impl std::ops::Deref for Hash {
    type Target = [u8];
    fn deref(&self) -> &Self::Target {
        match self {
            Hash::Blake3(h) => &h[..],
            Hash::Sha256(h) => &h[..],
            Hash::Sha512(h) => &h[..],
        }
    }
}

impl std::ops::DerefMut for Hash {
    fn deref_mut(&mut self) -> &mut Self::Target {
        match self {
            Hash::Blake3(h) => &mut h[..],
            Hash::Sha256(h) => &mut h[..],
            Hash::Sha512(h) => &mut h[..],
        }
    }
}
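// Minimal sketch (not in the original crate) showing the `Hash`/`Hasher`
// round-trip: pick an algorithm via a `Hash` value, feed bytes, and compare
// the finalized digests with `Hash`'s derived `PartialEq`.
#[test]
fn example_hash_roundtrip() {
    let seed = Hash::Sha256([0; 32]); // only the variant matters here
    let mut hasher = seed.hasher();
    hasher.update(b"hello ");
    hasher.update(b"world");
    let first = hasher.finalize();

    let mut hasher = seed.hasher();
    hasher.update(b"hello world");
    let second = hasher.finalize();

    // Same bytes, possibly split differently, must hash identically.
    assert_eq!(first, second);
    assert!(matches!(first, Hash::Sha256(_)));
}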
impl Client {
    /// Download a URL.
    pub async fn http_download(&self, url: &str, h: Hash) -> Result<PathBuf, Error> {
        let expected_path = self.store_path.join(data_encoding::BASE32_DNSSEC.encode(&h));
        if let Ok(mut f) = tokio::fs::File::open(&expected_path).await {
            // The file is already in the store: re-hash it to check it.
            use tokio::io::AsyncReadExt;
            let mut buf = [0; 4096];
            let mut hasher = h.hasher();
            while let Ok(n) = f.read(&mut buf).await {
                if n == 0 {
                    break;
                }
                hasher.update(&buf[..n]);
            }
            let got = hasher.finalize();
            return if got == h {
                Ok(expected_path)
            } else {
                Err(Error::WrongHash {
                    expected: data_encoding::HEXLOWER.encode(&h),
                    got: data_encoding::HEXLOWER.encode(&got),
                })
            };
        }
        let mut r = self.client().get(url).send().await?;
        // Writes happen on a separate task, overlapping with the next
        // `chunk()` read; `t` holds the write currently in flight.
        let mut t: Option<tokio::task::JoinHandle<Result<tokio::fs::File, tokio::io::Error>>> =
            None;
        let file = NamedTempFile::new_in(&self.store_path)?;
        let mut f = Some(tokio::fs::File::create(&file).await?);
        let mut hasher = h.hasher();
        while let Some(chunk) = r.chunk().await? {
            hasher.update(&chunk);
            if let Some(t) = t.take() {
                f = Some(t.await.unwrap()?)
            }
            let mut f = f.take().unwrap();
            t = Some(tokio::spawn(async move {
                f.write_all(&chunk).await?;
                Ok(f)
            }))
        }
        // Wait for the last write to land before persisting the file.
        if let Some(t) = t.take() {
            t.await.unwrap()?;
        }
        let got = hasher.finalize();
        if got == h {
            file.persist(&expected_path)?;
            Ok(expected_path)
        } else {
            Err(Error::WrongHash {
                expected: data_encoding::HEXLOWER.encode(&h),
                got: data_encoding::HEXLOWER.encode(&got),
            })
        }
    }
}

/// A locked store path, to prevent concurrent writes to the
/// store. For performance reasons, these are memory locks rather than
/// filesystem locks.
pub struct StoreLock {
    locks: Arc<std::sync::Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>>,
    p: PathBuf,
    _lock: tokio::sync::OwnedMutexGuard<()>,
}

impl Drop for StoreLock {
    fn drop(&mut self) {
        debug!("store lock: dropping {:?}", self.p);
        self.locks.lock().unwrap().remove(&self.p);
    }
}

#[derive(Debug)]
struct Downloaded {
    path: PathBuf,
}

/// Root file of a package index, containing the signed hashes of the package list.
#[derive(Debug)]
pub struct InRelease {
    hashed: HashMap<String, (usize, [u8; 32])>,
    release: std::borrow::Cow<'static, str>,
}

/// A writer that hashes everything it writes through to `w`.
struct HashedWriter<W> {
    hasher: sha2::Sha256,
    w: W,
}

impl<W: std::io::Write> std::io::Write for HashedWriter<W> {
    fn write(&mut self, b: &[u8]) -> Result<usize, std::io::Error> {
        self.hasher.update(b);
        self.w.write_all(b).unwrap();
        Ok(b.len())
    }
    fn flush(&mut self) -> Result<(), std::io::Error> {
        self.w.flush()
    }
}

const MAX_PARALLEL_DOWNLOADS: usize = 20;

impl Client {
    /// Create a client.
    pub fn new<P: AsRef<std::path::Path>, Q: AsRef<std::path::Path>>(
        pgp_home: P,
        store_path: Q,
        mirror: &str,
    ) -> Self {
        std::fs::create_dir_all(store_path.as_ref()).unwrap();
        Client {
            c: lazy_init::Lazy::new(),
            mirror: mirror.to_string(),
            store_path: store_path.as_ref().to_path_buf(),
            in_release: Arc::new(HashMap::new().into()),
            download_sem: Arc::new(Semaphore::new(MAX_PARALLEL_DOWNLOADS)),
            store_locks: Arc::new(std::sync::Mutex::new(HashMap::new())),
            timeout: std::time::Duration::from_secs(30),
            pgp_home: pgp_home.as_ref().to_path_buf(),
        }
    }

    /// Retrieve the store path this client was created with.
    pub fn store_path(&self) -> &std::path::Path {
        &self.store_path
    }

    fn client(&self) -> &reqwest::Client {
        self.c.get_or_create(|| {
            reqwest::ClientBuilder::new()
                .read_timeout(self.timeout)
                .build()
                .unwrap()
        })
    }

    /// Like `tokio::process::Command::new`, but where `cmd` is taken
    /// from inside the supplied package. This decompresses the
    /// package.
    pub async fn command<R: Into<std::borrow::Cow<'static, str>>, P: AsRef<std::path::Path>>(
        &self,
        release: R,
        pkg: &str,
        cmd: P,
    ) -> Command {
        let h = self.in_release(release.into()).await.unwrap();
        let arch = if cfg!(target_arch = "x86_64") {
            "amd64"
        } else if cfg!(target_arch = "aarch64") {
            // Debian calls this architecture "arm64".
            "arm64"
        } else {
            unreachable!()
        };
        let (main, universe) = tokio::join!(
            self.packages(&h, "main", arch),
            self.packages(&h, "universe", arch)
        );
        let main = main.unwrap();
        let universe = universe.unwrap();
        let index = vec![
            deb::Index::open(&main).unwrap(),
            deb::Index::open(&universe).unwrap(),
        ];
        let p = extract::download_extract_deps(&index, self, pkg, &[], DummySink {})
            .await
            .unwrap();
        debug!("running {:?}", p.result.last().unwrap().join(&cmd));
        Command::new(p.result.last().unwrap().join(&cmd))
    }
    /// This locks a store path in order to avoid races to write to
    /// the same store path.
    ///
    /// For performance and portability reasons, this is implemented
    /// using in-memory Mutexes rather than using filesystem
    /// locks. This process is the only one allowed to write to the
    /// store anyway.
    pub async fn lock_store_path<P: AsRef<std::path::Path>>(&self, path: P) -> StoreLock {
        debug!("locking store path {:?}", path.as_ref());
        let p = path.as_ref().to_path_buf();
        let mutex = {
            let mut locks = self.store_locks.lock().unwrap();
            locks
                .entry(p.clone())
                .or_insert_with(|| Arc::new(Mutex::new(())))
                .clone()
        };
        let lock = mutex.lock_owned().await;
        debug!("lock acquired: {:?}", p);
        StoreLock {
            locks: self.store_locks.clone(),
            p,
            _lock: lock,
        }
    }

    /// Download a list of packages.
    pub async fn packages(
        &self,
        in_release: &InRelease,
        component: &str,
        arch: &str,
    ) -> Result<PathBuf, Error> {
        let end = format!("{component}/binary-{arch}/Packages");
        let end_xz = format!("{component}/binary-{arch}/Packages.xz");
        let (size_xz, expected_hash_xz) = in_release
            .hashed
            .get(&end_xz)
            .expect(&format!("{} not found in InRelease", end_xz));
        let (size, expected_hash) = in_release
            .hashed
            .get(&end)
            .expect(&format!("{} not found in InRelease", &end));
        let expected_xz = data_encoding::BASE32_DNSSEC.encode(&*expected_hash_xz);
        let expected = data_encoding::BASE32_DNSSEC.encode(&*expected_hash);
        std::fs::create_dir_all(&self.store_path)?;
        let expected_path = self.store_path.join(&expected);
        if std::fs::metadata(&expected_path).is_ok() {
            return Ok(expected_path);
        }
        let m = &self.mirror;
        let release = &in_release.release;
        let r = {
            let client = self.client();
            let permit = self.download_sem.acquire().await.unwrap();
            let r = client
                .get(&format!(
                    "{m}/dists/{release}/{component}/binary-{arch}/Packages.xz"
                ))
                .send()
                .await?;
            drop(permit);
            r
        };
        let file_xz = NamedTempFile::new_in(&self.store_path)?;
        let file = NamedTempFile::new_in(&self.store_path)?;
        let mut xz_dec = xz2::write::XzDecoder::new(HashedWriter {
            w: std::fs::File::create(&file)?,
            hasher: sha2::Sha256::new(),
        });
        let mut f = tokio::fs::File::create(&file_xz).await?;
        let mut r = r.bytes_stream();
        let mut hasher_xz = sha2::Sha256::new();
        debug!("expected xz hash: {:?}", expected_hash_xz);
        let mut total = 0;
        while let Some(item) = r.next().await {
            let item = item?;
            total += item.len();
            {
                // We can't use write_all here, since lzma has padding
                // at the end of the file and the xz2 crate rejects
                // the padding.
                use std::io::Write;
                let mut i = 0;
                loop {
                    let n = xz_dec.write(&item[i..])?;
                    if n == 0 {
                        break;
                    } else {
                        i += n
                    }
                }
            }
            hasher_xz.update(&item);
            f.write_all(&item).await?;
        }
        if total != *size_xz || xz_dec.total_out() != *size as u64 {
            return Err(Error::WrongSize);
        }
        let w = xz_dec.finish()?;
        let got_xz = &hasher_xz.finalize()[..];
        let got = &w.hasher.finalize()[..];
        if got_xz == expected_hash_xz && got == expected_hash {
            let mut expected_path_xz = self.store_path.join(&expected_xz);
            expected_path_xz.set_extension("xz");
            file_xz.persist(&expected_path_xz)?;
            file.persist(&expected_path)?;
            Ok(expected_path)
        } else {
            Err(Error::WrongHash {
                expected: data_encoding::HEXLOWER.encode(&expected_hash[..]),
                got: data_encoding::HEXLOWER.encode(&got),
            })
        }
    }
    /// Download the "InRelease" file of the index, and verify the signatures.
    pub async fn in_release<R: Into<std::borrow::Cow<'static, str>>>(
        &self,
        release: R,
    ) -> Result<Arc<InRelease>, Error> {
        let m = &self.mirror;
        let release = release.into();
        debug!("downloading in_release {:?}", release);
        let mut ubuntu = self.store_path.join("ubuntu");
        std::fs::create_dir_all(&ubuntu)?;
        ubuntu.push(&*release);
        let mut lock = self.in_release.lock().await;
        if let Some(l) = lock.get(&release) {
            debug!("in_release: cached");
            return Ok(l.clone());
        }
        debug!("open {:?}", ubuntu);
        let f = tokio::fs::File::open(&ubuntu).await;
        if let Ok(f) = f {
            let r = Arc::new(self.read_in_release(release.clone(), f).await?);
            lock.insert(release.clone(), r.clone());
            Ok(r)
        } else {
            debug!("downloading InRelease to {:?}", ubuntu);
            let store_lock = self.lock_store_path(&ubuntu).await;
            let r = self
                .client()
                .get(&format!("{m}/dists/{release}/InRelease"))
                .send()
                .await?;
            let tmp = async_tempfile::TempFile::new_in(ubuntu.parent().unwrap()).await?;
            // Sequoia wants to extract the verification code from
            // their binary and make it available as a lib, but
            // haven't done so as of June 2025.
            let mut com = Command::new("sq")
                .env("HOME", &self.pgp_home)
                .arg("verify")
                .arg("--overwrite")
                .arg(&format!("--output={}", tmp.file_path().to_str().unwrap()))
                .arg("--message")
                .stdin(Stdio::piped())
                .spawn()?;
            let mut i = com.stdin.take().unwrap();
            let mut r = r.bytes_stream();
            // Here we know `sq` doesn't start writing until it has consumed
            // all of its stdin.
            while let Some(item) = r.next().await {
                i.write_all(&item?).await?;
            }
            drop(i);
            let status = com.wait().await?;
            let result = if status.success() {
                self.read_in_release(
                    release.clone(),
                    tokio::fs::File::open(&tmp.file_path()).await?,
                )
                .await
            } else {
                Err(Error::Signature)
            };
            let r = Arc::new(result?);
            lock.insert(release, r.clone());
            drop(store_lock);
            tokio::fs::rename(&tmp.file_path(), &ubuntu).await?;
            // Forget the TempFile so its destructor doesn't delete the file
            // we just renamed into place.
            std::mem::forget(tmp);
            Ok(r)
        }
    }

    /// Read an already downloaded InRelease file, without re-checking
    /// the signatures.
    pub async fn read_in_release<R: tokio::io::AsyncRead + Unpin>(
        &self,
        release: std::borrow::Cow<'static, str>,
        o: R,
    ) -> Result<InRelease, Error> {
        let mut b = tokio::io::BufReader::new(o).lines();
        let mut result = HashMap::new();
        let mut recording = false;
        while let Some(l) = b.next_line().await? {
            if recording && l.starts_with(' ') {
                let mut s = l.split(' ').filter(|x| !x.is_empty());
                let hash = s.next().unwrap();
                let mut hash_ = [0; 32];
                data_encoding::HEXLOWER_PERMISSIVE
                    .decode_mut(hash.as_bytes(), &mut hash_)
                    .unwrap();
                let size = s.next().unwrap();
                let name = s.next().unwrap();
                result.insert(name.to_string(), (size.parse().unwrap(), hash_));
            } else {
                recording = l == "SHA256:";
            }
        }
        Ok(InRelease {
            release,
            hashed: result,
        })
    }

    fn url(&self, file_name: Option<&str>) -> String {
        let m = &self.mirror;
        let filename = file_name.unwrap();
        format!("{m}/{filename}")
    }

    /// Download the given URL and check the hash, moving the
    /// resulting dir atomically.
    async fn download_url<'a, S: futures::Sink<extract::Msg> + Unpin>(
        &self,
        url: &str,
        sha256: &str,
        mut tx: S,
    ) -> Result<(Downloaded, bool), Error>
    where
        S::Error: std::fmt::Debug,
    {
        let base = url.split('/').last().unwrap();
        let path = self.store_path.join(&format!("{}-{}", sha256, base));
        if tokio::fs::metadata(&path).await.is_ok() {
            debug!("already in store: {:?}", path);
            return Ok((Downloaded { path }, false));
        }
        let lock = self.lock_store_path(path.clone()).await;
        tx.send(extract::Msg::Downloading(url.to_string()))
            .await
            .unwrap();
        info!("download url {:?} {:?}", url, sha256);
        const MAX_ATTEMPTS: usize = 10;
        for i in 1..=MAX_ATTEMPTS {
            match self.try_download(url, sha256, &path).await {
                Ok(()) => break,
                Err(e) if i == MAX_ATTEMPTS => return Err(e),
                Err(e) => {
                    error!("attempt {:?} at downloading {:?}: {:?}", i, url, e);
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                }
            }
        }
        drop(lock); // Ensure the lock lives until here.
        Ok((Downloaded { path }, true))
    }
    async fn try_download(
        &self,
        url: &str,
        sha256: &str,
        path: &std::path::Path,
    ) -> Result<(), Error> {
        let r = self.client().get(url).send().await?;
        let mut s = r.bytes_stream();
        tokio::fs::create_dir_all(&self.store_path).await?;
        let mut hasher = sha2::Sha256::new();
        let mut f = async_tempfile::TempFile::new_in(self.store_path.as_path()).await?;
        while let Some(item) = s.next().await {
            let item = item?;
            hasher.update(&item);
            f.write_all(&item).await?;
        }
        f.flush().await?;
        let hash = data_encoding::HEXLOWER_PERMISSIVE.encode(&hasher.finalize());
        if &hash != sha256 {
            return Err(Error::WrongHash {
                expected: sha256.to_string(),
                got: hash,
            });
        }
        tokio::fs::rename(&f.file_path(), &path).await?;
        // Forget the TempFile so its destructor doesn't delete the file we
        // just renamed into place.
        std::mem::forget(f);
        Ok(())
    }
}
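// Illustrative end-to-end flow (not part of the original crate): create a
// `Client`, fetch and verify the `InRelease` index for a release, then
// download the `Packages` list for one component. The mirror URL, release
// name and local paths are placeholders, and tokio's test macro is assumed
// to be available.
#[tokio::test]
#[ignore = "needs network access and an initialized `sq` PGP home"]
async fn example_client_flow() {
    let client = Client::new(
        "/tmp/elpe-pgp",
        "/tmp/elpe-store",
        "http://archive.ubuntu.com/ubuntu",
    );
    let in_release = client.in_release("noble").await.unwrap();
    let packages = client.packages(&in_release, "main", "amd64").await.unwrap();
    println!("Packages index stored at {:?}", packages);
}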
use clap::*;
use elpe::*;
use serde_derive::*;
use std::path::PathBuf;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

mod server;

#[derive(Deserialize)]
pub struct Config {
    pgp_home: PathBuf,
    store_path: PathBuf,
    package_index: String,
    user: String,
}

/// Elpe
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
    /// Path to the configuration file
    #[arg(short, long)]
    config: PathBuf,
}

fn main() {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| String::new().into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();
    let args = Args::parse();
    let config: Config = toml::from_str(&std::fs::read_to_string(&args.config).unwrap()).unwrap();
    // The container channel is created before privileges are dropped below.
    let container_channel = crate::container::serve(&config.user, &config.store_path);
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        privdrop::PrivDrop::default()
            .user(&config.user)
            .apply()
            .unwrap();
        let elpe = server::Elpe::new(
            Client::new(&config.pgp_home, &config.store_path, &config.package_index),
            container_channel,
        );
        let addr = "0.0.0.0:50051".parse().unwrap();
        elpe.serve(addr).await
    })
}
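// An illustrative configuration file for the `Config` struct above (the
// field names come from the struct; all values are placeholders):
//
//     pgp_home = "/var/lib/elpe/pgp"
//     store_path = "/var/lib/elpe/store"
//     package_index = "http://archive.ubuntu.com/ubuntu"
//     user = "elpe"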
use elpe::extract::*;
use elpe::*;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::PollSender;
use tonic::codegen::tokio_stream::StreamExt;
use tracing::*;

pub struct Elpe {
    deb_client: elpe::Client,
    sender: tokio::sync::mpsc::UnboundedSender<(
        crate::container::BuildRequest,
        tokio::sync::mpsc::Sender<crate::container::Msg>,
    )>,
    t: Option<tokio::task::JoinHandle<Result<(), elpe::Error>>>,
}

pub mod proto {
    tonic::include_proto!("elpe");
}

impl Elpe {
    pub fn new(
        deb_client: elpe::Client,
        container_channel: elpe::container::ContainerChannel,
    ) -> Self {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<(
            crate::container::BuildRequest,
            tokio::sync::mpsc::Sender<crate::container::Msg>,
        )>();
        let t = tokio::spawn(crate::container::forward(receiver, container_channel));
        Elpe {
            deb_client,
            sender,
            t: Some(t),
        }
    }

    pub async fn serve(mut self, addr: std::net::SocketAddr) {
        let t = self.t.take().unwrap();
        tokio::select! {
            _ = tonic::transport::Server::builder()
                .add_service(proto::elpe_server::ElpeServer::new(self))
                .serve(addr) => {}
            _ = t => {}
        }
    }
}

use std::pin::Pin;
type ResponseStream =
    Pin<Box<dyn tokio_stream::Stream<Item = Result<proto::DerivationReply, tonic::Status>> + Send>>;

#[tonic::async_trait]
impl proto::elpe_server::Elpe for Elpe {
    async fn handshake(
        &self,
        _request: tonic::Request<proto::Empty>,
    ) -> Result<tonic::Response<proto::PlatformReply>, tonic::Status> {
        Ok(tonic::Response::new(proto::PlatformReply {
            endianness: if cfg!(target_endian = "big") { 0 } else { 1 },
            pointer_width: std::mem::size_of::<usize>() as i32,
            arch: match std::env::consts::ARCH {
                "x86_64" => 0,
                "aarch64" => 1,
                _ => unimplemented!(),
            },
        }))
    }

    async fn add_path(
        &self,
        request: tonic::Request<tonic::Streaming<proto::AddPathRequest>>,
    ) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
        let mut r = request.into_inner();
        debug!("add_path");
        let mut current_file = None;
        let store = self.deb_client.store_path();
        let tmp_dir = tempfile::tempdir_in(store).unwrap();
        debug!("tmp_dir {:?}", tmp_dir);
        // Hash the whole tree as it is streamed in; the final store path is
        // derived from this hash.
        let mut hasher = blake3::Hasher::new();
        loop {
            trace!("waiting for next in stream");
            let Some(r) = r.next().await else { break };
            let r = r.unwrap();
            match r.request {
                Some(proto::add_path_request::Request::File(f)) => {
                    info!("Adding file {:?}", f.name);
                    hasher.update(b"\0f");
                    hasher.update(f.name.as_bytes());
                    hasher.update(b"\0");
                    let p = tmp_dir.path().join(&f.name);
                    tokio::fs::create_dir_all(p.parent().unwrap()).await?;
                    current_file = Some(tokio::fs::File::create(&p).await?)
                }
                Some(proto::add_path_request::Request::Directory(d)) => {
                    info!("Adding directory {:?}", d.name);
                    hasher.update(b"\0d");
                    hasher.update(d.name.as_bytes());
                    hasher.update(b"\0");
                    let p = tmp_dir.path().join(&d.name);
                    tokio::fs::create_dir_all(&p).await.unwrap();
                }
                Some(proto::add_path_request::Request::Contents(c)) => {
                    hasher.update(&c.content);
                    current_file.as_mut().unwrap().write_all(&c.content).await?;
                }
                None => break,
            }
        }
        debug!("add_path: stream done");
        let path = store.join(data_encoding::HEXLOWER.encode(hasher.finalize().as_bytes()));
        use tokio::io::ErrorKind;
        let new = tmp_dir.into_path();
        match tokio::fs::rename(&new, &path).await {
            Ok(()) => (),
            // The path already exists in the store: same hash, same contents.
            Err(e) if e.kind() == ErrorKind::DirectoryNotEmpty => (),
            Err(e) => {
                tokio::fs::remove_dir_all(&new).await?;
                return Err(e.into());
            }
        }
        info!("add_path extracted to {:?}", path);
        Ok(tonic::Response::new(proto::DerivationReply {
            result: Some(proto::derivation_reply::Result::Ok(
                proto::DerivationResult {
                    destdir: vec![path.to_str().unwrap().to_string()],
                    paths: Vec::new(),
                    path_patterns: Vec::new(),
                },
            )),
        }))
    }
    type DerivationStream = ResponseStream;

    async fn derivation(
        &self,
        request: tonic::Request<proto::DerivationRequest>,
    ) -> Result<tonic::Response<ResponseStream>, tonic::Status> {
        debug!("derivation request");
        let now = std::time::Instant::now();
        let r = request.into_inner();
        let (tx, rx) = tokio::sync::mpsc::channel(200);
        debug!("derivation request: {:?} {:?}", r.name, r.paths);
        self.sender
            .send((
                crate::container::BuildRequest {
                    name: r.name.clone(),
                    paths: r.paths,
                    script: r.builder,
                    target: r.target,
                    output_hash: r.output_hash,
                },
                tx,
            ))
            .unwrap();
        use crate::container::Msg;
        let output_stream = ReceiverStream::new(rx).map(|x| {
            Ok(match x {
                Msg::Ok(out) => proto::DerivationReply {
                    result: Some(proto::derivation_reply::Result::Ok(
                        proto::DerivationResult {
                            destdir: out
                                .into_iter()
                                .map(|x| x.to_str().unwrap().to_string())
                                .collect(),
                            paths: Vec::new(),
                            path_patterns: Vec::new(),
                        },
                    )),
                },
                Msg::Error(p) => proto::DerivationReply {
                    result: Some(proto::derivation_reply::Result::Error(p)),
                },
                Msg::Stdout(p) => proto::DerivationReply {
                    result: Some(proto::derivation_reply::Result::Stdout(p)),
                },
                Msg::Stderr(p) => proto::DerivationReply {
                    result: Some(proto::derivation_reply::Result::Stderr(p)),
                },
            })
        });
        info!("request {:?}: {:?}", r.name, now.elapsed());
        Ok(tonic::Response::new(Box::pin(output_stream)))
    }

    async fn add_url(
        &self,
        request: tonic::Request<proto::AddUrlRequest>,
    ) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
        let r = request.into_inner();
        debug!("add_url request {:?}", r);
        let p = self
            .deb_client
            .http_download(
                &r.url,
                match r.hash_algorithm {
                    0 => {
                        let mut h = [0; 32];
                        h.clone_from_slice(&r.hash);
                        Hash::Blake3(h)
                    }
                    1 => {
                        let mut h = [0; 32];
                        h.clone_from_slice(&r.hash);
                        Hash::Sha256(h)
                    }
                    2 => {
                        let mut h = [0; 64];
                        h.clone_from_slice(&r.hash);
                        Hash::Sha512(h)
                    }
                    _ => unreachable!(),
                },
            )
            .await
            .unwrap();
        Ok(tonic::Response::new(proto::DerivationReply {
            result: Some(proto::derivation_reply::Result::Ok(
                proto::DerivationResult {
                    destdir: vec![p.to_str().unwrap().to_string()],
                    paths: Vec::new(),
                    path_patterns: Vec::new(),
                },
            )),
        }))
    }

    async fn ubuntu_release(
        &self,
        request: tonic::Request<proto::UbuntuReleaseRequest>,
    ) -> Result<tonic::Response<proto::DerivationReply>, tonic::Status> {
        debug!("ubuntu release {:?}", request);
        let now = std::time::Instant::now();
        let r = request.into_inner();
        let h = self.deb_client.in_release(r.release.clone()).await.unwrap();
        let arch = match r.arch {
            0 => "amd64",
            1 => "arm64",
            _ => unreachable!(),
        };
        let p = self
            .deb_client
            .packages(&h, &r.repository, arch)
            .await
            .expect(&format!("{} {}", &r.repository, &arch));
        info!(
            "ubuntu release request {:?}: {:?}",
            r.release,
            now.elapsed()
        );
        Ok(tonic::Response::new(proto::DerivationReply {
            result: Some(proto::derivation_reply::Result::Ok(
                proto::DerivationResult {
                    destdir: vec![p.to_str().unwrap().to_string()],
                    paths: Vec::new(),
                    path_patterns: Vec::new(),
                },
            )),
        }))
    }

    type UbuntuPackageStream = ResponseStream;

    async fn ubuntu_package(
        &self,
        request: tonic::Request<proto::UbuntuPackageRequest>,
    ) -> Result<tonic::Response<Self::UbuntuPackageStream>, tonic::Status> {
        let now = std::time::Instant::now();
        let r = request.into_inner();
        let index: Result<Vec<_>, _> = r
            .index
            .into_iter()
            .map(|index| deb::Index::open(&index))
            .collect();
        let index = index.unwrap();
        let link_extra: Vec<_> = r
            .link_extra
            .into_iter()
            .map(|l| {
                (
                    regex::Regex::new(&l.pkg).unwrap(),
                    regex::Regex::new(&l.dep).unwrap(),
                )
            })
            .collect();
        let (tx, rx) = tokio::sync::mpsc::channel(200);
        use crate::extract::Msg;
        let output_stream = ReceiverStream::new(rx).map(|x| {
            Ok(match x {
                Msg::Downloading(p) => proto::DerivationReply {
                    result: Some(proto::derivation_reply::Result::Loading(p)),
                },
                Msg::Ok(p) => proto::DerivationReply {
                    result: Some(proto::derivation_reply::Result::Ok(
                        proto::DerivationResult {
                            destdir: p
                                .result
                                .iter()
                                .rev()
                                .map(|x| x.to_str().unwrap().to_string())
                                .collect(),
                            paths: p.paths.into_iter().filter_map(Arc::into_inner).collect(),
                            path_patterns: Vec::new(),
                        },
                    )),
                },
                Msg::Error(e) => proto::DerivationReply {
                    result: Some(proto::derivation_reply::Result::Error(e.to_string())),
                },
            })
        });
        match download_extract_deps(
            &index,
            &self.deb_client,
            &r.name,
            &link_extra,
            PollSender::new(tx.clone()),
        )
        .await
        {
            Ok(p) => {
                info!("path {:?} {:#?}", r.name, p);
                tx.send(Msg::Ok(p)).await.unwrap();
            }
            Err(e) => tx.send(Msg::Error(e)).await.unwrap(),
        }
        info!("ubuntu package request {:?}: {:?}", r.name, now.elapsed());
        Ok(tonic::Response::new(Box::pin(output_stream)))
    }
}
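// Illustrative client-side call (not part of the original crate), assuming
// `tonic_build` also generated the client module (`proto::elpe_client`) from
// the same `elpe` proto file. The address is a placeholder for a locally
// running server.
#[cfg(test)]
mod example_client {
    use super::proto;

    #[tokio::test]
    #[ignore = "needs a running elpe server on localhost:50051"]
    async fn handshake() {
        let mut client = proto::elpe_client::ElpeClient::connect("http://127.0.0.1:50051")
            .await
            .unwrap();
        let platform = client.handshake(proto::Empty {}).await.unwrap().into_inner();
        println!(
            "server arch = {}, pointer width = {}",
            platform.arch, platform.pointer_width
        );
    }
}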