Move out XmppConnection into own file
[?]
Apr 14, 2019, 7:38 AM
CCLGGFKRUNXBM6IOGEYJOFULCSVOL6C5PFAGCKEWNXOZIKPWLLWQCDependencies
- [2]
RRLRZTMRUse element processor for iq - [3]
J7VX56FWToDo - [4]
77USPY5ISending messages works! - [5]
TDOR5XQUAccept destination - [6]
L3D22A5JPrepare to check incoming presence - [7]
LL3D5CXKStaring using element processor - [8]
3ADA5BBXAdd items to roster from iq of "set" type - [9]
TPVUBB3FAnswer to ping requests - [10]
EBETRYK7Add counter for id. Check for jid in roster - [11]
HU3NZX5ZProcess self-presence via new processing code - [12]
AA2ZWGRLEnter to MUC - [13]
FDHRCKH5Unneded Box - [14]
JD62RVOJUpdate dependencies - [15]
AGIW6YR3Use shared future for signal everywhere - [16]
IK3YDPTYUpdate deps - [17]
FV6BJ5K6Send self-presence and store account info in Rc so it willbe used in some future in parallel - [18]
3GEU7TC7Welcome to 2018! - [19]
O2GM5J4FDon't split xmpp receiving and sending - [20]
WJNXI6Z4Fill roster - [21]
SU4DNVCBStart to processing roster data - [22]
CBWCXUZZPrepare adding new items to roster - [23]
SA2IOFGYAdd items to roster - [24]
DCGEFPRCBetter README - [25]
RGOSS73UConvert self-presence to xmpp_parser's type - [26]
XGP44R5HRework stopping xmpp connection - [27]
DKXSFTDYSend stanzas via send queue - [28]
2L3JHRULCreate separate functions to process incoming XMPP stanzas - [29]
BWDUANCVSecond part of processing result is only about stop_future - [30]
4LRBIGVTShow info about xmpp errors - [31]
OGMBXBKPMove online to XmppConnection - [32]
5IKA4GO7Rename xmpp client field from "inner" to "client" - [33]
UWY5EVZ6Add dummy roster data - [34]
FCPF2FV6Break connection on iq error - [35]
UMTLHH77Process commands in the separate function - [36]
ACXUIS63Update dependecies - [37]
NDDQQP2PUpdate deps - [38]
QTCUURXNAdd additional requirement for command stream - [39]
AYQZ2UIAUpdate deps - [40]
OB3HA2MDUse Client::new_with_jid to parse jid only once - [41]
X6L47BHQUse different structure for established xmpp connection - [42]
FWJDW3G5Allow process xmpp incoming stanzas with futures - [43]
HCCX7VW6Generate ids from counter - [44]
SYH7UQP6Make xmpp command enum to allow different commands Save subscription ask status. Don't ask if already requested subscription. - [45]
2VZBEEXAMessages fixed - [46]
5GINRCKLSend ping XEP-0199 - [47]
37OMJ4CKSend MUC message - [48]
UAT5MV5ODirectly use id for initial roster request - [49]
HOAZX2PBReorganize roster processing. Output roster - [50]
V5HDBSZMUse jid for receiver address - [51]
ALP2YJIURename XmppState to XmppProcessState - [52]
OANBCLN5Move xmpp client into XmppState - [53]
XOAM22TTSimplify xmpp incoming stanzas processing without futures - [54]
UO4WTU6UUpdate dependencies - [55]
5A5UVGNMMove receiver closing logic out of xmpp processing - [56]
VS6AHRWIMove XMPP to separate dir - [57]
H7R7Y3FQUse new processing code to wait online - [58]
QWE26TMVupdate deps - [59]
UIXIQHDYWait for commands via new processing code - [60]
3FYEOGCIMove additional rarely changed data to separate structure - [61]
PVCRPP3BSome servers don't send to in initial presence - [62]
5Y6YJ6UHAdd shutdown function to make actions before offline - [63]
PBRUH4BJRename optional XmppConnection to MaybeXmppConnection
Change contents
- file addition: xmpp_connection.rs[3.7]
//! XMPP connection state and the futures that drive it.
//!
//! `MaybeXmppConnection` is an account with an optional live session;
//! `XmppConnection` is an account with a live session. All network work is
//! expressed as futures that consume the connection and hand it back.

use tokio_xmpp::{Client, Event, Packet};
use tokio::prelude::future::{self, Either};
use tokio::prelude::stream;
use tokio::prelude::{Future, Stream};
use std::collections::{HashMap, VecDeque};
use super::XmppCommand;
use super::stanzas;
use super::element_processor;
use crate::config;

/// Mutable per-session bookkeeping that travels together with the client.
///
/// Created through `Default` when a new session is started in
/// `MaybeXmppConnection::connect`.
#[derive(Default)]
struct XmppData {
    /// Known roster data: jid -> (subscription, ask) as last reported by the server.
    roster: HashMap<
        xmpp_parsers::Jid,
        (
            xmpp_parsers::roster::Subscription,
            xmpp_parsers::roster::Ask,
        ),
    >,
    /// Counter used to build unique stanza ids (incremented before every use).
    counter: usize,
    /// Map from the id of an "add item to roster" request to the jid being added.
    pending_add_roster_ids: HashMap<String, xmpp_parsers::Jid>,
    /// Stanzas waiting to be sent; drained one at a time by `processing`.
    send_queue: VecDeque<minidom::Element>,
    /// Outgoing mailbox: message bodies queued per destination jid.
    outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
    /// Map from MUC id to MUC jid, filled by `enter_mucs`.
    mucs: HashMap<String, xmpp_parsers::Jid>,
}

/// Live session: the tokio-xmpp client plus its bookkeeping data.
struct XmppState {
    client: Client,
    data: XmppData,
}

/// An account together with an established XMPP session.
pub struct XmppConnection {
    account: std::rc::Rc<config::Account>,
    state: XmppState,
}

/// Holder for the dispatcher of incoming stanzas.
struct XmppElementProcessor {
    incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
}

impl XmppElementProcessor {
    /// Builds the dispatcher: the fallback handler logs the unknown stanza and
    /// returns `true`; `XmppConnection::incoming_iq_processing` is registered
    /// as a handler.
    fn new() -> XmppElementProcessor {
        let mut incoming = element_processor::Processor::new(&|_, e| {
            warn!("Unknown stanza {:#?}", e);
            true
        });
        incoming.register(&XmppConnection::incoming_iq_processing);
        XmppElementProcessor { incoming }
    }
}

/// An account with an optional live session (`state` is `None` when offline).
pub struct MaybeXmppConnection {
    account: std::rc::Rc<config::Account>,
    state: Option<XmppState>,
}

/// Wraps an established connection, keeping its session.
impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
        MaybeXmppConnection {
            account: from.account,
            state: Some(from.state),
        }
    }
}

/// Starts from a bare account: no session yet.
impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
        MaybeXmppConnection {
            account: std::rc::Rc::new(from),
            state: None,
        }
    }
}

/// Starts from an already shared account: no session yet.
impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
        MaybeXmppConnection {
            account: from,
            state: None,
        }
    }
}

impl MaybeXmppConnection {
    /// Connects if there is no live session yet.
    ///
    /// If a session already exists it is returned as is. Otherwise a new
    /// client is created and driven through: wait for online, initial roster,
    /// self-presence, entering MUCs. Any failure in that chain starts the
    /// whole sequence again (the loop continues with the account; no delay
    /// between attempts is visible in this function).
    ///
    /// The future fails only when `stop_future` resolves or fails first
    /// (or, per the last match arm, if the retry loop itself reports an error).
    pub fn connect<F>(
        self,
        stop_future: F,
    ) -> impl Future<Item = XmppConnection, Error = failure::Error>
    where
        F: future::Future + Clone + 'static,
        <F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
    {
        info!("xmpp connection...");
        let MaybeXmppConnection { account, state } = self;
        if let Some(state) = state {
            // Already connected: nothing to do.
            Box::new(future::ok(XmppConnection { account, state }))
                as Box<dyn Future<Item = _, Error = _>>
        } else {
            Box::new(
                stop_future
                    .clone()
                    .select2(
                        future::loop_fn(account, move |account| {
                            info!("xmpp initialization...");
                            let client =
                                Client::new_with_jid(account.jid.clone(), &account.password);
                            info!("xmpp initialized");
                            // Each stage of the chain below consumes its own clone.
                            let stop_future2 = stop_future.clone();
                            let stop_future3 = stop_future.clone();
                            let stop_future4 = stop_future.clone();
                            // future to wait for online
                            Box::new(
                                XmppConnection {
                                    state: XmppState {
                                        client,
                                        data: std::default::Default::default(),
                                    },
                                    account,
                                }
                                .processing(XmppConnection::online, stop_future.clone())
                                .map_err(|(acc, _)| acc)
                                .and_then(|(conn, r)| match r {
                                    // Only "stop condition met" counts as success here.
                                    Ok(Either::A(_)) => future::ok(conn),
                                    Ok(Either::B(_)) => future::err(conn.account),
                                    Err(_e) => future::err(conn.account),
                                })
                                .and_then(|conn| conn.initial_roster(stop_future2))
                                .and_then(|conn| conn.self_presence(stop_future3))
                                .and_then(|conn| conn.enter_mucs(stop_future4))
                                .then(|r| match r {
                                    Ok(conn) => future::ok(future::Loop::Break(conn)),
                                    // Failure: retry from scratch with the same account.
                                    Err(acc) => future::ok(future::Loop::Continue(acc)),
                                }),
                            )
                        })
                        .map_err(|_: ()| ()),
                    )
                    .then(|r| match r {
                        // A = stop_future side of select2, B = connection loop side.
                        Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
                        Ok(Either::B((x, _a))) => future::ok(x),
                        Err(Either::A((e, _b))) => future::err(e.into()),
                        Err(Either::B((_, _a))) => {
                            future::err(format_err!("Cann't initiate XMPP connection"))
                        }
                    }),
            )
        }
    }
}

impl XmppConnection {
    /// Base XMPP processing applied to every incoming event.
    ///
    /// Stanzas are dispatched through a freshly built `XmppElementProcessor`
    /// (one is constructed per stanza); `Event::Online` is accepted; anything
    /// else is logged. Returns false on error to disconnect.
    fn xmpp_processing(&mut self, event: &Event) -> bool {
        match event {
            Event::Stanza(stanza) => {
                let processors = XmppElementProcessor::new();
                processors.incoming.process(self, stanza.clone())
            }
            Event::Online => true,
            e => {
                warn!("Unexpected event {:?}", e);
                false
            }
        }
    }

    /// Handles an incoming iq stanza.
    ///
    /// First, if the iq id matches a pending "add to roster" request, the
    /// request is removed from the pending map; on an empty result the jid is
    /// inserted into the roster (if missing) and `process_jid` is run for it,
    /// otherwise the unexpected payload is logged.
    ///
    /// Then the payload is handled: a `Set` that parses as a roster updates or
    /// inserts each item and runs `process_jid`; a `Get` that parses as a ping
    /// queues the stanza built by `stanzas::make_pong`; other payloads are
    /// ignored.
    ///
    /// Returns `false` only for an `Error` payload, `true` otherwise.
    fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
        use std::convert::TryInto;
        if let Some((_, jid)) = self.state.data.pending_add_roster_ids.remove_entry(&iq.id) {
            if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {
                if self.state.data.roster.contains_key(&jid) {
                    info!("Jid {} updated to roster", jid);
                } else {
                    info!("Jid {} added in roster", jid);
                    self.state.data.roster.insert(
                        jid.clone(),
                        (
                            xmpp_parsers::roster::Subscription::None,
                            xmpp_parsers::roster::Ask::None,
                        ),
                    );
                }
                self.process_jid(&jid);
            } else {
                warn!(
                    "Wrong payload when adding {} to roster: {:?}",
                    jid, iq.payload
                );
            }
        }
        // Note: this match runs even when the id matched a pending request above.
        match iq.payload {
            xmpp_parsers::iq::IqType::Set(element) => {
                if let Some(roster) =
                    element.try_into().ok() as Option<xmpp_parsers::roster::Roster>
                {
                    for i in roster.items {
                        if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
                            info!("Update {} in roster", i.jid);
                            rdata.0 = i.subscription;
                            rdata.1 = i.ask;
                        } else {
                            info!("Add {} to roster", i.jid);
                            self.state
                                .data
                                .roster
                                .insert(i.jid.clone(), (i.subscription, i.ask));
                        }
                        self.process_jid(&i.jid);
                    }
                }
            }
            xmpp_parsers::iq::IqType::Error(e) => {
                error!("iq error: {:?}", e);
                return false;
            }
            xmpp_parsers::iq::IqType::Get(element) => {
                if let Some(_ping) = element.try_into().ok() as Option<xmpp_parsers::ping::Ping> {
                    let pong = stanzas::make_pong(&iq.id, self.state.client.jid.clone(), iq.from);
                    self.state.data.send_queue.push_back(pong);
                }
            }
            _ => (), // ignore
        }
        true
    }

    /// Processes events from the xmpp stream until `stop_condition` is met
    /// or `stop_future` is resolved.
    ///
    /// Each loop iteration either sends the next queued stanza (if any) or
    /// waits for the next incoming event; both are raced against `stop_future`.
    /// Every incoming event first goes through `xmpp_processing`, then through
    /// `stop_condition`.
    ///
    /// Returns item if connection was preserved or error (carrying only the
    /// account) otherwise. Second part is a state of stop_future:
    /// * `Ok(Either::A(f))` - stop_future is still pending and handed back,
    /// * `Ok(Either::B(t))` - stop_future resolved with `t`,
    /// * `Err(e)` - stop_future failed with `e`.
    pub fn processing<S, F, T, E>(
        self,
        stop_condition: S,
        stop_future: F,
    ) -> impl Future<
        Item = (Self, Result<Either<F, T>, E>),
        Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    >
    where
        F: Future<Item = T, Error = E> + 'static,
        S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
        T: 'static,
        E: 'static,
    {
        future::loop_fn(
            (self, stop_future, stop_condition),
            |(xmpp, stop_future, mut stop_condition)| {
                let XmppConnection {
                    state: XmppState { client, mut data },
                    account,
                } = xmpp;
                if let Some(send_element) = data.send_queue.pop_front() {
                    // Sending branch: push one queued stanza, racing against stop_future.
                    use tokio::prelude::Sink;
                    info!("Sending {:?}", send_element);
                    Box::new(
                        client
                            .send(Packet::Stanza(send_element))
                            .select2(stop_future)
                            .then(move |r| match r {
                                // Sent; stop_future `b` still pending: next iteration.
                                Ok(Either::A((client, b))) => {
                                    Box::new(future::ok(future::Loop::Continue((
                                        XmppConnection {
                                            state: XmppState { client, data },
                                            account,
                                        },
                                        b,
                                        stop_condition,
                                    ))))
                                        as Box<dyn Future<Item = _, Error = _>>
                                }
                                // stop_future resolved first: let the send `a` finish, then break.
                                Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
                                    Ok(client) => future::ok(future::Loop::Break((
                                        XmppConnection {
                                            state: XmppState { client, data },
                                            account,
                                        },
                                        Ok(Either::B(t)),
                                    ))),
                                    Err(se) => {
                                        warn!("XMPP sending error: {}", se);
                                        future::err((account, Ok(Either::B(t))))
                                    }
                                })),
                                // Send failed: connection is lost, only the account survives.
                                Err(Either::A((e, b))) => {
                                    warn!("XMPP sending error: {}", e);
                                    Box::new(future::err((account, Ok(Either::A(b)))))
                                }
                                // stop_future failed first: let the send `a` finish, then break.
                                Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
                                    Ok(client) => future::ok(future::Loop::Break((
                                        XmppConnection {
                                            state: XmppState { client, data },
                                            account,
                                        },
                                        Err(e),
                                    ))),
                                    Err(se) => {
                                        warn!("XMPP sending error: {}", se);
                                        future::err((account, Err(e)))
                                    }
                                })),
                            }),
                    ) as Box<dyn Future<Item = _, Error = _>>
                } else {
                    // Receiving branch: wait for the next event, racing against stop_future.
                    Box::new(
                        client
                            .into_future()
                            .select2(stop_future)
                            .then(move |r| match r {
                                Ok(Either::A(((event, client), b))) => {
                                    if let Some(event) = event {
                                        let mut xmpp = XmppConnection {
                                            state: XmppState { client, data },
                                            account,
                                        };
                                        if xmpp.xmpp_processing(&event) {
                                            match stop_condition(&mut xmpp, event) {
                                                Ok(true) => future::ok(future::Loop::Break((
                                                    xmpp,
                                                    Ok(Either::A(b)),
                                                ))),
                                                Ok(false) => future::ok(future::Loop::Continue((
                                                    xmpp,
                                                    b,
                                                    stop_condition,
                                                ))),
                                                Err(_e) => {
                                                    future::err((xmpp.account, Ok(Either::A(b))))
                                                }
                                            }
                                        } else {
                                            // Base processing asked to disconnect.
                                            future::err((xmpp.account, Ok(Either::A(b))))
                                        }
                                    } else {
                                        // Stream yielded no event (ended).
                                        future::err((account, Ok(Either::A(b))))
                                    }
                                }
                                // stop_future resolved: keep the connection if the
                                // pending stream future still gives the client back.
                                Ok(Either::B((t, a))) => {
                                    if let Some(client) = a.into_inner() {
                                        future::ok(future::Loop::Break((
                                            XmppConnection {
                                                state: XmppState { client, data },
                                                account,
                                            },
                                            Ok(Either::B(t)),
                                        )))
                                    } else {
                                        future::err((account, Ok(Either::B(t))))
                                    }
                                }
                                // Stream error: `e.0` is the first element of the error value.
                                Err(Either::A((e, b))) => {
                                    warn!("XMPP error: {}", e.0);
                                    future::err((account, Ok(Either::A(b))))
                                }
                                // stop_future failed: same client recovery as above.
                                Err(Either::B((e, a))) => {
                                    if let Some(client) = a.into_inner() {
                                        future::ok(future::Loop::Break((
                                            XmppConnection {
                                                state: XmppState { client, data },
                                                account,
                                            },
                                            Err(e),
                                        )))
                                    } else {
                                        future::err((account, Err(e)))
                                    }
                                }
                            }),
                    )
                }
            },
        )
    }

    /// Stop condition used while waiting for the online status.
    ///
    /// Returns `Ok(true)` on `Event::Online`, `Ok(false)` for a stanza that
    /// arrives earlier (it is only logged), and `Err(())` for any other event,
    /// meaning the xmpp connection is broken.
    fn online(&mut self, event: Event) -> Result<bool, ()> {
        match event {
            Event::Online => {
                info!("Online!");
                Ok(true)
            }
            Event::Stanza(s) => {
                warn!("Stanza before online: {:?}", s);
                Ok(false)
            }
            _ => {
                error!("Disconnected while online");
                Err(())
            }
        }
    }

    /// Stop condition used while waiting for the reply to the initial roster
    /// request with id `id_init_roster`.
    ///
    /// * `Ok(true)` - the matching iq carried a roster; the stored roster is
    ///   cleared and refilled from it.
    /// * `Ok(false)` - the stanza is not an iq or has a different id.
    /// * `Err(())` - non-stanza event, error reply, unparsable roster or any
    ///   other payload for the matching id.
    fn process_initial_roster(&mut self, event: Event, id_init_roster: &str) -> Result<bool, ()> {
        if let Event::Stanza(s) = event {
            use std::convert::TryInto;
            match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
                Ok(iq) => {
                    if iq.id == id_init_roster {
                        match iq.payload {
                            xmpp_parsers::iq::IqType::Error(_e) => {
                                error!("Get error instead of roster");
                                Err(())
                            }
                            xmpp_parsers::iq::IqType::Result(Some(result)) => {
                                match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
                                    Ok(roster) => {
                                        self.state.data.roster.clear();
                                        info!("Got first roster:");
                                        for i in roster.items {
                                            info!(" >>> {:?}", i);
                                            self.state
                                                .data
                                                .roster
                                                .insert(i.jid, (i.subscription, i.ask));
                                        }
                                        Ok(true)
                                    }
                                    Err(e) => {
                                        error!("Cann't parse roster: {}", e);
                                        Err(())
                                    }
                                }
                            }
                            _ => {
                                error!("Unknown result of roster");
                                Err(())
                            }
                        }
                    } else {
                        Ok(false)
                    }
                }
                Err(_e) => Ok(false),
            }
        } else {
            error!("Wrong event while waiting roster");
            Err(())
        }
    }

    /// Sends the initial roster request and waits for its reply.
    ///
    /// The request id is built from the incremented counter. Succeeds with the
    /// connection once `process_initial_roster` accepts the reply; fails with
    /// the account on a send error, a broken connection, or when `stop_future`
    /// resolves or fails first.
    fn initial_roster<F, E>(
        self,
        stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
        F: Future<Error = E> + 'static,
        E: 'static,
    {
        let XmppConnection {
            account,
            state: XmppState { client, mut data },
        } = self;
        use tokio::prelude::Sink;
        data.counter += 1;
        let id_init_roster = format!("id_init_roster{}", data.counter);
        let get_roster = stanzas::make_get_roster(&id_init_roster);
        // Second handle for the send-error path; `account` itself moves into the next stage.
        let account2 = account.clone();
        info!("Quering roster... {:?}", get_roster);
        client
            .send(Packet::Stanza(get_roster))
            .map_err(move |e| {
                error!("Error on querying roster: {}", e);
                account2
            })
            .and_then(move |client| {
                XmppConnection {
                    state: XmppState { client, data },
                    account,
                }
                .processing(
                    move |conn, event| conn.process_initial_roster(event, &id_init_roster),
                    stop_future,
                )
                .map_err(|(account, _)| account)
                .and_then(|(conn, r)| match r {
                    // Only "stop condition met" counts as success.
                    Ok(Either::A(_)) => future::ok(conn),
                    Ok(Either::B(_)) => future::err(conn.account),
                    Err(_e) => future::err(conn.account),
                })
            })
    }

    /// Sends the presence built by `stanzas::make_presence` and waits until a
    /// presence whose `from` equals the client's own jid comes back.
    ///
    /// Stanzas that do not parse as presence are logged and skipped; a
    /// non-stanza event aborts the wait. Fails with the account on a send
    /// error, a broken connection, or when `stop_future` resolves or fails
    /// first.
    fn self_presence<F, E>(
        self,
        stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
        F: Future<Error = E> + 'static,
        E: Into<failure::Error> + 'static,
    {
        let XmppConnection {
            account,
            state: XmppState { client, data },
        } = self;
        use tokio::prelude::Sink;
        let presence = stanzas::make_presence(&account);
        let account2 = account.clone();
        info!("Sending presence... {:?}", presence);
        client
            .send(Packet::Stanza(presence))
            .map_err(|e| {
                error!("Error on send self-presence: {}", e);
                account2
            })
            .and_then(move |client| {
                XmppConnection {
                    state: XmppState { client, data },
                    account,
                }
                .processing(
                    move |conn, event| {
                        if let Event::Stanza(s) = event {
                            use std::convert::TryInto;
                            match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
                                Ok(presence) => {
                                    Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
                                }
                                Err(e) => {
                                    warn!("Not a self-presence: {}", e);
                                    Ok(false)
                                }
                            }
                        } else {
                            error!("Wrong event while waiting self-presence");
                            Err(())
                        }
                    },
                    stop_future,
                )
                .map_err(|(account, _)| account)
                .and_then(|(conn, r)| match r {
                    // Only "stop condition met" counts as success.
                    Ok(Either::A(_)) => future::ok(conn),
                    Ok(Either::B(_)) => future::err(conn.account),
                    Err(_e) => future::err(conn.account),
                })
            })
    }

    /// Tries to move queued messages for `xmpp_to` towards the send queue.
    ///
    /// Does nothing when there is no mailbox for the jid or it is empty.
    /// If the jid is in the roster:
    /// * subscription `To`/`Both` - all mailbox messages are drained into the
    ///   send queue via `stanzas::make_chat_message`;
    /// * otherwise, if ask is `None` - the stanza from
    ///   `stanzas::make_ask_subscribe` is queued;
    /// * independently, if subscription is not `From`/`Both` - the stanza from
    ///   `stanzas::make_allow_subscribe` is queued.
    ///
    /// If the jid is not in the roster, an add-to-roster stanza with a fresh id
    /// is queued and the id is remembered in `pending_add_roster_ids`.
    fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
        if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
            if !mailbox.is_empty() {
                if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
                    info!("Jid {} in roster", xmpp_to);
                    let sub_to = match rdata.0 {
                        xmpp_parsers::roster::Subscription::To => true,
                        xmpp_parsers::roster::Subscription::Both => true,
                        _ => false,
                    };
                    if sub_to {
                        info!("Subscribed to {}", xmpp_to);
                        self.state.data.send_queue.extend(
                            mailbox.drain(..).map(|message| {
                                stanzas::make_chat_message(xmpp_to.clone(), message)
                            }),
                        );
                    } else if rdata.1 == xmpp_parsers::roster::Ask::None {
                        info!("Not subscribed to {}", xmpp_to);
                        self.state
                            .data
                            .send_queue
                            .push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
                    }
                    let sub_from = match rdata.0 {
                        xmpp_parsers::roster::Subscription::From => true,
                        xmpp_parsers::roster::Subscription::Both => true,
                        _ => false,
                    };
                    if !sub_from {
                        info!("Not subscription from {}", xmpp_to);
                        self.state
                            .data
                            .send_queue
                            .push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));
                    }
                } else {
                    info!("Jid {} not in roster", xmpp_to);
                    self.state.data.counter += 1;
                    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
                    let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
                    self.state
                        .data
                        .pending_add_roster_ids
                        .insert(id_add_roster, xmpp_to.clone());
                    info!("Adding jid to roster... {:?}", add_roster);
                    self.state.data.send_queue.push_back(add_roster);
                }
            }
        }
    }

    /// Applies an external command to the connection state.
    ///
    /// Nothing is sent here directly: stanzas are only queued and go out on
    /// the next `processing` iterations.
    /// * `Chat` - the message is stored in the mailbox of `xmpp_to`, then
    ///   `process_jid` is run for that jid.
    /// * `Chatroom` - if `muc_id` is known, a MUC message is queued; otherwise
    ///   an error is logged and the message is dropped.
    /// * `Ping` - a ping stanza with a fresh id is queued.
    pub fn process_command(&mut self, cmd: XmppCommand) {
        info!("Got command");
        match cmd {
            XmppCommand::Chat { xmpp_to, message } => {
                self.state
                    .data
                    .outgoing_mailbox
                    .entry(xmpp_to.clone())
                    .or_default()
                    .push(message);
                self.process_jid(&xmpp_to);
            }
            XmppCommand::Chatroom { muc_id, message } => {
                if let Some(muc) = self.state.data.mucs.get(&muc_id) {
                    self.state
                        .data
                        .send_queue
                        .push_back(stanzas::make_muc_message(muc.clone(), message));
                } else {
                    error!("Not found MUC {}", muc_id);
                }
            }
            XmppCommand::Ping => {
                self.state.data.counter += 1;
                let id_ping = format!("id_ping{}", self.state.data.counter);
                let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
                self.state.data.send_queue.push_back(ping);
            }
        }
    }

    /// Consumes the connection and sends, one after another, the stanza built
    /// by `stanzas::make_muc_presence_leave` to every known MUC.
    ///
    /// The first send error stops the sequence and is returned. The session is
    /// dropped when the future completes.
    pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
        info!("Shutdown connection");
        let XmppConnection { account, state } = self;
        // The jids are copied out first because `state` is moved into the fold below.
        stream::iter_ok(
            state
                .data
                .mucs
                .values()
                .map(std::clone::Clone::clone)
                .collect::<Vec<_>>(),
        )
        .fold(state, move |XmppState { client, data }, muc_jid| {
            let muc_presence =
                stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());
            info!("Sending muc leave presence... {:?}", muc_presence);
            use tokio::prelude::Sink;
            client
                .send(Packet::Stanza(muc_presence))
                .map_err(|e| {
                    error!("Error on send muc presence: {}", e);
                    e
                })
                .and_then(|client| future::ok(XmppState { client, data }))
        })
        .map(|_| ())
    }

    /// Sends a MUC presence for every entry of `account.chatrooms`, one after
    /// another, and records each room in `data.mucs` once its presence is sent.
    ///
    /// `_stop_future` is accepted for signature symmetry with the other stages
    /// but is not used. Fails with the account on the first send error.
    fn enter_mucs<F, E>(
        self,
        _stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
        F: Future<Error = E> + 'static,
        E: Into<failure::Error> + 'static,
    {
        let XmppConnection { account, state } = self;
        // account2 is used inside the fold closure, account3 in the final map.
        let account2 = account.clone();
        let account3 = account.clone();
        stream::iter_ok(account.chatrooms.clone())
            .fold(state, move |XmppState { client, mut data }, muc_jid| {
                data.counter += 1;
                let id_muc_presence = format!("id_muc_presence{}", data.counter);
                // muc_jid is a pair: .0 is used as the key of `mucs`, .1 as the room jid.
                let muc_presence = stanzas::make_muc_presence(
                    &id_muc_presence,
                    account2.jid.clone(),
                    muc_jid.1.clone(),
                );
                info!("Sending muc presence... {:?}", muc_presence);
                let account4 = account2.clone();
                use tokio::prelude::Sink;
                client
                    .send(Packet::Stanza(muc_presence))
                    .map_err(|e| {
                        error!("Error on send muc presence: {}", e);
                        account4
                    })
                    .and_then(|client| {
                        data.mucs.insert(muc_jid.0, muc_jid.1);
                        future::ok(XmppState { client, data })
                    })
            })
            .map(|state| XmppConnection {
                account: account3,
                state,
            })
    }
}
- edit in src/xmpp/stanzas.rs at line 47
presence.to = Some(jid);presence.into()}pub fn make_allow_subscribe(jid: xmpp_parsers::Jid) -> Element {let mut presence = Presence::new(PresenceType::Subscribed); - replacement in src/xmpp/mod.rs at line 0
use tokio::prelude::future::{self, Either};use tokio::prelude::future::{self, Either, Future}; - edit in src/xmpp/mod.rs at line 2
use tokio::prelude::{Future, Stream};use tokio_xmpp::{Client, Event, Packet}; - edit in src/xmpp/mod.rs at line 4[3.48]→[3.1192:1193](∅→∅),[3.1193]→[2.2405:2448](∅→∅),[3.306]→[3.420:434](∅→∅),[3.489]→[3.420:434](∅→∅),[3.305]→[3.420:434](∅→∅),[3.43]→[3.420:434](∅→∅),[3.262]→[3.420:434](∅→∅),[3.32]→[3.420:434](∅→∅),[3.34]→[3.420:434](∅→∅),[3.48]→[3.420:434](∅→∅),[3.1236]→[3.420:434](∅→∅),[3.32]→[3.420:434](∅→∅),[3.32]→[3.420:434](∅→∅),[3.48]→[3.420:434](∅→∅),[2.2448]→[3.420:434](∅→∅),[3.210]→[3.420:434](∅→∅)
use std::collections::{HashMap, VecDeque};mod stanzas; - edit in src/xmpp/mod.rs at line 6[3.433]→[3.430:431](∅→∅),[3.430]→[3.430:431](∅→∅),[3.431]→[3.1237:1274](∅→∅),[3.1274]→[2.2449:3031](∅→∅),[3.329]→[3.1786:1848](∅→∅),[3.490]→[3.1786:1848](∅→∅),[3.1104]→[3.1786:1848](∅→∅),[3.378]→[3.1786:1848](∅→∅),[2.3031]→[3.1786:1848](∅→∅),[3.1786]→[3.1786:1848](∅→∅),[3.1848]→[3.491:524](∅→∅),[3.524]→[2.3032:3121](∅→∅),[3.611]→[3.1848:1851](∅→∅),[2.3121]→[3.1848:1851](∅→∅),[3.1848]→[3.1848:1851](∅→∅),[3.1851]→[3.612:679](∅→∅),[3.679]→[2.3122:3336](∅→∅),[2.3336]→[3.795:847](∅→∅),[3.795]→[3.795:847](∅→∅),[3.847]→[3.1851:1880](∅→∅),[3.1851]→[3.1851:1880](∅→∅),[3.460]→[3.354:397](∅→∅),[3.392]→[3.354:397](∅→∅),[3.1880]→[3.354:397](∅→∅),[3.33]→[3.354:397](∅→∅),[3.33]→[3.354:397](∅→∅),[3.354]→[3.354:397](∅→∅),[3.397]→[3.1881:1911](∅→∅),[3.491]→[3.425:427](∅→∅),[3.423]→[3.425:427](∅→∅),[3.1911]→[3.425:427](∅→∅),[3.61]→[3.425:427](∅→∅),[3.27]→[3.425:427](∅→∅),[3.61]→[3.425:427](∅→∅),[3.425]→[3.425:427](∅→∅),[3.427]→[3.105:106](∅→∅),[3.105]→[3.105:106](∅→∅),[3.106]→[3.1912:1936](∅→∅),[3.101]→[3.31:74](∅→∅),[3.516]→[3.31:74](∅→∅),[3.448]→[3.31:74](∅→∅),[3.1936]→[3.31:74](∅→∅),[3.90]→[3.31:74](∅→∅),[3.90]→[3.31:74](∅→∅),[3.31]→[3.31:74](∅→∅),[3.74]→[3.1937:1959](∅→∅),[3.539]→[3.448:627](∅→∅),[3.471]→[3.448:627](∅→∅),[3.1959]→[3.448:627](∅→∅),[3.110]→[3.448:627](∅→∅),[3.47]→[3.448:627](∅→∅),[3.110]→[3.448:627](∅→∅),[3.448]→[3.448:627](∅→∅),[3.627]→[3.1960:1997](∅→∅),[3.1997]→[3.0:19](∅→∅),[3.148]→[3.0:19](∅→∅),[3.577]→[3.0:19](∅→∅),[3.19]→[3.1998:2111](∅→∅),[3.172]→[3.132:162](∅→∅),[3.113]→[3.132:162](∅→∅),[3.158]→[3.132:162](∅→∅),[3.677]→[3.132:162](∅→∅),[3.2111]→[3.132:162](∅→∅),[3.238]→[3.132:162](∅→∅),[3.132]→[3.132:162](∅→∅),[3.162]→[3.2112:2422](∅→∅),[3.2422]→[3.748:764](∅→∅),[3.312]→[3.748:764](∅→∅),[3.748]→[3.748:764](∅→∅),[3.764]→[3.2423:2425](∅→∅),[3.2425]→[3.766:767](∅→∅),[3.766]→[3.766:767](∅→∅),[3.767]→[3.2426:2453](∅→∅),[3.431]→[3.516:554](∅→∅),[3.430]→[3.516:554](∅→∅),[3.99
4]→[3.516:554](∅→∅),[3.2453]→[3.516:554](∅→∅),[3.516]→[3.516:554](∅→∅),[3.554]→[3.2454:2505](∅→∅),[3.483]→[3.876:1011](∅→∅),[3.268]→[3.876:1011](∅→∅),[3.482]→[3.876:1011](∅→∅),[3.254]→[3.876:1011](∅→∅),[3.1046]→[3.876:1011](∅→∅),[3.2505]→[3.876:1011](∅→∅),[3.359]→[3.876:1011](∅→∅),[3.876]→[3.876:1011](∅→∅),[3.1011]→[3.604:649](∅→∅),[3.649]→[3.1056:1132](∅→∅),[3.581]→[3.1056:1132](∅→∅),[3.37]→[3.1056:1132](∅→∅),[3.149]→[3.1056:1132](∅→∅),[3.1056]→[3.1056:1132](∅→∅),[3.648]→[3.653:690](∅→∅),[3.282]→[3.653:690](∅→∅),[3.302]→[3.653:690](∅→∅),[3.460]→[3.653:690](∅→∅),[3.563]→[3.653:690](∅→∅),[3.282]→[3.653:690](∅→∅),[3.152]→[3.653:690](∅→∅),[3.257]→[3.653:690](∅→∅),[3.1132]→[3.653:690](∅→∅),[3.109]→[3.653:690](∅→∅),[3.781]→[3.653:690](∅→∅),[3.653]→[3.653:690](∅→∅),[3.690]→[3.2506:2565](∅→∅),[3.337]→[3.744:745](∅→∅),[3.709]→[3.744:745](∅→∅),[3.641]→[3.744:745](∅→∅),[3.623]→[3.744:745](∅→∅),[3.337]→[3.744:745](∅→∅),[3.213]→[3.744:745](∅→∅),[3.2565]→[3.744:745](∅→∅),[3.419]→[3.744:745](∅→∅),[3.312]→[3.744:745](∅→∅),[3.209]→[3.744:745](∅→∅),[3.1193]→[3.744:745](∅→∅),[3.841]→[3.744:745](∅→∅),[3.234]→[3.744:745](∅→∅),[3.744]→[3.744:745](∅→∅),[3.745]→[3.2566:2671](∅→∅),[3.2671]→[2.3337:3393](∅→∅),[3.598]→[3.944:961](∅→∅),[3.769]→[3.944:961](∅→∅),[3.1197]→[3.944:961](∅→∅),[3.500]→[3.944:961](∅→∅),[3.355]→[3.944:961](∅→∅),[3.900]→[3.944:961](∅→∅),[3.581]→[3.944:961](∅→∅),[3.1155]→[3.944:961](∅→∅),[3.744]→[3.944:961](∅→∅),[3.500]→[3.944:961](∅→∅),[3.320]→[3.944:961](∅→∅),[3.2727]→[3.944:961](∅→∅),[3.577]→[3.944:961](∅→∅),[3.475]→[3.944:961](∅→∅),[3.1354]→[3.944:961](∅→∅),[3.216]→[3.944:961](∅→∅),[3.162]→[3.944:961](∅→∅),[3.962]→[3.944:961](∅→∅),[3.216]→[3.944:961](∅→∅),[2.3393]→[3.944:961](∅→∅),[3.944]→[3.944:961](∅→∅),[3.961]→[3.1355:1405](∅→∅),[3.1405]→[3.816:845](∅→∅),[3.845]→[3.1434:1464](∅→∅),[3.777]→[3.1434:1464](∅→∅),[3.1434]→[3.1434:1464](∅→∅),[3.1464]→[3.846:912](∅→∅),[3.912]→[3.1530:1591](∅→∅),[3.844]→[3.1530:1591](∅→∅),[3.99]→[3.1530:1591](∅→∅),[3.377]→[3.1530:1591](∅→∅
),[3.1530]→[3.1530:1591](∅→∅),[3.1591]→[2.3394:3584](∅→∅),[3.1391]→[3.1476:1477](∅→∅),[3.1388]→[3.1476:1477](∅→∅),[3.1016]→[3.1476:1477](∅→∅),[3.612]→[3.1476:1477](∅→∅),[3.998]→[3.1476:1477](∅→∅),[3.1088]→[3.1476:1477](∅→∅),[3.1320]→[3.1476:1477](∅→∅),[3.349]→[3.1476:1477](∅→∅),[3.1346]→[3.1476:1477](∅→∅),[3.1016]→[3.1476:1477](∅→∅),[3.2918]→[3.1476:1477](∅→∅),[3.2010]→[3.1476:1477](∅→∅),[3.678]→[3.1476:1477](∅→∅),[3.1618]→[3.1476:1477](∅→∅),[3.636]→[3.1476:1477](∅→∅),[2.3584]→[3.1476:1477](∅→∅),[3.1476]→[3.1476:1477](∅→∅),[3.1477]→[2.3585:3789](∅→∅),[3.1593]→[3.1065:1066](∅→∅),[3.765]→[3.1065:1066](∅→∅),[3.1473]→[3.1065:1066](∅→∅),[3.1551]→[3.1065:1066](∅→∅),[3.3055]→[3.1065:1066](∅→∅),[3.789]→[3.1065:1066](∅→∅),[2.3789]→[3.1065:1066](∅→∅),[3.1065]→[3.1065:1066](∅→∅),[3.1066]→[2.3790:4201](∅→∅),[3.2005]→[3.3128:3162](∅→∅),[3.2431]→[3.3128:3162](∅→∅),[3.3139]→[3.3128:3162](∅→∅),[3.1963]→[3.3128:3162](∅→∅),[3.3467]→[3.3128:3162](∅→∅),[3.2455]→[3.3128:3162](∅→∅),[2.4201]→[3.3128:3162](∅→∅),[3.3128]→[3.3128:3162](∅→∅),[3.3162]→[2.4202:5219](∅→∅),[3.3023]→[3.3192:3287](∅→∅),[3.2462]→[3.3192:3287](∅→∅),[3.3170]→[3.3192:3287](∅→∅),[3.2981]→[3.3192:3287](∅→∅),[3.4405]→[3.3192:3287](∅→∅),[3.2486]→[3.3192:3287](∅→∅),[2.5219]→[3.3192:3287](∅→∅),[3.3192]→[3.3192:3287](∅→∅),[3.1167]→[3.2319:2359](∅→∅),[3.3287]→[3.2319:2359](∅→∅),[3.2319]→[3.2319:2359](∅→∅),[3.2359]→[3.3288:3733](∅→∅),[3.2883]→[3.1858:1868](∅→∅),[3.1359]→[3.1858:1868](∅→∅),[3.2804]→[3.1858:1868](∅→∅),[3.1359]→[3.1858:1868](∅→∅),[3.3733]→[3.1858:1868](∅→∅),[3.1137]→[3.1858:1868](∅→∅),[3.3372]→[3.1858:1868](∅→∅),[3.1858]→[3.1858:1868](∅→∅),[3.1868]→[3.2884:2890](∅→∅),[3.2890]→[3.3734:3736](∅→∅),[3.3736]→[3.2997:2998](∅→∅),[3.3375]→[3.2997:2998](∅→∅),[3.2997]→[3.2997:2998](∅→∅),[3.2998]→[3.3737:3788](∅→∅),[3.3788]→[2.5220:5324](∅→∅),[3.1289]→[3.4545:4606](∅→∅),[3.3163]→[3.4545:4606](∅→∅),[3.666]→[3.4545:4606](∅→∅),[3.3306]→[3.4545:4606](∅→∅),[3.1209]→[3.4545:4606](∅→∅),[3.395]→[3.4545:4606](∅→∅),[2.5324]→[3.4545:46
06](∅→∅),[3.4545]→[3.4545:4606](∅→∅),[3.4606]→[3.3307:3369](∅→∅),[3.3369]→[2.5325:5391](∅→∅),[3.4784]→[3.8539:8553](∅→∅),[3.6248]→[3.8539:8553](∅→∅),[3.3049]→[3.8539:8553](∅→∅),[3.3603]→[3.8539:8553](∅→∅),[3.2217]→[3.8539:8553](∅→∅),[3.1135]→[3.8539:8553](∅→∅),[2.5391]→[3.8539:8553](∅→∅),[3.8539]→[3.8539:8553](∅→∅),[3.8553]→[2.5392:5427](∅→∅),[3.4820]→[3.8600:8670](∅→∅),[3.6296]→[3.8600:8670](∅→∅),[3.3097]→[3.8600:8670](∅→∅),[3.3651]→[3.8600:8670](∅→∅),[3.2253]→[3.8600:8670](∅→∅),[3.1171]→[3.8600:8670](∅→∅),[2.5427]→[3.8600:8670](∅→∅),[3.8600]→[3.8600:8670](∅→∅),[3.8670]→[2.5428:7935](∅→∅),[3.4843]→[3.8712:8726](∅→∅),[3.6339]→[3.8712:8726](∅→∅),[3.3140]→[3.8712:8726](∅→∅),[3.3694]→[3.8712:8726](∅→∅),[3.2276]→[3.8712:8726](∅→∅),[3.1194]→[3.8712:8726](∅→∅),[2.7935]→[3.8712:8726](∅→∅),[3.8712]→[3.8712:8726](∅→∅),[3.8726]→[2.7936:7967](∅→∅),[2.7967]→[3.8726:8736](∅→∅),[3.8726]→[3.8726:8736](∅→∅),[3.8736]→[2.7968:7981](∅→∅),[3.2431]→[3.2124:2130](∅→∅),[3.871]→[3.2124:2130](∅→∅),[3.1026]→[3.2124:2130](∅→∅),[3.1227]→[3.2124:2130](∅→∅),[3.8736]→[3.2124:2130](∅→∅),[3.970]→[3.2124:2130](∅→∅),[3.920]→[3.2124:2130](∅→∅),[2.7981]→[3.2124:2130](∅→∅),[3.2124]→[3.2124:2130](∅→∅),[3.2130]→[3.3896:3983](∅→∅),[3.1590]→[3.3896:3983](∅→∅),[3.285]→[3.3896:3983](∅→∅),[3.970]→[3.3896:3983](∅→∅),[3.3896]→[3.3896:3983](∅→∅),[3.3983]→[3.8737:8888](∅→∅),[3.658]→[3.4019:4137](∅→∅),[3.2282]→[3.4019:4137](∅→∅),[3.709]→[3.4019:4137](∅→∅),[3.1178]→[3.4019:4137](∅→∅),[3.8888]→[3.4019:4137](∅→∅),[3.1007]→[3.4019:4137](∅→∅),[3.1072]→[3.4019:4137](∅→∅),[3.4019]→[3.4019:4137](∅→∅),[3.4137]→[3.8889:9010](∅→∅),[3.841]→[3.4319:4335](∅→∅),[3.1773]→[3.4319:4335](∅→∅),[3.892]→[3.4319:4335](∅→∅),[3.1300]→[3.4319:4335](∅→∅),[3.9010]→[3.4319:4335](∅→∅),[3.1190]→[3.4319:4335](∅→∅),[3.407]→[3.4319:4335](∅→∅),[3.1092]→[3.4319:4335](∅→∅),[3.1194]→[3.4319:4335](∅→∅),[3.4319]→[3.4319:4335](∅→∅),[3.4335]→[3.9011:9167](∅→∅),[3.943]→[3.4476:4612](∅→∅),[3.2339]→[3.4476:4612](∅→∅),[3.1875]→[3.4476:4612](∅→∅),[3.1034]→[3.44
76:4612](∅→∅),[3.1457]→[3.4476:4612](∅→∅),[3.9167]→[3.4476:4612](∅→∅),[3.1333]→[3.4476:4612](∅→∅),[3.453]→[3.4476:4612](∅→∅),[3.1138]→[3.4476:4612](∅→∅),[3.1351]→[3.4476:4612](∅→∅),[3.4476]→[3.4476:4612](∅→∅),[3.4612]→[3.9168:9205](∅→∅),[3.9205]→[2.7982:8041](∅→∅),[3.3750]→[3.9264:9319](∅→∅),[2.8041]→[3.9264:9319](∅→∅),[3.9264]→[3.9264:9319](∅→∅),[3.9319]→[2.8042:14798](∅→∅),[3.7546]→[3.5580:5595](∅→∅),[3.6633]→[3.5580:5595](∅→∅),[3.3719]→[3.5580:5595](∅→∅),[3.6982]→[3.5580:5595](∅→∅),[3.4691]→[3.5580:5595](∅→∅),[3.6822]→[3.5580:5595](∅→∅),[3.3484]→[3.5580:5595](∅→∅),[3.3510]→[3.5580:5595](∅→∅),[3.3851]→[3.5580:5595](∅→∅),[3.4185]→[3.5580:5595](∅→∅),[3.16291]→[3.5580:5595](∅→∅),[3.3480]→[3.5580:5595](∅→∅),[3.2864]→[3.5580:5595](∅→∅),[2.14798]→[3.5580:5595](∅→∅),[3.5580]→[3.5580:5595](∅→∅),[3.5595]→[3.6613:6623](∅→∅),[3.3945]→[3.3349:3355](∅→∅),[3.2690]→[3.3349:3355](∅→∅),[3.1027]→[3.3349:3355](∅→∅),[3.2690]→[3.3349:3355](∅→∅),[3.1842]→[3.3349:3355](∅→∅),[3.6623]→[3.3349:3355](∅→∅),[3.1060]→[3.3349:3355](∅→∅),[3.6318]→[3.3349:3355](∅→∅),[3.1761]→[3.3349:3355](∅→∅),[3.3349]→[3.3349:3355](∅→∅)
#[derive(Default)]struct XmppData {/// known roster dataroster: HashMap<xmpp_parsers::Jid,(xmpp_parsers::roster::Subscription,xmpp_parsers::roster::Ask,),>,/// ids countercounter: usize,/// map from id of adding item to roster and jid of itempending_add_roster_ids: HashMap<String, xmpp_parsers::Jid>,/// stanzas to sendsend_queue: VecDeque<minidom::Element>,/// outgoing mailboxoutgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,/// muc id to muc jidmucs: HashMap<String, xmpp_parsers::Jid>,}struct XmppState {client: Client,data: XmppData,}struct XmppElementProcessor {incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,}impl XmppElementProcessor {fn new() -> XmppElementProcessor {let mut incoming = element_processor::Processor::new(&|_, e| {warn!("Unknown stanza {:#?}", e);true});incoming.register(&XmppConnection::incoming_iq_processing);XmppElementProcessor { incoming }}}struct MaybeXmppConnection {account: std::rc::Rc<config::Account>,state: Option<XmppState>,}struct XmppConnection {account: std::rc::Rc<config::Account>,state: XmppState,}impl From<XmppConnection> for MaybeXmppConnection {fn from(from: XmppConnection) -> MaybeXmppConnection {MaybeXmppConnection {account: from.account,state: Some(from.state),}}}impl From<config::Account> for MaybeXmppConnection {fn from(from: config::Account) -> MaybeXmppConnection {MaybeXmppConnection {account: std::rc::Rc::new(from),state: None,}}}impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {MaybeXmppConnection {account: from,state: None,}}}impl MaybeXmppConnection {/// connects if nothing connected/// don't connect only if stop_future resolvedfn connect<F>(self,stop_future: F,) -> impl Future<Item = XmppConnection, Error = failure::Error>whereF: future::Future + Clone + 'static,<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,{info!("xmpp connection...");let MaybeXmppConnection { account, state } 
= self;if let Some(state) = state {Box::new(future::ok(XmppConnection { account, state }))as Box<dyn Future<Item = _, Error = _>>} else {Box::new(stop_future.clone().select2(future::loop_fn(account, move |account| {info!("xmpp initialization...");let client =Client::new_with_jid(account.jid.clone(), &account.password);info!("xmpp initialized");let stop_future2 = stop_future.clone();let stop_future3 = stop_future.clone();let stop_future4 = stop_future.clone();// future to wait for onlineBox::new(XmppConnection {state: XmppState {client,data: std::default::Default::default(),},account,}.processing(XmppConnection::online, stop_future.clone()).map_err(|(acc, _)| acc).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),}).and_then(|conn| conn.initial_roster(stop_future2)).and_then(|conn| conn.self_presence(stop_future3)).and_then(|conn| conn.enter_mucs(stop_future4)).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),}),)}).map_err(|_: ()| ()),).then(|r| match r {Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),Ok(Either::B((x, _a))) => future::ok(x),Err(Either::A((e, _b))) => future::err(e.into()),Err(Either::B((_, _a))) => {future::err(format_err!("Cann't initiate XMPP connection"))}}),)}}}impl XmppConnection {/// base XMPP processing/// Returns false on error to disconnectfn xmpp_processing(&mut self, event: &Event) -> bool {match event {Event::Stanza(stanza) => {let processors = XmppElementProcessor::new();processors.incoming.process(self, stanza.clone())}Event::Online => true,e => {warn!("Unexpected event {:?}", e);false}}}fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {use std::convert::TryInto;if let Some((_, jid)) = self.state.data.pending_add_roster_ids.remove_entry(&iq.id) {if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {if 
self.state.data.roster.contains_key(&jid) {info!("Jid {} updated to roster", jid);} else {info!("Jid {} added in roster", jid);self.state.data.roster.insert(jid.clone(),(xmpp_parsers::roster::Subscription::None,xmpp_parsers::roster::Ask::None,),);}self.process_jid(&jid);} else {warn!("Wrong payload when adding {} to roster: {:?}",jid, iq.payload);}}match iq.payload {xmpp_parsers::iq::IqType::Set(element) => {if let Some(roster) =element.try_into().ok() as Option<xmpp_parsers::roster::Roster>{for i in roster.items {if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {info!("Update {} in roster", i.jid);rdata.0 = i.subscription;rdata.1 = i.ask;} else {info!("Add {} to roster", i.jid);self.state.data.roster.insert(i.jid.clone(), (i.subscription, i.ask));}self.process_jid(&i.jid);}}}xmpp_parsers::iq::IqType::Error(e) => {error!("iq error: {:?}", e);return false;}xmpp_parsers::iq::IqType::Get(element) => {if let Some(_ping) = element.try_into().ok() as Option<xmpp_parsers::ping::Ping> {let pong = stanzas::make_pong(&iq.id, self.state.client.jid.clone(), iq.from);self.state.data.send_queue.push_back(pong);}}_ => (), // ignore}true}/// process event from xmpp stream/// returns from future when condition met/// or stop future was resolved./// Return item if connection was preserved or error otherwise./// Second part is a state of stop_futurefn processing<S, F, T, E>(self,stop_condition: S,stop_future: F,) -> impl Future<Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),>whereF: Future<Item = T, Error = E> + 'static,S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,T: 'static,E: 'static,{future::loop_fn((self, stop_future, stop_condition),|(xmpp, stop_future, mut stop_condition)| {let XmppConnection {state: XmppState { client, mut data },account,} = xmpp;if let Some(send_element) = data.send_queue.pop_front() {use tokio::prelude::Sink;info!("Sending {:?}", 
send_element);Box::new(client.send(Packet::Stanza(send_element)).select2(stop_future).then(move |r| match r {Ok(Either::A((client, b))) => {Box::new(future::ok(future::Loop::Continue((XmppConnection {state: XmppState { client, data },account,},b,stop_condition,))))as Box<dyn Future<Item = _, Error = _>>}Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Ok(Either::B(t))))}})),Err(Either::A((e, b))) => {warn!("XMPP sending error: {}", e);Box::new(future::err((account, Ok(Either::A(b)))))}Err(Either::B((e, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Err(e)))}})),}),) as Box<dyn Future<Item = _, Error = _>>} else {Box::new(client.into_future().select2(stop_future).then(move |r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {state: XmppState { client, data },account,};if xmpp.xmpp_processing(&event) {match stop_condition(&mut xmpp, event) {Ok(true) => future::ok(future::Loop::Break((xmpp,Ok(Either::A(b)),))),Ok(false) => future::ok(future::Loop::Continue((xmpp,b,stop_condition,))),Err(_e) => {future::err((xmpp.account, Ok(Either::A(b))))}}} else {future::err((xmpp.account, Ok(Either::A(b))))}} else {future::err((account, Ok(Either::A(b))))}}Ok(Either::B((t, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}}Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);future::err((account, Ok(Either::A(b))))}Err(Either::B((e, a))) => {if let Some(client) = a.into_inner() 
{future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),)))} else {future::err((account, Err(e)))}}}),)}},)} - replacement in src/xmpp/mod.rs at line 7[3.3356]→[3.6624:6691](∅→∅),[3.6691]→[3.16292:16429](∅→∅),[3.3839]→[3.4011:4339](∅→∅),[3.3630]→[3.4011:4339](∅→∅),[3.3989]→[3.4011:4339](∅→∅),[3.16429]→[3.4011:4339](∅→∅),[3.3601]→[3.4011:4339](∅→∅),[3.4756]→[3.4011:4339](∅→∅),[3.4011]→[3.4011:4339](∅→∅),[3.4339]→[3.16430:16484](∅→∅)
/// get connection and wait for online status and set presence/// returns error if something went wrong and xmpp connection is brokenfn online(&mut self, event: Event) -> Result<bool, ()> {match event {Event::Online => {info!("Online!");Ok(true)}Event::Stanza(s) => {warn!("Stanza before online: {:?}", s);Ok(false)}_ => {error!("Disconnected while online");Err(())}}}mod stanzas; - replacement in src/xmpp/mod.rs at line 9[3.16485]→[2.14799:14898](∅→∅),[3.6900]→[3.16562:16604](∅→∅),[3.1320]→[3.16562:16604](∅→∅),[2.14898]→[3.16562:16604](∅→∅),[3.16562]→[3.16562:16604](∅→∅),[3.16604]→[2.14899:14938](∅→∅),[3.6453]→[3.16639:16735](∅→∅),[3.6936]→[3.16639:16735](∅→∅),[2.14938]→[3.16639:16735](∅→∅),[3.16639]→[3.16639:16735](∅→∅),[3.16735]→[2.14939:15999](∅→∅),[3.7514]→[3.18104:18146](∅→∅),[3.6606]→[3.18104:18146](∅→∅),[3.8330]→[3.18104:18146](∅→∅),[3.2869]→[3.18104:18146](∅→∅),[2.15999]→[3.18104:18146](∅→∅),[3.18104]→[3.18104:18146](∅→∅),[3.18146]→[2.16000:16049](∅→∅),[3.7564]→[3.18374:18412](∅→∅),[3.6835]→[3.18374:18412](∅→∅),[3.3131]→[3.18374:18412](∅→∅),[2.16049]→[3.18374:18412](∅→∅),[3.18374]→[3.18374:18412](∅→∅),[3.18412]→[2.16050:16262](∅→∅),[3.7777]→[3.6991:7025](∅→∅),[2.16262]→[3.6991:7025](∅→∅),[3.6991]→[3.6991:7025](∅→∅),[3.7025]→[2.16263:16436](∅→∅),[3.8520]→[3.7921:7951](∅→∅),[2.16436]→[3.7921:7951](∅→∅),[3.7921]→[3.7921:7951](∅→∅),[3.7951]→[3.18736:18791](∅→∅),[3.7097]→[3.18736:18791](∅→∅),[3.8592]→[3.18736:18791](∅→∅),[3.18736]→[3.18736:18791](∅→∅),[3.18791]→[2.16437:16471](∅→∅),[3.7986]→[3.18879:18957](∅→∅),[3.7186]→[3.18879:18957](∅→∅),[3.8681]→[3.18879:18957](∅→∅),[3.3340]→[3.18879:18957](∅→∅),[2.16471]→[3.18879:18957](∅→∅),[3.18879]→[3.18879:18957](∅→∅),[3.3902]→[3.4363:4377](∅→∅),[3.3693]→[3.4363:4377](∅→∅),[3.4014]→[3.4363:4377](∅→∅),[3.18957]→[3.4363:4377](∅→∅),[3.3664]→[3.4363:4377](∅→∅),[3.4781]→[3.4363:4377](∅→∅),[3.4363]→[3.4363:4377](∅→∅),[3.4377]→[3.18958:19051](∅→∅),[3.19051]→[3.4377:4387](∅→∅),[3.4377]→[3.4377:4387](∅→∅),[3.7793]→[3.8518:8524](∅→∅
),[3.2042]→[3.8518:8524](∅→∅),[3.8518]→[3.8518:8524](∅→∅),[3.8524]→[3.19052:19318](∅→∅),[3.19318]→[2.16472:16523](∅→∅),[3.8729]→[3.19369:19421](∅→∅),[2.16523]→[3.19369:19421](∅→∅),[3.19369]→[3.19369:19421](∅→∅),[3.19421]→[3.8524:8525](∅→∅),[3.8524]→[3.8524:8525](∅→∅),[3.8525]→[2.16524:16691](∅→∅),[3.8796]→[3.19641:19750](∅→∅),[2.16691]→[3.19641:19750](∅→∅),[3.19641]→[3.19641:19750](∅→∅),[3.19750]→[2.16692:16738](∅→∅),[3.11465]→[3.19780:20084](∅→∅),[3.8827]→[3.19780:20084](∅→∅),[2.16738]→[3.19780:20084](∅→∅),[3.19780]→[3.19780:20084](∅→∅),[3.20084]→[2.16739:16911](∅→∅),[3.8909]→[3.20165:20486](∅→∅),[3.1638]→[3.20165:20486](∅→∅),[2.16911]→[3.20165:20486](∅→∅),[3.20165]→[3.20165:20486](∅→∅),[3.20486]→[3.7794:7942](∅→∅),[3.8525]→[3.7794:7942](∅→∅),[3.7942]→[3.20487:20570](∅→∅),[3.5010]→[3.8005:8011](∅→∅),[3.5460]→[3.8005:8011](∅→∅),[3.20570]→[3.8005:8011](∅→∅),[3.3728]→[3.8005:8011](∅→∅),[3.6431]→[3.8005:8011](∅→∅),[3.8005]→[3.8005:8011](∅→∅),[3.8011]→[3.20571:20686](∅→∅),[3.5180]→[3.10624:10716](∅→∅),[3.8128]→[3.10624:10716](∅→∅),[3.8796]→[3.10624:10716](∅→∅),[3.5065]→[3.10624:10716](∅→∅),[3.5578]→[3.10624:10716](∅→∅),[3.20686]→[3.10624:10716](∅→∅),[3.3811]→[3.10624:10716](∅→∅),[3.2220]→[3.10624:10716](∅→∅),[3.3485]→[3.10624:10716](∅→∅),[3.3174]→[3.10624:10716](∅→∅),[3.6651]→[3.10624:10716](∅→∅),[3.10624]→[3.10624:10716](∅→∅),[3.3850]→[3.10716:10756](∅→∅),[3.2259]→[3.10716:10756](∅→∅),[3.3524]→[3.10716:10756](∅→∅),[3.10716]→[3.10716:10756](∅→∅),[3.10756]→[3.20687:20740](∅→∅)
fn process_initial_roster(&mut self, event: Event, id_init_roster: &str) -> Result<bool, ()> {if let Event::Stanza(s) = event {use std::convert::TryInto;match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {Ok(iq) => {if iq.id == id_init_roster {match iq.payload {xmpp_parsers::iq::IqType::Error(_e) => {error!("Get error instead of roster");Err(())}xmpp_parsers::iq::IqType::Result(Some(result)) => {match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {Ok(roster) => {self.state.data.roster.clear();info!("Got first roster:");for i in roster.items {info!(" >>> {:?}", i);self.state.data.roster.insert(i.jid, (i.subscription, i.ask));}Ok(true)}Err(e) => {error!("Cann't parse roster: {}", e);Err(())}}}_ => {error!("Unknown result of roster");Err(())}}} else {Ok(false)}}Err(_e) => Ok(false),}} else {error!("Wrong event while waiting roster");Err(())}}fn initial_roster<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: 'static,{let XmppConnection {account,state: XmppState { client, mut data },} = self;use tokio::prelude::Sink;data.counter += 1;let id_init_roster = format!("id_init_roster{}", data.counter);let get_roster = stanzas::make_get_roster(&id_init_roster);let account2 = account.clone();info!("Quering roster... 
{:?}", get_roster);client.send(Packet::Stanza(get_roster)).map_err(move |e| {error!("Error on querying roster: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, data },account,}.processing(move |conn, event| conn.process_initial_roster(event, &id_init_roster),stop_future,).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})})}fn self_presence<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: Into<failure::Error> + 'static,{let XmppConnection {account,state: XmppState { client, data },} = self;use tokio::prelude::Sink;let presence = stanzas::make_presence(&account);let account2 = account.clone();info!("Sending presence... {:?}", presence);mod xmpp_connection;use xmpp_connection::MaybeXmppConnection; - edit in src/xmpp/mod.rs at line 12[3.20741]→[3.6652:6667](∅→∅),[3.8183]→[3.6652:6667](∅→∅),[3.6667]→[2.16912:16956](∅→∅),[3.11510]→[3.8226:8315](∅→∅),[3.8938]→[3.8226:8315](∅→∅),[2.16956]→[3.8226:8315](∅→∅),[3.8226]→[3.8226:8315](∅→∅),[3.8315]→[3.20742:20767](∅→∅),[3.5285]→[3.8340:8426](∅→∅),[3.5170]→[3.8340:8426](∅→∅),[3.5620]→[3.8340:8426](∅→∅),[3.20767]→[3.8340:8426](∅→∅),[3.3920]→[3.8340:8426](∅→∅),[3.6693]→[3.8340:8426](∅→∅),[3.8340]→[3.8340:8426](∅→∅),[3.8426]→[3.20768:20823](∅→∅),[3.5335]→[3.8483:8658](∅→∅),[3.3966]→[3.8483:8658](∅→∅),[3.5220]→[3.8483:8658](∅→∅),[3.5678]→[3.8483:8658](∅→∅),[3.4551]→[3.8483:8658](∅→∅),[3.777]→[3.8483:8658](∅→∅),[3.20823]→[3.8483:8658](∅→∅),[3.3956]→[3.8483:8658](∅→∅),[3.6853]→[3.8483:8658](∅→∅),[3.8483]→[3.8483:8658](∅→∅),[3.8658]→[2.16957:17484](∅→∅),[3.8514]→[3.9042:9105](∅→∅),[3.9323]→[3.9042:9105](∅→∅),[3.21208]→[3.9042:9105](∅→∅),[3.4480]→[3.9042:9105](∅→∅),[2.17484]→[3.9042:9105](∅→∅),[3.9042]→[3.9042:9105](∅→∅),[3.9105]→[3.21209:21324](∅→∅),[3.5424]→[3.9220:9320](∅→∅),[3.5309]→
[3.9220:9320](∅→∅),[3.5794]→[3.9220:9320](∅→∅),[3.21324]→[3.9220:9320](∅→∅),[3.4569]→[3.9220:9320](∅→∅),[3.6969]→[3.9220:9320](∅→∅),[3.9220]→[3.9220:9320](∅→∅),[3.9320]→[3.21325:21624](∅→∅),[3.21624]→[3.4570:4585](∅→∅),[3.9320]→[3.4570:4585](∅→∅),[3.4585]→[3.21625:21632](∅→∅),[3.21632]→[2.17485:19528](∅→∅),[3.9463]→[3.21686:21716](∅→∅),[2.19528]→[3.21686:21716](∅→∅),[3.21686]→[3.21686:21716](∅→∅),[3.21716]→[2.19529:20613](∅→∅),[3.5300]→[3.23186:23196](∅→∅),[3.755]→[3.23186:23196](∅→∅),[3.9665]→[3.23186:23196](∅→∅),[3.831]→[3.23186:23196](∅→∅),[3.4593]→[3.23186:23196](∅→∅),[3.3544]→[3.23186:23196](∅→∅),[2.20613]→[3.23186:23196](∅→∅),[3.23186]→[3.23186:23196](∅→∅),[3.23196]→[2.20614:23149](∅→∅),[3.9691]→[3.5042:5048](∅→∅),[2.23149]→[3.5042:5048](∅→∅),[3.5042]→[3.5042:5048](∅→∅),[3.5470]→[3.3789:3792](∅→∅),[3.12136]→[3.3789:3792](∅→∅),[3.3125]→[3.3789:3792](∅→∅),[3.3789]→[3.3789:3792](∅→∅)
client.send(Packet::Stanza(presence)).map_err(|e| {error!("Error on send self-presence: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, data },account,}.processing(move |conn, event| {if let Event::Stanza(s) = event {use std::convert::TryInto;match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {Ok(presence) => {Ok(presence.from.as_ref() == Some(&conn.state.client.jid))}Err(e) => {warn!("Not a self-presence: {}", e);Ok(false)}}} else {error!("Wrong event while waiting self-presence");Err(())}},stop_future,).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})})}fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {if !mailbox.is_empty() {if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {info!("Jid {} in roster", xmpp_to);let sub_to = match rdata.0 {xmpp_parsers::roster::Subscription::To => true,xmpp_parsers::roster::Subscription::Both => true,_ => false,};if sub_to {info!("Subscribed to {}", xmpp_to);self.state.data.send_queue.extend(mailbox.drain(..).map(|message| {stanzas::make_chat_message(xmpp_to.clone(), message)}),);} else if rdata.1 == xmpp_parsers::roster::Ask::None {info!("Not subscribed to {}", xmpp_to);self.state.data.send_queue.push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));}} else {info!("Jid {} not in roster", xmpp_to);self.state.data.counter += 1;let id_add_roster = format!("id_add_roster{}", self.state.data.counter);let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());self.state.data.pending_add_roster_ids.insert(id_add_roster, xmpp_to.clone());info!("Adding jid to roster... 
{:?}", add_roster);self.state.data.send_queue.push_back(add_roster);}}}}fn process_command(&mut self, cmd: XmppCommand) {info!("Got command");match cmd {XmppCommand::Chat { xmpp_to, message } => {self.state.data.outgoing_mailbox.entry(xmpp_to.clone()).or_default().push(message);self.process_jid(&xmpp_to);}XmppCommand::Chatroom { muc_id, message } => {if let Some(muc) = self.state.data.mucs.get(&muc_id) {self.state.data.send_queue.push_back(stanzas::make_muc_message(muc.clone(), message));} else {error!("Not found MUC {}", muc_id);}}XmppCommand::Ping => {self.state.data.counter += 1;let id_ping = format!("id_ping{}", self.state.data.counter);let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());self.state.data.send_queue.push_back(ping);}}}fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {info!("Shutdown connection");let XmppConnection { account, state } = self;stream::iter_ok(state.data.mucs.values().map(std::clone::Clone::clone).collect::<Vec<_>>(),).fold(state, move |XmppState { client, data }, muc_jid| {let muc_presence =stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());info!("Sending muc leave presence... 
{:?}", muc_presence);use tokio::prelude::Sink;client.send(Packet::Stanza(muc_presence)).map_err(|e| {error!("Error on send muc presence: {}", e);e}).and_then(|client| future::ok(XmppState { client, data }))}).map(|_| ())}fn enter_mucs<F, E>(self,_stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: Into<failure::Error> + 'static,{let XmppConnection { account, state } = self;let account2 = account.clone();let account3 = account.clone();stream::iter_ok(account.chatrooms.clone()).fold(state, move |XmppState { client, mut data }, muc_jid| {data.counter += 1;let id_muc_presence = format!("id_muc_presence{}", data.counter);let muc_presence = stanzas::make_muc_presence(&id_muc_presence,account2.jid.clone(),muc_jid.1.clone(),);info!("Sending muc presence... {:?}", muc_presence);let account4 = account2.clone();use tokio::prelude::Sink;client.send(Packet::Stanza(muc_presence)).map_err(|e| {error!("Error on send muc presence: {}", e);account4}).and_then(|client| {data.mucs.insert(muc_jid.0, muc_jid.1);future::ok(XmppState { client, data })})}).map(|state| XmppConnection {account: account3,state,})}} - replacement in src/xmpp/element_processor.rs at line 0
/// Signature shared by all handlers: mutable access to the state `S`, an
/// owned element `E`, and a result of type `T`.
type Func<S, T, E> = dyn Fn(&mut S, E) -> T;

/// Dispatches an element of type `E` to the first registered handler whose
/// argument type can be built from it via `TryFrom<E>`.
///
/// * `S` - state handed mutably to every handler.
/// * `T` - value produced by a handler.
/// * `E` - raw element type; it is cloned once per attempted handler because
///   every conversion attempt consumes its input.
pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {
    /// Handlers in registration order; each yields `None` when the element
    /// could not be converted into the handler's argument type.
    processors: Vec<Box<Func<S, Option<T>, E>>>,
    /// Fallback invoked when no registered handler accepted the element.
    default: &'static Func<S, T, E>,
}

impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {
    /// Creates a processor with no registered handlers.
    ///
    /// `f` is the fallback that receives every element that none of the
    /// registered handlers accepted.
    pub fn new<F>(f: &'static F) -> Processor<S, T, E>
    where
        F: Fn(&mut S, E) -> T + 'static,
    {
        Processor {
            processors: Vec::new(),
            default: f,
        }
    }

    /// Appends a handler for elements convertible into `A`.
    ///
    /// The handler is tried after all previously registered ones. It is
    /// called only when `A::try_from(element)` succeeds; a failed conversion
    /// makes the processor move on to the next handler.
    pub fn register<F, A>(&mut self, f: &'static F)
    where
        F: Fn(&mut S, A) -> T + 'static,
        A: std::convert::TryFrom<E>,
    {
        self.processors.push(Box::new(move |s: &mut S, e: E| {
            use std::convert::TryFrom;
            // The conversion error is intentionally dropped: it only means
            // "this handler is not responsible for the element".
            A::try_from(e).ok().map(|a| f(s, a))
        }));
    }

    /// Runs the handlers in registration order and returns the result of the
    /// first one that accepts `e`; falls back to the default handler when
    /// none does.
    pub fn process(&self, s: &mut S, e: E) -> T {
        for processor in &self.processors {
            if let Some(result) = processor(&mut *s, e.clone()) {
                return result;
            }
        }
        // Nobody consumed a clone, so the original element goes to the fallback.
        (self.default)(s, e)
    }
}