fn connect<E>(self) -> impl Future<Item = Self, Error = E> {
let XmppConnection { account, inner } = self;
let inner = inner.unwrap_or_else(|| {
let mut res_client = Client::new(&account.jid, &account.password);
while let Err(e) = res_client {
error!("Cann't init xmpp client: {}", e);
res_client = Client::new(&account.jid, &account.password);
fn connect<E: 'static>(self) -> impl Future<Item = Self, Error = E> {
info!("xmpp connection...");
future::loop_fn(self, |XmppConnection { account, inner }| {
if let Some(inner) = inner {
Box::new(future::ok(future::Loop::Break(XmppConnection {
account,
inner: Some(inner),
}))) as Box<Future<Item = _, Error = E>>
} else {
info!("xmpp initialization...");
let mut res_client = Client::new(&account.jid, &account.password);
while let Err(e) = res_client {
error!("Cann't init xmpp client: {}", e);
res_client = Client::new(&account.jid, &account.password);
}
let client = res_client.expect("Cann't init xmpp client");
info!("xmpp initialized");
// future to wait for online
let (sink, stream) = client.split();
Box::new(future::loop_fn(
(sink, stream, account),
|(sink, stream, account)| {
stream.into_future().then(|r| match r {
Ok((event, stream)) => match event {
Some(Event::Online) => {
info!("Online");
future::ok(future::Loop::Break(future::Loop::Break(
XmppConnection {
account,
inner: Some((sink, stream)),
},
)))
}
Some(Event::Stanza(s)) => {
info!("xmpp stanza: {:?}", s);
future::ok(future::Loop::Continue((sink, stream, account)))
}
_ => {
warn!("Disconnected");
future::ok(future::Loop::Break(future::Loop::Continue(
XmppConnection {
account,
inner: None,
},
)))
}
},
Err((e, _)) => {
error!("xmpp receive error: {}", e);
future::ok(future::Loop::Break(future::Loop::Continue(
XmppConnection {
account,
inner: None,
},
)))
}
})
},
))