XGP44R5HBQAHBXEJKN4AB77OZ32B32FPUHSUILZDVNO4QLUOTW6QC AGIW6YR3J5M3PIUP3UKFMBQT2VAK6CHDDSIMK4A3KJEXH5SKQEQAC OGMBXBKPWAX4A4CY2LJITNKUYHWBFE4EVBHI4NRZO5VNZ5KXM2VQC X6L47BHQPIWFWO53QU6GMRYZC7VPFR4AFTJV7MZIJWIMZG5L65WQC VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC NDDQQP2PSYH5YNU5EHFRUFACLJURWAHFZQNOIQNJCWHRSB7A3N6QC 2L3JHRULRLBHT4K2VE4MHKDFX5RJXYUDTJJCLTUYH3DOOVCJT6WAC QWE26TMV6A5VUQLWG5ZMKHMH4NFZIQMVVXQJMPP42JUMM66C3VOQC PBRUH4BJGPVY2F4C6SS7F5ZTQJAEBKZVEXCT7EOQRA25DT4MMZEAC TDOR5XQUFRE2LRXML25IJ7A6CIZPFSRES7FQCXDBMIOV4JE2O6OAC 3GEU7TC7VMBZOSUXQ7II5IGRDGGQCJOLBKFZYUIOGWTKM56ETINQC inner: Option<(stream::SplitSink<Client>, stream::SplitStream<Client>)>,
inner: Client,}impl From<XmppConnection> for MaybeXmppConnection {fn from(from: XmppConnection) -> MaybeXmppConnection {MaybeXmppConnection {account: from.account,inner: Some(from.inner),}}
/// Error shoud be !fn connect<E: 'static>(self) -> impl Future<Item = Self, Error = E> {
/// don't connect if stop_future resolvedfn connect<F>(self,stop_future: F,) -> impl Future<Item = XmppConnection, Error = failure::Error>whereF: future::Future + 'static,<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,{
Box::new(future::loop_fn(account, |account| {info!("xmpp initialization...");let mut res_client = Client::new(&account.jid, &account.password);while let Err(e) = res_client {error!("Cann't init xmpp client: {}", e);res_client = Client::new(&account.jid, &account.password);}let client = res_client.expect("Cann't init xmpp client");info!("xmpp initialized");
Box::new(stop_future.select2(future::loop_fn(account, |account| {info!("xmpp initialization...");let res_client = Client::new(&account.jid, &account.password);match res_client {Err(_e) => Box::new(future::ok(future::Loop::Continue(account)))as Box<Future<Item = _, Error = _>>,Ok(client) => {info!("xmpp initialized");
// future to wait for onlineSelf::online(client.split(), account).and_then(Self::self_presence)
// future to wait for onlineBox::new(XmppConnection {inner: client,account,}.online().and_then(XmppConnection::self_presence).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),},),)}}}).map_err(|_: ()| ()),)
Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),})}))
Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),Ok(Either::B((x, _a))) => future::ok(x),Err(Either::A((e, _b))) => future::err(e.into()),Err(Either::B((_, _a))) => {future::err(format_err!("Cann't initiate XMPP connection"))}}),)
}}impl XmppConnection {/// base XMPP processingfn xmpp_processing(&mut self, _event: &Event) {}/// process event from xmpp stream/// returns from future when condition met/// or stop future was resolvedfn processing<S, F, T, E>(self,stop_condition: S,stop_future: F,) -> impl Future<Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),>whereF: Future<Item = T, Error = E>,S: FnMut(&mut Self, &Event) -> bool,{future::loop_fn((self, stop_future, stop_condition),|(xmpp, stop_future, mut stop_condition)| {let XmppConnection { inner, account } = xmpp;inner.into_future().select2(stop_future).then(|r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {inner: client,account,};xmpp.xmpp_processing(&event);if stop_condition(&mut xmpp, &event) {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))} else {future::ok(future::Loop::Continue((xmpp, b, stop_condition)))}} else {future::err((account, Ok(Either::A(b))))}}Ok(Either::B((t, a))) => {if let Some(inner) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection { inner, account },Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}}Err(Either::A((_e, b))) => future::err((account, Ok(Either::A(b)))),Err(Either::B((e, a))) => {if let Some(inner) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection { inner, account },Err(e),)))} else {future::err((account, Err(e)))}}})},)
fn online((sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),account: std::rc::Rc<config::Account>,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
let presence = stanzas::make_presence(&account);info!("Sending presence...");Box::new(sink.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()}).join(future::loop_fn((account.clone(), stream), |(account, stream)| {stream.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, stream)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(stream))} else {future::ok(future::Loop::Continue((account, stream,)))}
let presence = stanzas::make_presence(&account);info!("Sending presence...");let account2 = account.clone();Box::new(client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()}).and_then(move |client| {future::loop_fn((account2.clone(), client), |(account, client)| {client.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, client)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(client))
None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e)),).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(MaybeXmppConnection {account,inner: Some(inner),}),}),)} else {warn!("Don't gen connection on self-presence");Box::new(future::err(account)) as Box<Future<Item = _, Error = _>>}
}None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e))}).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(XmppConnection { account, inner }),}),)
Ok(Either::A((_x, b))) => {info!("Got signal");// got signal, breaksBox::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))as Box<Future<Item = _, Error = _>>}Ok(Either::B((x, a))) => {info!("Got cmd");// got cmd, continueBox::new(future::ok(future::Loop::Continue(XmppState::new((x.0).1,a,x.1,)))) as Box<Future<Item = _, Error = _>>}Err(Either::A((e, b))) => {// got signal error, breakserror!("Signal error: {}", e);Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))as Box<Future<Item = _, Error = _>>
Ok((cmd, cmd_recv, conn)) => {if let Some(_cmd) = cmd {info!("Got cmd");// got cmd, continuefuture::ok(future::Loop::Continue(XmppState::new(cmd_recv,signal,conn.into(),)))} else {future::ok(future::Loop::Break((None, conn.into())))}