OGMBXBKPWAX4A4CY2LJITNKUYHWBFE4EVBHI4NRZO5VNZ5KXM2VQC
ZI4GJ72VMCBV6EQAO4PIKCGXV6ASUYOTGPRVSIGCKDBCDNRQNUFAC
FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC
VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC
X6L47BHQPIWFWO53QU6GMRYZC7VPFR4AFTJV7MZIJWIMZG5L65WQC
NDDQQP2PSYH5YNU5EHFRUFACLJURWAHFZQNOIQNJCWHRSB7A3N6QC
O2GM5J4FSOXLTJURP2K6JMCBRAJ2JPJ723A43CR6AT2VQQQYRHFQC
5A5UVGNMHEYI62XUENBEPSRJ7M3MBULYITRDOEYYA5ZDUMFBB6ZQC
QWE26TMV6A5VUQLWG5ZMKHMH4NFZIQMVVXQJMPP42JUMM66C3VOQC
FVVPKFTLD5VOGVDF7MOSSKQ3KPCT4HDGTNTTC6I6B4TWXOTPCJ7AC
EOHEZXX3TBKGJTOMXEPAPHLAJSATRXNY4AIL63BE4VG3HFNWARVQC
impl XmppConnection {
fn new(account: config::Account) -> XmppConnection {
XmppConnection {
impl From<XmppConnection> for MaybeXmppConnection {
fn from(from: XmppConnection) -> MaybeXmppConnection {
MaybeXmppConnection {
account: from.account,
inner: Some(from.inner),
}
}
}
impl MaybeXmppConnection {
fn new(account: config::Account) -> MaybeXmppConnection {
MaybeXmppConnection {
Self::online(client.split(), account)
.and_then(Self::self_presence)
.then(|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
})
XmppConnection {
inner: client,
account,
}
.online()
.and_then(XmppConnection::self_presence)
.then(|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
})
fn online(
(sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),
account: std::rc::Rc<config::Account>,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
let presence = stanzas::make_presence(&account);
info!("Sending presence...");
Box::new(
sink.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
"Cann't send self-presence".to_owned()
})
.join(
future::loop_fn((account.clone(), stream), |(account, stream)| {
stream
.into_future()
.map_err(|(e, _)| {
error!("Error on reading self-presence: {}", e);
"Cann't read self-presence".to_owned()
})
.and_then(|(event, stream)| match event {
Some(event) => {
if let tokio_xmpp::Event::Stanza(e) = event {
info!("Get stanza: {:?}", e);
if e.name() == "presence"
&& e.attr("from")
.map_or(false, |f| f == account.jid)
&& e.attr("to").map_or(false, |f| f == account.jid)
{
info!("Self presence");
future::ok(future::Loop::Break(stream))
} else {
future::ok(future::Loop::Continue((
account, stream,
)))
}
let presence = stanzas::make_presence(&account);
info!("Sending presence...");
let account2 = account.clone();
Box::new(
client
.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
"Cann't send self-presence".to_owned()
})
.and_then(move |client| {
future::loop_fn((account2.clone(), client), |(account, client)| {
client
.into_future()
.map_err(|(e, _)| {
error!("Error on reading self-presence: {}", e);
"Cann't read self-presence".to_owned()
})
.and_then(|(event, client)| match event {
Some(event) => {
if let tokio_xmpp::Event::Stanza(e) = event {
info!("Get stanza: {:?}", e);
if e.name() == "presence"
&& e.attr("from").map_or(false, |f| f == account.jid)
&& e.attr("to").map_or(false, |f| f == account.jid)
{
info!("Self presence");
future::ok(future::Loop::Break(client))
None => future::err("Got closed stream".to_owned()),
})
})
.map_err(|e| format!("waiting self-presence: {}", e)),
)
.then(|r| match r {
Err(e) => {
error!("Self-presence waiting error: {}", e);
future::err(account)
}
Ok(inner) => future::ok(XmppConnection {
account,
inner: Some(inner),
}),
}),
)
} else {
warn!("Don't gen connection on self-presence");
Box::new(future::err(account)) as Box<Future<Item = _, Error = _>>
}
}
None => future::err("Got closed stream".to_owned()),
})
})
.map_err(|e| format!("waiting self-presence: {}", e))
})
.then(|r| match r {
Err(e) => {
error!("Self-presence waiting error: {}", e);
future::err(account)
}
Ok(inner) => future::ok(XmppConnection { account, inner }),
}),
)
struct ServiceCmd {
cmd_send: tokio_channel::mpsc::Sender<XmppCommand>,
}
impl hyper::service::Service for ServiceCmd {
type ReqBody = Body;
type ResBody = Body;
type Error = Box<dyn std::error::Error + Sync + Send + 'static>;
type Future =
Box<dyn Future<Item = Response<Self::ResBody>, Error = Self::Error> + Send + 'static>;
fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future {
let xmpp_to_opt = req.headers().get("X-XMPP-To");
let xmpp_to_res: Result<String, std::borrow::Cow<str>> = xmpp_to_opt.map_or_else(
|| Err("none".into()),
|xmpp_to| {
xmpp_to.to_str().map(|x| x.to_owned()).map_err(|e| {
format!("\"{}\" {}", String::from_utf8_lossy(xmpp_to.as_bytes()), e).into()
})
},
);
match xmpp_to_res {
Err(err) => {
warn!("Unknown destination: {}", err);
Box::new(tokio::prelude::future::result(
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from(format!("Unknown destination: {}", err)))
.map_err(|e| {
Box::new(e) as Box<dyn std::error::Error + Sync + Send + 'static>
}),
)) as Box<Future<Item = _, Error = _> + Send + 'static>
}
Ok(xmpp_to) => {
info!("Got request. Reading body...");
let cmd_send = self.cmd_send.clone();
Box::new(
req.into_body()
.map_err(|e| {
Box::new(e) as Box<dyn std::error::Error + Sync + Send + 'static>
})
.fold(String::new(), |mut acc, ch| {
std::str::from_utf8(&*ch).map(|s| {
acc.push_str(s);
acc
})
})
.and_then(move |message: String| {
if !message.is_empty() {
Box::new(
cmd_send
.clone()
.send(XmppCommand { xmpp_to, message })
.then(|r| match r {
Ok(_) => tokio::prelude::future::ok(Response::new(
Body::from("Accepted"),
)),
Err(e) => {
error!("Command sent error: {}", e);
tokio::prelude::future::result(
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from(format!(
"Command sent error: {}",
e
))),
)
}
})
.map_err(|e| {
Box::new(e)
as Box<
dyn std::error::Error + Sync + Send + 'static,
>
}),
)
} else {
warn!("Empty message");
Box::new(tokio::prelude::future::result(
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from("Empty message"))
.map_err(|e| {
Box::new(e)
as Box<
dyn std::error::Error + Sync + Send + 'static,
>
}),
))
as Box<Future<Item = _, Error = _> + Send + 'static>
}
}),
) as Box<Future<Item = _, Error = _> + Send + 'static>
}
}
}
}
struct MakeServiceCmd {
cmd_send: tokio_channel::mpsc::Sender<XmppCommand>,
}
impl<Ctx> hyper::service::MakeService<Ctx> for MakeServiceCmd {
type ReqBody = Body;
type ResBody = Body;
type Error = Box<dyn std::error::Error + Sync + Send + 'static>;
type Service = ServiceCmd;
type Future = tokio::prelude::future::FutureResult<ServiceCmd, Self::MakeError>;
type MakeError = hyper::http::Error;
fn make_service(&mut self, _ctx: Ctx) -> Self::Future {
tokio::prelude::future::ok(ServiceCmd {
cmd_send: self.cmd_send.clone(),
})
}
}
.serve(MakeServiceCmd { cmd_send })
.serve(move || {
let cmd_send = cmd_send.clone();
service_fn(move |_req: Request<Body>| {
info!("Got request");
cmd_send.clone().send(XmppCommand {}).then(|r| match r {
Ok(_) => tokio::prelude::future::ok(Response::new(Body::from("Accepted"))),
Err(e) => {
error!("Command sent error: {}", e);
tokio::prelude::future::result(
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from(format!("Command sent error: {}", e))),
)
}
})
})
})