use crate::permissions::Perm;
use crate::repository::channel_spec_id;
use crate::Config;
use axum::{
debug_handler,
extract::{Query, State},
response::{IntoResponse, Response},
Json,
};
use axum_extra::extract::SignedCookieJar;
use diesel_async::AsyncPgConnection;
use libpijul::{
changestore::ChangeStore,
pristine::{Position, TxnErr},
Base32, ChangeId, ChannelTxnT, DepsTxnT, GraphTxnT, TxnT, TxnTExt,
};
use serde_derive::*;
use std::collections::HashSet;
use super::Author;
use tracing::*;
#[derive(Debug, Serialize)]
struct Changes {
hashes: Vec<Change>,
owner: String,
repo: String,
channel: String,
channels: Vec<String>,
token: String,
login: Option<String>,
}
#[derive(Debug, Default, Serialize)]
struct Change {
hash: String,
state: String,
unrecordable: bool,
pos: u64,
n: u64,
authors: Vec<Author>,
message: String,
date: chrono::DateTime<chrono::Utc>,
#[serde(skip)]
hash_: Option<libpijul::Hash>,
#[serde(skip)]
header:
Option<tokio::task::JoinHandle<libpijul::change::ChangeHeader_<libpijul::change::Author>>>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ListQuery {
from: Option<u64>,
to: Option<u64>,
count: Option<u64>,
rev: Option<bool>,
channel: Option<String>,
req_paths: Option<Vec<String>>,
}
const DEFAULT_PAGE_SIZE: u64 = 20;
#[debug_handler]
pub async fn changelist(
State(config): State<Config>,
jar: SignedCookieJar,
tree: crate::repository::RepoPath,
token: axum_csrf::CsrfToken,
page: Query<ListQuery>,
) -> Result<Response, crate::Error> {
let (uid, login) = crate::get_user_login_(&jar, &config).await?;
let mut db = config.db.get().await?;
let (id, _) =
crate::repository::repository_id(&mut db, &tree.owner, &tree.repo, uid, Perm::READ).await?;
let repo_locks = config.repo_locks.clone();
let channel_ = tree
.channel
.as_deref()
.unwrap_or_else(|| libpijul::DEFAULT_CHANNEL)
.to_string();
debug!("channel_ {:?}", channel_);
let channel = channel_.clone();
let req_paths: Vec<String> = if let Some(ref p) = page.req_paths {
p.iter().map(|x| x.to_string()).collect()
} else {
Vec::new()
};
let count = page.count.unwrap_or(DEFAULT_PAGE_SIZE);
let reverse = page.rev.unwrap_or(true);
let mut hashes = list_changes(
&repo_locks,
id,
&channel,
&req_paths,
reverse,
page.from,
count,
)
.await?;
let repo_ = repo_locks.get(&id).await?;
for h in hashes.iter_mut() {
let changes = repo_.changes.clone();
let hh = h.hash_.unwrap();
h.header = Some(tokio::task::spawn_blocking(move || {
changes.get_header(&hh).unwrap()
}));
}
for h in hashes.iter_mut() {
fill(&mut db, h).await?
}
debug!("hashes {:#?}", hashes);
let token_ = token.authenticity_token()?;
Ok((
token,
Json(Changes {
hashes,
channel: channel_.to_string(),
channels: repo_.channels().await,
owner: tree.owner,
repo: tree.repo,
token: token_,
login,
}),
)
.into_response())
}
async fn fill(db: &mut AsyncPgConnection, change: &mut Change) -> Result<(), crate::Error> {
let mut header = change.header.take().unwrap().await?;
info!("fill {:#?}", header);
change.message = header.message;
change.date = header.timestamp;
change.authors = super::get_authors(db, &mut header.authors).await?;
Ok(())
}
async fn list_changes(
repo_locks: &crate::repository::RepositoryLocks,
id: uuid::Uuid,
channel: &str,
req_paths: &[String],
reverse: bool,
from: Option<u64>,
count: u64,
) -> Result<Vec<Change>, crate::Error> {
let mut v = Vec::new();
let txn = {
let repo_ = repo_locks.get(&id).await?;
let pristine = repo_.pristine.read().await;
pristine.txn_begin()?
};
let c = channel_spec_id(id, &channel);
let channel = if let Some(channel) = txn.load_channel(&c)? {
channel
} else if channel.is_empty() || channel == libpijul::DEFAULT_CHANNEL {
return Ok(v);
} else {
debug!("channel not found {:?}", c);
return Err(crate::Error::ChannelNotFound {
channel: channel.to_string(),
});
};
let mut paths = HashSet::new();
let repo_ = repo_locks.get(&id).await?;
for r in req_paths {
if let Ok((p, ambiguous)) = txn.follow_oldest_path(&repo_.changes, &channel, &r) {
let h: libpijul::Hash = txn.get_external(&p.change)?.unwrap().into();
v.push(Change {
hash: h.to_base32(),
hash_: Some(h),
pos: u64::from_le(p.pos.0 .0),
..Change::default()
});
if ambiguous {
return Err(crate::Error::ChangeNotFound);
}
paths.insert(p);
paths.extend(
libpijul::fs::iter_graph_descendants(&txn, txn.graph(&*channel.read()), p)?
.filter_map(|x| x.ok()),
);
} else {
return Err(crate::Error::ChangeNotFound);
}
}
debug!("paths = {:?}", paths);
let tags: Vec<u64> = txn
.iter_tags(txn.tags(&*channel.read()), from.unwrap_or(0))?
.map(|k| (*k.unwrap().0).into())
.collect();
let mut tagsi = 0;
let mut from_ = from.unwrap_or(0);
let mut is_first = true;
let mut n = 0;
if reverse {
let ch = channel.read();
if !is_first && from_ == 0 {
return Ok(v);
}
let mut it = txn.reverse_log(&ch, if from_ == 0 { None } else { Some(from_) })?;
while let Some(x) = it.next() {
let (a, b) = x?;
debug!("reverse {:?} {:?}", a, b);
if !is_first && a == from_ {
continue;
}
is_first = false;
from_ = a;
push_patch(
&txn,
&paths,
&tags,
&ch,
reverse,
&mut tagsi,
&mut v,
(a, b),
)?;
n += 1;
if (count > 0 && n > count) || n >= 32 {
return Ok(v);
}
}
} else {
let ch = channel.read();
let mut it = txn.log(&ch, from_)?;
while let Some(x) = it.next() {
let (a, b) = x?;
debug!("log {:?} {:?}", a, b);
if !is_first && a == from_ {
continue;
}
is_first = false;
from_ = a;
push_patch(
&txn,
&paths,
&tags,
&ch,
reverse,
&mut tagsi,
&mut v,
(a, b),
)?;
n += 1;
if (count > 0 && n > count) || n >= 32 {
return Ok(v);
}
}
}
Ok(v)
}
fn push_patch<
E: std::error::Error,
T: GraphTxnT<GraphError = E> + ChannelTxnT + DepsTxnT<DepsError = E>,
>(
txn: &T,
paths: &HashSet<Position<ChangeId>>,
tags: &[u64],
channel: &T::Channel,
reverse: bool,
tagsi: &mut usize,
v: &mut Vec<Change>,
(n, (h, m)): (
u64,
(
&libpijul::pristine::SerializedHash,
&libpijul::pristine::SerializedMerkle,
),
),
) -> Result<(), TxnErr<E>> {
let h_int = txn.get_internal(h)?.unwrap();
if paths.is_empty()
|| paths
.iter()
.any(|x| x.change == *h_int || txn.get_touched_files(x, Some(h_int)).unwrap().is_some())
{
let h: libpijul::Hash = h.into();
let m: libpijul::Merkle = m.into();
if tags.get(*tagsi) == Some(&n) {
v.push(Change {
hash: h.to_base32(),
hash_: Some(h),
state: m.to_base32(),
..Change::default()
});
*tagsi += 1
} else {
let unrecordable = if reverse {
let ch = txn.get_internal(&h.into())?.unwrap();
let mut un = true;
for x in txn.iter_revdep(&ch)? {
let (a, b) = x?;
if a > ch {
break;
} else if a < ch {
continue;
}
if txn.get_changeset(T::changes(txn, channel), b)?.is_some() {
un = false;
break;
}
}
un
} else {
false
};
v.push(Change {
hash: h.to_base32(),
hash_: Some(h),
state: m.to_base32(),
n,
unrecordable,
..Change::default()
});
}
}
Ok(())
}