AGIW6YR3J5M3PIUP3UKFMBQT2VAK6CHDDSIMK4A3KJEXH5SKQEQAC 2L3JHRULRLBHT4K2VE4MHKDFX5RJXYUDTJJCLTUYH3DOOVCJT6WAC OGMBXBKPWAX4A4CY2LJITNKUYHWBFE4EVBHI4NRZO5VNZ5KXM2VQC VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC X6L47BHQPIWFWO53QU6GMRYZC7VPFR4AFTJV7MZIJWIMZG5L65WQC NDDQQP2PSYH5YNU5EHFRUFACLJURWAHFZQNOIQNJCWHRSB7A3N6QC FDHRCKH5OM4TONRBQZK5MNKAXTX56LZWVLXEW3QATI5KL4JZO3QQC BTOZT4JPYXPTMXBSKQGXZ3K46UNACE65RIKAGPLUH3OIRVV7WBKAC TDOR5XQUFRE2LRXML25IJ7A6CIZPFSRES7FQCXDBMIOV4JE2O6OAC QWE26TMV6A5VUQLWG5ZMKHMH4NFZIQMVVXQJMPP42JUMM66C3VOQC FVVPKFTLD5VOGVDF7MOSSKQ3KPCT4HDGTNTTC6I6B4TWXOTPCJ7AC 5OBTKGDLRGZ4K4373CVGBF53LRECXQ7OW45PW43MJLBU622OKMTQC EOHEZXX3TBKGJTOMXEPAPHLAJSATRXNY4AIL63BE4VG3HFNWARVQC ZI4GJ72VMCBV6EQAO4PIKCGXV6ASUYOTGPRVSIGCKDBCDNRQNUFAC inner: Client,}impl From<XmppConnection> for MaybeXmppConnection {fn from(from: XmppConnection) -> MaybeXmppConnection {MaybeXmppConnection {account: from.account,inner: Some(from.inner),}}
inner: Option<(stream::SplitSink<Client>, stream::SplitStream<Client>)>,
XmppConnection {inner: client,account,}.online().and_then(XmppConnection::self_presence).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),})
Self::online(client.split(), account).and_then(Self::self_presence).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),})
impl XmppConnection {/// base XMPP processingfn xmpp_processing(&mut self, event: &Event) {}/// process event from xmpp stream/// returns from future when condition met/// or stop future was resolvedfn processing<S, F, T, E>(self,stop_condition: S,stop_future: F,) -> impl Future<Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),>whereF: Future<Item = T, Error = E>,S: FnMut(&mut Self, &Event) -> bool,{future::loop_fn((self, stop_future, stop_condition),|(xmpp, stop_future, mut stop_condition)| {let XmppConnection { inner, account } = xmpp;inner.into_future().select2(stop_future).then(|r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {inner: client,account,};xmpp.xmpp_processing(&event);if stop_condition(&mut xmpp, &event) {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))} else {future::ok(future::Loop::Continue((xmpp, b, stop_condition)))}} else {future::err((account, Ok(Either::A(b))))}}Ok(Either::B((t, a))) => {if let Some(inner) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection { inner, account },Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}}Err(Either::A((_e, b))) => future::err((account, Ok(Either::A(b)))),Err(Either::B((e, a))) => {if let Some(inner) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection { inner, account },Err(e),)))} else {future::err((account, Err(e)))}}})},)}
fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
fn online((sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),account: std::rc::Rc<config::Account>,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
(self.inner, self.account),|(client, account)| {client.into_future().then(|r| match r {Ok((event, client)) => match event {
(sink, stream, account),|(sink, stream, account)| {stream.into_future().then(|r| match r {Ok((event, stream)) => match event {
let presence = stanzas::make_presence(&account);info!("Sending presence...");let account2 = account.clone();Box::new(client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()}).and_then(move |client| {future::loop_fn((account2.clone(), client), |(account, client)| {client.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, client)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(client))
let presence = stanzas::make_presence(&account);info!("Sending presence...");Box::new(sink.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()}).join(future::loop_fn((account.clone(), stream), |(account, stream)| {stream.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, stream)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(stream))} else {future::ok(future::Loop::Continue((account, stream,)))}
}None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e))}).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(XmppConnection { account, inner }),}),)
None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e)),).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(MaybeXmppConnection {account,inner: Some(inner),}),}),)} else {warn!("Don't gen connection on self-presence");Box::new(future::err(account)) as Box<Future<Item = _, Error = _>>}
let result = ctrt.block_on(xmpp_process(ctrl_c.clone().map(|_| ()),recv,config.account,));
let result = ctrt.block_on(xmpp_process(ctrl_c.clone(), recv, config.account));