QTCUURXNMKPKDLG64QH72BIVP6SELSTQLUGGN67LKFVFN6FP2ZUQC WJNXI6Z4NH5YEBDS4F6FY5AI5GNXNITC4IJZVWOESTVLOV667AHQC FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC OANBCLN5TD5VQTSAQXSIXU6IUCXMKQSML2UJGQCKVTOCENEVABIAC VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC 5IKA4GO7DIMBBYB7OUF5S7N2LUWIZ5MKCCCTZCAYPZI2357MVFHAC ALP2YJIUN45LOOJU7GZWYDY7BMLCKR3LJVFPXTTLLZFB4LSYJKMAC UWY5EVZ6AGIMH3OGHECQPK2WNT3MCDIQZRFB4TSBFBAVGVVD3MRQC XGP44R5HBQAHBXEJKN4AB77OZ32B32FPUHSUILZDVNO4QLUOTW6QC PVCRPP3BXTLRRT2VK2BCKFCRJXUOX3AEF2A6UCPOSZYEKQEMBWDQC AYQZ2UIA3HDOJAXF7WZZBFBQGNBLB63PPCMXJYLGES4FVIPXITPQC X6L47BHQPIWFWO53QU6GMRYZC7VPFR4AFTJV7MZIJWIMZG5L65WQC OGMBXBKPWAX4A4CY2LJITNKUYHWBFE4EVBHI4NRZO5VNZ5KXM2VQC IK3YDPTYYB4IQR3JFFXFLPGWTBG4HE4OCMKJ447RK72OYESHOMRQC AGIW6YR3J5M3PIUP3UKFMBQT2VAK6CHDDSIMK4A3KJEXH5SKQEQAC NDDQQP2PSYH5YNU5EHFRUFACLJURWAHFZQNOIQNJCWHRSB7A3N6QC 4LRBIGVT4GOFDT7EUBBRG7776IC6WVJYTB6NVMIBVWAFGISYGIUAC HU3NZX5ZNTZB43SBAYMY2PUUMIF2ROJ4EFAJ6HTZXZL5T7ZJ3OQAC QWE26TMV6A5VUQLWG5ZMKHMH4NFZIQMVVXQJMPP42JUMM66C3VOQC FVVPKFTLD5VOGVDF7MOSSKQ3KPCT4HDGTNTTC6I6B4TWXOTPCJ7AC match stop_condition(&mut xmpp, event) {Ok(true) => {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))}Ok(false) => {future::ok(future::Loop::Continue((xmpp, b, stop_condition)))}Err(e) => future::err((xmpp.account, Err(e))),
if stop_condition(&mut xmpp, &event) {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))} else {future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
fn online(&mut self, event: Event) -> Result<bool, failure::Error> {match event {Event::Online => {info!("Online!");Ok(true)}Event::Stanza(s) => {warn!("Stanza before online: {:?}", s);Ok(false)}_ => {error!("Disconnected while online");Err(format_err!("Disconnected while online"))}}}fn process_initial_roster(&mut self, event: Event) -> Result<bool, failure::Error> {if let Event::Stanza(s) = event {use try_from::TryInto;match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {Ok(iq) => {if let Some(id) = iq.id {if id == ID_GET_ROSTER {match iq.payload {xmpp_parsers::iq::IqType::Error(_e) => {Err(format_err!("Get error instead of roster"))}xmpp_parsers::iq::IqType::Result(Some(result)) => {match result.try_into()as Result<xmpp_parsers::roster::Roster, _>{Ok(roster) => {info!("Got roster:");for i in roster.items {info!(" >>> {:?}", i);self.state.roster.insert(i.jid, ());}Ok(true)}Err(e) => Err(format_err!("Cann't parse roster: {}", e)),}}_ => Err(format_err!("Unknown result of roster")),}} else {Ok(false)
fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {Box::new(future::loop_fn((self.inner, self.account),|(client, account)| {client.into_future().then(|r| match r {Ok((event, client)) => match event {Some(Event::Online) => {info!("Online");future::ok(future::Loop::Break(XmppConnection {account,inner: client,}))}Some(Event::Stanza(s)) => {info!("xmpp stanza: {:?}", s);future::ok(future::Loop::Continue((client, account)))}_ => {warn!("Disconnected");future::err(account)
}Err(_e) => Ok(false),}} else {Err(format_err!("Wrong event while waiting roster"))}}fn initial_roster<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E>,E: Into<failure::Error>,{let XmppConnection {account,inner: client,} = self;use tokio::prelude::Sink;let get_roster = stanzas::make_get_roster(ID_GET_ROSTER);let account2 = account.clone();info!("Quering roster... {:?}", get_roster);client.send(get_roster).map_err(move |e| {error!("Error on querying roster: {}", e);(account2, Err(failure::SyncFailure::new(e).into()))}).and_then(move |client| {XmppConnection {inner: client,account,}.processing(XmppConnection::process_initial_roster, stop_future)}).then(|r| match r {Err((account, e)) => {error!("Cann't wait roster: {}",e.err().map_or_else(|| std::borrow::Cow::Borrowed("None"),|e| e.to_string().into()));future::err(account)}Ok((conn, _)) => future::ok(conn),})
})},))
fn self_presence<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E>,E: Into<failure::Error>,{let XmppConnection {account,inner: client,} = self;
fn self_presence(self) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {let XmppConnection { account, inner } = self;let client = inner;
info!("Sending presence... {:?}", presence);client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);(account2, Err(failure::SyncFailure::new(e).into()))}).and_then(move |client| {XmppConnection {inner: client,account,}.processing(move |conn, event| {if let Event::Stanza(s) = event {if s.name() == "presence"&& s.attr("from").map_or(false, |f| f == conn.account.jid)&& s.attr("to").map_or(false, |f| f == conn.account.jid){Ok(true)} else {Ok(false)}} else {Err(format_err!("Wrong event while waiting self-presence"))}},stop_future,)}).then(|r| match r {Err((account, _e)) => {error!("Cann't wait self-presence");future::err(account)}Ok((conn, _)) => future::ok(conn),})
Box::new(client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()}).and_then(move |client| {future::loop_fn((account2.clone(), client), |(account, client)| {client.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, client)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(client))} else {future::ok(future::Loop::Continue((account, client)))}} else {future::err("Got wrong event".to_owned())}}None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e))}).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(XmppConnection { account, inner }),}),)
S: stream::Stream<Item = XmppCommand> + 'static,
S: stream::Stream<Item = XmppCommand, Error = ()> + 'static,