use nix_mirror::{handle_narinfo, store_path_to_narinfo_hash};
use std::collections::HashSet;
use std::path::PathBuf;
use futures::stream::{self, StreamExt};
use tokio::fs;
use anyhow::Result;
use indicatif::ProgressBar;
use structopt::StructOpt;
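
// rough signatures of the two helpers imported from the companion nix_mirror
// crate, inferred from how they are used below (not authoritative):
//
//     fn store_path_to_narinfo_hash(path: &str) -> Result<&str>;
//     async fn handle_narinfo(client: &reqwest::Client, cache_url: &str,
//         mirror_dir: &Path, narinfo_hash: String) -> Result<Vec<String>>;
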
/// An application to synchronize nix binary caches
#[derive(StructOpt)]
struct Opt {
    /// The path to a store-paths.xz file containing the store paths of the packages
    /// that should be synchronized, e.g. a copy of
    /// http://channels.nixos.org/nixpkgs-unstable/store-paths.xz
    store_paths: PathBuf,
    /// The directory where the mirror should be stored
    mirror_dir: PathBuf,
    /// URL of the cache server to mirror from
    #[structopt(short, long, default_value = "https://cache.nixos.org")]
    cache_url: String,
    /// Maximum number of concurrent downloads
    #[structopt(short, long, default_value = "8")]
    parallelism: usize,
}
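
// a hypothetical invocation, assuming the compiled binary ends up being
// called `nix-mirror`:
//
//     nix-mirror store-paths.xz ./mirror --parallelism 16
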
#[tokio::main]
async fn main() -> Result<()> {
    let opt = Opt::from_args();
    let nar_dir = opt.mirror_dir.join("nar");
    fs::create_dir_all(&nar_dir).await?;
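
    // the on-disk layout follows cache.nixos.org: `<hash>.narinfo` files live
    // at the top level of the mirror directory, the NAR archives they point
    // to under `nar/`
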
    // read the whole list into memory up front: store-paths.xz is an
    // xz-compressed text file with one store path per line, and it is small
    // enough that streaming it wouldn't buy us anything
    let store_paths = {
        use std::{fs::File, io::Read};
        let mut s = String::new();
        File::open(&opt.store_paths)
            .map(xz2::read::XzDecoder::new)
            .and_then(|mut rdr| rdr.read_to_string(&mut s))?;
        s
    };

    // a bit hacky, since we don't know how many files we need to process
    // until we're done, but it is still nice to see some progress; the bar's
    // length grows as new work is discovered
    let progress = ProgressBar::new(0);
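
    // a single shared Client lets every download reuse reqwest's internal
    // connection pool instead of opening a fresh connection per request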
    let client = reqwest::Client::new();

    // our initial set of narinfo hashes to process
    let mut current_narinfo_hashes = store_paths
        .lines()
        .map(|x| store_path_to_narinfo_hash(x).map(String::from))
        .collect::<Result<HashSet<_>>>()?;
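
    // each `<hash>.narinfo` on the cache is a small metadata file naming the
    // NAR archive to fetch and listing the references (other store paths) of
    // that path; those references are how new work is discovered below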

    // all narinfo hashes that we have seen so far, to avoid downloading (or
    // recursing into) the same path twice
    let mut processed_narinfo_hashes = HashSet::new();
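
    // classic worklist loop: drain the current set, mirror everything in it,
    // queue any newly discovered references we haven't processed yet, and
    // stop once the whole closure has been fetched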
    while !current_narinfo_hashes.is_empty() {
        let mut futures = Vec::new();
        for narinfo_hash in current_narinfo_hashes.drain() {
            futures.push(handle_narinfo(
                &client,
                &opt.cache_url,
                &opt.mirror_dir,
                narinfo_hash.clone(),
            ));
            processed_narinfo_hashes.insert(narinfo_hash);
            progress.inc_length(1);
        }

        // handle at most `opt.parallelism` concurrent futures at the same time
        let mut stream = stream::iter(futures).buffer_unordered(opt.parallelism);
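        // note that `buffer_unordered` yields results in completion order,
        // not submission order; that's fine here because we only care about
        // set membership, never about ordering
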
        // for each completed future, keep only the returned hashes we haven't
        // seen yet and add them to the set of hashes to process next round
        while let Some(result) = stream.next().await {
            let new_hashes = result?;
            current_narinfo_hashes.extend(
                new_hashes
                    .into_iter()
                    .filter(|x| !processed_narinfo_hashes.contains(x)),
            );
            progress.inc(1);
        }
    }
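
    // mark the bar as finished so indicatif draws its final state
    progress.finish();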
    Ok(())
}