XGP44R5HBQAHBXEJKN4AB77OZ32B32FPUHSUILZDVNO4QLUOTW6QC
AGIW6YR3J5M3PIUP3UKFMBQT2VAK6CHDDSIMK4A3KJEXH5SKQEQAC
OGMBXBKPWAX4A4CY2LJITNKUYHWBFE4EVBHI4NRZO5VNZ5KXM2VQC
X6L47BHQPIWFWO53QU6GMRYZC7VPFR4AFTJV7MZIJWIMZG5L65WQC
VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC
FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC
NDDQQP2PSYH5YNU5EHFRUFACLJURWAHFZQNOIQNJCWHRSB7A3N6QC
2L3JHRULRLBHT4K2VE4MHKDFX5RJXYUDTJJCLTUYH3DOOVCJT6WAC
QWE26TMV6A5VUQLWG5ZMKHMH4NFZIQMVVXQJMPP42JUMM66C3VOQC
PBRUH4BJGPVY2F4C6SS7F5ZTQJAEBKZVEXCT7EOQRA25DT4MMZEAC
TDOR5XQUFRE2LRXML25IJ7A6CIZPFSRES7FQCXDBMIOV4JE2O6OAC
3GEU7TC7VMBZOSUXQ7II5IGRDGGQCJOLBKFZYUIOGWTKM56ETINQC
inner: Option<(stream::SplitSink<Client>, stream::SplitStream<Client>)>,
inner: Client,
}
impl From<XmppConnection> for MaybeXmppConnection {
fn from(from: XmppConnection) -> MaybeXmppConnection {
MaybeXmppConnection {
account: from.account,
inner: Some(from.inner),
}
}
/// Error should be `!`
fn connect<E: 'static>(self) -> impl Future<Item = Self, Error = E> {
/// don't connect if stop_future resolved
fn connect<F>(
self,
stop_future: F,
) -> impl Future<Item = XmppConnection, Error = failure::Error>
where
F: future::Future + 'static,
<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
{
Box::new(future::loop_fn(account, |account| {
info!("xmpp initialization...");
let mut res_client = Client::new(&account.jid, &account.password);
while let Err(e) = res_client {
error!("Cann't init xmpp client: {}", e);
res_client = Client::new(&account.jid, &account.password);
}
let client = res_client.expect("Cann't init xmpp client");
info!("xmpp initialized");
Box::new(
stop_future
.select2(
future::loop_fn(account, |account| {
info!("xmpp initialization...");
let res_client = Client::new(&account.jid, &account.password);
match res_client {
Err(_e) => Box::new(future::ok(future::Loop::Continue(account)))
as Box<Future<Item = _, Error = _>>,
Ok(client) => {
info!("xmpp initialized");
// future to wait for online
Self::online(client.split(), account)
.and_then(Self::self_presence)
// future to wait for online
Box::new(
XmppConnection {
inner: client,
account,
}
.online()
.and_then(XmppConnection::self_presence)
.then(
|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
},
),
)
}
}
})
.map_err(|_: ()| ()),
)
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
})
}))
Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
Ok(Either::B((x, _a))) => future::ok(x),
Err(Either::A((e, _b))) => future::err(e.into()),
Err(Either::B((_, _a))) => {
future::err(format_err!("Cann't initiate XMPP connection"))
}
}),
)
}
}
impl XmppConnection {
/// Base XMPP processing hook for a single event.
///
/// Called by `processing` for each received event before the stop
/// condition is evaluated. The body is currently empty, so the event
/// is ignored (hence the `_event` name).
fn xmpp_processing(&mut self, _event: &Event) {}
/// Processes events from the XMPP stream until the stop condition is met
/// or the stop future completes.
///
/// Each loop iteration races the next item of the connection's stream
/// against `stop_future` (via `select2`). Every received event is first
/// handed to `xmpp_processing` and then to `stop_condition`; the loop
/// breaks when `stop_condition` returns `true`, otherwise it continues
/// with the still-pending stop future.
///
/// Resolves with `(connection, outcome)`, where `outcome` is:
/// * `Ok(Either::A(stop_future))` - `stop_condition` matched; the
///   unfinished stop future is handed back,
/// * `Ok(Either::B(value))` - `stop_future` resolved with `value`,
/// * `Err(error)` - `stop_future` failed with `error`.
///
/// Fails with `(account, outcome)` (the connection is not returned) when
/// the stream yields `None`, when the stream itself errors, or when
/// `into_inner()` on the pending stream future returns `None`.
fn processing<S, F, T, E>(
self,
stop_condition: S,
stop_future: F,
) -> impl Future<
Item = (Self, Result<Either<F, T>, E>),
Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
>
where
F: Future<Item = T, Error = E>,
S: FnMut(&mut Self, &Event) -> bool,
{
// Loop state: the connection, the stop future and the stop predicate.
future::loop_fn(
(self, stop_future, stop_condition),
|(xmpp, stop_future, mut stop_condition)| {
// Take the connection apart so `inner` can be consumed by `into_future`;
// it is reassembled in the arms below.
let XmppConnection { inner, account } = xmpp;
inner.into_future().select2(stop_future).then(|r| match r {
// The stream side completed first; `b` is the stop future that is still pending.
Ok(Either::A(((event, client), b))) => {
if let Some(event) = event {
let mut xmpp = XmppConnection {
inner: client,
account,
};
// Base processing runs before the caller's stop predicate.
xmpp.xmpp_processing(&event);
if stop_condition(&mut xmpp, &event) {
future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
} else {
future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
}
} else {
// No event (`None`): only the account can be handed back.
future::err((account, Ok(Either::A(b))))
}
}
// The stop future resolved first with `t`; `a` is the unfinished stream side.
Ok(Either::B((t, a))) => {
if let Some(inner) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection { inner, account },
Ok(Either::B(t)),
)))
} else {
future::err((account, Ok(Either::B(t))))
}
}
// The stream side failed: its error is dropped and the pending stop future is returned.
Err(Either::A((_e, b))) => future::err((account, Ok(Either::A(b)))),
// The stop future failed with `e`; the connection is kept if the stream can be recovered.
Err(Either::B((e, a))) => {
if let Some(inner) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection { inner, account },
Err(e),
)))
} else {
future::err((account, Err(e)))
}
}
})
},
)
fn online(
(sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),
account: std::rc::Rc<config::Account>,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
let presence = stanzas::make_presence(&account);
info!("Sending presence...");
Box::new(
sink.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
"Cann't send self-presence".to_owned()
})
.join(
future::loop_fn((account.clone(), stream), |(account, stream)| {
stream
.into_future()
.map_err(|(e, _)| {
error!("Error on reading self-presence: {}", e);
"Cann't read self-presence".to_owned()
})
.and_then(|(event, stream)| match event {
Some(event) => {
if let tokio_xmpp::Event::Stanza(e) = event {
info!("Get stanza: {:?}", e);
if e.name() == "presence"
&& e.attr("from")
.map_or(false, |f| f == account.jid)
&& e.attr("to").map_or(false, |f| f == account.jid)
{
info!("Self presence");
future::ok(future::Loop::Break(stream))
} else {
future::ok(future::Loop::Continue((
account, stream,
)))
}
let presence = stanzas::make_presence(&account);
info!("Sending presence...");
let account2 = account.clone();
Box::new(
client
.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
"Cann't send self-presence".to_owned()
})
.and_then(move |client| {
future::loop_fn((account2.clone(), client), |(account, client)| {
client
.into_future()
.map_err(|(e, _)| {
error!("Error on reading self-presence: {}", e);
"Cann't read self-presence".to_owned()
})
.and_then(|(event, client)| match event {
Some(event) => {
if let tokio_xmpp::Event::Stanza(e) = event {
info!("Get stanza: {:?}", e);
if e.name() == "presence"
&& e.attr("from").map_or(false, |f| f == account.jid)
&& e.attr("to").map_or(false, |f| f == account.jid)
{
info!("Self presence");
future::ok(future::Loop::Break(client))
None => future::err("Got closed stream".to_owned()),
})
})
.map_err(|e| format!("waiting self-presence: {}", e)),
)
.then(|r| match r {
Err(e) => {
error!("Self-presence waiting error: {}", e);
future::err(account)
}
Ok(inner) => future::ok(MaybeXmppConnection {
account,
inner: Some(inner),
}),
}),
)
} else {
warn!("Don't gen connection on self-presence");
Box::new(future::err(account)) as Box<Future<Item = _, Error = _>>
}
}
None => future::err("Got closed stream".to_owned()),
})
})
.map_err(|e| format!("waiting self-presence: {}", e))
})
.then(|r| match r {
Err(e) => {
error!("Self-presence waiting error: {}", e);
future::err(account)
}
Ok(inner) => future::ok(XmppConnection { account, inner }),
}),
)
Ok(Either::A((_x, b))) => {
info!("Got signal");
// got signal, breaks
Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
as Box<Future<Item = _, Error = _>>
}
Ok(Either::B((x, a))) => {
info!("Got cmd");
// got cmd, continue
Box::new(future::ok(future::Loop::Continue(XmppState::new(
(x.0).1,
a,
x.1,
)))) as Box<Future<Item = _, Error = _>>
}
Err(Either::A((e, b))) => {
// got signal error, breaks
error!("Signal error: {}", e);
Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
as Box<Future<Item = _, Error = _>>
Ok((cmd, cmd_recv, conn)) => {
if let Some(_cmd) = cmd {
info!("Got cmd");
// got cmd, continue
future::ok(future::Loop::Continue(XmppState::new(
cmd_recv,
signal,
conn.into(),
)))
} else {
future::ok(future::Loop::Break((None, conn.into())))
}