Set presence to account and MUC

[?]
May 29, 2020, 12:02 PM
KORYGY74MKXQZSV6JOJJCHPG3CC5UM2LTNIEV5UAJGZS7QCDKDLQC

Dependencies

  • [2] W6GSBP3Z Add id for presence stanzas
  • [3] BYJPYYSM Process iq ping response
  • [4] LQXBWNFT Remove unneeded requirement
  • [5] V5HDBSZM Use jid for receiver address
  • [6] BWDUANCV Second part of processing result is only about stop_future
  • [7] CCLGGFKR Move out XmppConnection into own file
  • [8] FV6BJ5K6 Send self-presence and store account info in Rc so it willbe used in some future in parallel
  • [9] LL3D5CXK Staring using element processor
  • [10] OANBCLN5 Move xmpp client into XmppState
  • [11] SA2IOFGY Add items to roster
  • [12] PLWPCM47 Add id to initital presence
  • [13] S754Y5DF Refactor IQ processing Always answer to set and get requests. Use XML encoding for stanzas.
  • [14] Z3NQEYVI Rename IqSetHandler to IqResuestHandler as it should provide both get and set handling
  • [15] 2THKW66M Ignore .orig files
  • [16] 3GEU7TC7 Welcome to 2018!
  • [17] YTN366WA Support disco#items
  • [18] JY4F7VBC Use element processor for incoming iq set
  • [19] UAT5MV5O Directly use id for initial roster request
  • [20] CBWCXUZZ Prepare adding new items to roster
  • [21] GXQCDLYQ Use element processor for incoming iq get
  • [22] OB3HA2MD Use Client::new_with_jid to parse jid only once
  • [23] QWE26TMV update deps
  • [24] HDLI2X4H Ignore delayed XEP-0203 messages
  • [25] DCMDASHV Mention XEP-0050 and XEP-0203 support
  • [26] QDHDTOLM Starting support for commands XEP-0050: Ad-Hoc Commands (there no support in xmpp_parsers still)
  • [27] GVZ4JAR5 Process self-presence with incoming stanza processor
  • [28] 5IKA4GO7 Rename xmpp client field from "inner" to "client"
  • [29] RGOSS73U Convert self-presence to xmpp_parser's type
  • [30] 5WHNHD42 Update dependencies
  • [31] TDOR5XQU Accept destination
  • [32] ZT3YEIVX Consume connection on processing command
  • [33] RRLRZTMR Use element processor for iq
  • [34] SH3LIQ4S Starting commands support
  • [35] U3UZTFCH Re-try to subscribe if not subscribed
  • [36] RQZCVDFD Implement applying timeout for expired iq await
  • [37] AEH7WP42 Make element processors static
  • [38] WDCZNZOP Fix rustdoc
  • [39] YEMBT7TB Add support for XEP-0092: Software Version
  • [40] 3DSOPLCG Add rustdoc
  • [41] PFC7OJQF Query roster
  • [42] 6UKCVM6E Use new iq processng for initial roster
  • [43] DYRPAV6T Update dependencies
  • [44] ZFBPXPAD Cleanup timeouted iq requests with ping Output elapsed time. Refactor iq handling.
  • [45] SSOKGGCE Update dependencies
  • [46] OFLAP2G2 Fix possible utf8 errors
  • [47] LNUU5R56 Support disco#info from XEP-0030 Service Discovery
  • [48] 4IPZTMFI Update dependencies
  • [49] PJV5HPIF Starting to imlements timeouts for iqs
  • [*] VS6AHRWI Move XMPP to separate dir
  • [*] FVVPKFTL Initial commit

Change contents

  • replacement in src/xmpp/xmpp_connection.rs at line 0
    [3.21][2.0:30535]()
    use tokio_xmpp::{Client, Event, Packet};
    use tokio::prelude::future::{self, Either};
    use tokio::prelude::stream;
    use tokio::prelude::{Future, Stream};
    use std::collections::{HashMap, VecDeque};
    use super::XmppCommand;
    use super::stanzas;
    use super::element_processor;
    use crate::config;
    #[derive(Default)]
    struct XmppData {
    /// known roster data
    roster: HashMap<
    xmpp_parsers::Jid,
    (
    xmpp_parsers::roster::Subscription,
    xmpp_parsers::roster::Ask,
    ),
    >,
    /// ids counter
    counter: usize,
    /// map from id of adding item to roster and jid of item
    pending_add_roster_ids: HashMap<String, xmpp_parsers::Jid>,
    /// stanzas to send
    send_queue: VecDeque<minidom::Element>,
    /// outgoing mailbox
    outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
    /// muc id to muc jid
    mucs: HashMap<String, xmpp_parsers::Jid>,
    }
    struct XmppState {
    client: Client,
    data: XmppData,
    }
    pub struct XmppConnection {
    account: std::rc::Rc<config::Account>,
    state: XmppState,
    }
    struct XmppElementProcessor {
    incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
    }
    impl XmppElementProcessor {
    fn new() -> XmppElementProcessor {
    let mut incoming = element_processor::Processor::new(&|_, e| {
    warn!("Unknown stanza {:#?}", e);
    true
    });
    incoming.register(&XmppConnection::incoming_iq_processing);
    XmppElementProcessor { incoming }
    }
    }
    pub struct MaybeXmppConnection {
    account: std::rc::Rc<config::Account>,
    state: Option<XmppState>,
    }
    impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from.account,
    state: Some(from.state),
    }
    }
    }
    impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: std::rc::Rc::new(from),
    state: None,
    }
    }
    }
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from,
    state: None,
    }
    }
    }
    impl MaybeXmppConnection {
    /// connects if nothing connected
    /// don't connect only if stop_future resolved
    pub fn connect<F>(
    self,
    stop_future: F,
    ) -> impl Future<Item = XmppConnection, Error = failure::Error>
    where
    F: future::Future + Clone + 'static,
    <F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
    {
    info!("xmpp connection...");
    let MaybeXmppConnection { account, state } = self;
    if let Some(state) = state {
    Box::new(future::ok(XmppConnection { account, state }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    stop_future
    .clone()
    .select2(
    future::loop_fn(account, move |account| {
    info!("xmpp initialization...");
    let client =
    Client::new_with_jid(account.jid.clone(), &account.password);
    info!("xmpp initialized");
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
    let stop_future4 = stop_future.clone();
    // future to wait for online
    Box::new(
    XmppConnection {
    state: XmppState {
    client,
    data: std::default::Default::default(),
    },
    account,
    }
    .processing(XmppConnection::online, stop_future.clone())
    .map_err(|(acc, _)| acc)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
    .and_then(|conn| conn.enter_mucs(stop_future4))
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    }),
    )
    })
    .map_err(|_: ()| ()),
    )
    .then(|r| match r {
    Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
    Ok(Either::B((x, _a))) => future::ok(x),
    Err(Either::A((e, _b))) => future::err(e.into()),
    Err(Either::B((_, _a))) => {
    future::err(format_err!("Cann't initiate XMPP connection"))
    }
    }),
    )
    }
    }
    }
    impl XmppConnection {
    /// base XMPP processing
    /// Returns false on error to disconnect
    fn xmpp_processing(&mut self, event: &Event) -> bool {
    match event {
    Event::Stanza(stanza) => {
    let processors = XmppElementProcessor::new();
    processors.incoming.process(self, stanza.clone())
    }
    Event::Online => true,
    e => {
    warn!("Unexpected event {:?}", e);
    false
    }
    }
    }
    fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
    use std::convert::TryInto;
    if let Some((_, jid)) = self.state.data.pending_add_roster_ids.remove_entry(&iq.id) {
    if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {
    if self.state.data.roster.contains_key(&jid) {
    info!("Jid {} updated to roster", jid);
    } else {
    info!("Jid {} added in roster", jid);
    self.state.data.roster.insert(
    jid.clone(),
    (
    xmpp_parsers::roster::Subscription::None,
    xmpp_parsers::roster::Ask::None,
    ),
    );
    }
    self.process_jid(&jid);
    } else {
    warn!(
    "Wrong payload when adding {} to roster: {:?}",
    jid, iq.payload
    );
    }
    }
    match iq.payload {
    xmpp_parsers::iq::IqType::Set(element) => {
    if let Some(roster) =
    element.try_into().ok() as Option<xmpp_parsers::roster::Roster>
    {
    for i in roster.items {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
    info!("Update {} in roster", i.jid);
    rdata.0 = i.subscription;
    rdata.1 = i.ask;
    } else {
    info!("Add {} to roster", i.jid);
    self.state
    .data
    .roster
    .insert(i.jid.clone(), (i.subscription, i.ask));
    }
    self.process_jid(&i.jid);
    }
    }
    }
    xmpp_parsers::iq::IqType::Error(e) => {
    error!("iq error: {:?}", e);
    return false;
    }
    xmpp_parsers::iq::IqType::Get(element) => {
    if let Some(_ping) = element.try_into().ok() as Option<xmpp_parsers::ping::Ping> {
    let pong = stanzas::make_pong(&iq.id, self.state.client.jid.clone(), iq.from);
    self.state.data.send_queue.push_back(pong);
    }
    }
    _ => (), // ignore
    }
    true
    }
    /// process event from xmpp stream
    /// returns from future when condition met
    /// or stop future was resolved.
    /// Return item if connection was preserved or error otherwise.
    /// Second part is a state of stop_future
    pub fn processing<S, F, T, E>(
    self,
    stop_condition: S,
    stop_future: F,
    ) -> impl Future<
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    >
    where
    F: Future<Item = T, Error = E> + 'static,
    S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
    T: 'static,
    E: 'static,
    {
    future::loop_fn(
    (self, stop_future, stop_condition),
    |(xmpp, stop_future, mut stop_condition)| {
    let XmppConnection {
    state: XmppState { client, mut data },
    account,
    } = xmpp;
    if let Some(send_element) = data.send_queue.pop_front() {
    use tokio::prelude::Sink;
    info!("Sending {:?}", send_element);
    Box::new(
    client
    .send(Packet::Stanza(send_element))
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A((client, b))) => {
    Box::new(future::ok(future::Loop::Continue((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    b,
    stop_condition,
    ))))
    as Box<dyn Future<Item = _, Error = _>>
    }
    Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Ok(Either::B(t))))
    }
    })),
    Err(Either::A((e, b))) => {
    warn!("XMPP sending error: {}", e);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Err(e)))
    }
    })),
    }),
    ) as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    if xmpp.xmpp_processing(&event) {
    match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    }
    } else {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    }),
    )
    }
    },
    )
    }
    /// get connection and wait for online status and set presence
    /// returns error if something went wrong and xmpp connection is broken
    fn online(&mut self, event: Event) -> Result<bool, ()> {
    match event {
    Event::Online => {
    info!("Online!");
    Ok(true)
    }
    Event::Stanza(s) => {
    warn!("Stanza before online: {:?}", s);
    Ok(false)
    }
    _ => {
    error!("Disconnected while online");
    Err(())
    }
    }
    }
    fn process_initial_roster(&mut self, event: Event, id_init_roster: &str) -> Result<bool, ()> {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
    Ok(iq) => {
    if iq.id == id_init_roster {
    match iq.payload {
    xmpp_parsers::iq::IqType::Error(_e) => {
    error!("Get error instead of roster");
    Err(())
    }
    xmpp_parsers::iq::IqType::Result(Some(result)) => {
    match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
    Ok(roster) => {
    self.state.data.roster.clear();
    info!("Got first roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    self.state
    .data
    .roster
    .insert(i.jid, (i.subscription, i.ask));
    }
    Ok(true)
    }
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    Err(())
    }
    }
    }
    _ => {
    error!("Unknown result of roster");
    Err(())
    }
    }
    } else {
    Ok(false)
    }
    }
    Err(_e) => Ok(false),
    }
    } else {
    error!("Wrong event while waiting roster");
    Err(())
    }
    }
    fn initial_roster<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, mut data },
    } = self;
    use tokio::prelude::Sink;
    data.counter += 1;
    let id_init_roster = format!("id_init_roster{}", data.counter);
    let get_roster = stanzas::make_get_roster(&id_init_roster);
    let account2 = account.clone();
    info!("Quering roster... {:?}", get_roster);
    client
    .send(Packet::Stanza(get_roster))
    .map_err(move |e| {
    error!("Error on querying roster: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, event| conn.process_initial_roster(event, &id_init_roster),
    stop_future,
    )
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, data },
    } = self;
    use tokio::prelude::Sink;
    let presence = stanzas::make_presence(&account);
    let account2 = account.clone();
    info!("Sending presence... {:?}", presence);
    client
    .send(Packet::Stanza(presence))
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, event| {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
    Ok(presence) => {
    Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
    }
    Err(e) => {
    warn!("Not a self-presence: {}", e);
    Ok(false)
    }
    }
    } else {
    error!("Wrong event while waiting self-presence");
    Err(())
    }
    },
    stop_future,
    )
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
    if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
    if !mailbox.is_empty() {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
    info!("Jid {} in roster", xmpp_to);
    let sub_to = match rdata.0 {
    xmpp_parsers::roster::Subscription::To => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if sub_to {
    info!("Subscribed to {}", xmpp_to);
    self.state.data.send_queue.extend(
    mailbox.drain(..).map(|message| {
    stanzas::make_chat_message(xmpp_to.clone(), message)
    }),
    );
    } else if rdata.1 == xmpp_parsers::roster::Ask::None {
    info!("Not subscribed to {}", xmpp_to);
    self.state.data.counter += 1;
    let id_presence_subscribe =
    format!("id_presence_subscribe{}", self.state.data.counter);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(
    id_presence_subscribe,
    xmpp_to.clone(),
    ));
    } else {
    warn!(
    "Not subscribed to {}. Currently in {:?} state",
    xmpp_to, rdata.1
    );
    self.state.data.counter += 1;
    let id_presence_subscribe =
    format!("id_presence_subscribe{}", self.state.data.counter);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(
    id_presence_subscribe,
    xmpp_to.clone(),
    ));
    }
    let sub_from = match rdata.0 {
    xmpp_parsers::roster::Subscription::From => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if !sub_from {
    info!("Not subscription from {}", xmpp_to);
    self.state.data.counter += 1;
    let id_presence_subscribed =
    format!("id_presence_subscribed{}", self.state.data.counter);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_allow_subscribe(
    id_presence_subscribed,
    xmpp_to.clone(),
    ));
    }
    } else {
    info!("Jid {} not in roster", xmpp_to);
    self.state.data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
    let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
    self.state
    .data
    .pending_add_roster_ids
    .insert(id_add_roster, xmpp_to.clone());
    info!("Adding jid to roster... {:?}", add_roster);
    self.state.data.send_queue.push_back(add_roster);
    }
    }
    }
    }
    pub fn process_command(&mut self, cmd: XmppCommand) {
    info!("Got command");
    match cmd {
    XmppCommand::Chat { xmpp_to, message } => {
    self.state
    .data
    .outgoing_mailbox
    .entry(xmpp_to.clone())
    .or_default()
    .push(message);
    self.process_jid(&xmpp_to);
    }
    XmppCommand::Chatroom { muc_id, message } => {
    if let Some(muc) = self.state.data.mucs.get(&muc_id) {
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_muc_message(muc.clone(), message));
    } else {
    error!("Not found MUC {}", muc_id);
    }
    }
    XmppCommand::Ping => {
    self.state.data.counter += 1;
    let id_ping = format!("id_ping{}", self.state.data.counter);
    let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
    self.state.data.send_queue.push_back(ping);
    }
    }
    }
    pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
    info!("Shutdown connection");
    let XmppConnection { account, state } = self;
    stream::iter_ok(
    state
    .data
    .mucs
    .values()
    .map(std::clone::Clone::clone)
    .collect::<Vec<_>>(),
    )
    .fold(state, move |XmppState { client, data }, muc_jid| {
    let muc_presence =
    stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());
    info!("Sending muc leave presence... {:?}", muc_presence);
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    e
    })
    .and_then(|client| future::ok(XmppState { client, data }))
    })
    .map(|_| ())
    }
    fn enter_mucs<F, E>(
    self,
    _stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection { account, state } = self;
    let account2 = account.clone();
    let account3 = account.clone();
    stream::iter_ok(account.chatrooms.clone())
    .fold(state, move |XmppState { client, mut data }, muc_jid| {
    data.counter += 1;
    let id_muc_presence = format!("id_muc_presence{}", data.counter);
    let muc_presence = stanzas::make_muc_presence(
    &id_muc_presence,
    account2.jid.clone(),
    muc_jid.1.clone(),
    );
    info!("Sending muc presence... {:?}", muc_presence);
    let account4 = account2.clone();
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    account4
    })
    .and_then(|client| {
    data.mucs.insert(muc_jid.0, muc_jid.1);
    future::ok(XmppState { client, data })
    })
    })
    .map(|state| XmppConnection {
    account: account3,
    state,
    })
    }
    }
    [3.21]
    use tokio_xmpp::{Client, Event, Packet};
    use tokio::prelude::future::{self, Either};
    use tokio::prelude::stream;
    use tokio::prelude::{Future, Stream};
    use std::collections::{HashMap, VecDeque};
    use std::time::{Duration, Instant};
    use super::stanzas;
    use super::element_processor;
    use crate::config;
    #[derive(Debug)]
    pub enum XmppCommand {
    /// Send message to someone by jid
    Chat {
    xmpp_to: xmpp_parsers::Jid,
    message: String,
    },
    /// Send message to MUC
    Chatroom {
    muc_id: String,
    message: String,
    },
    // Send ping request to the server to test connection
    Ping,
    /// Set presence status
    Presence {
    show: xmpp_parsers::presence::Show,
    message: String,
    },
    /// Send presence status to MUC
    ChatroomPresence {
    muc_id: String,
    show: xmpp_parsers::presence::Show,
    message: String,
    },
    /// Check iq requests if some have expired timeouts
    TimeoutCleanup,
    }
    /// trait of processing iq
    /// each function consumes handlers and
    /// returns false if connection should be reset
    trait IqHandler {
    /// process result
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool;
    /// process error
    fn error(
    self: Box<Self>,
    conn: &mut XmppConnection,
    error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool;
    /// process tmeout
    fn timeout(self: Box<Self>, conn: &mut XmppConnection) -> bool;
    }
    struct AddRosterIqHandler {
    jid: xmpp_parsers::Jid,
    }
    impl IqHandler for AddRosterIqHandler {
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    match opt_element {
    Some(element) => {
    warn!(
    "Wrong payload when adding {} to roster: {}",
    self.jid,
    String::from(&element)
    );
    }
    None => {
    if conn.state.data.roster.contains_key(&self.jid) {
    info!("Jid {} updated to roster", self.jid);
    } else {
    info!("Jid {} added in roster", self.jid);
    conn.state.data.roster.insert(
    self.jid.clone(),
    (
    xmpp_parsers::roster::Subscription::None,
    xmpp_parsers::roster::Ask::None,
    ),
    );
    }
    conn.process_jid(&self.jid);
    }
    }
    true
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    true
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    true // ignore
    }
    }
    struct PingIqHandler {}
    impl IqHandler for PingIqHandler {
    fn result(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    true // ignore
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    false
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    false
    }
    }
    struct InitRosterIqHandler {}
    impl IqHandler for InitRosterIqHandler {
    fn result(
    self: Box<Self>,
    conn: &mut XmppConnection,
    opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
    if let Some(result) = opt_element {
    use std::convert::TryInto;
    match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
    Ok(roster) => {
    conn.state.data.roster_init = true;
    conn.state.data.roster.clear();
    info!("Got first roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    conn.state
    .data
    .roster
    .insert(i.jid, (i.subscription, i.ask));
    }
    true
    }
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    false
    }
    }
    } else {
    error!("No roster responded");
    false
    }
    }
    fn error(
    self: Box<Self>,
    _conn: &mut XmppConnection,
    _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
    false
    }
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
    false
    }
    }
    #[derive(Default)]
    struct XmppData {
    /// known roster data
    roster: HashMap<
    xmpp_parsers::Jid,
    (
    xmpp_parsers::roster::Subscription,
    xmpp_parsers::roster::Ask,
    ),
    >,
    /// if roster was initialized
    /// ToDo: remove it as it used only for initialization
    roster_init: bool,
    /// ids counter
    counter: usize,
    /// stanzas to send
    send_queue: VecDeque<minidom::Element>,
    /// outgoing mailbox
    outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
    /// muc id to muc jid
    mucs: HashMap<String, xmpp_parsers::Jid>,
    /// map from iq's id to handler of this type of iqs
    pending_ids: HashMap<String, (Instant, Box<dyn IqHandler>)>,
    }
    struct XmppState {
    client: Client,
    data: XmppData,
    }
    pub struct XmppConnection {
    account: std::rc::Rc<config::Account>,
    state: XmppState,
    }
    struct XmppElementProcessor {
    incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
    }
    impl XmppElementProcessor {
    fn new() -> XmppElementProcessor {
    let mut incoming = element_processor::Processor::new(&|_, e| {
    warn!("Unknown stanza {}", String::from(&e));
    true
    });
    incoming.register(&XmppConnection::incoming_iq_processing);
    XmppElementProcessor { incoming }
    }
    }
    pub struct MaybeXmppConnection {
    account: std::rc::Rc<config::Account>,
    state: Option<XmppState>,
    }
    impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from.account,
    state: Some(from.state),
    }
    }
    }
    impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: std::rc::Rc::new(from),
    state: None,
    }
    }
    }
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from,
    state: None,
    }
    }
    }
    impl MaybeXmppConnection {
    /// connects if nothing connected
    /// don't connect only if stop_future resolved
    pub fn connect<F>(
    self,
    stop_future: F,
    ) -> impl Future<Item = XmppConnection, Error = failure::Error>
    where
    F: future::Future + Clone + 'static,
    <F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
    {
    info!("xmpp connection...");
    let MaybeXmppConnection { account, state } = self;
    if let Some(state) = state {
    Box::new(future::ok(XmppConnection { account, state }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    stop_future
    .clone()
    .select2(
    future::loop_fn(account, move |account| {
    info!("xmpp initialization...");
    let client =
    Client::new_with_jid(account.jid.clone(), &account.password);
    info!("xmpp initialized");
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
    let stop_future4 = stop_future.clone();
    // future to wait for online
    Box::new(
    XmppConnection {
    state: XmppState {
    client,
    data: std::default::Default::default(),
    },
    account,
    }
    .processing(XmppConnection::online, stop_future.clone())
    .map_err(|(acc, _)| acc)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
    .and_then(|conn| conn.enter_mucs(stop_future4))
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    }),
    )
    })
    .map_err(|_: ()| ()),
    )
    .then(|r| match r {
    Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
    Ok(Either::B((x, _a))) => future::ok(x),
    Err(Either::A((e, _b))) => future::err(e.into()),
    Err(Either::B((_, _a))) => {
    future::err(format_err!("Cann't initiate XMPP connection"))
    }
    }),
    )
    }
    }
    }
    impl XmppConnection {
    /// base XMPP processing
    /// Returns false on error to disconnect
    fn xmpp_processing(&mut self, event: &Event) -> bool {
    match event {
    Event::Stanza(stanza) => {
    let processors = XmppElementProcessor::new();
    processors.incoming.process(self, stanza.clone())
    }
    Event::Online => true,
    e => {
    warn!("Unexpected event {:?}", e);
    false
    }
    }
    }
    /// Enforce to answer to IQ "set"
    fn incoming_iq_processing_set(
    &mut self,
    id: String,
    from: Option<xmpp_parsers::Jid>,
    element: minidom::Element,
    ) -> xmpp_parsers::iq::Iq {
    use std::convert::TryInto;
    if let Some(roster) =
    element.clone().try_into().ok() as Option<xmpp_parsers::roster::Roster>
    {
    // RFC 6212 2.1.6. Roster Push
    for i in roster.items {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
    info!("Update {} in roster", i.jid);
    rdata.0 = i.subscription;
    rdata.1 = i.ask;
    } else {
    info!("Add {} to roster", i.jid);
    self.state
    .data
    .roster
    .insert(i.jid.clone(), (i.subscription, i.ask));
    }
    self.process_jid(&i.jid);
    }
    return stanzas::make_roster_push_answer(id, self.state.client.jid.clone(), from);
    }
    warn!(
    "Unsupported IQ set request from {:?}: {}",
    from,
    String::from(&element)
    );
    stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)
    }
    /// Enforce to answer to IQ "get"
    fn incoming_iq_processing_get(
    &mut self,
    id: String,
    from: Option<xmpp_parsers::Jid>,
    element: minidom::Element,
    ) -> xmpp_parsers::iq::Iq {
    use std::convert::TryInto;
    if let Some(_ping) = element.clone().try_into().ok() as Option<xmpp_parsers::ping::Ping> {
    return stanzas::make_pong(&id, self.state.client.jid.clone(), from);
    }
    warn!(
    "Unsupported IQ get request from {:?}: {}",
    from,
    String::from(&element)
    );
    stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)
    }
    fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
    match iq.payload {
    xmpp_parsers::iq::IqType::Set(element) => {
    let iq_answer = self.incoming_iq_processing_set(iq.id, iq.from, element);
    self.state.data.send_queue.push_back(iq_answer.into());
    }
    xmpp_parsers::iq::IqType::Error(e) => {
    if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {
    return handler.1.error(self, e);
    }
    error!("iq error: {:?}", e);
    return false;
    }
    xmpp_parsers::iq::IqType::Get(element) => {
    let iq_answer = self.incoming_iq_processing_get(iq.id, iq.from, element);
    self.state.data.send_queue.push_back(iq_answer.into());
    }
    xmpp_parsers::iq::IqType::Result(opt_element) => {
    if let Some((_, handler)) = self.state.data.pending_ids.remove_entry(&iq.id) {
    return handler.1.result(self, opt_element);
    }
    warn!(
    "Unwanted iq result id {} from {:?}: {:?}",
    iq.id,
    iq.from,
    opt_element.map(|e| String::from(&e))
    );
    }
    }
    true
    }
    /// process event from xmpp stream
    /// returns from future when condition met
    /// or stop future was resolved.
    /// Return item if connection was preserved or error otherwise.
    /// Second part is a state of stop_future
    pub fn processing<S, F, T, E>(
    self,
    stop_condition: S,
    stop_future: F,
    ) -> impl Future<
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    >
    where
    F: Future<Item = T, Error = E> + 'static,
    S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
    T: 'static,
    E: 'static,
    {
    future::loop_fn(
    (self, stop_future, stop_condition),
    |(xmpp, stop_future, mut stop_condition)| {
    // ToDo: check timeouts if iqs
    let XmppConnection {
    state: XmppState { client, mut data },
    account,
    } = xmpp;
    if let Some(send_element) = data.send_queue.pop_front() {
    use tokio::prelude::Sink;
    info!("Sending {}", String::from(&send_element));
    Box::new(
    client
    .send(Packet::Stanza(send_element))
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A((client, b))) => {
    Box::new(future::ok(future::Loop::Continue((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    b,
    stop_condition,
    ))))
    as Box<dyn Future<Item = _, Error = _>>
    }
    Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Ok(Either::B(t))))
    }
    })),
    Err(Either::A((e, b))) => {
    warn!("XMPP sending error: {}", e);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Err(e)))
    }
    })),
    }),
    ) as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    if xmpp.xmpp_processing(&event) {
    match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    }
    } else {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    }),
    )
    }
    },
    )
    }
    /// get connection and wait for online status and set presence
    /// returns error if something went wrong and xmpp connection is broken
    fn online(&mut self, event: Event) -> Result<bool, ()> {
    match event {
    Event::Online => {
    info!("Online!");
    Ok(true)
    }
    Event::Stanza(s) => {
    warn!("Stanza before online: {}", String::from(&s));
    Ok(false)
    }
    _ => {
    error!("Disconnected while online");
    Err(())
    }
    }
    }
    fn initial_roster<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, mut data },
    } = self;
    use tokio::prelude::Sink;
    data.counter += 1;
    let id_init_roster = format!("id_init_roster{}", data.counter);
    let get_roster = stanzas::make_get_roster(&id_init_roster);
    let account2 = account.clone();
    info!("Quering roster... {}", String::from(&get_roster));
    data.pending_ids.insert(
    id_init_roster.clone(),
    (
    Instant::now() + Duration::from_secs(60),
    Box::new(InitRosterIqHandler {}),
    ),
    );
    client
    .send(Packet::Stanza(get_roster))
    .map_err(move |e| {
    error!("Error on querying roster: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(move |conn, _| Ok(conn.state.data.roster_init), stop_future)
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, mut data },
    } = self;
    use tokio::prelude::Sink;
    data.counter += 1;
    let id_presence = format!("id_init_presence{}", data.counter);
    let presence = stanzas::make_presence(id_presence, xmpp_parsers::presence::Show::None, "Online!".to_string());
    let account2 = account.clone();
    info!("Sending presence... {}", String::from(&presence));
    client
    .send(Packet::Stanza(presence))
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, event| {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
    Ok(presence) => {
    Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
    }
    Err(e) => {
    warn!("Not a self-presence: {}", e);
    Ok(false)
    }
    }
    } else {
    error!("Wrong event while waiting self-presence");
    Err(())
    }
    },
    stop_future,
    )
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
    if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
    if !mailbox.is_empty() {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
    info!("Jid {} in roster", xmpp_to);
    let sub_to = match rdata.0 {
    xmpp_parsers::roster::Subscription::To => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if sub_to {
    info!("Subscribed to {}", xmpp_to);
    self.state.data.send_queue.extend(
    mailbox.drain(..).map(|message| {
    stanzas::make_chat_message(xmpp_to.clone(), message)
    }),
    );
    } else if rdata.1 == xmpp_parsers::roster::Ask::None {
    info!("Not subscribed to {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
    }
    let sub_from = match rdata.0 {
    xmpp_parsers::roster::Subscription::From => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if !sub_from {
    info!("Not subscription from {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));
    }
    } else {
    info!("Jid {} not in roster", xmpp_to);
    self.state.data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
    let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
    info!("Adding jid {} to roster id {}", xmpp_to, id_add_roster);
    self.state.data.pending_ids.insert(
    id_add_roster,
    (
    Instant::now() + Duration::from_secs(60),
    Box::new(AddRosterIqHandler {
    jid: xmpp_to.clone(),
    }),
    ),
    );
    self.state.data.send_queue.push_back(add_roster);
    }
    }
    }
    }
    pub fn process_command(&mut self, cmd: XmppCommand) {
    info!("Got command");
    match cmd {
    XmppCommand::Chat { xmpp_to, message } => {
    self.state
    .data
    .outgoing_mailbox
    .entry(xmpp_to.clone())
    .or_default()
    .push(message);
    self.process_jid(&xmpp_to);
    }
    XmppCommand::Chatroom { muc_id, message } => {
    if let Some(muc) = self.state.data.mucs.get(&muc_id) {
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_muc_message(muc.clone(), message));
    } else {
    error!("Not found MUC {}", muc_id);
    }
    }
    XmppCommand::ChatroomPresence { muc_id, show, message } => {
    if let Some(muc) = self.state.data.mucs.get(&muc_id) {
    self.state.data.counter += 1;
    let id_presence = format!("id_presence{}", self.state.data.counter);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_muc_presence(
    &id_presence,
    self.account.jid.clone(),
    muc.clone(),
    show,
    Some(message),
    ));
    } else {
    error!("Not found MUC {}", muc_id);
    }
    }
    XmppCommand::Presence { show, message } => {
    self.state.data.counter += 1;
    let id_presence = format!("id_presence{}", self.state.data.counter);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_presence(id_presence, show, message));
    }
    XmppCommand::Ping => {
    self.state.data.counter += 1;
    let id_ping = format!("id_ping{}", self.state.data.counter);
    let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
    self.state.data.send_queue.push_back(ping);
    self.state.data.pending_ids.insert(
    id_ping,
    (
    Instant::now() + Duration::from_secs(30),
    Box::new(PingIqHandler {}),
    ),
    );
    }
    XmppCommand::TimeoutCleanup => {
    let now = Instant::now();
    let timeouted: Vec<String> = self
    .state
    .data
    .pending_ids
    .iter()
    .filter_map(|(id, (timeout, _))| {
    if now >= *timeout {
    Some(id.to_string())
    } else {
    None
    }
    })
    .collect();
    let mut correct = true;
    timeouted.into_iter().for_each(|id| {
    if let Some((_, handler)) = self.state.data.pending_ids.remove(&id) {
    correct &= handler.timeout(&mut self);
    }
    })
    }
    }
    }
    pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
    info!("Shutdown connection");
    let XmppConnection { account, state } = self;
    stream::iter_ok(
    state
    .data
    .mucs
    .values()
    .map(std::clone::Clone::clone)
    .collect::<Vec<_>>(),
    )
    .fold(state, move |XmppState { client, data }, muc_jid| {
    let muc_presence =
    stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());
    info!(
    "Sending muc leave presence... {}",
    String::from(&muc_presence)
    );
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    e
    })
    .and_then(|client| future::ok(XmppState { client, data }))
    })
    .map(|_| ())
    }
    fn enter_mucs<F, E>(
    self,
    _stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection { account, state } = self;
    let account2 = account.clone();
    let account3 = account.clone();
    stream::iter_ok(account.chatrooms.clone())
    .fold(state, move |XmppState { client, mut data }, muc_jid| {
    data.counter += 1;
    let id_muc_presence = format!("id_muc_presence{}", data.counter);
    let muc_presence = stanzas::make_muc_presence(
    &id_muc_presence,
    account2.jid.clone(),
    muc_jid.1.clone(),
    xmpp_parsers::presence::Show::None,
    None,
    );
    info!("Sending muc presence... {}", String::from(&muc_presence));
    let account4 = account2.clone();
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    account4
    })
    .and_then(|client| {
    data.mucs.insert(muc_jid.0, muc_jid.1);
    future::ok(XmppState { client, data })
    })
    })
    .map(|state| XmppConnection {
    account: account3,
    state,
    })
    }
    }
  • edit in src/xmpp/stanzas.rs at line 0
    [3.13][3.35280:35308]()
    use crate::config::Account;
  • edit in src/xmpp/stanzas.rs at line 6
    [3.35465]
    [3.141]
    use xmpp_parsers::stanza_error::{DefinedCondition, ErrorType, StanzaError};
  • replacement in src/xmpp/stanzas.rs at line 8
    [3.142][3.32758:32812]()
    pub fn make_presence(_account: &Account) -> Element {
    [3.142]
    [3.196]
    pub fn make_presence(id: String, show: PresenceShow, text: String) -> Element {
  • replacement in src/xmpp/stanzas.rs at line 10
    [3.29410][3.254:294](),[3.254][3.254:294]()
    presence.show = PresenceShow::Chat;
    [3.254]
    [3.294]
    presence.id = Some(id);
    presence.show = show;
  • replacement in src/xmpp/stanzas.rs at line 14
    [3.325][3.325:397](),[3.325][3.325:397]()
    .insert(String::from("en"), String::from("Echoing messages."));
    [3.325]
    [3.35542]
    .insert(String::from("en"), text);
  • replacement in src/xmpp/stanzas.rs at line 46
    [3.39861][2.30536:30611]()
    pub fn make_ask_subscribe(id: String, jid: xmpp_parsers::Jid) -> Element {
    [3.39861]
    [3.36481]
    pub fn make_roster_push_answer(
    id: String,
    from: xmpp_parsers::Jid,
    to: Option<xmpp_parsers::Jid>,
    ) -> Iq {
    let mut answer = Iq::from_result(id, None as Option<Roster>);
    answer.from = Some(from);
    answer.to = to;
    answer
    }
    pub fn make_ask_subscribe(jid: xmpp_parsers::Jid) -> Element {
  • edit in src/xmpp/stanzas.rs at line 59
    [3.36544][2.30612:30640]()
    presence.id = Some(id);
  • replacement in src/xmpp/stanzas.rs at line 63
    [3.35009][2.30641:30718]()
    pub fn make_allow_subscribe(id: String, jid: xmpp_parsers::Jid) -> Element {
    [3.35009]
    [3.35074]
    pub fn make_allow_subscribe(jid: xmpp_parsers::Jid) -> Element {
  • edit in src/xmpp/stanzas.rs at line 65
    [3.35138][2.30719:30747]()
    presence.id = Some(id);
  • replacement in src/xmpp/stanzas.rs at line 76
    [3.37015][3.37015:37111]()
    pub fn make_muc_presence(id: &str, from: xmpp_parsers::Jid, to: xmpp_parsers::Jid) -> Element {
    [3.37015]
    [3.37111]
    pub fn make_muc_presence(
    id: &str,
    from: xmpp_parsers::Jid,
    to: xmpp_parsers::Jid,
    show: PresenceShow,
    text: Option<String>,
    ) -> Element {
  • edit in src/xmpp/stanzas.rs at line 87
    [3.37269]
    [3.37269]
    presence.show = show;
    if let Some(text) = text {
    presence
    .statuses
    .insert(String::from("en"), text);
    }
  • replacement in src/xmpp/stanzas.rs at line 118
    [3.35186][3.29594:29690]()
    pub fn make_pong(id: &str, from: xmpp_parsers::Jid, to: Option<xmpp_parsers::Jid>) -> Element {
    [3.35186]
    [3.35277]
    pub fn make_pong(id: &str, from: xmpp_parsers::Jid, to: Option<xmpp_parsers::Jid>) -> Iq {
  • replacement in src/xmpp/stanzas.rs at line 122
    [3.35387][3.29691:29707]()
    pong.into()
    [3.35387]
    [3.41574]
    pong
    }
    pub fn make_iq_unsupported_error(
    id: String,
    from: xmpp_parsers::Jid,
    to: Option<xmpp_parsers::Jid>,
    ) -> Iq {
    let mut error = Iq::from_error(
    id,
    StanzaError {
    type_: ErrorType::Cancel,
    by: Some(from.clone()),
    defined_condition: DefinedCondition::ServiceUnavailable,
    texts: std::collections::BTreeMap::new(),
    other: None,
    },
    );
    error.from = Some(from);
    error.to = to;
    error
  • replacement in src/xmpp/mod.rs at line 12
    [3.55931][3.29708:29910]()
    #[derive(Debug)]
    pub enum XmppCommand {
    Chat {
    xmpp_to: xmpp_parsers::Jid,
    message: String,
    },
    Chatroom {
    muc_id: String,
    message: String,
    },
    Ping,
    }
    [3.55931]
    [3.60161]
    pub use xmpp_connection::XmppCommand;
  • replacement in src/xmpp/element_processor.rs at line 0
    [3.11301][2.30748:31824]()
    type Func<S, T, E> = dyn Fn(&mut S, E) -> T;
    pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {
    processors: Vec<Box<Func<S, Option<T>, E>>>,
    default: &'static Func<S, T, E>,
    }
    impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {
    pub fn new<F>(f: &'static F) -> Processor<S, T, E>
    where
    F: Fn(&mut S, E) -> T + 'static,
    {
    Processor {
    processors: vec![],
    default: f,
    }
    }
    pub fn register<F, A>(&mut self, f: &'static F)
    where
    F: Fn(&mut S, A) -> T + 'static,
    A: std::convert::TryFrom<E>,
    {
    self.processors.push(Box::new(move |s, e: E| {
    use std::convert::TryInto;
    (e.try_into().ok() as Option<A>).map(|a| f(s, a))
    }));
    }
    pub fn process(&self, s: &mut S, e: E) -> T {
    for processor in self.processors.iter() {
    match processor(s, e.clone()) {
    Some(t) => return t,
    None => continue,
    }
    }
    (*self.default)(s, e)
    }
    }
    [3.11301]
    type Func<S, T, E> = dyn Fn(&mut S, E) -> T;
    pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {
    processors: Vec<Box<Func<S, Option<T>, E>>>,
    default: &'static Func<S, T, E>,
    }
    impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {
    pub fn new<F>(f: &'static F) -> Processor<S, T, E>
    where
    F: Fn(&mut S, E) -> T + 'static,
    {
    Processor {
    processors: vec![],
    default: f,
    }
    }
    pub fn register<F, A>(&mut self, f: &'static F)
    where
    F: Fn(&mut S, A) -> T + 'static,
    A: std::convert::TryFrom<E>,
    {
    self.processors.push(Box::new(move |s, e: E| {
    use std::convert::TryInto;
    (e.try_into().ok() as Option<A>).map(|a| f(s, a))
    }));
    }
    pub fn process(&self, s: &mut S, e: E) -> T {
    for processor in self.processors.iter() {
    match processor(s, e.clone()) {
    Some(t) => return t,
    None => continue,
    }
    }
    (*self.default)(s, e)
    }
    }
  • edit in src/main.rs at line 75
    [3.45022]
    [3.45651]
    let xmpp_pres_opt = req
    .headers()
    .get("X-XMPP-Presence");
    let xmpp_pres_res: Result<xmpp_parsers::presence::Show, failure::Error> = xmpp_pres_opt
    .map_or_else(|| Err(format_err!("No X-XMPP-Presence header")),
    |show| {
    std::str::from_utf8(show.as_bytes())
    .map_err(std::convert::Into::into)
    .and_then(|s| std::str::FromStr::from_str(s)
    .map_err(|e| format_err!("Incorrect presence {}", e) ))
    },
    );
  • replacement in src/main.rs at line 89
    [3.45652][3.45652:45730](),[3.45652][3.45652:45730](),[3.45652][3.45652:45730]()
    match (xmpp_muc_opt, xmpp_to_res) {
    (None, Err(err)) => {
    [3.45652]
    [3.45730]
    match (xmpp_muc_opt, xmpp_to_res, xmpp_pres_res) {
    (None, Err(err), Err(err2)) => {
  • edit in src/main.rs at line 92
    [3.45785]
    [3.45785]
    warn!("Unknown destination2: {}", err2);
  • replacement in src/main.rs at line 96
    [3.45946][3.45946:46029](),[3.45946][3.45946:46029](),[3.45946][3.45946:46029]()
    .body(Body::from(format!("Unknown destination: {}", err)))
    [3.45946]
    [3.45023]
    .body(Body::from(format!(
    "Unknown destination: {}\nUnknown destination: {}",
    err,
    err2,
    )))
  • replacement in src/main.rs at line 104
    [3.46179][3.46179:46216](),[3.46179][3.46179:46216](),[3.46179][3.46179:46216]()
    (None, Ok(xmpp_to)) => {
    [3.46179]
    [3.46216]
    (None, _, Ok(show)) => {
    info!("Got presence request. Reading body...");
    let cmd_send = self.cmd_send.clone();
    Box::new(body_to_string(req).and_then(move |message: String| {
    if !message.is_empty() {
    Box::new(
    cmd_send
    .clone()
    .send(XmppCommand::Presence { show, message })
    .then(|r| match r {
    Ok(_) => tokio::prelude::future::ok(Response::new(Body::from(
    "Accepted",
    ))),
    Err(e) => {
    error!("Command sent error: {}", e);
    tokio::prelude::future::result(
    Response::builder()
    .status(hyper::StatusCode::BAD_REQUEST)
    .body(Body::from(format!(
    "Command sent error: {}",
    e
    ))),
    )
    }
    })
    .map_err(std::convert::Into::into),
    )
    } else {
    warn!("Empty message");
    Box::new(tokio::prelude::future::result(
    Response::builder()
    .status(hyper::StatusCode::BAD_REQUEST)
    .body(Body::from("Empty message"))
    .map_err(std::convert::Into::into),
    ))
    as Box<dyn Future<Item = _, Error = _> + Send + 'static>
    }
    })) as Box<dyn Future<Item = _, Error = _> + Send + 'static>
    }
    (None, Ok(xmpp_to), _) => {
  • replacement in src/main.rs at line 182
    [3.48321][3.48321:48360](),[3.48321][3.48321:48360](),[3.48321][3.48321:48360]()
    (Some(Ok(muc_id)), _) => {
    [3.48321]
    [3.48360]
    (Some(Ok(muc_id)), _, Ok(show)) => {
    info!("Got MUC presence request. Reading body...");
    let cmd_send = self.cmd_send.clone();
    Box::new(body_to_string(req).and_then(move |message: String| {
    if !message.is_empty() {
    Box::new(
    cmd_send
    .clone()
    .send(XmppCommand::ChatroomPresence { muc_id, show, message })
    .then(|r| match r {
    Ok(_) => tokio::prelude::future::ok(Response::new(Body::from(
    "Accepted",
    ))),
    Err(e) => {
    error!("Command sent error: {}", e);
    tokio::prelude::future::result(
    Response::builder()
    .status(hyper::StatusCode::BAD_REQUEST)
    .body(Body::from(format!(
    "Command sent error: {}",
    e
    ))),
    )
    }
    })
    .map_err(std::convert::Into::into),
    )
    } else {
    warn!("Empty message");
    Box::new(tokio::prelude::future::result(
    Response::builder()
    .status(hyper::StatusCode::BAD_REQUEST)
    .body(Body::from("Empty message"))
    .map_err(std::convert::Into::into),
    ))
    as Box<dyn Future<Item = _, Error = _> + Send + 'static>
    }
    })) as Box<dyn Future<Item = _, Error = _> + Send + 'static>
    }
    (Some(Ok(muc_id)), _, _) => {
  • replacement in src/main.rs at line 260
    [3.50472][3.50472:50509](),[3.50472][3.50472:50509](),[3.50472][3.50472:50509]()
    (Some(Err(err)), _) => {
    [3.50472]
    [3.50509]
    (Some(Err(err)), _, _) => {
  • edit in README.md at line 12
    [3.54476]
    [3.54476]
    ```
    ### Setting status
    ```
    curl http://localhost:8083/ -H "X-XMPP-Presence: chat" -d "Test"
    ```
    ### Setting status to MUC
  • edit in README.md at line 23
    [3.54480]
    [3.54480]
    curl http://localhost:8083/ -H "X-XMPP-Muc: smac" -H "X-XMPP-Presence: chat" -d "Test"
    ```