#![deny(deprecated)]
#![deny(missing_docs)]
#![deny(bare_trait_objects)]
use hyper::{Body, Request, Response, Server};
use log::{error, info, warn};
mod config;
use crate::config::Config;
mod xmpp;
use crate::xmpp::{xmpp_process, XmppCommand};
mod stoppable_receiver;
use crate::stoppable_receiver::stop_receiver;
async fn body_to_string(req: Request<Body>) -> Result<String, failure::Error> {
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
std::str::from_utf8(&whole_body)
.map(std::string::ToString::to_string)
.map_err(std::convert::Into::into)
}
struct ServiceCmd {
cmd_send: tokio::sync::mpsc::Sender<XmppCommand>,
}
impl hyper::service::Service<Request<Body>> for ServiceCmd {
type Response = Response<Body>;
type Error = failure::Error;
type Future = std::pin::Pin<
Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
>;
fn poll_ready(
&mut self,
_: &mut std::task::Context,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
let xmpp_to_opt = req.headers().get("X-XMPP-To");
let xmpp_to_res: Result<xmpp_parsers::Jid, failure::Error> = xmpp_to_opt.map_or_else(
|| Err(failure::format_err!("No X-XMPP-To header")),
|xmpp_to| {
std::str::from_utf8(xmpp_to.as_bytes())
.map_err(std::convert::Into::into)
.and_then(|s| std::str::FromStr::from_str(s).map_err(std::convert::Into::into))
},
);
let xmpp_muc_opt = req
.headers()
.get("X-XMPP-Muc")
.map(|h| h.to_str().map(std::string::ToString::to_string));
let xmpp_pres_opt = req.headers().get("X-XMPP-Presence");
let xmpp_pres_res: Result<xmpp_parsers::presence::Show, failure::Error> = xmpp_pres_opt
.map_or_else(
|| Err(failure::format_err!("No X-XMPP-Presence header")),
|show| {
std::str::from_utf8(show.as_bytes())
.map_err(std::convert::Into::into)
.and_then(|s| {
std::str::FromStr::from_str(s)
.map_err(|e| failure::format_err!("Incorrect presence {}", e))
})
},
);
match (xmpp_muc_opt, xmpp_to_res, xmpp_pres_res) {
(None, Err(err), Err(err2)) => {
warn!("Unknown destination: {}", err);
warn!("Unknown destination2: {}", err2);
Box::pin(async move {
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from(format!(
"Unknown destination: {}\nUnknown destination2: {}",
err, err2,
)))
.map_err(std::convert::Into::into)
})
}
(None, _, Ok(show)) => {
info!("Got presence request. Reading body...");
let cmd_send = self.cmd_send.clone();
Box::pin(async move {
match body_to_string(req).await {
Ok(message) => {
match cmd_send.send(XmppCommand::Presence { show, message }).await {
Ok(_) => Response::builder()
.body(Body::from("Accepted".to_string()))
.map_err(std::convert::Into::into),
Err(e) => {
error!("Cann't send presence command: {}", e);
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from("Cann't get presence text".to_string()))
.map_err(std::convert::Into::into)
}
}
}
Err(_) => Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from("Cann't get presence text".to_string()))
.map_err(std::convert::Into::into),
}
})
}
(None, Ok(xmpp_to), _) => {
if req.uri().path() == "/ping" {
info!("Got ping request");
let cmd_send = self.cmd_send.clone();
Box::pin(async move {
match cmd_send
.send(XmppCommand::Ping {
opt_xmpp_to: Some(xmpp_to),
})
.await
{
Ok(_) => Response::builder()
.body(Body::from("Accepted".to_string()))
.map_err(std::convert::Into::into),
Err(e) => {
error!("Cann't send ping command: {}", e);
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from("Cann't send ping".to_string()))
.map_err(std::convert::Into::into)
}
}
})
} else {
info!("Got message request. Reading body...");
let cmd_send = self.cmd_send.clone();
Box::pin(async move {
match body_to_string(req).await {
Ok(message) => {
match cmd_send.send(XmppCommand::Chat { xmpp_to, message }).await {
Ok(_) => Response::builder()
.body(Body::from("Accepted".to_string()))
.map_err(std::convert::Into::into),
Err(e) => {
error!("Cann't send message command: {}", e);
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from("Cann't get message text".to_string()))
.map_err(std::convert::Into::into)
}
}
}
Err(_) => Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from("Cann't get message text".to_string()))
.map_err(std::convert::Into::into),
}
})
}
}
(Some(Ok(muc_id)), _, Ok(show)) => {
info!("Got chat presence request. Reading body...");
let cmd_send = self.cmd_send.clone();
Box::pin(async move {
match body_to_string(req).await {
Ok(message) => {
match cmd_send
.send(XmppCommand::ChatroomPresence {
muc_id,
show,
message,
})
.await
{
Ok(_) => Response::builder()
.body(Body::from("Accepted".to_string()))
.map_err(std::convert::Into::into),
Err(e) => {
error!("Cann't send chat presence command: {}", e);
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from(
"Cann't get chat presence text".to_string(),
))
.map_err(std::convert::Into::into)
}
}
}
Err(_) => Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from("Cann't get chat presence text".to_string()))
.map_err(std::convert::Into::into),
}
})
}
(Some(Ok(muc_id)), _, _) => {
info!("Got chat message request. Reading body...");
let cmd_send = self.cmd_send.clone();
Box::pin(async move {
match body_to_string(req).await {
Ok(message) => {
match cmd_send
.send(XmppCommand::Chatroom { muc_id, message })
.await
{
Ok(_) => Response::builder()
.body(Body::from("Accepted".to_string()))
.map_err(std::convert::Into::into),
Err(e) => {
error!("Cann't send chat message command: {}", e);
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from(
"Cann't get chat message text".to_string(),
))
.map_err(std::convert::Into::into)
}
}
}
Err(_) => Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from("Cann't get chat message text".to_string()))
.map_err(std::convert::Into::into),
}
})
}
(Some(Err(err)), _, _) => {
warn!("Unknown destination: {}", err);
Box::pin(async move {
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from(format!("Unknown destination: {}", err,)))
.map_err(std::convert::Into::into)
})
}
}
}
}
struct MakeServiceCmd {
cmd_send: tokio::sync::mpsc::Sender<XmppCommand>,
}
impl<T> hyper::service::Service<T> for MakeServiceCmd {
type Response = ServiceCmd;
type Error = failure::Error;
type Future = std::pin::Pin<
Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
>;
fn poll_ready(
&mut self,
_: &mut std::task::Context,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, _: T) -> Self::Future {
let cmd_send = self.cmd_send.clone();
let fut = async move { Ok(ServiceCmd { cmd_send }) };
Box::pin(fut)
}
}
#[tokio::main]
pub async fn main() -> Result<(), failure::Error> {
env_logger::init();
let args = clap::App::new("SendXMPP")
.version("0.1.0")
.author("O01eg <o01eg@yandex.ru>")
.arg(
clap::Arg::new("config")
.short('c')
.long("config")
.value_name("CONFIG")
.help("File with configuration")
.takes_value(true),
)
.get_matches();
let config = Config::read(args.value_of("config").expect("Mandatory option config"))
.expect("Cann't read config file");
use futures_util::future::FutureExt;
let ctrl_c = async {
if let Err(e) = tokio::signal::ctrl_c().await {
error!("Stop signal registering failed {}", e);
}
}
.shared();
let (cmd_send, cmd_recv) = tokio::sync::mpsc::channel(10);
let http_server = Server::bind(&config.http)
.serve(MakeServiceCmd {
cmd_send: cmd_send.clone(),
})
.with_graceful_shutdown(ctrl_c.clone());
if let Some(ping) = config.account.ping {
let mut ping = tokio::time::interval(std::time::Duration::from_secs(ping));
let mut ctrl_c = ctrl_c.clone();
tokio::spawn(async move {
let mut phase = false;
loop {
let tick = ping.tick();
tokio::pin!(tick);
match futures_util::future::select(ctrl_c, tick).await {
futures_util::future::Either::Left((_, _)) => break,
futures_util::future::Either::Right((_, ctrl_c__)) => {
ctrl_c = ctrl_c__;
info!("Tick");
phase = !phase;
if phase {
if let Err(e) = cmd_send
.clone()
.send(XmppCommand::Ping { opt_xmpp_to: None })
.await
{
error!("Cann't send global ping {}", e);
}
} else if let Err(e) = cmd_send.send(XmppCommand::TimeoutCleanup).await {
error!("Cann't send timeout cleanup {}", e);
}
}
}
}
});
}
let xmpp_join = std::thread::Builder::new()
.name("XMPP Thread".into())
.spawn(move || -> Result<(), failure::Error> {
let recv = stop_receiver(cmd_recv, ctrl_c.clone());
let ctrt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.enable_io()
.build()?;
let result = ctrt.block_on(xmpp_process(ctrl_c.clone(), recv, config.account));
info!("Stopping xmpp thread");
result
})
.expect("XMPP Thread");
info!("Server started");
http_server.await?;
info!("Server stopped");
xmpp_join.join().unwrap()?;
Ok(())
}