#![deny(deprecated)]
#![deny(missing_docs)]
#![deny(bare_trait_objects)]
use actix_web::{App, HttpRequest, HttpResponse, HttpServer, middleware, web};
use log::{error, info, warn};
mod config;
use crate::config::Config;
mod xmpp;
use crate::xmpp::{XmppCommand, xmpp_process};
mod stoppable_receiver;
use crate::stoppable_receiver::stop_receiver;
/// Collects the request body into a UTF-8 `String`.
///
/// At most 4096 bytes are read. An error is returned when the body exceeds
/// that limit, when reading the payload fails, or when the bytes are not
/// valid UTF-8.
async fn payload_to_string(payload: web::Payload) -> Result<String, failure::Error> {
    // Outer result: size limit exceeded. Inner result: payload read error.
    let read = payload.to_bytes_limited(4096).await?;
    // The payload error is rendered to text before being wrapped.
    let bytes = read.map_err(|e| failure::format_err!("{}", e))?;
    let text = String::from_utf8(bytes.to_vec())?;
    Ok(text)
}
/// Extracts the destination JID from the `X-XMPP-To` request header.
///
/// Fails when the header is absent, is not valid UTF-8, or cannot be parsed
/// as a `jid::Jid`.
fn req_xmpp_to(req: &HttpRequest) -> Result<jid::Jid, failure::Error> {
    let header = req
        .headers()
        .get("X-XMPP-To")
        .ok_or_else(|| failure::format_err!("No X-XMPP-To header"))?;
    let text = std::str::from_utf8(header.as_bytes())?;
    let jid = text.parse::<jid::Jid>()?;
    Ok(jid)
}
/// HTTP handler that queues an XMPP ping to the JID named in `X-XMPP-To`.
///
/// Responds `200 Accepted` once the command has been handed to the command
/// channel, and `400` when the header is missing/invalid or the command
/// cannot be queued.
async fn ping(
    req: HttpRequest,
    data: web::Data<tokio::sync::mpsc::Sender<XmppCommand>>,
) -> HttpResponse {
    // Guard clause: without a usable destination there is nothing to ping.
    let xmpp_to = match req_xmpp_to(&req) {
        Ok(jid) => jid,
        Err(_) => {
            return HttpResponse::BadRequest().body("Cann't send ping, missing X-XMPP-To");
        }
    };

    info!("Got ping request");
    let command = XmppCommand::Ping {
        opt_xmpp_to: Some(xmpp_to),
    };
    if let Err(e) = data.send(command).await {
        error!("Cann't send ping command: {}", e);
        return HttpResponse::BadRequest().body("Cann't send ping");
    }
    HttpResponse::Ok().body("Accepted")
}
async fn index(
req: HttpRequest,
data: web::Data<tokio::sync::mpsc::Sender<XmppCommand>>,
payload: web::Payload,
) -> HttpResponse {
let xmpp_to_res = req_xmpp_to(&req);
let xmpp_muc_opt = req
.headers()
.get("X-XMPP-Muc")
.map(|h| h.to_str().map(std::string::ToString::to_string));
let xmpp_pres_opt = req.headers().get("X-XMPP-Presence");
let xmpp_pres_res: Result<xmpp_parsers::presence::Show, failure::Error> = xmpp_pres_opt
.map_or_else(
|| Err(failure::format_err!("No X-XMPP-Presence header")),
|show| {
std::str::from_utf8(show.as_bytes())
.map_err(std::convert::Into::into)
.and_then(|s| {
std::str::FromStr::from_str(s)
.map_err(|e| failure::format_err!("Incorrect presence {}", e))
})
},
);
match (xmpp_muc_opt, xmpp_to_res, xmpp_pres_res) {
(None, Err(err), Err(err2)) => {
warn!("Unknown destination: {}", err);
warn!("Unknown destination2: {}", err2);
HttpResponse::BadRequest().body(format!(
"Unknown destination: {}\nUnknown destination2: {}",
err, err2,
))
}
(None, _, Ok(show)) => {
info!("Got presence request. Reading body...");
let cmd_send = data.clone();
match payload_to_string(payload).await {
Ok(message) => match cmd_send.send(XmppCommand::Presence { show, message }).await {
Ok(_) => HttpResponse::Ok().body("Accepted".to_string()),
Err(e) => {
error!("Cann't send presence command: {}", e);
HttpResponse::BadRequest().body("Cann't get presence text")
}
},
Err(_) => HttpResponse::BadRequest().body("Cann't get presence text"),
}
}
(None, Ok(xmpp_to), _) => {
info!("Got message request. Reading body...");
let cmd_send = data.clone();
match payload_to_string(payload).await {
Ok(message) => match cmd_send.send(XmppCommand::Chat { xmpp_to, message }).await {
Ok(_) => HttpResponse::Ok().body("Accepted"),
Err(e) => {
error!("Cann't send message command: {}", e);
HttpResponse::BadRequest().body("Cann't get message text")
}
},
Err(_) => HttpResponse::BadRequest().body("Cann't get message text"),
}
}
(Some(Ok(muc_id)), _, Ok(show)) => {
info!("Got chat presence request. Reading body...");
let cmd_send = data.clone();
match payload_to_string(payload).await {
Ok(message) => {
match cmd_send
.send(XmppCommand::ChatroomPresence {
muc_id,
show,
message,
})
.await
{
Ok(_) => HttpResponse::Ok().body("Accepted"),
Err(e) => {
error!("Cann't send chat presence command: {}", e);
HttpResponse::BadRequest().body("Cann't get chat presence text")
}
}
}
Err(_) => HttpResponse::BadRequest().body("Cann't get chat presence text"),
}
}
(Some(Ok(muc_id)), _, _) => {
info!("Got chat message request. Reading body...");
let cmd_send = data.clone();
match payload_to_string(payload).await {
Ok(message) => {
match cmd_send
.send(XmppCommand::Chatroom { muc_id, message })
.await
{
Ok(_) => HttpResponse::Ok().body("Accepted"),
Err(e) => {
error!("Cann't send chat message command: {}", e);
HttpResponse::BadRequest().body("Cann't get chat message text")
}
}
}
Err(_) => HttpResponse::BadRequest().body("Cann't get chat message text"),
}
}
(Some(Err(err)), _, _) => {
warn!("Unknown destination: {}", err);
HttpResponse::BadRequest().body(format!("Unknown destination: {}", err,))
}
}
}
/// Program entry point.
///
/// Reads the configuration file named on the command line, then starts three
/// cooperating parts that communicate through one bounded command channel:
/// the HTTP server (producer), an optional periodic ping task (producer) and
/// a dedicated XMPP thread (consumer). Returns once the HTTP server has
/// stopped and the XMPP thread has been joined.
#[tokio::main]
pub async fn main() -> Result<(), failure::Error> {
env_logger::init();
// Command line: a single mandatory `-c/--config <CONFIG>` option.
let args = clap::Command::new("SendXMPP")
.version("0.1.0")
.author("O01eg <o01eg@yandex.ru>")
.arg(
clap::Arg::new("config")
.short('c')
.long("config")
.value_name("CONFIG")
.help("File with configuration")
.action(clap::ArgAction::Set)
.required(true),
)
.get_matches();
// Panics (via `expect`) when the configuration cannot be read.
let config = Config::read(
args.get_one::<String>("config")
.expect("Mandatory option config"),
)
.expect("Cann't read config file");
use futures_util::future::FutureExt;
// Stop signal. `.shared()` makes the future cloneable so the ping task and
// the XMPP thread can each wait on it. Note that the future also completes
// (after logging) when registering the signal handler fails.
let ctrl_c = async {
if let Err(e) = tokio::signal::ctrl_c().await {
error!("Stop signal registering failed {}", e);
}
}
.shared();
// Bounded command channel (capacity 10) feeding the XMPP side.
let (cmd_send, cmd_recv) = tokio::sync::mpsc::channel(10);
let http_cmd_send = cmd_send.clone();
// App factory: every invocation gets its own clone of the command sender.
let http_server = HttpServer::new(move || {
let logger =
middleware::Logger::new("%{r}a \"%r\" %s %b \"%{Referer}i\" \"%{User-Agent}i\" %T");
App::new()
.wrap(logger)
.app_data(web::Data::new(http_cmd_send.clone()))
.route("/ping", web::get().to(ping))
.route("/", web::post().to(index))
});
// Optional periodic task; `config.account.ping` is the period in seconds.
if let Some(ping) = config.account.ping {
let mut ping = tokio::time::interval(std::time::Duration::from_secs(ping));
let mut ctrl_c = ctrl_c.clone();
tokio::spawn(async move {
// Toggled on every tick: `true` sends a global ping, `false` a timeout cleanup.
let mut phase = false;
loop {
let tick = ping.tick();
tokio::pin!(tick);
// Wait for whichever comes first: the stop signal or the next tick.
match futures_util::future::select(ctrl_c, tick).await {
// Stop signal fired: end the task.
futures_util::future::Either::Left((_, _)) => break,
futures_util::future::Either::Right((_, ctrl_c__)) => {
// `select` consumed the stop future; take it back for the next round.
ctrl_c = ctrl_c__;
info!("Tick");
phase = !phase;
if phase {
if let Err(e) = cmd_send
.clone()
.send(XmppCommand::Ping { opt_xmpp_to: None })
.await
{
error!("Cann't send global ping {}", e);
}
} else if let Err(e) = cmd_send.send(XmppCommand::TimeoutCleanup).await {
error!("Cann't send timeout cleanup {}", e);
}
}
}
}
});
}
// The XMPP side runs on its own OS thread with a separate current-thread
// tokio runtime.
let xmpp_join = std::thread::Builder::new()
.name("XMPP Thread".into())
.spawn(move || -> Result<(), failure::Error> {
// NOTE(review): presumably wraps the receiver so it ends when the stop
// signal fires; confirm against `stoppable_receiver`.
let recv = stop_receiver(cmd_recv, ctrl_c.clone());
let ctrt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.enable_io()
.build()?;
// Blocks this thread until `xmpp_process` returns.
let result = ctrt.block_on(xmpp_process(ctrl_c.clone(), recv, config.account));
info!("Stopping xmpp thread");
result
})
.expect("XMPP Thread");
info!("Server started");
// Serve HTTP on the configured address until the server stops; a bind or
// run error returns early without joining the XMPP thread.
http_server.bind(&config.http)?.run().await?;
info!("Server stopped");
// `unwrap` panics if the XMPP thread panicked; `?` propagates its error result.
xmpp_join.join().unwrap()?;
Ok(())
}