Move out XmppConnection into own file

[?]
Apr 14, 2019, 7:38 AM
CCLGGFKRUNXBM6IOGEYJOFULCSVOL6C5PFAGCKEWNXOZIKPWLLWQC

Dependencies

  • [2] RRLRZTMR Use element processor for iq
  • [3] J7VX56FW ToDo
  • [4] 77USPY5I Sending messages works!
  • [5] TDOR5XQU Accept destination
  • [6] L3D22A5J Prepare to check incoming presence
  • [7] LL3D5CXK Staring using element processor
  • [8] 3ADA5BBX Add items to roster from iq of "set" type
  • [9] TPVUBB3F Answer to ping requests
  • [10] EBETRYK7 Add counter for id. Check for jid in roster
  • [11] HU3NZX5Z Process self-presence via new processing code
  • [12] AA2ZWGRL Enter to MUC
  • [13] FDHRCKH5 Unneded Box
  • [14] JD62RVOJ Update dependencies
  • [15] AGIW6YR3 Use shared future for signal everywhere
  • [16] IK3YDPTY Update deps
  • [17] FV6BJ5K6 Send self-presence and store account info in Rc so it willbe used in some future in parallel
  • [18] 3GEU7TC7 Welcome to 2018!
  • [19] O2GM5J4F Don't split xmpp receiving and sending
  • [20] WJNXI6Z4 Fill roster
  • [21] SU4DNVCB Start to processing roster data
  • [22] CBWCXUZZ Prepare adding new items to roster
  • [23] SA2IOFGY Add items to roster
  • [24] DCGEFPRC Better README
  • [25] RGOSS73U Convert self-presence to xmpp_parser's type
  • [26] XGP44R5H Rework stopping xmpp connection
  • [27] DKXSFTDY Send stanzas via send queue
  • [28] 2L3JHRUL Create separate functions to process incoming XMPP stanzas
  • [29] BWDUANCV Second part of processing result is only about stop_future
  • [30] 4LRBIGVT Show info about xmpp errors
  • [31] OGMBXBKP Move online to XmppConnection
  • [32] 5IKA4GO7 Rename xmpp client field from "inner" to "client"
  • [33] UWY5EVZ6 Add dummy roster data
  • [34] FCPF2FV6 Break connection on iq error
  • [35] UMTLHH77 Process commands in the separate function
  • [36] ACXUIS63 Update dependecies
  • [37] NDDQQP2P Update deps
  • [38] QTCUURXN Add additional requirement for command stream
  • [39] AYQZ2UIA Update deps
  • [40] OB3HA2MD Use Client::new_with_jid to parse jid only once
  • [41] X6L47BHQ Use different structure for established xmpp connection
  • [42] FWJDW3G5 Allow process xmpp incoming stanzas with futures
  • [43] HCCX7VW6 Generate ids from counter
  • [44] SYH7UQP6 Make xmpp command enum to allow different commands Save subscription ask status. Don't ask if already requested subscription.
  • [45] 2VZBEEXA Messages fixed
  • [46] 5GINRCKL Send ping XEP-0199
  • [47] 37OMJ4CK Send MUC message
  • [48] UAT5MV5O Directly use id for initial roster request
  • [49] HOAZX2PB Reorganize roster processing. Output roster
  • [50] V5HDBSZM Use jid for receiver address
  • [51] ALP2YJIU Rename XmppState to XmppProcessState
  • [52] OANBCLN5 Move xmpp client into XmppState
  • [53] XOAM22TT Simplify xmpp incoming stanzas processing without futures
  • [54] UO4WTU6U Update dependencies
  • [55] 5A5UVGNM Move receiver closing logic out of xmpp processing
  • [56] VS6AHRWI Move XMPP to separate dir
  • [57] H7R7Y3FQ Use new processing code to wait online
  • [58] QWE26TMV update deps
  • [59] UIXIQHDY Wait for commands via new processing code
  • [60] 3FYEOGCI Move additional rarely changed data to separate structure
  • [61] PVCRPP3B Some servers don't send to in initial presence
  • [62] 5Y6YJ6UH Add shutdown function to make actions before offline
  • [63] PBRUH4BJ Rename optional XmppConnection to MaybeXmppConnection

Change contents

  • file addition: xmpp_connection.rs (-xw-x--x--)
    [3.7]
    use tokio_xmpp::{Client, Event, Packet};
    use tokio::prelude::future::{self, Either};
    use tokio::prelude::stream;
    use tokio::prelude::{Future, Stream};
    use std::collections::{HashMap, VecDeque};
    use super::XmppCommand;
    use super::stanzas;
    use super::element_processor;
    use crate::config;
    /// Mutable per-connection bookkeeping that is kept next to the XMPP client.
    #[derive(Default)]
    struct XmppData {
    /// Known roster data: jid -> (subscription state, "ask" state).
    roster: HashMap<
    xmpp_parsers::Jid,
    (
    xmpp_parsers::roster::Subscription,
    xmpp_parsers::roster::Ask,
    ),
    >,
    /// Counter incremented before each generated stanza id and used as the id suffix.
    counter: usize,
    /// Ids of sent "add to roster" iq requests mapped to the jid being added.
    pending_add_roster_ids: HashMap<String, xmpp_parsers::Jid>,
    /// Stanzas waiting to be sent; `processing` pops them from the front.
    send_queue: VecDeque<minidom::Element>,
    /// Chat messages per recipient, held until `process_jid` moves them to `send_queue`.
    outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
    /// Entered chatrooms: muc id -> muc jid (filled by `enter_mucs`).
    mucs: HashMap<String, xmpp_parsers::Jid>,
    }
    /// A live XMPP client together with its bookkeeping data.
    struct XmppState {
    /// tokio_xmpp client; used both to receive events and to send stanzas
    client: Client,
    /// roster, queues and counters of this connection
    data: XmppData,
    }
    /// Established XMPP connection: account settings plus the live client state.
    pub struct XmppConnection {
    /// shared account configuration (jid, password, chatrooms are read from it)
    account: std::rc::Rc<config::Account>,
    /// client and its bookkeeping data
    state: XmppState,
    }
    /// Holder of the processor that dispatches incoming stanzas.
    struct XmppElementProcessor {
    /// Called with the connection and the received element; its `bool` result is what
    /// `xmpp_processing` returns (`false` means "disconnect").
    /// NOTE(review): the exact meaning of the three type parameters is defined in
    /// `element_processor` — confirm there.
    incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
    }
    impl XmppElementProcessor {
        /// Builds the processor for incoming stanzas.
        ///
        /// The closure given to `Processor::new` only logs the element and answers `true`
        /// (presumably the fallback for stanzas nobody handles); after that
        /// `XmppConnection::incoming_iq_processing` is registered as a handler.
        fn new() -> Self {
            let mut dispatcher = element_processor::Processor::new(&|_, element| {
                warn!("Unknown stanza {:#?}", element);
                true
            });
            dispatcher.register(&XmppConnection::incoming_iq_processing);
            Self {
                incoming: dispatcher,
            }
        }
    }
    /// Account that may or may not currently have a live XMPP connection.
    pub struct MaybeXmppConnection {
    /// shared account configuration
    account: std::rc::Rc<config::Account>,
    /// `Some` when a connection is already established, `None` otherwise
    state: Option<XmppState>,
    }
    impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from.account,
    state: Some(from.state),
    }
    }
    }
    /// A bare account becomes a not-yet-connected "maybe" connection.
    impl From<config::Account> for MaybeXmppConnection {
        fn from(from: config::Account) -> Self {
            // wrap the account for sharing; there is no live state yet
            let account = std::rc::Rc::new(from);
            Self {
                state: None,
                account,
            }
        }
    }
    /// An already shared account becomes a not-yet-connected "maybe" connection.
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
        fn from(from: std::rc::Rc<config::Account>) -> Self {
            Self {
                state: None,
                account: from,
            }
        }
    }
    impl MaybeXmppConnection {
    /// Returns an established connection, connecting first when there is none.
    ///
    /// If a live state is already stored it is reused as is. Otherwise a new client is
    /// created and taken through: wait for online, initial roster, self-presence, entering
    /// MUCs. When any of these steps fails the whole sequence is retried with a new client.
    ///
    /// The only way out without a connection is `stop_future`: when it completes (or fails)
    /// before the connection is ready, the returned future fails.
    pub fn connect<F>(
    self,
    stop_future: F,
    ) -> impl Future<Item = XmppConnection, Error = failure::Error>
    where
    F: future::Future + Clone + 'static,
    <F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
    {
    info!("xmpp connection...");
    let MaybeXmppConnection { account, state } = self;
    if let Some(state) = state {
    // already connected: hand the existing state back immediately
    Box::new(future::ok(XmppConnection { account, state }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    stop_future
    .clone()
    .select2(
    // retry loop; the loop state is the account, handed back by every failed step
    future::loop_fn(account, move |account| {
    info!("xmpp initialization...");
    let client =
    Client::new_with_jid(account.jid.clone(), &account.password);
    info!("xmpp initialized");
    // every step below consumes its own copy of the stop future
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
    let stop_future4 = stop_future.clone();
    // future to wait for online
    Box::new(
    XmppConnection {
    state: XmppState {
    client,
    data: std::default::Default::default(),
    },
    account,
    }
    .processing(XmppConnection::online, stop_future.clone())
    .map_err(|(acc, _)| acc)
    // only "stop condition met" (Either::A) counts as success here
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
    .and_then(|conn| conn.enter_mucs(stop_future4))
    // success ends the loop, a failure starts the next attempt
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    }),
    )
    })
    .map_err(|_: ()| ()),
    )
    // A = stop_future side, B = connection loop side
    .then(|r| match r {
    Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
    Ok(Either::B((x, _a))) => future::ok(x),
    Err(Either::A((e, _b))) => future::err(e.into()),
    Err(Either::B((_, _a))) => {
    future::err(format_err!("Cann't initiate XMPP connection"))
    }
    }),
    )
    }
    }
    }
    impl XmppConnection {
    /// Common handling applied to every event received from the client.
    ///
    /// Stanzas are passed to a freshly built `XmppElementProcessor`, `Online` is accepted,
    /// anything else is logged as unexpected.
    /// Returns `false` when the connection has to be dropped.
    fn xmpp_processing(&mut self, event: &Event) -> bool {
        if let Event::Online = event {
            return true;
        }
        if let Event::Stanza(stanza) = event {
            let dispatcher = XmppElementProcessor::new();
            return dispatcher.incoming.process(self, stanza.clone());
        }
        warn!("Unexpected event {:?}", event);
        false
    }
    fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
    use std::convert::TryInto;
    if let Some((_, jid)) = self.state.data.pending_add_roster_ids.remove_entry(&iq.id) {
    if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {
    if self.state.data.roster.contains_key(&jid) {
    info!("Jid {} updated to roster", jid);
    } else {
    info!("Jid {} added in roster", jid);
    self.state.data.roster.insert(
    jid.clone(),
    (
    xmpp_parsers::roster::Subscription::None,
    xmpp_parsers::roster::Ask::None,
    ),
    );
    }
    self.process_jid(&jid);
    } else {
    warn!(
    "Wrong payload when adding {} to roster: {:?}",
    jid, iq.payload
    );
    }
    }
    match iq.payload {
    xmpp_parsers::iq::IqType::Set(element) => {
    if let Some(roster) =
    element.try_into().ok() as Option<xmpp_parsers::roster::Roster>
    {
    for i in roster.items {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
    info!("Update {} in roster", i.jid);
    rdata.0 = i.subscription;
    rdata.1 = i.ask;
    } else {
    info!("Add {} to roster", i.jid);
    self.state
    .data
    .roster
    .insert(i.jid.clone(), (i.subscription, i.ask));
    }
    self.process_jid(&i.jid);
    }
    }
    }
    xmpp_parsers::iq::IqType::Error(e) => {
    error!("iq error: {:?}", e);
    return false;
    }
    xmpp_parsers::iq::IqType::Get(element) => {
    if let Some(_ping) = element.try_into().ok() as Option<xmpp_parsers::ping::Ping> {
    let pong = stanzas::make_pong(&iq.id, self.state.client.jid.clone(), iq.from);
    self.state.data.send_queue.push_back(pong);
    }
    }
    _ => (), // ignore
    }
    true
    }
    /// Drives the connection until `stop_condition` is met or `stop_future` completes.
    ///
    /// Every loop iteration either sends the next stanza from `send_queue` or, when the
    /// queue is empty, waits for the next event from the client; both are raced against
    /// `stop_future`. A received event first goes through `xmpp_processing` and is then
    /// handed to `stop_condition`.
    ///
    /// Resolves to `(connection, stop_state)` when the connection was preserved and fails
    /// with `(account, stop_state)` when it is broken. `stop_state` is the state of
    /// `stop_future`:
    /// * `Ok(Either::A(f))` – not completed yet, `f` is the future to continue with;
    /// * `Ok(Either::B(t))` – completed with item `t`;
    /// * `Err(e)` – failed with `e`.
    pub fn processing<S, F, T, E>(
    self,
    stop_condition: S,
    stop_future: F,
    ) -> impl Future<
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    >
    where
    F: Future<Item = T, Error = E> + 'static,
    S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
    T: 'static,
    E: 'static,
    {
    future::loop_fn(
    (self, stop_future, stop_condition),
    |(xmpp, stop_future, mut stop_condition)| {
    // take the connection apart: sending and receiving consume the client
    let XmppConnection {
    state: XmppState { client, mut data },
    account,
    } = xmpp;
    if let Some(send_element) = data.send_queue.pop_front() {
    // there is something to send: send it before reading further events
    use tokio::prelude::Sink;
    info!("Sending {:?}", send_element);
    Box::new(
    client
    .send(Packet::Stanza(send_element))
    .select2(stop_future)
    .then(move |r| match r {
    // sent; `b` is the still pending stop future
    Ok(Either::A((client, b))) => {
    Box::new(future::ok(future::Loop::Continue((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    b,
    stop_condition,
    ))))
    as Box<dyn Future<Item = _, Error = _>>
    }
    // stop future completed first: let the running send (`a`) finish
    // to get the client back
    Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Ok(Either::B(t))))
    }
    })),
    // sending failed: the connection is lost
    Err(Either::A((e, b))) => {
    warn!("XMPP sending error: {}", e);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    // stop future failed first: still let the running send finish
    Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Err(e)))
    }
    })),
    }),
    ) as Box<dyn Future<Item = _, Error = _>>
    } else {
    // nothing to send: wait for the next event from the client
    Box::new(
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    // common processing first, then the caller's condition
    if xmpp.xmpp_processing(&event) {
    match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    }
    } else {
    // common processing asked to disconnect
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    } else {
    // the event stream ended
    future::err((account, Ok(Either::A(b))))
    }
    }
    // stop future completed first: take the client back out of the
    // pending stream future, if it is still there
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    // receiving failed: the connection is lost
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    // stop future failed first
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    }),
    )
    }
    },
    )
    }
    /// Stop condition used while waiting for the client to come online.
    ///
    /// * `Event::Online` – done, `Ok(true)`;
    /// * a stanza arriving earlier – logged, keep waiting with `Ok(false)`;
    /// * any other event – the connection is treated as broken, `Err(())`.
    fn online(&mut self, event: Event) -> Result<bool, ()> {
        if let Event::Online = event {
            info!("Online!");
            return Ok(true);
        }
        if let Event::Stanza(s) = event {
            warn!("Stanza before online: {:?}", s);
            return Ok(false);
        }
        error!("Disconnected while online");
        Err(())
    }
    fn process_initial_roster(&mut self, event: Event, id_init_roster: &str) -> Result<bool, ()> {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
    Ok(iq) => {
    if iq.id == id_init_roster {
    match iq.payload {
    xmpp_parsers::iq::IqType::Error(_e) => {
    error!("Get error instead of roster");
    Err(())
    }
    xmpp_parsers::iq::IqType::Result(Some(result)) => {
    match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
    Ok(roster) => {
    self.state.data.roster.clear();
    info!("Got first roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    self.state
    .data
    .roster
    .insert(i.jid, (i.subscription, i.ask));
    }
    Ok(true)
    }
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    Err(())
    }
    }
    }
    _ => {
    error!("Unknown result of roster");
    Err(())
    }
    }
    } else {
    Ok(false)
    }
    }
    Err(_e) => Ok(false),
    }
    } else {
    error!("Wrong event while waiting roster");
    Err(())
    }
    }
    /// Requests the roster from the server and waits until it is received.
    ///
    /// Sends a roster "get" whose id is `id_init_roster` followed by the incremented
    /// counter, then processes incoming events with `process_initial_roster` until the
    /// answer with that id arrives.
    ///
    /// Fails with the account when the request cannot be sent, the connection breaks, or
    /// `stop_future` completes or fails before the roster is received.
    fn initial_roster<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, mut data },
    } = self;
    use tokio::prelude::Sink;
    data.counter += 1;
    let id_init_roster = format!("id_init_roster{}", data.counter);
    let get_roster = stanzas::make_get_roster(&id_init_roster);
    // second handle for the error path; `account` itself moves into the success path
    let account2 = account.clone();
    info!("Quering roster... {:?}", get_roster);
    client
    .send(Packet::Stanza(get_roster))
    .map_err(move |e| {
    error!("Error on querying roster: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, event| conn.process_initial_roster(event, &id_init_roster),
    stop_future,
    )
    .map_err(|(account, _)| account)
    // only "stop condition met" (Either::A) means the roster was received
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    /// Sends the account's presence and waits until a presence from ourselves comes back.
    ///
    /// After sending, incoming events are processed until a presence stanza arrives whose
    /// `from` equals the client's own jid. Stanzas that cannot be parsed as presence are
    /// logged and skipped; a non-stanza event aborts the wait.
    ///
    /// Fails with the account when the presence cannot be sent, the connection breaks, or
    /// `stop_future` completes or fails before the self-presence is seen.
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection {
    account,
    state: XmppState { client, data },
    } = self;
    use tokio::prelude::Sink;
    let presence = stanzas::make_presence(&account);
    // second handle for the error path; `account` itself moves into the success path
    let account2 = account.clone();
    info!("Sending presence... {:?}", presence);
    client
    .send(Packet::Stanza(presence))
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    move |conn, event| {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
    Ok(presence) => {
    // only the sender is compared; the `to` attribute is not checked
    Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
    }
    Err(e) => {
    warn!("Not a self-presence: {}", e);
    Ok(false)
    }
    }
    } else {
    error!("Wrong event while waiting self-presence");
    Err(())
    }
    },
    stop_future,
    )
    .map_err(|(account, _)| account)
    // only "stop condition met" (Either::A) means the self-presence was seen
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
    if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
    if !mailbox.is_empty() {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
    info!("Jid {} in roster", xmpp_to);
    let sub_to = match rdata.0 {
    xmpp_parsers::roster::Subscription::To => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if sub_to {
    info!("Subscribed to {}", xmpp_to);
    self.state.data.send_queue.extend(
    mailbox.drain(..).map(|message| {
    stanzas::make_chat_message(xmpp_to.clone(), message)
    }),
    );
    } else if rdata.1 == xmpp_parsers::roster::Ask::None {
    info!("Not subscribed to {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
    }
    let sub_from = match rdata.0 {
    xmpp_parsers::roster::Subscription::From => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if !sub_from {
    info!("Not subscription from {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));
    }
    } else {
    info!("Jid {} not in roster", xmpp_to);
    self.state.data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
    let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
    self.state
    .data
    .pending_add_roster_ids
    .insert(id_add_roster, xmpp_to.clone());
    info!("Adding jid to roster... {:?}", add_roster);
    self.state.data.send_queue.push_back(add_roster);
    }
    }
    }
    }
    /// Applies a command received from the rest of the application.
    ///
    /// * `Chat` – the message is put into the recipient's mailbox, then `process_jid`
    ///   decides what can be sent;
    /// * `Chatroom` – the message is queued for the MUC registered under `muc_id`; an
    ///   unknown id is logged as an error;
    /// * `Ping` – a ping stanza with a fresh id is queued.
    ///
    /// Nothing is sent here directly, stanzas are only placed into the send queue.
    pub fn process_command(&mut self, cmd: XmppCommand) {
        info!("Got command");
        match cmd {
            XmppCommand::Ping => {
                self.state.data.counter += 1;
                let id_ping = format!("id_ping{}", self.state.data.counter);
                let own_jid = self.state.client.jid.clone();
                let ping = stanzas::make_ping(&id_ping, own_jid);
                self.state.data.send_queue.push_back(ping);
            }
            XmppCommand::Chatroom { muc_id, message } => {
                match self.state.data.mucs.get(&muc_id) {
                    Some(muc) => {
                        let stanza = stanzas::make_muc_message(muc.clone(), message);
                        self.state.data.send_queue.push_back(stanza);
                    }
                    None => {
                        error!("Not found MUC {}", muc_id);
                    }
                }
            }
            XmppCommand::Chat { xmpp_to, message } => {
                let mailbox = self
                    .state
                    .data
                    .outgoing_mailbox
                    .entry(xmpp_to.clone())
                    .or_default();
                mailbox.push(message);
                self.process_jid(&xmpp_to);
            }
        }
    }
    /// Performs the actions required before going offline: leaves every entered chatroom.
    ///
    /// A "leave" presence is sent to each MUC stored in `mucs`, one after another. The
    /// returned future resolves to `()` when all of them were sent and fails with the
    /// first sending error.
    ///
    /// NOTE(review): stanzas still waiting in `send_queue` are not sent here — confirm
    /// that this is intended.
    pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
        info!("Shutdown connection");
        let XmppConnection { account, state } = self;
        // Snapshot the room jids first: `state` itself is moved into the fold below.
        let muc_jids: Vec<_> = state.data.mucs.values().cloned().collect();
        stream::iter_ok(muc_jids)
            .fold(state, move |XmppState { client, data }, muc_jid| {
                // `muc_jid` is already an owned copy, so it is passed on without cloning
                let muc_presence =
                    stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid);
                info!("Sending muc leave presence... {:?}", muc_presence);
                use tokio::prelude::Sink;
                client
                    .send(Packet::Stanza(muc_presence))
                    .map_err(|e| {
                        error!("Error on send muc presence: {}", e);
                        e
                    })
                    .and_then(|client| future::ok(XmppState { client, data }))
            })
            .map(|_| ())
    }
    /// Enters every chatroom listed in the account configuration.
    ///
    /// For each `(muc id, muc jid)` entry of `account.chatrooms` a MUC presence with a
    /// fresh id is sent, one room after another; after a successful send the room is
    /// recorded in `mucs`. Resolves to the connection once all presences were sent and
    /// fails with the account on the first sending error.
    ///
    /// `_stop_future` is currently not used, and no answer from the rooms is awaited here.
    fn enter_mucs<F, E>(
    self,
    _stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    let XmppConnection { account, state } = self;
    // account2 lives inside the fold closure, account3 rebuilds the connection at the end
    let account2 = account.clone();
    let account3 = account.clone();
    stream::iter_ok(account.chatrooms.clone())
    .fold(state, move |XmppState { client, mut data }, muc_jid| {
    data.counter += 1;
    let id_muc_presence = format!("id_muc_presence{}", data.counter);
    let muc_presence = stanzas::make_muc_presence(
    &id_muc_presence,
    account2.jid.clone(),
    muc_jid.1.clone(),
    );
    info!("Sending muc presence... {:?}", muc_presence);
    // handle that is returned as the error value if sending fails
    let account4 = account2.clone();
    use tokio::prelude::Sink;
    client
    .send(Packet::Stanza(muc_presence))
    .map_err(|e| {
    error!("Error on send muc presence: {}", e);
    account4
    })
    .and_then(|client| {
    // remember the room only after its presence was sent
    data.mucs.insert(muc_jid.0, muc_jid.1);
    future::ok(XmppState { client, data })
    })
    })
    .map(|state| XmppConnection {
    account: account3,
    state,
    })
    }
    }
  • edit in src/xmpp/stanzas.rs at line 47
    [2.812]
    [2.812]
    presence.to = Some(jid);
    presence.into()
    }
    pub fn make_allow_subscribe(jid: xmpp_parsers::Jid) -> Element {
    let mut presence = Presence::new(PresenceType::Subscribed);
  • replacement in src/xmpp/mod.rs at line 0
    [3.17][3.18:62]()
    use tokio::prelude::future::{self, Either};
    [3.17]
    [3.62]
    use tokio::prelude::future::{self, Either, Future};
  • edit in src/xmpp/mod.rs at line 2
    [3.90][3.90:128](),[3.163][3.163:164](),[3.164][2.2363:2404]()
    use tokio::prelude::{Future, Stream};
    use tokio_xmpp::{Client, Event, Packet};
  • edit in src/xmpp/mod.rs at line 4
    [3.48][3.1192:1193](),[3.1193][2.2405:2448](),[3.306][3.420:434](),[3.489][3.420:434](),[3.305][3.420:434](),[3.43][3.420:434](),[3.262][3.420:434](),[3.32][3.420:434](),[3.34][3.420:434](),[3.48][3.420:434](),[3.1236][3.420:434](),[3.32][3.420:434](),[3.32][3.420:434](),[3.48][3.420:434](),[2.2448][3.420:434](),[3.210][3.420:434]()
    use std::collections::{HashMap, VecDeque};
    mod stanzas;
  • edit in src/xmpp/mod.rs at line 6
    [3.433][3.430:431](),[3.430][3.430:431](),[3.431][3.1237:1274](),[3.1274][2.2449:3031](),[3.329][3.1786:1848](),[3.490][3.1786:1848](),[3.1104][3.1786:1848](),[3.378][3.1786:1848](),[2.3031][3.1786:1848](),[3.1786][3.1786:1848](),[3.1848][3.491:524](),[3.524][2.3032:3121](),[3.611][3.1848:1851](),[2.3121][3.1848:1851](),[3.1848][3.1848:1851](),[3.1851][3.612:679](),[3.679][2.3122:3336](),[2.3336][3.795:847](),[3.795][3.795:847](),[3.847][3.1851:1880](),[3.1851][3.1851:1880](),[3.460][3.354:397](),[3.392][3.354:397](),[3.1880][3.354:397](),[3.33][3.354:397](),[3.33][3.354:397](),[3.354][3.354:397](),[3.397][3.1881:1911](),[3.491][3.425:427](),[3.423][3.425:427](),[3.1911][3.425:427](),[3.61][3.425:427](),[3.27][3.425:427](),[3.61][3.425:427](),[3.425][3.425:427](),[3.427][3.105:106](),[3.105][3.105:106](),[3.106][3.1912:1936](),[3.101][3.31:74](),[3.516][3.31:74](),[3.448][3.31:74](),[3.1936][3.31:74](),[3.90][3.31:74](),[3.90][3.31:74](),[3.31][3.31:74](),[3.74][3.1937:1959](),[3.539][3.448:627](),[3.471][3.448:627](),[3.1959][3.448:627](),[3.110][3.448:627](),[3.47][3.448:627](),[3.110][3.448:627](),[3.448][3.448:627](),[3.627][3.1960:1997](),[3.1997][3.0:19](),[3.148][3.0:19](),[3.577][3.0:19](),[3.19][3.1998:2111](),[3.172][3.132:162](),[3.113][3.132:162](),[3.158][3.132:162](),[3.677][3.132:162](),[3.2111][3.132:162](),[3.238][3.132:162](),[3.132][3.132:162](),[3.162][3.2112:2422](),[3.2422][3.748:764](),[3.312][3.748:764](),[3.748][3.748:764](),[3.764][3.2423:2425](),[3.2425][3.766:767](),[3.766][3.766:767](),[3.767][3.2426:2453](),[3.431][3.516:554](),[3.430][3.516:554](),[3.994][3.516:554](),[3.2453][3.516:554](),[3.516][3.516:554](),[3.554][3.2454:2505](),[3.483][3.876:1011](),[3.268][3.876:1011](),[3.482][3.876:1011](),[3.254][3.876:1011](),[3.1046][3.876:1011](),[3.2505][3.876:1011](),[3.359][3.876:1011](),[3.876][3.876:1011](),[3.1011][3.604:649](),[3.649][3.1056:1132](),[3.581][3.1056:1132](),[3.37][3.1056:1132](),[3.149][3.1056:1132](),[3.1056][3.10
56:1132](),[3.648][3.653:690](),[3.282][3.653:690](),[3.302][3.653:690](),[3.460][3.653:690](),[3.563][3.653:690](),[3.282][3.653:690](),[3.152][3.653:690](),[3.257][3.653:690](),[3.1132][3.653:690](),[3.109][3.653:690](),[3.781][3.653:690](),[3.653][3.653:690](),[3.690][3.2506:2565](),[3.337][3.744:745](),[3.709][3.744:745](),[3.641][3.744:745](),[3.623][3.744:745](),[3.337][3.744:745](),[3.213][3.744:745](),[3.2565][3.744:745](),[3.419][3.744:745](),[3.312][3.744:745](),[3.209][3.744:745](),[3.1193][3.744:745](),[3.841][3.744:745](),[3.234][3.744:745](),[3.744][3.744:745](),[3.745][3.2566:2671](),[3.2671][2.3337:3393](),[3.598][3.944:961](),[3.769][3.944:961](),[3.1197][3.944:961](),[3.500][3.944:961](),[3.355][3.944:961](),[3.900][3.944:961](),[3.581][3.944:961](),[3.1155][3.944:961](),[3.744][3.944:961](),[3.500][3.944:961](),[3.320][3.944:961](),[3.2727][3.944:961](),[3.577][3.944:961](),[3.475][3.944:961](),[3.1354][3.944:961](),[3.216][3.944:961](),[3.162][3.944:961](),[3.962][3.944:961](),[3.216][3.944:961](),[2.3393][3.944:961](),[3.944][3.944:961](),[3.961][3.1355:1405](),[3.1405][3.816:845](),[3.845][3.1434:1464](),[3.777][3.1434:1464](),[3.1434][3.1434:1464](),[3.1464][3.846:912](),[3.912][3.1530:1591](),[3.844][3.1530:1591](),[3.99][3.1530:1591](),[3.377][3.1530:1591](),[3.1530][3.1530:1591](),[3.1591][2.3394:3584](),[3.1391][3.1476:1477](),[3.1388][3.1476:1477](),[3.1016][3.1476:1477](),[3.612][3.1476:1477](),[3.998][3.1476:1477](),[3.1088][3.1476:1477](),[3.1320][3.1476:1477](),[3.349][3.1476:1477](),[3.1346][3.1476:1477](),[3.1016][3.1476:1477](),[3.2918][3.1476:1477](),[3.2010][3.1476:1477](),[3.678][3.1476:1477](),[3.1618][3.1476:1477](),[3.636][3.1476:1477](),[2.3584][3.1476:1477](),[3.1476][3.1476:1477](),[3.1477][2.3585:3789](),[3.1593][3.1065:1066](),[3.765][3.1065:1066](),[3.1473][3.1065:1066](),[3.1551][3.1065:1066](),[3.3055][3.1065:1066](),[3.789][3.1065:1066](),[2.3789][3.1065:1066](),[3.1065][3.1065:1066](),[3.1066][2.3790:4201](),[3.2005
][3.3128:3162](),[3.2431][3.3128:3162](),[3.3139][3.3128:3162](),[3.1963][3.3128:3162](),[3.3467][3.3128:3162](),[3.2455][3.3128:3162](),[2.4201][3.3128:3162](),[3.3128][3.3128:3162](),[3.3162][2.4202:5219](),[3.3023][3.3192:3287](),[3.2462][3.3192:3287](),[3.3170][3.3192:3287](),[3.2981][3.3192:3287](),[3.4405][3.3192:3287](),[3.2486][3.3192:3287](),[2.5219][3.3192:3287](),[3.3192][3.3192:3287](),[3.1167][3.2319:2359](),[3.3287][3.2319:2359](),[3.2319][3.2319:2359](),[3.2359][3.3288:3733](),[3.2883][3.1858:1868](),[3.1359][3.1858:1868](),[3.2804][3.1858:1868](),[3.1359][3.1858:1868](),[3.3733][3.1858:1868](),[3.1137][3.1858:1868](),[3.3372][3.1858:1868](),[3.1858][3.1858:1868](),[3.1868][3.2884:2890](),[3.2890][3.3734:3736](),[3.3736][3.2997:2998](),[3.3375][3.2997:2998](),[3.2997][3.2997:2998](),[3.2998][3.3737:3788](),[3.3788][2.5220:5324](),[3.1289][3.4545:4606](),[3.3163][3.4545:4606](),[3.666][3.4545:4606](),[3.3306][3.4545:4606](),[3.1209][3.4545:4606](),[3.395][3.4545:4606](),[2.5324][3.4545:4606](),[3.4545][3.4545:4606](),[3.4606][3.3307:3369](),[3.3369][2.5325:5391](),[3.4784][3.8539:8553](),[3.6248][3.8539:8553](),[3.3049][3.8539:8553](),[3.3603][3.8539:8553](),[3.2217][3.8539:8553](),[3.1135][3.8539:8553](),[2.5391][3.8539:8553](),[3.8539][3.8539:8553](),[3.8553][2.5392:5427](),[3.4820][3.8600:8670](),[3.6296][3.8600:8670](),[3.3097][3.8600:8670](),[3.3651][3.8600:8670](),[3.2253][3.8600:8670](),[3.1171][3.8600:8670](),[2.5427][3.8600:8670](),[3.8600][3.8600:8670](),[3.8670][2.5428:7935](),[3.4843][3.8712:8726](),[3.6339][3.8712:8726](),[3.3140][3.8712:8726](),[3.3694][3.8712:8726](),[3.2276][3.8712:8726](),[3.1194][3.8712:8726](),[2.7935][3.8712:8726](),[3.8712][3.8712:8726](),[3.8726][2.7936:7967](),[2.7967][3.8726:8736](),[3.8726][3.8726:8736](),[3.8736][2.7968:7981](),[3.2431][3.2124:2130](),[3.871][3.2124:2130](),[3.1026][3.2124:2130](),[3.1227][3.2124:2130](),[3.8736][3.2124:2130](),[3.970][3.2124:2130](),[3.920][3.2124:2130](),[2.7981][3.2124:2130
](),[3.2124][3.2124:2130](),[3.2130][3.3896:3983](),[3.1590][3.3896:3983](),[3.285][3.3896:3983](),[3.970][3.3896:3983](),[3.3896][3.3896:3983](),[3.3983][3.8737:8888](),[3.658][3.4019:4137](),[3.2282][3.4019:4137](),[3.709][3.4019:4137](),[3.1178][3.4019:4137](),[3.8888][3.4019:4137](),[3.1007][3.4019:4137](),[3.1072][3.4019:4137](),[3.4019][3.4019:4137](),[3.4137][3.8889:9010](),[3.841][3.4319:4335](),[3.1773][3.4319:4335](),[3.892][3.4319:4335](),[3.1300][3.4319:4335](),[3.9010][3.4319:4335](),[3.1190][3.4319:4335](),[3.407][3.4319:4335](),[3.1092][3.4319:4335](),[3.1194][3.4319:4335](),[3.4319][3.4319:4335](),[3.4335][3.9011:9167](),[3.943][3.4476:4612](),[3.2339][3.4476:4612](),[3.1875][3.4476:4612](),[3.1034][3.4476:4612](),[3.1457][3.4476:4612](),[3.9167][3.4476:4612](),[3.1333][3.4476:4612](),[3.453][3.4476:4612](),[3.1138][3.4476:4612](),[3.1351][3.4476:4612](),[3.4476][3.4476:4612](),[3.4612][3.9168:9205](),[3.9205][2.7982:8041](),[3.3750][3.9264:9319](),[2.8041][3.9264:9319](),[3.9264][3.9264:9319](),[3.9319][2.8042:14798](),[3.7546][3.5580:5595](),[3.6633][3.5580:5595](),[3.3719][3.5580:5595](),[3.6982][3.5580:5595](),[3.4691][3.5580:5595](),[3.6822][3.5580:5595](),[3.3484][3.5580:5595](),[3.3510][3.5580:5595](),[3.3851][3.5580:5595](),[3.4185][3.5580:5595](),[3.16291][3.5580:5595](),[3.3480][3.5580:5595](),[3.2864][3.5580:5595](),[2.14798][3.5580:5595](),[3.5580][3.5580:5595](),[3.5595][3.6613:6623](),[3.3945][3.3349:3355](),[3.2690][3.3349:3355](),[3.1027][3.3349:3355](),[3.2690][3.3349:3355](),[3.1842][3.3349:3355](),[3.6623][3.3349:3355](),[3.1060][3.3349:3355](),[3.6318][3.3349:3355](),[3.1761][3.3349:3355](),[3.3349][3.3349:3355]()
    /// Mutable per-connection bookkeeping that is kept next to the XMPP client.
    #[derive(Default)]
    struct XmppData {
    /// Known roster data: jid -> (subscription state, "ask" state).
    roster: HashMap<
    xmpp_parsers::Jid,
    (
    xmpp_parsers::roster::Subscription,
    xmpp_parsers::roster::Ask,
    ),
    >,
    /// Counter used as the suffix of generated stanza ids.
    counter: usize,
    /// Ids of sent "add to roster" iq requests mapped to the jid being added.
    pending_add_roster_ids: HashMap<String, xmpp_parsers::Jid>,
    /// Stanzas waiting to be sent.
    send_queue: VecDeque<minidom::Element>,
    /// Chat messages per recipient that could not be sent yet.
    outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
    /// Chatrooms: muc id -> muc jid.
    mucs: HashMap<String, xmpp_parsers::Jid>,
    }
    /// A live XMPP client together with its bookkeeping data.
    struct XmppState {
    /// tokio_xmpp client of this connection
    client: Client,
    /// roster, queues and counters of this connection
    data: XmppData,
    }
    /// Holder of the processor that dispatches incoming stanzas.
    struct XmppElementProcessor {
    /// Processor working on an `XmppConnection` and producing a `bool`.
    /// NOTE(review): the exact meaning of the three type parameters is defined in
    /// `element_processor` — confirm there.
    incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
    }
    impl XmppElementProcessor {
        /// Builds the processor for incoming stanzas.
        ///
        /// The closure given to `Processor::new` only logs the element and answers `true`
        /// (presumably the fallback for stanzas nobody handles); after that
        /// `XmppConnection::incoming_iq_processing` is registered as a handler.
        fn new() -> Self {
            let mut dispatcher = element_processor::Processor::new(&|_, element| {
                warn!("Unknown stanza {:#?}", element);
                true
            });
            dispatcher.register(&XmppConnection::incoming_iq_processing);
            Self {
                incoming: dispatcher,
            }
        }
    }
    /// A connection that may or may not be established yet.
    ///
    /// `connect` turns it into an `XmppConnection`, reusing `state` when it is
    /// already present.
    struct MaybeXmppConnection {
    /// Account this connection belongs to (shared, reference counted).
    account: std::rc::Rc<config::Account>,
    /// `Some` when a live client/session exists, `None` otherwise.
    state: Option<XmppState>,
    }
    /// An XMPP connection that has a client and session state.
    struct XmppConnection {
    /// Account this connection belongs to (shared, reference counted).
    account: std::rc::Rc<config::Account>,
    /// Client and session data.
    state: XmppState,
    }
    impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from.account,
    state: Some(from.state),
    }
    }
    }
    impl From<config::Account> for MaybeXmppConnection {
        /// Starts from a bare account: the account is put behind an `Rc` and
        /// no state exists yet.
        fn from(from: config::Account) -> MaybeXmppConnection {
            // Same result as building the struct by hand with `state: None`;
            // the `Rc<config::Account>` conversion does exactly that.
            std::rc::Rc::new(from).into()
        }
    }
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
        /// Starts from an already shared account; no state exists yet.
        fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
            let (account, state) = (from, None);
            Self { account, state }
        }
    }
    impl MaybeXmppConnection {
    /// Turns this value into a usable `XmppConnection`.
    ///
    /// When a state is already present it is reused as is. Otherwise a new
    /// client is created and taken through: waiting for `Event::Online`,
    /// fetching the initial roster, sending self-presence and entering the
    /// chatrooms. If any of these steps fails, the whole sequence is started
    /// again with the same account; no delay between attempts is applied here.
    ///
    /// The returned future fails when `stop_future` completes or fails before
    /// a connection could be established.
    fn connect<F>(
    self,
    stop_future: F,
    ) -> impl Future<Item = XmppConnection, Error = failure::Error>
    where
    F: future::Future + Clone + 'static,
    <F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
    {
    info!("xmpp connection...");
    let MaybeXmppConnection { account, state } = self;
    if let Some(state) = state {
    // Already connected: hand the existing state back immediately.
    Box::new(future::ok(XmppConnection { account, state }))
    // Both branches are boxed so they share one concrete return type.
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    // Race the stop signal against the retry loop below.
    stop_future
    .clone()
    .select2(
    // One iteration is one full connection attempt; the account is the
    // loop state carried over to the next attempt.
    future::loop_fn(account, move |account| {
    info!("xmpp initialization...");
    let client =
    Client::new_with_jid(account.jid.clone(), &account.password);
    info!("xmpp initialized");
    // Every step below consumes its own copy of the stop future.
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
    let stop_future4 = stop_future.clone();
    // future to wait for online
    Box::new(
    XmppConnection {
    state: XmppState {
    client,
    data: std::default::Default::default(),
    },
    account,
    }
    .processing(XmppConnection::online, stop_future.clone())
    .map_err(|(acc, _)| acc)
    // Only "condition met while the stop future is still pending" counts
    // as success; a completed or failed stop future ends this attempt.
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
    .and_then(|conn| conn.enter_mucs(stop_future4))
    // A failed attempt is never reported as a loop error: it becomes
    // `Continue`, i.e. another attempt with the returned account.
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    }),
    )
    })
    .map_err(|_: ()| ()),
    )
    // A = the stop future finished first, B = the retry loop finished first.
    .then(|r| match r {
    Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
    Ok(Either::B((x, _a))) => future::ok(x),
    Err(Either::A((e, _b))) => future::err(e.into()),
    Err(Either::B((_, _a))) => {
    future::err(format_err!("Cann't initiate XMPP connection"))
    }
    }),
    )
    }
    }
    }
    impl XmppConnection {
    /// Generic handling applied to every event read from the stream.
    ///
    /// Stanzas are dispatched through a freshly built `XmppElementProcessor`,
    /// `Event::Online` is accepted silently and any other event is logged.
    /// Returns `false` when the connection should be dropped.
    fn xmpp_processing(&mut self, event: &Event) -> bool {
        if let Event::Stanza(stanza) = event {
            let dispatcher = XmppElementProcessor::new();
            return dispatcher.incoming.process(self, stanza.clone());
        }
        if let Event::Online = event {
            return true;
        }
        warn!("Unexpected event {:?}", event);
        false
    }
    fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
    use std::convert::TryInto;
    if let Some((_, jid)) = self.state.data.pending_add_roster_ids.remove_entry(&iq.id) {
    if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {
    if self.state.data.roster.contains_key(&jid) {
    info!("Jid {} updated to roster", jid);
    } else {
    info!("Jid {} added in roster", jid);
    self.state.data.roster.insert(
    jid.clone(),
    (
    xmpp_parsers::roster::Subscription::None,
    xmpp_parsers::roster::Ask::None,
    ),
    );
    }
    self.process_jid(&jid);
    } else {
    warn!(
    "Wrong payload when adding {} to roster: {:?}",
    jid, iq.payload
    );
    }
    }
    match iq.payload {
    xmpp_parsers::iq::IqType::Set(element) => {
    if let Some(roster) =
    element.try_into().ok() as Option<xmpp_parsers::roster::Roster>
    {
    for i in roster.items {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
    info!("Update {} in roster", i.jid);
    rdata.0 = i.subscription;
    rdata.1 = i.ask;
    } else {
    info!("Add {} to roster", i.jid);
    self.state
    .data
    .roster
    .insert(i.jid.clone(), (i.subscription, i.ask));
    }
    self.process_jid(&i.jid);
    }
    }
    }
    xmpp_parsers::iq::IqType::Error(e) => {
    error!("iq error: {:?}", e);
    return false;
    }
    xmpp_parsers::iq::IqType::Get(element) => {
    if let Some(_ping) = element.try_into().ok() as Option<xmpp_parsers::ping::Ping> {
    let pong = stanzas::make_pong(&iq.id, self.state.client.jid.clone(), iq.from);
    self.state.data.send_queue.push_back(pong);
    }
    }
    _ => (), // ignore
    }
    true
    }
    /// Drives the connection until `stop_condition` reports success or
    /// `stop_future` finishes.
    ///
    /// Each iteration first sends one stanza from `send_queue` if there is
    /// any; the next event is read from the client only when the queue is
    /// empty. Every event read goes through `xmpp_processing` and then through
    /// `stop_condition`.
    ///
    /// The future resolves to `(connection, state)` when the connection is
    /// still usable and fails with `(account, state)` when it was lost.
    /// `state` describes `stop_future`:
    /// `Ok(Either::A(f))` - still pending, `f` is handed back to the caller;
    /// `Ok(Either::B(t))` - completed with `t`;
    /// `Err(e)` - failed with `e`.
    fn processing<S, F, T, E>(
    self,
    stop_condition: S,
    stop_future: F,
    ) -> impl Future<
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    >
    where
    F: Future<Item = T, Error = E> + 'static,
    S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
    T: 'static,
    E: 'static,
    {
    future::loop_fn(
    (self, stop_future, stop_condition),
    |(xmpp, stop_future, mut stop_condition)| {
    // Take the connection apart: the client is consumed by send/read below.
    let XmppConnection {
    state: XmppState { client, mut data },
    account,
    } = xmpp;
    // Queued outgoing stanzas are handled before anything is read.
    if let Some(send_element) = data.send_queue.pop_front() {
    use tokio::prelude::Sink;
    info!("Sending {:?}", send_element);
    Box::new(
    client
    .send(Packet::Stanza(send_element))
    .select2(stop_future)
    .then(move |r| match r {
    // Sent while the stop future is still pending: next iteration.
    Ok(Either::A((client, b))) => {
    Box::new(future::ok(future::Loop::Continue((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    b,
    stop_condition,
    ))))
    as Box<dyn Future<Item = _, Error = _>>
    }
    // Stop future completed first: wait for the send in flight, then leave.
    Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Ok(Either::B(t))))
    }
    })),
    // Sending failed: the connection is lost, the stop future is handed back.
    Err(Either::A((e, b))) => {
    warn!("XMPP sending error: {}", e);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    // Stop future failed first: wait for the send in flight, then leave.
    Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Err(e)))
    }
    })),
    }),
    ) as Box<dyn Future<Item = _, Error = _>>
    } else {
    // Nothing to send: wait for the next event or for the stop future.
    Box::new(
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    // Common handling first, then the caller's condition.
    if xmpp.xmpp_processing(&event) {
    match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    }
    } else {
    // Common handling asked for a disconnect.
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    } else {
    // The stream produced no event (it ended).
    future::err((account, Ok(Either::A(b))))
    }
    }
    // Stop future completed first: take the client back out of the
    // unfinished read if it is still available.
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    // Reading failed: the connection is lost.
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    // Stop future failed first: same recovery as for its completion.
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    }),
    )
    }
    },
    )
    }
  • replacement in src/xmpp/mod.rs at line 7
    [3.3356][3.6624:6691](),[3.6691][3.16292:16429](),[3.3839][3.4011:4339](),[3.3630][3.4011:4339](),[3.3989][3.4011:4339](),[3.16429][3.4011:4339](),[3.3601][3.4011:4339](),[3.4756][3.4011:4339](),[3.4011][3.4011:4339](),[3.4339][3.16430:16484]()
    /// Stop condition used while waiting for the stream to come online.
    ///
    /// Yields `Ok(true)` on `Event::Online`, `Ok(false)` for a stanza that
    /// arrives earlier (it is only logged) and `Err(())` for any other event.
    fn online(&mut self, event: Event) -> Result<bool, ()> {
        let is_online = match event {
            Event::Stanza(stanza) => {
                warn!("Stanza before online: {:?}", stanza);
                false
            }
            Event::Online => true,
            _ => {
                error!("Disconnected while online");
                return Err(());
            }
        };
        if is_online {
            info!("Online!");
        }
        Ok(is_online)
    }
    [3.3356]
    [3.16484]
    mod stanzas;
  • replacement in src/xmpp/mod.rs at line 9
    [3.16485][2.14799:14898](),[3.6900][3.16562:16604](),[3.1320][3.16562:16604](),[2.14898][3.16562:16604](),[3.16562][3.16562:16604](),[3.16604][2.14899:14938](),[3.6453][3.16639:16735](),[3.6936][3.16639:16735](),[2.14938][3.16639:16735](),[3.16639][3.16639:16735](),[3.16735][2.14939:15999](),[3.7514][3.18104:18146](),[3.6606][3.18104:18146](),[3.8330][3.18104:18146](),[3.2869][3.18104:18146](),[2.15999][3.18104:18146](),[3.18104][3.18104:18146](),[3.18146][2.16000:16049](),[3.7564][3.18374:18412](),[3.6835][3.18374:18412](),[3.3131][3.18374:18412](),[2.16049][3.18374:18412](),[3.18374][3.18374:18412](),[3.18412][2.16050:16262](),[3.7777][3.6991:7025](),[2.16262][3.6991:7025](),[3.6991][3.6991:7025](),[3.7025][2.16263:16436](),[3.8520][3.7921:7951](),[2.16436][3.7921:7951](),[3.7921][3.7921:7951](),[3.7951][3.18736:18791](),[3.7097][3.18736:18791](),[3.8592][3.18736:18791](),[3.18736][3.18736:18791](),[3.18791][2.16437:16471](),[3.7986][3.18879:18957](),[3.7186][3.18879:18957](),[3.8681][3.18879:18957](),[3.3340][3.18879:18957](),[2.16471][3.18879:18957](),[3.18879][3.18879:18957](),[3.3902][3.4363:4377](),[3.3693][3.4363:4377](),[3.4014][3.4363:4377](),[3.18957][3.4363:4377](),[3.3664][3.4363:4377](),[3.4781][3.4363:4377](),[3.4363][3.4363:4377](),[3.4377][3.18958:19051](),[3.19051][3.4377:4387](),[3.4377][3.4377:4387](),[3.7793][3.8518:8524](),[3.2042][3.8518:8524](),[3.8518][3.8518:8524](),[3.8524][3.19052:19318](),[3.19318][2.16472:16523](),[3.8729][3.19369:19421](),[2.16523][3.19369:19421](),[3.19369][3.19369:19421](),[3.19421][3.8524:8525](),[3.8524][3.8524:8525](),[3.8525][2.16524:16691](),[3.8796][3.19641:19750](),[2.16691][3.19641:19750](),[3.19641][3.19641:19750](),[3.19750][2.16692:16738](),[3.11465][3.19780:20084](),[3.8827][3.19780:20084](),[2.16738][3.19780:20084](),[3.19780][3.19780:20084](),[3.20084][2.16739:16911](),[3.8909][3.20165:20486](),[3.1638][3.20165:20486](),[2.16911][3.20165:20486](),[3.20165][3.20165:20486](),[3.20486][3.7794:7942](),[
3.8525][3.7794:7942](),[3.7942][3.20487:20570](),[3.5010][3.8005:8011](),[3.5460][3.8005:8011](),[3.20570][3.8005:8011](),[3.3728][3.8005:8011](),[3.6431][3.8005:8011](),[3.8005][3.8005:8011](),[3.8011][3.20571:20686](),[3.5180][3.10624:10716](),[3.8128][3.10624:10716](),[3.8796][3.10624:10716](),[3.5065][3.10624:10716](),[3.5578][3.10624:10716](),[3.20686][3.10624:10716](),[3.3811][3.10624:10716](),[3.2220][3.10624:10716](),[3.3485][3.10624:10716](),[3.3174][3.10624:10716](),[3.6651][3.10624:10716](),[3.10624][3.10624:10716](),[3.3850][3.10716:10756](),[3.2259][3.10716:10756](),[3.3524][3.10716:10756](),[3.10716][3.10716:10756](),[3.10756][3.20687:20740]()
    fn process_initial_roster(&mut self, event: Event, id_init_roster: &str) -> Result<bool, ()> {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
    Ok(iq) => {
    if iq.id == id_init_roster {
    match iq.payload {
    xmpp_parsers::iq::IqType::Error(_e) => {
    error!("Get error instead of roster");
    Err(())
    }
    xmpp_parsers::iq::IqType::Result(Some(result)) => {
    match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
    Ok(roster) => {
    self.state.data.roster.clear();
    info!("Got first roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    self.state
    .data
    .roster
    .insert(i.jid, (i.subscription, i.ask));
    }
    Ok(true)
    }
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    Err(())
    }
    }
    }
    _ => {
    error!("Unknown result of roster");
    Err(())
    }
    }
    } else {
    Ok(false)
    }
    }
    Err(_e) => Ok(false),
    }
    } else {
    error!("Wrong event while waiting roster");
    Err(())
    }
    }
    /// Requests the roster and waits until the answer has been processed.
    ///
    /// Resolves to the connection once `process_initial_roster` reports
    /// success. Fails with the account when sending fails, when the
    /// connection is lost while waiting, or when `stop_future` completes or
    /// fails first.
    fn initial_roster<F, E>(
        self,
        stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
        F: Future<Error = E> + 'static,
        E: 'static,
    {
        use tokio::prelude::Sink;

        let XmppConnection { account, state } = self;
        let XmppState { client, mut data } = state;

        data.counter += 1;
        let id_init_roster = format!("id_init_roster{}", data.counter);
        let get_roster = stanzas::make_get_roster(&id_init_roster);
        info!("Quering roster... {:?}", get_roster);

        // The send error path needs its own handle to the account.
        let account_on_send_error = account.clone();
        let request_sent = client.send(Packet::Stanza(get_roster)).map_err(move |e| {
            error!("Error on querying roster: {}", e);
            account_on_send_error
        });

        request_sent.and_then(move |client| {
            let conn = XmppConnection {
                state: XmppState { client, data },
                account,
            };
            conn.processing(
                move |conn, event| conn.process_initial_roster(event, &id_init_roster),
                stop_future,
            )
            .map_err(|(account, _)| account)
            .and_then(|(conn, stop_state)| {
                // Success only if the roster arrived while the stop future
                // was still pending.
                if let Ok(Either::A(_)) = stop_state {
                    future::ok(conn)
                } else {
                    future::err(conn.account)
                }
            })
        })
    }
    /// Sends the account's own presence and waits until a presence stanza
    /// whose `from` equals the client's JID is received.
    ///
    /// Resolves to the connection on success and fails with the account
    /// otherwise.
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    {
    // Split the connection: the client is consumed by the send below.
    let XmppConnection {
    account,
    state: XmppState { client, data },
    } = self;
    use tokio::prelude::Sink;
    let presence = stanzas::make_presence(&account);
    // Extra handle to the account for the send-error path.
    let account2 = account.clone();
    info!("Sending presence... {:?}", presence);
    [3.16485]
    [3.20740]
    mod xmpp_connection;
    use xmpp_connection::MaybeXmppConnection;
  • edit in src/xmpp/mod.rs at line 12
    [3.20741][3.6652:6667](),[3.8183][3.6652:6667](),[3.6667][2.16912:16956](),[3.11510][3.8226:8315](),[3.8938][3.8226:8315](),[2.16956][3.8226:8315](),[3.8226][3.8226:8315](),[3.8315][3.20742:20767](),[3.5285][3.8340:8426](),[3.5170][3.8340:8426](),[3.5620][3.8340:8426](),[3.20767][3.8340:8426](),[3.3920][3.8340:8426](),[3.6693][3.8340:8426](),[3.8340][3.8340:8426](),[3.8426][3.20768:20823](),[3.5335][3.8483:8658](),[3.3966][3.8483:8658](),[3.5220][3.8483:8658](),[3.5678][3.8483:8658](),[3.4551][3.8483:8658](),[3.777][3.8483:8658](),[3.20823][3.8483:8658](),[3.3956][3.8483:8658](),[3.6853][3.8483:8658](),[3.8483][3.8483:8658](),[3.8658][2.16957:17484](),[3.8514][3.9042:9105](),[3.9323][3.9042:9105](),[3.21208][3.9042:9105](),[3.4480][3.9042:9105](),[2.17484][3.9042:9105](),[3.9042][3.9042:9105](),[3.9105][3.21209:21324](),[3.5424][3.9220:9320](),[3.5309][3.9220:9320](),[3.5794][3.9220:9320](),[3.21324][3.9220:9320](),[3.4569][3.9220:9320](),[3.6969][3.9220:9320](),[3.9220][3.9220:9320](),[3.9320][3.21325:21624](),[3.21624][3.4570:4585](),[3.9320][3.4570:4585](),[3.4585][3.21625:21632](),[3.21632][2.17485:19528](),[3.9463][3.21686:21716](),[2.19528][3.21686:21716](),[3.21686][3.21686:21716](),[3.21716][2.19529:20613](),[3.5300][3.23186:23196](),[3.755][3.23186:23196](),[3.9665][3.23186:23196](),[3.831][3.23186:23196](),[3.4593][3.23186:23196](),[3.3544][3.23186:23196](),[2.20613][3.23186:23196](),[3.23186][3.23186:23196](),[3.23196][2.20614:23149](),[3.9691][3.5042:5048](),[2.23149][3.5042:5048](),[3.5042][3.5042:5048](),[3.5470][3.3789:3792](),[3.12136][3.3789:3792](),[3.3125][3.3789:3792](),[3.3789][3.3789:3792]()
    // Send the presence stanza; a send failure yields the account as the error.
    client
    .send(Packet::Stanza(presence))
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, data },
    account,
    }
    .processing(
    // Stop condition: met once a presence from our own JID arrives.
    move |conn, event| {
    if let Event::Stanza(s) = event {
    use std::convert::TryInto;
    match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
    Ok(presence) => {
    Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
    }
    // Stanzas that are not a presence are skipped; keep waiting.
    Err(e) => {
    warn!("Not a self-presence: {}", e);
    Ok(false)
    }
    }
    } else {
    error!("Wrong event while waiting self-presence");
    Err(())
    }
    },
    stop_future,
    )
    .map_err(|(account, _)| account)
    // Success only if the condition was met while the stop future was
    // still pending; a completed or failed stop future counts as failure.
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    })
    }
    fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
    if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
    if !mailbox.is_empty() {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
    info!("Jid {} in roster", xmpp_to);
    let sub_to = match rdata.0 {
    xmpp_parsers::roster::Subscription::To => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if sub_to {
    info!("Subscribed to {}", xmpp_to);
    self.state.data.send_queue.extend(
    mailbox.drain(..).map(|message| {
    stanzas::make_chat_message(xmpp_to.clone(), message)
    }),
    );
    } else if rdata.1 == xmpp_parsers::roster::Ask::None {
    info!("Not subscribed to {}", xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
    }
    } else {
    info!("Jid {} not in roster", xmpp_to);
    self.state.data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
    let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
    self.state
    .data
    .pending_add_roster_ids
    .insert(id_add_roster, xmpp_to.clone());
    info!("Adding jid to roster... {:?}", add_roster);
    self.state.data.send_queue.push_back(add_roster);
    }
    }
    }
    }
    fn process_command(&mut self, cmd: XmppCommand) {
    info!("Got command");
    match cmd {
    XmppCommand::Chat { xmpp_to, message } => {
    self.state
    .data
    .outgoing_mailbox
    .entry(xmpp_to.clone())
    .or_default()
    .push(message);
    self.process_jid(&xmpp_to);
    }
    XmppCommand::Chatroom { muc_id, message } => {
    if let Some(muc) = self.state.data.mucs.get(&muc_id) {
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_muc_message(muc.clone(), message));
    } else {
    error!("Not found MUC {}", muc_id);
    }
    }
    XmppCommand::Ping => {
    self.state.data.counter += 1;
    let id_ping = format!("id_ping{}", self.state.data.counter);
    let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
    self.state.data.send_queue.push_back(ping);
    }
    }
    }
    /// Leaves every entered chatroom by sending a leave presence to each one,
    /// one after another.
    ///
    /// The connection is consumed. The future resolves to `()` once all
    /// presences were sent and fails with the first send error.
    fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
        use tokio::prelude::Sink;

        info!("Shutdown connection");
        let XmppConnection { account, state } = self;
        // Copy the room JIDs out first: `state` itself is moved into the fold.
        let muc_jids: Vec<_> = state.data.mucs.values().cloned().collect();
        stream::iter_ok(muc_jids)
            .fold(state, move |XmppState { client, data }, muc_jid| {
                // `muc_jid` is already owned here, so it is passed on as is.
                let muc_presence =
                    stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid);
                info!("Sending muc leave presence... {:?}", muc_presence);
                client
                    .send(Packet::Stanza(muc_presence))
                    .map_err(|e| {
                        error!("Error on send muc presence: {}", e);
                        e
                    })
                    .and_then(|client| future::ok(XmppState { client, data }))
            })
            .map(|_| ())
    }
    /// Enters every chatroom configured for the account, one after another.
    ///
    /// For each room a presence with a fresh id is sent; after a successful
    /// send the room is recorded in `mucs`. Resolves to the connection, or
    /// fails with the account on the first send error. `_stop_future` is
    /// accepted but not used.
    fn enter_mucs<F, E>(
        self,
        _stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
        F: Future<Error = E> + 'static,
        E: Into<failure::Error> + 'static,
    {
        use tokio::prelude::Sink;

        let XmppConnection { account, state } = self;
        let chatrooms = account.chatrooms.clone();
        // Handle to the account owned by the per-room closure.
        let account_for_rooms = account.clone();

        stream::iter_ok(chatrooms)
            .fold(state, move |XmppState { client, mut data }, chatroom| {
                data.counter += 1;
                let id_muc_presence = format!("id_muc_presence{}", data.counter);
                let muc_presence = stanzas::make_muc_presence(
                    &id_muc_presence,
                    account_for_rooms.jid.clone(),
                    chatroom.1.clone(),
                );
                info!("Sending muc presence... {:?}", muc_presence);
                let account_on_error = account_for_rooms.clone();
                client
                    .send(Packet::Stanza(muc_presence))
                    .map_err(|e| {
                        error!("Error on send muc presence: {}", e);
                        account_on_error
                    })
                    .and_then(|client| {
                        // Remember the room only after the presence was sent.
                        data.mucs.insert(chatroom.0, chatroom.1);
                        future::ok(XmppState { client, data })
                    })
            })
            .map(|state| XmppConnection { account, state })
    }
    }
  • replacement in src/xmpp/element_processor.rs at line 0
    [3.11301][2.24939:26015]()
    /// Signature of a handler: it gets mutable access to the state `S` and an
    /// owned element `E`, and produces `T`.
    type Func<S, T, E> = dyn Fn(&mut S, E) -> T;

    /// Ordered chain of typed handlers with a fallback.
    ///
    /// Every registered handler accepts some type `A` that can be built from
    /// the element with `TryFrom`. Handlers are tried in registration order;
    /// the first one whose conversion succeeds handles the element.
    pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {
        /// Registered handlers; `None` means "conversion failed, try the next".
        processors: Vec<Box<Func<S, Option<T>, E>>>,
        /// Called when no registered handler accepted the element.
        default: &'static Func<S, T, E>,
    }

    impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {
        /// Creates a processor that has only the fallback handler `f`.
        pub fn new<F>(f: &'static F) -> Processor<S, T, E>
        where
            F: Fn(&mut S, E) -> T + 'static,
        {
            let processors = Vec::new();
            Self {
                processors,
                default: f,
            }
        }

        /// Appends a handler for elements convertible into `A`.
        pub fn register<F, A>(&mut self, f: &'static F)
        where
            F: Fn(&mut S, A) -> T + 'static,
            A: std::convert::TryFrom<E>,
        {
            self.processors.push(Box::new(move |state: &mut S, element: E| {
                use std::convert::TryFrom;
                // A failed conversion means this handler is not responsible.
                <A as TryFrom<E>>::try_from(element)
                    .ok()
                    .map(|arg| f(state, arg))
            }));
        }

        /// Runs the first handler that accepts `e`, or the fallback if none
        /// does. Each handler tried receives its own clone of `e`.
        pub fn process(&self, s: &mut S, e: E) -> T {
            let handled = self
                .processors
                .iter()
                .find_map(|handler| handler(s, e.clone()));
            match handled {
                Some(result) => result,
                None => (self.default)(s, e),
            }
        }
    }
    [3.11301]
    /// Signature of a handler: it gets mutable access to the state `S` and an
    /// owned element `E`, and produces `T`.
    type Func<S, T, E> = dyn Fn(&mut S, E) -> T;

    /// Ordered chain of typed handlers with a fallback.
    ///
    /// Every registered handler accepts some type `A` that can be built from
    /// the element with `TryFrom`. Handlers are tried in registration order;
    /// the first one whose conversion succeeds handles the element.
    pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {
        /// Registered handlers; `None` means "conversion failed, try the next".
        processors: Vec<Box<Func<S, Option<T>, E>>>,
        /// Called when no registered handler accepted the element.
        default: &'static Func<S, T, E>,
    }

    impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {
        /// Creates a processor that has only the fallback handler `f`.
        pub fn new<F>(f: &'static F) -> Processor<S, T, E>
        where
            F: Fn(&mut S, E) -> T + 'static,
        {
            let processors = Vec::new();
            Self {
                processors,
                default: f,
            }
        }

        /// Appends a handler for elements convertible into `A`.
        pub fn register<F, A>(&mut self, f: &'static F)
        where
            F: Fn(&mut S, A) -> T + 'static,
            A: std::convert::TryFrom<E>,
        {
            self.processors.push(Box::new(move |state: &mut S, element: E| {
                use std::convert::TryFrom;
                // A failed conversion means this handler is not responsible.
                <A as TryFrom<E>>::try_from(element)
                    .ok()
                    .map(|arg| f(state, arg))
            }));
        }

        /// Runs the first handler that accepts `e`, or the fallback if none
        /// does. Each handler tried receives its own clone of `e`.
        pub fn process(&self, s: &mut S, e: E) -> T {
            let handled = self
                .processors
                .iter()
                .find_map(|handler| handler(s, e.clone()));
            match handled {
                Some(result) => result,
                None => (self.default)(s, e),
            }
        }
    }