Update dependencies

[?]
Apr 17, 2019, 5:03 PM
4IPZTMFIDUAQTFEGTJG5FWWABA6NQWRT23XRLUV3TJQOX4JIPILAC

Dependencies

  • [2] PJV5HPIF Starting to imlements timeouts for iqs
  • [3] 6UKCVM6E Use new iq processng for initial roster
  • [4] BYJPYYSM Process iq ping response
  • [5] GVZ4JAR5 Process self-presence with incoming stanza processor
  • [6] 2THKW66M Ignore .orig files
  • [7] OFLAP2G2 Fix possible utf8 errors
  • [8] S754Y5DF Refactor IQ processing Always answer to set and get requests. Use XML encoding for stanzas.
  • [9] FV6BJ5K6 Send self-presence and store account info in Rc so it willbe used in some future in parallel
  • [10] CCLGGFKR Move out XmppConnection into own file
  • [11] HDLI2X4H Ignore delayed XEP-0203 messages
  • [12] RRLRZTMR Use element processor for iq
  • [13] X6L47BHQ Use different structure for established xmpp connection
  • [14] QWE26TMV update deps
  • [15] DYRPAV6T Update dependencies
  • [16] VS6AHRWI Move XMPP to separate dir
  • [17] LL3D5CXK Staring using element processor
  • [18] PLWPCM47 Add id to initital presence
  • [19] FVVPKFTL Initial commit
  • [*] 5A5UVGNM Move receiver closing logic out of xmpp processing

Change contents

  • replacement in src/xmpp/xmpp_connection.rs at line 0
    [3.21][2.0:32662]()
    use tokio_xmpp::{Client, Event, Packet};
    use tokio::prelude::future::{self, Either};
    use tokio::prelude::stream;
    use tokio::prelude::{Future, Stream};
    use std::collections::{HashMap, VecDeque};
    use std::time::{Duration, Instant};
    use super::XmppCommand;
    use super::stanzas;
    use super::element_processor;
    use crate::config;
    /// trait of processing iq
    /// each function consumes handlers and
    /// returns false if connection should be reset
    trait IqHandler {
    /// process result
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool;
    /// process error
    fn error(
    self: Box<Self>,
    conn: &mut XmppConnection,
    error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool;
    /// process tmeout
    fn timeout(self: Box<Self>, conn: &mut XmppConnection) -> bool;
    }
    struct AddRosterIqHandler {
    jid: xmpp_parsers::Jid,
    }
    impl IqHandler for AddRosterIqHandler {
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    match opt_element {
    Some(element) => {
    warn!(
    "Wrong payload when adding {} to roster: {}",
    self.jid,
    String::from(&element)
    );
    }
    None => {
    if conn.state.data.roster.contains_key(&self.jid) {
    info!("Jid {} updated to roster", self.jid);
    } else {
    info!("Jid {} added in roster", self.jid);
    conn.state.data.roster.insert(
    self.jid.clone(),
    (
    xmpp_parsers::roster::Subscription::None,
    xmpp_parsers::roster::Ask::None,
    ),
    );
    }
    conn.process_jid(&self.jid);
    }
    }
    true
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    true
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    true // ignore
    }
    }
    struct PingIqHandler {}
    impl IqHandler for PingIqHandler {
    fn result(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    true // ignore
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    false
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    false
    }
    }
    struct InitRosterIqHandler {}
    impl IqHandler for InitRosterIqHandler {
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    if let Some(result) = opt_element {
    use std::convert::TryInto;
    match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
    Ok(roster) => {
    conn.state.data.roster_init = true;
    conn.state.data.roster.clear();
    info!("Got first roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    conn.state
    .data
    .roster
    .insert(i.jid, (i.subscription, i.ask));
    }
    true
    }
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    false
    }
    }
    } else {
    error!("No roster responded");
    false
    }
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    false
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    false
    }
    }
    #[derive(Default)]
    struct XmppData {
    /// known roster data
    roster: HashMap<
    xmpp_parsers::Jid,
    (
    xmpp_parsers::roster::Subscription,
    xmpp_parsers::roster::Ask,
    ),
    >,
    /// if roster was initialized
    /// ToDo: remove it as it used only for initialization
    roster_init: bool,
    /// ids counter
    counter: usize,
    /// stanzas to send
    send_queue: VecDeque<minidom::Element>,
    /// outgoing mailbox
    outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
    /// muc id to muc jid
    mucs: HashMap<String, xmpp_parsers::Jid>,
    /// map from iq's id to handler of this type of iqs
    pending_ids: HashMap<String, (Instant, Box<dyn IqHandler>)>,
    }
    struct XmppState {
    client: Client,
    data: XmppData,
    }
    pub struct XmppConnection {
    account: std::rc::Rc<config::Account>,
    state: XmppState,
    }
    struct XmppElementProcessor {
    incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
    }
    impl XmppElementProcessor {
    fn new() -> XmppElementProcessor {
    let mut incoming = element_processor::Processor::new(&|_, e| {
    warn!("Unknown stanza {}", String::from(&e));
    true
    });
    incoming.register(&XmppConnection::incoming_iq_processing);
    XmppElementProcessor { incoming }
    }
    }
    pub struct MaybeXmppConnection {
    account: std::rc::Rc<config::Account>,
    state: Option<XmppState>,
    }
    impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from.account,
    state: Some(from.state),
    }
    }
    }
    impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: std::rc::Rc::new(from),
    state: None,
    }
    }
    }
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from,
    state: None,
    }
    }
    }
    impl MaybeXmppConnection {
    /// connects if nothing connected
    /// don't connect only if stop_future resolved
    pub fn connect<F>(
    self,
    stop_future: F,
    ) -> impl Future<Item = XmppConnection, Error = failure::Error>
    where
    F: future::Future + Clone + 'static,
    <F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
    {
    info!("xmpp connection...");
    let MaybeXmppConnection { account, state } = self;
    if let Some(state) = state {
    Box::new(future::ok(XmppConnection { account, state }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    stop_future
    .clone()
    .select2(
    future::loop_fn(account, move |account| {
    info!("xmpp initialization...");
    let client =
    Client::new_with_jid(account.jid.clone(), &account.password);
    info!("xmpp initialized");
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
    let stop_future4 = stop_future.clone();
    // future to wait for online
    Box::new(
    XmppConnection {
    state: XmppState {
    client,
    data: std::default::Default::default(),
    },
    account,
    }
    .processing(XmppConnection::online, stop_future.clone())
    .map_err(|(acc, _)| acc)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
    .and_then(|conn| conn.enter_mucs(stop_future4))
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    }),
    )
    })
    .map_err(|_: ()| ()),
    )
    .then(|r| match r {
    Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
    Ok(Either::B((x, _a))) => future::ok(x),
    Err(Either::A((e, _b))) => future::err(e.into()),
    Err(Either::B((_, _a))) => {
    future::err(format_err!("Cann't initiate XMPP connection"))
    }
    }),
    )
    }
    }
    }
    impl XmppConnection {
    /// base XMPP processing
    /// Returns false on error to disconnect
    fn xmpp_processing(&mut self, event: &Event) -> bool {
    match event {
    Event::Stanza(stanza) => {
    let processors = XmppElementProcessor::new();
    processors.incoming.process(self, stanza.clone())
    }
    Event::Online => true,
    e => {
    warn!("Unexpected event {:?}", e);
    false
    }
    }
    }
    /// Enforce to answer to IQ "set"
    fn incoming_iq_processing_set(
    &mut self,
    id: String,
    from: Option<xmpp_parsers::Jid>,
    element: minidom::Element,
    ) -> xmpp_parsers::iq::Iq {
    use std::convert::TryInto;
    if let Some(roster) =
    element.clone().try_into().ok() as Option<xmpp_parsers::roster::Roster>
    {
    // RFC 6212 2.1.6. Roster Push
    for i in roster.items {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
    info!("Update {} in roster", i.jid);
    rdata.0 = i.subscription;
    rdata.1 = i.ask;
    } else {
    info!("Add {} to roster", i.jid);
    self.state
    .data
    .roster
    .insert(i.jid.clone(), (i.subscription, i.ask));
    }
    self.process_jid(&i.jid);
    }
    return stanzas::make_roster_push_answer(id, self.state.client.jid.clone(), from);
    }
    warn!(
    "Unsupported IQ set request from {:?}: {}",
    from,
    String::from(&element)
    );
    stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)
    }
    /// Enforce to answer to IQ "get"
    fn incoming_iq_processing_get(
    &mut self,
    id: String,
    from: Option<xmpp_parsers::Jid>,
    element: minidom::Element,
    ) -> xmpp_parsers::iq::Iq {
    use std::convert::TryInto;
    if let Some(_ping) = element.clone().try_into().ok() as Option<xmpp_parsers::ping::Ping> {
    return stanzas::make_pong(&id, self.state.client.jid.clone(), from);
    }
    warn!(
    "Unsupported IQ get request from {:?}: {}",
    from,
    String::from(&element)
    );
    stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)
    }
    fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
    match iq.payload {
    xmpp_parsers::iq::IqType::Set(element) => {
    let iq_answer = self.incoming_iq_processing_set(iq.id, iq.from, element);
    self.state.data.send_queue.push_back(iq_answer.into());
    }
    xmpp_parsers::iq::IqType::Error(e) => {
    if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {
    return handler.1.error(self, e);
    }
    error!("iq error: {:?}", e);
    return false;
    }
    xmpp_parsers::iq::IqType::Get(element) => {
    let iq_answer = self.incoming_iq_processing_get(iq.id, iq.from, element);
    self.state.data.send_queue.push_back(iq_answer.into());
    }
    xmpp_parsers::iq::IqType::Result(opt_element) => {
    if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {
    return handler.1.result(self, opt_element);
    }
    warn!(
    "Unwanted iq result id {} from {:?}: {:?}",
    iq.id,
    iq.from,
    opt_element.map(|e| String::from(&e))
    );
    }
    }
    true
    }
    /// process event from xmpp stream
    /// returns from future when condition met
    /// or stop future was resolved.
    /// Return item if connection was preserved or error otherwise.
    /// Second part is a state of stop_future
    pub fn processing<S, F, T, E>(
    self,
    stop_condition: S,
    stop_future: F,
    ) -> impl Future<
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    >
    where
    F: Future<Item = T, Error = E> + 'static,
    S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
    T: 'static,
    E: 'static,
    {
    future::loop_fn(
    (self, stop_future, stop_condition),
    |(xmpp, stop_future, mut stop_condition)| {
    // ToDo: check timeouts if iqs
    let XmppConnection {
    state: XmppState { client, mut data },
    account,
    } = xmpp;
    if let Some(send_element) = data.send_queue.pop_front() {
    use tokio::prelude::Sink;
    info!("Sending {}", String::from(&send_element));
    Box::new(
    client
    .send(Packet::Stanza(send_element))
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A((client, b))) => {
    Box::new(future::ok(future::Loop::Continue((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    b,
    stop_condition,
    ))))
    as Box<dyn Future<Item = _, Error = _>>
    }
    Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Ok(Either::B(t))))
    }
    })),
    Err(Either::A((e, b))) => {
    warn!("XMPP sending error: {}", e);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Err(e)))
    }
    })),
    }),
    ) as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    if xmpp.xmpp_processing(&event) {
    match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    }
    } else {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    }),
    )
    }
    },
    )
    }
    /// get connection and wait for online status and set presence
    /// returns error if something went wrong and xmpp connection is broken
    fn online(&mut self, event: Event) -> Result<bool, ()> {
    match event {
    Event::Online => {
    info!("Online!");
    Ok(true)
    }
    Event::Stanza(s) => {
    warn!("Stanza before online: {}", String::from(&s));
    Ok(false)
    }
    _ => {
    error!("Disconnected while online");
    Err(())
    }
    }
    }
    fn initial_roster<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, mut data },
    } = self;
    use tokio::prelude::Sink;
    data.counter += 1;
    let id_init_roster = format!("id_init_roster{}", data.counter);
    let get_roster = stanzas::make_get_roster(&id_init_roster);
    let account2 = account.clone();
    info!("Quering roster... {}", String::from(&get_roster));
    data.pending_ids.insert(
    id_init_roster.clone(),
    (
    Instant::now() + Duration::from_secs(60),
    Box::new(InitRosterIqHandler {}),
    ),
    );
    client
    .send(Packet::Stanza(get_roster))
    .map_err(move |e| {
    error!("Error on querying roster: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(move |conn, _| Ok(conn.state.data.roster_init), stop_future)
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, data },
    } = self;
    use tokio::prelude::Sink;
    let presence = stanzas::make_presence(&account);
    let account2 = account.clone();
    info!("Sending presence... {}", String::from(&presence));
    client
    .send(Packet::Stanza(presence))
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, event| {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
    Ok(presence) => {
    Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
    }
    Err(e) => {
    warn!("Not a self-presence: {}", e);
    Ok(false)
    }
    }
    } else {
    error!("Wrong event while waiting self-presence");
    Err(())
    }
    },
    stop_future,
    )
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
    if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
    if !mailbox.is_empty() {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
    info!("Jid {} in roster", xmpp_to);
    let sub_to = match rdata.0 {
    xmpp_parsers::roster::Subscription::To => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if sub_to {
    info!("Subscribed to {}", xmpp_to);
    self.state.data.send_queue.extend(
    mailbox.drain(..).map(|message| {
    stanzas::make_chat_message(xmpp_to.clone(), message)
    }),
    );
    } else if rdata.1 == xmpp_parsers::roster::Ask::None {
    info!("Not subscribed to {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
    }
    let sub_from = match rdata.0 {
    xmpp_parsers::roster::Subscription::From => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if !sub_from {
    info!("Not subscription from {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));
    }
    } else {
    info!("Jid {} not in roster", xmpp_to);
    self.state.data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
    let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
    info!("Adding jid {} to roster id {}", xmpp_to, id_add_roster);
    self.state.data.pending_ids.insert(
    id_add_roster,
    (
    Instant::now() + Duration::from_secs(60),
    Box::new(AddRosterIqHandler {
    jid: xmpp_to.clone(),
    }),
    ),
    );
    self.state.data.send_queue.push_back(add_roster);
    }
    }
    }
    }
    pub fn process_command(&mut self, cmd: XmppCommand) {
    info!("Got command");
    match cmd {
    XmppCommand::Chat { xmpp_to, message } => {
    self.state
    .data
    .outgoing_mailbox
    .entry(xmpp_to.clone())
    .or_default()
    .push(message);
    self.process_jid(&xmpp_to);
    }
    XmppCommand::Chatroom { muc_id, message } => {
    if let Some(muc) = self.state.data.mucs.get(&muc_id) {
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_muc_message(muc.clone(), message));
    } else {
    error!("Not found MUC {}", muc_id);
    }
    }
    XmppCommand::Ping => {
    self.state.data.counter += 1;
    let id_ping = format!("id_ping{}", self.state.data.counter);
    let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
    self.state.data.send_queue.push_back(ping);
    self.state.data.pending_ids.insert(
    id_ping,
    (
    Instant::now() + Duration::from_secs(30),
    Box::new(PingIqHandler {}),
    ),
    );
    }
    }
    }
    pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
    info!("Shutdown connection");
    let XmppConnection { account, state } = self;
    stream::iter_ok(
    state
    .data
    .mucs
    .values()
    .map(std::clone::Clone::clone)
    .collect::<Vec<_>>(),
    )
    .fold(state, move |XmppState { client, data }, muc_jid| {
    let muc_presence =
    stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());
    info!(
    "Sending muc leave presence... {}",
    String::from(&muc_presence)
    );
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    e
    })
    .and_then(|client| future::ok(XmppState { client, data }))
    })
    .map(|_| ())
    }
    fn enter_mucs<F, E>(
    self,
    _stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection { account, state } = self;
    let account2 = account.clone();
    let account3 = account.clone();
    stream::iter_ok(account.chatrooms.clone())
    .fold(state, move |XmppState { client, mut data }, muc_jid| {
    data.counter += 1;
    let id_muc_presence = format!("id_muc_presence{}", data.counter);
    let muc_presence = stanzas::make_muc_presence(
    &id_muc_presence,
    account2.jid.clone(),
    muc_jid.1.clone(),
    );
    info!("Sending muc presence... {}", String::from(&muc_presence));
    let account4 = account2.clone();
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    account4
    })
    .and_then(|client| {
    data.mucs.insert(muc_jid.0, muc_jid.1);
    future::ok(XmppState { client, data })
    })
    })
    .map(|state| XmppConnection {
    account: account3,
    state,
    })
    }
    }
    [3.21]
    use tokio_xmpp::{Client, Event, Packet};
    use tokio::prelude::future::{self, Either};
    use tokio::prelude::stream;
    use tokio::prelude::{Future, Stream};
    use std::collections::{HashMap, VecDeque};
    use std::time::{Duration, Instant};
    use super::XmppCommand;
    use super::stanzas;
    use super::element_processor;
    use crate::config;
    /// trait of processing iq
    /// each function consumes handlers and
    /// returns false if connection should be reset
    trait IqHandler {
    /// process result
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool;
    /// process error
    fn error(
    self: Box<Self>,
    conn: &mut XmppConnection,
    error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool;
    /// process tmeout
    fn timeout(self: Box<Self>, conn: &mut XmppConnection) -> bool;
    }
    struct AddRosterIqHandler {
    jid: xmpp_parsers::Jid,
    }
    impl IqHandler for AddRosterIqHandler {
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    match opt_element {
    Some(element) => {
    warn!(
    "Wrong payload when adding {} to roster: {}",
    self.jid,
    String::from(&element)
    );
    }
    None => {
    if conn.state.data.roster.contains_key(&self.jid) {
    info!("Jid {} updated to roster", self.jid);
    } else {
    info!("Jid {} added in roster", self.jid);
    conn.state.data.roster.insert(
    self.jid.clone(),
    (
    xmpp_parsers::roster::Subscription::None,
    xmpp_parsers::roster::Ask::None,
    ),
    );
    }
    conn.process_jid(&self.jid);
    }
    }
    true
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    true
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    true // ignore
    }
    }
    struct PingIqHandler {}
    impl IqHandler for PingIqHandler {
    fn result(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    true // ignore
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    false
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    false
    }
    }
    struct InitRosterIqHandler {}
    impl IqHandler for InitRosterIqHandler {
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    if let Some(result) = opt_element {
    use std::convert::TryInto;
    match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
    Ok(roster) => {
    conn.state.data.roster_init = true;
    conn.state.data.roster.clear();
    info!("Got first roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    conn.state
    .data
    .roster
    .insert(i.jid, (i.subscription, i.ask));
    }
    true
    }
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    false
    }
    }
    } else {
    error!("No roster responded");
    false
    }
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    false
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    false
    }
    }
    #[derive(Default)]
    struct XmppData {
    /// known roster data
    roster: HashMap<
    xmpp_parsers::Jid,
    (
    xmpp_parsers::roster::Subscription,
    xmpp_parsers::roster::Ask,
    ),
    >,
    /// if roster was initialized
    /// ToDo: remove it as it used only for initialization
    roster_init: bool,
    /// ids counter
    counter: usize,
    /// stanzas to send
    send_queue: VecDeque<minidom::Element>,
    /// outgoing mailbox
    outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
    /// muc id to muc jid
    mucs: HashMap<String, xmpp_parsers::Jid>,
    /// map from iq's id to handler of this type of iqs
    pending_ids: HashMap<String, (Instant, Box<dyn IqHandler>)>,
    }
    struct XmppState {
    client: Client,
    data: XmppData,
    }
    pub struct XmppConnection {
    account: std::rc::Rc<config::Account>,
    state: XmppState,
    }
    struct XmppElementProcessor {
    incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
    }
    impl XmppElementProcessor {
    fn new() -> XmppElementProcessor {
    let mut incoming = element_processor::Processor::new(&|_, e| {
    warn!("Unknown stanza {}", String::from(&e));
    true
    });
    incoming.register(&XmppConnection::incoming_iq_processing);
    XmppElementProcessor { incoming }
    }
    }
    pub struct MaybeXmppConnection {
    account: std::rc::Rc<config::Account>,
    state: Option<XmppState>,
    }
    impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from.account,
    state: Some(from.state),
    }
    }
    }
    impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: std::rc::Rc::new(from),
    state: None,
    }
    }
    }
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from,
    state: None,
    }
    }
    }
    impl MaybeXmppConnection {
    /// connects if nothing connected
    /// don't connect only if stop_future resolved
    pub fn connect<F>(
    self,
    stop_future: F,
    ) -> impl Future<Item = XmppConnection, Error = failure::Error>
    where
    F: future::Future + Clone + 'static,
    <F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
    {
    info!("xmpp connection...");
    let MaybeXmppConnection { account, state } = self;
    if let Some(state) = state {
    Box::new(future::ok(XmppConnection { account, state }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    stop_future
    .clone()
    .select2(
    future::loop_fn(account, move |account| {
    info!("xmpp initialization...");
    let client =
    Client::new_with_jid(account.jid.clone(), &account.password);
    info!("xmpp initialized");
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
    let stop_future4 = stop_future.clone();
    // future to wait for online
    Box::new(
    XmppConnection {
    state: XmppState {
    client,
    data: std::default::Default::default(),
    },
    account,
    }
    .processing(XmppConnection::online, stop_future.clone())
    .map_err(|(acc, _)| acc)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
    .and_then(|conn| conn.enter_mucs(stop_future4))
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    }),
    )
    })
    .map_err(|_: ()| ()),
    )
    .then(|r| match r {
    Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
    Ok(Either::B((x, _a))) => future::ok(x),
    Err(Either::A((e, _b))) => future::err(e.into()),
    Err(Either::B((_, _a))) => {
    future::err(format_err!("Cann't initiate XMPP connection"))
    }
    }),
    )
    }
    }
    }
    impl XmppConnection {
    /// base XMPP processing
    /// Returns false on error to disconnect
    fn xmpp_processing(&mut self, event: &Event) -> bool {
    match event {
    Event::Stanza(stanza) => {
    let processors = XmppElementProcessor::new();
    processors.incoming.process(self, stanza.clone())
    }
    Event::Online => true,
    e => {
    warn!("Unexpected event {:?}", e);
    false
    }
    }
    }
    /// Enforce to answer to IQ "set"
    fn incoming_iq_processing_set(
    &mut self,
    id: String,
    from: Option<xmpp_parsers::Jid>,
    element: minidom::Element,
    ) -> xmpp_parsers::iq::Iq {
    use std::convert::TryInto;
    if let Some(roster) =
    element.clone().try_into().ok() as Option<xmpp_parsers::roster::Roster>
    {
    // RFC 6212 2.1.6. Roster Push
    for i in roster.items {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
    info!("Update {} in roster", i.jid);
    rdata.0 = i.subscription;
    rdata.1 = i.ask;
    } else {
    info!("Add {} to roster", i.jid);
    self.state
    .data
    .roster
    .insert(i.jid.clone(), (i.subscription, i.ask));
    }
    self.process_jid(&i.jid);
    }
    return stanzas::make_roster_push_answer(id, self.state.client.jid.clone(), from);
    }
    warn!(
    "Unsupported IQ set request from {:?}: {}",
    from,
    String::from(&element)
    );
    stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)
    }
    /// Enforce to answer to IQ "get"
    fn incoming_iq_processing_get(
    &mut self,
    id: String,
    from: Option<xmpp_parsers::Jid>,
    element: minidom::Element,
    ) -> xmpp_parsers::iq::Iq {
    use std::convert::TryInto;
    if let Some(_ping) = element.clone().try_into().ok() as Option<xmpp_parsers::ping::Ping> {
    return stanzas::make_pong(&id, self.state.client.jid.clone(), from);
    }
    warn!(
    "Unsupported IQ get request from {:?}: {}",
    from,
    String::from(&element)
    );
    stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)
    }
    fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
    match iq.payload {
    xmpp_parsers::iq::IqType::Set(element) => {
    let iq_answer = self.incoming_iq_processing_set(iq.id, iq.from, element);
    self.state.data.send_queue.push_back(iq_answer.into());
    }
    xmpp_parsers::iq::IqType::Error(e) => {
    if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {
    return handler.1.error(self, e);
    }
    error!("iq error: {:?}", e);
    return false;
    }
    xmpp_parsers::iq::IqType::Get(element) => {
    let iq_answer = self.incoming_iq_processing_get(iq.id, iq.from, element);
    self.state.data.send_queue.push_back(iq_answer.into());
    }
    xmpp_parsers::iq::IqType::Result(opt_element) => {
    if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {
    return handler.1.result(self, opt_element);
    }
    warn!(
    "Unwanted iq result id {} from {:?}: {:?}",
    iq.id,
    iq.from,
    opt_element.map(|e| String::from(&e))
    );
    }
    }
    true
    }
    /// process event from xmpp stream
    /// returns from future when condition met
    /// or stop future was resolved.
    /// Return item if connection was preserved or error otherwise.
    /// Second part is a state of stop_future
    pub fn processing<S, F, T, E>(
    self,
    stop_condition: S,
    stop_future: F,
    ) -> impl Future<
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    >
    where
    F: Future<Item = T, Error = E> + 'static,
    S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
    T: 'static,
    E: 'static,
    {
    future::loop_fn(
    (self, stop_future, stop_condition),
    |(xmpp, stop_future, mut stop_condition)| {
    // ToDo: check timeouts if iqs
    let XmppConnection {
    state: XmppState { client, mut data },
    account,
    } = xmpp;
    if let Some(send_element) = data.send_queue.pop_front() {
    use tokio::prelude::Sink;
    info!("Sending {}", String::from(&send_element));
    Box::new(
    client
    .send(Packet::Stanza(send_element))
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A((client, b))) => {
    Box::new(future::ok(future::Loop::Continue((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    b,
    stop_condition,
    ))))
    as Box<dyn Future<Item = _, Error = _>>
    }
    Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Ok(Either::B(t))))
    }
    })),
    Err(Either::A((e, b))) => {
    warn!("XMPP sending error: {}", e);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Err(e)))
    }
    })),
    }),
    ) as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    if xmpp.xmpp_processing(&event) {
    match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    }
    } else {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    }),
    )
    }
    },
    )
    }
    /// get connection and wait for online status and set presence
    /// returns error if something went wrong and xmpp connection is broken
    fn online(&mut self, event: Event) -> Result<bool, ()> {
    match event {
    Event::Online => {
    info!("Online!");
    Ok(true)
    }
    Event::Stanza(s) => {
    warn!("Stanza before online: {}", String::from(&s));
    Ok(false)
    }
    _ => {
    error!("Disconnected while online");
    Err(())
    }
    }
    }
    fn initial_roster<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, mut data },
    } = self;
    use tokio::prelude::Sink;
    data.counter += 1;
    let id_init_roster = format!("id_init_roster{}", data.counter);
    let get_roster = stanzas::make_get_roster(&id_init_roster);
    let account2 = account.clone();
    info!("Quering roster... {}", String::from(&get_roster));
    data.pending_ids.insert(
    id_init_roster.clone(),
    (
    Instant::now() + Duration::from_secs(60),
    Box::new(InitRosterIqHandler {}),
    ),
    );
    client
    .send(Packet::Stanza(get_roster))
    .map_err(move |e| {
    error!("Error on querying roster: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(move |conn, _| Ok(conn.state.data.roster_init), stop_future)
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, data },
    } = self;
    use tokio::prelude::Sink;
    let presence = stanzas::make_presence(&account);
    let account2 = account.clone();
    info!("Sending presence... {}", String::from(&presence));
    client
    .send(Packet::Stanza(presence))
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, event| {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
    Ok(presence) => {
    Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
    }
    Err(e) => {
    warn!("Not a self-presence: {}", e);
    Ok(false)
    }
    }
    } else {
    error!("Wrong event while waiting self-presence");
    Err(())
    }
    },
    stop_future,
    )
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
    if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
    if !mailbox.is_empty() {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
    info!("Jid {} in roster", xmpp_to);
    let sub_to = match rdata.0 {
    xmpp_parsers::roster::Subscription::To => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if sub_to {
    info!("Subscribed to {}", xmpp_to);
    self.state.data.send_queue.extend(
    mailbox.drain(..).map(|message| {
    stanzas::make_chat_message(xmpp_to.clone(), message)
    }),
    );
    } else if rdata.1 == xmpp_parsers::roster::Ask::None {
    info!("Not subscribed to {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
    }
    let sub_from = match rdata.0 {
    xmpp_parsers::roster::Subscription::From => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if !sub_from {
    info!("Not subscription from {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));
    }
    } else {
    info!("Jid {} not in roster", xmpp_to);
    self.state.data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
    let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
    info!("Adding jid {} to roster id {}", xmpp_to, id_add_roster);
    self.state.data.pending_ids.insert(
    id_add_roster,
    (
    Instant::now() + Duration::from_secs(60),
    Box::new(AddRosterIqHandler {
    jid: xmpp_to.clone(),
    }),
    ),
    );
    self.state.data.send_queue.push_back(add_roster);
    }
    }
    }
    }
    pub fn process_command(&mut self, cmd: XmppCommand) {
    info!("Got command");
    match cmd {
    XmppCommand::Chat { xmpp_to, message } => {
    self.state
    .data
    .outgoing_mailbox
    .entry(xmpp_to.clone())
    .or_default()
    .push(message);
    self.process_jid(&xmpp_to);
    }
    XmppCommand::Chatroom { muc_id, message } => {
    if let Some(muc) = self.state.data.mucs.get(&muc_id) {
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_muc_message(muc.clone(), message));
    } else {
    error!("Not found MUC {}", muc_id);
    }
    }
    XmppCommand::Ping => {
    self.state.data.counter += 1;
    let id_ping = format!("id_ping{}", self.state.data.counter);
    let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
    self.state.data.send_queue.push_back(ping);
    self.state.data.pending_ids.insert(
    id_ping,
    (
    Instant::now() + Duration::from_secs(30),
    Box::new(PingIqHandler {}),
    ),
    );
    }
    }
    }
    pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
    info!("Shutdown connection");
    let XmppConnection { account, state } = self;
    stream::iter_ok(
    state
    .data
    .mucs
    .values()
    .map(std::clone::Clone::clone)
    .collect::<Vec<_>>(),
    )
    .fold(state, move |XmppState { client, data }, muc_jid| {
    let muc_presence =
    stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());
    info!(
    "Sending muc leave presence... {}",
    String::from(&muc_presence)
    );
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    e
    })
    .and_then(|client| future::ok(XmppState { client, data }))
    })
    .map(|_| ())
    }
    fn enter_mucs<F, E>(
    self,
    _stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection { account, state } = self;
    let account2 = account.clone();
    let account3 = account.clone();
    stream::iter_ok(account.chatrooms.clone())
    .fold(state, move |XmppState { client, mut data }, muc_jid| {
    data.counter += 1;
    let id_muc_presence = format!("id_muc_presence{}", data.counter);
    let muc_presence = stanzas::make_muc_presence(
    &id_muc_presence,
    account2.jid.clone(),
    muc_jid.1.clone(),
    );
    info!("Sending muc presence... {}", String::from(&muc_presence));
    let account4 = account2.clone();
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    account4
    })
    .and_then(|client| {
    data.mucs.insert(muc_jid.0, muc_jid.1);
    future::ok(XmppState { client, data })
    })
    })
    .map(|state| XmppConnection {
    account: account3,
    state,
    })
    }
    }
  • edit in src/xmpp/stanzas.rs at line 7
    [3.130][3.32681:32757]()
    use xmpp_parsers::stanza_error::{DefinedCondition, ErrorType, StanzaError};
  • edit in src/xmpp/stanzas.rs at line 45
    [3.32816][3.32816:33065](),[3.33065][3.32526:32529](),[3.32526][3.32526:32529]()
    pub fn make_roster_push_answer(
    id: String,
    from: xmpp_parsers::Jid,
    to: Option<xmpp_parsers::Jid>,
    ) -> Iq {
    let mut answer = Iq::from_result(id, None as Option<Roster>);
    answer.from = Some(from);
    answer.to = to;
    answer
    }
  • edit in src/xmpp/stanzas.rs at line 47
    [3.812][3.32530:32711]()
    presence.to = Some(jid);
    presence.into()
    }
    pub fn make_allow_subscribe(jid: xmpp_parsers::Jid) -> Element {
    let mut presence = Presence::new(PresenceType::Subscribed);
  • replacement in src/xmpp/stanzas.rs at line 88
    [3.30150][3.33066:33157]()
    pub fn make_pong(id: &str, from: xmpp_parsers::Jid, to: Option<xmpp_parsers::Jid>) -> Iq {
    [3.30150]
    [3.30246]
    pub fn make_pong(id: &str, from: xmpp_parsers::Jid, to: Option<xmpp_parsers::Jid>) -> Element {
  • replacement in src/xmpp/stanzas.rs at line 92
    [3.30356][3.33158:33167]()
    pong
    [3.30356]
    [3.33167]
    pong.into()
  • edit in src/xmpp/stanzas.rs at line 94
    [3.33169][3.33169:33662](),[3.33662][3.30372:30374](),[3.32813][3.30372:30374](),[3.29524][3.30372:30374](),[3.30372][3.30372:30374]()
    pub fn make_iq_unsupported_error(
    id: String,
    from: xmpp_parsers::Jid,
    to: Option<xmpp_parsers::Jid>,
    ) -> Iq {
    let mut error = Iq::from_error(
    id,
    StanzaError {
    type_: ErrorType::Cancel,
    by: Some(from.clone()),
    defined_condition: DefinedCondition::ServiceUnavailable,
    texts: std::collections::BTreeMap::new(),
    other: None,
    },
    );
    error.from = Some(from);
    error.to = to;
    error
    }
  • replacement in src/xmpp/mod.rs at line 0
    [3.17][3.33310:33362]()
    use tokio::prelude::future::{self, Either, Future};
    [3.17]
    [3.62]
    use tokio::prelude::future::{self, Either};
  • edit in src/xmpp/mod.rs at line 2
    [3.90]
    [3.197]
    use tokio::prelude::{Future, Stream};
    use tokio_xmpp::{Client, Event, Packet};
  • replacement in src/xmpp/mod.rs at line 8
    [3.297][3.33363:33386]()
    mod element_processor;
    [3.297]
    [3.3355]
    use std::collections::{HashMap, VecDeque};
  • edit in src/xmpp/mod.rs at line 11
    [3.29460]
    [3.51399]
    #[derive(Default)]
    struct XmppData {
    /// known roster data
    roster: HashMap<
    xmpp_parsers::Jid,
    (
    xmpp_parsers::roster::Subscription,
    xmpp_parsers::roster::Ask,
    ),
    >,
    /// ids counter
    counter: usize,
    /// map from id of adding item to roster and jid of item
    pending_add_roster_ids: HashMap<String, xmpp_parsers::Jid>,
    /// stanzas to send
    send_queue: VecDeque<minidom::Element>,
    /// outgoing mailbox
    outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
    /// muc id to muc jid
    mucs: HashMap<String, xmpp_parsers::Jid>,
    }
    struct XmppState {
    client: Client,
    data: XmppData,
    }
    struct MaybeXmppConnection {
    account: std::rc::Rc<config::Account>,
    state: Option<XmppState>,
    }
    struct XmppConnection {
    account: std::rc::Rc<config::Account>,
    state: XmppState,
    }
    impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from.account,
    state: Some(from.state),
    }
    }
    }
    impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: std::rc::Rc::new(from),
    state: None,
    }
    }
    }
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from,
    state: None,
    }
    }
    }
    impl MaybeXmppConnection {
    /// connects if nothing connected
    /// don't connect only if stop_future resolved
    fn connect<F>(
    self,
    stop_future: F,
    ) -> impl Future<Item = XmppConnection, Error = failure::Error>
    where
    F: future::Future + Clone + 'static,
    <F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
    {
    info!("xmpp connection...");
    let MaybeXmppConnection { account, state } = self;
    if let Some(state) = state {
    Box::new(future::ok(XmppConnection { account, state }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    stop_future
    .clone()
    .select2(
    future::loop_fn(account, move |account| {
    info!("xmpp initialization...");
    let client =
    Client::new_with_jid(account.jid.clone(), &account.password);
    info!("xmpp initialized");
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
    let stop_future4 = stop_future.clone();
    // future to wait for online
    Box::new(
    XmppConnection {
    state: XmppState {
    client,
    data: std::default::Default::default(),
    },
    account,
    }
    .processing(XmppConnection::online, stop_future.clone())
    .map_err(|(acc, _)| acc)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
    .and_then(|conn| conn.enter_mucs(stop_future4))
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    }),
    )
    })
    .map_err(|_: ()| ()),
    )
    .then(|r| match r {
    Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
    Ok(Either::B((x, _a))) => future::ok(x),
    Err(Either::A((e, _b))) => future::err(e.into()),
    Err(Either::B((_, _a))) => {
    future::err(format_err!("Cann't initiate XMPP connection"))
    }
    }),
    )
    }
    }
    }
  • replacement in src/xmpp/mod.rs at line 148
    [3.51400][3.33387:33450]()
    mod xmpp_connection;
    use xmpp_connection::MaybeXmppConnection;
    [3.51400]
    [3.51550]
    impl XmppConnection {
    /// base XMPP processing
    /// Returns false on error to disconnect
    fn xmpp_processing(&mut self, event: &Event) -> bool {
    match event {
    Event::Stanza(stanza) => {
    info!("Incoming xmpp event: {:?}", stanza);
    let stanza = stanza.clone();
    use std::convert::TryInto;
    if let Some(iq) = stanza.clone().try_into().ok() as Option<xmpp_parsers::iq::Iq> {
    if let Some((_, jid)) =
    self.state.data.pending_add_roster_ids.remove_entry(&iq.id)
    {
    if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {
    if self.state.data.roster.contains_key(&jid) {
    info!("Jid {} updated to roster", jid);
    } else {
    info!("Jid {} added in roster", jid);
    self.state.data.roster.insert(
    jid.clone(),
    (
    xmpp_parsers::roster::Subscription::None,
    xmpp_parsers::roster::Ask::None,
    ),
    );
    }
    self.process_jid(&jid);
    } else {
    warn!(
    "Wrong payload when adding {} to roster: {:?}",
    jid, iq.payload
    );
    }
    }
    match iq.payload {
    xmpp_parsers::iq::IqType::Set(element) => {
    if let Some(roster) =
    element.try_into().ok() as Option<xmpp_parsers::roster::Roster>
    {
    for i in roster.items {
    if let Some(ref mut rdata) =
    self.state.data.roster.get_mut(&i.jid)
    {
    info!("Update {} in roster", i.jid);
    rdata.0 = i.subscription;
    rdata.1 = i.ask;
    } else {
    info!("Add {} to roster", i.jid);
    self.state
    .data
    .roster
    .insert(i.jid.clone(), (i.subscription, i.ask));
    }
    self.process_jid(&i.jid);
    }
    }
    }
    xmpp_parsers::iq::IqType::Error(e) => {
    error!("iq error: {:?}", e);
    return false;
    }
    xmpp_parsers::iq::IqType::Get(element) => {
    if let Some(_ping) =
    element.try_into().ok() as Option<xmpp_parsers::ping::Ping>
    {
    let pong = stanzas::make_pong(
    &iq.id,
    self.state.client.jid.clone(),
    iq.from,
    );
    self.state.data.send_queue.push_back(pong);
    }
    }
    _ => (), // ignore
    }
    } else if let Some(_presence) =
    stanza.try_into().ok() as Option<xmpp_parsers::presence::Presence>
    {
    // to do something with presence
    }
    true
    }
    Event::Online => true,
    e => {
    warn!("Unexpected event {:?}", e);
    false
    }
    }
    }
    /// process event from xmpp stream
    /// returns from future when condition met
    /// or stop future was resolved.
    /// Return item if connection was preserved or error otherwise.
    /// Second part is a state of stop_future
    fn processing<S, F, T, E>(
    self,
    stop_condition: S,
    stop_future: F,
    ) -> impl Future<
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    >
    where
    F: Future<Item = T, Error = E> + 'static,
    S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
    T: 'static,
    E: 'static,
    {
    future::loop_fn(
    (self, stop_future, stop_condition),
    |(xmpp, stop_future, mut stop_condition)| {
    let XmppConnection {
    state: XmppState { client, mut data },
    account,
    } = xmpp;
    if let Some(send_element) = data.send_queue.pop_front() {
    use tokio::prelude::Sink;
    info!("Sending {:?}", send_element);
    Box::new(
    client
    .send(Packet::Stanza(send_element))
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A((client, b))) => {
    Box::new(future::ok(future::Loop::Continue((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    b,
    stop_condition,
    ))))
    as Box<dyn Future<Item = _, Error = _>>
    }
    Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Ok(Either::B(t))))
    }
    })),
    Err(Either::A((e, b))) => {
    warn!("XMPP sending error: {}", e);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Err(e)))
    }
    })),
    }),
    ) as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    if xmpp.xmpp_processing(&event) {
    match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    }
    } else {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    }),
    )
    }
    },
    )
    }
    /// get connection and wait for online status and set presence
    /// returns error if something went wrong and xmpp connection is broken
    fn online(&mut self, event: Event) -> Result<bool, ()> {
    match event {
    Event::Online => {
    info!("Online!");
    Ok(true)
    }
    Event::Stanza(s) => {
    warn!("Stanza before online: {:?}", s);
    Ok(false)
    }
    _ => {
    error!("Disconnected while online");
    Err(())
    }
    }
    }
    fn process_initial_roster(&mut self, event: Event, id_init_roster: &str) -> Result<bool, ()> {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
    Ok(iq) => {
    if iq.id == id_init_roster {
    match iq.payload {
    xmpp_parsers::iq::IqType::Error(_e) => {
    error!("Get error instead of roster");
    Err(())
    }
    xmpp_parsers::iq::IqType::Result(Some(result)) => {
    match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
    Ok(roster) => {
    self.state.data.roster.clear();
    info!("Got first roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    self.state
    .data
    .roster
    .insert(i.jid, (i.subscription, i.ask));
    }
    Ok(true)
    }
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    Err(())
    }
    }
    }
    _ => {
    error!("Unknown result of roster");
    Err(())
    }
    }
    } else {
    Ok(false)
    }
    }
    Err(_e) => Ok(false),
    }
    } else {
    error!("Wrong event while waiting roster");
    Err(())
    }
    }
    fn initial_roster<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, mut data },
    } = self;
    use tokio::prelude::Sink;
    data.counter += 1;
    let id_init_roster = format!("id_init_roster{}", data.counter);
    let get_roster = stanzas::make_get_roster(&id_init_roster);
    let account2 = account.clone();
    info!("Quering roster... {:?}", get_roster);
    client
    .send(Packet::Stanza(get_roster))
    .map_err(move |e| {
    error!("Error on querying roster: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, event| conn.process_initial_roster(event, &id_init_roster),
    stop_future,
    )
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, data },
    } = self;
    use tokio::prelude::Sink;
  • edit in src/xmpp/mod.rs at line 515
    [3.51551]
    [21.43]
    let presence = stanzas::make_presence(&account);
    let account2 = account.clone();
    info!("Sending presence... {:?}", presence);
    client
    .send(Packet::Stanza(presence))
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, event| {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
    Ok(presence) => {
    Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
    }
    Err(e) => {
    warn!("Not a self-presence: {}", e);
    Ok(false)
    }
    }
    } else {
    error!("Wrong event while waiting self-presence");
    Err(())
    }
    },
    stop_future,
    )
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
    if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
    if !mailbox.is_empty() {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
    info!("Jid {} in roster", xmpp_to);
    let sub_to = match rdata.0 {
    xmpp_parsers::roster::Subscription::To => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if sub_to {
    info!("Subscribed to {}", xmpp_to);
    self.state.data.send_queue.extend(
    mailbox.drain(..).map(|message| {
    stanzas::make_chat_message(xmpp_to.clone(), message)
    }),
    );
    } else if rdata.1 == xmpp_parsers::roster::Ask::None {
    info!("Not subscribed to {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
    }
    } else {
    info!("Jid {} not in roster", xmpp_to);
    self.state.data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
    let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
    self.state
    .data
    .pending_add_roster_ids
    .insert(id_add_roster, xmpp_to.clone());
    info!("Adding jid to roster... {:?}", add_roster);
    self.state.data.send_queue.push_back(add_roster);
    }
    }
    }
    }
    fn process_command(&mut self, cmd: XmppCommand) {
    info!("Got command");
    match cmd {
    XmppCommand::Chat { xmpp_to, message } => {
    self.state
    .data
    .outgoing_mailbox
    .entry(xmpp_to.clone())
    .or_default()
    .push(message);
    self.process_jid(&xmpp_to);
    }
    XmppCommand::Chatroom { muc_id, message } => {
    if let Some(muc) = self.state.data.mucs.get(&muc_id) {
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_muc_message(muc.clone(), message));
    } else {
    error!("Not found MUC {}", muc_id);
    }
    }
    XmppCommand::Ping => {
    self.state.data.counter += 1;
    let id_ping = format!("id_ping{}", self.state.data.counter);
    let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
    self.state.data.send_queue.push_back(ping);
    }
    }
    }
    fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
    info!("Shutdown connection");
    let XmppConnection { account, state } = self;
    stream::iter_ok(
    state
    .data
    .mucs
    .values()
    .map(std::clone::Clone::clone)
    .collect::<Vec<_>>(),
    )
    .fold(state, move |XmppState { client, data }, muc_jid| {
    let muc_presence =
    stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());
    info!("Sending muc leave presence... {:?}", muc_presence);
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    e
    })
    .and_then(|client| future::ok(XmppState { client, data }))
    })
    .map(|_| ())
    }
    fn enter_mucs<F, E>(
    self,
    _stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection { account, state } = self;
    let account2 = account.clone();
    let account3 = account.clone();
    stream::iter_ok(account.chatrooms.clone())
    .fold(state, move |XmppState { client, mut data }, muc_jid| {
    data.counter += 1;
    let id_muc_presence = format!("id_muc_presence{}", data.counter);
    let muc_presence = stanzas::make_muc_presence(
    &id_muc_presence,
    account2.jid.clone(),
    muc_jid.1.clone(),
    );
    info!("Sending muc presence... {:?}", muc_presence);
    let account4 = account2.clone();
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    account4
    })
    .and_then(|client| {
    data.mucs.insert(muc_jid.0, muc_jid.1);
    future::ok(XmppState { client, data })
    })
    })
    .map(|state| XmppConnection {
    account: account3,
    state,
    })
    }
    }
  • replacement in src/xmpp/element_processor.rs at line 0
    [3.11301][2.32663:33739]()
    type Func<S, T, E> = dyn Fn(&mut S, E) -> T;
    pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {
    processors: Vec<Box<Func<S, Option<T>, E>>>,
    default: &'static Func<S, T, E>,
    }
    impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {
    pub fn new<F>(f: &'static F) -> Processor<S, T, E>
    where
    F: Fn(&mut S, E) -> T + 'static,
    {
    Processor {
    processors: vec![],
    default: f,
    }
    }
    pub fn register<F, A>(&mut self, f: &'static F)
    where
    F: Fn(&mut S, A) -> T + 'static,
    A: std::convert::TryFrom<E>,
    {
    self.processors.push(Box::new(move |s, e: E| {
    use std::convert::TryInto;
    (e.try_into().ok() as Option<A>).map(|a| f(s, a))
    }));
    }
    pub fn process(&self, s: &mut S, e: E) -> T {
    for processor in self.processors.iter() {
    match processor(s, e.clone()) {
    Some(t) => return t,
    None => continue,
    }
    }
    (*self.default)(s, e)
    }
    }
    [3.11301]
    type Func<S, T, E> = dyn Fn(&mut S, E) -> T;
    pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {
    processors: Vec<Box<Func<S, Option<T>, E>>>,
    default: &'static Func<S, T, E>,
    }
    impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {
    pub fn new<F>(f: &'static F) -> Processor<S, T, E>
    where
    F: Fn(&mut S, E) -> T + 'static,
    {
    Processor {
    processors: vec![],
    default: f,
    }
    }
    pub fn register<F, A>(&mut self, f: &'static F)
    where
    F: Fn(&mut S, A) -> T + 'static,
    A: std::convert::TryFrom<E>,
    {
    self.processors.push(Box::new(move |s, e: E| {
    use std::convert::TryInto;
    (e.try_into().ok() as Option<A>).map(|a| f(s, a))
    }));
    }
    pub fn process(&self, s: &mut S, e: E) -> T {
    for processor in self.processors.iter() {
    match processor(s, e.clone()) {
    Some(t) => return t,
    None => continue,
    }
    }
    (*self.default)(s, e)
    }
    }
  • edit in src/main.rs at line 215
    [3.49351][3.49351:49388]()
    use hyper::rt::{Future, Stream};
  • replacement in Cargo.lock at line 102
    [3.56068][3.56068:56086](),[3.56068][3.56068:56086]()
    version = "0.7.0"
    [3.56068]
    [3.56086]
    version = "0.7.3"
  • replacement in Cargo.lock at line 284
    [3.62112][3.62112:62184](),[3.62112][3.62112:62184]()
    "regex 1.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
    [3.62112]
    [3.62184]
    "regex 1.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
  • replacement in Cargo.lock at line 1016
    [3.87194][3.87194:87212](),[3.87194][3.87194:87212]()
    version = "1.1.5"
    [3.87194]
    [3.87212]
    version = "1.1.6"
  • replacement in Cargo.lock at line 1180
    [3.93036][3.93036:93115](),[3.93036][3.93036:93115]()
    "block-buffer 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
    [3.93036]
    [3.93115]
    "block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
  • replacement in Cargo.lock at line 1191
    [3.93472][3.93472:93551](),[3.93472][3.93472:93551]()
    "block-buffer 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
    [3.93472]
    [3.93551]
    "block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
  • replacement in Cargo.lock at line 1202
    [3.93908][3.93908:93987](),[3.93908][3.93908:93987]()
    "block-buffer 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
    [3.93908]
    [3.93987]
    "block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
  • replacement in Cargo.lock at line 1911
    [3.122530][3.122530:122685](),[3.122530][3.122530:122685]()
    "checksum block-buffer 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "49665c62e0e700857531fa5d3763e91b539ff1abeebd56808d378b495870d60d"
    [3.122530]
    [3.122685]
    "checksum block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b"
  • replacement in Cargo.lock at line 2019
    [3.138993][3.138993:139141](),[3.138993][3.138993:139141]()
    "checksum regex 1.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "559008764a17de49a3146b234641644ed37d118d1ef641a0bb573d146edc6ce0"
    [3.138993]
    [3.139141]
    "checksum regex 1.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "8f0a0bcab2fd7d1d7c54fa9eae6f43eddeb9ce2e7352f8518a814a4f65d60c58"