use crate::permissions::Perm;
use crate::repository::{channel_spec, ChannelSpec, RepositoryId};
use crate::Keyalgorithm_;
use byteorder::{BigEndian, ByteOrder};
use cuach::*;
use diesel::sql_types::{BigInt, Bool, Integer, Text};
use diesel::{
    BoolExpressionMethods, ExpressionMethods, Insertable, IntoSql, NullableExpressionMethods,
    OptionalExtension, QueryDsl,
};
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::AsyncConnection;
use diesel_async::RunQueryDsl;
use futures;
use futures::Future;
use lazy_static::*;
use libpijul::pristine::{Base32, ChannelTxnT, DepsTxnT, GraphTxnT, TxnT};
use libpijul::TxnTExt;
use regex::Regex;
use serde_derive::*;
use std;
use std::collections::HashSet;
use std::io::Read;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use thrussh;
use thrussh::server::*;
use thrussh::ChannelId;
use thrussh_keys;
use tokio::fs::OpenOptions;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tracing::*;
use uuid::Uuid;

pub async fn socket(ssh_addr: &SocketAddr) -> TcpListener {
    tokio::net::TcpListener::bind(ssh_addr).await.unwrap()
}
/// Capacity reserved for the random challenge string built by
/// `H::challenge`.
const CHAL_LEN: usize = 30;

lazy_static! {
    // Matches a colon followed by decimal digits and captures the digits in
    // group 1. `H::changelist` uses it to recognise a discussion number in a
    // channel name.
    pub static ref DISC: Regex = Regex::new(r#":([0-9]+)"#).unwrap();
}
/// Character set name; presumably used when composing e-mail elsewhere in
/// the crate (not referenced in this part of the file) -- TODO confirm.
pub const EMAIL_CHARSET: &'static str = "UTF-8";

pub async fn worker(config: super::Config, socket: TcpListener) {
    loop {
        let (socket, addr) = if let Ok(x) = socket.accept().await {
            x
        } else {
            error!("SSH accept");
            continue;
        };
        let ip: std::net::IpAddr = addr.ip();
        info!("ssh connection from {:?}", ip);

        let config = config.clone();

        use crate::db::password_failures::dsl as pf;
        let ip_sql = ipnetwork::IpNetwork::new(ip, if ip.is_ipv4() { 32 } else { 128 }).unwrap();
        if let (Some(date), count) = pf::password_failures
            .filter(pf::ip.eq(ip_sql))
            .filter(pf::date.gt(chrono::Utc::now() - chrono::Duration::hours(1)))
            .select((diesel::dsl::max(pf::date), diesel::dsl::count(pf::date)))
            .get_result::<(Option<chrono::DateTime<chrono::Utc>>, i64)>(
                &mut config.db.get().await.unwrap(),
            )
            .await
            .unwrap()
        {
            let now = chrono::Utc::now();
            if now.signed_duration_since(date).num_seconds() < count {
                continue;
            }
        }
        tokio::spawn(run_stream(
            config.ssh.clone(),
            socket,
            H {
                config: config.clone(),
                db: config.db.clone(),
                user: String::new(),
                user_id: None,
                repo: String::new(),
                default_channel: String::new(),
                channel: None,
                repo_id: None,
                permissions: crate::permissions::Perm::empty(),
                owner: String::new(),
                protocol_version: 0,
                state: None,
                applied: Vec::new(),
                ip_sql,
                last_window_adjustment: SystemTime::now(),
                random_challenge: RandomChallenge {
                    ch: String::new(),
                    gen: std::time::SystemTime::UNIX_EPOCH,
                    key: None,
                },
            },
        ));
    }
}

/// State of one SSH connection; `worker` creates one per accepted socket and
/// the `Handler` implementation threads it through every callback.
struct H {
    /// Server configuration, cloned for this connection.
    pub config: super::Config,
    /// Database handle (`config.db`); connections are obtained with `get()`.
    pub db: crate::config::Db,
    /// Protocol version requested with `--version`; `parse_request` only
    /// accepts 3. Starts at 0.
    pub protocol_version: usize,
    /// Login name supplied by the client in the last authentication attempt.
    pub user: String,
    /// Id of the authenticated user, set when authentication is accepted.
    pub user_id: Option<Uuid>,
    /// Owner part of `--repository`, or `user` when no owner was given.
    pub owner: String,
    /// Repository name parsed from `--repository`; empty if none was given.
    pub repo: String,
    /// Permission bits for the repository, loaded by `exec_request`.
    pub permissions: crate::permissions::Perm,
    /// Channel selected for the session; `exec_request` initialises it to the
    /// repository's default channel.
    pub channel: Option<ChannelSpec>,
    /// Name of the repository's default channel, loaded by `exec_request`.
    pub default_channel: String,
    /// Identifiers of the repository, loaded by `exec_request`.
    pub repo_id: Option<RepositoryId>,
    /// Protocol state; `None` until a `pijul protocol` request is accepted,
    /// and while a command is being processed by `protocol`.
    pub state: Option<State>,
    /// Peer address as a single-host network, the key used in the
    /// `password_failures` table.
    pub ip_sql: ipnetwork::IpNetwork,
    /// Hashes of the changes successfully applied during this connection.
    pub applied: Vec<libpijul::pristine::Hash>,
    /// Time of the previous `adjust_window` call.
    pub last_window_adjustment: SystemTime,
    /// Pending signing-key challenge (see `challenge` and `prove`).
    pub random_challenge: RandomChallenge,
}

/// Challenge issued to a client that wants to register a signing key.
struct RandomChallenge {
    /// The random challenge text, stored without its trailing newline.
    ch: String,
    /// When the challenge was generated; `UNIX_EPOCH` before any challenge
    /// has been issued. `prove` rejects challenges older than 10 seconds.
    gen: std::time::SystemTime,
    /// The public key the client announced with the `challenge` command.
    key: Option<libpijul::key::PublicKey>,
}

/// A file at `path` that is deleted on drop unless it has been `persist`ed.
struct TmpFile {
    /// Open handle; `None` once `persist` has taken it, which also disarms
    /// the deletion in `Drop`.
    f: Option<tokio::fs::File>,
    /// Location of the file on disk.
    path: std::path::PathBuf,
}

impl TmpFile {
    /// Renames the temporary file to `path` and hands back its open handle.
    ///
    /// On success the handle is taken out of `self`, so dropping `self` no
    /// longer deletes anything. If the rename fails the error is returned and
    /// the drop of `self` removes the temporary file.
    ///
    /// # Panics
    ///
    /// Panics if the handle has already been taken.
    async fn persist(
        mut self,
        path: &std::path::Path,
    ) -> Result<tokio::fs::File, tokio::io::Error> {
        let renamed = tokio::fs::rename(&self.path, path).await;
        match renamed {
            Ok(()) => Ok(self.f.take().unwrap()),
            Err(e) => Err(e),
        }
    }
}

impl Drop for TmpFile {
    /// Deletes the file at `path` unless the handle was taken by `persist`.
    /// Deletion is best effort: a failure to remove the file is ignored.
    fn drop(&mut self) {
        if self.f.is_none() {
            return;
        }
        debug!("dropping file {:?}", self.path);
        let _ = std::fs::remove_file(&self.path);
    }
}

/// What the next chunk of channel data is interpreted as.
enum State {
    /// The next data is a newline-terminated protocol command.
    Protocol,
    /// The next data is part of the contents of an uploaded change.
    Change {
        /// Hash of the change being received.
        hash: libpijul::pristine::Hash,
        /// Total number of bytes expected.
        size: usize,
        /// Temporary file the bytes are written to.
        file: TmpFile,
        /// Number of bytes written so far.
        file_size: usize,
    },
}

/// Maps libpijul's key algorithm onto the database enum.
impl From<libpijul::key::Algorithm> for Keyalgorithm_ {
    fn from(a: libpijul::key::Algorithm) -> Keyalgorithm_ {
        use libpijul::key::Algorithm;
        match a {
            Algorithm::Ed25519 => Self::Ed25519,
        }
    }
}

impl From<Keyalgorithm_> for libpijul::key::Algorithm {
    fn from(a: Keyalgorithm_) -> libpijul::key::Algorithm {
        match a {
            Keyalgorithm_::Ed25519 => libpijul::key::Algorithm::Ed25519,
        }
    }
}

impl H {
    /// Records a failed authentication attempt for this connection's address
    /// and login name in `password_failures`, then rejects the attempt.
    ///
    /// # Errors
    ///
    /// Returns an error if no database connection can be obtained or if the
    /// insert fails.
    async fn reject(self) -> Result<(Self, thrussh::server::Auth), crate::Error> {
        use crate::db::password_failures::dsl as pf;
        // Propagate pool errors like every other call site instead of
        // panicking inside the connection task.
        let mut db = self.db.get().await?;
        diesel::insert_into(pf::password_failures)
            .values((pf::ip.eq(&self.ip_sql), pf::login.eq(&self.user)))
            .execute(&mut db)
            .await?;
        std::mem::drop(db);
        Ok((self, thrussh::server::Auth::Reject))
    }
}

/// Serializable record of a signature: the signing public key, a timestamp
/// and the signature itself.
// NOTE(review): not referenced in this part of the file; the encoding of
// `public_key` and `signature` is not visible here.
#[derive(Debug, Serialize, Deserialize)]
struct Signature {
    public_key: String,
    timestamp: chrono::DateTime<chrono::Utc>,
    signature: String,
}

impl Handler for H {
    type Error = crate::Error;
    type FutureAuth = Pin<Box<dyn Future<Output = Result<(Self, Auth), crate::Error>> + Send>>;
    type FutureUnit = Pin<Box<dyn Future<Output = Result<(Self, Session), crate::Error>> + Send>>;
    type FutureBool = futures::future::Ready<Result<(Self, Session, bool), crate::Error>>;

    /// Returns the handler and session unchanged as an already-complete step.
    fn finished(self, session: thrussh::server::Session) -> Self::FutureUnit {
        Box::pin(async move { Ok((self, session)) })
    }

    /// Returns the handler together with the given authentication outcome.
    fn finished_auth(self, auth: thrussh::server::Auth) -> Self::FutureAuth {
        Box::pin(async move { Ok((self, auth)) })
    }

    /// Returns the handler, the session and the boolean answer `b` as an
    /// immediately ready future.
    fn finished_bool(self, b: bool, s: thrussh::server::Session) -> Self::FutureBool {
        let outcome = Ok((self, s, b));
        futures::future::ready(outcome)
    }

    fn auth_publickey(
        mut self,
        user: &str,
        public_key: &thrussh_keys::key::PublicKey,
    ) -> Self::FutureAuth {
        self.user.clear();
        self.user.push_str(user);
        use thrussh_keys::PublicKeyBase64;
        debug!("auth_publickey {:?}", public_key);
        debug!("user {:?}", user);
        let key_bytes = match public_key {
            thrussh_keys::key::PublicKey::RSA { ref key, .. } => {
                (thrussh_keys::key::PublicKey::RSA {
                    key: thrussh_keys::key::OpenSSLPKey(key.0.clone()),
                    hash: thrussh_keys::key::SignatureHash::SHA2_512,
                })
                .public_key_bytes()
            }
            k => k.public_key_bytes(),
        };

        debug!(
            "key_bytes = {:?}",
            data_encoding::HEXLOWER.encode(&key_bytes)
        );
        Box::pin(async move {
            use crate::db::publickeys::dsl as publickeys;
            use crate::db::users::dsl as users;
            let mut db_ = self.db.get().await?;

            if let Some(id) = users::users
                .inner_join(publickeys::publickeys)
                .filter(users::login.eq(&self.user))
                .filter(publickeys::bin.eq(&key_bytes))
                .filter(users::is_active)
                .select(users::id)
                .get_result(&mut db_)
                .await
                .optional()?
            {
                debug!("accept");
                use crate::db::password_failures::dsl as pf;
                diesel::delete(pf::password_failures)
                    .filter(pf::ip.eq(self.ip_sql))
                    .filter(pf::login.eq(&self.user))
                    .execute(&mut db_)
                    .await?;
                self.user_id = Some(id);
                Ok((self, Auth::Accept))
            } else {
                std::mem::drop(db_);
                tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                Ok((self, Auth::Reject))
            }
        })
    }

    fn auth_password(mut self, user: &str, password: &str) -> Self::FutureAuth {
        self.user.clear();
        self.user.push_str(user);
        let password = password.to_string();
        Box::pin(async move {
            use crate::db::password_failures::dsl as pf;
            let mut db_ = self.db.get().await?;
            let count = pf::password_failures
                .select(diesel::dsl::count::<BigInt, i64>(1))
                .filter(pf::ip.eq(self.ip_sql).or(pf::login.eq(&self.user)))
                .get_result::<i64>(&mut db_)
                .await?;
            if count > self.config.max_password_attempts {
                return Ok((self, Auth::Reject));
            }

            use crate::db::users::dsl as users;
            if let Some(user_id) = users::users
                .filter(users::login.eq(&self.user))
                .filter(crate::check_password!(&password))
                .filter(users::email_is_invalid.is_null())
                .filter(users::is_active)
                .select(users::id)
                .get_result(&mut db_)
                .await
                .optional()?
            {
                self.user_id = Some(user_id);
                return Ok((self, Auth::Accept));
            } else {
                self.reject().await
            }
        })
    }

    /// Handles an SSH `exec` request.
    ///
    /// Only `pijul protocol ...` command lines accepted by `parse_request`
    /// are served:
    ///
    /// * a request that is not valid UTF-8, or that `parse_request` refuses,
    ///   is ignored without any reply;
    /// * a command line that does not start with `pijul <subcommand>` gets
    ///   exit status 127;
    /// * a subcommand other than `protocol` gets an error message and exit
    ///   status 1;
    /// * `protocol` without a repository enters the protocol state directly;
    /// * `protocol` with a repository loads the repository identifiers, the
    ///   permissions of the authenticated user and the default channel, then
    ///   enters the protocol state, or reports an error with exit status 1 if
    ///   no active repository with an active owner matches.
    fn exec_request(
        mut self,
        ch: thrussh::ChannelId,
        exec: &[u8],
        mut session: Session,
    ) -> Self::FutureUnit {
        debug!("exec_request = {:?}", exec);
        let exec = match std::str::from_utf8(exec) {
            Ok(text) => text.to_string(),
            Err(_) => return Box::pin(async { Ok((self, session)) }),
        };
        debug!("exec: {:?}", exec);
        Box::pin(async move {
            if !self.parse_request(&exec) {
                return Ok((self, session));
            }

            let mut words = exec.split_whitespace();
            let cmd = match (words.next(), words.next()) {
                (Some("pijul"), Some(cmd)) => cmd,
                _ => {
                    session.exit_status_request(ch, 127);
                    return Ok((self, session));
                }
            };
            if cmd != "protocol" {
                error!("cmd = {:?}", cmd);
                session.extended_data(
                    ch,
                    1,
                    thrussh::CryptoVec::from_slice(b"Invalid Pijul subcommand\n"),
                );
                session.exit_status_request(ch, 1);
                return Ok((self, session));
            }

            debug!(
                "owner = {:?}, repo = {:?}, channel = {:?}",
                self.owner, self.repo, self.channel
            );
            if self.repo.is_empty() {
                self.state = Some(State::Protocol);
                session.channel_success(ch);
                return Ok((self, session));
            }

            use crate::db::repositories::dsl as r;
            use crate::db::users::dsl as users;
            let row = r::repositories
                .inner_join(users::users)
                .filter(users::login.eq(&self.owner))
                .filter(r::name.eq(&self.repo))
                .filter(r::is_active)
                .filter(users::is_active)
                .select((
                    r::id,
                    users::id,
                    r::fork_origin,
                    crate::permissions!(self.user_id.unwrap(), r::id),
                    r::default_channel,
                    users::suspended,
                    crate::repository::is_public(),
                ))
                .get_result::<(
                    uuid::Uuid,
                    uuid::Uuid,
                    Option<uuid::Uuid>,
                    i64,
                    String,
                    bool,
                    bool,
                )>(&mut self.db.get().await?)
                .await
                .optional()?;

            match row {
                Some((repo_id, owner_id, fork_origin, permissions, channel, suspended, public)) => {
                    self.repo_id = Some(RepositoryId {
                        repo_id,
                        owner_id,
                        fork_origin,
                    });
                    self.permissions = Perm::from_bits(permissions).unwrap();
                    // A non-public repository of a suspended owner keeps at
                    // most the READ permission.
                    if suspended && !public {
                        self.permissions &= Perm::READ
                    }

                    self.default_channel = channel.clone();
                    self.channel = Some(ChannelSpec::channel(channel.into()));
                    self.state = Some(State::Protocol);
                    session.channel_success(ch);
                }
                None => {
                    debug!("permission denied");
                    session.extended_data(
                        ch,
                        1,
                        thrussh::CryptoVec::from_slice(
                            b"Repository not found, or insufficient permissions\n",
                        ),
                    );
                    session.exit_status_request(ch, 1);
                }
            }
            Ok((self, session))
        })
    }

    fn data(
        mut self,
        chan: thrussh::ChannelId,
        data: &[u8],
        mut session: Session,
    ) -> Self::FutureUnit {
        debug!("data len {:?}", data.len());
        let data = data.to_vec();
        Box::pin(async move {
            let result = self.protocol(chan, &data, &mut session).await;
            match result {
                Ok(ProtocolResult::Change { hash, file, size }) => {
                    let result = self.apply(hash, file, size, chan, &mut session).await;
                    debug!("result: {:?}", result);
                    if let Err(e) = result {
                        session.extended_data(
                            chan,
                            1,
                            thrussh::CryptoVec::from_slice(e.to_string().as_bytes()),
                        );
                        session.exit_status_request(chan, 1);
                        return Ok((self, session));
                    }
                    self.applied.push(hash);
                }
                Ok(ProtocolResult::ChannelList(disc)) => {
                    if self.repo_id.is_none() {
                        return Ok((self, session));
                    }
                    let repo = self
                        .config
                        .repo_locks
                        .get(&self.repo_id.as_ref().unwrap().origin_id())
                        .await?;
                    let data = crate::discussions::discussion_changelist(
                        &mut self.db.get().await.unwrap(),
                        &repo.changes,
                        self.repo_id.as_ref().unwrap().repo_id,
                        disc,
                    )
                    .await?;
                    session.data(chan, thrussh::CryptoVec::from_slice(&data));
                    self.state = Some(State::Protocol);
                }
                Ok(ProtocolResult::State { disc, pos }) => {
                    if self.repo_id.is_none() {
                        return Ok((self, session));
                    }
                    let (n, m) = crate::discussions::discussion_state(
                        &mut self.db.get().await.unwrap(),
                        self.repo_id.as_ref().unwrap().repo_id,
                        disc,
                        pos,
                    )
                    .await?;
                    let data = format!("{} {}\n", n, m.to_base32());
                    session.data(chan, thrussh::CryptoVec::from_slice(&data.as_bytes()));
                    self.state = Some(State::Protocol);
                }
                Ok(ProtocolResult::None) => {}
                Err(e) => {
                    match e {
                        crate::Error::AmbiguousInode => {
                            session.extended_data(
                                chan,
                                1,
                                thrussh::CryptoVec::from_slice(b"Ambiguous path\n"),
                            );
                        }
                        crate::Error::InodeNotFound => {
                            session.extended_data(
                                chan,
                                1,
                                thrussh::CryptoVec::from_slice(b"Path not found\n"),
                            );
                        }
                        _ => {
                            session.extended_data(
                                chan,
                                1,
                                thrussh::CryptoVec::from_slice(b"Channel not found or empty\n"),
                            );
                        }
                    }
                    session.eof(chan);
                    session.exit_status_request(chan, 1);
                    session.close(chan);
                    return Ok((self, session));
                }
            }
            Ok((self, session))
        })
    }

    /// Chooses the next window size from the time elapsed since the previous
    /// adjustment.
    ///
    /// A `target` of at least 10,000,000 is returned unchanged. Below that,
    /// the window is doubled when the previous adjustment happened less than
    /// 2 seconds ago, halved when it happened more than 8 seconds ago, and
    /// left alone otherwise.
    ///
    /// `SystemTime::elapsed` fails when the system clock has been set back
    /// past the recorded instant; `target` is then returned unchanged rather
    /// than panicking.
    fn adjust_window(&mut self, _channel: ChannelId, target: u32) -> u32 {
        let elapsed = self.last_window_adjustment.elapsed();
        self.last_window_adjustment = SystemTime::now();
        if target >= 10_000_000 {
            return target;
        }
        match elapsed {
            // `target` is below 10,000,000 here, so doubling cannot overflow.
            Ok(e) if e < Duration::from_secs(2) => target * 2,
            Ok(e) if e > Duration::from_secs(8) => target / 2,
            _ => target,
        }
    }

    /// Handles end-of-file from the client: when a repository is selected,
    /// answers with EOF and exit status 0 and closes the channel; otherwise
    /// does nothing.
    fn channel_eof(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
        debug!("channel eof");
        debug!("{:#?}", self.applied);
        Box::pin(async move {
            if self.repo_id.is_some() {
                session.eof(channel);
                session.exit_status_request(channel, 0);
                session.close(channel);
            }
            Ok((self, session))
        })
    }
}

/// What `H::protocol` leaves for its caller (`data`) to finish.
enum ProtocolResult {
    /// Nothing further to do.
    None,
    /// A change has been received completely and is ready to be applied.
    Change {
        /// Hash of the received change.
        hash: libpijul::pristine::Hash,
        /// Temporary file holding the received bytes.
        file: TmpFile,
        /// Size announced for the change.
        size: usize,
    },
    /// The change list of the discussion with this number was requested.
    ChannelList(i32),
    /// The state of a discussion was requested.
    State {
        /// Discussion number.
        disc: i32,
        /// Optional position given with the request.
        pos: Option<u64>,
    },
}

/// Outcome of resolving a channel name in a protocol command.
enum ChannelList {
    /// The name designates the discussion with this number; the caller has
    /// to produce the answer.
    Discussion(i32),
    /// The answer, ready to be sent to the client.
    Data(Vec<u8>),
}

impl H {
    /// Extracts the `--repository` and `--version` options from the command
    /// line `exec`.
    ///
    /// `--repository [owner/]name` sets `owner` (defaulting to the login
    /// name) and `repo`. `--version` must be `3`. Other options are ignored.
    ///
    /// Returns `true` when the protocol version is 3 once the whole line has
    /// been read, and `false` as soon as an option has no usable value or the
    /// version is not `3`.
    fn parse_request<'a>(&mut self, exec: &'a str) -> bool {
        lazy_static! {
            static ref ARGS: Regex =
                Regex::new(r#"--(\S*) +(("((\\\\|\\"|[^"])*)")|((\\\\|\\ |[^ ])*))"#).unwrap();
            static ref REPO: Regex = Regex::new(r#"'?/?(([^/']*)/)?([^']+)'?"#).unwrap();
        };
        for arg in ARGS.captures_iter(exec) {
            debug!("cap:{:?}", arg);
            let name = arg.get(1).map(|m| m.as_str());
            // Group 4 is the inside of a quoted value, group 6 a bare value.
            let value = arg.get(4).or(arg.get(6));
            match (name, value) {
                (Some("repository"), Some(repo_)) => {
                    for path in REPO.captures_iter(repo_.as_str()) {
                        debug!("cap {:?}", path);
                        self.owner.clear();
                        match path.get(2) {
                            Some(owner) => self.owner.push_str(owner.as_str()),
                            None => self.owner.push_str(&self.user),
                        }
                        self.repo.clear();
                        self.repo.push_str(&path[3]);
                    }
                }
                (Some("repository"), None) => {
                    error!("unparsed command: {:?}", exec);
                    return false;
                }
                (Some("version"), Some(ver)) if ver.as_str() == "3" => self.protocol_version = 3,
                (Some("version"), _) => return false,
                _ => {
                    debug!("extra: {:?}", arg);
                }
            }
        }
        self.protocol_version == 3
    }

    async fn protocol(
        &mut self,
        chan: thrussh::ChannelId,
        data: &[u8],
        session: &mut Session,
    ) -> Result<ProtocolResult, crate::Error> {
        lazy_static! {
            static ref STATE: Regex = Regex::new(r#"^state\s+(\S+)(\s+([0-9]+)?)\s+"#).unwrap();
            static ref ID: Regex = Regex::new(r#"^id\s+(\S+)\s+"#).unwrap();
            static ref CHANGELIST: Regex =
                Regex::new(r#"^changelist\s+(\S+)\s+([0-9]+)(.*)\s+"#).unwrap();
            static ref CHANGELIST_PATHS: Regex = Regex::new(r#""((\\")|[^"])+""#).unwrap();
            static ref CHANGE: Regex = Regex::new(r#"^((change)|(partial))\s+([^ ]*)\s+"#).unwrap();
            static ref APPLY: Regex =
                Regex::new(r#"^apply\s+(\S+)\s+([^ ]*) ([0-9]+)\s+"#).unwrap();
            static ref TAG: Regex = Regex::new(r#"^tag\s+(\S+)\s+"#).unwrap();
            static ref TAGUP: Regex =
                Regex::new(r#"^tagup\s+(\S+)\s+(\S+)\s+([0-9]+)\s+"#).unwrap();
            static ref CHANNEL: Regex = Regex::new(r#"^channel\s+(\S+)\s+"#).unwrap();
            static ref ARCHIVE: Regex =
                Regex::new(r#"^archive\s+(\S+)\s*(( ([^:]+))*)( :(.*))?\n"#).unwrap();
            static ref CHALLENGE: Regex = Regex::new(r#"^challenge (\S+)\n"#).unwrap();
            static ref IDENTITIES: Regex = Regex::new(r#"^identities( (\d+))?\n"#).unwrap();
            static ref PROVE: Regex = Regex::new(r#"^prove (.+)\n"#).unwrap();
        }
        debug!("protocol, channel {:?}", self.channel);
        match self.state.take() {
            Some(State::Protocol) => {
                let data = {
                    if let Some(n) = data.iter().position(|&x| x == b'\n') {
                        &data[..n + 1]
                    } else {
                        return Err(crate::Error::ProtocolError.into());
                    }
                };
                let data = std::str::from_utf8(data)?;
                debug!("data = {:?}", data);
                if let Some(cap) = CHANGELIST.captures(data) {
                    match self.changelist(cap).await? {
                        ChannelList::Discussion(d) => return Ok(ProtocolResult::ChannelList(d)),
                        ChannelList::Data(data) => {
                            trace!("sending data {:?}", data);
                            session.data(chan, thrussh::CryptoVec::from_slice(&data));
                            debug!("sent");
                            self.state = Some(State::Protocol);
                        }
                    }
                } else if let Some(cap) = STATE.captures(data) {
                    match self.state(cap).await? {
                        (ChannelList::Discussion(disc), pos) => {
                            return Ok(ProtocolResult::State { disc, pos })
                        }
                        (ChannelList::Data(data), _) => {
                            trace!("sending data {:?}", data);
                            session.data(chan, thrussh::CryptoVec::from_slice(&data));
                            debug!("sent");
                            self.state = Some(State::Protocol);
                        }
                    }
                } else if let Some(cap) = ID.captures(data) {
                    match self.id(cap, false).await? {
                        ChannelList::Discussion(_) => {
                            session.data(chan, thrussh::CryptoVec::from_slice(b"\n"));
                        }
                        ChannelList::Data(data) => {
                            session.data(chan, thrussh::CryptoVec::from_slice(&data));
                        }
                    }
                    self.state = Some(State::Protocol);
                } else if let Some(cap) = APPLY.captures(data) {
                    self.parse_apply(cap).await?;
                } else if let Some(cap) = CHANGE.captures(data) {
                    let is_change = &cap[1] == "change";
                    self.parse_change(cap, chan, session, is_change)?;
                    self.state = Some(State::Protocol);
                } else if let Some(cap) = CHALLENGE.captures(data) {
                    self.challenge(cap, chan, session)?;
                    self.state = Some(State::Protocol);
                } else if let Some(cap) = PROVE.captures(data) {
                    self.prove(cap, chan, session).await?;
                    self.state = Some(State::Protocol);
                } else if let Some(cap) = IDENTITIES.captures(data) {
                    let id: Option<i64> = if let Some(id) = cap.get(2) {
                        Some(id.as_str().parse()?)
                    } else {
                        None
                    };
                    self.identities(id, chan, session).await?;
                    self.state = Some(State::Protocol);
                } else {
                    debug!("not recognised: {:?}", data);
                    self.state = Some(State::Protocol);
                    return Ok(ProtocolResult::None);
                }
            }
            Some(State::Change {
                hash,
                size,
                mut file,
                mut file_size,
            }) => {
                debug!("data.len() = {:?}", data.len());
                file.f.as_mut().unwrap().write_all(data).await?;
                file_size += data.len();
                debug!("{:?} < {:?}", file_size, size);
                if file_size < size {
                    self.state = Some(State::Change {
                        hash,
                        size,
                        file,
                        file_size,
                    })
                } else {
                    self.state = Some(State::Protocol);
                    return Ok(ProtocolResult::Change { hash, file, size });
                }
            }
            None => {}
        }
        Ok(ProtocolResult::None)
    }

    /// Handles the `challenge <public key JSON>` command.
    ///
    /// If the argument parses as a public key, the key is remembered, a fresh
    /// challenge of `CHAL_LEN` random alphanumeric characters is generated
    /// and sent to the client followed by a newline, and the generation time
    /// is recorded. The challenge is stored without the newline. If the
    /// argument does not parse, nothing is sent and the stored challenge is
    /// left untouched.
    fn challenge<'a>(
        &mut self,
        cap: regex::Captures<'a>,
        chan: ChannelId,
        session: &mut Session,
    ) -> Result<(), crate::Error> {
        use rand::Rng;
        match serde_json::from_slice::<libpijul::key::PublicKey>(cap[1].as_bytes()) {
            Ok(key) => self.random_challenge.key = Some(key),
            Err(_) => return Ok(()),
        }
        self.random_challenge.ch.clear();
        // One extra byte for the newline that is appended while sending.
        self.random_challenge.ch.reserve(CHAL_LEN + 1);
        self.random_challenge.ch.extend(
            rand::rng()
                .sample_iter(&rand::distr::Alphanumeric)
                .take(CHAL_LEN)
                .map(char::from),
        );
        self.random_challenge.ch.push('\n');
        session.data(
            chan,
            thrussh::CryptoVec::from_slice(self.random_challenge.ch.as_bytes()),
        );
        // Drop the newline again: it is not part of the challenge.
        self.random_challenge.ch.pop();
        self.random_challenge.gen = std::time::SystemTime::now();
        Ok(())
    }

    /// Handles the `identities` command. Not well supported yet: when a
    /// repository is selected an empty line is sent, otherwise nothing is.
    /// `_rev` is ignored.
    async fn identities(
        &mut self,
        _rev: Option<i64>,
        chan: ChannelId,
        session: &mut Session,
    ) -> Result<(), crate::Error> {
        if self.repo_id.is_some() {
            session.data(chan, thrussh::CryptoVec::from_slice(b"\n"));
        }
        Ok(())
    }

    async fn prove<'a>(
        &mut self,
        cap: regex::Captures<'a>,
        chan: ChannelId,
        session: &mut Session,
    ) -> Result<(), crate::Error> {
        if self.random_challenge.gen.elapsed()? > std::time::Duration::from_secs(10) {
            session.data(
                chan,
                thrussh::CryptoVec::from_slice(b"Challenge expired.\n"),
            );
            return Ok(());
        }
        if let Some(ref key) = self.random_challenge.key {
            let pk = key.load()?;
            debug!("challenge {:?}", self.random_challenge.ch);
            pk.verify(
                self.random_challenge.ch.as_bytes(),
                &cap[1],
                &chrono::Utc::now(),
            )?;

            let key_key = bs58::decode(&key.key).into_vec()?;
            let key_sig = bs58::decode(&key.signature).into_vec()?;
            use crate::db::signingkeys::dsl as sk;
            diesel::insert_into(sk::signingkeys)
                .values((
                    sk::user_id.eq(self.user_id.as_ref().unwrap()),
                    sk::algorithm.eq(Keyalgorithm_::from(key.algorithm)),
                    sk::public_key.eq(key_key),
                    sk::signature.eq(key_sig),
                    sk::expires.eq(key.expires),
                ))
                .execute(&mut self.db.get().await?)
                .await?;
            session.data(chan, thrussh::CryptoVec::from_slice(b"\n"));
        }
        Ok(())
    }

    /// Answers a changelist request for a channel or a discussion.
    ///
    /// `cap[1]` is the channel name (or a `:N` discussion reference matched by `DISC`),
    /// `cap[2]` the log position to start from, and `cap[3]` a possibly empty list of
    /// double-quoted paths restricting the listing.
    ///
    /// Returns `ChannelList::Discussion` for a discussion reference. For a channel that can be
    /// loaded, returns `ChannelList::Data` with one `hash.pos` line per requested path, then
    /// one `n.hash.merkle` line per selected log entry (with a trailing `.` when the entry is
    /// tagged), then an empty line. A missing channel yields either a single newline (default
    /// channel) or an `error:` line (any other channel). Without a repository the data is
    /// empty.
    ///
    /// # Errors
    ///
    /// Fails when a number does not parse, when a requested path is unknown or ambiguous,
    /// when the default channel is missing but its name does not round-trip through
    /// `ChannelSpec`, or when reading the pristine fails.
    async fn changelist<'a>(
        &mut self,
        cap: regex::Captures<'a>,
    ) -> Result<ChannelList, crate::Error> {
        let from: u64 = (&cap[2]).parse()?;
        lazy_static! {
            static ref CHANGELIST_PATHS: Regex = Regex::new(r#""(((\\")|[^"])+)""#).unwrap();
        }
        if let Some(disc) = DISC.captures(&cap[1]) {
            // `DISC` only guarantees digits, so an overlong number still overflows `i32`:
            // report it as an error rather than panicking on client input.
            let disc: i32 = disc[1].parse()?;
            Ok(ChannelList::Discussion(disc))
        } else if let Some(ref id) = self.repo_id {
            use std::io::Write;
            debug!("taking lock");
            let repo = self.config.repo_locks.get(&id.origin_id()).await?;
            let pristine = repo.pristine.read().await;
            let txn = pristine.txn_begin()?;
            let c = channel_spec(id, &cap[1]);
            let channel_spec = ChannelSpec::channel((&cap[1]).into());
            let channel = if let Some(channel) = txn.load_channel(&c)? {
                channel
            } else if &cap[1] == self.default_channel {
                // The default channel may not exist yet: answer with an empty list as long as
                // the requested name maps back to itself.
                debug!("channel not found {:?}", c);
                match channel_spec {
                    ChannelSpec::Channel(ref c_) => {
                        debug!("c_ = {:?} {:?}", c_, &cap[1]);
                        if c_ == &cap[1] {
                            return Ok(ChannelList::Data(vec![b'\n']));
                        }
                    }
                    ChannelSpec::CI(ref c_) => {
                        if c_.len() + 3 == cap[1].len() {
                            let (a, b) = cap[1].split_at(cap[1].len() - 3);
                            if c_ == a && b == "/ci" {
                                return Ok(ChannelList::Data(vec![b'\n']));
                            }
                        }
                    }
                    _ => {}
                }
                return Err((crate::Error::ChannelNotFound {
                    channel: cap[1].to_string(),
                })
                .into());
            } else {
                return Ok(ChannelList::Data(
                    format!("error:Remote channel {:?} not found\n", &cap[1]).into(),
                ));
            };

            // Resolve the requested paths to graph positions, and collect their descendants
            // so that log entries can be filtered by the files they touch.
            let mut paths = HashSet::new();
            let mut data = Vec::new();
            for r in CHANGELIST_PATHS.captures_iter(&cap[3]) {
                if let Ok((p, ambiguous)) = txn.follow_oldest_path(&repo.changes, &channel, &r[1]) {
                    let h = txn.get_external(&p.change)?.unwrap();
                    let h: libpijul::Hash = h.into();
                    writeln!(data, "{}.{}", h.to_base32(), p.pos.0)?;
                    if ambiguous {
                        return Err(crate::Error::AmbiguousInode.into());
                    }
                    paths.insert(p);
                    paths.extend(
                        libpijul::fs::iter_graph_descendants(&txn, txn.graph(&*channel.read()), p)?
                            .filter_map(|x| x.ok()),
                    );
                } else {
                    return Err(crate::Error::InodeNotFound.into());
                }
            }

            // Tag positions from `from` onwards; `tagsi` walks them in step with the log.
            let tags: Vec<u64> = txn
                .iter_tags(txn.tags(&*channel.read()), from)?
                .map(|k| (*k.unwrap().0).into())
                .collect();
            let mut tagsi = 0;
            for x in txn.log(&*channel.read(), from)? {
                let (n, (h, m)) = x?;
                let h_int = txn.get_internal(h)?.unwrap();
                // Without a path filter every entry is listed; otherwise only the entries
                // that introduced or touched one of the selected positions.
                let mut on_channel = paths.is_empty();
                for p in paths.iter() {
                    if p.change == *h_int || txn.get_touched_files(p, Some(h_int))?.is_some() {
                        on_channel = true;
                        break;
                    }
                }
                if on_channel {
                    let h: libpijul::Hash = h.into();
                    let m: libpijul::Merkle = m.into();
                    if tags.get(tagsi) == Some(&n) {
                        writeln!(data, "{}.{}.{}.", n, h.to_base32(), m.to_base32())?;
                        tagsi += 1
                    } else {
                        writeln!(data, "{}.{}.{}", n, h.to_base32(), m.to_base32())?;
                    }
                }
            }
            data.push(b'\n');
            Ok(ChannelList::Data(data))
        } else {
            debug!("no repo id / no channel");
            Ok(ChannelList::Data(Vec::new()))
        }
    }

    async fn state<'a>(
        &mut self,
        cap: regex::Captures<'a>,
    ) -> Result<(ChannelList, Option<u64>), crate::Error> {
        let pos: Option<u64> = cap.get(3).map(|c| c.as_str().parse().unwrap());
        if let Some(disc) = DISC.captures(&cap[1]) {
            let disc: i32 = disc[1].parse().unwrap();
            Ok((ChannelList::Discussion(disc), pos))
        } else if let Some(ref id) = self.repo_id {
            debug!("taking lock");
            let repo = self
                .config
                .repo_locks
                .get(&self.repo_id.as_ref().unwrap().origin_id())
                .await?;
            let pristine = repo.pristine.read().await;
            let txn = pristine.txn_begin()?;
            let c = channel_spec(id, &cap[1]);
            let channel = if let Some(channel) = txn.load_channel(&c)? {
                channel
            } else {
                debug!("channel not found {:?}", c);
                return Ok((ChannelList::Data(vec![b'\n']), pos));
            };
            let mut data = Vec::new();
            use std::io::Write;
            if let Some(pos) = pos {
                for n in txn.log(&*channel.read(), pos)? {
                    let (n, (_, m)) = n?;
                    if n < pos {
                        continue;
                    } else if n > pos {
                        data.push(b'\n');
                        break;
                    } else {
                        let m: libpijul::Merkle = m.into();
                        let m2 = if let Some(x) = txn
                            .rev_iter_tags(txn.tags(&*channel.read()), Some(n))?
                            .next()
                        {
                            x?.1.b.into()
                        } else {
                            libpijul::Merkle::zero()
                        };
                        writeln!(&mut data, "{} {} {}", n, m.to_base32(), m2.to_base32()).unwrap();
                        break;
                    }
                }
            } else {
                if let Some(n) = txn.reverse_log(&*channel.read(), None)?.next() {
                    let (n, (_, m)) = n?;
                    let m: libpijul::Merkle = m.into();
                    let m2 = if let Some(x) = txn
                        .rev_iter_tags(txn.tags(&*channel.read()), Some(n))?
                        .next()
                    {
                        x?.1.b.into()
                    } else {
                        libpijul::Merkle::zero()
                    };
                    writeln!(data, "{} {} {}", n, m.to_base32(), m2.to_base32())?
                } else {
                    writeln!(data, "-")?;
                }
            }
            if data.is_empty() {
                data.push(b'\n')
            }
            Ok((ChannelList::Data(data), pos))
        } else {
            debug!("no repo id / no channel");
            Ok((ChannelList::Data(Vec::new()), pos))
        }
    }

    async fn id<'a>(
        &mut self,
        cap: regex::Captures<'a>,
        is_tag: bool,
    ) -> Result<ChannelList, crate::Error> {
        debug!("id is_tag {:?}", is_tag);
        if let Some(disc) = DISC.captures(&cap[1]) {
            let disc: i32 = disc[1].parse().unwrap();
            return Ok(ChannelList::Discussion(disc));
        } else if let Some(ref id) = self.repo_id {
            debug!("taking lock");
            let repo = self
                .config
                .repo_locks
                .get(&self.repo_id.as_ref().unwrap().origin_id())
                .await?;
            debug!("ok, reading");
            let pristine = repo.pristine.read().await;
            debug!("ok, reading");
            let txn = pristine.txn_begin()?;
            debug!("ok, reading");
            let c = channel_spec(id, &cap[1]);
            debug!("channel c = {:?}", c);
            if let (Some(ChannelSpec::Channel(ref c)), Some(ref id)) =
                (&self.channel, &self.repo_id)
            {
                if c == &self.default_channel {
                    let id = if is_tag {
                        let mut id_ = [0; 16];
                        for (a, b) in id_.iter_mut().zip(id.repo_id.as_bytes().iter()) {
                            *a = 255 - *b
                        }
                        data_encoding::BASE32_NOPAD.encode(&id_)
                    } else {
                        data_encoding::BASE32_NOPAD.encode(id.repo_id.as_bytes())
                    };
                    debug!("id {:?} = {:?}", is_tag, id);
                    return Ok(ChannelList::Data(format!("{}\n", id).into()));
                }
            }
            if let Some(channel) = txn.load_channel(&c)? {
                let resp = format!(
                    "{}\n",
                    data_encoding::BASE32_NOPAD.encode(channel.read().id.as_bytes())
                )
                .into();
                debug!("resp = {:?}", resp);
                return Ok(ChannelList::Data(resp));
            }
        }
        debug!("no repo id / no channel");
        Ok(ChannelList::Data(b"\n".to_vec()))
    }

    /// Handles an `apply` command header and prepares the session to receive the change.
    ///
    /// `cap[1]` is the target (a discussion number when it parses as `i32`, a channel name
    /// otherwise), `cap[2]` the base32 hash of the change and `cap[3]` its size in bytes.
    ///
    /// On success the session moves to `State::Change`, backed by a temporary file at
    /// `<repositories_path>/tmp/<repository uuid>/<hash>`. If the hash is invalid, or no
    /// repository is selected, the session goes back to `State::Protocol` and nothing is
    /// created.
    ///
    /// # Errors
    ///
    /// Fails when the size does not parse or when the temporary file cannot be created.
    async fn parse_apply<'a>(&mut self, cap: regex::Captures<'a>) -> Result<(), crate::Error> {
        let hash = if let Some(h) = libpijul::pristine::Hash::from_base32((&cap[2]).as_bytes()) {
            h
        } else {
            debug!("wrong hash");
            self.state = Some(State::Protocol);
            return Ok(());
        };
        if let Ok(d) = cap[1].parse::<i32>() {
            self.channel = Some(ChannelSpec::Discussion(d))
        } else {
            self.channel = Some(ChannelSpec::channel(cap[1].into()));
        };
        let size: usize = (&cap[3]).parse()?;
        debug!("parse_apply = {:?} {:?}", hash, size);
        // Like `apply` and `parse_change`, do nothing (rather than panic) when the session has
        // no repository.
        let repo_uuid = if let Some(r) = self.repo_id.as_ref() {
            r.repo_id
        } else {
            debug!("no repo id");
            self.state = Some(State::Protocol);
            return Ok(());
        };
        let mut path = self.config.repositories_path.join("tmp");
        path.push(repo_uuid.hyphenated().to_string());
        tokio::fs::create_dir_all(&path).await?;
        path.push(&hash.to_base32());
        let f = Some(
            OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .open(&path)
                .await?,
        );
        let file = TmpFile { f, path };
        self.state = Some(State::Change {
            hash,
            size,
            file_size: 0,
            file,
        });
        Ok(())
    }

    /// Sends a stored change to the client over `chan`.
    ///
    /// `cap[4]` is the base32 hash of the requested change. The reply is an 8-byte big-endian
    /// length followed by that many bytes read from the start of the change file. The whole
    /// file is sent when `full` is set or when it is no larger than
    /// `config.partial_change_size`; otherwise only the length reported by
    /// `Change::size_no_contents` is sent.
    ///
    /// Nothing is sent when the hash is invalid or when no repository is selected.
    ///
    /// # Errors
    ///
    /// Fails when the change file cannot be opened or read.
    fn parse_change(
        &mut self,
        cap: regex::Captures,
        chan: ChannelId,
        session: &mut Session,
        full: bool,
    ) -> Result<(), crate::Error> {
        let hash = if let Some(h) = libpijul::pristine::Hash::from_base32((&cap[4]).as_bytes()) {
            h
        } else {
            debug!("wrong hash");
            return Ok(());
        };
        let mut p = if let Some(id) = self.repo_id.as_ref() {
            id.nest_changes_path(&self.config)
        } else {
            return Ok(());
        };
        libpijul::changestore::filesystem::push_filename(&mut p, &hash);
        debug!("change path = {:?}", p);
        let mut v = thrussh::CryptoVec::new();
        let mut f = std::fs::File::open(&p)?;
        // Ask the open handle for its size, so that the length and the bytes read below
        // always describe the same file.
        let full_size = f.metadata()?.len();
        let size = if full || full_size <= self.config.partial_change_size {
            full_size
        } else {
            libpijul::change::Change::size_no_contents(&mut f)?
        };
        // Layout: [size as u64, big endian][size bytes of the change].
        v.resize(8 + size as usize);
        f.read_exact(&mut v[8..])?;
        BigEndian::write_u64(&mut v[..8], size);
        debug!("sending {:?}", size);
        session.data(chan, v);
        Ok(())
    }

    async fn apply(
        &mut self,
        hash: libpijul::pristine::Hash,
        file: TmpFile,
        size: usize,
        channel_id: thrussh::ChannelId,
        session: &mut thrussh::server::Session,
    ) -> Result<(), crate::Error> {
        // Check the hash, and save the change.
        debug!("saving change {:?}", hash);
        // Apply
        debug!("apply: {:?} {:?}", self.repo_id, self.channel);
        if self.repo_id.is_none() {
            return Ok(());
        }
        match self.channel {
            Some(ChannelSpec::CI(_)) | Some(ChannelSpec::Channel(_)) => {
                if !self.permissions.contains(Perm::APPLY) {
                    return Err((crate::Error::Forbidden).into());
                }
            }
            Some(ChannelSpec::Discussion(_)) => {
                if !self.permissions.contains(Perm::CREATE_DISCUSSION) {
                    return Err((crate::Error::Forbidden).into());
                }
            }
            None => return Ok(()),
        }
        let channel = self.channel.take().unwrap();
        debug!("applying {:?} to {:?}", hash, channel);
        let repo_id = self.repo_id.as_ref().unwrap();
        let repo = self
            .config
            .repo_locks
            .get(&repo_id.origin_id())
            .await
            .unwrap();
        if !repo.changes.has_change(&hash) {
            let owner = self.repo_id.as_ref().unwrap().owner_id.clone();
            use crate::db::users::dsl as u;
            let n = if self.config.size_limit > 0 {
                diesel::update(u::users.find(owner))
                    .set((u::storage_used.eq(u::storage_used + size as i64),))
                    .filter(
                        (u::storage_used + size as i64).le(self.config.size_limit)
                    )
                    .execute(&mut self.db.get().await?)
                    .await?
            } else {
                diesel::update(u::users.find(owner))
                    .set((u::storage_used.eq(u::storage_used + size as i64),))
                    .execute(&mut self.db.get().await?)
                    .await?
            };
            if n > 0 {
                let name = repo.changes.filename(&hash);
                tokio::fs::create_dir_all(name.parent().unwrap()).await?;
                let mut file = file.persist(&name).await?;
                debug!("seek");
                let changes = repo.changes.clone();
                file.flush().await?;
                file.seek(tokio::io::SeekFrom::Start(0)).await?;
                debug!("seek done");
                changes.check(&mut file, &hash, None).await?;
            } else {
                return Err((crate::Error::Quota { quota: 0 }).into());
            }
        }

        match channel {
            ChannelSpec::Channel(c) => self.apply_to_channel(repo, c, hash).await?,
            ChannelSpec::CI(c) => self.channel = Some(ChannelSpec::CI(c)),
            ChannelSpec::Discussion(d) => {
                self.apply_to_discussion(repo, d, hash, channel_id, session)
                    .await?
            }
        }
        Ok(())
    }

    async fn apply_to_channel(
        &mut self,
        repo: Arc<crate::repository::Repo>,
        c: String,
        hash: libpijul::Hash,
    ) -> Result<(), crate::Error> {
        use libpijul::changestore::ChangeStore;
        let header = repo.changes.get_header(&hash)?;
        let id = self.repo_id.as_ref().unwrap();

        let repo_id = self.repo_id.as_ref().unwrap().repo_id;
        self.config
            .replicator
            .send_change(id.origin_id(), hash, false)
            .await?;
        self.config
            .replicator
            .handle_update(
                None,
                None,
                None,
                ::replication::Update::Apply {
                    repo: id.origin_id(),
                    channel: c.clone(),
                    hash,
                },
            )
            .await?;

        let hook = pijul_hooks::HookContent::ChangesApplied {
            repository_owner: self.owner.to_string(),
            repository_name: self.repo.to_string(),
            applied_by: self.user.to_string(),
            message: header.message.clone(),
            author: crate::change::authors_string(&self.db, &header.authors).await?,
            hash: hash.to_base32(),
        };
        let mut db_ = self.db.get().await?;
        use crate::db::repositories::dsl as repos;
        diesel::update(repos::repositories.find(repo_id))
            .set(repos::last_pushed.eq(diesel::dsl::now))
            .execute(&mut db_)
            .await?;

        for h in header.authors {
            debug!("author {:?}", h);
            if let Some(k) =
                h.0.get("key")
                    .and_then(|k| bs58::decode(k.as_bytes()).into_vec().ok())
            {
                use crate::db::contributors::dsl as c;
                diesel::insert_into(c::contributors)
                    .values((
                        c::repo.eq(repo_id),
                        c::key.eq(k),
                        c::revision.eq(diesel::dsl::now),
                    ))
                    .on_conflict_do_nothing()
                    .execute(&mut db_)
                    .await?;
            } else {
                error!("Decoding error");
            }
        }
        let db = self.db.clone();
        tokio::spawn(async move {
            crate::hooks::run_hooks_by_repo_id(
                &mut db.get().await.unwrap(),
                repo_id,
                crate::hooks::Action::APPLY_PATCH,
                &hook,
            )
            .await
        });
        self.channel = Some(ChannelSpec::Channel(c));
        Ok(())
    }

    async fn apply_to_discussion(
        &mut self,
        repo: Arc<crate::repository::Repo>,
        d: i32,
        hash: libpijul::Hash,
        channel: thrussh::ChannelId,
        session: &mut thrussh::server::Session,
    ) -> Result<(), crate::Error> {
        let id = self.repo_id.clone().unwrap();
        let user_id = self.user_id.unwrap();
        let user = self.user.clone();
        let repo_name = self.repo.clone();
        let ip_sql = self.ip_sql.clone();
        let config = self.config.clone();
        let repo_ = repo.clone();
        let (x, err) = self
            .db
            .get()
            .await?
            .transaction(|mut txn| {
                (async move {
                    debug!("got transaction");
                    let mut err = None;
                    let d = if d == 0 {
                        use libpijul::changestore::ChangeStore;
                        let header = if let Ok(h) = repo_.changes.get_header(&hash) {
                            h
                        } else {
                            return Ok((None, None));
                        };
                        let (n, _) = if let Some(x) = create_discussion_repo(
                            &mut txn,
                            id.repo_id,
                            user_id,
                            &header.message,
                            ip_sql,
                        )
                        .await?
                        {
                            x
                        } else {
                            err = Some(thrussh::CryptoVec::from_slice(
                                b"A discussion with the same title already exists\n",
                            ));
                            return Ok((None, err));
                        };
                        err = Some(thrussh::CryptoVec::from_slice(
                            format!(
                                "Created discussion https://{}/{}/{}/discussions/{}\n",
                                config.host, user, repo_name, n
                            )
                            .as_bytes(),
                        ));
                        n
                    } else {
                        d
                    };

                    use crate::db::discussion_changes::dsl as dc;
                    use crate::db::discussions::dsl as d;
                    use diesel::sql_types::Bool;
                    let hashb32 = hash.to_base32();
                    let n = d::discussions
                        .filter(d::repository_id.eq(id.repo_id))
                        .filter(d::number.eq(d))
                        .filter(
                            diesel::dsl::sql::<Bool>("not exists(select 1 from discussion_changes join discussions on discussions.id = discussion_changes.discussion where repository_id = ")
                                .bind::<diesel::sql_types::Uuid, _>(id.repo_id)
                            .sql(" and number = ")
                            .bind::<Integer, _>(d)
                            .sql(" and discussion_changes.change = ")
                            .bind::<Text, _>(&hashb32)
                            .sql(" and discussion_changes.removed is null)"),
                        )
                        .select((
                            hashb32.clone().into_sql::<Text>(),
                            d::id,
                            user_id.into_sql::<diesel::sql_types::Uuid>().nullable(),
                        ))
                        .insert_into(dc::discussion_changes)
                        .into_columns((dc::change, dc::discussion, dc::pushed_by))
                        .returning(dc::discussion)
                        .get_result(txn)
                        .await
                        .optional()?;

                    debug!("n = {:?}", n);
                    if let Some(disc_id) = n {
                        debug!("{:?} {:?}", id.repo_id, d);
                        use crate::db::discussion_subscriptions::dsl as ds;
                        let title = diesel::update(d::discussions.find(disc_id))
                            .set(d::changes.eq(d::changes + 1))
                            .returning(d::title)
                            .get_result(txn)
                            .await?;

                        diesel::insert_into(ds::discussion_subscriptions)
                            .values((ds::user_id.eq(user_id), ds::discussion_id.eq(disc_id)))
                            .on_conflict_do_nothing()
                            .execute(txn)
                            .await?;

                        Ok((Some((disc_id, title)), err))
                    } else {
                        Err(diesel::result::Error::RollbackTransaction)
                    }
                })
                .scope_boxed()
            })
            .await?;

        if let Some((disc_id, title)) = x {
            use libpijul::changestore::ChangeStore;
            let header = repo.changes.get_header(&hash)?;
            let db = self.db.clone();
            let repo_id = self.repo_id.as_ref().unwrap();
            self.config
                .replicator
                .send_change(repo_id.origin_id(), hash, false)
                .await?;
            let hook = pijul_hooks::HookContent::NewChanges {
                repository_owner: self.owner.to_string(),
                repository_name: self.repo.to_string(),
                pusher: self.user.to_string(),
                message: header.message.clone(),
                author: crate::change::authors_string(&self.db, &header.authors).await?,
                hash: hash.to_base32(),
            };
            let repo_id = repo_id.repo_id;
            tokio::spawn(async move {
                crate::hooks::run_hooks_by_repo_id(
                    &mut db.get().await.unwrap(),
                    repo_id,
                    crate::hooks::Action::ADD_PATCH,
                    &hook,
                )
                .await
            });
            self.send_change_email(d, disc_id, title, hash, header)
                .await?
        }
        if let Some(err) = err {
            session.extended_data(channel, 1, err);
        }
        self.channel = Some(ChannelSpec::Discussion(d));
        Ok(())
    }

    /// Notifies the subscribers of a discussion that a change was pushed to it.
    ///
    /// `n` is the discussion number shown to users, `discussion_id` its database id, `name`
    /// its title, and `hash`/`header` describe the pushed change. Every subscriber other than
    /// the pusher gets one email (text and HTML bodies), unless 10 or more emails were
    /// already logged for that address and discussion within the last hour. Emails are sent
    /// from spawned tasks, so delivery failures are not reported to the caller.
    ///
    /// Does nothing when the session has no user id.
    ///
    /// # Errors
    ///
    /// Fails on database errors.
    async fn send_change_email(
        &self,
        n: i32,
        discussion_id: uuid::Uuid,
        name: String,
        hash: libpijul::pristine::Hash,
        header: libpijul::change::ChangeHeader,
    ) -> Result<(), crate::Error> {
        let user_id = if let Some(user) = self.user_id {
            user
        } else {
            return Ok(());
        };
        use crate::db::discussion_subscriptions::dsl as ds;
        use crate::db::users::dsl as u;
        let mut db = self.db.get().await?;
        // Recipients are the subscribers *other than* the pusher. Filtering with `eq` here
        // would only ever select the pusher, who does not need to hear about their own push.
        let it = ds::discussion_subscriptions
            .inner_join(u::users)
            .filter(ds::discussion_id.eq(discussion_id))
            .filter(u::id.ne(user_id))
            .select((u::email, u::login))
            .get_results::<(String, String)>(&mut db)
            .await?;

        let hash = hash.to_base32();
        use cuach::Render;
        use diesel::sql_types::{Text, Uuid};
        let obj = format!("[{}/{}] Changed: {} (#{})", self.owner, self.repo, name, n);
        for (address, login) in it {
            // Rate limit: the row is inserted (and the email sent) only while fewer than 10
            // emails were logged for this address and discussion during the last hour.
            let n_insert = diesel::sql_query(
                "INSERT INTO email_log(discussion, email)
SELECT $1, $2 WHERE (
  SELECT COUNT(1) FROM email_log WHERE
    discussion = $1
    AND email = $2
    AND time > NOW() - INTERVAL '1h'
) < 10",
            )
            .bind::<Uuid, _>(discussion_id)
            .bind::<Text, _>(&address)
            .execute(&mut db)
            .await?;

            if n_insert == 0 {
                continue;
            }

            let mut body_html = String::new();
            debug!("sending email to {:?}", login);
            {
                (EmailDiscussionChangeTemplate {
                    host: &self.config.host,
                    repo: &self.repo,
                    owner: &self.owner,
                    author: &self.user,
                    login: &login,
                    disc: n,
                    name: &name,
                    hash: &hash,
                    message: &header.message,
                })
                .render_into(&mut body_html)
                .unwrap();
            }
            let email_body = rusoto_ses::Body {
                text: Some(rusoto_ses::Content {
                    data: format!(
                        include_str!("discussions/new_change_email"),
                        author = self.user,
                        disc = n,
                        name = name,
                        message = header.message,
                        owner = self.owner,
                        repo = self.repo,
                        host = self.config.host,
                        hash = hash,
                        login = &login,
                    ),
                    charset: Some(EMAIL_CHARSET.to_string()),
                }),
                html: Some(rusoto_ses::Content {
                    data: body_html,
                    charset: Some(EMAIL_CHARSET.to_string()),
                }),
            };
            debug!("sending an email to {:?}", address);
            let config = self.config.clone();
            let obj = obj.clone();
            tokio::spawn(async move {
                crate::email::send_email(&config, obj, email_body, address).await
            });
        }
        Ok(())
    }
}

use crate::email::*;

// Data rendered into the HTML body of the "new change in a discussion" email, using the
// template at `discussions/new_change_email.html`. The plain-text body of the same email is
// built from the same values in `send_change_email`.
#[template(path = "discussions/new_change_email.html")]
struct EmailDiscussionChangeTemplate<'a> {
    // Host name of this server (`config.host`).
    host: &'a str,
    // Owner of the repository.
    owner: &'a str,
    // Name of the repository.
    repo: &'a str,
    // User who pushed the change.
    author: &'a str,
    // Login of the recipient of the email.
    login: &'a str,
    // Discussion number.
    disc: i32,
    // Discussion title.
    name: &'a str,
    // Base32 hash of the pushed change.
    hash: &'a str,
    // Message of the pushed change.
    message: &'a str,
}

/// Creates a discussion titled `name` in repository `repo` on behalf of `author_id`, and
/// subscribes the author to it.
///
/// In a nested transaction: takes the next discussion number from the repository (only if
/// `author_id` has `Perm::CREATE_DISCUSSION` on it), inserts the discussion and inserts the
/// author's subscription. `ip_sql` is stored as the creation address.
///
/// Returns `Ok(Some((number, id)))` for the new discussion. Returns `Ok(None)` when a
/// discussion with the same title is found by the initial check, and also when the
/// transaction ends with `NotFound`: no repository row was updated (missing repository or
/// missing permission), or the insert hit a conflict and returned no row.
///
/// # Errors
///
/// Any other database error is returned as is.
async fn create_discussion_repo(
    client: &mut diesel_async::AsyncPgConnection,
    repo: Uuid,
    author_id: Uuid,
    name: &str,
    ip_sql: ipnetwork::IpNetwork,
) -> Result<Option<(i32, Uuid)>, diesel::result::Error> {
    use crate::db::discussions::dsl as d;
    // NOTE(review): this only matches discussions whose `closed` column is set, i.e.
    // presumably *closed* ones; confirm that `is_null()` (open discussions) was not meant.
    if d::discussions
        .filter(d::repository_id.eq(repo))
        .filter(d::closed.is_not_null())
        .filter(d::title.eq(&name))
        .select(d::id)
        .get_result::<uuid::Uuid>(client)
        .await
        .optional()?
        .is_some()
    {
        return Ok(None);
    }

    client
        .transaction(|mut txn| {
            (async move {
                use crate::db::discussion_subscriptions::dsl as ds;
                use crate::db::repositories::dsl as r;
                // The permission check and the allocation of the number are one statement:
                // no row is updated, hence no number returned, without the permission.
                let number = diesel::update(r::repositories.find(repo))
                    .filter(crate::has_permissions!(
                        author_id,
                        repo,
                        Perm::CREATE_DISCUSSION.bits()
                    ))
                    .set(r::next_discussion_number.eq(r::next_discussion_number + 1))
                    .returning(r::next_discussion_number)
                    .get_result(&mut txn)
                    .await
                    .optional()?;
                let number = if let Some(number) = number {
                    number
                } else {
                    return Err(diesel::result::Error::NotFound);
                };
                let id = diesel::insert_into(d::discussions)
                    .values((
                        d::title.eq(&name),
                        d::author.eq(&author_id),
                        // Attach the discussion to its repository: every lookup of a
                        // discussion filters on `repository_id`.
                        d::repository_id.eq(repo),
                        d::creation_date.eq(diesel::dsl::now),
                        d::creation_ip.eq(ip_sql),
                        d::number.eq(number),
                    ))
                    .on_conflict_do_nothing()
                    .returning(d::id)
                    .get_result::<uuid::Uuid>(&mut txn)
                    .await?;
                diesel::insert_into(ds::discussion_subscriptions)
                    .values((ds::user_id.eq(author_id), ds::discussion_id.eq(id)))
                    .execute(&mut txn)
                    .await?;
                Ok((number, id))
            })
            .scope_boxed()
        })
        .await
        .optional()
}