Update dependencies
[?]
Apr 29, 2019, 6:34 PM
SSOKGGCEUV463YHS4Q4WSURV3OWTBUYTENKM6FQE5EB7WNZDJACQCDependencies
- [2]
3DSOPLCGAdd rustdoc - [3]
BYJPYYSMProcess iq ping response - [4]
PJV5HPIFStarting to imlements timeouts for iqs - [5]
ZFBPXPADCleanup timeouted iq requests with ping Output elapsed time. Refactor iq handling. - [6]
PLWPCM47Add id to initital presence - [7]
VS6AHRWIMove XMPP to separate dir - [8]
6UKCVM6EUse new iq processng for initial roster - [9]
LQXBWNFTRemove unneeded requirement - [10]
OFLAP2G2Fix possible utf8 errors - [11]
RRLRZTMRUse element processor for iq - [12]
YTN366WASupport disco#items - [13]
GVZ4JAR5Process self-presence with incoming stanza processor - [14]
DYRPAV6TUpdate dependencies - [15]
LL3D5CXKStaring using element processor - [16]
2THKW66MIgnore .orig files - [17]
S754Y5DFRefactor IQ processing Always answer to set and get requests. Use XML encoding for stanzas. - [18]
RQZCVDFDImplement applying timeout for expired iq await - [19]
WDCZNZOPFix rustdoc - [20]
ZT3YEIVXConsume connection on processing command - [21]
LNUU5R56Support disco#info from XEP-0030 Service Discovery - [22]
FV6BJ5K6Send self-presence and store account info in Rc so it willbe used in some future in parallel - [23]
DCMDASHVMention XEP-0050 and XEP-0203 support - [24]
GXQCDLYQUse element processor for incoming iq get - [25]
QDHDTOLMStarting support for commands XEP-0050: Ad-Hoc Commands (there no support in xmpp_parsers still) - [26]
HDLI2X4HIgnore delayed XEP-0203 messages - [27]
Z3NQEYVIRename IqSetHandler to IqResuestHandler as it should provide both get and set handling - [28]
AEH7WP42Make element processors static - [29]
FVVPKFTLInitial commit - [30]
YEMBT7TBAdd support for XEP-0092: Software Version - [31]
JY4F7VBCUse element processor for incoming iq set - [32]
CCLGGFKRMove out XmppConnection into own file - [33]
4IPZTMFIUpdate dependencies
Change contents
- replacement in src/xmpp/xmpp_connection.rs at line 0
use tokio_xmpp::{Client, Event, Packet};use tokio::prelude::future::{self, Either};use tokio::prelude::stream;use tokio::prelude::{Future, Stream};use std::collections::{HashMap, VecDeque};use std::time::{Duration, Instant};use super::stanzas;use super::element_processor;use crate::config;#[derive(Debug)]pub enum XmppCommand {/// Send message to someone by jidChat {xmpp_to: xmpp_parsers::Jid,message: String,},/// Send message to MUCChatroom { muc_id: String, message: String },/// Send ping request to the server to test connectionPing,/// Check iq requests if some have expired timeoutsTimeoutCleanup,}/// trait of processing iq/// each function consumes handlers and/// returns false if connection should be resettrait IqHandler {/// process resultfn result(self: Box<Self>,conn: &mut XmppConnection,opt_element: Option<xmpp_parsers::Element>,) -> bool;/// process errorfn error(self: Box<Self>,conn: &mut XmppConnection,error: xmpp_parsers::stanza_error::StanzaError,) -> bool;/// process tmeoutfn timeout(self: Box<Self>, conn: &mut XmppConnection) -> bool;}struct AddRosterIqHandler {jid: xmpp_parsers::Jid,}impl IqHandler for AddRosterIqHandler {fn result(self: Box<Self>,conn: &mut XmppConnection,opt_element: Option<xmpp_parsers::Element>,) -> bool {match opt_element {Some(element) => {warn!("Wrong payload when adding {} to roster: {}",self.jid,String::from(&element));}None => {if conn.state.data.roster.contains_key(&self.jid) {info!("Jid {} updated to roster", self.jid);} else {info!("Jid {} added in roster", self.jid);conn.state.data.roster.insert(self.jid.clone(),(xmpp_parsers::roster::Subscription::None,xmpp_parsers::roster::Ask::None,),);}conn.process_jid(&self.jid);}}true}fn error(self: Box<Self>,_conn: &mut XmppConnection,_error: xmpp_parsers::stanza_error::StanzaError,) -> bool {true}fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {true // ignore}}struct PingIqHandler {}impl IqHandler for PingIqHandler {fn result(self: Box<Self>,_conn: &mut XmppConnection,_opt_element: Option<xmpp_parsers::Element>,) -> bool {info!("ping successed");true}fn error(self: Box<Self>,_conn: &mut XmppConnection,_error: xmpp_parsers::stanza_error::StanzaError,) -> bool {false}fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {false}}struct InitRosterIqHandler {}impl IqHandler for InitRosterIqHandler {fn result(self: Box<Self>,conn: &mut XmppConnection,opt_element: Option<xmpp_parsers::Element>,) -> bool {if let Some(result) = opt_element {use std::convert::TryInto;match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {Ok(roster) => {conn.state.data.roster_init = true;conn.state.data.roster.clear();info!("Got first roster:");for i in roster.items {info!(" >>> {:?}", i);conn.state.data.roster.insert(i.jid, (i.subscription, i.ask));}true}Err(e) => {error!("Cann't parse roster: {}", e);false}}} else {error!("No roster responded");false}}fn error(self: Box<Self>,_conn: &mut XmppConnection,_error: xmpp_parsers::stanza_error::StanzaError,) -> bool {false}fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {false}}#[derive(Default)]struct XmppData {/// known roster dataroster: HashMap<xmpp_parsers::Jid,(xmpp_parsers::roster::Subscription,xmpp_parsers::roster::Ask,),>,/// if roster was initialized/// ToDo: remove it as it is used only for initializationroster_init: bool,/// if self-presence accepted/// ToDo: remove it as it is used only for initializationself_presence: bool,/// ids countercounter: usize,/// stanzas to sendsend_queue: VecDeque<minidom::Element>,/// outgoing mailboxoutgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,/// muc id to muc jidmucs: HashMap<String, xmpp_parsers::Jid>,/// map from iq's id to handler of this type of iqspending_ids: HashMap<String, (Instant, Box<dyn IqHandler>)>,}struct XmppState {client: Client,data: XmppData,}pub struct XmppConnection {account: std::rc::Rc<config::Account>,state: XmppState,}trait IqRequestHandler {fn process(self: Box<Self>,conn: &mut XmppConnection,id: String,from: Option<xmpp_parsers::Jid>,) -> xmpp_parsers::iq::Iq;}struct IqRequestUnknown {element: xmpp_parsers::Element,type_: &'static str,}impl IqRequestHandler for IqRequestUnknown {fn process(self: Box<Self>,conn: &mut XmppConnection,id: String,from: Option<xmpp_parsers::Jid>,) -> xmpp_parsers::iq::Iq {warn!("Unsupported IQ {} request from {:?}: {}",self.type_,from,String::from(&self.element));stanzas::make_iq_unsupported_error(id, conn.state.client.jid.clone(), from)}}struct IqSetRoster {}impl IqRequestHandler for IqSetRoster {fn process(self: Box<Self>,conn: &mut XmppConnection,id: String,from: Option<xmpp_parsers::Jid>,) -> xmpp_parsers::iq::Iq {info!("Got roster push {} from {:?}", id, from);stanzas::make_roster_push_answer(id, conn.state.client.jid.clone(), from)}}struct IqGetPing {}impl IqRequestHandler for IqGetPing {fn process(self: Box<Self>,conn: &mut XmppConnection,id: String,from: Option<xmpp_parsers::Jid>,) -> xmpp_parsers::iq::Iq {info!("Got ping {} from {:?}", id, from);stanzas::make_pong(id, conn.state.client.jid.clone(), from)}}struct IqGetVersion {}impl IqRequestHandler for IqGetVersion {fn process(self: Box<Self>,conn: &mut XmppConnection,id: String,from: Option<xmpp_parsers::Jid>,) -> xmpp_parsers::iq::Iq {info!("Got version query {} from {:?}", id, from);stanzas::make_version(id, conn.state.client.jid.clone(), from)}}struct IqGetDiscoInfo {}impl IqRequestHandler for IqGetDiscoInfo {fn process(self: Box<Self>,conn: &mut XmppConnection,id: String,from: Option<xmpp_parsers::Jid>,) -> xmpp_parsers::iq::Iq {info!("Got disco query {} from {:?}", id, from);stanzas::make_disco_info_result(id, conn.state.client.jid.clone(), from)}}struct IqGetDiscoItems {}impl IqRequestHandler for IqGetDiscoItems {fn process(self: Box<Self>,conn: &mut XmppConnection,id: String,from: Option<xmpp_parsers::Jid>,) -> xmpp_parsers::iq::Iq {info!("Got disco items query {} from {:?}", id, from);stanzas::make_disco_items_result(id, conn.state.client.jid.clone(), from)}}lazy_static! {static ref INCOMING: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element> = {let mut incoming = element_processor::Processor::new(&|_, e| {warn!("Unknown stanza {}", String::from(&e));true});incoming.register(&XmppConnection::incoming_iq_processing);incoming.register(&XmppConnection::incoming_presence_processing);incoming.register(&XmppConnection::incoming_message_processing);incoming};static ref INCOMING_IQ_SET: element_processor::Processor<XmppConnection, Box<dyn IqRequestHandler>, xmpp_parsers::Element> = {let mut iq_set =element_processor::Processor::new(&|_conn: &mut XmppConnection, element| {Box::new(IqRequestUnknown {element,type_: "set",}) as Box<dyn IqRequestHandler>});iq_set.register(&XmppConnection::incoming_iq_processing_set_roster);iq_set};static ref INCOMING_IQ_GET: element_processor::Processor<XmppConnection, Box<dyn IqRequestHandler>, xmpp_parsers::Element> = {let mut iq_get =element_processor::Processor::new(&|_conn: &mut XmppConnection, element| {Box::new(IqRequestUnknown {element,type_: "get",}) as Box<dyn IqRequestHandler>});iq_get.register(&XmppConnection::incoming_iq_processing_get_ping);iq_get.register(&XmppConnection::incoming_iq_processing_get_disco_info);iq_get.register(&XmppConnection::incoming_iq_processing_get_disco_items);iq_get.register(&XmppConnection::incoming_iq_processing_get_version);iq_get};}pub struct MaybeXmppConnection {account: std::rc::Rc<config::Account>,state: Option<XmppState>,}impl From<XmppConnection> for MaybeXmppConnection {fn from(from: XmppConnection) -> MaybeXmppConnection {MaybeXmppConnection {account: from.account,state: Some(from.state),}}}impl From<config::Account> for MaybeXmppConnection {fn from(from: config::Account) -> MaybeXmppConnection {MaybeXmppConnection {account: std::rc::Rc::new(from),state: None,}}}impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {MaybeXmppConnection {account: from,state: None,}}}impl MaybeXmppConnection {/// connects if nothing connected/// don't connect only if stop_future resolvedpub fn connect<F>(self,stop_future: F,) -> impl Future<Item = XmppConnection, Error = failure::Error>whereF: future::Future + Clone + 'static,<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,{info!("xmpp connection...");let MaybeXmppConnection { account, state } = self;if let Some(state) = state {Box::new(future::ok(XmppConnection { account, state }))as Box<dyn Future<Item = _, Error = _>>} else {Box::new(stop_future.clone().select2(future::loop_fn(account, move |account| {info!("xmpp initialization...");let client =Client::new_with_jid(account.jid.clone(), &account.password);info!("xmpp initialized");let stop_future2 = stop_future.clone();let stop_future3 = stop_future.clone();let stop_future4 = stop_future.clone();// future to wait for onlineBox::new(XmppConnection {state: XmppState {client,data: std::default::Default::default(),},account,}.processing(XmppConnection::online, stop_future.clone()).map_err(|(acc, _)| acc).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),}).and_then(|conn| conn.initial_roster(stop_future2)).and_then(|conn| conn.self_presence(stop_future3)).and_then(|conn| conn.enter_mucs(stop_future4)).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),}),)}).map_err(|_: ()| ()),).then(|r| match r {Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),Ok(Either::B((x, _a))) => future::ok(x),Err(Either::A((e, _b))) => future::err(e.into()),Err(Either::B((_, _a))) => {future::err(format_err!("Cann't initiate XMPP connection"))}}),)}}}impl XmppConnection {/// base XMPP processing/// Returns false on error to disconnectfn xmpp_processing(&mut self, event: &Event) -> bool {match event {Event::Stanza(stanza) => INCOMING.process(self, stanza.clone()),Event::Online => true,e => {warn!("Unexpected event {:?}", e);false}}}/// Process roster push/// see RFC 6212 2.1.6. Roster Pushfn incoming_iq_processing_set_roster(&mut self,roster: xmpp_parsers::roster::Roster,) -> Box<dyn IqRequestHandler> {for i in roster.items {if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {info!("Update {} in roster", i.jid);rdata.0 = i.subscription;rdata.1 = i.ask;} else {info!("Add {} to roster", i.jid);self.state.data.roster.insert(i.jid.clone(), (i.subscription, i.ask));}self.process_jid(&i.jid);}Box::new(IqSetRoster {})}/// Enforce to answer to IQ "set"fn incoming_iq_processing_set(&mut self,id: String,from: Option<xmpp_parsers::Jid>,element: minidom::Element,) -> xmpp_parsers::iq::Iq {INCOMING_IQ_SET.process(self, element).process(self, id, from)}/// Process ping request/// see XEP-0199: XMPP Pingfn incoming_iq_processing_get_ping(&mut self,_ping: xmpp_parsers::ping::Ping,) -> Box<dyn IqRequestHandler> {Box::new(IqGetPing {})}/// Process disco query/// see XEP-0030: Service Discoveryfn incoming_iq_processing_get_disco_info(&mut self,disco: xmpp_parsers::disco::DiscoInfoQuery,) -> Box<dyn IqRequestHandler> {if let Some(ref node) = disco.node {warn!("Unsupported node {}", node);Box::new(IqRequestUnknown {element: disco.into(),type_: "get",})} else {Box::new(IqGetDiscoInfo {})}}/// Process disco items query/// see XEP-0030: Service Discoveryfn incoming_iq_processing_get_disco_items(&mut self,disco: xmpp_parsers::disco::DiscoItemsQuery,) -> Box<dyn IqRequestHandler> {if let Some(ref node) = disco.node {warn!("Unsupported node {}", node);Box::new(IqRequestUnknown {element: disco.into(),type_: "get",})} else {Box::new(IqGetDiscoItems {})}}/// Process version query/// see XEP-0092: Software Versionfn incoming_iq_processing_get_version(&mut self,_version: xmpp_parsers::version::VersionQuery,) -> Box<dyn IqRequestHandler> {Box::new(IqGetVersion {})}/// Enforce to answer to IQ "get"fn incoming_iq_processing_get(&mut self,id: String,from: Option<xmpp_parsers::Jid>,element: minidom::Element,) -> xmpp_parsers::iq::Iq {INCOMING_IQ_GET.process(self, element).process(self, id, from)}fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {match iq.payload {xmpp_parsers::iq::IqType::Set(element) => {let iq_answer = self.incoming_iq_processing_set(iq.id, iq.from, element);self.state.data.send_queue.push_back(iq_answer.into());}xmpp_parsers::iq::IqType::Error(e) => {if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {return handler.1.error(self, e);}error!("iq error: {:?}", e);return false;}xmpp_parsers::iq::IqType::Get(element) => {let iq_answer = self.incoming_iq_processing_get(iq.id, iq.from, element);self.state.data.send_queue.push_back(iq_answer.into());}xmpp_parsers::iq::IqType::Result(opt_element) => {if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {return handler.1.result(self, opt_element);}warn!("Unwanted iq result id {} from {:?}: {:?}",iq.id,iq.from,opt_element.map(|e| String::from(&e)));}}true}fn incoming_presence_processing(&mut self, presence: xmpp_parsers::presence::Presence) -> bool {if presence.from.as_ref() == Some(&self.state.client.jid) {info!("Self-presence accepted");self.state.data.self_presence = true;} else {warn!("Incoming presence stanza: {:?}", presence);}true}fn incoming_message_processing(&mut self, message: xmpp_parsers::message::Message) -> bool {for payload in message.payloads.iter() {use std::convert::TryInto;if let Some(_delay) =payload.clone().try_into().ok() as Option<xmpp_parsers::delay::Delay>{return true; // ignore delayed messages}}warn!("Incoming message stanza: {:?}", message);true}/// process event from xmpp stream/// returns from future when condition met/// or stop future was resolved./// Return item if connection was preserved or error otherwise./// Second part is a state of stop_futurepub fn processing<S, F, T, E>(self,stop_condition: S,stop_future: F,) -> impl Future<Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),>whereF: Future<Item = T, Error = E> + 'static,S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,T: 'static,E: 'static,{future::loop_fn((self, stop_future, stop_condition),|(xmpp, stop_future, mut stop_condition)| {// ToDo: check timeouts if iqslet XmppConnection {state: XmppState { client, mut data },account,} = xmpp;if let Some(send_element) = data.send_queue.pop_front() {use tokio::prelude::Sink;info!("Sending {}", String::from(&send_element));Box::new(client.send(Packet::Stanza(send_element)).select2(stop_future).then(move |r| match r {Ok(Either::A((client, b))) => {Box::new(future::ok(future::Loop::Continue((XmppConnection {state: XmppState { client, data },account,},b,stop_condition,))))as Box<dyn Future<Item = _, Error = _>>}Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Ok(Either::B(t))))}})),Err(Either::A((e, b))) => {warn!("XMPP sending error: {}", e);Box::new(future::err((account, Ok(Either::A(b)))))}Err(Either::B((e, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Err(e)))}})),}),) as Box<dyn Future<Item = _, Error = _>>} else {Box::new(client.into_future().select2(stop_future).then(move |r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {state: XmppState { client, data },account,};if xmpp.xmpp_processing(&event) {match stop_condition(&mut xmpp, event) {Ok(true) => future::ok(future::Loop::Break((xmpp,Ok(Either::A(b)),))),Ok(false) => future::ok(future::Loop::Continue((xmpp,b,stop_condition,))),Err(_e) => {future::err((xmpp.account, Ok(Either::A(b))))}}} else {future::err((xmpp.account, Ok(Either::A(b))))}} else {future::err((account, Ok(Either::A(b))))}}Ok(Either::B((t, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}}Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);future::err((account, Ok(Either::A(b))))}Err(Either::B((e, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),)))} else {future::err((account, Err(e)))}}}),)}},)}/// get connection and wait for online status and set presence/// returns error if something went wrong and xmpp connection is brokenfn online(&mut self, event: Event) -> Result<bool, ()> {match event {Event::Online => {info!("Online!");Ok(true)}Event::Stanza(s) => {warn!("Stanza before online: {}", String::from(&s));Ok(false)}_ => {error!("Disconnected while online");Err(())}}}fn initial_roster<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: 'static,{let XmppConnection {account,state: XmppState { client, mut data },} = self;use tokio::prelude::Sink;data.counter += 1;let id_init_roster = format!("id_init_roster{}", data.counter);let get_roster = stanzas::make_get_roster(&id_init_roster);let account2 = account.clone();info!("Quering roster... {}", String::from(&get_roster));data.pending_ids.insert(id_init_roster.clone(),(Instant::now() + Duration::from_secs(60),Box::new(InitRosterIqHandler {}),),);client.send(Packet::Stanza(get_roster)).map_err(move |e| {error!("Error on querying roster: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, data },account,}.processing(move |conn, _| Ok(conn.state.data.roster_init), stop_future).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})})}fn self_presence<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: Into<failure::Error> + 'static,{let XmppConnection {account,state: XmppState { client, data },} = self;use tokio::prelude::Sink;let presence = stanzas::make_presence(&account);let account2 = account.clone();info!("Sending presence... {}", String::from(&presence));client.send(Packet::Stanza(presence)).map_err(|e| {error!("Error on send self-presence: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, data },account,}.processing(move |conn, _| Ok(conn.state.data.self_presence),stop_future,).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})})}fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {if !mailbox.is_empty() {if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {info!("Jid {} in roster", xmpp_to);let sub_to = match rdata.0 {xmpp_parsers::roster::Subscription::To => true,xmpp_parsers::roster::Subscription::Both => true,_ => false,};if sub_to {info!("Subscribed to {}", xmpp_to);self.state.data.send_queue.extend(mailbox.drain(..).map(|message| {stanzas::make_chat_message(xmpp_to.clone(), message)}),);} else if rdata.1 == xmpp_parsers::roster::Ask::None {info!("Not subscribed to {}", xmpp_to);self.state.data.send_queue.push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));}let sub_from = match rdata.0 {xmpp_parsers::roster::Subscription::From => true,xmpp_parsers::roster::Subscription::Both => true,_ => false,};if !sub_from {info!("Not subscription from {}", xmpp_to);self.state.data.send_queue.push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));}} else {info!("Jid {} not in roster", xmpp_to);self.state.data.counter += 1;let id_add_roster = format!("id_add_roster{}", self.state.data.counter);let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());info!("Adding jid {} to roster id {}", xmpp_to, id_add_roster);self.state.data.pending_ids.insert(id_add_roster,(Instant::now() + Duration::from_secs(60),Box::new(AddRosterIqHandler {jid: xmpp_to.clone(),}),),);self.state.data.send_queue.push_back(add_roster);}}}}pub fn process_command(&mut self, cmd: XmppCommand) {info!("Got command");match cmd {XmppCommand::Chat { xmpp_to, message } => {self.state.data.outgoing_mailbox.entry(xmpp_to.clone()).or_default().push(message);self.process_jid(&xmpp_to);}XmppCommand::Chatroom { muc_id, message } => {if let Some(muc) = self.state.data.mucs.get(&muc_id) {self.state.data.send_queue.push_back(stanzas::make_muc_message(muc.clone(), message));} else {error!("Not found MUC {}", muc_id);}}XmppCommand::Ping => {self.state.data.counter += 1;let id_ping = format!("id_ping{}", self.state.data.counter);let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());self.state.data.send_queue.push_back(ping);self.state.data.pending_ids.insert(id_ping,(Instant::now() + Duration::from_secs(30),Box::new(PingIqHandler {}),),);}XmppCommand::TimeoutCleanup => {let now = Instant::now();let timeouted: Vec<String> = self.state.data.pending_ids.iter().filter_map(|(id, (timeout, _))| {if now >= *timeout {Some(id.to_string())} else {None}}).collect();let mut correct = true;timeouted.into_iter().for_each(|id| {if let Some((_, handler)) = self.state.data.pending_ids.remove(&id) {correct &= handler.timeout(&mut self);}})}}}pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {info!("Shutdown connection");let XmppConnection { account, state } = self;stream::iter_ok(state.data.mucs.values().map(std::clone::Clone::clone).collect::<Vec<_>>(),).fold(state, move |XmppState { client, data }, muc_jid| {let muc_presence =stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());info!("Sending muc leave presence... {}",String::from(&muc_presence));use tokio::prelude::Sink;client.send(Packet::Stanza(muc_presence)).map_err(|e| {error!("Error on send muc presence: {}", e);e}).and_then(|client| future::ok(XmppState { client, data }))}).map(|_| ())}fn enter_mucs<F, E>(self,_stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: Into<failure::Error> + 'static,{let XmppConnection { account, state } = self;let account2 = account.clone();let account3 = account.clone();stream::iter_ok(account.chatrooms.clone()).fold(state, move |XmppState { client, mut data }, muc_jid| {data.counter += 1;let id_muc_presence = format!("id_muc_presence{}", data.counter);let muc_presence = stanzas::make_muc_presence(&id_muc_presence,account2.jid.clone(),muc_jid.1.clone(),);info!("Sending muc presence... {}", String::from(&muc_presence));let account4 = account2.clone();use tokio::prelude::Sink;client.send(Packet::Stanza(muc_presence)).map_err(|e| {error!("Error on send muc presence: {}", e);account4}).and_then(|client| {data.mucs.insert(muc_jid.0, muc_jid.1);future::ok(XmppState { client, data })})}).map(|state| XmppConnection {account: account3,state,})}}[3.21]use tokio_xmpp::{Client, Event, Packet};use tokio::prelude::future::{self, Either};use tokio::prelude::stream;use tokio::prelude::{Future, Stream};use std::collections::{HashMap, VecDeque};use std::time::{Duration, Instant};use super::stanzas;use super::element_processor;use crate::config;#[derive(Debug)]pub enum XmppCommand {/// Send message to someone by jidChat {xmpp_to: xmpp_parsers::Jid,message: String,},/// Send message to MUCChatroom { muc_id: String, message: String },/// Send ping request to the server to test connectionPing,/// Check iq requests if some have expired timeoutsTimeoutCleanup,}/// trait of processing iq/// each function consumes handlers and/// returns false if connection should be resettrait IqHandler {/// process resultfn result(self: Box<Self>,conn: &mut XmppConnection,opt_element: Option<xmpp_parsers::Element>,) -> bool;/// process errorfn error(self: Box<Self>,conn: &mut XmppConnection,error: xmpp_parsers::stanza_error::StanzaError,) -> bool;/// process tmeoutfn timeout(self: Box<Self>, conn: &mut XmppConnection) -> bool;}struct AddRosterIqHandler {jid: xmpp_parsers::Jid,}impl IqHandler for AddRosterIqHandler {fn result(self: Box<Self>,conn: &mut XmppConnection,opt_element: Option<xmpp_parsers::Element>,) -> bool {match opt_element {Some(element) => {warn!("Wrong payload when adding {} to roster: {}",self.jid,String::from(&element));}None => {if conn.state.data.roster.contains_key(&self.jid) {info!("Jid {} updated to roster", self.jid);} else {info!("Jid {} added in roster", self.jid);conn.state.data.roster.insert(self.jid.clone(),(xmpp_parsers::roster::Subscription::None,xmpp_parsers::roster::Ask::None,),);}conn.process_jid(&self.jid);}}true}fn error(self: Box<Self>,_conn: &mut XmppConnection,_error: xmpp_parsers::stanza_error::StanzaError,) -> bool {true}fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {true // ignore}}struct PingIqHandler {}impl IqHandler for PingIqHandler {fn result(self: Box<Self>,_conn: &mut XmppConnection,_opt_element: Option<xmpp_parsers::Element>,) -> bool {info!("ping successed");true}fn error(self: Box<Self>,_conn: &mut XmppConnection,_error: xmpp_parsers::stanza_error::StanzaError,) -> bool {false}fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {false}}struct InitRosterIqHandler {}impl IqHandler for InitRosterIqHandler {fn result(self: Box<Self>,conn: &mut XmppConnection,opt_element: Option<xmpp_parsers::Element>,) -> bool {if let Some(result) = opt_element {use std::convert::TryInto;match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {Ok(roster) => {conn.state.data.roster_init = true;conn.state.data.roster.clear();info!("Got first roster:");for i in roster.items {info!(" >>> {:?}", i);conn.state.data.roster.insert(i.jid, (i.subscription, i.ask));}true}Err(e) => {error!("Cann't parse roster: {}", e);false}}} else {error!("No roster responded");false}}fn error(self: Box<Self>,_conn: &mut XmppConnection,_error: xmpp_parsers::stanza_error::StanzaError,) -> bool {false}fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {false}}#[derive(Default)]struct XmppData {/// known roster dataroster: HashMap<xmpp_parsers::Jid,(xmpp_parsers::roster::Subscription,xmpp_parsers::roster::Ask,),>,/// if roster was initialized/// ToDo: remove it as it is used only for initializationroster_init: bool,/// if self-presence accepted/// ToDo: remove it as it is used only for initializationself_presence: bool,/// ids countercounter: usize,/// stanzas to sendsend_queue: VecDeque<minidom::Element>,/// outgoing mailboxoutgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,/// muc id to muc jidmucs: HashMap<String, xmpp_parsers::Jid>,/// map from iq's id to handler of this type of iqspending_ids: HashMap<String, (Instant, Box<dyn IqHandler>)>,}struct XmppState {client: Client,data: XmppData,}pub struct XmppConnection {account: std::rc::Rc<config::Account>,state: XmppState,}trait IqRequestHandler {fn process(self: Box<Self>,conn: &mut XmppConnection,id: String,from: Option<xmpp_parsers::Jid>,) -> xmpp_parsers::iq::Iq;}struct IqRequestUnknown {element: xmpp_parsers::Element,type_: &'static str,}impl IqRequestHandler for IqRequestUnknown {fn process(self: Box<Self>,conn: &mut XmppConnection,id: String,from: Option<xmpp_parsers::Jid>,) -> xmpp_parsers::iq::Iq {warn!("Unsupported IQ {} request from {:?}: {}",self.type_,from,String::from(&self.element));stanzas::make_iq_unsupported_error(id, conn.state.client.jid.clone(), from)}}struct IqSetRoster {}impl IqRequestHandler for IqSetRoster {fn process(self: Box<Self>,conn: &mut XmppConnection,id: String,from: Option<xmpp_parsers::Jid>,) -> xmpp_parsers::iq::Iq {info!("Got roster push {} from {:?}", id, from);stanzas::make_roster_push_answer(id, conn.state.client.jid.clone(), from)}}struct IqGetPing {}impl IqRequestHandler for IqGetPing {fn process(self: Box<Self>,conn: &mut XmppConnection,id: String,from: Option<xmpp_parsers::Jid>,) -> xmpp_parsers::iq::Iq {info!("Got ping {} from {:?}", id, from);stanzas::make_pong(id, conn.state.client.jid.clone(), from)}}struct IqGetVersion {}impl IqRequestHandler for IqGetVersion {fn process(self: Box<Self>,conn: &mut XmppConnection,id: String,from: Option<xmpp_parsers::Jid>,) -> xmpp_parsers::iq::Iq {info!("Got version query {} from {:?}", id, from);stanzas::make_version(id, conn.state.client.jid.clone(), from)}}struct IqGetDiscoInfo {}impl IqRequestHandler for IqGetDiscoInfo {fn process(self: Box<Self>,conn: &mut XmppConnection,id: String,from: Option<xmpp_parsers::Jid>,) -> xmpp_parsers::iq::Iq {info!("Got disco query {} from {:?}", id, from);stanzas::make_disco_info_result(id, conn.state.client.jid.clone(), from)}}struct IqGetDiscoItems {}impl IqRequestHandler for IqGetDiscoItems {fn process(self: Box<Self>,conn: &mut XmppConnection,id: String,from: Option<xmpp_parsers::Jid>,) -> xmpp_parsers::iq::Iq {info!("Got disco items query {} from {:?}", id, from);stanzas::make_disco_items_result(id, conn.state.client.jid.clone(), from)}}lazy_static! {static ref INCOMING: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element> = {let mut incoming = element_processor::Processor::new(&|_, e| {warn!("Unknown stanza {}", String::from(&e));true});incoming.register(&XmppConnection::incoming_iq_processing);incoming.register(&XmppConnection::incoming_presence_processing);incoming.register(&XmppConnection::incoming_message_processing);incoming};static ref INCOMING_IQ_SET: element_processor::Processor<XmppConnection, Box<dyn IqRequestHandler>, xmpp_parsers::Element> = {let mut iq_set =element_processor::Processor::new(&|_conn: &mut XmppConnection, element| {Box::new(IqRequestUnknown {element,type_: "set",}) as Box<dyn IqRequestHandler>});iq_set.register(&XmppConnection::incoming_iq_processing_set_roster);iq_set};static ref INCOMING_IQ_GET: element_processor::Processor<XmppConnection, Box<dyn IqRequestHandler>, xmpp_parsers::Element> = {let mut iq_get =element_processor::Processor::new(&|_conn: &mut XmppConnection, element| {Box::new(IqRequestUnknown {element,type_: "get",}) as Box<dyn IqRequestHandler>});iq_get.register(&XmppConnection::incoming_iq_processing_get_ping);iq_get.register(&XmppConnection::incoming_iq_processing_get_disco_info);iq_get.register(&XmppConnection::incoming_iq_processing_get_disco_items);iq_get.register(&XmppConnection::incoming_iq_processing_get_version);iq_get};}pub struct MaybeXmppConnection {account: std::rc::Rc<config::Account>,state: Option<XmppState>,}impl From<XmppConnection> for MaybeXmppConnection {fn from(from: XmppConnection) -> MaybeXmppConnection {MaybeXmppConnection {account: from.account,state: Some(from.state),}}}impl From<config::Account> for MaybeXmppConnection {fn from(from: config::Account) -> MaybeXmppConnection {MaybeXmppConnection {account: std::rc::Rc::new(from),state: None,}}}impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {MaybeXmppConnection {account: from,state: None,}}}impl MaybeXmppConnection {/// connects if nothing connected/// don't connect only if stop_future resolvedpub fn connect<F>(self,stop_future: F,) -> impl Future<Item = XmppConnection, Error = failure::Error>whereF: future::Future + Clone + 'static,<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,{info!("xmpp connection...");let MaybeXmppConnection { account, state } = self;if let Some(state) = state {Box::new(future::ok(XmppConnection { account, state }))as Box<dyn Future<Item = _, Error = _>>} else {Box::new(stop_future.clone().select2(future::loop_fn(account, move |account| {info!("xmpp initialization...");let client =Client::new_with_jid(account.jid.clone(), &account.password);info!("xmpp initialized");let stop_future2 = stop_future.clone();let stop_future3 = stop_future.clone();let stop_future4 = stop_future.clone();// future to wait for onlineBox::new(XmppConnection {state: XmppState {client,data: std::default::Default::default(),},account,}.processing(XmppConnection::online, stop_future.clone()).map_err(|(acc, _)| acc).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),}).and_then(|conn| conn.initial_roster(stop_future2)).and_then(|conn| conn.self_presence(stop_future3)).and_then(|conn| conn.enter_mucs(stop_future4)).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),}),)}).map_err(|_: ()| ()),).then(|r| match r {Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),Ok(Either::B((x, _a))) => future::ok(x),Err(Either::A((e, _b))) => future::err(e.into()),Err(Either::B((_, _a))) => {future::err(format_err!("Cann't initiate XMPP connection"))}}),)}}}impl XmppConnection {/// base XMPP processing/// Returns false on error to disconnectfn xmpp_processing(&mut self, event: &Event) -> bool {match event {Event::Stanza(stanza) => INCOMING.process(self, stanza.clone()),Event::Online => true,e => {warn!("Unexpected event {:?}", e);false}}}/// Process roster push/// see RFC 6212 2.1.6. Roster Pushfn incoming_iq_processing_set_roster(&mut self,roster: xmpp_parsers::roster::Roster,) -> Box<dyn IqRequestHandler> {for i in roster.items {if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {info!("Update {} in roster", i.jid);rdata.0 = i.subscription;rdata.1 = i.ask;} else {info!("Add {} to roster", i.jid);self.state.data.roster.insert(i.jid.clone(), (i.subscription, i.ask));}self.process_jid(&i.jid);}Box::new(IqSetRoster {})}/// Enforce to answer to IQ "set"fn incoming_iq_processing_set(&mut self,id: String,from: Option<xmpp_parsers::Jid>,element: minidom::Element,) -> xmpp_parsers::iq::Iq {INCOMING_IQ_SET.process(self, element).process(self, id, from)}/// Process ping request/// see XEP-0199: XMPP Pingfn incoming_iq_processing_get_ping(&mut self,_ping: xmpp_parsers::ping::Ping,) -> Box<dyn IqRequestHandler> {Box::new(IqGetPing {})}/// Process disco query/// see XEP-0030: Service Discoveryfn incoming_iq_processing_get_disco_info(&mut self,disco: xmpp_parsers::disco::DiscoInfoQuery,) -> Box<dyn IqRequestHandler> {if let Some(ref node) = disco.node {warn!("Unsupported node {}", node);Box::new(IqRequestUnknown {element: disco.into(),type_: "get",})} else {Box::new(IqGetDiscoInfo {})}}/// Process disco items query/// see XEP-0030: Service Discoveryfn incoming_iq_processing_get_disco_items(&mut self,disco: xmpp_parsers::disco::DiscoItemsQuery,) -> Box<dyn IqRequestHandler> {if let Some(ref node) = disco.node {warn!("Unsupported node {}", node);Box::new(IqRequestUnknown {element: disco.into(),type_: "get",})} else {Box::new(IqGetDiscoItems {})}}/// Process version query/// see XEP-0092: Software Versionfn incoming_iq_processing_get_version(&mut self,_version: xmpp_parsers::version::VersionQuery,) -> Box<dyn IqRequestHandler> {Box::new(IqGetVersion {})}/// Enforce to answer to IQ "get"fn incoming_iq_processing_get(&mut self,id: String,from: Option<xmpp_parsers::Jid>,element: minidom::Element,) -> xmpp_parsers::iq::Iq {INCOMING_IQ_GET.process(self, element).process(self, id, from)}fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {match iq.payload {xmpp_parsers::iq::IqType::Set(element) => {let iq_answer = self.incoming_iq_processing_set(iq.id, iq.from, element);self.state.data.send_queue.push_back(iq_answer.into());}xmpp_parsers::iq::IqType::Error(e) => {if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {return handler.1.error(self, e);}error!("iq error: {:?}", e);return false;}xmpp_parsers::iq::IqType::Get(element) => {let iq_answer = self.incoming_iq_processing_get(iq.id, iq.from, element);self.state.data.send_queue.push_back(iq_answer.into());}xmpp_parsers::iq::IqType::Result(opt_element) => {if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {return handler.1.result(self, opt_element);}warn!("Unwanted iq result id {} from {:?}: {:?}",iq.id,iq.from,opt_element.map(|e| String::from(&e)));}}true}fn incoming_presence_processing(&mut self, presence: xmpp_parsers::presence::Presence) -> bool {if presence.from.as_ref() == Some(&self.state.client.jid) {info!("Self-presence accepted");self.state.data.self_presence = true;} else {warn!("Incoming presence stanza: {:?}", presence);}true}fn incoming_message_processing(&mut self, message: xmpp_parsers::message::Message) -> bool {for payload in message.payloads.iter() {use std::convert::TryInto;if let Some(_delay) =payload.clone().try_into().ok() as Option<xmpp_parsers::delay::Delay>{return true; // ignore delayed messages}}warn!("Incoming message stanza: {:?}", message);true}/// process event from xmpp stream/// returns from future when condition met/// or stop future was resolved./// Return item if connection was preserved or error otherwise./// Second part is a state of stop_futurepub fn processing<S, F, T, E>(self,stop_condition: S,stop_future: F,) -> impl Future<Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),>whereF: Future<Item = T, Error = E> + 'static,S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,T: 'static,E: 'static,{future::loop_fn((self, stop_future, stop_condition),|(xmpp, stop_future, mut stop_condition)| {// ToDo: check timeouts if iqslet XmppConnection {state: XmppState { client, mut data },account,} = xmpp;if let Some(send_element) = data.send_queue.pop_front() {use tokio::prelude::Sink;info!("Sending {}", String::from(&send_element));Box::new(client.send(Packet::Stanza(send_element)).select2(stop_future).then(move |r| match r {Ok(Either::A((client, b))) => {Box::new(future::ok(future::Loop::Continue((XmppConnection {state: XmppState { client, data },account,},b,stop_condition,))))as Box<dyn Future<Item = _, Error = _>>}Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Ok(Either::B(t))))}})),Err(Either::A((e, b))) => {warn!("XMPP sending error: {}", e);Box::new(future::err((account, Ok(Either::A(b)))))}Err(Either::B((e, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Err(e)))}})),}),) as Box<dyn Future<Item = _, Error = _>>} else {Box::new(client.into_future().select2(stop_future).then(move |r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {state: XmppState { client, data },account,};if xmpp.xmpp_processing(&event) {match stop_condition(&mut xmpp, event) {Ok(true) => future::ok(future::Loop::Break((xmpp,Ok(Either::A(b)),))),Ok(false) => future::ok(future::Loop::Continue((xmpp,b,stop_condition,))),Err(_e) => {future::err((xmpp.account, Ok(Either::A(b))))}}} else {future::err((xmpp.account, Ok(Either::A(b))))}} else {future::err((account, Ok(Either::A(b))))}}Ok(Either::B((t, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}}Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);future::err((account, Ok(Either::A(b))))}Err(Either::B((e, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),)))} else {future::err((account, Err(e)))}}}),)}},)}/// get connection and wait for online status and set presence/// returns error if something went wrong and xmpp connection is brokenfn online(&mut self, event: Event) -> Result<bool, ()> {match event {Event::Online => {info!("Online!");Ok(true)}Event::Stanza(s) => {warn!("Stanza before online: {}", String::from(&s));Ok(false)}_ => {error!("Disconnected while online");Err(())}}}fn initial_roster<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: 'static,{let XmppConnection {account,state: XmppState { client, mut data },} = self;use tokio::prelude::Sink;data.counter += 1;let id_init_roster = format!("id_init_roster{}", data.counter);let get_roster = stanzas::make_get_roster(&id_init_roster);let account2 = account.clone();info!("Quering roster... {}", String::from(&get_roster));data.pending_ids.insert(id_init_roster.clone(),(Instant::now() + Duration::from_secs(60),Box::new(InitRosterIqHandler {}),),);client.send(Packet::Stanza(get_roster)).map_err(move |e| {error!("Error on querying roster: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, data },account,}.processing(move |conn, _| Ok(conn.state.data.roster_init), stop_future).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})})}fn self_presence<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: Into<failure::Error> + 'static,{let XmppConnection {account,state: XmppState { client, data },} = self;use tokio::prelude::Sink;let presence = stanzas::make_presence(&account);let account2 = account.clone();info!("Sending presence... {}", String::from(&presence));client.send(Packet::Stanza(presence)).map_err(|e| {error!("Error on send self-presence: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, data },account,}.processing(move |conn, _| Ok(conn.state.data.self_presence),stop_future,).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})})}fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {if !mailbox.is_empty() {if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {info!("Jid {} in roster", xmpp_to);let sub_to = match rdata.0 {xmpp_parsers::roster::Subscription::To => true,xmpp_parsers::roster::Subscription::Both => true,_ => false,};if sub_to {info!("Subscribed to {}", xmpp_to);self.state.data.send_queue.extend(mailbox.drain(..).map(|message| {stanzas::make_chat_message(xmpp_to.clone(), message)}),);} else if rdata.1 == xmpp_parsers::roster::Ask::None {info!("Not subscribed to {}", xmpp_to);self.state.data.send_queue.push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));}let sub_from = match rdata.0 {xmpp_parsers::roster::Subscription::From => true,xmpp_parsers::roster::Subscription::Both => true,_ => false,};if !sub_from {info!("Not subscription from {}", xmpp_to);self.state.data.send_queue.push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));}} else {info!("Jid {} not in roster", xmpp_to);self.state.data.counter += 1;let id_add_roster = format!("id_add_roster{}", self.state.data.counter);let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());info!("Adding jid {} to roster id {}", xmpp_to, id_add_roster);self.state.data.pending_ids.insert(id_add_roster,(Instant::now() + Duration::from_secs(60),Box::new(AddRosterIqHandler {jid: xmpp_to.clone(),}),),);self.state.data.send_queue.push_back(add_roster);}}}}pub fn process_command(&mut self, cmd: XmppCommand) {info!("Got command");match cmd {XmppCommand::Chat { xmpp_to, message } => {self.state.data.outgoing_mailbox.entry(xmpp_to.clone()).or_default().push(message);self.process_jid(&xmpp_to);}XmppCommand::Chatroom { muc_id, message } => {if let Some(muc) = self.state.data.mucs.get(&muc_id) {self.state.data.send_queue.push_back(stanzas::make_muc_message(muc.clone(), message));} else {error!("Not found MUC {}", muc_id);}}XmppCommand::Ping => {self.state.data.counter += 1;let id_ping = format!("id_ping{}", self.state.data.counter);let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());self.state.data.send_queue.push_back(ping);self.state.data.pending_ids.insert(id_ping,(Instant::now() + Duration::from_secs(30),Box::new(PingIqHandler {}),),);}XmppCommand::TimeoutCleanup => {let now = Instant::now();let timeouted: Vec<String> = self.state.data.pending_ids.iter().filter_map(|(id, (timeout, _))| {if now >= *timeout {Some(id.to_string())} else {None}}).collect();let mut correct = true;timeouted.into_iter().for_each(|id| {if let Some((_, handler)) = self.state.data.pending_ids.remove(&id) {correct &= handler.timeout(&mut self);}})}}}pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {info!("Shutdown connection");let XmppConnection { account, state } = self;stream::iter_ok(state.data.mucs.values().map(std::clone::Clone::clone).collect::<Vec<_>>(),).fold(state, move |XmppState { client, data }, muc_jid| {let muc_presence =stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());info!("Sending muc leave presence... {}",String::from(&muc_presence));use tokio::prelude::Sink;client.send(Packet::Stanza(muc_presence)).map_err(|e| {error!("Error on send muc presence: {}", e);e}).and_then(|client| future::ok(XmppState { client, data }))}).map(|_| ())}fn enter_mucs<F, E>(self,_stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E> + 'static,E: Into<failure::Error> + 'static,{let XmppConnection { account, state } = self;let account2 = account.clone();let account3 = account.clone();stream::iter_ok(account.chatrooms.clone()).fold(state, move |XmppState { client, mut data }, muc_jid| {data.counter += 1;let id_muc_presence = format!("id_muc_presence{}", data.counter);let muc_presence = stanzas::make_muc_presence(&id_muc_presence,account2.jid.clone(),muc_jid.1.clone(),);info!("Sending muc presence... {}", String::from(&muc_presence));let account4 = account2.clone();use tokio::prelude::Sink;client.send(Packet::Stanza(muc_presence)).map_err(|e| {error!("Error on send muc presence: {}", e);account4}).and_then(|client| {data.mucs.insert(muc_jid.0, muc_jid.1);future::ok(XmppState { client, data })})}).map(|state| XmppConnection {account: account3,state,})}} - edit in src/xmpp/mod.rs at line 11[3.38811]→[3.296:297](∅→∅),[3.34286]→[3.296:297](∅→∅),[2.39194]→[3.296:297](∅→∅),[3.296]→[3.296:297](∅→∅),[3.297]→[3.42774:42797](∅→∅)
mod element_processor; - edit in src/xmpp/mod.rs at line 37
}struct XmppElementProcessor {incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,}impl XmppElementProcessor {fn new() -> XmppElementProcessor {let mut incoming = element_processor::Processor::new(&|_, e| {warn!("Unknown stanza {:#?}", e);true});incoming.register(&XmppConnection::incoming_iq_processing);XmppElementProcessor { incoming }} - replacement in src/xmpp/mod.rs at line 154
let processors = XmppElementProcessor::new();processors.incoming.process(self, stanza.clone())}Event::Online => true,e => {warn!("Unexpected event {:?}", e);false}}}info!("Incoming xmpp event: {:?}", stanza);let stanza = stanza.clone(); - replacement in src/xmpp/mod.rs at line 157
fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {use std::convert::TryInto;if let Some((_, jid)) = self.state.data.pending_add_roster_ids.remove_entry(&iq.id) {if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {if self.state.data.roster.contains_key(&jid) {info!("Jid {} updated to roster", jid);} else {info!("Jid {} added in roster", jid);self.state.data.roster.insert(jid.clone(),(xmpp_parsers::roster::Subscription::None,xmpp_parsers::roster::Ask::None,),);}self.process_jid(&jid);} else {warn!("Wrong payload when adding {} to roster: {:?}",jid, iq.payload);}}match iq.payload {xmpp_parsers::iq::IqType::Set(element) => {if let Some(roster) =element.try_into().ok() as Option<xmpp_parsers::roster::Roster>{for i in roster.items {if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {info!("Update {} in roster", i.jid);rdata.0 = i.subscription;rdata.1 = i.ask;use std::convert::TryInto;if let Some(iq) = stanza.clone().try_into().ok() as Option<xmpp_parsers::iq::Iq> {if let Some((_, jid)) =self.state.data.pending_add_roster_ids.remove_entry(&iq.id){if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {if self.state.data.roster.contains_key(&jid) {info!("Jid {} updated to roster", jid);} else {info!("Jid {} added in roster", jid);self.state.data.roster.insert(jid.clone(),(xmpp_parsers::roster::Subscription::None,xmpp_parsers::roster::Ask::None,),);}self.process_jid(&jid); - replacement in src/xmpp/mod.rs at line 177
info!("Add {} to roster", i.jid);self.state.data.roster.insert(i.jid.clone(), (i.subscription, i.ask));warn!("Wrong payload when adding {} to roster: {:?}",jid, iq.payload);}}match iq.payload {xmpp_parsers::iq::IqType::Set(element) => {if let Some(roster) =element.try_into().ok() as Option<xmpp_parsers::roster::Roster>{for i in roster.items {if let Some(ref mut rdata) =self.state.data.roster.get_mut(&i.jid){info!("Update {} in roster", i.jid);rdata.0 = i.subscription;rdata.1 = i.ask;} else {info!("Add {} to roster", i.jid);self.state.data.roster.insert(i.jid.clone(), (i.subscription, i.ask));}self.process_jid(&i.jid);}}}xmpp_parsers::iq::IqType::Error(e) => {error!("iq error: {:?}", e);return false;}xmpp_parsers::iq::IqType::Get(element) => {if let Some(_ping) =element.try_into().ok() as Option<xmpp_parsers::ping::Ping>{let pong = stanzas::make_pong(&iq.id,self.state.client.jid.clone(),iq.from,);self.state.data.send_queue.push_back(pong);} - replacement in src/xmpp/mod.rs at line 222
self.process_jid(&i.jid);_ => (), // ignore - edit in src/xmpp/mod.rs at line 224
} else if let Some(_presence) =stanza.try_into().ok() as Option<xmpp_parsers::presence::Presence>{// to do something with presence - edit in src/xmpp/mod.rs at line 229
true - replacement in src/xmpp/mod.rs at line 231
xmpp_parsers::iq::IqType::Error(e) => {error!("iq error: {:?}", e);return false;Event::Online => true,e => {warn!("Unexpected event {:?}", e);false - edit in src/xmpp/mod.rs at line 236
xmpp_parsers::iq::IqType::Get(element) => {if let Some(_ping) = element.try_into().ok() as Option<xmpp_parsers::ping::Ping> {let pong = stanzas::make_pong(&iq.id, self.state.client.jid.clone(), iq.from);self.state.data.send_queue.push_back(pong);}}_ => (), // ignore - edit in src/xmpp/mod.rs at line 237
true - replacement in src/xmpp/element_processor.rs at line 0
type Func<S, T, E> = dyn Fn(&mut S, E) -> T;/// TryFrom based visitorpub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {processors: Vec<Box<Func<S, Option<T>, E>>>,default: &'static Func<S, T, E>,}impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {pub fn new<F>(f: &'static F) -> Processor<S, T, E>whereF: Fn(&mut S, E) -> T + 'static,{Processor {processors: vec![],default: f,}}pub fn register<F, A>(&mut self, f: &'static F)whereF: Fn(&mut S, A) -> T + 'static,A: std::convert::TryFrom<E>,{self.processors.push(Box::new(move |s, e: E| {use std::convert::TryInto;(e.try_into().ok() as Option<A>).map(|a| f(s, a))}));}pub fn process(&self, s: &mut S, e: E) -> T {for processor in self.processors.iter() {match processor(s, e.clone()) {Some(t) => return t,None => continue,}}(*self.default)(s, e)}}[3.11301]type Func<S, T, E> = dyn Fn(&mut S, E) -> T;/// TryFrom based visitorpub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {processors: Vec<Box<Func<S, Option<T>, E>>>,default: &'static Func<S, T, E>,}impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {pub fn new<F>(f: &'static F) -> Processor<S, T, E>whereF: Fn(&mut S, E) -> T + 'static,{Processor {processors: vec![],default: f,}}pub fn register<F, A>(&mut self, f: &'static F)whereF: Fn(&mut S, A) -> T + 'static,A: std::convert::TryFrom<E>,{self.processors.push(Box::new(move |s, e: E| {use std::convert::TryInto;(e.try_into().ok() as Option<A>).map(|a| f(s, a))}));}pub fn process(&self, s: &mut S, e: E) -> T {for processor in self.processors.iter() {match processor(s, e.clone()) {Some(t) => return t,None => continue,}}(*self.default)(s, e)}} - edit in src/main.rs at line 215
use hyper::rt::{Future, Stream}; - edit in Cargo.lock at line 2[3.47733]→[3.54799:54933](∅→∅),[3.54799]→[3.54799:54933](∅→∅),[3.54933]→[3.47734:47806](∅→∅),[3.69701]→[3.55005:55008](∅→∅),[3.47806]→[3.55005:55008](∅→∅),[3.55005]→[3.55005:55008](∅→∅)
[[package]]name = "MacTypes-sys"version = "2.1.0"source = "registry+https://github.com/rust-lang/crates.io-index"dependencies = ["libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",] - replacement in Cargo.lock at line 20
version = "0.3.10"version = "0.3.11" - replacement in Cargo.lock at line 36[3.54069]→[3.47994:48066](∅→∅),[3.55363]→[3.54141:54215](∅→∅),[3.47128]→[3.54141:54215](∅→∅),[3.69960]→[3.54141:54215](∅→∅),[3.48066]→[3.54141:54215](∅→∅),[3.54141]→[3.54141:54215](∅→∅)
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)","termion 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 54
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 65
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 94
version = "0.7.0"version = "0.7.3" - replacement in Cargo.lock at line 97
"block-padding 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)","block-padding 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 105
version = "0.1.3"version = "0.1.4" - replacement in Cargo.lock at line 179
version = "0.5.1"version = "0.6.4" - replacement in Cargo.lock at line 182
"core-foundation-sys 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 188[3.59104]→[3.59104:59122](∅→∅),[3.59104]→[3.59104:59122](∅→∅),[3.59104]→[3.59104:59122](∅→∅),[3.59104]→[3.59104:59122](∅→∅),[3.59104]→[3.59104:59122](∅→∅)
version = "0.5.1"version = "0.6.2" - edit in Cargo.lock at line 190[3.59703]→[3.59703:59720](∅→∅),[3.59703]→[3.59703:59720](∅→∅),[3.59703]→[3.59703:59720](∅→∅),[3.59703]→[3.59703:59720](∅→∅),[3.59720]→[3.48926:48998](∅→∅),[3.71524]→[3.60182:60184](∅→∅),[3.48998]→[3.60182:60184](∅→∅),[3.60182]→[3.60182:60184](∅→∅)
dependencies = ["libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",] - replacement in Cargo.lock at line 273
"regex 1.1.5 (registry+https://github.com/rust-lang/crates.io-index)","regex 1.1.6 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 299
"proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)","proc-macro2 0.4.29 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 301
"syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)","syn 0.15.33 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 409
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 453
"tokio 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)","tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 458
"tokio-threadpool 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)","tokio-threadpool 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 483
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 540
version = "0.2.51"version = "0.2.53" - replacement in Cargo.lock at line 628
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 642
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 659[3.75588]→[3.75588:75606](∅→∅),[3.75588]→[3.75588:75606](∅→∅),[3.75588]→[3.75588:75606](∅→∅),[3.75588]→[3.75588:75606](∅→∅),[3.75588]→[3.75588:75606](∅→∅)
version = "0.2.2"version = "0.2.3" - replacement in Cargo.lock at line 663
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 669
"security-framework 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)","security-framework-sys 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)","security-framework 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)","security-framework-sys 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 680
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 712
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - edit in Cargo.lock at line 716
name = "numtoa"version = "0.1.0"source = "registry+https://github.com/rust-lang/crates.io-index"[[package]] - replacement in Cargo.lock at line 734
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 749
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 777
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 844
version = "0.4.27"version = "0.4.29" - replacement in Cargo.lock at line 876
"proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)","proc-macro2 0.4.29 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 886
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 897
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 952
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 964
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1010
version = "1.1.5"version = "1.1.6" - replacement in Cargo.lock at line 1092
version = "0.2.2"version = "0.3.1" - replacement in Cargo.lock at line 1095[3.89944]→[3.89944:90112](∅→∅),[3.89944]→[3.89944:90112](∅→∅),[3.89944]→[3.89944:90112](∅→∅),[3.90112]→[3.55843:55915](∅→∅),[3.78881]→[3.72783:72872](∅→∅),[3.55915]→[3.72783:72872](∅→∅),[3.72783]→[3.72783:72872](∅→∅)
"core-foundation 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)","core-foundation-sys 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","security-framework-sys 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)","core-foundation 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)","core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)","security-framework-sys 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1103
version = "0.2.3"version = "0.3.1" - replacement in Cargo.lock at line 1106[3.90420]→[3.72892:72971](∅→∅),[3.72971]→[3.90499:90585](∅→∅),[3.90499]→[3.90499:90585](∅→∅),[3.90585]→[3.55916:55988](∅→∅)
"MacTypes-sys 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)","core-foundation-sys 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1134
"tokio 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)","tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1152
"proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)","proc-macro2 0.4.29 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1154
"syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)","syn 0.15.33 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1172
"block-buffer 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)","block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1183
"block-buffer 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)","block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1191
version = "0.8.1"version = "0.8.2" - replacement in Cargo.lock at line 1194
"block-buffer 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)","block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1203
version = "0.1.8"version = "0.1.9"source = "registry+https://github.com/rust-lang/crates.io-index"dependencies = ["libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)","signal-hook-registry 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",][[package]]name = "signal-hook-registry"version = "1.0.0" - replacement in Cargo.lock at line 1215
"arc-swap 0.3.10 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","arc-swap 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1240
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1276
"proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)","proc-macro2 0.4.29 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1308
version = "0.15.30"version = "0.15.33" - replacement in Cargo.lock at line 1311
"proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)","proc-macro2 0.4.29 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1329
"proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)","proc-macro2 0.4.29 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1331
"syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)","syn 0.15.33 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1341
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1368
version = "1.5.1"version = "1.5.2" - replacement in Cargo.lock at line 1371
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)","numtoa 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1398
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1405
version = "0.1.18"version = "0.1.19" - replacement in Cargo.lock at line 1418
"tokio-sync 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)","tokio-sync 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1420
"tokio-threadpool 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)","tokio-threadpool 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1470
"tokio-threadpool 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)","tokio-threadpool 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1498
"tokio-sync 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)","tokio-sync 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1507
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1510
"signal-hook 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)","signal-hook 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1519
version = "0.1.4"version = "0.1.5" - replacement in Cargo.lock at line 1541
version = "0.1.13"version = "0.1.14" - replacement in Cargo.lock at line 1572[3.83477]→[3.108313:108390](∅→∅),[3.70950]→[3.108313:108390](∅→∅),[3.85993]→[3.108313:108390](∅→∅),[3.63088]→[3.108313:108390](∅→∅),[3.108313]→[3.108313:108390](∅→∅)
"native-tls 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)","native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1606
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)","libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1624
"native-tls 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)","native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1627
"tokio 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)","tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1682
"tokio 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)","tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1880
"sha3 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)","sha3 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", - replacement in Cargo.lock at line 1897
"sha3 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)","sha3 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", - edit in Cargo.lock at line 1901
"checksum MacTypes-sys 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "eaf9f0d0b1cc33a4d2aee14fb4b2eac03462ef4db29c8ac4057327d8a71ad86f" - replacement in Cargo.lock at line 1903
"checksum arc-swap 0.3.10 (registry+https://github.com/rust-lang/crates.io-index)" = "a57a5698f85c6fd92f19dad87ff2d822fc4ba79dd85c13914d8c4dad589cb815""checksum arc-swap 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "bc4662175ead9cd84451d5c35070517777949a2ed84551764129cedb88384841" - replacement in Cargo.lock at line 1912[3.91549]→[3.67261:67416](∅→∅),[3.89630]→[3.91704:91860](∅→∅),[3.67416]→[3.91704:91860](∅→∅),[3.91704]→[3.91704:91860](∅→∅)
"checksum block-buffer 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "49665c62e0e700857531fa5d3763e91b539ff1abeebd56808d378b495870d60d""checksum block-padding 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d75255892aeb580d3c566f213a2b6fdc1c66667839f45719ee1d30ebf2aea591""checksum block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b""checksum block-padding 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "6d4dc3af3ee2e12f3e5d224e5e1e3d73668abbeb69e566d361f7d5563a4fdf09" - replacement in Cargo.lock at line 1923
"checksum core-foundation 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "286e0b41c3a20da26536c6000a280585d519fd07b3956b43aed8a79e9edce980""checksum core-foundation-sys 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "716c271e8613ace48344f723b60b900a93150271e5be206212d052bbc0883efa""checksum core-foundation 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "25b9e03f145fd4f2bf705e07b900cd41fc636598fe5dc452fd0db1441c3f496d""checksum core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e7ca8a5221364ef15ce201e8ed2f609fc312682a8f4e0e3d4aa5879764e0fa3b" - replacement in Cargo.lock at line 1965
"checksum libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)" = "bedcc7a809076656486ffe045abeeac163da1b558e963a31e29fbfbeba916917""checksum libc 0.2.53 (registry+https://github.com/rust-lang/crates.io-index)" = "ec350a9417dfd244dc9a6c4a71e13895a4db6b92f0b106f07ebbc3f3bc580cee" - replacement in Cargo.lock at line 1979[3.132712]→[3.132712:132865](∅→∅),[3.132712]→[3.132712:132865](∅→∅),[3.132712]→[3.132712:132865](∅→∅)
"checksum native-tls 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ff8e08de0070bbf4c31f452ea2a70db092f36f6f2e4d897adf5674477d488fb2""checksum native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4b2df1a4c22fd44a62147fd8f13dd0f95c9d8ca7b2610299b2a2f9cf8964274e" - edit in Cargo.lock at line 1986
"checksum numtoa 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b8f8bdf33df195859076e54ab11ee78a1b208382d3a26ec40d142ffc1ecc49ef" - replacement in Cargo.lock at line 2002
"checksum proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)" = "4d317f9caece796be1980837fd5cb3dfec5613ebdb04ad0956deea83ce168915""checksum proc-macro2 0.4.29 (registry+https://github.com/rust-lang/crates.io-index)" = "64c827cea7a7ab30ce4593e5e04d7a11617ad6ece2fa230605a78b00ff965316" - replacement in Cargo.lock at line 2021
"checksum regex 1.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "559008764a17de49a3146b234641644ed37d118d1ef641a0bb573d146edc6ce0""checksum regex 1.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "8f0a0bcab2fd7d1d7c54fa9eae6f43eddeb9ce2e7352f8518a814a4f65d60c58" - replacement in Cargo.lock at line 2031
"checksum security-framework 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bfab8dda0e7a327c696d893df9ffa19cadc4bd195797997f5223cf5831beaf05""checksum security-framework-sys 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3d6696852716b589dff9e886ff83778bb635150168e83afa8ac6b8a78cb82abc""checksum security-framework 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eee63d0f4a9ec776eeb30e220f0bc1e092c3ad744b2a379e3993070364d3adc2""checksum security-framework-sys 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9636f8989cbf61385ae4824b98c1aaa54c994d7d8b41f11c601ed799f0549a56" - replacement in Cargo.lock at line 2040[3.104077]→[3.104077:104224](∅→∅),[3.104077]→[3.104077:104224](∅→∅),[3.104077]→[3.104077:104224](∅→∅),[3.104224]→[3.72470:72624](∅→∅)
"checksum sha3 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "34a5e54083ce2b934bf059fdf38e7330a154177e029ab6c4e18638f2f624053a""checksum signal-hook 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "97a47ae722318beceb0294e6f3d601205a1e6abaa4437d9d33e3a212233e3021""checksum sha3 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dd26bc0e7a2e3a7c959bc494caf58b72ee0c71d67704e9520f736ca7e4853ecf""checksum signal-hook 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "72ab58f1fda436857e6337dcb6a5aaa34f16c5ddc87b3a8b6ef7a212f90b9c5a""checksum signal-hook-registry 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "591fe2ee5a2412968f63a008a190d99918c2cda3f616411026f0975715e1cf62" - replacement in Cargo.lock at line 2055
"checksum syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)" = "66c8865bf5a7cbb662d8b011950060b3c8743dca141b054bf7195b20d314d8e2""checksum syn 0.15.33 (registry+https://github.com/rust-lang/crates.io-index)" = "ec52cd796e5f01d0067225a5392e70084acc4c0013fa71d55166d38a8b307836" - replacement in Cargo.lock at line 2061[3.144947]→[3.144947:145097](∅→∅),[3.144947]→[3.144947:145097](∅→∅),[3.144947]→[3.144947:145097](∅→∅)
"checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096""checksum termion 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dde0593aeb8d47accea5392b39350015b5eccb12c0d98044d856983d89548dea" - replacement in Cargo.lock at line 2065
"checksum tokio 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "65641e515a437b308ab131a82ce3042ff9795bef5d6c5a9be4eb24195c417fd9""checksum tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)" = "cec6c34409089be085de9403ba2010b80e36938c9ca992c4f67f407bb13db0b1" - replacement in Cargo.lock at line 2074
"checksum tokio-sync 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "fda385df506bf7546e70872767f71e81640f1f251bdf2fd8eb81a0eaec5fe022""checksum tokio-sync 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "5b2f843ffdf8d6e1f90bddd48da43f99ab071660cd92b7ec560ef3cdfd7a409a" - replacement in Cargo.lock at line 2076
"checksum tokio-threadpool 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "ec5759cf26cf9659555f36c431b515e3d05f66831741c85b4b5d5dfb9cf1323c""checksum tokio-threadpool 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "72558af20be886ea124595ea0f806dd5703b8958e4705429dd58b3d8231f72f2"