BWDUANCV77MCLCYMRS2UNFIFW4ZC3KB2KEGEUI77FRO7KW6TUZJQC CP4MZO6VZHSAL2ENE7MUYUS2BJFL6EIZSHFFMT66WPQD7KHOE5ZQC FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC QWE26TMV6A5VUQLWG5ZMKHMH4NFZIQMVVXQJMPP42JUMM66C3VOQC VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC 5IKA4GO7DIMBBYB7OUF5S7N2LUWIZ5MKCCCTZCAYPZI2357MVFHAC WJNXI6Z4NH5YEBDS4F6FY5AI5GNXNITC4IJZVWOESTVLOV667AHQC HU3NZX5ZNTZB43SBAYMY2PUUMIF2ROJ4EFAJ6HTZXZL5T7ZJ3OQAC X6L47BHQPIWFWO53QU6GMRYZC7VPFR4AFTJV7MZIJWIMZG5L65WQC QTCUURXNMKPKDLG64QH72BIVP6SELSTQLUGGN67LKFVFN6FP2ZUQC ALP2YJIUN45LOOJU7GZWYDY7BMLCKR3LJVFPXTTLLZFB4LSYJKMAC OANBCLN5TD5VQTSAQXSIXU6IUCXMKQSML2UJGQCKVTOCENEVABIAC UWY5EVZ6AGIMH3OGHECQPK2WNT3MCDIQZRFB4TSBFBAVGVVD3MRQC AYQZ2UIA3HDOJAXF7WZZBFBQGNBLB63PPCMXJYLGES4FVIPXITPQC PVCRPP3BXTLRRT2VK2BCKFCRJXUOX3AEF2A6UCPOSZYEKQEMBWDQC FVVPKFTLD5VOGVDF7MOSSKQ3KPCT4HDGTNTTC6I6B4TWXOTPCJ7AC pub fn make_get_roster(id: &str) -> Element {let mut get_roster = Iq::from_get(Roster {ver: None,items: vec![],});get_roster.id = Some(id.to_string());get_roster.into()}
.online().and_then(XmppConnection::self_presence)
.processing(XmppConnection::online, stop_future.clone()).map_err(|(acc, _)| acc).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),}).and_then(|conn| conn.initial_roster(stop_future2)).and_then(|conn| conn.self_presence(stop_future3))
if stop_condition(&mut xmpp, &event) {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))} else {future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
match stop_condition(&mut xmpp, event) {Ok(true) => {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))}Ok(false) => {future::ok(future::Loop::Continue((xmpp, b, stop_condition)))}Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
/// returns error if something went wrongfn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {Box::new(future::loop_fn((self.inner, self.account),|(client, account)| {client.into_future().then(|r| match r {Ok((event, client)) => match event {Some(Event::Online) => {info!("Online");future::ok(future::Loop::Break(XmppConnection {account,inner: client,}))
/// returns error if something went wrong and xmpp connection is brokenfn online(&mut self, event: Event) -> Result<bool, ()> {match event {Event::Online => {info!("Online!");Ok(true)}Event::Stanza(s) => {warn!("Stanza before online: {:?}", s);Ok(false)}_ => {error!("Disconnected while online");Err(())}}}fn process_initial_roster(&mut self, event: Event) -> Result<bool, ()> {if let Event::Stanza(s) = event {use try_from::TryInto;match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {Ok(iq) => {if let Some(id) = iq.id {if id == ID_GET_ROSTER {match iq.payload {xmpp_parsers::iq::IqType::Error(_e) => {error!("Get error instead of roster");Err(())}xmpp_parsers::iq::IqType::Result(Some(result)) => {match result.try_into()as Result<xmpp_parsers::roster::Roster, _>{Ok(roster) => {info!("Got roster:");for i in roster.items {info!(" >>> {:?}", i);}Ok(true)}Err(e) => {error!("Cann't parse roster: {}", e);Err(())}}}_ => {error!("Unknown result of roster");Err(())}}} else {Ok(false)
Some(Event::Stanza(s)) => {info!("xmpp stanza: {:?}", s);future::ok(future::Loop::Continue((client, account)))}_ => {warn!("Disconnected");future::err(account)}},Err((e, _)) => {error!("xmpp receive error: {}", e);future::err(account)
} else {error!("Iq stanza without id");Err(())
}Err(_e) => Ok(false),}} else {error!("Wrong event while waiting roster");Err(())}}fn initial_roster<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E>,{let XmppConnection {account,state: XmppState { client, roster },} = self;use tokio::prelude::Sink;let get_roster = stanzas::make_get_roster(ID_GET_ROSTER);let account2 = account.clone();info!("Quering roster... {:?}", get_roster);client.send(get_roster).map_err(move |e| {error!("Error on querying roster: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, roster },account,}.processing(XmppConnection::process_initial_roster, stop_future).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),
fn self_presence(self) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {let XmppConnection { account, inner } = self;let client = inner;
fn self_presence<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E>,E: Into<failure::Error>,{let XmppConnection {account,state: XmppState { client, roster },} = self;
Box::new(client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()
info!("Sending presence... {:?}", presence);client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, roster },account,}.processing(move |conn, event| {if let Event::Stanza(s) = event {if s.name() == "presence"&& s.attr("from").map_or(false, |f| f == conn.account.jid)&& s.attr("to").map_or(false, |f| f == conn.account.jid){Ok(true)} else {Ok(false)}} else {error!("Wrong event while waiting self-presence");Err(())}},stop_future,).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),
.and_then(move |client| {future::loop_fn((account2.clone(), client), |(account, client)| {client.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, client)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(client))} else {future::ok(future::Loop::Continue((account, client)))}} else {future::err("Got wrong event".to_owned())}}None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e))}).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(XmppConnection { account, inner }),}),)
})
.and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| {if let Some(cmd_recv) = opt_cmd_recv {// process left commandsinfo!("Stop accepting commands");Box::new(cmd_recv.for_each(|_cmd| future::ok(())).map_err(|_| format_err!("cmd receiver last error")),) as Box<Future<Item = (), Error = failure::Error>>} else {Box::new(future::ok(()))}})