use anyhow::bail;
use libpijul::pristine::{Base32, Position};
use libpijul::Hash;
use log::{debug, error, trace};
use std::collections::HashSet;
use std::io::Write;
use std::path::PathBuf;
use crate::remote::CS;
const USER_AGENT: &str = concat!("pijul-", clap::crate_version!());
pub struct Http {
pub url: url::Url,
pub channel: String,
pub client: reqwest::Client,
pub name: String,
pub headers: Vec<(String, String)>,
}
async fn download_change(
client: reqwest::Client,
url: url::Url,
headers: Vec<(String, String)>,
mut path: PathBuf,
c: CS,
) -> Result<CS, anyhow::Error> {
let (req, c32) = match c {
CS::Change(c) => {
libpijul::changestore::filesystem::push_filename(&mut path, &c);
("change", c.to_base32())
}
CS::State(c) => {
libpijul::changestore::filesystem::push_tag_filename(&mut path, &c);
if std::fs::metadata(&path).is_ok() {
bail!("Tag already downloaded: {}", c.to_base32())
}
("tag", c.to_base32())
}
};
std::fs::create_dir_all(&path.parent().unwrap())?;
let path_ = path.with_extension("tmp");
let mut f = tokio::fs::File::create(&path_).await?;
let url = format!("{}/{}", url, super::DOT_DIR);
let mut delay = 1f64;
let (send, mut recv) = tokio::sync::mpsc::channel::<Option<bytes::Bytes>>(100);
let t = tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
while let Some(chunk) = recv.recv().await {
match chunk {
Some(chunk) => {
trace!("writing {:?}", chunk.len());
f.write_all(&chunk).await?;
}
None => {
f.set_len(0).await?;
}
}
}
f.flush().await?;
Ok::<_, std::io::Error>(())
});
let mut done = false;
while !done {
let mut req = client
.get(&url)
.query(&[(req, &c32)])
.header(reqwest::header::USER_AGENT, USER_AGENT);
for (k, v) in headers.iter() {
req = req.header(k.as_str(), v.as_str());
}
let mut res = if let Ok(res) = req.send().await {
delay = 1f64;
res
} else {
debug!("HTTP error, retrying in {} seconds", delay.round());
tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await;
send.send(None).await?;
delay *= 2.;
continue;
};
debug!("response {:?}", res);
if !res.status().is_success() {
tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await;
send.send(None).await?;
delay *= 2.;
continue;
}
let mut size = res
.headers()
.get(reqwest::header::CONTENT_LENGTH)
.and_then(|x| x.to_str().ok())
.unwrap_or("0")
.parse::<usize>()
.ok();
while !done {
match res.chunk().await {
Ok(Some(chunk)) => {
if let Some(ref mut s) = size {
*s -= chunk.len();
}
send.send(Some(chunk)).await?;
}
Ok(None) => match size {
Some(0) | None => done = true,
_ => break,
},
Err(e) => {
debug!("error {:?}", e);
error!("Error while downloading {:?} from {:?}, retrying", c32, url);
send.send(None).await?;
tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await;
delay *= 2.;
break;
}
}
}
}
std::mem::drop(send);
t.await??;
if done {
match c {
CS::Change(_) => {
std::fs::rename(&path_, &path)?;
}
CS::State(_) => {
std::fs::rename(&path_, &path)?;
}
}
}
Ok(c)
}
const POOL_SIZE: usize = 20;
impl Http {
pub async fn download_changes(
&mut self,
pro_n: usize,
hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
send: &mut tokio::sync::mpsc::Sender<CS>,
path: &PathBuf,
_full: bool,
) -> Result<(), anyhow::Error> {
let mut pool = <[_; POOL_SIZE]>::default();
let mut cur = 0;
while let Some(c) = hashes.recv().await {
debug!("downloading {:?}", c);
let t = std::mem::replace(
&mut pool[cur],
Some(tokio::spawn(download_change(
self.client.clone(),
self.url.clone(),
self.headers.clone(),
path.clone(),
c,
))),
);
if let Some(t) = t {
debug!("waiting for process {:?}", cur);
let c = t.await??;
debug!("sending {:?}", c);
super::PROGRESS.borrow_mut().unwrap()[pro_n].incr();
if send.send(c).await.is_err() {
debug!("err for {:?}", c);
break;
}
debug!("sent");
}
cur = (cur + 1) % POOL_SIZE;
}
for f in 0..POOL_SIZE {
if let Some(t) = pool[(cur + f) % POOL_SIZE].take() {
let c = t.await??;
debug!("sending {:?}", c);
super::PROGRESS.borrow_mut().unwrap()[pro_n].incr();
if send.send(c).await.is_err() {
debug!("err for {:?}", c);
break;
}
debug!("sent");
}
}
Ok(())
}
pub async fn upload_changes(
&self,
pro_n: usize,
mut local: PathBuf,
to_channel: Option<&str>,
changes: &[CS],
) -> Result<(), anyhow::Error> {
for c in changes {
let url = {
let mut p = self.url.path().to_string();
if !p.ends_with("/") {
p.push('/')
}
p.push_str(super::DOT_DIR);
let mut u = self.url.clone();
u.set_path(&p);
u
};
let mut to_channel = if let Some(ch) = to_channel {
vec![("to_channel", ch)]
} else {
Vec::new()
};
let base32;
let body = match c {
CS::Change(c) => {
libpijul::changestore::filesystem::push_filename(&mut local, &c);
let change = std::fs::read(&local)?;
base32 = c.to_base32();
to_channel.push(("apply", &base32));
change
}
CS::State(c) => {
libpijul::changestore::filesystem::push_tag_filename(&mut local, &c);
let mut tag_file = libpijul::tag::OpenTagFile::open(&local, &c)?;
let mut v = Vec::new();
tag_file.short(&mut v)?;
base32 = c.to_base32();
to_channel.push(("tagup", &base32));
v
}
};
libpijul::changestore::filesystem::pop_filename(&mut local);
debug!("url {:?} {:?}", url, to_channel);
let mut req = self
.client
.post(url)
.query(&to_channel)
.header(reqwest::header::USER_AGENT, USER_AGENT);
for (k, v) in self.headers.iter() {
req = req.header(k.as_str(), v.as_str());
}
let resp = req.body(body).send().await?;
let stat = resp.status();
if !stat.is_success() {
let body = resp.text().await?;
if !body.is_empty() {
bail!("The HTTP server returned an error: {}", body)
} else {
if let Some(reason) = stat.canonical_reason() {
bail!("HTTP Error {}: {}", stat.as_u16(), reason)
} else {
bail!("HTTP Error {}", stat.as_u16())
}
}
}
super::PROGRESS.borrow_mut().unwrap()[pro_n].incr();
}
Ok(())
}
pub async fn download_changelist<
A,
F: FnMut(&mut A, u64, Hash, libpijul::Merkle, bool) -> Result<(), anyhow::Error>,
>(
&self,
mut f: F,
a: &mut A,
from: u64,
paths: &[String],
) -> Result<HashSet<Position<Hash>>, anyhow::Error> {
let url = {
let mut p = self.url.path().to_string();
if !p.ends_with("/") {
p.push('/')
}
p.push_str(super::DOT_DIR);
let mut u = self.url.clone();
u.set_path(&p);
u
};
let from_ = from.to_string();
let mut query = vec![("changelist", &from_), ("channel", &self.channel)];
for p in paths.iter() {
query.push(("path", p));
}
let mut req = self
.client
.get(url)
.query(&query)
.header(reqwest::header::USER_AGENT, USER_AGENT);
for (k, v) in self.headers.iter() {
req = req.header(k.as_str(), v.as_str());
}
let res = req.send().await?;
let status = res.status();
if !status.is_success() {
match serde_json::from_slice::<libpijul::RemoteError>(&*res.bytes().await?) {
Ok(remote_err) => return Err(remote_err.into()),
Err(_) if status.as_u16() == 404 => {
bail!("Repository `{}` not found (404)", self.url)
}
Err(_) => bail!("Http request failed with status code: {}", status),
}
}
let resp = res.bytes().await?;
let mut result = HashSet::new();
if let Ok(data) = std::str::from_utf8(&resp) {
for l in data.lines() {
if !l.is_empty() {
match super::parse_line(l)? {
super::ListLine::Change { n, m, h, tag } => f(a, n, h, m, tag)?,
super::ListLine::Position(pos) => {
result.insert(pos);
}
super::ListLine::Error(e) => {
let mut stderr = std::io::stderr();
writeln!(stderr, "{}", e)?;
}
}
} else {
break;
}
}
}
Ok(result)
}
pub async fn get_state(
&mut self,
mid: Option<u64>,
) -> Result<Option<(u64, libpijul::Merkle, libpijul::Merkle)>, anyhow::Error> {
debug!("get_state {:?}", self.url);
let url = format!("{}/{}", self.url, super::DOT_DIR);
let q = if let Some(mid) = mid {
[
("state", format!("{}", mid)),
("channel", self.channel.clone()),
]
} else {
[("state", String::new()), ("channel", self.channel.clone())]
};
let mut req = self
.client
.get(&url)
.query(&q)
.header(reqwest::header::USER_AGENT, USER_AGENT);
for (k, v) in self.headers.iter() {
req = req.header(k.as_str(), v.as_str());
}
let res = req.send().await?;
if !res.status().is_success() {
bail!("HTTP error {:?}", res.status())
}
let resp = res.bytes().await?;
let resp = std::str::from_utf8(&resp)?;
debug!("resp = {:?}", resp);
let mut s = resp.split_whitespace();
if let (Some(n), Some(m), Some(m2)) = (
s.next().and_then(|s| s.parse().ok()),
s.next()
.and_then(|m| libpijul::Merkle::from_base32(m.as_bytes())),
s.next()
.and_then(|m| libpijul::Merkle::from_base32(m.as_bytes())),
) {
Ok(Some((n, m, m2)))
} else {
Ok(None)
}
}
pub async fn get_id(&self) -> Result<Option<libpijul::pristine::RemoteId>, anyhow::Error> {
debug!("get_state {:?}", self.url);
let url = format!("{}/{}", self.url, super::DOT_DIR);
let q = [("channel", self.channel.clone()), ("id", String::new())];
let mut req = self
.client
.get(&url)
.query(&q)
.header(reqwest::header::USER_AGENT, USER_AGENT);
for (k, v) in self.headers.iter() {
req = req.header(k.as_str(), v.as_str());
}
let res = req.send().await?;
if !res.status().is_success() {
bail!("HTTP error {:?}", res.status())
}
let resp = res.bytes().await?;
debug!("resp = {:?}", resp);
Ok(libpijul::pristine::RemoteId::from_bytes(&resp))
}
pub async fn archive<W: std::io::Write + Send + 'static>(
&mut self,
prefix: Option<String>,
state: Option<(libpijul::Merkle, &[Hash])>,
mut w: W,
) -> Result<u64, anyhow::Error> {
let url = {
let mut p = self.url.path().to_string();
if !p.ends_with("/") {
p.push('/')
}
p.push_str(super::DOT_DIR);
let mut u = self.url.clone();
u.set_path(&p);
u
};
let res = self.client.get(url).query(&[("channel", &self.channel)]);
let res = if let Some((ref state, ref extra)) = state {
let mut q = vec![("archive".to_string(), state.to_base32())];
if let Some(pre) = prefix {
q.push(("outputPrefix".to_string(), pre));
}
for e in extra.iter() {
q.push(("change".to_string(), e.to_base32()))
}
res.query(&q)
} else {
res
};
let res = res
.header(reqwest::header::USER_AGENT, USER_AGENT)
.send()
.await?;
if !res.status().is_success() {
bail!("HTTP error {:?}", res.status())
}
use futures_util::StreamExt;
let mut stream = res.bytes_stream();
let mut conflicts = 0;
let mut n = 0;
while let Some(item) = stream.next().await {
let item = item?;
let mut off = 0;
while n < 8 && off < item.len() {
conflicts = (conflicts << 8) | (item[off] as u64);
off += 1;
n += 1
}
w.write_all(&item[off..])?;
}
Ok(conflicts as u64)
}
pub async fn update_identities(
&mut self,
rev: Option<u64>,
mut path: PathBuf,
) -> Result<u64, anyhow::Error> {
let url = {
let mut p = self.url.path().to_string();
if !p.ends_with("/") {
p.push('/')
}
p.push_str(super::DOT_DIR);
let mut u = self.url.clone();
u.set_path(&p);
u
};
let mut req = self
.client
.get(url)
.query(&[(
"identities",
if let Some(rev) = rev {
format!("{}", rev)
} else {
String::new()
},
)])
.header(reqwest::header::USER_AGENT, USER_AGENT);
for (k, v) in self.headers.iter() {
req = req.header(k.as_str(), v.as_str());
}
let res = req.send().await?;
if !res.status().is_success() {
bail!("HTTP error {:?}", res.status())
}
use serde_derive::*;
#[derive(Debug, Deserialize)]
struct Identities {
id: Vec<crate::identity::Complete>,
rev: u64,
}
let resp: Identities = res.json().await?;
std::fs::create_dir_all(&path)?;
for id in resp.id.iter() {
path.push(&id.public_key.key);
debug!("recv identity: {:?} {:?}", id, path);
let mut id_file = std::fs::File::create(&path)?;
serde_json::to_writer_pretty(&mut id_file, &id.as_portable())?;
path.pop();
}
Ok(resp.rev)
}
}