use crate::permissions::Perm;
use crate::repository::{channel_spec, ChannelSpec, RepositoryId};
use crate::Keyalgorithm_;
use byteorder::{BigEndian, ByteOrder};
use cuach::*;
use diesel::sql_types::{BigInt, Bool, Integer, Text};
use diesel::{
BoolExpressionMethods, ExpressionMethods, Insertable, IntoSql, NullableExpressionMethods,
OptionalExtension, QueryDsl,
};
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::AsyncConnection;
use diesel_async::RunQueryDsl;
use futures;
use futures::Future;
use lazy_static::*;
use libpijul::pristine::{Base32, ChannelTxnT, DepsTxnT, GraphTxnT, TxnT};
use libpijul::TxnTExt;
use regex::Regex;
use serde_derive::*;
use std;
use std::collections::HashSet;
use std::io::Read;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use thrussh;
use thrussh::server::*;
use thrussh::ChannelId;
use thrussh_keys;
use tokio::fs::OpenOptions;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tracing::*;
use uuid::Uuid;
pub async fn socket(ssh_addr: &SocketAddr) -> TcpListener {
tokio::net::TcpListener::bind(ssh_addr).await.unwrap()
}
const CHAL_LEN: usize = 30;
lazy_static! {
pub static ref DISC: Regex = Regex::new(r#":([0-9]+)"#).unwrap();
}
pub const EMAIL_CHARSET: &'static str = "UTF-8";
pub async fn worker(config: super::Config, socket: TcpListener) {
loop {
let (socket, addr) = if let Ok(x) = socket.accept().await {
x
} else {
error!("SSH accept");
continue;
};
let ip: std::net::IpAddr = addr.ip();
info!("ssh connection from {:?}", ip);
let config = config.clone();
use crate::db::password_failures::dsl as pf;
let ip_sql = ipnetwork::IpNetwork::new(ip, if ip.is_ipv4() { 32 } else { 128 }).unwrap();
if let (Some(date), count) = pf::password_failures
.filter(pf::ip.eq(ip_sql))
.filter(pf::date.gt(chrono::Utc::now() - chrono::Duration::hours(1)))
.select((diesel::dsl::max(pf::date), diesel::dsl::count(pf::date)))
.get_result::<(Option<chrono::DateTime<chrono::Utc>>, i64)>(
&mut config.db.get().await.unwrap(),
)
.await
.unwrap()
{
let now = chrono::Utc::now();
if now.signed_duration_since(date).num_seconds() < count {
continue;
}
}
tokio::spawn(run_stream(
config.ssh.clone(),
socket,
H {
config: config.clone(),
db: config.db.clone(),
user: String::new(),
user_id: None,
repo: String::new(),
default_channel: String::new(),
channel: None,
repo_id: None,
permissions: crate::permissions::Perm::empty(),
owner: String::new(),
protocol_version: 0,
state: None,
applied: Vec::new(),
ip_sql,
last_window_adjustment: SystemTime::now(),
random_challenge: RandomChallenge {
ch: String::new(),
gen: std::time::SystemTime::UNIX_EPOCH,
key: None,
},
},
));
}
}
struct H {
pub config: super::Config,
pub db: crate::config::Db,
pub protocol_version: usize,
pub user: String,
pub user_id: Option<Uuid>,
pub owner: String,
pub repo: String,
pub permissions: crate::permissions::Perm,
pub channel: Option<ChannelSpec>,
pub default_channel: String,
pub repo_id: Option<RepositoryId>,
pub state: Option<State>,
pub ip_sql: ipnetwork::IpNetwork,
pub applied: Vec<libpijul::pristine::Hash>,
pub last_window_adjustment: SystemTime,
pub random_challenge: RandomChallenge,
}
struct RandomChallenge {
ch: String,
gen: std::time::SystemTime,
key: Option<libpijul::key::PublicKey>,
}
struct TmpFile {
f: Option<tokio::fs::File>,
path: std::path::PathBuf,
}
impl TmpFile {
async fn persist(
mut self,
path: &std::path::Path,
) -> Result<tokio::fs::File, tokio::io::Error> {
tokio::fs::rename(&self.path, path).await?;
Ok(self.f.take().unwrap())
}
}
impl Drop for TmpFile {
fn drop(&mut self) {
if self.f.is_some() {
debug!("dropping file {:?}", self.path);
std::fs::remove_file(&self.path).unwrap_or(());
}
}
}
enum State {
Protocol,
Change {
hash: libpijul::pristine::Hash,
size: usize,
file: TmpFile,
file_size: usize,
},
}
impl From<libpijul::key::Algorithm> for Keyalgorithm_ {
fn from(a: libpijul::key::Algorithm) -> Keyalgorithm_ {
match a {
libpijul::key::Algorithm::Ed25519 => Keyalgorithm_::Ed25519,
}
}
}
impl From<Keyalgorithm_> for libpijul::key::Algorithm {
fn from(a: Keyalgorithm_) -> libpijul::key::Algorithm {
match a {
Keyalgorithm_::Ed25519 => libpijul::key::Algorithm::Ed25519,
}
}
}
impl H {
async fn reject(self) -> Result<(Self, thrussh::server::Auth), crate::Error> {
use crate::db::password_failures::dsl as pf;
diesel::insert_into(pf::password_failures)
.values((pf::ip.eq(&self.ip_sql), pf::login.eq(&self.user)))
.execute(&mut self.db.get().await.unwrap())
.await?;
Ok((self, thrussh::server::Auth::Reject))
}
}
#[derive(Debug, Serialize, Deserialize)]
struct Signature {
public_key: String,
timestamp: chrono::DateTime<chrono::Utc>,
signature: String,
}
impl Handler for H {
type Error = crate::Error;
type FutureAuth = Pin<Box<dyn Future<Output = Result<(Self, Auth), crate::Error>> + Send>>;
type FutureUnit = Pin<Box<dyn Future<Output = Result<(Self, Session), crate::Error>> + Send>>;
type FutureBool = futures::future::Ready<Result<(Self, Session, bool), crate::Error>>;
fn finished(self, session: thrussh::server::Session) -> Self::FutureUnit {
Box::pin(futures::future::ready(Ok((self, session))))
}
fn finished_auth(self, auth: thrussh::server::Auth) -> Self::FutureAuth {
Box::pin(futures::future::ready(Ok((self, auth))))
}
fn finished_bool(self, b: bool, s: thrussh::server::Session) -> Self::FutureBool {
futures::future::ready(Ok((self, s, b)))
}
fn auth_publickey(
mut self,
user: &str,
public_key: &thrussh_keys::key::PublicKey,
) -> Self::FutureAuth {
self.user.clear();
self.user.push_str(user);
use thrussh_keys::PublicKeyBase64;
debug!("auth_publickey {:?}", public_key);
debug!("user {:?}", user);
let key_bytes = match public_key {
thrussh_keys::key::PublicKey::RSA { ref key, .. } => {
(thrussh_keys::key::PublicKey::RSA {
key: thrussh_keys::key::OpenSSLPKey(key.0.clone()),
hash: thrussh_keys::key::SignatureHash::SHA2_512,
})
.public_key_bytes()
}
k => k.public_key_bytes(),
};
debug!(
"key_bytes = {:?}",
data_encoding::HEXLOWER.encode(&key_bytes)
);
Box::pin(async move {
use crate::db::publickeys::dsl as publickeys;
use crate::db::users::dsl as users;
let mut db_ = self.db.get().await?;
if let Some(id) = users::users
.inner_join(publickeys::publickeys)
.filter(users::login.eq(&self.user))
.filter(publickeys::bin.eq(&key_bytes))
.filter(users::is_active)
.select(users::id)
.get_result(&mut db_)
.await
.optional()?
{
debug!("accept");
use crate::db::password_failures::dsl as pf;
diesel::delete(pf::password_failures)
.filter(pf::ip.eq(self.ip_sql))
.filter(pf::login.eq(&self.user))
.execute(&mut db_)
.await?;
self.user_id = Some(id);
Ok((self, Auth::Accept))
} else {
std::mem::drop(db_);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
Ok((self, Auth::Reject))
}
})
}
fn auth_password(mut self, user: &str, password: &str) -> Self::FutureAuth {
self.user.clear();
self.user.push_str(user);
let password = password.to_string();
Box::pin(async move {
use crate::db::password_failures::dsl as pf;
let mut db_ = self.db.get().await?;
let count = pf::password_failures
.select(diesel::dsl::count::<BigInt, i64>(1))
.filter(pf::ip.eq(self.ip_sql).or(pf::login.eq(&self.user)))
.get_result::<i64>(&mut db_)
.await?;
if count > self.config.max_password_attempts {
return Ok((self, Auth::Reject));
}
use crate::db::users::dsl as users;
if let Some(user_id) = users::users
.filter(users::login.eq(&self.user))
.filter(crate::check_password!(&password))
.filter(users::email_is_invalid.is_null())
.filter(users::is_active)
.select(users::id)
.get_result(&mut db_)
.await
.optional()?
{
self.user_id = Some(user_id);
return Ok((self, Auth::Accept));
} else {
self.reject().await
}
})
}
fn exec_request(
mut self,
ch: thrussh::ChannelId,
exec: &[u8],
mut session: Session,
) -> Self::FutureUnit {
debug!("exec_request = {:?}", exec);
let exec = if let Ok(exec) = std::str::from_utf8(exec) {
exec.to_string()
} else {
return Box::pin(async { Ok((self, session)) });
};
debug!("exec: {:?}", exec);
Box::pin(async move {
let mut words = exec.split_whitespace();
if !self.parse_request(&exec) {
return Ok((self, session));
}
if let (Some("pijul"), Some(cmd)) = (words.next(), words.next()) {
match cmd {
"protocol" => {
debug!(
"owner = {:?}, repo = {:?}, channel = {:?}",
self.owner, self.repo, self.channel
);
if self.repo.is_empty() {
self.state = Some(State::Protocol);
session.channel_success(ch);
return Ok((self, session));
}
use crate::db::repositories::dsl as r;
use crate::db::users::dsl as users;
if let Some((repo_id, owner_id, fork_origin, permissions, channel, suspended, public)) = r::repositories
.inner_join(users::users)
.filter(users::login.eq(&self.owner))
.filter(r::name.eq(&self.repo))
.filter(r::is_active)
.filter(users::is_active)
.select((
r::id,
users::id,
r::fork_origin,
crate::permissions!(self.user_id.unwrap(), r::id),
r::default_channel,
users::suspended,
crate::repository::is_public(),
))
.get_result::<(uuid::Uuid, uuid::Uuid, Option<uuid::Uuid>, i64, String, bool, bool)>(&mut self.db.get().await?)
.await.optional()?
{
self.repo_id = Some(RepositoryId {
repo_id,
owner_id,
fork_origin,
});
self.permissions = Perm::from_bits(permissions).unwrap();
let channel: String = channel;
let suspended: bool = suspended;
let public: bool = public;
if !public && suspended {
self.permissions &= Perm::READ
}
self.default_channel = channel.clone();
self.channel = Some(ChannelSpec::channel(channel.into()));
self.state = Some(State::Protocol);
session.channel_success(ch);
Ok((self, session))
} else {
debug!("permission denied");
session.extended_data(
ch,
1,
thrussh::CryptoVec::from_slice(
b"Repository not found, or insufficient permissions\n",
),
);
session.exit_status_request(ch, 1);
Ok((self, session))
}
}
cmd => {
error!("cmd = {:?}", cmd);
session.extended_data(
ch,
1,
thrussh::CryptoVec::from_slice(b"Invalid Pijul subcommand\n"),
);
session.exit_status_request(ch, 1);
Ok((self, session))
}
}
} else {
session.exit_status_request(ch, 127);
Ok((self, session))
}
})
}
fn data(
mut self,
chan: thrussh::ChannelId,
data: &[u8],
mut session: Session,
) -> Self::FutureUnit {
debug!("data len {:?}", data.len());
let data = data.to_vec();
Box::pin(async move {
let result = self.protocol(chan, &data, &mut session).await;
match result {
Ok(ProtocolResult::Change { hash, file, size }) => {
let result = self.apply(hash, file, size, chan, &mut session).await;
debug!("result: {:?}", result);
if let Err(e) = result {
session.extended_data(
chan,
1,
thrussh::CryptoVec::from_slice(e.to_string().as_bytes()),
);
session.exit_status_request(chan, 1);
return Ok((self, session));
}
self.applied.push(hash);
}
Ok(ProtocolResult::ChannelList(disc)) => {
if self.repo_id.is_none() {
return Ok((self, session));
}
let repo = self
.config
.repo_locks
.get(&self.repo_id.as_ref().unwrap().origin_id())
.await?;
let data = crate::discussions::discussion_changelist(
&mut self.db.get().await.unwrap(),
&repo.changes,
self.repo_id.as_ref().unwrap().repo_id,
disc,
)
.await?;
session.data(chan, thrussh::CryptoVec::from_slice(&data));
self.state = Some(State::Protocol);
}
Ok(ProtocolResult::State { disc, pos }) => {
if self.repo_id.is_none() {
return Ok((self, session));
}
let (n, m) = crate::discussions::discussion_state(
&mut self.db.get().await.unwrap(),
self.repo_id.as_ref().unwrap().repo_id,
disc,
pos,
)
.await?;
let data = format!("{} {}\n", n, m.to_base32());
session.data(chan, thrussh::CryptoVec::from_slice(&data.as_bytes()));
self.state = Some(State::Protocol);
}
Ok(ProtocolResult::None) => {}
Err(e) => {
match e {
crate::Error::AmbiguousInode => {
session.extended_data(
chan,
1,
thrussh::CryptoVec::from_slice(b"Ambiguous path\n"),
);
}
crate::Error::InodeNotFound => {
session.extended_data(
chan,
1,
thrussh::CryptoVec::from_slice(b"Path not found\n"),
);
}
_ => {
session.extended_data(
chan,
1,
thrussh::CryptoVec::from_slice(b"Channel not found or empty\n"),
);
}
}
session.eof(chan);
session.exit_status_request(chan, 1);
session.close(chan);
return Ok((self, session));
}
}
Ok((self, session))
})
}
fn adjust_window(&mut self, _channel: ChannelId, target: u32) -> u32 {
let elapsed = self.last_window_adjustment.elapsed().unwrap();
self.last_window_adjustment = SystemTime::now();
if target >= 10_000_000 {
return target;
}
if elapsed < Duration::from_secs(2) {
target * 2
} else if elapsed > Duration::from_secs(8) {
target / 2
} else {
target
}
}
fn channel_eof(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
debug!("channel eof");
debug!("{:#?}", self.applied);
Box::pin(async move {
if self.repo_id.is_none() {
return Ok((self, session));
}
session.eof(channel);
session.exit_status_request(channel, 0);
session.close(channel);
Ok((self, session))
})
}
}
enum ProtocolResult {
None,
Change {
hash: libpijul::pristine::Hash,
file: TmpFile,
size: usize,
},
ChannelList(i32),
State {
disc: i32,
pos: Option<u64>,
},
}
enum ChannelList {
Discussion(i32),
Data(Vec<u8>),
}
impl H {
fn parse_request<'a>(&mut self, exec: &'a str) -> bool {
lazy_static! {
static ref ARGS: Regex =
Regex::new(r#"--(\S*) +(("((\\\\|\\"|[^"])*)")|((\\\\|\\ |[^ ])*))"#).unwrap();
static ref REPO: Regex = Regex::new(r#"'?/?(([^/']*)/)?([^']+)'?"#).unwrap();
};
for cap in ARGS.captures_iter(exec) {
debug!("cap:{:?}", cap);
match cap.get(1).as_ref().map(|x| x.as_str()) {
Some("repository") => {
if let Some(repo_) = cap.get(4).or(cap.get(6)) {
for cap in REPO.captures_iter(repo_.as_str()) {
debug!("cap {:?}", cap);
if let Some(owner) = cap.get(2) {
self.owner.clear();
self.owner.push_str(owner.as_str())
} else {
self.owner.clear();
self.owner.push_str(&self.user)
}
self.repo.clear();
self.repo.push_str(cap.get(3).unwrap().as_str());
}
} else {
error!("unparsed command: {:?}", exec);
return false;
}
}
Some("version") => {
if let Some(ver) = cap.get(4).or(cap.get(6)) {
if ver.as_str() == "3" {
self.protocol_version = 3
} else {
return false;
}
} else {
return false;
}
}
_ => {
debug!("extra: {:?}", cap);
}
}
}
self.protocol_version == 3
}
async fn protocol(
&mut self,
chan: thrussh::ChannelId,
data: &[u8],
session: &mut Session,
) -> Result<ProtocolResult, crate::Error> {
lazy_static! {
static ref STATE: Regex = Regex::new(r#"^state\s+(\S+)(\s+([0-9]+)?)\s+"#).unwrap();
static ref ID: Regex = Regex::new(r#"^id\s+(\S+)\s+"#).unwrap();
static ref CHANGELIST: Regex =
Regex::new(r#"^changelist\s+(\S+)\s+([0-9]+)(.*)\s+"#).unwrap();
static ref CHANGELIST_PATHS: Regex = Regex::new(r#""((\\")|[^"])+""#).unwrap();
static ref CHANGE: Regex = Regex::new(r#"^((change)|(partial))\s+([^ ]*)\s+"#).unwrap();
static ref APPLY: Regex =
Regex::new(r#"^apply\s+(\S+)\s+([^ ]*) ([0-9]+)\s+"#).unwrap();
static ref TAG: Regex = Regex::new(r#"^tag\s+(\S+)\s+"#).unwrap();
static ref TAGUP: Regex =
Regex::new(r#"^tagup\s+(\S+)\s+(\S+)\s+([0-9]+)\s+"#).unwrap();
static ref CHANNEL: Regex = Regex::new(r#"^channel\s+(\S+)\s+"#).unwrap();
static ref ARCHIVE: Regex =
Regex::new(r#"^archive\s+(\S+)\s*(( ([^:]+))*)( :(.*))?\n"#).unwrap();
static ref CHALLENGE: Regex = Regex::new(r#"^challenge (\S+)\n"#).unwrap();
static ref IDENTITIES: Regex = Regex::new(r#"^identities( (\d+))?\n"#).unwrap();
static ref PROVE: Regex = Regex::new(r#"^prove (.+)\n"#).unwrap();
}
debug!("protocol, channel {:?}", self.channel);
match self.state.take() {
Some(State::Protocol) => {
let data = {
if let Some(n) = data.iter().position(|&x| x == b'\n') {
&data[..n + 1]
} else {
return Err(crate::Error::ProtocolError.into());
}
};
let data = std::str::from_utf8(data)?;
debug!("data = {:?}", data);
if let Some(cap) = CHANGELIST.captures(data) {
match self.changelist(cap).await? {
ChannelList::Discussion(d) => return Ok(ProtocolResult::ChannelList(d)),
ChannelList::Data(data) => {
trace!("sending data {:?}", data);
session.data(chan, thrussh::CryptoVec::from_slice(&data));
debug!("sent");
self.state = Some(State::Protocol);
}
}
} else if let Some(cap) = STATE.captures(data) {
match self.state(cap).await? {
(ChannelList::Discussion(disc), pos) => {
return Ok(ProtocolResult::State { disc, pos })
}
(ChannelList::Data(data), _) => {
trace!("sending data {:?}", data);
session.data(chan, thrussh::CryptoVec::from_slice(&data));
debug!("sent");
self.state = Some(State::Protocol);
}
}
} else if let Some(cap) = ID.captures(data) {
match self.id(cap, false).await? {
ChannelList::Discussion(_) => {
session.data(chan, thrussh::CryptoVec::from_slice(b"\n"));
}
ChannelList::Data(data) => {
session.data(chan, thrussh::CryptoVec::from_slice(&data));
}
}
self.state = Some(State::Protocol);
} else if let Some(cap) = APPLY.captures(data) {
self.parse_apply(cap).await?;
} else if let Some(cap) = CHANGE.captures(data) {
let is_change = &cap[1] == "change";
self.parse_change(cap, chan, session, is_change)?;
self.state = Some(State::Protocol);
} else if let Some(cap) = CHALLENGE.captures(data) {
self.challenge(cap, chan, session)?;
self.state = Some(State::Protocol);
} else if let Some(cap) = PROVE.captures(data) {
self.prove(cap, chan, session).await?;
self.state = Some(State::Protocol);
} else if let Some(cap) = IDENTITIES.captures(data) {
let id: Option<i64> = if let Some(id) = cap.get(2) {
Some(id.as_str().parse()?)
} else {
None
};
self.identities(id, chan, session).await?;
self.state = Some(State::Protocol);
} else {
debug!("not recognised: {:?}", data);
self.state = Some(State::Protocol);
return Ok(ProtocolResult::None);
}
}
Some(State::Change {
hash,
size,
mut file,
mut file_size,
}) => {
debug!("data.len() = {:?}", data.len());
file.f.as_mut().unwrap().write_all(data).await?;
file_size += data.len();
debug!("{:?} < {:?}", file_size, size);
if file_size < size {
self.state = Some(State::Change {
hash,
size,
file,
file_size,
})
} else {
self.state = Some(State::Protocol);
return Ok(ProtocolResult::Change { hash, file, size });
}
}
None => {}
}
Ok(ProtocolResult::None)
}
fn challenge<'a>(
&mut self,
cap: regex::Captures<'a>,
chan: ChannelId,
session: &mut Session,
) -> Result<(), crate::Error> {
use rand::Rng;
if let Ok(json) = serde_json::from_slice::<libpijul::key::PublicKey>(cap[1].as_bytes()) {
self.random_challenge.key = Some(json)
} else {
return Ok(());
};
self.random_challenge.ch.clear();
self.random_challenge.ch.reserve(CHAL_LEN);
self.random_challenge.ch.extend(
rand::rng()
.sample_iter(&rand::distr::Alphanumeric)
.take(30)
.map(char::from),
);
self.random_challenge.ch.push('\n');
session.data(
chan,
thrussh::CryptoVec::from_slice(self.random_challenge.ch.as_bytes()),
);
self.random_challenge.ch.pop();
self.random_challenge.gen = std::time::SystemTime::now();
Ok(())
}
async fn identities(
&mut self,
_rev: Option<i64>,
chan: ChannelId,
session: &mut Session,
) -> Result<(), crate::Error> {
if let Some(ref _id) = self.repo_id {
session.data(chan, thrussh::CryptoVec::from_slice(b"\n"));
}
Ok(())
}
async fn prove<'a>(
&mut self,
cap: regex::Captures<'a>,
chan: ChannelId,
session: &mut Session,
) -> Result<(), crate::Error> {
if self.random_challenge.gen.elapsed()? > std::time::Duration::from_secs(10) {
session.data(
chan,
thrussh::CryptoVec::from_slice(b"Challenge expired.\n"),
);
return Ok(());
}
if let Some(ref key) = self.random_challenge.key {
let pk = key.load()?;
debug!("challenge {:?}", self.random_challenge.ch);
pk.verify(
self.random_challenge.ch.as_bytes(),
&cap[1],
&chrono::Utc::now(),
)?;
let key_key = bs58::decode(&key.key).into_vec()?;
let key_sig = bs58::decode(&key.signature).into_vec()?;
use crate::db::signingkeys::dsl as sk;
diesel::insert_into(sk::signingkeys)
.values((
sk::user_id.eq(self.user_id.as_ref().unwrap()),
sk::algorithm.eq(Keyalgorithm_::from(key.algorithm)),
sk::public_key.eq(key_key),
sk::signature.eq(key_sig),
sk::expires.eq(key.expires),
))
.execute(&mut self.db.get().await?)
.await?;
session.data(chan, thrussh::CryptoVec::from_slice(b"\n"));
}
Ok(())
}
async fn changelist<'a>(
&mut self,
cap: regex::Captures<'a>,
) -> Result<ChannelList, crate::Error> {
let from: u64 = (&cap[2]).parse()?;
lazy_static! {
static ref CHANGELIST_PATHS: Regex = Regex::new(r#""(((\\")|[^"])+)""#).unwrap();
}
if let Some(disc) = DISC.captures(&cap[1]) {
let disc: i32 = disc[1].parse().unwrap();
Ok(ChannelList::Discussion(disc))
} else if let Some(ref id) = self.repo_id {
debug!("taking lock");
let repo = self.config.repo_locks.get(&id.origin_id()).await?;
let pristine = repo.pristine.read().await;
let txn = pristine.txn_begin()?;
let c = channel_spec(id, &cap[1]);
let channel_spec = ChannelSpec::channel((&cap[1]).into());
let channel = if let Some(channel) = txn.load_channel(&c)? {
channel
} else if &cap[1] == self.default_channel {
debug!("channel not found {:?}", c);
match channel_spec {
ChannelSpec::Channel(ref c_) => {
debug!("c_ = {:?} {:?}", c_, &cap[1]);
if c_ == &cap[1] {
return Ok(ChannelList::Data(vec![b'\n']));
}
}
ChannelSpec::CI(ref c_) => {
if c_.len() + 3 == cap[1].len() {
let (a, b) = cap[1].split_at(cap[1].len() - 3);
if c_ == a && b == "/ci" {
return Ok(ChannelList::Data(vec![b'\n']));
}
}
}
_ => {}
}
return Err((crate::Error::ChannelNotFound {
channel: cap[1].to_string(),
})
.into());
} else {
return Ok(ChannelList::Data(
format!("error:Remote channel {:?} not found\n", &cap[1]).into(),
));
};
let mut paths = HashSet::new();
let mut data = Vec::new();
for r in CHANGELIST_PATHS.captures_iter(&cap[3]) {
if let Ok((p, ambiguous)) = txn.follow_oldest_path(&repo.changes, &channel, &r[1]) {
let h = txn.get_external(&p.change)?.unwrap();
let h: libpijul::Hash = h.into();
writeln!(data, "{}.{}", h.to_base32(), p.pos.0)?;
if ambiguous {
return Err(crate::Error::AmbiguousInode.into());
}
paths.insert(p);
paths.extend(
libpijul::fs::iter_graph_descendants(&txn, txn.graph(&*channel.read()), p)?
.filter_map(|x| x.ok()),
);
} else {
return Err(crate::Error::InodeNotFound.into());
}
}
use std::io::Write;
let tags: Vec<u64> = txn
.iter_tags(txn.tags(&*channel.read()), from)?
.map(|k| (*k.unwrap().0).into())
.collect();
let mut tagsi = 0;
for x in txn.log(&*channel.read(), from)? {
let (n, (h, m)) = x?;
let h_int = txn.get_internal(h)?.unwrap();
let mut on_channel = paths.is_empty();
if !on_channel {
let mut it = paths.iter();
loop {
if let Some(x) = it.next() {
if x.change == *h_int
|| txn.get_touched_files(x, Some(h_int))?.is_some()
{
on_channel = true;
break;
}
} else {
break;
}
}
}
if on_channel {
let h: libpijul::Hash = h.into();
let m: libpijul::Merkle = m.into();
if tags.get(tagsi) == Some(&n) {
writeln!(data, "{}.{}.{}.", n, h.to_base32(), m.to_base32())?;
tagsi += 1
} else {
writeln!(data, "{}.{}.{}", n, h.to_base32(), m.to_base32())?;
}
}
}
data.push(b'\n');
Ok(ChannelList::Data(data))
} else {
debug!("no repo id / no channel");
Ok(ChannelList::Data(Vec::new()))
}
}
async fn state<'a>(
&mut self,
cap: regex::Captures<'a>,
) -> Result<(ChannelList, Option<u64>), crate::Error> {
let pos: Option<u64> = cap.get(3).map(|c| c.as_str().parse().unwrap());
if let Some(disc) = DISC.captures(&cap[1]) {
let disc: i32 = disc[1].parse().unwrap();
Ok((ChannelList::Discussion(disc), pos))
} else if let Some(ref id) = self.repo_id {
debug!("taking lock");
let repo = self
.config
.repo_locks
.get(&self.repo_id.as_ref().unwrap().origin_id())
.await?;
let pristine = repo.pristine.read().await;
let txn = pristine.txn_begin()?;
let c = channel_spec(id, &cap[1]);
let channel = if let Some(channel) = txn.load_channel(&c)? {
channel
} else {
debug!("channel not found {:?}", c);
return Ok((ChannelList::Data(vec![b'\n']), pos));
};
let mut data = Vec::new();
use std::io::Write;
if let Some(pos) = pos {
for n in txn.log(&*channel.read(), pos)? {
let (n, (_, m)) = n?;
if n < pos {
continue;
} else if n > pos {
data.push(b'\n');
break;
} else {
let m: libpijul::Merkle = m.into();
let m2 = if let Some(x) = txn
.rev_iter_tags(txn.tags(&*channel.read()), Some(n))?
.next()
{
x?.1.b.into()
} else {
libpijul::Merkle::zero()
};
writeln!(&mut data, "{} {} {}", n, m.to_base32(), m2.to_base32()).unwrap();
break;
}
}
} else {
if let Some(n) = txn.reverse_log(&*channel.read(), None)?.next() {
let (n, (_, m)) = n?;
let m: libpijul::Merkle = m.into();
let m2 = if let Some(x) = txn
.rev_iter_tags(txn.tags(&*channel.read()), Some(n))?
.next()
{
x?.1.b.into()
} else {
libpijul::Merkle::zero()
};
writeln!(data, "{} {} {}", n, m.to_base32(), m2.to_base32())?
} else {
writeln!(data, "-")?;
}
}
if data.is_empty() {
data.push(b'\n')
}
Ok((ChannelList::Data(data), pos))
} else {
debug!("no repo id / no channel");
Ok((ChannelList::Data(Vec::new()), pos))
}
}
async fn id<'a>(
&mut self,
cap: regex::Captures<'a>,
is_tag: bool,
) -> Result<ChannelList, crate::Error> {
debug!("id is_tag {:?}", is_tag);
if let Some(disc) = DISC.captures(&cap[1]) {
let disc: i32 = disc[1].parse().unwrap();
return Ok(ChannelList::Discussion(disc));
} else if let Some(ref id) = self.repo_id {
debug!("taking lock");
let repo = self
.config
.repo_locks
.get(&self.repo_id.as_ref().unwrap().origin_id())
.await?;
debug!("ok, reading");
let pristine = repo.pristine.read().await;
debug!("ok, reading");
let txn = pristine.txn_begin()?;
debug!("ok, reading");
let c = channel_spec(id, &cap[1]);
debug!("channel c = {:?}", c);
if let (Some(ChannelSpec::Channel(ref c)), Some(ref id)) =
(&self.channel, &self.repo_id)
{
if c == &self.default_channel {
let id = if is_tag {
let mut id_ = [0; 16];
for (a, b) in id_.iter_mut().zip(id.repo_id.as_bytes().iter()) {
*a = 255 - *b
}
data_encoding::BASE32_NOPAD.encode(&id_)
} else {
data_encoding::BASE32_NOPAD.encode(id.repo_id.as_bytes())
};
debug!("id {:?} = {:?}", is_tag, id);
return Ok(ChannelList::Data(format!("{}\n", id).into()));
}
}
if let Some(channel) = txn.load_channel(&c)? {
let resp = format!(
"{}\n",
data_encoding::BASE32_NOPAD.encode(channel.read().id.as_bytes())
)
.into();
debug!("resp = {:?}", resp);
return Ok(ChannelList::Data(resp));
}
}
debug!("no repo id / no channel");
Ok(ChannelList::Data(b"\n".to_vec()))
}
async fn parse_apply<'a>(&mut self, cap: regex::Captures<'a>) -> Result<(), crate::Error> {
let hash = if let Some(h) = libpijul::pristine::Hash::from_base32((&cap[2]).as_bytes()) {
h
} else {
debug!("wrong hash");
self.state = Some(State::Protocol);
return Ok(());
};
if let Ok(d) = cap[1].parse::<i32>() {
self.channel = Some(ChannelSpec::Discussion(d))
} else {
self.channel = Some(ChannelSpec::channel(cap[1].into()));
};
let size: usize = (&cap[3]).parse()?;
debug!("parse_apply = {:?} {:?}", hash, size);
let mut path = self.config.repositories_path.join("tmp");
path.push(
self.repo_id
.as_ref()
.map(|r| r.repo_id)
.unwrap()
.hyphenated()
.to_string(),
);
tokio::fs::create_dir_all(&path).await?;
path.push(&hash.to_base32());
let f = Some(
OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&path)
.await?,
);
let file = TmpFile { f, path };
self.state = Some(State::Change {
hash,
size,
file_size: 0,
file,
});
Ok(())
}
fn parse_change(
&mut self,
cap: regex::Captures,
chan: ChannelId,
session: &mut Session,
full: bool,
) -> Result<(), crate::Error> {
let hash = if let Some(h) = libpijul::pristine::Hash::from_base32((&cap[4]).as_bytes()) {
h
} else {
debug!("wrong hash");
return Ok(());
};
if self.repo_id.is_none() {
return Ok(());
}
let mut p = self
.repo_id
.as_ref()
.unwrap()
.nest_changes_path(&self.config);
libpijul::changestore::filesystem::push_filename(&mut p, &hash);
debug!("change path = {:?}", p);
let mut v = thrussh::CryptoVec::new();
let mut f = std::fs::File::open(&p)?;
let full_size = std::fs::metadata(&p)?.len();
let size = if full || full_size <= self.config.partial_change_size {
full_size
} else {
libpijul::change::Change::size_no_contents(&mut f)?
};
v.resize(8 + size as usize);
f.read_exact(&mut v[8..])?;
BigEndian::write_u64(&mut v[..8], size);
debug!("sending {:?}", size);
session.data(chan, v);
Ok(())
}
async fn apply(
&mut self,
hash: libpijul::pristine::Hash,
file: TmpFile,
size: usize,
channel_id: thrussh::ChannelId,
session: &mut thrussh::server::Session,
) -> Result<(), crate::Error> {
debug!("saving change {:?}", hash);
debug!("apply: {:?} {:?}", self.repo_id, self.channel);
if self.repo_id.is_none() {
return Ok(());
}
match self.channel {
Some(ChannelSpec::CI(_)) | Some(ChannelSpec::Channel(_)) => {
if !self.permissions.contains(Perm::APPLY) {
return Err((crate::Error::Forbidden).into());
}
}
Some(ChannelSpec::Discussion(_)) => {
if !self.permissions.contains(Perm::CREATE_DISCUSSION) {
return Err((crate::Error::Forbidden).into());
}
}
None => return Ok(()),
}
let channel = self.channel.take().unwrap();
debug!("applying {:?} to {:?}", hash, channel);
let repo_id = self.repo_id.as_ref().unwrap();
let repo = self
.config
.repo_locks
.get(&repo_id.origin_id())
.await
.unwrap();
if !repo.changes.has_change(&hash) {
let owner = self.repo_id.as_ref().unwrap().owner_id.clone();
use crate::db::users::dsl as u;
let n = if self.config.size_limit > 0 {
diesel::update(u::users.find(owner))
.set((u::storage_used.eq(u::storage_used + size as i64),))
.filter(
(u::storage_used + size as i64).le(self.config.size_limit)
)
.execute(&mut self.db.get().await?)
.await?
} else {
diesel::update(u::users.find(owner))
.set((u::storage_used.eq(u::storage_used + size as i64),))
.execute(&mut self.db.get().await?)
.await?
};
if n > 0 {
let name = repo.changes.filename(&hash);
tokio::fs::create_dir_all(name.parent().unwrap()).await?;
let mut file = file.persist(&name).await?;
debug!("seek");
let changes = repo.changes.clone();
file.flush().await?;
file.seek(tokio::io::SeekFrom::Start(0)).await?;
debug!("seek done");
changes.check(&mut file, &hash, None).await?;
} else {
return Err((crate::Error::Quota { quota: 0 }).into());
}
}
match channel {
ChannelSpec::Channel(c) => self.apply_to_channel(repo, c, hash).await?,
ChannelSpec::CI(c) => self.channel = Some(ChannelSpec::CI(c)),
ChannelSpec::Discussion(d) => {
self.apply_to_discussion(repo, d, hash, channel_id, session)
.await?
}
}
Ok(())
}
async fn apply_to_channel(
&mut self,
repo: Arc<crate::repository::Repo>,
c: String,
hash: libpijul::Hash,
) -> Result<(), crate::Error> {
use libpijul::changestore::ChangeStore;
let header = repo.changes.get_header(&hash)?;
let id = self.repo_id.as_ref().unwrap();
let repo_id = self.repo_id.as_ref().unwrap().repo_id;
self.config
.replicator
.send_change(id.origin_id(), hash, false)
.await?;
self.config
.replicator
.handle_update(
None,
None,
None,
::replication::Update::Apply {
repo: id.origin_id(),
channel: c.clone(),
hash,
},
)
.await?;
let hook = pijul_hooks::HookContent::ChangesApplied {
repository_owner: self.owner.to_string(),
repository_name: self.repo.to_string(),
applied_by: self.user.to_string(),
message: header.message.clone(),
author: crate::change::authors_string(&self.db, &header.authors).await?,
hash: hash.to_base32(),
};
let mut db_ = self.db.get().await?;
use crate::db::repositories::dsl as repos;
diesel::update(repos::repositories.find(repo_id))
.set(repos::last_pushed.eq(diesel::dsl::now))
.execute(&mut db_)
.await?;
for h in header.authors {
debug!("author {:?}", h);
if let Some(k) =
h.0.get("key")
.and_then(|k| bs58::decode(k.as_bytes()).into_vec().ok())
{
use crate::db::contributors::dsl as c;
diesel::insert_into(c::contributors)
.values((
c::repo.eq(repo_id),
c::key.eq(k),
c::revision.eq(diesel::dsl::now),
))
.on_conflict_do_nothing()
.execute(&mut db_)
.await?;
} else {
error!("Decoding error");
}
}
let db = self.db.clone();
tokio::spawn(async move {
crate::hooks::run_hooks_by_repo_id(
&mut db.get().await.unwrap(),
repo_id,
crate::hooks::Action::APPLY_PATCH,
&hook,
)
.await
});
self.channel = Some(ChannelSpec::Channel(c));
Ok(())
}
async fn apply_to_discussion(
&mut self,
repo: Arc<crate::repository::Repo>,
d: i32,
hash: libpijul::Hash,
channel: thrussh::ChannelId,
session: &mut thrussh::server::Session,
) -> Result<(), crate::Error> {
let id = self.repo_id.clone().unwrap();
let user_id = self.user_id.unwrap();
let user = self.user.clone();
let repo_name = self.repo.clone();
let ip_sql = self.ip_sql.clone();
let config = self.config.clone();
let repo_ = repo.clone();
let (x, err) = self
.db
.get()
.await?
.transaction(|mut txn| {
(async move {
debug!("got transaction");
let mut err = None;
let d = if d == 0 {
use libpijul::changestore::ChangeStore;
let header = if let Ok(h) = repo_.changes.get_header(&hash) {
h
} else {
return Ok((None, None));
};
let (n, _) = if let Some(x) = create_discussion_repo(
&mut txn,
id.repo_id,
user_id,
&header.message,
ip_sql,
)
.await?
{
x
} else {
err = Some(thrussh::CryptoVec::from_slice(
b"A discussion with the same title already exists\n",
));
return Ok((None, err));
};
err = Some(thrussh::CryptoVec::from_slice(
format!(
"Created discussion https://{}/{}/{}/discussions/{}\n",
config.host, user, repo_name, n
)
.as_bytes(),
));
n
} else {
d
};
use crate::db::discussion_changes::dsl as dc;
use crate::db::discussions::dsl as d;
use diesel::sql_types::Bool;
let hashb32 = hash.to_base32();
let n = d::discussions
.filter(d::repository_id.eq(id.repo_id))
.filter(d::number.eq(d))
.filter(
diesel::dsl::sql::<Bool>("not exists(select 1 from discussion_changes join discussions on discussions.id = discussion_changes.discussion where repository_id = ")
.bind::<diesel::sql_types::Uuid, _>(id.repo_id)
.sql(" and number = ")
.bind::<Integer, _>(d)
.sql(" and discussion_changes.change = ")
.bind::<Text, _>(&hashb32)
.sql(" and discussion_changes.removed is null)"),
)
.select((
hashb32.clone().into_sql::<Text>(),
d::id,
user_id.into_sql::<diesel::sql_types::Uuid>().nullable(),
))
.insert_into(dc::discussion_changes)
.into_columns((dc::change, dc::discussion, dc::pushed_by))
.returning(dc::discussion)
.get_result(txn)
.await
.optional()?;
debug!("n = {:?}", n);
if let Some(disc_id) = n {
debug!("{:?} {:?}", id.repo_id, d);
use crate::db::discussion_subscriptions::dsl as ds;
let title = diesel::update(d::discussions.find(disc_id))
.set(d::changes.eq(d::changes + 1))
.returning(d::title)
.get_result(txn)
.await?;
diesel::insert_into(ds::discussion_subscriptions)
.values((ds::user_id.eq(user_id), ds::discussion_id.eq(disc_id)))
.on_conflict_do_nothing()
.execute(txn)
.await?;
Ok((Some((disc_id, title)), err))
} else {
Err(diesel::result::Error::RollbackTransaction)
}
})
.scope_boxed()
})
.await?;
if let Some((disc_id, title)) = x {
use libpijul::changestore::ChangeStore;
let header = repo.changes.get_header(&hash)?;
let db = self.db.clone();
let repo_id = self.repo_id.as_ref().unwrap();
self.config
.replicator
.send_change(repo_id.origin_id(), hash, false)
.await?;
let hook = pijul_hooks::HookContent::NewChanges {
repository_owner: self.owner.to_string(),
repository_name: self.repo.to_string(),
pusher: self.user.to_string(),
message: header.message.clone(),
author: crate::change::authors_string(&self.db, &header.authors).await?,
hash: hash.to_base32(),
};
let repo_id = repo_id.repo_id;
tokio::spawn(async move {
crate::hooks::run_hooks_by_repo_id(
&mut db.get().await.unwrap(),
repo_id,
crate::hooks::Action::ADD_PATCH,
&hook,
)
.await
});
self.send_change_email(d, disc_id, title, hash, header)
.await?
}
if let Some(err) = err {
session.extended_data(channel, 1, err);
}
self.channel = Some(ChannelSpec::Discussion(d));
Ok(())
}
async fn send_change_email(
&self,
n: i32,
discussion_id: uuid::Uuid,
name: String,
hash: libpijul::pristine::Hash,
header: libpijul::change::ChangeHeader,
) -> Result<(), crate::Error> {
let user_id = if let Some(user) = self.user_id {
user
} else {
return Ok(());
};
use crate::db::discussion_subscriptions::dsl as ds;
use crate::db::users::dsl as u;
let mut db = self.db.get().await?;
let it = ds::discussion_subscriptions
.inner_join(u::users)
.filter(ds::discussion_id.eq(discussion_id))
.filter(u::id.eq(user_id))
.select((u::email, u::login))
.get_results::<(String, String)>(&mut db)
.await?;
let hash = hash.to_base32();
use cuach::Render;
use diesel::sql_types::{Text, Uuid};
let obj = format!("[{}/{}] Changed: {} (#{})", self.owner, self.repo, name, n);
for (address, login) in it {
let n_insert = diesel::sql_query(
"INSERT INTO email_log(discussion, email)
SELECT $1, $2 WHERE (
SELECT COUNT(1) FROM email_log WHERE
discussion = $1
AND email = $2
AND time > NOW() - INTERVAL '1h'
) < 10",
)
.bind::<Uuid, _>(discussion_id)
.bind::<Text, _>(&address)
.execute(&mut db)
.await?;
if n_insert == 0 {
continue;
}
let mut body_html = String::new();
debug!("sending email to {:?}", login);
{
(EmailDiscussionChangeTemplate {
host: &self.config.host,
repo: &self.repo,
owner: &self.owner,
author: &self.user,
login: &login,
disc: n,
name: &name,
hash: &hash,
message: &header.message,
})
.render_into(&mut body_html)
.unwrap();
}
let email_body = rusoto_ses::Body {
text: Some(rusoto_ses::Content {
data: format!(
include_str!("discussions/new_change_email"),
author = self.user,
disc = n,
name = name,
message = header.message,
owner = self.owner,
repo = self.repo,
host = self.config.host,
hash = hash,
login = &login,
),
charset: Some(EMAIL_CHARSET.to_string()),
}),
html: Some(rusoto_ses::Content {
data: body_html,
charset: Some(EMAIL_CHARSET.to_string()),
}),
};
debug!("sending an email to {:?}", address);
let config = self.config.clone();
let obj = obj.clone();
tokio::spawn(async move {
crate::email::send_email(&config, obj, email_body, address).await
});
}
Ok(())
}
}
use crate::email::*;
#[template(path = "discussions/new_change_email.html")]
struct EmailDiscussionChangeTemplate<'a> {
host: &'a str,
owner: &'a str,
repo: &'a str,
author: &'a str,
login: &'a str,
disc: i32,
name: &'a str,
hash: &'a str,
message: &'a str,
}
async fn create_discussion_repo(
client: &mut diesel_async::AsyncPgConnection,
repo: Uuid,
author_id: Uuid,
name: &str,
ip_sql: ipnetwork::IpNetwork,
) -> Result<Option<(i32, Uuid)>, diesel::result::Error> {
use crate::db::discussions::dsl as d;
if d::discussions
.filter(d::repository_id.eq(repo))
.filter(d::closed.is_not_null())
.filter(d::title.eq(&name))
.select(d::id)
.get_result::<uuid::Uuid>(client)
.await
.optional()?
.is_some()
{
return Ok(None);
}
client
.transaction(|mut txn| {
(async move {
use crate::db::discussion_subscriptions::dsl as ds;
use crate::db::repositories::dsl as r;
let number = diesel::update(r::repositories.find(repo))
.filter(crate::has_permissions!(
author_id,
repo,
Perm::CREATE_DISCUSSION.bits()
))
.set(r::next_discussion_number.eq(r::next_discussion_number + 1))
.returning(r::next_discussion_number)
.get_result(&mut txn)
.await
.optional()?;
let number = if let Some(number) = number {
number
} else {
return Err(diesel::result::Error::NotFound);
};
let id = diesel::insert_into(d::discussions)
.values((
d::title.eq(&name),
d::author.eq(&author_id),
d::creation_date.eq(diesel::dsl::now),
d::creation_ip.eq(ip_sql),
d::number.eq(number),
))
.on_conflict_do_nothing()
.returning(d::id)
.get_result::<uuid::Uuid>(&mut txn)
.await?;
diesel::insert_into(ds::discussion_subscriptions)
.values((ds::user_id.eq(author_id), ds::discussion_id.eq(id)))
.execute(&mut txn)
.await?;
Ok((number, id))
})
.scope_boxed()
})
.await
.optional()
}