HU3NZX5ZNTZB43SBAYMY2PUUMIF2ROJ4EFAJ6HTZXZL5T7ZJ3OQAC
QYY3KRGLZFFOMYZUMEX5DUFU2DSP3H3NVFLMVTS2PEFQZ6U63AMAC
XGP44R5HBQAHBXEJKN4AB77OZ32B32FPUHSUILZDVNO4QLUOTW6QC
VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC
X6L47BHQPIWFWO53QU6GMRYZC7VPFR4AFTJV7MZIJWIMZG5L65WQC
IK3YDPTYYB4IQR3JFFXFLPGWTBG4HE4OCMKJ447RK72OYESHOMRQC
OGMBXBKPWAX4A4CY2LJITNKUYHWBFE4EVBHI4NRZO5VNZ5KXM2VQC
FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC
AGIW6YR3J5M3PIUP3UKFMBQT2VAK6CHDDSIMK4A3KJEXH5SKQEQAC
NDDQQP2PSYH5YNU5EHFRUFACLJURWAHFZQNOIQNJCWHRSB7A3N6QC
O2GM5J4FSOXLTJURP2K6JMCBRAJ2JPJ723A43CR6AT2VQQQYRHFQC
5A5UVGNMHEYI62XUENBEPSRJ7M3MBULYITRDOEYYA5ZDUMFBB6ZQC
QWE26TMV6A5VUQLWG5ZMKHMH4NFZIQMVVXQJMPP42JUMM66C3VOQC
FVVPKFTLD5VOGVDF7MOSSKQ3KPCT4HDGTNTTC6I6B4TWXOTPCJ7AC
UCY2DO3DQ2WE55CVVJYMHINBWCZYPCSZ5Q2EHMZNNZIU5LPCJNSAC
EOHEZXX3TBKGJTOMXEPAPHLAJSATRXNY4AIL63BE4VG3HFNWARVQC
ZI4GJ72VMCBV6EQAO4PIKCGXV6ASUYOTGPRVSIGCKDBCDNRQNUFAC
ZJ4QKTJROEJK2RSY4JS7EDJMUAVXBYVXZPFIXUC5BZGVPLP5XMJAC
inner: Option<(stream::SplitSink<Client>, stream::SplitStream<Client>)>,
inner: Client,
}
impl From<XmppConnection> for MaybeXmppConnection {
fn from(from: XmppConnection) -> MaybeXmppConnection {
MaybeXmppConnection {
account: from.account,
inner: Some(from.inner),
}
}
/// Error shoud be !
fn connect<E: 'static>(self) -> impl Future<Item = Self, Error = E> {
/// don't connect if stop_future resolved
fn connect<F>(
self,
stop_future: F,
) -> impl Future<Item = XmppConnection, Error = failure::Error>
where
F: future::Future + Clone + 'static,
<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
{
Box::new(future::loop_fn(account, |account| {
info!("xmpp initialization...");
let mut res_client = Client::new(&account.jid, &account.password);
while let Err(e) = res_client {
error!("Cann't init xmpp client: {}", e);
res_client = Client::new(&account.jid, &account.password);
}
let client = res_client.expect("Cann't init xmpp client");
info!("xmpp initialized");
Box::new(
stop_future
.clone()
.select2(
future::loop_fn(account, move |account| {
info!("xmpp initialization...");
let res_client = Client::new(&account.jid, &account.password);
match res_client {
Err(_e) => Box::new(future::ok(future::Loop::Continue(account)))
as Box<Future<Item = _, Error = _>>,
Ok(client) => {
info!("xmpp initialized");
// future to wait for online
Self::online(client.split(), account)
.and_then(Self::self_presence)
let stop_future2 = stop_future.clone();
// future to wait for online
Box::new(
XmppConnection {
inner: client,
account,
}
.processing(XmppConnection::online, stop_future.clone())
.map(|(conn, _)| conn)
.map_err(|(acc, _)| acc)
.and_then(|conn| conn.self_presence(stop_future2))
.then(
|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
},
),
)
}
}
})
.map_err(|_: ()| ()),
)
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
})
}))
Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
Ok(Either::B((x, _a))) => future::ok(x),
Err(Either::A((e, _b))) => future::err(e.into()),
Err(Either::B((_, _a))) => {
future::err(format_err!("Cann't initiate XMPP connection"))
}
}),
)
/// get connection and wait for online status and set presence
/// returns error if something went wrong
fn online(
(sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),
account: std::rc::Rc<config::Account>,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
Box::new(future::loop_fn(
(sink, stream, account),
|(sink, stream, account)| {
stream.into_future().then(|r| match r {
Ok((event, stream)) => match event {
Some(Event::Online) => {
info!("Online");
future::ok(future::Loop::Break(XmppConnection {
impl XmppConnection {
/// base XMPP processing
fn xmpp_processing(&mut self, event: &Event) {
info!("Incoming xmpp event: {:?}", event);
}
/// process event from xmpp stream
/// returns from future when condition met
/// or stop future was resolved
fn processing<S, F, T, E>(
self,
stop_condition: S,
stop_future: F,
) -> impl Future<
Item = (Self, Result<Either<F, T>, failure::Error>),
Error = (
std::rc::Rc<config::Account>,
Result<Either<F, T>, failure::Error>,
),
>
where
F: Future<Item = T, Error = E>,
E: Into<failure::Error>,
S: FnMut(&mut Self, &Event) -> Result<bool, failure::Error>,
{
future::loop_fn(
(self, stop_future, stop_condition),
|(xmpp, stop_future, mut stop_condition)| {
let XmppConnection { inner, account } = xmpp;
inner.into_future().select2(stop_future).then(|r| match r {
Ok(Either::A(((event, client), b))) => {
if let Some(event) = event {
let mut xmpp = XmppConnection {
inner: client,
inner: Some((sink, stream)),
}))
};
xmpp.xmpp_processing(&event);
match stop_condition(&mut xmpp, &event) {
Ok(true) => {
future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
}
Ok(false) => {
future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
}
Err(e) => future::err((xmpp.account, Err(e))),
}
} else {
future::err((account, Ok(Either::A(b))))
Some(Event::Stanza(s)) => {
info!("xmpp stanza: {:?}", s);
future::ok(future::Loop::Continue((sink, stream, account)))
}
Ok(Either::B((t, a))) => {
if let Some(inner) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection { inner, account },
Ok(Either::B(t)),
)))
} else {
future::err((account, Ok(Either::B(t))))
_ => {
warn!("Disconnected");
future::err(account)
}
Err(Either::A((_e, b))) => future::err((account, Ok(Either::A(b)))),
Err(Either::B((e, a))) => {
if let Some(inner) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection { inner, account },
Err(e.into()),
)))
} else {
future::err((account, Err(e.into())))
fn self_presence(self) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
/// get connection and wait for online status and set presence
/// returns error if something went wrong
fn online(&mut self, event: &Event) -> Result<bool, failure::Error> {
match event {
Event::Online => {
info!("Online!");
Ok(true)
}
Event::Stanza(s) => {
warn!("Stanza before online: {:?}", s);
Ok(false)
}
_ => {
error!("Disconnected while online");
Err(format_err!("Disconnected while online"))
}
}
}
fn self_presence<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E>,
E: Into<failure::Error>,
{
let presence = stanzas::make_presence(&account);
info!("Sending presence...");
Box::new(
sink.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
"Cann't send self-presence".to_owned()
})
.join(
future::loop_fn((account.clone(), stream), |(account, stream)| {
stream
.into_future()
.map_err(|(e, _)| {
error!("Error on reading self-presence: {}", e);
"Cann't read self-presence".to_owned()
})
.and_then(|(event, stream)| match event {
Some(event) => {
if let tokio_xmpp::Event::Stanza(e) = event {
info!("Get stanza: {:?}", e);
if e.name() == "presence"
&& e.attr("from")
.map_or(false, |f| f == account.jid)
&& e.attr("to").map_or(false, |f| f == account.jid)
{
info!("Self presence");
future::ok(future::Loop::Break(stream))
} else {
future::ok(future::Loop::Continue((
account, stream,
)))
}
} else {
future::err("Got wrong event".to_owned())
}
}
None => future::err("Got closed stream".to_owned()),
})
})
.map_err(|e| format!("waiting self-presence: {}", e)),
)
.then(|r| match r {
Err(e) => {
error!("Self-presence waiting error: {}", e);
future::err(account)
let presence = stanzas::make_presence(&account);
info!("Sending presence...");
let account2 = account.clone();
client
.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
(account2, Err(failure::SyncFailure::new(e).into()))
})
.and_then(move |client| {
XmppConnection {
inner: client,
account,
}
.processing(
move |conn, event| {
if let Event::Stanza(s) = event {
if s.name() == "presence"
&& s.attr("from").map_or(false, |f| f == conn.account.jid)
&& s.attr("to").map_or(false, |f| f == conn.account.jid)
{
Ok(true)
} else {
Ok(false)
}
} else {
Err(format_err!("Wrong event while waiting self-presence"))
Ok(inner) => future::ok(XmppConnection {
account,
inner: Some(inner),
}),
}),
)
} else {
warn!("Don't gen connection on self-presence");
Box::new(future::err(account)) as Box<Future<Item = _, Error = _>>
}
},
stop_future,
)
})
.then(|r| match r {
Err((account, e)) => {
error!("Cann't wait self-presence");
future::err(account)
}
Ok((conn, _)) => future::ok(conn),
})
Ok(Either::A((_x, b))) => {
info!("Got signal");
// got signal, breaks
Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
as Box<Future<Item = _, Error = _>>
}
Ok(Either::B((x, a))) => {
info!("Got cmd");
// got cmd, continue
Box::new(future::ok(future::Loop::Continue(XmppState::new(
(x.0).1,
a,
x.1,
)))) as Box<Future<Item = _, Error = _>>
}
Err(Either::A((e, b))) => {
// got signal error, breaks
error!("Signal error: {}", e);
Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
as Box<Future<Item = _, Error = _>>
Ok((cmd, cmd_recv, conn)) => {
if let Some(_cmd) = cmd {
info!("Got cmd");
// got cmd, continue
future::ok(future::Loop::Continue(XmppState::new(
cmd_recv,
signal,
conn.into(),
)))
} else {
future::ok(future::Loop::Break((None, conn.into())))
}
struct ServiceCmd {
cmd_send: tokio_channel::mpsc::Sender<XmppCommand>,
}
impl hyper::service::Service for ServiceCmd {
type ReqBody = Body;
type ResBody = Body;
type Error = failure::Error;
type Future =
Box<dyn Future<Item = Response<Self::ResBody>, Error = Self::Error> + Send + 'static>;
fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future {
let xmpp_to_opt = req.headers().get("X-XMPP-To");
let xmpp_to_res: Result<String, std::borrow::Cow<str>> = xmpp_to_opt.map_or_else(
|| Err("none".into()),
|xmpp_to| {
xmpp_to.to_str().map(|x| x.to_owned()).map_err(|e| {
format!("\"{}\" {}", String::from_utf8_lossy(xmpp_to.as_bytes()), e).into()
})
},
);
match xmpp_to_res {
Err(err) => {
warn!("Unknown destination: {}", err);
Box::new(tokio::prelude::future::result(
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from(format!("Unknown destination: {}", err)))
.map_err(|e| e.into()),
)) as Box<Future<Item = _, Error = _> + Send + 'static>
}
Ok(xmpp_to) => {
info!("Got request. Reading body...");
let cmd_send = self.cmd_send.clone();
Box::new(
req.into_body()
.map_err(|e| e.into())
.fold(String::new(), |mut acc, ch| {
std::str::from_utf8(&*ch).map(|s| {
acc.push_str(s);
acc
})
})
.and_then(move |msg: String| {
if !msg.is_empty() {
Box::new(
cmd_send
.clone()
.send(XmppCommand { xmpp_to })
.then(|r| match r {
Ok(_) => tokio::prelude::future::ok(Response::new(
Body::from("Accepted"),
)),
Err(e) => {
error!("Command sent error: {}", e);
tokio::prelude::future::result(
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from(format!(
"Command sent error: {}",
e
))),
)
}
})
.map_err(|e| e.into()),
)
} else {
warn!("Empty message");
Box::new(tokio::prelude::future::result(
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from("Empty message"))
.map_err(|e| e.into()),
))
as Box<Future<Item = _, Error = _> + Send + 'static>
}
}),
) as Box<Future<Item = _, Error = _> + Send + 'static>
}
}
}
}
struct MakeServiceCmd {
cmd_send: tokio_channel::mpsc::Sender<XmppCommand>,
}
impl<Ctx> hyper::service::MakeService<Ctx> for MakeServiceCmd {
type ReqBody = Body;
type ResBody = Body;
type Error = failure::Error;
type Service = ServiceCmd;
type Future = tokio::prelude::future::FutureResult<ServiceCmd, Self::MakeError>;
type MakeError = hyper::http::Error;
fn make_service(&mut self, _ctx: Ctx) -> Self::Future {
tokio::prelude::future::ok(ServiceCmd {
cmd_send: self.cmd_send.clone(),
})
}
}
.serve(MakeServiceCmd { cmd_send })
.serve(move || {
let cmd_send = cmd_send.clone();
service_fn(move |_req: Request<Body>| {
info!("Got request");
cmd_send.clone().send(XmppCommand {}).then(|r| match r {
Ok(_) => tokio::prelude::future::ok(Response::new(Body::from("Accepted"))),
Err(e) => {
error!("Command sent error: {}", e);
tokio::prelude::future::result(
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from(format!("Command sent error: {}", e))),
)
}
})
})
})