Starting to imlements timeouts for iqs

[?]
Apr 14, 2019, 9:31 PM
PJV5HPIFUSZGVYOSAWLIH2YJIPWDITUSW5NYRLB5DDNITQHJC6CQC

Dependencies

  • [2] HDLI2X4H Ignore delayed XEP-0203 messages
  • [3] CCLGGFKR Move out XmppConnection into own file
  • [4] OFLAP2G2 Fix possible utf8 errors
  • [5] PLWPCM47 Add id to initital presence
  • [6] RRLRZTMR Use element processor for iq
  • [7] S754Y5DF Refactor IQ processing Always answer to set and get requests. Use XML encoding for stanzas.
  • [8] 2THKW66M Ignore .orig files
  • [9] DYRPAV6T Update dependencies
  • [10] 6UKCVM6E Use new iq processng for initial roster
  • [11] BYJPYYSM Process iq ping response
  • [12] LL3D5CXK Staring using element processor
  • [13] GVZ4JAR5 Process self-presence with incoming stanza processor

Change contents

  • replacement in src/xmpp/xmpp_connection.rs at line 0
    [3.21][2.0:32680]()
    use tokio_xmpp::{Client, Event, Packet};
    use tokio::prelude::future::{self, Either};
    use tokio::prelude::stream;
    use tokio::prelude::{Future, Stream};
    use std::collections::{HashMap, VecDeque};
    use super::XmppCommand;
    use super::stanzas;
    use super::element_processor;
    use crate::config;
    /// trait of processing iq
    /// each function consumes handlers and
    /// returns false if connection should be reset
    trait IqHandler {
    /// process result
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool;
    /// process error
    fn error(
    self: Box<Self>,
    conn: &mut XmppConnection,
    error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool;
    /// process tmeout
    fn timeout(self: Box<Self>, conn: &mut XmppConnection) -> bool;
    }
    struct AddRosterIqHandler {
    jid: xmpp_parsers::Jid,
    }
    impl IqHandler for AddRosterIqHandler {
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    match opt_element {
    Some(element) => {
    warn!(
    "Wrong payload when adding {} to roster: {}",
    self.jid,
    String::from(&element)
    );
    }
    None => {
    if conn.state.data.roster.contains_key(&self.jid) {
    info!("Jid {} updated to roster", self.jid);
    } else {
    info!("Jid {} added in roster", self.jid);
    conn.state.data.roster.insert(
    self.jid.clone(),
    (
    xmpp_parsers::roster::Subscription::None,
    xmpp_parsers::roster::Ask::None,
    ),
    );
    }
    conn.process_jid(&self.jid);
    }
    }
    true
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    true
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    true // ignore
    }
    }
    struct PingIqHandler {}
    impl IqHandler for PingIqHandler {
    fn result(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    info!("ping successed");
    true
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    false
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    false
    }
    }
    struct InitRosterIqHandler {}
    impl IqHandler for InitRosterIqHandler {
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    if let Some(result) = opt_element {
    use std::convert::TryInto;
    match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
    Ok(roster) => {
    conn.state.data.roster_init = true;
    conn.state.data.roster.clear();
    info!("Got first roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    conn.state
    .data
    .roster
    .insert(i.jid, (i.subscription, i.ask));
    }
    true
    }
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    false
    }
    }
    } else {
    error!("No roster responded");
    false
    }
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    false
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    false
    }
    }
    #[derive(Default)]
    struct XmppData {
    /// known roster data
    roster: HashMap<
    xmpp_parsers::Jid,
    (
    xmpp_parsers::roster::Subscription,
    xmpp_parsers::roster::Ask,
    ),
    >,
    /// if roster was initialized
    /// ToDo: remove it as it is used only for initialization
    roster_init: bool,
    /// if self-presence accepted
    /// ToDo: remove it as it is used only for initialization
    self_presence: bool,
    /// ids counter
    counter: usize,
    /// stanzas to send
    send_queue: VecDeque<minidom::Element>,
    /// outgoing mailbox
    outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
    /// muc id to muc jid
    mucs: HashMap<String, xmpp_parsers::Jid>,
    /// map from iq's id to handler of this type of iqs
    pending_ids: HashMap<String, Box<dyn IqHandler>>,
    }
    struct XmppState {
    client: Client,
    data: XmppData,
    }
    pub struct XmppConnection {
    account: std::rc::Rc<config::Account>,
    state: XmppState,
    }
    struct XmppElementProcessor {
    incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
    }
    impl XmppElementProcessor {
    fn new() -> XmppElementProcessor {
    let mut incoming = element_processor::Processor::new(&|_, e| {
    warn!("Unknown stanza {}", String::from(&e));
    true
    });
    incoming.register(&XmppConnection::incoming_iq_processing);
    incoming.register(&XmppConnection::incoming_presence_processing);
    incoming.register(&XmppConnection::incoming_message_processing);
    XmppElementProcessor { incoming }
    }
    }
    pub struct MaybeXmppConnection {
    account: std::rc::Rc<config::Account>,
    state: Option<XmppState>,
    }
    impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from.account,
    state: Some(from.state),
    }
    }
    }
    impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: std::rc::Rc::new(from),
    state: None,
    }
    }
    }
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from,
    state: None,
    }
    }
    }
    impl MaybeXmppConnection {
    /// connects if nothing connected
    /// don't connect only if stop_future resolved
    pub fn connect<F>(
    self,
    stop_future: F,
    ) -> impl Future<Item = XmppConnection, Error = failure::Error>
    where
    F: future::Future + Clone + 'static,
    <F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
    {
    info!("xmpp connection...");
    let MaybeXmppConnection { account, state } = self;
    if let Some(state) = state {
    Box::new(future::ok(XmppConnection { account, state }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    stop_future
    .clone()
    .select2(
    future::loop_fn(account, move |account| {
    info!("xmpp initialization...");
    let client =
    Client::new_with_jid(account.jid.clone(), &account.password);
    info!("xmpp initialized");
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
    let stop_future4 = stop_future.clone();
    // future to wait for online
    Box::new(
    XmppConnection {
    state: XmppState {
    client,
    data: std::default::Default::default(),
    },
    account,
    }
    .processing(XmppConnection::online, stop_future.clone())
    .map_err(|(acc, _)| acc)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
    .and_then(|conn| conn.enter_mucs(stop_future4))
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    }),
    )
    })
    .map_err(|_: ()| ()),
    )
    .then(|r| match r {
    Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
    Ok(Either::B((x, _a))) => future::ok(x),
    Err(Either::A((e, _b))) => future::err(e.into()),
    Err(Either::B((_, _a))) => {
    future::err(format_err!("Cann't initiate XMPP connection"))
    }
    }),
    )
    }
    }
    }
    impl XmppConnection {
    /// base XMPP processing
    /// Returns false on error to disconnect
    fn xmpp_processing(&mut self, event: &Event) -> bool {
    match event {
    Event::Stanza(stanza) => {
    let processors = XmppElementProcessor::new();
    processors.incoming.process(self, stanza.clone())
    }
    Event::Online => true,
    e => {
    warn!("Unexpected event {:?}", e);
    false
    }
    }
    }
    /// Enforce to answer to IQ "set"
    fn incoming_iq_processing_set(
    &mut self,
    id: String,
    from: Option<xmpp_parsers::Jid>,
    element: minidom::Element,
    ) -> xmpp_parsers::iq::Iq {
    use std::convert::TryInto;
    if let Some(roster) =
    element.clone().try_into().ok() as Option<xmpp_parsers::roster::Roster>
    {
    // RFC 6212 2.1.6. Roster Push
    info!("Got roster push {} from {:?}", id, from);
    for i in roster.items {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
    info!("Update {} in roster", i.jid);
    rdata.0 = i.subscription;
    rdata.1 = i.ask;
    } else {
    info!("Add {} to roster", i.jid);
    self.state
    .data
    .roster
    .insert(i.jid.clone(), (i.subscription, i.ask));
    }
    self.process_jid(&i.jid);
    }
    return stanzas::make_roster_push_answer(id, self.state.client.jid.clone(), from);
    }
    warn!(
    "Unsupported IQ set request from {:?}: {}",
    from,
    String::from(&element)
    );
    stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)
    }
    /// Enforce to answer to IQ "get"
    fn incoming_iq_processing_get(
    &mut self,
    id: String,
    from: Option<xmpp_parsers::Jid>,
    element: minidom::Element,
    ) -> xmpp_parsers::iq::Iq {
    use std::convert::TryInto;
    if let Some(_ping) = element.clone().try_into().ok() as Option<xmpp_parsers::ping::Ping> {
    info!("Got ping {} from {:?}", id, from);
    return stanzas::make_pong(&id, self.state.client.jid.clone(), from);
    }
    warn!(
    "Unsupported IQ get request from {:?}: {}",
    from,
    String::from(&element)
    );
    stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)
    }
    fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
    match iq.payload {
    xmpp_parsers::iq::IqType::Set(element) => {
    let iq_answer = self.incoming_iq_processing_set(iq.id, iq.from, element);
    self.state.data.send_queue.push_back(iq_answer.into());
    }
    xmpp_parsers::iq::IqType::Error(e) => {
    if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {
    return handler.error(self, e);
    }
    error!("iq error: {:?}", e);
    return false;
    }
    xmpp_parsers::iq::IqType::Get(element) => {
    let iq_answer = self.incoming_iq_processing_get(iq.id, iq.from, element);
    self.state.data.send_queue.push_back(iq_answer.into());
    }
    xmpp_parsers::iq::IqType::Result(opt_element) => {
    if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {
    return handler.result(self, opt_element);
    }
    warn!(
    "Unwanted iq result id {} from {:?}: {:?}",
    iq.id,
    iq.from,
    opt_element.map(|e| String::from(&e))
    );
    }
    }
    true
    }
    fn incoming_presence_processing(&mut self, presence: xmpp_parsers::presence::Presence) -> bool {
    if presence.from.as_ref() == Some(&self.state.client.jid) {
    info!("Self-presence accepted");
    self.state.data.self_presence = true;
    } else {
    warn!("Incoming presence stanza: {:?}", presence);
    }
    true
    }
    fn incoming_message_processing(&mut self, message: xmpp_parsers::message::Message) -> bool {
    for payload in message.payloads.iter() {
    use std::convert::TryInto;
    if let Some(_delay) =
    payload.clone().try_into().ok() as Option<xmpp_parsers::delay::Delay>
    {
    return true; // ignore delayed messages
    }
    }
    warn!("Incoming message stanza: {:?}", message);
    true
    }
    /// process event from xmpp stream
    /// returns from future when condition met
    /// or stop future was resolved.
    /// Return item if connection was preserved or error otherwise.
    /// Second part is a state of stop_future
    pub fn processing<S, F, T, E>(
    self,
    stop_condition: S,
    stop_future: F,
    ) -> impl Future<
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    >
    where
    F: Future<Item = T, Error = E> + 'static,
    S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
    T: 'static,
    E: 'static,
    {
    future::loop_fn(
    (self, stop_future, stop_condition),
    |(xmpp, stop_future, mut stop_condition)| {
    let XmppConnection {
    state: XmppState { client, mut data },
    account,
    } = xmpp;
    if let Some(send_element) = data.send_queue.pop_front() {
    use tokio::prelude::Sink;
    info!("Sending {}", String::from(&send_element));
    Box::new(
    client
    .send(Packet::Stanza(send_element))
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A((client, b))) => {
    Box::new(future::ok(future::Loop::Continue((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    b,
    stop_condition,
    ))))
    as Box<dyn Future<Item = _, Error = _>>
    }
    Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Ok(Either::B(t))))
    }
    })),
    Err(Either::A((e, b))) => {
    warn!("XMPP sending error: {}", e);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Err(e)))
    }
    })),
    }),
    ) as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    if xmpp.xmpp_processing(&event) {
    match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    }
    } else {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    }),
    )
    }
    },
    )
    }
    /// get connection and wait for online status and set presence
    /// returns error if something went wrong and xmpp connection is broken
    fn online(&mut self, event: Event) -> Result<bool, ()> {
    match event {
    Event::Online => {
    info!("Online!");
    Ok(true)
    }
    Event::Stanza(s) => {
    warn!("Stanza before online: {}", String::from(&s));
    Ok(false)
    }
    _ => {
    error!("Disconnected while online");
    Err(())
    }
    }
    }
    fn initial_roster<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, mut data },
    } = self;
    use tokio::prelude::Sink;
    data.counter += 1;
    let id_init_roster = format!("id_init_roster{}", data.counter);
    let get_roster = stanzas::make_get_roster(&id_init_roster);
    let account2 = account.clone();
    info!("Quering roster... {}", String::from(&get_roster));
    data.pending_ids
    .insert(id_init_roster.clone(), Box::new(InitRosterIqHandler {}));
    client
    .send(Packet::Stanza(get_roster))
    .map_err(move |e| {
    error!("Error on querying roster: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(move |conn, _| Ok(conn.state.data.roster_init), stop_future)
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, data },
    } = self;
    use tokio::prelude::Sink;
    let presence = stanzas::make_presence(&account);
    let account2 = account.clone();
    info!("Sending presence... {}", String::from(&presence));
    client
    .send(Packet::Stanza(presence))
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, _| Ok(conn.state.data.self_presence),
    stop_future,
    )
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
    if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
    if !mailbox.is_empty() {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
    info!("Jid {} in roster", xmpp_to);
    let sub_to = match rdata.0 {
    xmpp_parsers::roster::Subscription::To => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if sub_to {
    info!("Subscribed to {}", xmpp_to);
    self.state.data.send_queue.extend(
    mailbox.drain(..).map(|message| {
    stanzas::make_chat_message(xmpp_to.clone(), message)
    }),
    );
    } else if rdata.1 == xmpp_parsers::roster::Ask::None {
    info!("Not subscribed to {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
    }
    let sub_from = match rdata.0 {
    xmpp_parsers::roster::Subscription::From => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if !sub_from {
    info!("Not subscription from {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));
    }
    } else {
    info!("Jid {} not in roster", xmpp_to);
    self.state.data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
    let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
    info!("Adding jid {} to roster id {}", xmpp_to, id_add_roster);
    self.state.data.pending_ids.insert(
    id_add_roster,
    Box::new(AddRosterIqHandler {
    jid: xmpp_to.clone(),
    }),
    );
    self.state.data.send_queue.push_back(add_roster);
    }
    }
    }
    }
    pub fn process_command(&mut self, cmd: XmppCommand) {
    info!("Got command");
    match cmd {
    XmppCommand::Chat { xmpp_to, message } => {
    self.state
    .data
    .outgoing_mailbox
    .entry(xmpp_to.clone())
    .or_default()
    .push(message);
    self.process_jid(&xmpp_to);
    }
    XmppCommand::Chatroom { muc_id, message } => {
    if let Some(muc) = self.state.data.mucs.get(&muc_id) {
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_muc_message(muc.clone(), message));
    } else {
    error!("Not found MUC {}", muc_id);
    }
    }
    XmppCommand::Ping => {
    self.state.data.counter += 1;
    let id_ping = format!("id_ping{}", self.state.data.counter);
    let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
    self.state.data.send_queue.push_back(ping);
    self.state
    .data
    .pending_ids
    .insert(id_ping, Box::new(PingIqHandler {}));
    }
    }
    }
    pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
    info!("Shutdown connection");
    let XmppConnection { account, state } = self;
    stream::iter_ok(
    state
    .data
    .mucs
    .values()
    .map(std::clone::Clone::clone)
    .collect::<Vec<_>>(),
    )
    .fold(state, move |XmppState { client, data }, muc_jid| {
    let muc_presence =
    stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());
    info!(
    "Sending muc leave presence... {}",
    String::from(&muc_presence)
    );
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    e
    })
    .and_then(|client| future::ok(XmppState { client, data }))
    })
    .map(|_| ())
    }
    fn enter_mucs<F, E>(
    self,
    _stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection { account, state } = self;
    let account2 = account.clone();
    let account3 = account.clone();
    stream::iter_ok(account.chatrooms.clone())
    .fold(state, move |XmppState { client, mut data }, muc_jid| {
    data.counter += 1;
    let id_muc_presence = format!("id_muc_presence{}", data.counter);
    let muc_presence = stanzas::make_muc_presence(
    &id_muc_presence,
    account2.jid.clone(),
    muc_jid.1.clone(),
    );
    info!("Sending muc presence... {}", String::from(&muc_presence));
    let account4 = account2.clone();
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    account4
    })
    .and_then(|client| {
    data.mucs.insert(muc_jid.0, muc_jid.1);
    future::ok(XmppState { client, data })
    })
    })
    .map(|state| XmppConnection {
    account: account3,
    state,
    })
    }
    }
    [3.21]
    use tokio_xmpp::{Client, Event, Packet};
    use tokio::prelude::future::{self, Either};
    use tokio::prelude::stream;
    use tokio::prelude::{Future, Stream};
    use std::collections::{HashMap, VecDeque};
    use std::time::{Duration, Instant};
    use super::XmppCommand;
    use super::stanzas;
    use super::element_processor;
    use crate::config;
    /// trait of processing iq
    /// each function consumes handlers and
    /// returns false if connection should be reset
    trait IqHandler {
    /// process result
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool;
    /// process error
    fn error(
    self: Box<Self>,
    conn: &mut XmppConnection,
    error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool;
    /// process tmeout
    fn timeout(self: Box<Self>, conn: &mut XmppConnection) -> bool;
    }
    struct AddRosterIqHandler {
    jid: xmpp_parsers::Jid,
    }
    impl IqHandler for AddRosterIqHandler {
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    match opt_element {
    Some(element) => {
    warn!(
    "Wrong payload when adding {} to roster: {}",
    self.jid,
    String::from(&element)
    );
    }
    None => {
    if conn.state.data.roster.contains_key(&self.jid) {
    info!("Jid {} updated to roster", self.jid);
    } else {
    info!("Jid {} added in roster", self.jid);
    conn.state.data.roster.insert(
    self.jid.clone(),
    (
    xmpp_parsers::roster::Subscription::None,
    xmpp_parsers::roster::Ask::None,
    ),
    );
    }
    conn.process_jid(&self.jid);
    }
    }
    true
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    true
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    true // ignore
    }
    }
    struct PingIqHandler {}
    impl IqHandler for PingIqHandler {
    fn result(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    true // ignore
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    false
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    false
    }
    }
    struct InitRosterIqHandler {}
    impl IqHandler for InitRosterIqHandler {
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    if let Some(result) = opt_element {
    use std::convert::TryInto;
    match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
    Ok(roster) => {
    conn.state.data.roster_init = true;
    conn.state.data.roster.clear();
    info!("Got first roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    conn.state
    .data
    .roster
    .insert(i.jid, (i.subscription, i.ask));
    }
    true
    }
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    false
    }
    }
    } else {
    error!("No roster responded");
    false
    }
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    false
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    false
    }
    }
    #[derive(Default)]
    struct XmppData {
    /// known roster data
    roster: HashMap<
    xmpp_parsers::Jid,
    (
    xmpp_parsers::roster::Subscription,
    xmpp_parsers::roster::Ask,
    ),
    >,
    /// if roster was initialized
    /// ToDo: remove it as it used only for initialization
    roster_init: bool,
    /// ids counter
    counter: usize,
    /// stanzas to send
    send_queue: VecDeque<minidom::Element>,
    /// outgoing mailbox
    outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
    /// muc id to muc jid
    mucs: HashMap<String, xmpp_parsers::Jid>,
    /// map from iq's id to handler of this type of iqs
    pending_ids: HashMap<String, (Instant, Box<dyn IqHandler>)>,
    }
    struct XmppState {
    client: Client,
    data: XmppData,
    }
    pub struct XmppConnection {
    account: std::rc::Rc<config::Account>,
    state: XmppState,
    }
    struct XmppElementProcessor {
    incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
    }
    impl XmppElementProcessor {
    fn new() -> XmppElementProcessor {
    let mut incoming = element_processor::Processor::new(&|_, e| {
    warn!("Unknown stanza {}", String::from(&e));
    true
    });
    incoming.register(&XmppConnection::incoming_iq_processing);
    XmppElementProcessor { incoming }
    }
    }
    pub struct MaybeXmppConnection {
    account: std::rc::Rc<config::Account>,
    state: Option<XmppState>,
    }
    impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from.account,
    state: Some(from.state),
    }
    }
    }
    impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: std::rc::Rc::new(from),
    state: None,
    }
    }
    }
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from,
    state: None,
    }
    }
    }
    impl MaybeXmppConnection {
    /// connects if nothing connected
    /// don't connect only if stop_future resolved
    pub fn connect<F>(
    self,
    stop_future: F,
    ) -> impl Future<Item = XmppConnection, Error = failure::Error>
    where
    F: future::Future + Clone + 'static,
    <F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
    {
    info!("xmpp connection...");
    let MaybeXmppConnection { account, state } = self;
    if let Some(state) = state {
    Box::new(future::ok(XmppConnection { account, state }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    stop_future
    .clone()
    .select2(
    future::loop_fn(account, move |account| {
    info!("xmpp initialization...");
    let client =
    Client::new_with_jid(account.jid.clone(), &account.password);
    info!("xmpp initialized");
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
    let stop_future4 = stop_future.clone();
    // future to wait for online
    Box::new(
    XmppConnection {
    state: XmppState {
    client,
    data: std::default::Default::default(),
    },
    account,
    }
    .processing(XmppConnection::online, stop_future.clone())
    .map_err(|(acc, _)| acc)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
    .and_then(|conn| conn.enter_mucs(stop_future4))
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    }),
    )
    })
    .map_err(|_: ()| ()),
    )
    .then(|r| match r {
    Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
    Ok(Either::B((x, _a))) => future::ok(x),
    Err(Either::A((e, _b))) => future::err(e.into()),
    Err(Either::B((_, _a))) => {
    future::err(format_err!("Cann't initiate XMPP connection"))
    }
    }),
    )
    }
    }
    }
    impl XmppConnection {
    /// base XMPP processing
    /// Returns false on error to disconnect
    fn xmpp_processing(&mut self, event: &Event) -> bool {
    match event {
    Event::Stanza(stanza) => {
    let processors = XmppElementProcessor::new();
    processors.incoming.process(self, stanza.clone())
    }
    Event::Online => true,
    e => {
    warn!("Unexpected event {:?}", e);
    false
    }
    }
    }
    /// Enforce to answer to IQ "set"
    fn incoming_iq_processing_set(
    &mut self,
    id: String,
    from: Option<xmpp_parsers::Jid>,
    element: minidom::Element,
    ) -> xmpp_parsers::iq::Iq {
    use std::convert::TryInto;
    if let Some(roster) =
    element.clone().try_into().ok() as Option<xmpp_parsers::roster::Roster>
    {
    // RFC 6212 2.1.6. Roster Push
    for i in roster.items {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
    info!("Update {} in roster", i.jid);
    rdata.0 = i.subscription;
    rdata.1 = i.ask;
    } else {
    info!("Add {} to roster", i.jid);
    self.state
    .data
    .roster
    .insert(i.jid.clone(), (i.subscription, i.ask));
    }
    self.process_jid(&i.jid);
    }
    return stanzas::make_roster_push_answer(id, self.state.client.jid.clone(), from);
    }
    warn!(
    "Unsupported IQ set request from {:?}: {}",
    from,
    String::from(&element)
    );
    stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)
    }
    /// Enforce to answer to IQ "get"
    fn incoming_iq_processing_get(
    &mut self,
    id: String,
    from: Option<xmpp_parsers::Jid>,
    element: minidom::Element,
    ) -> xmpp_parsers::iq::Iq {
    use std::convert::TryInto;
    if let Some(_ping) = element.clone().try_into().ok() as Option<xmpp_parsers::ping::Ping> {
    return stanzas::make_pong(&id, self.state.client.jid.clone(), from);
    }
    warn!(
    "Unsupported IQ get request from {:?}: {}",
    from,
    String::from(&element)
    );
    stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)
    }
    fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
    match iq.payload {
    xmpp_parsers::iq::IqType::Set(element) => {
    let iq_answer = self.incoming_iq_processing_set(iq.id, iq.from, element);
    self.state.data.send_queue.push_back(iq_answer.into());
    }
    xmpp_parsers::iq::IqType::Error(e) => {
    if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {
    return handler.1.error(self, e);
    }
    error!("iq error: {:?}", e);
    return false;
    }
    xmpp_parsers::iq::IqType::Get(element) => {
    let iq_answer = self.incoming_iq_processing_get(iq.id, iq.from, element);
    self.state.data.send_queue.push_back(iq_answer.into());
    }
    xmpp_parsers::iq::IqType::Result(opt_element) => {
    if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {
    return handler.1.result(self, opt_element);
    }
    warn!(
    "Unwanted iq result id {} from {:?}: {:?}",
    iq.id,
    iq.from,
    opt_element.map(|e| String::from(&e))
    );
    }
    }
    true
    }
    /// process event from xmpp stream
    /// returns from future when condition met
    /// or stop future was resolved.
    /// Return item if connection was preserved or error otherwise.
    /// Second part is a state of stop_future
    pub fn processing<S, F, T, E>(
    self,
    stop_condition: S,
    stop_future: F,
    ) -> impl Future<
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    >
    where
    F: Future<Item = T, Error = E> + 'static,
    S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
    T: 'static,
    E: 'static,
    {
    future::loop_fn(
    (self, stop_future, stop_condition),
    |(xmpp, stop_future, mut stop_condition)| {
    // ToDo: check timeouts if iqs
    let XmppConnection {
    state: XmppState { client, mut data },
    account,
    } = xmpp;
    if let Some(send_element) = data.send_queue.pop_front() {
    use tokio::prelude::Sink;
    info!("Sending {}", String::from(&send_element));
    Box::new(
    client
    .send(Packet::Stanza(send_element))
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A((client, b))) => {
    Box::new(future::ok(future::Loop::Continue((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    b,
    stop_condition,
    ))))
    as Box<dyn Future<Item = _, Error = _>>
    }
    Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Ok(Either::B(t))))
    }
    })),
    Err(Either::A((e, b))) => {
    warn!("XMPP sending error: {}", e);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Err(e)))
    }
    })),
    }),
    ) as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    if xmpp.xmpp_processing(&event) {
    match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    }
    } else {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    }),
    )
    }
    },
    )
    }
    /// get connection and wait for online status and set presence
    /// returns error if something went wrong and xmpp connection is broken
    fn online(&mut self, event: Event) -> Result<bool, ()> {
    match event {
    Event::Online => {
    info!("Online!");
    Ok(true)
    }
    Event::Stanza(s) => {
    warn!("Stanza before online: {}", String::from(&s));
    Ok(false)
    }
    _ => {
    error!("Disconnected while online");
    Err(())
    }
    }
    }
    fn initial_roster<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, mut data },
    } = self;
    use tokio::prelude::Sink;
    data.counter += 1;
    let id_init_roster = format!("id_init_roster{}", data.counter);
    let get_roster = stanzas::make_get_roster(&id_init_roster);
    let account2 = account.clone();
    info!("Quering roster... {}", String::from(&get_roster));
    data.pending_ids.insert(
    id_init_roster.clone(),
    (
    Instant::now() + Duration::from_secs(60),
    Box::new(InitRosterIqHandler {}),
    ),
    );
    client
    .send(Packet::Stanza(get_roster))
    .map_err(move |e| {
    error!("Error on querying roster: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(move |conn, _| Ok(conn.state.data.roster_init), stop_future)
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, data },
    } = self;
    use tokio::prelude::Sink;
    let presence = stanzas::make_presence(&account);
    let account2 = account.clone();
    info!("Sending presence... {}", String::from(&presence));
    client
    .send(Packet::Stanza(presence))
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, event| {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
    Ok(presence) => {
    Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
    }
    Err(e) => {
    warn!("Not a self-presence: {}", e);
    Ok(false)
    }
    }
    } else {
    error!("Wrong event while waiting self-presence");
    Err(())
    }
    },
    stop_future,
    )
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
    if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
    if !mailbox.is_empty() {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
    info!("Jid {} in roster", xmpp_to);
    let sub_to = match rdata.0 {
    xmpp_parsers::roster::Subscription::To => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if sub_to {
    info!("Subscribed to {}", xmpp_to);
    self.state.data.send_queue.extend(
    mailbox.drain(..).map(|message| {
    stanzas::make_chat_message(xmpp_to.clone(), message)
    }),
    );
    } else if rdata.1 == xmpp_parsers::roster::Ask::None {
    info!("Not subscribed to {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
    }
    let sub_from = match rdata.0 {
    xmpp_parsers::roster::Subscription::From => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if !sub_from {
    info!("Not subscription from {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));
    }
    } else {
    info!("Jid {} not in roster", xmpp_to);
    self.state.data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
    let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
    info!("Adding jid {} to roster id {}", xmpp_to, id_add_roster);
    self.state.data.pending_ids.insert(
    id_add_roster,
    (
    Instant::now() + Duration::from_secs(60),
    Box::new(AddRosterIqHandler {
    jid: xmpp_to.clone(),
    }),
    ),
    );
    self.state.data.send_queue.push_back(add_roster);
    }
    }
    }
    }
    pub fn process_command(&mut self, cmd: XmppCommand) {
    info!("Got command");
    match cmd {
    XmppCommand::Chat { xmpp_to, message } => {
    self.state
    .data
    .outgoing_mailbox
    .entry(xmpp_to.clone())
    .or_default()
    .push(message);
    self.process_jid(&xmpp_to);
    }
    XmppCommand::Chatroom { muc_id, message } => {
    if let Some(muc) = self.state.data.mucs.get(&muc_id) {
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_muc_message(muc.clone(), message));
    } else {
    error!("Not found MUC {}", muc_id);
    }
    }
    XmppCommand::Ping => {
    self.state.data.counter += 1;
    let id_ping = format!("id_ping{}", self.state.data.counter);
    let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
    self.state.data.send_queue.push_back(ping);
    self.state.data.pending_ids.insert(
    id_ping,
    (
    Instant::now() + Duration::from_secs(30),
    Box::new(PingIqHandler {}),
    ),
    );
    }
    }
    }
    pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
    info!("Shutdown connection");
    let XmppConnection { account, state } = self;
    stream::iter_ok(
    state
    .data
    .mucs
    .values()
    .map(std::clone::Clone::clone)
    .collect::<Vec<_>>(),
    )
    .fold(state, move |XmppState { client, data }, muc_jid| {
    let muc_presence =
    stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());
    info!(
    "Sending muc leave presence... {}",
    String::from(&muc_presence)
    );
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    e
    })
    .and_then(|client| future::ok(XmppState { client, data }))
    })
    .map(|_| ())
    }
    fn enter_mucs<F, E>(
    self,
    _stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection { account, state } = self;
    let account2 = account.clone();
    let account3 = account.clone();
    stream::iter_ok(account.chatrooms.clone())
    .fold(state, move |XmppState { client, mut data }, muc_jid| {
    data.counter += 1;
    let id_muc_presence = format!("id_muc_presence{}", data.counter);
    let muc_presence = stanzas::make_muc_presence(
    &id_muc_presence,
    account2.jid.clone(),
    muc_jid.1.clone(),
    );
    info!("Sending muc presence... {}", String::from(&muc_presence));
    let account4 = account2.clone();
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    account4
    })
    .and_then(|client| {
    data.mucs.insert(muc_jid.0, muc_jid.1);
    future::ok(XmppState { client, data })
    })
    })
    .map(|state| XmppConnection {
    account: account3,
    state,
    })
    }
    }
  • replacement in src/xmpp/element_processor.rs at line 0
    [3.11301][2.33663:34739]()
    type Func<S, T, E> = dyn Fn(&mut S, E) -> T;
    pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {
    processors: Vec<Box<Func<S, Option<T>, E>>>,
    default: &'static Func<S, T, E>,
    }
    impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {
    pub fn new<F>(f: &'static F) -> Processor<S, T, E>
    where
    F: Fn(&mut S, E) -> T + 'static,
    {
    Processor {
    processors: vec![],
    default: f,
    }
    }
    pub fn register<F, A>(&mut self, f: &'static F)
    where
    F: Fn(&mut S, A) -> T + 'static,
    A: std::convert::TryFrom<E>,
    {
    self.processors.push(Box::new(move |s, e: E| {
    use std::convert::TryInto;
    (e.try_into().ok() as Option<A>).map(|a| f(s, a))
    }));
    }
    pub fn process(&self, s: &mut S, e: E) -> T {
    for processor in self.processors.iter() {
    match processor(s, e.clone()) {
    Some(t) => return t,
    None => continue,
    }
    }
    (*self.default)(s, e)
    }
    }
    [3.11301]
    type Func<S, T, E> = dyn Fn(&mut S, E) -> T;
    pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {
    processors: Vec<Box<Func<S, Option<T>, E>>>,
    default: &'static Func<S, T, E>,
    }
    impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {
    pub fn new<F>(f: &'static F) -> Processor<S, T, E>
    where
    F: Fn(&mut S, E) -> T + 'static,
    {
    Processor {
    processors: vec![],
    default: f,
    }
    }
    pub fn register<F, A>(&mut self, f: &'static F)
    where
    F: Fn(&mut S, A) -> T + 'static,
    A: std::convert::TryFrom<E>,
    {
    self.processors.push(Box::new(move |s, e: E| {
    use std::convert::TryInto;
    (e.try_into().ok() as Option<A>).map(|a| f(s, a))
    }));
    }
    pub fn process(&self, s: &mut S, e: E) -> T {
    for processor in self.processors.iter() {
    match processor(s, e.clone()) {
    Some(t) => return t,
    None => continue,
    }
    }
    (*self.default)(s, e)
    }
    }