use crate::config::Config;
use crate::permissions::*;
use diesel::sql_types::Bool;
use diesel::{
ExpressionMethods, OptionalExtension, QueryDsl,
};
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use libpijul::{Base32, MutTxnT};
use lru_cache::LruCache;
use serde_derive::*;
use std::fs::{create_dir_all, metadata};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tracing::*;
use uuid::Uuid;
pub mod changestore;
pub mod router;
pub mod admin;
#[derive(Debug, Deserialize)]
pub struct TreePath {
pub owner: String,
pub repo: String,
}
#[derive(Serialize)]
pub struct RepositoryPath<'a> {
pub owner: &'a str,
pub repo: &'a str,
pub path: &'a [PathElement],
pub n_followers: u64,
pub user_is_follower: bool,
}
#[derive(Serialize)]
pub struct Description<'a> {
pub descr: &'a str,
pub repo_id: uuid::Uuid,
}
#[derive(Clone, Debug)]
pub struct RepositoryId {
pub owner_id: Uuid,
pub repo_id: Uuid,
pub fork_origin: Option<Uuid>,
}
pub fn nest_path(config: &Config, fork_origin: Uuid) -> PathBuf {
let mut p = config.repositories_path.clone();
p.push(&format!("{}", fork_origin));
p
}
pub fn nest_pristine_path(config: &Config, fork_origin: Uuid) -> PathBuf {
let mut p = nest_path(config, fork_origin);
p.push(".pijul");
p.push("pristine");
p
}
pub fn nest_changes_path(config: &Config, fork_origin: Uuid) -> PathBuf {
let mut p = nest_path(config, fork_origin);
p.push(".pijul");
p.push("changes");
p
}
impl RepositoryId {
pub fn origin_id(&self) -> Uuid {
self.fork_origin.unwrap_or(self.repo_id)
}
pub fn nest_changes_path(&self, config: &Config) -> PathBuf {
nest_changes_path(config, self.origin_id())
}
}
type Locks = Mutex<LruCache<Uuid, Arc<Repo>>>;
pub struct Repo {
pub pristine: RwLock<libpijul::pristine::sanakirja::Pristine>,
pub changes: changestore::FileSystem,
}
#[derive(Clone)]
pub struct RepositoryLocks {
pub config: Arc<Config>,
locks: Pin<Arc<Locks>>,
}
impl std::fmt::Debug for RepositoryLocks {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
write!(fmt, "RepositoryLocks {{ … }}")
}
}
impl RepositoryLocks {
pub fn new(config: Arc<Config>) -> Self {
let cache_size = config.repository_cache_size;
RepositoryLocks {
config,
locks: Arc::pin(Mutex::new(LruCache::new(cache_size))),
}
}
pub async fn remove(&self, repo_id: &uuid::Uuid) -> Result<(), crate::Error> {
let mut locks = self.locks.lock().await;
drop(locks.remove(repo_id));
Ok(())
}
pub async fn get(&self, repo_id: &Uuid) -> Result<Arc<Repo>, crate::Error> {
debug!("get: taking lock {:?}", repo_id);
let mut locks = self.locks.lock().await;
unsafe {
info!(
"repositorylocks size: {:?} {:?}",
locks.len(),
heapsize::heap_size_of::<LruCache<_, _>>(&*locks)
);
}
debug!("get: lock taken");
{
if let Some(repo_lock) = locks.get_mut(repo_id).map(|x| x.clone()) {
debug!("get: found");
std::mem::drop(locks);
return Ok(repo_lock);
}
}
debug!("get: not found, inserting");
let path = nest_pristine_path(&self.config, *repo_id);
debug!("get: open_or_create {:?}", path);
let repo = unsafe {
self.open_or_create_repository(&path)?
};
debug!("get: open_or_create done");
let path = nest_changes_path(&self.config, *repo_id);
debug!("path = {:?}", path);
create_dir_all(&path)?;
let repo = Arc::new(Repo {
pristine: RwLock::new(repo),
changes: changestore::FileSystem {
change_cache: self.config.change_cache.clone(),
hash_cache: self.config.hash_cache.clone(),
changes_dir: path,
id: *repo_id,
db: self.config.db.clone(),
},
});
locks.insert(*repo_id, repo.clone());
std::mem::drop(locks);
debug!("get: done");
Ok(repo)
}
unsafe fn open_or_create_repository(
&self,
path: &Path,
) -> Result<libpijul::pristine::sanakirja::Pristine, crate::Error> {
if metadata(&path).is_err() {
create_dir_all(&path)?;
let repo = libpijul::pristine::sanakirja::Pristine::new_nolock(&path.join("db"))?;
repo.mut_txn_begin()?.commit()?;
Ok(repo)
} else {
Ok(libpijul::pristine::sanakirja::Pristine::new_nolock(
&path.join("db"),
)?)
}
}
}
#[derive(Debug)]
pub enum ChannelSpec {
Channel(String),
CI(String),
Discussion(i32),
}
impl ChannelSpec {
pub fn channel(c: std::borrow::Cow<str>) -> Self {
if c.ends_with("/ci") {
ChannelSpec::CI(c.split_at(c.len() - 3).0.to_string())
} else {
ChannelSpec::Channel(c.into_owned())
}
}
}
pub fn channel_spec(repo_id: &RepositoryId, channel: &str) -> String {
if channel.ends_with("/ci") {
format!(
"{}_{}",
repo_id.repo_id,
channel.split_at(channel.len() - 3).0
)
} else {
format!("{}_{}", repo_id.repo_id, channel)
}
}
pub fn channel_spec_id(repo_id: uuid::Uuid, channel: &str) -> String {
if channel.ends_with("/ci") {
format!("{}_{}", repo_id, channel.split_at(channel.len() - 3).0)
} else {
format!("{}_{}", repo_id, channel)
}
}
pub async fn repository_id(
db: &mut AsyncPgConnection,
owner: &str,
name: &str,
user: Option<uuid::Uuid>,
required: Perm,
) -> Result<(Uuid, Perm), crate::Error> {
use crate::db::repositories::dsl as r;
use crate::db::users::dsl as u;
if let Some((rid, perms, suspended, is_public)) = r::repositories
.inner_join(u::users)
.filter(u::login.eq(owner))
.filter(u::is_active)
.filter(r::name.eq(name))
.filter(r::is_active)
.select((
r::id,
crate::permissions!(user.unwrap_or(uuid::Uuid::nil()), r::id),
u::suspended,
is_public(),
))
.get_result::<(uuid::Uuid, i64, bool, bool)>(db)
.await.optional()?
{
let got = if suspended {
Perm::empty()
} else if is_public {
Perm::from_bits(perms).unwrap() | Perm::READ
} else {
Perm::from_bits(perms).unwrap()
};
if got.contains(required) {
Ok((rid, got))
} else {
Err(crate::Error::Permissions {
required, got
})
}
} else {
Err(crate::Error::RepositoryNotFound)
}
}
pub fn is_public() -> diesel::expression::SqlLiteral<Bool> {
diesel::dsl::sql::<Bool>("EXISTS (SELECT 1 FROM permissions WHERE permissions.user_id = '00000000-0000-0000-0000-000000000000' AND permissions.repo_id = repositories.id)")
}
pub async fn repository(
db: &mut AsyncPgConnection,
owner: &str,
name: &str,
user: Uuid,
required: Perm,
) -> Result<Repository, crate::Error> {
use crate::db::repositories::dsl as r;
use crate::db::users::dsl as u;
if let Some((
repo_id,
perms,
suspended,
is_public,
)) = r::repositories
.inner_join(u::users)
.filter(u::login.eq(owner))
.filter(u::is_active)
.filter(r::name.eq(name))
.filter(r::is_active)
.select((
r::id,
crate::permissions!(user, r::id),
u::suspended,
is_public(),
))
.get_result::<(
uuid::Uuid,
i64,
bool,
bool,
)>(db)
.await
.optional()?
{
let got = if suspended {
Perm::empty()
} else if is_public {
Perm::from_bits(perms).unwrap() | Perm::READ
} else {
Perm::from_bits(perms).unwrap()
};
if !got.contains(required) {
Err(crate::Error::Permissions { required, got })
} else {
Ok(Repository {
id: repo_id,
permissions: got,
})
}
} else {
Err(crate::Error::RepositoryNotFound)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RepoPath {
pub owner: String,
pub repo: String,
pub channel: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct RepoPath_ {
owner: String,
repo: String,
}
impl<S> axum::extract::FromRequestParts<S> for RepoPath
where
S: Send + Sync,
{
type Rejection =
<axum::extract::Path<RepoPath_> as axum::extract::FromRequestParts<S>>::Rejection;
async fn from_request_parts(
parts: &mut http::request::Parts,
state: &S,
) -> Result<Self, Self::Rejection> {
use axum::extract::Path;
let Path(r) = Path::<RepoPath_>::from_request_parts(parts, state).await?;
debug!("path r {:?}", r);
let mut it = r.repo.split(':');
Ok(RepoPath {
owner: r.owner,
repo: it.next().unwrap().to_string(),
channel: it.next().map(|x| x.to_string()),
})
}
}
impl Repo {
pub async fn channels(&self) -> Vec<String> {
use libpijul::TxnT;
let mut c = Vec::new();
let pristine = self.pristine.read().await;
let txn = pristine.txn_begin().unwrap();
let mut at_least_one = false;
for chan in txn.channels("").unwrap() {
at_least_one = true;
let name = {
let s = chan.read();
debug!("channels: {:?}", s.name.as_str());
let (_, s) = s.name.as_str().split_at(37);
s.to_string()
};
c.push(name)
}
c.sort();
if !at_least_one {
c.push("main".to_string())
}
c
}
}
pub async fn free_used_storage(
db: &mut diesel_async::AsyncPgConnection,
repo: uuid::Uuid,
size: u64,
) -> Result<(), diesel::result::Error> {
use crate::db::repositories::dsl as repositories;
use crate::db::users::dsl as users;
use diesel::sql_types::BigInt;
let n = diesel::update(users::users)
.filter(
users::id.eq_any(
repositories::repositories
.filter(repositories::id.eq(repo))
.select(repositories::owner),
),
)
.set(
users::storage_used.eq(diesel::dsl::sql("GREATEST(0, storage_used - ")
.bind::<BigInt, _>(size as i64)
.sql(")")),
)
.execute(db)
.await?;
debug!("del_change updated {:?}", n);
Ok(())
}
#[derive(Debug)]
pub struct Repository {
pub id: uuid::Uuid,
pub permissions: Perm,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Identity {
pub public_key: libpijul::key::PublicKey,
pub login: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub origin: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub email: Option<String>,
pub last_modified: u64,
}
#[derive(Debug, Serialize)]
pub struct PathElement {
pub basename: String,
pub pos: String,
pub meta: libpijul::pristine::InodeMetadata,
}
fn current_path<
T: libpijul::GraphTxnT + libpijul::ChannelTxnT,
C: libpijul::changestore::ChangeStore,
>(
txn: &T,
channel: &T::Channel,
changes: &C,
pos: libpijul::pristine::Position<libpijul::ChangeId>,
) -> Result<Vec<PathElement>, crate::Error> {
let mut current_path = Vec::new();
{
let mut pos = pos;
while !pos.is_root() {
if let Some(x) = libpijul::fs::iter_basenames(txn, changes, txn.graph(channel), pos)
.map_err(|_| crate::Error::Txn)?
.next()
{
let (grandparent, meta, basename) = x.map_err(|_| crate::Error::Txn)?;
current_path.push(PathElement {
basename: basename,
pos: pos.to_base32(),
meta,
});
pos = grandparent
} else {
break;
}
}
}
current_path.reverse();
Ok(current_path)
}