#![deny(
missing_docs,
trivial_casts,
trivial_numeric_casts,
unused_import_braces,
unused_qualifications
)]
use futures::StreamExt;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::NamedTempFile;
use thiserror::*;
use tokio::sync::{Mutex, Semaphore};
use tracing::*;
pub mod deb;
pub mod extract;
pub mod find_files;
pub mod mount;
pub mod container;
/// Errors returned by this crate.
///
/// The first group of variants wraps errors coming from other modules or
/// libraries (converted automatically through `#[from]`); the remaining
/// variants describe failures detected by this crate itself.
#[derive(Debug, Error)]
pub enum Error {
/// An error forwarded unchanged from the [`deb`] module.
#[error(transparent)]
Deb(#[from] deb::Error),
/// An I/O error from the standard library or from Tokio.
#[error(transparent)]
IO(#[from] std::io::Error),
/// A temporary file could not be persisted at its final location.
#[error(transparent)]
Persist(#[from] tempfile::PersistError),
/// An asynchronous temporary file could not be created.
#[error(transparent)]
Tempfile(#[from] async_tempfile::Error),
/// An error forwarded unchanged from the `elfedit` crate.
#[error(transparent)]
Elf(#[from] elfedit::Error),
/// An HTTP error reported by `reqwest`.
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
/// Signature verification of a downloaded file failed.
#[error("Signature error")]
Signature,
/// A computed hash did not match the expected one.
#[error("Hash: expected {expected}, got {got}.")]
WrongHash {
/// The hash that was expected, as text.
expected: String,
/// The hash that was actually computed, as text.
got: String,
},
/// The size of downloaded data did not match the expected size.
#[error("Wrong size")]
WrongSize,
/// A result symlink did not point where it was expected to.
#[error("Wrong result symlink, expected {expected:?}, got {got:?}")]
WrongResultSymlink {
/// The expected path.
expected: PathBuf,
/// The path that was found.
got: PathBuf,
},
/// No `$DESTDIR` was produced.
#[error("Failed to produce $DESTDIR")]
NoDestDir,
/// A build process returned the given status.
#[error("Build process returned {status}")]
BuildReturn {
/// The status returned by the build process.
status: i32,
},
}
/// Client for a package mirror, backed by a local store directory.
///
/// The shared state (`in_release`, `download_sem`, `store_locks`) lives behind
/// `Arc`s, so clones of a `Client` share it.
#[derive(Clone)]
pub struct Client {
// HTTP client, built lazily on first use by `Client::client`.
c: lazy_init::Lazy<reqwest::Client>,
// Directory passed as `$HOME` to the `sq` process that verifies signatures.
pgp_home: PathBuf,
// Mirror prefix that download URLs are built from.
mirror: String,
// Directory in which downloaded files are stored.
store_path: PathBuf,
// In-memory cache of the parsed `InRelease` file.
in_release: Arc<Mutex<Option<Arc<InRelease>>>>,
// Semaphore created with `MAX_PARALLEL_DOWNLOADS` permits.
// NOTE(review): not acquired anywhere in this part of the file; presumably
// used by other modules to bound concurrent downloads.
download_sem: Arc<Semaphore>,
// Registry of per-path async mutexes handed out by `lock_store_path`.
store_locks: Arc<std::sync::Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>>,
// Read timeout configured on the HTTP client.
timeout: std::time::Duration,
}
/// Guard returned by `Client::lock_store_path`.
///
/// The path stays locked for as long as this value is alive. Dropping it
/// removes the path from the lock registry (see the `Drop` implementation)
/// and then releases the mutex, since fields are dropped after `Drop::drop`
/// has run.
pub struct StoreLock {
// Shared registry of per-path mutexes this lock was taken from.
locks: Arc<std::sync::Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>>,
// The locked path, used as the registry key.
p: PathBuf,
// The held mutex guard; never read, only kept alive.
_lock: tokio::sync::OwnedMutexGuard<()>,
}
impl Drop for StoreLock {
    /// Removes this path's mutex from the registry once nobody else needs it.
    ///
    /// The entry is only removed when no other task holds a handle on the same
    /// mutex. Removing it unconditionally would let a newcomer create a fresh
    /// mutex for the path while an earlier waiter still acquires the old one,
    /// so two tasks would "own" the same path at once.
    fn drop(&mut self) {
        debug!("store lock: dropping {:?}", self.p);
        // The map stays structurally valid even if another thread panicked
        // while holding the registry, so recover it instead of panicking
        // inside a destructor.
        let mut locks = self.locks.lock().unwrap_or_else(|e| e.into_inner());
        // Handles on the mutex: one in the registry, one inside our own guard
        // (`_lock`, dropped after this function returns), plus one per task
        // currently waiting in `lock_store_path`. Waiters clone their handle
        // while holding the registry lock, so this count cannot grow while we
        // inspect it.
        let unused = locks
            .get(&self.p)
            .map_or(false, |m| Arc::strong_count(m) <= 2);
        if unused {
            locks.remove(&self.p);
        }
    }
}
/// A file that is present in the store.
#[derive(Debug)]
struct Downloaded {
// Location of the file inside the store directory.
path: PathBuf,
}
use sha2::Digest;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::process::Command;
/// Parsed contents of an `InRelease` file.
#[derive(Debug)]
pub struct InRelease {
// Entries of the `SHA256:` section: file name -> (size, SHA-256 digest).
hashed: HashMap<String, (usize, [u8; 32])>,
// Name of the release this file was read for.
release: std::borrow::Cow<'static, str>,
}
/// Writer adapter that feeds every byte written through it into a SHA-256
/// hasher in addition to forwarding it to the inner writer.
struct HashedWriter<W> {
// Running SHA-256 state over the bytes written so far.
hasher: sha2::Sha256,
// The wrapped writer.
w: W,
}
impl<W: std::io::Write> std::io::Write for HashedWriter<W> {
    /// Writes all of `b` to the inner writer and adds it to the hash.
    ///
    /// The whole buffer is always consumed, so on success this returns
    /// `b.len()`. I/O errors from the inner writer are returned to the caller
    /// instead of panicking.
    fn write(&mut self, b: &[u8]) -> Result<usize, std::io::Error> {
        self.w.write_all(b)?;
        // Hash only what was actually handed to the inner writer.
        self.hasher.update(b);
        Ok(b.len())
    }

    /// Flushes the inner writer.
    fn flush(&mut self) -> Result<(), std::io::Error> {
        self.w.flush()
    }
}
// Number of permits the download semaphore of a `Client` is created with.
const MAX_PARALLEL_DOWNLOADS: usize = 20;
impl Client {
pub fn new<P: AsRef<std::path::Path>, Q: AsRef<std::path::Path>>(
pgp_home: P,
store_path: Q,
mirror: &str,
) -> Self {
Client {
c: lazy_init::Lazy::new(),
mirror: mirror.to_string(),
store_path: store_path.as_ref().to_path_buf(),
in_release: Arc::new(None.into()),
download_sem: Arc::new(Semaphore::new(MAX_PARALLEL_DOWNLOADS)),
store_locks: Arc::new(std::sync::Mutex::new(HashMap::new())),
timeout: std::time::Duration::from_secs(30),
pgp_home: pgp_home.as_ref().to_path_buf(),
}
}
/// Returns the directory in which this client stores its files.
pub fn store_path(&self) -> &std::path::Path {
    self.store_path.as_path()
}
/// Returns the shared HTTP client, building it on first use.
///
/// The client is configured with `self.timeout` as its read timeout. Panics
/// if the client cannot be built.
fn client(&self) -> &reqwest::Client {
    let build = || {
        let builder = reqwest::ClientBuilder::new().read_timeout(self.timeout);
        builder.build().unwrap()
    };
    self.c.get_or_create(build)
}
/// Prepares a [`Command`] that runs `cmd` from package `pkg` of `release`.
///
/// The `main` and `universe` package indices of the release are fetched for
/// the architecture this crate was compiled for, `pkg` and its dependencies
/// are downloaded and extracted, and `cmd` is resolved relative to the last
/// extracted result.
///
/// # Panics
///
/// This function has no way to report errors through its return type, so it
/// panics (with a message naming the failed step) if the release metadata or
/// the package indices cannot be obtained or opened, if extraction fails or
/// yields no result, or if the target architecture is neither `x86_64` nor
/// `aarch64`.
pub async fn command<R: Into<std::borrow::Cow<'static, str>>, P: AsRef<std::path::Path>>(
    &self,
    release: R,
    pkg: &str,
    cmd: P,
) -> Command {
    // Debian/Ubuntu architecture names differ from Rust's target names: the
    // archive publishes `binary-amd64` and `binary-arm64` indices.
    let arch = if cfg!(target_arch = "x86_64") {
        "amd64"
    } else if cfg!(target_arch = "aarch64") {
        "arm64"
    } else {
        panic!("unsupported target architecture: only x86_64 and aarch64 are handled")
    };
    let h = self
        .in_release(release.into())
        .await
        .expect("failed to obtain the InRelease file");
    let (main, universe) = tokio::join!(
        self.packages(&h, "main", arch),
        self.packages(&h, "universe", arch)
    );
    let main = main.expect("failed to obtain the `main` package index");
    let universe = universe.expect("failed to obtain the `universe` package index");
    let index = vec![
        deb::Index::open(&main).expect("failed to open the `main` package index"),
        deb::Index::open(&universe).expect("failed to open the `universe` package index"),
    ];
    let p = extract::download_extract_deps(&index, self, pkg)
        .await
        .expect("failed to download and extract the package");
    let exe = p
        .result
        .last()
        .expect("package extraction produced no result")
        .join(&cmd);
    debug!("running {:?}", exe);
    Command::new(exe)
}
pub async fn lock_store_path<P: AsRef<std::path::Path>>(&self, path: P) -> StoreLock {
debug!("locking store path {:?}", path.as_ref());
let p = path.as_ref().to_path_buf();
let mutex = {
let mut locks = self.store_locks.lock().unwrap();
locks
.entry(p.clone())
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone()
};
let lock = mutex.lock_owned().await;
debug!("lock acquired: {:?}", p);
StoreLock {
locks: self.store_locks.clone(),
p,
_lock: lock,
}
}
pub async fn packages(
&self,
in_release: &InRelease,
component: &str,
arch: &str,
) -> Result<PathBuf, Error> {
let end = format!("{component}/binary-{arch}/Packages");
let end_xz = format!("{component}/binary-{arch}/Packages.xz");
let (size_xz, expected_hash_xz) = in_release.hashed.get(&end_xz).unwrap();
let (size, expected_hash) = in_release.hashed.get(&end).unwrap();
let expected_xz = data_encoding::BASE32_DNSSEC.encode(&*expected_hash_xz);
let expected = data_encoding::BASE32_DNSSEC.encode(&*expected_hash);
std::fs::create_dir_all(&self.store_path)?;
let expected_path = self.store_path.join(&expected);
if let Ok(_) = std::fs::metadata(&expected_path) {
return Ok(expected_path);
}
let ref m = self.mirror;
let ref release = in_release.release;
let r = self
.client()
.get(&format!(
"{m}/dists/{release}/{component}/binary-{arch}/Packages.xz"
))
.send()
.await?;
let file_xz = NamedTempFile::new_in(&self.store_path)?;
let file = NamedTempFile::new_in(&self.store_path)?;
let mut xz_dec = xz2::write::XzDecoder::new(HashedWriter {
w: std::fs::File::create(&file)?,
hasher: sha2::Sha256::new(),
});
let mut f = tokio::fs::File::create(&file_xz).await?;
let mut r = r.bytes_stream();
let mut hasher_xz = sha2::Sha256::new();
let mut total = 0;
while let Some(item) = r.next().await {
let item = item?;
total += item.len();
{
use std::io::Write;
let mut i = 0;
loop {
let n = xz_dec.write(&item[i..])?;
if n == 0 {
break;
} else {
i += n
}
}
}
hasher_xz.update(&item);
f.write_all(&item).await?;
}
if total != *size_xz || xz_dec.total_out() != *size as u64 {
return Err(Error::WrongSize);
}
let w = xz_dec.finish().unwrap();
let got_xz = &hasher_xz.finalize()[..];
let got = &w.hasher.finalize()[..];
if got_xz == expected_hash_xz && got == expected_hash {
let mut expected_path_xz = self.store_path.join(&expected_xz);
expected_path_xz.set_extension("xz");
file_xz.persist(&expected_path_xz)?;
file.persist(&expected_path)?;
Ok(expected_path)
} else {
Err(Error::WrongHash {
expected: expected_xz,
got: data_encoding::HEXLOWER.encode(&got),
})
}
}
pub async fn in_release<R: Into<std::borrow::Cow<'static, str>>>(
&self,
release: R,
) -> Result<Arc<InRelease>, Error> {
let ref m = self.mirror;
let release = release.into();
debug!("downloading in_release {:?}", release);
let mut ubuntu = self.store_path.join("ubuntu");
std::fs::create_dir_all(&ubuntu)?;
ubuntu.push(&*release);
let mut lock = self.in_release.lock().await;
if let Some(l) = (&*lock).clone() {
debug!("in_release: lock");
return Ok(l);
}
debug!("open {:?}", ubuntu);
let f = tokio::fs::File::open(&ubuntu).await;
if let Ok(f) = f {
let release = Arc::new(self.read_in_release(release, f).await?);
*lock = Some(release.clone());
Ok(release)
} else {
debug!("downloading InRelease to {:?}", ubuntu);
let store_lock = self.lock_store_path(&ubuntu).await;
let r = self
.client()
.get(&format!("{m}/dists/{release}/InRelease"))
.send()
.await?;
let tmp = async_tempfile::TempFile::new_in(ubuntu.parent().unwrap()).await?;
let mut com = Command::new("sq")
.env("HOME", &self.pgp_home)
.arg("verify")
.arg("--overwrite")
.arg(&format!("--output={}", tmp.file_path().to_str().unwrap()))
.arg("--message")
.stdin(Stdio::piped())
.spawn()?;
let mut i = com.stdin.take().unwrap();
let mut r = r.bytes_stream();
while let Some(item) = r.next().await {
i.write_all(&item?).await?;
}
drop(i);
let status = com.wait().await?;
let result = if status.success() {
self.read_in_release(release, tokio::fs::File::open(&tmp.file_path()).await?)
.await
} else {
Err(Error::Signature)
};
let release = Arc::new(result?);
*lock = Some(release.clone());
drop(store_lock);
tokio::fs::rename(&tmp.file_path(), &ubuntu).await?;
std::mem::forget(tmp);
Ok(release)
}
}
pub async fn read_in_release<R: tokio::io::AsyncRead + Unpin>(
&self,
release: std::borrow::Cow<'static, str>,
o: R,
) -> Result<InRelease, Error> {
let mut b = tokio::io::BufReader::new(o).lines();
let mut result = HashMap::new();
let mut recording = false;
while let Some(l) = b.next_line().await? {
if recording && l.starts_with(" ") {
let mut s = l.split(' ').filter(|x| !x.is_empty());
let hash = s.next().unwrap();
let mut hash_ = [0; 32];
data_encoding::HEXLOWER_PERMISSIVE
.decode_mut(hash.as_bytes(), &mut hash_)
.unwrap();
let size = s.next().unwrap();
let name = s.next().unwrap();
result.insert(name.to_string(), (size.parse().unwrap(), hash_));
} else {
recording = l == "SHA256:";
}
}
Ok(InRelease {
release,
hashed: result,
})
}
/// Builds the URL of `file_name` on the mirror, as `<mirror>/<file_name>`.
///
/// Panics if `file_name` is `None`.
fn url(&self, file_name: Option<&str>) -> String {
    let filename = file_name.unwrap();
    [self.mirror.as_str(), filename].join("/")
}
/// Makes sure the file at `url` with SHA-256 `sha256` is in the store.
///
/// The file is stored as `<sha256>-<last segment of url>`. Returns the stored
/// file together with a flag that is `true` if this call downloaded it and
/// `false` if it was already there. A download is attempted up to 10 times,
/// one second apart; the error of the last attempt is returned if all fail.
async fn download_url<'a>(&self, url: &str, sha256: &str) -> Result<(Downloaded, bool), Error> {
    let base = url.rsplit('/').next().unwrap_or(url);
    let path = self.store_path.join(format!("{sha256}-{base}"));
    if tokio::fs::metadata(&path).await.is_ok() {
        debug!("is_ok {:?}", path);
        return Ok((Downloaded { path }, false));
    }
    // Held until the end of the function, i.e. until the file is in place.
    let _lock = self.lock_store_path(&path).await;
    // Another task may have downloaded the file while we were waiting for
    // the lock; check again before fetching it a second time.
    if tokio::fs::metadata(&path).await.is_ok() {
        debug!("is_ok {:?}", path);
        return Ok((Downloaded { path }, false));
    }
    info!("download url {:?} {:?}", url, sha256);
    const MAX_ATTEMPTS: usize = 10;
    for i in 1..=MAX_ATTEMPTS {
        match self.try_download(url, sha256, &path).await {
            Ok(()) => break,
            Err(e) if i == MAX_ATTEMPTS => return Err(e),
            Err(e) => {
                error!("attempt {:?} at downloading {:?}: {:?}", i, url, e);
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        }
    }
    Ok((Downloaded { path }, true))
}
/// Downloads `url` once and moves the result to `path` if its hash matches.
///
/// The body is streamed into a temporary file inside the store directory
/// while its SHA-256 is computed. The file is renamed to `path` only if the
/// digest equals `sha256` (hexadecimal, compared case-insensitively).
///
/// # Errors
///
/// - [`Error::Reqwest`] if the request fails or the server answers with an
///   error status.
/// - [`Error::WrongHash`] if the digest of the body differs from `sha256`.
/// - [`Error::IO`] / [`Error::Tempfile`] for file system errors.
async fn try_download(
    &self,
    url: &str,
    sha256: &str,
    path: &std::path::Path,
) -> Result<(), Error> {
    // Fail on HTTP error statuses right away; otherwise the body of an error
    // page would be downloaded and reported as a hash mismatch.
    let r = self.client().get(url).send().await?.error_for_status()?;
    let mut s = r.bytes_stream();
    tokio::fs::create_dir_all(&self.store_path).await?;
    let mut hasher = sha2::Sha256::new();
    let mut f = async_tempfile::TempFile::new_in(self.store_path.as_path()).await?;
    while let Some(item) = s.next().await {
        let item = item?;
        hasher.update(&item);
        f.write_all(&item).await?;
    }
    f.flush().await?;
    let hash = data_encoding::HEXLOWER_PERMISSIVE.encode(&hasher.finalize());
    // Hexadecimal digests are case-insensitive.
    if !hash.eq_ignore_ascii_case(sha256) {
        return Err(Error::WrongHash {
            expected: sha256.to_string(),
            got: hash,
        });
    }
    tokio::fs::rename(f.file_path(), path).await?;
    // Skip the temporary file's destructor now that the file lives at `path`.
    // NOTE(review): this also leaks the handle held by `f`; confirm whether
    // async_tempfile offers a way to keep the file without `forget`.
    std::mem::forget(f);
    Ok(())
}
}