Refactor IQ processing Always answer to set and get requests. Use XML encoding for stanzas.
[?]
Apr 14, 2019, 6:33 PM
S754Y5DFUUEOH5RBC363ZSZRWMOZCH4JC2SUBVPEZGYYKA3RETIQCDependencies
- [2]
DYRPAV6TUpdate dependencies - [3]
OB3HA2MDUse Client::new_with_jid to parse jid only once - [4]
BWDUANCVSecond part of processing result is only about stop_future - [5]
LL3D5CXKStaring using element processor - [6]
RRLRZTMRUse element processor for iq - [7]
VS6AHRWIMove XMPP to separate dir - [8]
2VZBEEXAMessages fixed - [9]
OFLAP2G2Fix possible utf8 errors - [10]
X6L47BHQUse different structure for established xmpp connection - [11]
CCLGGFKRMove out XmppConnection into own file - [12]
QWE26TMVupdate deps - [13]
2THKW66MIgnore .orig files - [14]
5A5UVGNMMove receiver closing logic out of xmpp processing - [15]
5IKA4GO7Rename xmpp client field from "inner" to "client" - [16]
FVVPKFTLInitial commit - [17]
EBETRYK7Add counter for id. Check for jid in roster - [*]
FV6BJ5K6Send self-presence and store account info in Rc so it willbe used in some future in parallel
Change contents
- replacement in src/xmpp/xmpp_connection.rs at line 0
use tokio_xmpp::{Client, Event, Packet};use tokio::prelude::future::{self, Either};use tokio::prelude::stream;use tokio::prelude::{Future, Stream};use std::collections::{HashMap, VecDeque};use super::XmppCommand;use super::stanzas;use super::element_processor;use crate::config;#[derive(Default)]struct XmppData {/// known roster dataroster: HashMap<xmpp_parsers::Jid,(xmpp_parsers::roster::Subscription,xmpp_parsers::roster::Ask,),>,/// ids countercounter: usize,/// map from id of adding item to roster and jid of itempending_add_roster_ids: HashMap<String, xmpp_parsers::Jid>,/// stanzas to sendsend_queue: VecDeque<minidom::Element>,/// outgoing mailboxoutgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,/// muc id to muc jidmucs: HashMap<String, xmpp_parsers::Jid>,}struct XmppState {client: Client,data: XmppData,}pub struct XmppConnection {account: std::rc::Rc<config::Account>,state: XmppState,}struct XmppElementProcessor {incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,}impl XmppElementProcessor {fn new() -> XmppElementProcessor {let mut incoming = element_processor::Processor::new(&|_, e| {warn!("Unknown stanza {:#?}", e);true});incoming.register(&XmppConnection::incoming_iq_processing);XmppElementProcessor { incoming }}}pub struct MaybeXmppConnection {account: std::rc::Rc<config::Account>,state: Option<XmppState>,}impl From<XmppConnection> for MaybeXmppConnection {fn from(from: XmppConnection) -> MaybeXmppConnection {MaybeXmppConnection {account: from.account,state: Some(from.state),}}}impl From<config::Account> for MaybeXmppConnection {fn from(from: config::Account) -> MaybeXmppConnection {MaybeXmppConnection {account: std::rc::Rc::new(from),state: None,}}}impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {MaybeXmppConnection {account: from,state: None,}}}impl MaybeXmppConnection {/// connects if nothing connected/// don't connect only if stop_future resolvedpub fn connect<F>(self,stop_future: F,) -> impl Future<Item = XmppConnection, Error = failure::Error>whereF: future::Future + Clone + 'static,<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,{info!("xmpp connection...");let MaybeXmppConnection { account, state } = self;if let Some(state) = state {Box::new(future::ok(XmppConnection { account, state }))as Box<dyn Future<Item = _, Error = _>>} else {Box::new(stop_future.clone().select2(future::loop_fn(account, move |account| {info!("xmpp initialization...");let client =Client::new_with_jid(account.jid.clone(), &account.password);info!("xmpp initialized");let stop_future2 = stop_future.clone();let stop_future3 = stop_future.clone();let stop_future4 = stop_future.clone();// future to wait for onlineBox::new(XmppConnection {state: XmppState {client,data: std::default::Default::default(),},account,}.processing(XmppConnection::online, stop_future.clone()).map_err(|(acc, _)| acc).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),}).and_then(|conn| conn.initial_roster(stop_future2)).and_then(|conn| conn.self_presence(stop_future3)).and_then(|conn| conn.enter_mucs(stop_future4)).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),}),)}).map_err(|_: ()| ()),).then(|r| match r {Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),Ok(Either::B((x, _a))) => future::ok(x),Err(Either::A((e, _b))) => future::err(e.into()),Err(Either::B((_, _a))) => {future::err(format_err!("Cann't initiate XMPP connection"))}}),)}}}impl XmppConnection {/// base XMPP processing/// Returns false on error to disconnectfn xmpp_processing(&mut self, event: &Event) -> bool {match event {Event::Stanza(stanza) => {let processors = XmppElementProcessor::new();processors.incoming.process(self, stanza.clone())}Event::Online => true,e => {warn!("Unexpected event {:?}", e);false}}}fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {use std::convert::TryInto;if let Some((_, jid)) = self.state.data.pending_add_roster_ids.remove_entry(&iq.id) {if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {if self.state.data.roster.contains_key(&jid) {info!("Jid {} updated to roster", jid);} else {info!("Jid {} added in roster", jid);self.state.data.roster.insert(jid.clone(),(xmpp_parsers::roster::Subscription::None,xmpp_parsers::roster::Ask::None,),);}self.process_jid(&jid);} else {warn!("Wrong payload when adding {} to roster: {:?}",jid, iq.payload);}}match iq.payload {xmpp_parsers::iq::IqType::Set(element) => {if let Some(roster) =element.try_into().ok() as Option<xmpp_parsers::roster::Roster>{for i in roster.items {if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {info!("Update {} in roster", i.jid);rdata.0 = i.subscription;rdata.1 = i.ask;} else {info!("Add {} to roster", i.jid);self.state.data.roster.insert(i.jid.clone(), (i.subscription, i.ask));}self.process_jid(&i.jid);}}}xmpp_parsers::iq::IqType::Error(e) => {error!("iq error: {:?}", e);return false;}xmpp_parsers::iq::IqType::Get(element) => {if let Some(_ping) = element.try_into().ok() as Option<xmpp_parsers::ping::Ping> {let pong = stanzas::make_pong(&iq.id, self.state.client.jid.clone(), iq.from);self.state.data.send_queue.push_back(pong);}}_ => (), // ignore}true}/// process event from xmpp stream/// returns from future when condition met/// or stop future was resolved./// Return item if connection was preserved or error otherwise./// Second part is a state of stop_futurepub fn processing<S, F, T, E>(self,stop_condition: S,stop_future: F,) -> impl Future<Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),>whereF: Future<Item = T, Error = E> + 'static,S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,T: 'static,E: 'static,{future::loop_fn((self, stop_future, stop_condition),|(xmpp, stop_future, mut stop_condition)| {let XmppConnection {state: XmppState { client, mut data },account,} = xmpp;if let Some(send_element) = data.send_queue.pop_front() {use tokio::prelude::Sink;info!("Sending {:?}", send_element);Box::new(client.send(Packet::Stanza(send_element)).select2(stop_future).then(move |r| match r {Ok(Either::A((client, b))) => {Box::new(future::ok(future::Loop::Continue((XmppConnection {state: XmppState { client, data },account,},b,stop_condition,))))as Box<dyn Future<Item = _, Error = _>>}Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Ok(Either::B(t))))}})),Err(Either::A((e, b))) => {warn!("XMPP sending error: {}", e);Box::new(future::err((account, Ok(Either::A(b)))))}Err(Either::B((e, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Err(e)))}})),}),) as Box<dyn Future<Item = _, Error = _>>} else {Box::new(client.into_future().select2(stop_future).then(move |r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {state: XmppState { client, data },account,};if xmpp.xmpp_processing(&event) {match stop_condition(&mut xmpp, event) {Ok(true) => future::ok(future::Loop::Break((xmpp,Ok(Either::A(b)),))),Ok(false) => future::ok(future::Loop::Continue((xmpp,b,stop_condition,))),Err(_e) => {future::err((xmpp.account, Ok(Either::A(b))))}}} else {future::err((xmpp.account, Ok(Either::A(b))))}} else {future::err((account, Ok(Either::A(b))))}}Ok(Either::B((t, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}}Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);future::err((account, Ok(Either::A(b))))}Err(Either::B((e, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),)))} else {future::err((account, Err(e)))}}}),)}},)}/// get connection and wait for online status and set presence/// returns error if something went wrong and xmpp connection is brokenfn online(&mut self, event: Event) -> Result<bool, ()> {match event {Event::Online => {info!("Online!");Ok(true)}Event::Stanza(s) => {warn!("Stanza before online: {:?}", s);Ok(false)}_ => {error!("Disconnected while online");Err(())}}}fn process_initial_roster(&mut self, event: Event, id_init_roster: &str) -> Result<bool, ()> {if let Event::Stanza(s) = event {use std::convert::TryInto;match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {Ok(iq) => {if iq.id == id_init_roster {match iq.payload {xmpp_parsers::iq::IqType::Error(_e) => {error!("Get error instead of roster");Err(())}xmpp_parsers::iq::IqType::Result(Some(result)) => {match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {Ok(roster) => {self.state.data.roster.clear();info!("Got first roster:");for i in roster.items {info!(" >>> {:?}", i);self.state.data.roster.insert(i.jid, (i.subscription, i.ask));}Ok(true)}Err(e) => {error!("Cann't parse roster: {}", e);Err(())}}}_ => {error!("Unknown result of roster");Err(())}}} else {Ok(false)}}Err(_e) => Ok(false),}} else {error!("Wrong event while waiting roster");Err(())}}fn initial_roster<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: 'static,{let XmppConnection {account,state: XmppState { client, mut data },} = self;use tokio::prelude::Sink;data.counter += 1;let id_init_roster = format!("id_init_roster{}", data.counter);let get_roster = stanzas::make_get_roster(&id_init_roster);let account2 = account.clone();info!("Quering roster... {:?}", get_roster);client.send(Packet::Stanza(get_roster)).map_err(move |e| {error!("Error on querying roster: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, data },account,}.processing(move |conn, event| conn.process_initial_roster(event, &id_init_roster),stop_future,).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})})}fn self_presence<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: Into<failure::Error> + 'static,{let XmppConnection {account,state: XmppState { client, data },} = self;use tokio::prelude::Sink;let presence = stanzas::make_presence(&account);let account2 = account.clone();info!("Sending presence... {:?}", presence);client.send(Packet::Stanza(presence)).map_err(|e| {error!("Error on send self-presence: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, data },account,}.processing(move |conn, event| {if let Event::Stanza(s) = event {use std::convert::TryInto;match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {Ok(presence) => {Ok(presence.from.as_ref() == Some(&conn.state.client.jid))}Err(e) => {warn!("Not a self-presence: {}", e);Ok(false)}}} else {error!("Wrong event while waiting self-presence");Err(())}},stop_future,).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})})}fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {if !mailbox.is_empty() {if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {info!("Jid {} in roster", xmpp_to);let sub_to = match rdata.0 {xmpp_parsers::roster::Subscription::To => true,xmpp_parsers::roster::Subscription::Both => true,_ => false,};if sub_to {info!("Subscribed to {}", xmpp_to);self.state.data.send_queue.extend(mailbox.drain(..).map(|message| {stanzas::make_chat_message(xmpp_to.clone(), message)}),);} else if rdata.1 == xmpp_parsers::roster::Ask::None {info!("Not subscribed to {}", xmpp_to);self.state.data.send_queue.push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));}let sub_from = match rdata.0 {xmpp_parsers::roster::Subscription::From => true,xmpp_parsers::roster::Subscription::Both => true,_ => false,};if !sub_from {info!("Not subscription from {}", xmpp_to);self.state.data.send_queue.push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));}} else {info!("Jid {} not in roster", xmpp_to);self.state.data.counter += 1;let id_add_roster = format!("id_add_roster{}", self.state.data.counter);let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());self.state.data.pending_add_roster_ids.insert(id_add_roster, xmpp_to.clone());info!("Adding jid to roster... {:?}", add_roster);self.state.data.send_queue.push_back(add_roster);}}}}pub fn process_command(&mut self, cmd: XmppCommand) {info!("Got command");match cmd {XmppCommand::Chat { xmpp_to, message } => {self.state.data.outgoing_mailbox.entry(xmpp_to.clone()).or_default().push(message);self.process_jid(&xmpp_to);}XmppCommand::Chatroom { muc_id, message } => {if let Some(muc) = self.state.data.mucs.get(&muc_id) {self.state.data.send_queue.push_back(stanzas::make_muc_message(muc.clone(), message));} else {error!("Not found MUC {}", muc_id);}}XmppCommand::Ping => {self.state.data.counter += 1;let id_ping = format!("id_ping{}", self.state.data.counter);let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());self.state.data.send_queue.push_back(ping);}}}pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {info!("Shutdown connection");let XmppConnection { account, state } = self;stream::iter_ok(state.data.mucs.values().map(std::clone::Clone::clone).collect::<Vec<_>>(),).fold(state, move |XmppState { client, data }, muc_jid| {let muc_presence =stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());info!("Sending muc leave presence... {:?}", muc_presence);use tokio::prelude::Sink;client.send(Packet::Stanza(muc_presence)).map_err(|e| {error!("Error on send muc presence: {}", e);e}).and_then(|client| future::ok(XmppState { client, data }))}).map(|_| ())}fn enter_mucs<F, E>(self,_stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: Into<failure::Error> + 'static,{let XmppConnection { account, state } = self;let account2 = account.clone();let account3 = account.clone();stream::iter_ok(account.chatrooms.clone()).fold(state, move |XmppState { client, mut data }, muc_jid| {data.counter += 1;let id_muc_presence = format!("id_muc_presence{}", data.counter);let muc_presence = stanzas::make_muc_presence(&id_muc_presence,account2.jid.clone(),muc_jid.1.clone(),);info!("Sending muc presence... {:?}", muc_presence);let account4 = account2.clone();use tokio::prelude::Sink;client.send(Packet::Stanza(muc_presence)).map_err(|e| {error!("Error on send muc presence: {}", e);account4}).and_then(|client| {data.mucs.insert(muc_jid.0, muc_jid.1);future::ok(XmppState { client, data })})}).map(|state| XmppConnection {account: account3,state,})}}[3.21]use tokio_xmpp::{Client, Event, Packet};use tokio::prelude::future::{self, Either};use tokio::prelude::stream;use tokio::prelude::{Future, Stream};use std::collections::{HashMap, VecDeque};use super::XmppCommand;use super::stanzas;use super::element_processor;use crate::config;/// trait of processing iq/// each function consumes handlers and/// returns false if connection should be resettrait IqHandler {/// process resultfn result(self: Box<Self>,conn: &mut XmppConnection,opt_element: Option<xmpp_parsers::Element>,) -> bool;/// process errorfn error(self: Box<Self>,conn: &mut XmppConnection,error: xmpp_parsers::stanza_error::StanzaError,) -> bool;/// process tmeoutfn timeout(self: Box<Self>, conn: &mut XmppConnection) -> bool;}struct AddRosterIqHandler {jid: xmpp_parsers::Jid,}impl IqHandler for AddRosterIqHandler {fn result(self: Box<Self>,conn: &mut XmppConnection,opt_element: Option<xmpp_parsers::Element>,) -> bool {match opt_element {Some(element) => {warn!("Wrong payload when adding {} to roster: {}",self.jid,String::from(&element));}None => {if conn.state.data.roster.contains_key(&self.jid) {info!("Jid {} updated to roster", self.jid);} else {info!("Jid {} added in roster", self.jid);conn.state.data.roster.insert(self.jid.clone(),(xmpp_parsers::roster::Subscription::None,xmpp_parsers::roster::Ask::None,),);}conn.process_jid(&self.jid);}}true}fn error(self: Box<Self>,_conn: &mut XmppConnection,_error: xmpp_parsers::stanza_error::StanzaError,) -> bool {true}fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {true // ignore}}#[derive(Default)]struct XmppData {/// known roster dataroster: HashMap<xmpp_parsers::Jid,(xmpp_parsers::roster::Subscription,xmpp_parsers::roster::Ask,),>,/// ids countercounter: usize,/// stanzas to sendsend_queue: VecDeque<minidom::Element>,/// outgoing mailboxoutgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,/// muc id to muc jidmucs: HashMap<String, xmpp_parsers::Jid>,/// map from iq's id to handler of this type of iqspending_ids: HashMap<String, Box<dyn IqHandler>>,}struct XmppState {client: Client,data: XmppData,}pub struct XmppConnection {account: std::rc::Rc<config::Account>,state: XmppState,}struct XmppElementProcessor {incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,}impl XmppElementProcessor {fn new() -> XmppElementProcessor {let mut incoming = element_processor::Processor::new(&|_, e| {warn!("Unknown stanza {}", String::from(&e));true});incoming.register(&XmppConnection::incoming_iq_processing);XmppElementProcessor { incoming }}}pub struct MaybeXmppConnection {account: std::rc::Rc<config::Account>,state: Option<XmppState>,}impl From<XmppConnection> for MaybeXmppConnection {fn from(from: XmppConnection) -> MaybeXmppConnection {MaybeXmppConnection {account: from.account,state: Some(from.state),}}}impl From<config::Account> for MaybeXmppConnection {fn from(from: config::Account) -> MaybeXmppConnection {MaybeXmppConnection {account: std::rc::Rc::new(from),state: None,}}}impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {MaybeXmppConnection {account: from,state: None,}}}impl MaybeXmppConnection {/// connects if nothing connected/// don't connect only if stop_future resolvedpub fn connect<F>(self,stop_future: F,) -> impl Future<Item = XmppConnection, Error = failure::Error>whereF: future::Future + Clone + 'static,<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,{info!("xmpp connection...");let MaybeXmppConnection { account, state } = self;if let Some(state) = state {Box::new(future::ok(XmppConnection { account, state }))as Box<dyn Future<Item = _, Error = _>>} else {Box::new(stop_future.clone().select2(future::loop_fn(account, move |account| {info!("xmpp initialization...");let client =Client::new_with_jid(account.jid.clone(), &account.password);info!("xmpp initialized");let stop_future2 = stop_future.clone();let stop_future3 = stop_future.clone();let stop_future4 = stop_future.clone();// future to wait for onlineBox::new(XmppConnection {state: XmppState {client,data: std::default::Default::default(),},account,}.processing(XmppConnection::online, stop_future.clone()).map_err(|(acc, _)| acc).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),}).and_then(|conn| conn.initial_roster(stop_future2)).and_then(|conn| conn.self_presence(stop_future3)).and_then(|conn| conn.enter_mucs(stop_future4)).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),}),)}).map_err(|_: ()| ()),).then(|r| match r {Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),Ok(Either::B((x, _a))) => future::ok(x),Err(Either::A((e, _b))) => future::err(e.into()),Err(Either::B((_, _a))) => {future::err(format_err!("Cann't initiate XMPP connection"))}}),)}}}impl XmppConnection {/// base XMPP processing/// Returns false on error to disconnectfn xmpp_processing(&mut self, event: &Event) -> bool {match event {Event::Stanza(stanza) => {let processors = XmppElementProcessor::new();processors.incoming.process(self, stanza.clone())}Event::Online => true,e => {warn!("Unexpected event {:?}", e);false}}}/// Enforce to answer to IQ "set"fn incoming_iq_processing_set(&mut self,id: String,from: Option<xmpp_parsers::Jid>,element: minidom::Element,) -> xmpp_parsers::iq::Iq {use std::convert::TryInto;if let Some(roster) =element.clone().try_into().ok() as Option<xmpp_parsers::roster::Roster>{// RFC 6212 2.1.6. Roster Pushfor i in roster.items {if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {info!("Update {} in roster", i.jid);rdata.0 = i.subscription;rdata.1 = i.ask;} else {info!("Add {} to roster", i.jid);self.state.data.roster.insert(i.jid.clone(), (i.subscription, i.ask));}self.process_jid(&i.jid);}return stanzas::make_roster_push_answer(id, self.state.client.jid.clone(), from);}warn!("Unsupported IQ set request from {:?}: {}",from,String::from(&element));stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)}/// Enforce to answer to IQ "get"fn incoming_iq_processing_get(&mut self,id: String,from: Option<xmpp_parsers::Jid>,element: minidom::Element,) -> xmpp_parsers::iq::Iq {use std::convert::TryInto;if let Some(_ping) = element.clone().try_into().ok() as Option<xmpp_parsers::ping::Ping> {return stanzas::make_pong(&id, self.state.client.jid.clone(), from);}warn!("Unsupported IQ get request from {:?}: {}",from,String::from(&element));stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)}fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {match iq.payload {xmpp_parsers::iq::IqType::Set(element) => {let iq_answer = self.incoming_iq_processing_set(iq.id, iq.from, element);self.state.data.send_queue.push_back(iq_answer.into());}xmpp_parsers::iq::IqType::Error(e) => {if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {return handler.error(self, e);}error!("iq error: {:?}", e);return false;}xmpp_parsers::iq::IqType::Get(element) => {let iq_answer = self.incoming_iq_processing_get(iq.id, iq.from, element);self.state.data.send_queue.push_back(iq_answer.into());}xmpp_parsers::iq::IqType::Result(opt_element) => {if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {return handler.result(self, opt_element);}warn!("Unwanted iq result id {} from {:?}: {:?}",iq.id,iq.from,opt_element.map(|e| String::from(&e)));}}true}/// process event from xmpp stream/// returns from future when condition met/// or stop future was resolved./// Return item if connection was preserved or error otherwise./// Second part is a state of stop_futurepub fn processing<S, F, T, E>(self,stop_condition: S,stop_future: F,) -> impl Future<Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),>whereF: Future<Item = T, Error = E> + 'static,S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,T: 'static,E: 'static,{future::loop_fn((self, stop_future, stop_condition),|(xmpp, stop_future, mut stop_condition)| {let XmppConnection {state: XmppState { client, mut data },account,} = xmpp;if let Some(send_element) = data.send_queue.pop_front() {use tokio::prelude::Sink;info!("Sending {}", String::from(&send_element));Box::new(client.send(Packet::Stanza(send_element)).select2(stop_future).then(move |r| match r {Ok(Either::A((client, b))) => {Box::new(future::ok(future::Loop::Continue((XmppConnection {state: XmppState { client, data },account,},b,stop_condition,))))as Box<dyn Future<Item = _, Error = _>>}Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Ok(Either::B(t))))}})),Err(Either::A((e, b))) => {warn!("XMPP sending error: {}", e);Box::new(future::err((account, Ok(Either::A(b)))))}Err(Either::B((e, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Err(e)))}})),}),) as Box<dyn Future<Item = _, Error = _>>} else {Box::new(client.into_future().select2(stop_future).then(move |r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {state: XmppState { client, data },account,};if xmpp.xmpp_processing(&event) {match stop_condition(&mut xmpp, event) {Ok(true) => future::ok(future::Loop::Break((xmpp,Ok(Either::A(b)),))),Ok(false) => future::ok(future::Loop::Continue((xmpp,b,stop_condition,))),Err(_e) => {future::err((xmpp.account, Ok(Either::A(b))))}}} else {future::err((xmpp.account, Ok(Either::A(b))))}} else {future::err((account, Ok(Either::A(b))))}}Ok(Either::B((t, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}}Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);future::err((account, Ok(Either::A(b))))}Err(Either::B((e, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),)))} else {future::err((account, Err(e)))}}}),)}},)}/// get connection and wait for online status and set presence/// returns error if something went wrong and xmpp connection is brokenfn online(&mut self, event: Event) -> Result<bool, ()> {match event {Event::Online => {info!("Online!");Ok(true)}Event::Stanza(s) => {warn!("Stanza before online: {}", String::from(&s));Ok(false)}_ => {error!("Disconnected while online");Err(())}}}fn process_initial_roster(&mut self, event: Event, id_init_roster: &str) -> Result<bool, ()> {if let Event::Stanza(s) = event {use std::convert::TryInto;match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {Ok(iq) => {if iq.id == id_init_roster {match iq.payload {xmpp_parsers::iq::IqType::Error(_e) => {error!("Get error instead of roster");Err(())}xmpp_parsers::iq::IqType::Result(Some(result)) => {match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {Ok(roster) => {self.state.data.roster.clear();info!("Got first roster:");for i in roster.items {info!(" >>> {:?}", i);self.state.data.roster.insert(i.jid, (i.subscription, i.ask));}Ok(true)}Err(e) => {error!("Cann't parse roster: {}", e);Err(())}}}_ => {error!("Unknown result of roster");Err(())}}} else {Ok(false)}}Err(_e) => Ok(false),}} else {error!("Wrong event while waiting roster");Err(())}}fn initial_roster<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: 'static,{let XmppConnection {account,state: XmppState { client, mut data },} = self;use tokio::prelude::Sink;data.counter += 1;let id_init_roster = format!("id_init_roster{}", data.counter);let get_roster = stanzas::make_get_roster(&id_init_roster);let account2 = account.clone();info!("Quering roster... {:?}", get_roster);client.send(Packet::Stanza(get_roster)).map_err(move |e| {error!("Error on querying roster: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, data },account,}.processing(move |conn, event| conn.process_initial_roster(event, &id_init_roster),stop_future,).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})})}fn self_presence<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: Into<failure::Error> + 'static,{let XmppConnection {account,state: XmppState { client, data },} = self;use tokio::prelude::Sink;let presence = stanzas::make_presence(&account);let account2 = account.clone();info!("Sending presence... {:?}", presence);client.send(Packet::Stanza(presence)).map_err(|e| {error!("Error on send self-presence: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, data },account,}.processing(move |conn, event| {if let Event::Stanza(s) = event {use std::convert::TryInto;match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {Ok(presence) => {Ok(presence.from.as_ref() == Some(&conn.state.client.jid))}Err(e) => {warn!("Not a self-presence: {}", e);Ok(false)}}} else {error!("Wrong event while waiting self-presence");Err(())}},stop_future,).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})})}fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {if !mailbox.is_empty() {if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {info!("Jid {} in roster", xmpp_to);let sub_to = match rdata.0 {xmpp_parsers::roster::Subscription::To => true,xmpp_parsers::roster::Subscription::Both => true,_ => false,};if sub_to {info!("Subscribed to {}", xmpp_to);self.state.data.send_queue.extend(mailbox.drain(..).map(|message| {stanzas::make_chat_message(xmpp_to.clone(), message)}),);} else if rdata.1 == xmpp_parsers::roster::Ask::None {info!("Not subscribed to {}", xmpp_to);self.state.data.send_queue.push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));}let sub_from = match rdata.0 {xmpp_parsers::roster::Subscription::From => true,xmpp_parsers::roster::Subscription::Both => true,_ => false,};if !sub_from {info!("Not subscription from {}", xmpp_to);self.state.data.send_queue.push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));}} else {info!("Jid {} not in roster", xmpp_to);self.state.data.counter += 1;let id_add_roster = format!("id_add_roster{}", self.state.data.counter);let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());info!("Adding jid {} to roster id {}", xmpp_to, id_add_roster);self.state.data.pending_ids.insert(id_add_roster,Box::new(AddRosterIqHandler {jid: xmpp_to.clone(),}),);self.state.data.send_queue.push_back(add_roster);}}}}pub fn process_command(&mut self, cmd: XmppCommand) {info!("Got command");match cmd {XmppCommand::Chat { xmpp_to, message } => {self.state.data.outgoing_mailbox.entry(xmpp_to.clone()).or_default().push(message);self.process_jid(&xmpp_to);}XmppCommand::Chatroom { muc_id, message } => {if let Some(muc) = self.state.data.mucs.get(&muc_id) {self.state.data.send_queue.push_back(stanzas::make_muc_message(muc.clone(), message));} else {error!("Not found MUC {}", muc_id);}}XmppCommand::Ping => {self.state.data.counter += 1;let id_ping = format!("id_ping{}", self.state.data.counter);let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());self.state.data.send_queue.push_back(ping);}}}pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {info!("Shutdown connection");let XmppConnection { account, state } = self;stream::iter_ok(state.data.mucs.values().map(std::clone::Clone::clone).collect::<Vec<_>>(),).fold(state, move |XmppState { client, data }, muc_jid| {let muc_presence =stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());info!("Sending muc leave presence... {}",String::from(&muc_presence));use tokio::prelude::Sink;client.send(Packet::Stanza(muc_presence)).map_err(|e| {error!("Error on send muc presence: {}", e);e}).and_then(|client| future::ok(XmppState { client, data }))}).map(|_| ())}fn enter_mucs<F, E>(self,_stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: Into<failure::Error> + 'static,{let XmppConnection { account, state } = self;let account2 = account.clone();let account3 = account.clone();stream::iter_ok(account.chatrooms.clone()).fold(state, move |XmppState { client, mut data }, muc_jid| {data.counter += 1;let id_muc_presence = format!("id_muc_presence{}", data.counter);let muc_presence = stanzas::make_muc_presence(&id_muc_presence,account2.jid.clone(),muc_jid.1.clone(),);info!("Sending muc presence... {}", String::from(&muc_presence));let account4 = account2.clone();use tokio::prelude::Sink;client.send(Packet::Stanza(muc_presence)).map_err(|e| {error!("Error on send muc presence: {}", e);account4}).and_then(|client| {data.mucs.insert(muc_jid.0, muc_jid.1);future::ok(XmppState { client, data })})}).map(|state| XmppConnection {account: account3,state,})}} - edit in src/xmpp/stanzas.rs at line 7[3.130][19.141]
use xmpp_parsers::stanza_error::{DefinedCondition, ErrorType, StanzaError}; - edit in src/xmpp/stanzas.rs at line 46
pub fn make_roster_push_answer(id: String,from: xmpp_parsers::Jid,to: Option<xmpp_parsers::Jid>,) -> Iq {let mut answer = Iq::from_result(id, None as Option<Roster>);answer.from = Some(from);answer.to = to;answer} - edit in src/xmpp/stanzas.rs at line 59
presence.to = Some(jid);presence.into()}pub fn make_allow_subscribe(jid: xmpp_parsers::Jid) -> Element {let mut presence = Presence::new(PresenceType::Subscribed); - replacement in src/xmpp/stanzas.rs at line 106
pub fn make_pong(id: &str, from: xmpp_parsers::Jid, to: Option<xmpp_parsers::Jid>) -> Element {pub fn make_pong(id: &str, from: xmpp_parsers::Jid, to: Option<xmpp_parsers::Jid>) -> Iq { - replacement in src/xmpp/stanzas.rs at line 110
pong.into()pong - edit in src/xmpp/stanzas.rs at line 112[2.30374]
pub fn make_iq_unsupported_error(id: String,from: xmpp_parsers::Jid,to: Option<xmpp_parsers::Jid>,) -> Iq {let mut error = Iq::from_error(id,StanzaError {type_: ErrorType::Cancel,by: Some(from.clone()),defined_condition: DefinedCondition::ServiceUnavailable,texts: std::collections::BTreeMap::new(),other: None,},);error.from = Some(from);error.to = to;error} - replacement in src/xmpp/mod.rs at line 0
use tokio::prelude::future::{self, Either};use tokio::prelude::future::{self, Either, Future}; - edit in src/xmpp/mod.rs at line 2
use tokio::prelude::{Future, Stream};use tokio_xmpp::{Client, Event, Packet}; - replacement in src/xmpp/mod.rs at line 5
use std::collections::{HashMap, VecDeque};mod element_processor; - edit in src/xmpp/mod.rs at line 8[3.29460]→[3.29887:34864](∅→∅),[3.34864]→[2.30375:30479](∅→∅),[2.30479]→[3.35003:35170](∅→∅),[3.35003]→[3.35003:35170](∅→∅),[3.35170]→[2.30480:31004](∅→∅),[2.31004]→[3.36361:36398](∅→∅),[3.36361]→[3.36361:36398](∅→∅),[3.36398]→[2.31005:31419](∅→∅),[2.31419]→[3.36573:36638](∅→∅),[3.36573]→[3.36573:36638](∅→∅),[3.36638]→[2.31420:31699](∅→∅),[2.31699]→[3.36638:36686](∅→∅),[3.36638]→[3.36638:36686](∅→∅),[3.36686]→[2.31700:32886](∅→∅),[2.32886]→[3.37646:37680](∅→∅),[3.37646]→[3.37646:37680](∅→∅),[3.37680]→[2.32887:33701](∅→∅),[2.33701]→[3.37738:37794](∅→∅),[3.37738]→[3.37738:37794](∅→∅),[3.37794]→[2.33702:33745](∅→∅),[2.33745]→[3.37794:37816](∅→∅),[3.37794]→[3.37794:37816](∅→∅),[3.37816]→[2.33746:33952](∅→∅),[2.33952]→[3.37816:37834](∅→∅),[3.37816]→[3.37816:37834](∅→∅),[3.37834]→[2.33953:33974](∅→∅),[2.33974]→[3.37867:37881](∅→∅),[3.37867]→[3.37867:37881](∅→∅),[3.37881]→[2.33975:34010](∅→∅),[2.34010]→[3.37928:37998](∅→∅),[3.37928]→[3.37928:37998](∅→∅),[3.37998]→[2.34011:34033](∅→∅),[2.34033]→[3.38040:39006](∅→∅),[3.38040]→[3.38040:39006](∅→∅),[3.39006]→[3.16484:16485](∅→∅),[3.29460]→[3.16484:16485](∅→∅),[3.16484]→[3.16484:16485](∅→∅),[3.16485]→[3.39007:42308](∅→∅),[3.42308]→[2.34034:34106](∅→∅),[2.34106]→[3.42376:42551](∅→∅),[3.42376]→[3.42376:42551](∅→∅),[3.42551]→[2.34107:35031](∅→∅),[2.35031]→[3.43844:43940](∅→∅),[3.43844]→[3.43844:43940](∅→∅),[3.43940]→[2.35032:35213](∅→∅),[2.35213]→[3.44265:44310](∅→∅),[3.44265]→[3.44265:44310](∅→∅),[3.44310]→[2.35214:35295](∅→∅),[2.35295]→[3.44401:44532](∅→∅),[3.44401]→[3.44401:44532](∅→∅),[3.44532]→[2.35296:35371](∅→∅),[2.35371]→[3.44616:45169](∅→∅),[3.44616]→[3.44616:45169](∅→∅),[3.45169]→[2.35372:35410](∅→∅),[2.35410]→[3.45208:45368](∅→∅),[3.45208]→[3.45208:45368](∅→∅),[3.45368]→[2.35411:35488](∅→∅),[2.35488]→[3.45455:45549](∅→∅),[3.45455]→[3.45455:45549](∅→∅),[3.45549]→[2.35489:35564](∅→∅),[2.35564]→[3.45633:46166](∅→∅),[3.45633]→[3.45633:46166](∅→∅),[3.46166]→[2.35565:35603](∅→∅),[2.35603]→[3.46205:47071](∅→∅),[3.46205]→[3.46205:47071](∅→∅),[3.47071]→[2.35604:35643](∅→∅),[2.35643]→[3.47106:47202](∅→∅),[3.47106]→[3.47106:47202](∅→∅),[3.47202]→[2.35644:36704](∅→∅),[2.36704]→[3.48548:48590](∅→∅),[3.48548]→[3.48548:48590](∅→∅),[3.48590]→[2.36705:36754](∅→∅),[2.36754]→[3.48818:48856](∅→∅),[3.48818]→[3.48818:48856](∅→∅),[3.48856]→[2.36755:36967](∅→∅),[2.36967]→[3.49045:49109](∅→∅),[3.49045]→[3.49045:49109](∅→∅),[3.49109]→[2.36968:37141](∅→∅),[2.37141]→[3.49180:49235](∅→∅),[3.49180]→[3.49180:49235](∅→∅),[3.49235]→[2.37142:37176](∅→∅),[2.37176]→[3.49323:51399](∅→∅),[3.49323]→[3.49323:51399](∅→∅)
#[derive(Default)]struct XmppData {/// known roster dataroster: HashMap<xmpp_parsers::Jid,(xmpp_parsers::roster::Subscription,xmpp_parsers::roster::Ask,),>,/// ids countercounter: usize,/// map from id of adding item to roster and jid of itempending_add_roster_ids: HashMap<String, xmpp_parsers::Jid>,/// stanzas to sendsend_queue: VecDeque<minidom::Element>,/// outgoing mailboxoutgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,/// muc id to muc jidmucs: HashMap<String, xmpp_parsers::Jid>,}struct XmppState {client: Client,data: XmppData,}struct MaybeXmppConnection {account: std::rc::Rc<config::Account>,state: Option<XmppState>,}struct XmppConnection {account: std::rc::Rc<config::Account>,state: XmppState,}impl From<XmppConnection> for MaybeXmppConnection {fn from(from: XmppConnection) -> MaybeXmppConnection {MaybeXmppConnection {account: from.account,state: Some(from.state),}}}impl From<config::Account> for MaybeXmppConnection {fn from(from: config::Account) -> MaybeXmppConnection {MaybeXmppConnection {account: std::rc::Rc::new(from),state: None,}}}impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {MaybeXmppConnection {account: from,state: None,}}}impl MaybeXmppConnection {/// connects if nothing connected/// don't connect only if stop_future resolvedfn connect<F>(self,stop_future: F,) -> impl Future<Item = XmppConnection, Error = failure::Error>whereF: future::Future + Clone + 'static,<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,{info!("xmpp connection...");let MaybeXmppConnection { account, state } = self;if let Some(state) = state {Box::new(future::ok(XmppConnection { account, state }))as Box<dyn Future<Item = _, Error = _>>} else {Box::new(stop_future.clone().select2(future::loop_fn(account, move |account| {info!("xmpp initialization...");let client =Client::new_with_jid(account.jid.clone(), &account.password);info!("xmpp initialized");let stop_future2 = stop_future.clone();let stop_future3 = stop_future.clone();let stop_future4 = stop_future.clone();// future to wait for onlineBox::new(XmppConnection {state: XmppState {client,data: std::default::Default::default(),},account,}.processing(XmppConnection::online, stop_future.clone()).map_err(|(acc, _)| acc).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),}).and_then(|conn| conn.initial_roster(stop_future2)).and_then(|conn| conn.self_presence(stop_future3)).and_then(|conn| conn.enter_mucs(stop_future4)).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),}),)}).map_err(|_: ()| ()),).then(|r| match r {Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),Ok(Either::B((x, _a))) => future::ok(x),Err(Either::A((e, _b))) => future::err(e.into()),Err(Either::B((_, _a))) => {future::err(format_err!("Cann't initiate XMPP connection"))}}),)}}}impl XmppConnection {/// base XMPP processing/// Returns false on error to disconnectfn xmpp_processing(&mut self, event: &Event) -> bool {match event {Event::Stanza(stanza) => {info!("Incoming xmpp event: {:?}", stanza);let stanza = stanza.clone();use std::convert::TryInto;if let Some(iq) = stanza.clone().try_into().ok() as Option<xmpp_parsers::iq::Iq> {if let Some((_, jid)) =self.state.data.pending_add_roster_ids.remove_entry(&iq.id){if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {if self.state.data.roster.contains_key(&jid) {info!("Jid {} updated to roster", jid);} else {info!("Jid {} added in roster", jid);self.state.data.roster.insert(jid.clone(),(xmpp_parsers::roster::Subscription::None,xmpp_parsers::roster::Ask::None,),);}self.process_jid(&jid);} else {warn!("Wrong payload when adding {} to roster: {:?}",jid, iq.payload);}}match iq.payload {xmpp_parsers::iq::IqType::Set(element) => {if let Some(roster) =element.try_into().ok() as Option<xmpp_parsers::roster::Roster>{for i in roster.items {if let Some(ref mut rdata) =self.state.data.roster.get_mut(&i.jid){info!("Update {} in roster", i.jid);rdata.0 = i.subscription;rdata.1 = i.ask;} else {info!("Add {} to roster", i.jid);self.state.data.roster.insert(i.jid.clone(), (i.subscription, i.ask));}self.process_jid(&i.jid);}}}xmpp_parsers::iq::IqType::Error(e) => {error!("iq error: {:?}", e);return false;}xmpp_parsers::iq::IqType::Get(element) => {if let Some(_ping) =element.try_into().ok() as Option<xmpp_parsers::ping::Ping>{let pong = stanzas::make_pong(&iq.id,self.state.client.jid.clone(),iq.from,);self.state.data.send_queue.push_back(pong);}}_ => (), // ignore}} else if let Some(_presence) =stanza.try_into().ok() as Option<xmpp_parsers::presence::Presence>{// to do something with presence}true}Event::Online => true,e => {warn!("Unexpected event {:?}", e);false}}}/// process event from xmpp stream/// returns from future when condition met/// or stop future was resolved./// Return item if connection was preserved or error otherwise./// Second part is a state of stop_futurefn processing<S, F, T, E>(self,stop_condition: S,stop_future: F,) -> impl Future<Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),>whereF: Future<Item = T, Error = E> + 'static,S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,T: 'static,E: 'static,{future::loop_fn((self, stop_future, stop_condition),|(xmpp, stop_future, mut stop_condition)| {let XmppConnection {state: XmppState { client, mut data },account,} = xmpp;if let Some(send_element) = data.send_queue.pop_front() {use tokio::prelude::Sink;info!("Sending {:?}", send_element);Box::new(client.send(Packet::Stanza(send_element)).select2(stop_future).then(move |r| match r {Ok(Either::A((client, b))) => {Box::new(future::ok(future::Loop::Continue((XmppConnection {state: XmppState { client, data },account,},b,stop_condition,))))as Box<dyn Future<Item = _, Error = _>>}Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Ok(Either::B(t))))}})),Err(Either::A((e, b))) => {warn!("XMPP sending error: {}", e);Box::new(future::err((account, Ok(Either::A(b)))))}Err(Either::B((e, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Err(e)))}})),}),) as Box<dyn Future<Item = _, Error = _>>} else {Box::new(client.into_future().select2(stop_future).then(move |r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {state: XmppState { client, data },account,};if xmpp.xmpp_processing(&event) {match stop_condition(&mut xmpp, event) {Ok(true) => future::ok(future::Loop::Break((xmpp,Ok(Either::A(b)),))),Ok(false) => future::ok(future::Loop::Continue((xmpp,b,stop_condition,))),Err(_e) => {future::err((xmpp.account, Ok(Either::A(b))))}}} else {future::err((xmpp.account, Ok(Either::A(b))))}} else {future::err((account, Ok(Either::A(b))))}}Ok(Either::B((t, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}}Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);future::err((account, Ok(Either::A(b))))}Err(Either::B((e, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),)))} else {future::err((account, Err(e)))}}}),)}},)}/// get connection and wait for online status and set presence/// returns error if something went wrong and xmpp connection is brokenfn online(&mut self, event: Event) -> Result<bool, ()> {match event {Event::Online => {info!("Online!");Ok(true)}Event::Stanza(s) => {warn!("Stanza before online: {:?}", s);Ok(false)}_ => {error!("Disconnected while online");Err(())}}}fn process_initial_roster(&mut self, event: Event, id_init_roster: &str) -> Result<bool, ()> {if let Event::Stanza(s) = event {use std::convert::TryInto;match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {Ok(iq) => {if iq.id == id_init_roster {match iq.payload {xmpp_parsers::iq::IqType::Error(_e) => {error!("Get error instead of roster");Err(())}xmpp_parsers::iq::IqType::Result(Some(result)) => {match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {Ok(roster) => {self.state.data.roster.clear();info!("Got first roster:");for i in roster.items {info!(" >>> {:?}", i);self.state.data.roster.insert(i.jid, (i.subscription, i.ask));}Ok(true)}Err(e) => {error!("Cann't parse roster: {}", e);Err(())}}}_ => {error!("Unknown result of roster");Err(())}}} else {Ok(false)}}Err(_e) => Ok(false),}} else {error!("Wrong event while waiting roster");Err(())}}fn initial_roster<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: 'static,{let XmppConnection {account,state: XmppState { client, mut data },} = self;use tokio::prelude::Sink;data.counter += 1;let id_init_roster = format!("id_init_roster{}", data.counter);let get_roster = stanzas::make_get_roster(&id_init_roster);let account2 = account.clone();info!("Quering roster... {:?}", get_roster);client.send(Packet::Stanza(get_roster)).map_err(move |e| {error!("Error on querying roster: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, data },account,}.processing(move |conn, event| conn.process_initial_roster(event, &id_init_roster),stop_future,).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})})}fn self_presence<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: Into<failure::Error> + 'static,{let XmppConnection {account,state: XmppState { client, data },} = self;use tokio::prelude::Sink; - replacement in src/xmpp/mod.rs at line 9
let presence = stanzas::make_presence(&account);let account2 = account.clone();info!("Sending presence... {:?}", presence);mod xmpp_connection;use xmpp_connection::MaybeXmppConnection; - edit in src/xmpp/mod.rs at line 12[3.51551]→[3.51551:52040](∅→∅),[3.51551]→[3.51551:52040](∅→∅),[3.52040]→[2.37177:37704](∅→∅),[2.37704]→[3.52424:55846](∅→∅),[3.52424]→[3.52424:55846](∅→∅),[3.55846]→[2.37705:38025](∅→∅),[2.38025]→[3.55846:55877](∅→∅),[3.55846]→[3.55846:55877](∅→∅),[3.55877]→[2.38026:39017](∅→∅),[2.39017]→[3.55877:57422](∅→∅),[3.55877]→[3.55877:57422](∅→∅),[3.57422]→[3.20740:20741](∅→∅),[3.29524]→[3.20740:20741](∅→∅),[3.20740]→[3.20740:20741](∅→∅)
client.send(Packet::Stanza(presence)).map_err(|e| {error!("Error on send self-presence: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, data },account,}.processing(move |conn, event| {if let Event::Stanza(s) = event {use std::convert::TryInto;match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {Ok(presence) => {Ok(presence.from.as_ref() == Some(&conn.state.client.jid))}Err(e) => {warn!("Not a self-presence: {}", e);Ok(false)}}} else {error!("Wrong event while waiting self-presence");Err(())}},stop_future,).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})})}fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {if !mailbox.is_empty() {if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {info!("Jid {} in roster", xmpp_to);let sub_to = match rdata.0 {xmpp_parsers::roster::Subscription::To => true,xmpp_parsers::roster::Subscription::Both => true,_ => false,};if sub_to {info!("Subscribed to {}", xmpp_to);self.state.data.send_queue.extend(mailbox.drain(..).map(|message| {stanzas::make_chat_message(xmpp_to.clone(), message)}),);} else if rdata.1 == xmpp_parsers::roster::Ask::None {info!("Not subscribed to {}", xmpp_to);self.state.data.send_queue.push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));}} else {info!("Jid {} not in roster", xmpp_to);self.state.data.counter += 1;let id_add_roster = format!("id_add_roster{}", self.state.data.counter);let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());self.state.data.pending_add_roster_ids.insert(id_add_roster, xmpp_to.clone());info!("Adding jid to roster... {:?}", add_roster);self.state.data.send_queue.push_back(add_roster);}}}}fn process_command(&mut self, cmd: XmppCommand) {info!("Got command");match cmd {XmppCommand::Chat { xmpp_to, message } => {self.state.data.outgoing_mailbox.entry(xmpp_to.clone()).or_default().push(message);self.process_jid(&xmpp_to);}XmppCommand::Chatroom { muc_id, message } => {if let Some(muc) = self.state.data.mucs.get(&muc_id) {self.state.data.send_queue.push_back(stanzas::make_muc_message(muc.clone(), message));} else {error!("Not found MUC {}", muc_id);}}XmppCommand::Ping => {self.state.data.counter += 1;let id_ping = format!("id_ping{}", self.state.data.counter);let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());self.state.data.send_queue.push_back(ping);}}}fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {info!("Shutdown connection");let XmppConnection { account, state } = self;stream::iter_ok(state.data.mucs.values().map(std::clone::Clone::clone).collect::<Vec<_>>(),).fold(state, move |XmppState { client, data }, muc_jid| {let muc_presence =stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());info!("Sending muc leave presence... {:?}", muc_presence);use tokio::prelude::Sink;client.send(Packet::Stanza(muc_presence)).map_err(|e| {error!("Error on send muc presence: {}", e);e}).and_then(|client| future::ok(XmppState { client, data }))}).map(|_| ())}fn enter_mucs<F, E>(self,_stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: Into<failure::Error> + 'static,{let XmppConnection { account, state } = self;let account2 = account.clone();let account3 = account.clone();stream::iter_ok(account.chatrooms.clone()).fold(state, move |XmppState { client, mut data }, muc_jid| {data.counter += 1;let id_muc_presence = format!("id_muc_presence{}", data.counter);let muc_presence = stanzas::make_muc_presence(&id_muc_presence,account2.jid.clone(),muc_jid.1.clone(),);info!("Sending muc presence... {:?}", muc_presence);let account4 = account2.clone();use tokio::prelude::Sink;client.send(Packet::Stanza(muc_presence)).map_err(|e| {error!("Error on send muc presence: {}", e);account4}).and_then(|client| {data.mucs.insert(muc_jid.0, muc_jid.1);future::ok(XmppState { client, data })})}).map(|state| XmppConnection {account: account3,state,})}} - replacement in src/xmpp/element_processor.rs at line 0
type Func<S, T, E> = dyn Fn(&mut S, E) -> T;pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {processors: Vec<Box<Func<S, Option<T>, E>>>,default: &'static Func<S, T, E>,}impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {pub fn new<F>(f: &'static F) -> Processor<S, T, E>whereF: Fn(&mut S, E) -> T + 'static,{Processor {processors: vec![],default: f,}}pub fn register<F, A>(&mut self, f: &'static F)whereF: Fn(&mut S, A) -> T + 'static,A: std::convert::TryFrom<E>,{self.processors.push(Box::new(move |s, e: E| {use std::convert::TryInto;(e.try_into().ok() as Option<A>).map(|a| f(s, a))}));}pub fn process(&self, s: &mut S, e: E) -> T {for processor in self.processors.iter() {match processor(s, e.clone()) {Some(t) => return t,None => continue,}}(*self.default)(s, e)}}[3.11301]type Func<S, T, E> = dyn Fn(&mut S, E) -> T;pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {processors: Vec<Box<Func<S, Option<T>, E>>>,default: &'static Func<S, T, E>,}impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {pub fn new<F>(f: &'static F) -> Processor<S, T, E>whereF: Fn(&mut S, E) -> T + 'static,{Processor {processors: vec![],default: f,}}pub fn register<F, A>(&mut self, f: &'static F)whereF: Fn(&mut S, A) -> T + 'static,A: std::convert::TryFrom<E>,{self.processors.push(Box::new(move |s, e: E| {use std::convert::TryInto;(e.try_into().ok() as Option<A>).map(|a| f(s, a))}));}pub fn process(&self, s: &mut S, e: E) -> T {for processor in self.processors.iter() {match processor(s, e.clone()) {Some(t) => return t,None => continue,}}(*self.default)(s, e)}} - replacement in Cargo.lock at line 28
version = "0.3.11"version = "0.3.10" - replacement in Cargo.lock at line 312
"syn 0.15.31 (registry+https://github.com/rust-lang/crates.io-index)","syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1162
"syn 0.15.31 (registry+https://github.com/rust-lang/crates.io-index)","syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1214
"arc-swap 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)","arc-swap 0.3.10 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1307
version = "0.15.31"version = "0.15.30" - replacement in Cargo.lock at line 1330
"syn 0.15.31 (registry+https://github.com/rust-lang/crates.io-index)","syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1902
"checksum arc-swap 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "bc4662175ead9cd84451d5c35070517777949a2ed84551764129cedb88384841""checksum arc-swap 0.3.10 (registry+https://github.com/rust-lang/crates.io-index)" = "a57a5698f85c6fd92f19dad87ff2d822fc4ba79dd85c13914d8c4dad589cb815" - replacement in Cargo.lock at line 2052
"checksum syn 0.15.31 (registry+https://github.com/rust-lang/crates.io-index)" = "d2b4cfac95805274c6afdb12d8f770fa2d27c045953e7b630a81801953699a9a""checksum syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)" = "66c8865bf5a7cbb662d8b011950060b3c8743dca141b054bf7195b20d314d8e2"