Add id for presence stanzas

[?]
Nov 7, 2019, 6:57 PM
W6GSBP3Z6HBOVEIX236FP3IKEYR46S7LNZGUFFPEGW62SOUCJZ5QC

Dependencies

  • [2] U3UZTFCH Re-try to subscribe if not subscribed
  • [3] 2THKW66M Ignore .orig files
  • [4] CCLGGFKR Move out XmppConnection into own file
  • [5] OFLAP2G2 Fix possible utf8 errors
  • [6] ZFBPXPAD Cleanup timeouted iq requests with ping Output elapsed time. Refactor iq handling.
  • [7] 3DSOPLCG Add rustdoc
  • [8] GVZ4JAR5 Process self-presence with incoming stanza processor
  • [9] LL3D5CXK Staring using element processor
  • [10] RRLRZTMR Use element processor for iq
  • [11] YTN366WA Support disco#items
  • [12] PLWPCM47 Add id to initital presence
  • [13] Z3NQEYVI Rename IqSetHandler to IqResuestHandler as it should provide both get and set handling
  • [14] DYRPAV6T Update dependencies
  • [15] AEH7WP42 Make element processors static
  • [16] LNUU5R56 Support disco#info from XEP-0030 Service Discovery
  • [17] 5WHNHD42 Update dependencies
  • [18] SH3LIQ4S Starting commands support
  • [19] DCMDASHV Mention XEP-0050 and XEP-0203 support
  • [20] ZT3YEIVX Consume connection on processing command
  • [21] GXQCDLYQ Use element processor for incoming iq get
  • [22] S754Y5DF Refactor IQ processing Always answer to set and get requests. Use XML encoding for stanzas.
  • [23] PJV5HPIF Starting to imlements timeouts for iqs
  • [24] LQXBWNFT Remove unneeded requirement
  • [25] QDHDTOLM Starting support for commands XEP-0050: Ad-Hoc Commands (there no support in xmpp_parsers still)
  • [26] WDCZNZOP Fix rustdoc
  • [27] YEMBT7TB Add support for XEP-0092: Software Version
  • [28] 4IPZTMFI Update dependencies
  • [29] BYJPYYSM Process iq ping response
  • [30] SSOKGGCE Update dependencies
  • [31] JY4F7VBC Use element processor for incoming iq set
  • [32] 6UKCVM6E Use new iq processng for initial roster
  • [33] RQZCVDFD Implement applying timeout for expired iq await
  • [34] HDLI2X4H Ignore delayed XEP-0203 messages
  • [*] FV6BJ5K6 Send self-presence and store account info in Rc so it willbe used in some future in parallel

Change contents

  • replacement in src/xmpp/xmpp_connection.rs at line 0
    [3.21][2.0:29593]()
    use tokio_xmpp::{Client, Event, Packet};
    use tokio::prelude::future::{self, Either};
    use tokio::prelude::stream;
    use tokio::prelude::{Future, Stream};
    use std::collections::{HashMap, VecDeque};
    use super::XmppCommand;
    use super::stanzas;
    use super::element_processor;
    use crate::config;
    #[derive(Default)]
    struct XmppData {
    /// known roster data
    roster: HashMap<
    xmpp_parsers::Jid,
    (
    xmpp_parsers::roster::Subscription,
    xmpp_parsers::roster::Ask,
    ),
    >,
    /// ids counter
    counter: usize,
    /// map from id of adding item to roster and jid of item
    pending_add_roster_ids: HashMap<String, xmpp_parsers::Jid>,
    /// stanzas to send
    send_queue: VecDeque<minidom::Element>,
    /// outgoing mailbox
    outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
    /// muc id to muc jid
    mucs: HashMap<String, xmpp_parsers::Jid>,
    }
    struct XmppState {
    client: Client,
    data: XmppData,
    }
    pub struct XmppConnection {
    account: std::rc::Rc<config::Account>,
    state: XmppState,
    }
    struct XmppElementProcessor {
    incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
    }
    impl XmppElementProcessor {
    fn new() -> XmppElementProcessor {
    let mut incoming = element_processor::Processor::new(&|_, e| {
    warn!("Unknown stanza {:#?}", e);
    true
    });
    incoming.register(&XmppConnection::incoming_iq_processing);
    XmppElementProcessor { incoming }
    }
    }
    pub struct MaybeXmppConnection {
    account: std::rc::Rc<config::Account>,
    state: Option<XmppState>,
    }
    impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from.account,
    state: Some(from.state),
    }
    }
    }
    impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: std::rc::Rc::new(from),
    state: None,
    }
    }
    }
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from,
    state: None,
    }
    }
    }
    impl MaybeXmppConnection {
    /// connects if nothing connected
    /// don't connect only if stop_future resolved
    pub fn connect<F>(
    self,
    stop_future: F,
    ) -> impl Future<Item = XmppConnection, Error = failure::Error>
    where
    F: future::Future + Clone + 'static,
    <F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
    {
    info!("xmpp connection...");
    let MaybeXmppConnection { account, state } = self;
    if let Some(state) = state {
    Box::new(future::ok(XmppConnection { account, state }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    stop_future
    .clone()
    .select2(
    future::loop_fn(account, move |account| {
    info!("xmpp initialization...");
    let client =
    Client::new_with_jid(account.jid.clone(), &account.password);
    info!("xmpp initialized");
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
    let stop_future4 = stop_future.clone();
    // future to wait for online
    Box::new(
    XmppConnection {
    state: XmppState {
    client,
    data: std::default::Default::default(),
    },
    account,
    }
    .processing(XmppConnection::online, stop_future.clone())
    .map_err(|(acc, _)| acc)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
    .and_then(|conn| conn.enter_mucs(stop_future4))
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    }),
    )
    })
    .map_err(|_: ()| ()),
    )
    .then(|r| match r {
    Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
    Ok(Either::B((x, _a))) => future::ok(x),
    Err(Either::A((e, _b))) => future::err(e.into()),
    Err(Either::B((_, _a))) => {
    future::err(format_err!("Cann't initiate XMPP connection"))
    }
    }),
    )
    }
    }
    }
    impl XmppConnection {
    /// base XMPP processing
    /// Returns false on error to disconnect
    fn xmpp_processing(&mut self, event: &Event) -> bool {
    match event {
    Event::Stanza(stanza) => {
    let processors = XmppElementProcessor::new();
    processors.incoming.process(self, stanza.clone())
    }
    Event::Online => true,
    e => {
    warn!("Unexpected event {:?}", e);
    false
    }
    }
    }
    fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
    use std::convert::TryInto;
    if let Some((_, jid)) = self.state.data.pending_add_roster_ids.remove_entry(&iq.id) {
    if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {
    if self.state.data.roster.contains_key(&jid) {
    info!("Jid {} updated to roster", jid);
    } else {
    info!("Jid {} added in roster", jid);
    self.state.data.roster.insert(
    jid.clone(),
    (
    xmpp_parsers::roster::Subscription::None,
    xmpp_parsers::roster::Ask::None,
    ),
    );
    }
    self.process_jid(&jid);
    } else {
    warn!(
    "Wrong payload when adding {} to roster: {:?}",
    jid, iq.payload
    );
    }
    }
    match iq.payload {
    xmpp_parsers::iq::IqType::Set(element) => {
    if let Some(roster) =
    element.try_into().ok() as Option<xmpp_parsers::roster::Roster>
    {
    for i in roster.items {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
    info!("Update {} in roster", i.jid);
    rdata.0 = i.subscription;
    rdata.1 = i.ask;
    } else {
    info!("Add {} to roster", i.jid);
    self.state
    .data
    .roster
    .insert(i.jid.clone(), (i.subscription, i.ask));
    }
    self.process_jid(&i.jid);
    }
    }
    }
    xmpp_parsers::iq::IqType::Error(e) => {
    error!("iq error: {:?}", e);
    return false;
    }
    xmpp_parsers::iq::IqType::Get(element) => {
    if let Some(_ping) = element.try_into().ok() as Option<xmpp_parsers::ping::Ping> {
    let pong = stanzas::make_pong(&iq.id, self.state.client.jid.clone(), iq.from);
    self.state.data.send_queue.push_back(pong);
    }
    }
    _ => (), // ignore
    }
    true
    }
    /// process event from xmpp stream
    /// returns from future when condition met
    /// or stop future was resolved.
    /// Return item if connection was preserved or error otherwise.
    /// Second part is a state of stop_future
    pub fn processing<S, F, T, E>(
    self,
    stop_condition: S,
    stop_future: F,
    ) -> impl Future<
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    >
    where
    F: Future<Item = T, Error = E> + 'static,
    S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
    T: 'static,
    E: 'static,
    {
    future::loop_fn(
    (self, stop_future, stop_condition),
    |(xmpp, stop_future, mut stop_condition)| {
    let XmppConnection {
    state: XmppState { client, mut data },
    account,
    } = xmpp;
    if let Some(send_element) = data.send_queue.pop_front() {
    use tokio::prelude::Sink;
    info!("Sending {:?}", send_element);
    Box::new(
    client
    .send(Packet::Stanza(send_element))
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A((client, b))) => {
    Box::new(future::ok(future::Loop::Continue((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    b,
    stop_condition,
    ))))
    as Box<dyn Future<Item = _, Error = _>>
    }
    Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Ok(Either::B(t))))
    }
    })),
    Err(Either::A((e, b))) => {
    warn!("XMPP sending error: {}", e);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Err(e)))
    }
    })),
    }),
    ) as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    if xmpp.xmpp_processing(&event) {
    match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    }
    } else {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    }),
    )
    }
    },
    )
    }
    /// get connection and wait for online status and set presence
    /// returns error if something went wrong and xmpp connection is broken
    fn online(&mut self, event: Event) -> Result<bool, ()> {
    match event {
    Event::Online => {
    info!("Online!");
    Ok(true)
    }
    Event::Stanza(s) => {
    warn!("Stanza before online: {:?}", s);
    Ok(false)
    }
    _ => {
    error!("Disconnected while online");
    Err(())
    }
    }
    }
    fn process_initial_roster(&mut self, event: Event, id_init_roster: &str) -> Result<bool, ()> {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
    Ok(iq) => {
    if iq.id == id_init_roster {
    match iq.payload {
    xmpp_parsers::iq::IqType::Error(_e) => {
    error!("Get error instead of roster");
    Err(())
    }
    xmpp_parsers::iq::IqType::Result(Some(result)) => {
    match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
    Ok(roster) => {
    self.state.data.roster.clear();
    info!("Got first roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    self.state
    .data
    .roster
    .insert(i.jid, (i.subscription, i.ask));
    }
    Ok(true)
    }
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    Err(())
    }
    }
    }
    _ => {
    error!("Unknown result of roster");
    Err(())
    }
    }
    } else {
    Ok(false)
    }
    }
    Err(_e) => Ok(false),
    }
    } else {
    error!("Wrong event while waiting roster");
    Err(())
    }
    }
    fn initial_roster<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, mut data },
    } = self;
    use tokio::prelude::Sink;
    data.counter += 1;
    let id_init_roster = format!("id_init_roster{}", data.counter);
    let get_roster = stanzas::make_get_roster(&id_init_roster);
    let account2 = account.clone();
    info!("Quering roster... {:?}", get_roster);
    client
    .send(Packet::Stanza(get_roster))
    .map_err(move |e| {
    error!("Error on querying roster: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, event| conn.process_initial_roster(event, &id_init_roster),
    stop_future,
    )
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, data },
    } = self;
    use tokio::prelude::Sink;
    let presence = stanzas::make_presence(&account);
    let account2 = account.clone();
    info!("Sending presence... {:?}", presence);
    client
    .send(Packet::Stanza(presence))
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, event| {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
    Ok(presence) => {
    Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
    }
    Err(e) => {
    warn!("Not a self-presence: {}", e);
    Ok(false)
    }
    }
    } else {
    error!("Wrong event while waiting self-presence");
    Err(())
    }
    },
    stop_future,
    )
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
    if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
    if !mailbox.is_empty() {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
    info!("Jid {} in roster", xmpp_to);
    let sub_to = match rdata.0 {
    xmpp_parsers::roster::Subscription::To => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if sub_to {
    info!("Subscribed to {}", xmpp_to);
    self.state.data.send_queue.extend(
    mailbox.drain(..).map(|message| {
    stanzas::make_chat_message(xmpp_to.clone(), message)
    }),
    );
    } else if rdata.1 == xmpp_parsers::roster::Ask::None {
    info!("Not subscribed to {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
    } else {
    warn!(
    "Not subscribed to {}. Currently in {:?} state",
    xmpp_to, rdata.1
    );
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
    }
    let sub_from = match rdata.0 {
    xmpp_parsers::roster::Subscription::From => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if !sub_from {
    info!("Not subscription from {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));
    }
    } else {
    info!("Jid {} not in roster", xmpp_to);
    self.state.data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
    let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
    self.state
    .data
    .pending_add_roster_ids
    .insert(id_add_roster, xmpp_to.clone());
    info!("Adding jid to roster... {:?}", add_roster);
    self.state.data.send_queue.push_back(add_roster);
    }
    }
    }
    }
    pub fn process_command(&mut self, cmd: XmppCommand) {
    info!("Got command");
    match cmd {
    XmppCommand::Chat { xmpp_to, message } => {
    self.state
    .data
    .outgoing_mailbox
    .entry(xmpp_to.clone())
    .or_default()
    .push(message);
    self.process_jid(&xmpp_to);
    }
    XmppCommand::Chatroom { muc_id, message } => {
    if let Some(muc) = self.state.data.mucs.get(&muc_id) {
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_muc_message(muc.clone(), message));
    } else {
    error!("Not found MUC {}", muc_id);
    }
    }
    XmppCommand::Ping => {
    self.state.data.counter += 1;
    let id_ping = format!("id_ping{}", self.state.data.counter);
    let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
    self.state.data.send_queue.push_back(ping);
    }
    }
    }
    pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
    info!("Shutdown connection");
    let XmppConnection { account, state } = self;
    stream::iter_ok(
    state
    .data
    .mucs
    .values()
    .map(std::clone::Clone::clone)
    .collect::<Vec<_>>(),
    )
    .fold(state, move |XmppState { client, data }, muc_jid| {
    let muc_presence =
    stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());
    info!("Sending muc leave presence... {:?}", muc_presence);
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    e
    })
    .and_then(|client| future::ok(XmppState { client, data }))
    })
    .map(|_| ())
    }
    fn enter_mucs<F, E>(
    self,
    _stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection { account, state } = self;
    let account2 = account.clone();
    let account3 = account.clone();
    stream::iter_ok(account.chatrooms.clone())
    .fold(state, move |XmppState { client, mut data }, muc_jid| {
    data.counter += 1;
    let id_muc_presence = format!("id_muc_presence{}", data.counter);
    let muc_presence = stanzas::make_muc_presence(
    &id_muc_presence,
    account2.jid.clone(),
    muc_jid.1.clone(),
    );
    info!("Sending muc presence... {:?}", muc_presence);
    let account4 = account2.clone();
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    account4
    })
    .and_then(|client| {
    data.mucs.insert(muc_jid.0, muc_jid.1);
    future::ok(XmppState { client, data })
    })
    })
    .map(|state| XmppConnection {
    account: account3,
    state,
    })
    }
    }
    [3.21]
    use tokio_xmpp::{Client, Event, Packet};
    use tokio::prelude::future::{self, Either};
    use tokio::prelude::stream;
    use tokio::prelude::{Future, Stream};
    use std::collections::{HashMap, VecDeque};
    use super::XmppCommand;
    use super::stanzas;
    use super::element_processor;
    use crate::config;
    #[derive(Default)]
    struct XmppData {
    /// known roster data
    roster: HashMap<
    xmpp_parsers::Jid,
    (
    xmpp_parsers::roster::Subscription,
    xmpp_parsers::roster::Ask,
    ),
    >,
    /// ids counter
    counter: usize,
    /// map from id of adding item to roster and jid of item
    pending_add_roster_ids: HashMap<String, xmpp_parsers::Jid>,
    /// stanzas to send
    send_queue: VecDeque<minidom::Element>,
    /// outgoing mailbox
    outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
    /// muc id to muc jid
    mucs: HashMap<String, xmpp_parsers::Jid>,
    }
    struct XmppState {
    client: Client,
    data: XmppData,
    }
    pub struct XmppConnection {
    account: std::rc::Rc<config::Account>,
    state: XmppState,
    }
    struct XmppElementProcessor {
    incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
    }
    impl XmppElementProcessor {
    fn new() -> XmppElementProcessor {
    let mut incoming = element_processor::Processor::new(&|_, e| {
    warn!("Unknown stanza {:#?}", e);
    true
    });
    incoming.register(&XmppConnection::incoming_iq_processing);
    XmppElementProcessor { incoming }
    }
    }
    pub struct MaybeXmppConnection {
    account: std::rc::Rc<config::Account>,
    state: Option<XmppState>,
    }
    impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from.account,
    state: Some(from.state),
    }
    }
    }
    impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: std::rc::Rc::new(from),
    state: None,
    }
    }
    }
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from,
    state: None,
    }
    }
    }
    impl MaybeXmppConnection {
    /// connects if nothing connected
    /// don't connect only if stop_future resolved
    pub fn connect<F>(
    self,
    stop_future: F,
    ) -> impl Future<Item = XmppConnection, Error = failure::Error>
    where
    F: future::Future + Clone + 'static,
    <F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
    {
    info!("xmpp connection...");
    let MaybeXmppConnection { account, state } = self;
    if let Some(state) = state {
    Box::new(future::ok(XmppConnection { account, state }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    stop_future
    .clone()
    .select2(
    future::loop_fn(account, move |account| {
    info!("xmpp initialization...");
    let client =
    Client::new_with_jid(account.jid.clone(), &account.password);
    info!("xmpp initialized");
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
    let stop_future4 = stop_future.clone();
    // future to wait for online
    Box::new(
    XmppConnection {
    state: XmppState {
    client,
    data: std::default::Default::default(),
    },
    account,
    }
    .processing(XmppConnection::online, stop_future.clone())
    .map_err(|(acc, _)| acc)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
    .and_then(|conn| conn.enter_mucs(stop_future4))
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    }),
    )
    })
    .map_err(|_: ()| ()),
    )
    .then(|r| match r {
    Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
    Ok(Either::B((x, _a))) => future::ok(x),
    Err(Either::A((e, _b))) => future::err(e.into()),
    Err(Either::B((_, _a))) => {
    future::err(format_err!("Cann't initiate XMPP connection"))
    }
    }),
    )
    }
    }
    }
    impl XmppConnection {
    /// base XMPP processing
    /// Returns false on error to disconnect
    fn xmpp_processing(&mut self, event: &Event) -> bool {
    match event {
    Event::Stanza(stanza) => {
    let processors = XmppElementProcessor::new();
    processors.incoming.process(self, stanza.clone())
    }
    Event::Online => true,
    e => {
    warn!("Unexpected event {:?}", e);
    false
    }
    }
    }
    fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
    use std::convert::TryInto;
    if let Some((_, jid)) = self.state.data.pending_add_roster_ids.remove_entry(&iq.id) {
    if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {
    if self.state.data.roster.contains_key(&jid) {
    info!("Jid {} updated to roster", jid);
    } else {
    info!("Jid {} added in roster", jid);
    self.state.data.roster.insert(
    jid.clone(),
    (
    xmpp_parsers::roster::Subscription::None,
    xmpp_parsers::roster::Ask::None,
    ),
    );
    }
    self.process_jid(&jid);
    } else {
    warn!(
    "Wrong payload when adding {} to roster: {:?}",
    jid, iq.payload
    );
    }
    }
    match iq.payload {
    xmpp_parsers::iq::IqType::Set(element) => {
    if let Some(roster) =
    element.try_into().ok() as Option<xmpp_parsers::roster::Roster>
    {
    for i in roster.items {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
    info!("Update {} in roster", i.jid);
    rdata.0 = i.subscription;
    rdata.1 = i.ask;
    } else {
    info!("Add {} to roster", i.jid);
    self.state
    .data
    .roster
    .insert(i.jid.clone(), (i.subscription, i.ask));
    }
    self.process_jid(&i.jid);
    }
    }
    }
    xmpp_parsers::iq::IqType::Error(e) => {
    error!("iq error: {:?}", e);
    return false;
    }
    xmpp_parsers::iq::IqType::Get(element) => {
    if let Some(_ping) = element.try_into().ok() as Option<xmpp_parsers::ping::Ping> {
    let pong = stanzas::make_pong(&iq.id, self.state.client.jid.clone(), iq.from);
    self.state.data.send_queue.push_back(pong);
    }
    }
    _ => (), // ignore
    }
    true
    }
    /// process event from xmpp stream
    /// returns from future when condition met
    /// or stop future was resolved.
    /// Return item if connection was preserved or error otherwise.
    /// Second part is a state of stop_future
    pub fn processing<S, F, T, E>(
    self,
    stop_condition: S,
    stop_future: F,
    ) -> impl Future<
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    >
    where
    F: Future<Item = T, Error = E> + 'static,
    S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
    T: 'static,
    E: 'static,
    {
    future::loop_fn(
    (self, stop_future, stop_condition),
    |(xmpp, stop_future, mut stop_condition)| {
    let XmppConnection {
    state: XmppState { client, mut data },
    account,
    } = xmpp;
    if let Some(send_element) = data.send_queue.pop_front() {
    use tokio::prelude::Sink;
    info!("Sending {:?}", send_element);
    Box::new(
    client
    .send(Packet::Stanza(send_element))
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A((client, b))) => {
    Box::new(future::ok(future::Loop::Continue((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    b,
    stop_condition,
    ))))
    as Box<dyn Future<Item = _, Error = _>>
    }
    Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Ok(Either::B(t))))
    }
    })),
    Err(Either::A((e, b))) => {
    warn!("XMPP sending error: {}", e);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Err(e)))
    }
    })),
    }),
    ) as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    if xmpp.xmpp_processing(&event) {
    match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    }
    } else {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    }),
    )
    }
    },
    )
    }
    /// get connection and wait for online status and set presence
    /// returns error if something went wrong and xmpp connection is broken
    fn online(&mut self, event: Event) -> Result<bool, ()> {
    match event {
    Event::Online => {
    info!("Online!");
    Ok(true)
    }
    Event::Stanza(s) => {
    warn!("Stanza before online: {:?}", s);
    Ok(false)
    }
    _ => {
    error!("Disconnected while online");
    Err(())
    }
    }
    }
    fn process_initial_roster(&mut self, event: Event, id_init_roster: &str) -> Result<bool, ()> {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
    Ok(iq) => {
    if iq.id == id_init_roster {
    match iq.payload {
    xmpp_parsers::iq::IqType::Error(_e) => {
    error!("Get error instead of roster");
    Err(())
    }
    xmpp_parsers::iq::IqType::Result(Some(result)) => {
    match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
    Ok(roster) => {
    self.state.data.roster.clear();
    info!("Got first roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    self.state
    .data
    .roster
    .insert(i.jid, (i.subscription, i.ask));
    }
    Ok(true)
    }
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    Err(())
    }
    }
    }
    _ => {
    error!("Unknown result of roster");
    Err(())
    }
    }
    } else {
    Ok(false)
    }
    }
    Err(_e) => Ok(false),
    }
    } else {
    error!("Wrong event while waiting roster");
    Err(())
    }
    }
    fn initial_roster<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, mut data },
    } = self;
    use tokio::prelude::Sink;
    data.counter += 1;
    let id_init_roster = format!("id_init_roster{}", data.counter);
    let get_roster = stanzas::make_get_roster(&id_init_roster);
    let account2 = account.clone();
    info!("Quering roster... {:?}", get_roster);
    client
    .send(Packet::Stanza(get_roster))
    .map_err(move |e| {
    error!("Error on querying roster: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, event| conn.process_initial_roster(event, &id_init_roster),
    stop_future,
    )
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, data },
    } = self;
    use tokio::prelude::Sink;
    let presence = stanzas::make_presence(&account);
    let account2 = account.clone();
    info!("Sending presence... {:?}", presence);
    client
    .send(Packet::Stanza(presence))
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, event| {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
    Ok(presence) => {
    Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
    }
    Err(e) => {
    warn!("Not a self-presence: {}", e);
    Ok(false)
    }
    }
    } else {
    error!("Wrong event while waiting self-presence");
    Err(())
    }
    },
    stop_future,
    )
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
    if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
    if !mailbox.is_empty() {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
    info!("Jid {} in roster", xmpp_to);
    let sub_to = match rdata.0 {
    xmpp_parsers::roster::Subscription::To => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if sub_to {
    info!("Subscribed to {}", xmpp_to);
    self.state.data.send_queue.extend(
    mailbox.drain(..).map(|message| {
    stanzas::make_chat_message(xmpp_to.clone(), message)
    }),
    );
    } else if rdata.1 == xmpp_parsers::roster::Ask::None {
    info!("Not subscribed to {}", xmpp_to);
    self.state.data.counter += 1;
    let id_presence_subscribe =
    format!("id_presence_subscribe{}", self.state.data.counter);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(
    id_presence_subscribe,
    xmpp_to.clone(),
    ));
    } else {
    warn!(
    "Not subscribed to {}. Currently in {:?} state",
    xmpp_to, rdata.1
    );
    self.state.data.counter += 1;
    let id_presence_subscribe =
    format!("id_presence_subscribe{}", self.state.data.counter);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(
    id_presence_subscribe,
    xmpp_to.clone(),
    ));
    }
    let sub_from = match rdata.0 {
    xmpp_parsers::roster::Subscription::From => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if !sub_from {
    info!("Not subscription from {}", xmpp_to);
    self.state.data.counter += 1;
    let id_presence_subscribed =
    format!("id_presence_subscribed{}", self.state.data.counter);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_allow_subscribe(
    id_presence_subscribed,
    xmpp_to.clone(),
    ));
    }
    } else {
    info!("Jid {} not in roster", xmpp_to);
    self.state.data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
    let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
    self.state
    .data
    .pending_add_roster_ids
    .insert(id_add_roster, xmpp_to.clone());
    info!("Adding jid to roster... {:?}", add_roster);
    self.state.data.send_queue.push_back(add_roster);
    }
    }
    }
    }
    pub fn process_command(&mut self, cmd: XmppCommand) {
    info!("Got command");
    match cmd {
    XmppCommand::Chat { xmpp_to, message } => {
    self.state
    .data
    .outgoing_mailbox
    .entry(xmpp_to.clone())
    .or_default()
    .push(message);
    self.process_jid(&xmpp_to);
    }
    XmppCommand::Chatroom { muc_id, message } => {
    if let Some(muc) = self.state.data.mucs.get(&muc_id) {
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_muc_message(muc.clone(), message));
    } else {
    error!("Not found MUC {}", muc_id);
    }
    }
    XmppCommand::Ping => {
    self.state.data.counter += 1;
    let id_ping = format!("id_ping{}", self.state.data.counter);
    let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
    self.state.data.send_queue.push_back(ping);
    }
    }
    }
    pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
    info!("Shutdown connection");
    let XmppConnection { account, state } = self;
    stream::iter_ok(
    state
    .data
    .mucs
    .values()
    .map(std::clone::Clone::clone)
    .collect::<Vec<_>>(),
    )
    .fold(state, move |XmppState { client, data }, muc_jid| {
    let muc_presence =
    stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());
    info!("Sending muc leave presence... {:?}", muc_presence);
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    e
    })
    .and_then(|client| future::ok(XmppState { client, data }))
    })
    .map(|_| ())
    }
    fn enter_mucs<F, E>(
    self,
    _stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection { account, state } = self;
    let account2 = account.clone();
    let account3 = account.clone();
    stream::iter_ok(account.chatrooms.clone())
    .fold(state, move |XmppState { client, mut data }, muc_jid| {
    data.counter += 1;
    let id_muc_presence = format!("id_muc_presence{}", data.counter);
    let muc_presence = stanzas::make_muc_presence(
    &id_muc_presence,
    account2.jid.clone(),
    muc_jid.1.clone(),
    );
    info!("Sending muc presence... {:?}", muc_presence);
    let account4 = account2.clone();
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    account4
    })
    .and_then(|client| {
    data.mucs.insert(muc_jid.0, muc_jid.1);
    future::ok(XmppState { client, data })
    })
    })
    .map(|state| XmppConnection {
    account: account3,
    state,
    })
    }
    }
  • replacement in src/xmpp/stanzas.rs at line 45
    [3.39861][3.36418:36481](),[3.36418][3.36418:36481]()
    pub fn make_ask_subscribe(jid: xmpp_parsers::Jid) -> Element {
    [3.39861]
    [3.36481]
    pub fn make_ask_subscribe(id: String, jid: xmpp_parsers::Jid) -> Element {
  • edit in src/xmpp/stanzas.rs at line 47
    [3.36544]
    [3.34957]
    presence.id = Some(id);
  • replacement in src/xmpp/stanzas.rs at line 52
    [3.35009][3.35009:35074]()
    pub fn make_allow_subscribe(jid: xmpp_parsers::Jid) -> Element {
    [3.35009]
    [3.35074]
    pub fn make_allow_subscribe(id: String, jid: xmpp_parsers::Jid) -> Element {
  • edit in src/xmpp/stanzas.rs at line 54
    [3.35138]
    [3.39862]
    presence.id = Some(id);
  • replacement in src/xmpp/element_processor.rs at line 0
    [3.11301][2.29911:30987]()
    type Func<S, T, E> = dyn Fn(&mut S, E) -> T;
    pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {
    processors: Vec<Box<Func<S, Option<T>, E>>>,
    default: &'static Func<S, T, E>,
    }
    impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {
    pub fn new<F>(f: &'static F) -> Processor<S, T, E>
    where
    F: Fn(&mut S, E) -> T + 'static,
    {
    Processor {
    processors: vec![],
    default: f,
    }
    }
    pub fn register<F, A>(&mut self, f: &'static F)
    where
    F: Fn(&mut S, A) -> T + 'static,
    A: std::convert::TryFrom<E>,
    {
    self.processors.push(Box::new(move |s, e: E| {
    use std::convert::TryInto;
    (e.try_into().ok() as Option<A>).map(|a| f(s, a))
    }));
    }
    pub fn process(&self, s: &mut S, e: E) -> T {
    for processor in self.processors.iter() {
    match processor(s, e.clone()) {
    Some(t) => return t,
    None => continue,
    }
    }
    (*self.default)(s, e)
    }
    }
    [3.11301]
    type Func<S, T, E> = dyn Fn(&mut S, E) -> T;
    pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {
    processors: Vec<Box<Func<S, Option<T>, E>>>,
    default: &'static Func<S, T, E>,
    }
    impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {
    pub fn new<F>(f: &'static F) -> Processor<S, T, E>
    where
    F: Fn(&mut S, E) -> T + 'static,
    {
    Processor {
    processors: vec![],
    default: f,
    }
    }
    pub fn register<F, A>(&mut self, f: &'static F)
    where
    F: Fn(&mut S, A) -> T + 'static,
    A: std::convert::TryFrom<E>,
    {
    self.processors.push(Box::new(move |s, e: E| {
    use std::convert::TryInto;
    (e.try_into().ok() as Option<A>).map(|a| f(s, a))
    }));
    }
    pub fn process(&self, s: &mut S, e: E) -> T {
    for processor in self.processors.iter() {
    match processor(s, e.clone()) {
    Some(t) => return t,
    None => continue,
    }
    }
    (*self.default)(s, e)
    }
    }