Second part of processing result is only about stop_future

[?]
Dec 31, 2018, 8:21 PM
BWDUANCV77MCLCYMRS2UNFIFW4ZC3KB2KEGEUI77FRO7KW6TUZJQC

Dependencies

  • [2] CP4MZO6V Leftover commands are processed via stoppable receiver
  • [3] V5HDBSZM Use jid for receiver address
  • [4] PBRUH4BJ Rename optional XmppConnection to MaybeXmppConnection
  • [5] AGIW6YR3 Use shared future for signal everywhere
  • [6] X6L47BHQ Use different structure for established xmpp connection
  • [7] HU3NZX5Z Process self-presence via new processing code
  • [8] PFC7OJQF Query roster
  • [9] ALP2YJIU Rename XmppState to XmppProcessState
  • [10] VS6AHRWI Move XMPP to separate dir
  • [11] QTCUURXN Add additional requirement for command stream
  • [12] BTOZT4JP Use failure
  • [13] O2GM5J4F Don't split xmpp receiving and sending
  • [14] 5IKA4GO7 Rename xmpp client field from "inner" to "client"
  • [15] QWE26TMV update deps
  • [16] TDOR5XQU Accept destination
  • [17] OANBCLN5 Move xmpp client into XmppState
  • [18] UWY5EVZ6 Add dummy roster data
  • [19] NDDQQP2P Update deps
  • [20] AYQZ2UIA Update deps
  • [21] XGP44R5H Rework stopping xmpp connection
  • [22] FV6BJ5K6 Send self-presence and store account info in Rc so it willbe used in some future in parallel
  • [23] 4LRBIGVT Show info about xmpp errors
  • [24] IK3YDPTY Update deps
  • [25] WJNXI6Z4 Fill roster
  • [26] PVCRPP3B Some servers don't send to in initial presence
  • [27] OGMBXBKP Move online to XmppConnection
  • [*] FVVPKFTL Initial commit

Change contents

  • edit in src/xmpp/stanzas.rs at line 2
    [3.57]
    [3.57]
    use xmpp_parsers::iq::Iq;
  • edit in src/xmpp/stanzas.rs at line 4
    [3.141]
    [3.141]
    use xmpp_parsers::roster::Roster;
  • edit in src/xmpp/stanzas.rs at line 14
    [3.419]
    pub fn make_get_roster(id: &str) -> Element {
    let mut get_roster = Iq::from_get(Roster {
    ver: None,
    items: vec![],
    });
    get_roster.id = Some(id.to_string());
    get_roster.into()
    }
  • edit in src/xmpp/mod.rs at line 7
    [3.48]
    [3.420]
    use std::collections::HashMap;
  • edit in src/xmpp/mod.rs at line 11
    [3.434]
    [3.320]
    const ID_GET_ROSTER: &str = "id_get_roster0";
  • replacement in src/xmpp/mod.rs at line 14
    [3.321][3.0:33]()
    pub struct MaybeXmppConnection {
    [3.321]
    [3.354]
    struct XmppState {
    client: Client,
    roster: HashMap<jid::Jid, ()>,
    }
    struct MaybeXmppConnection {
  • replacement in src/xmpp/mod.rs at line 21
    [3.397][3.34:61]()
    inner: Option<Client>,
    [3.397]
    [3.425]
    state: Option<XmppState>,
  • replacement in src/xmpp/mod.rs at line 24
    [3.106][3.62:90]()
    pub struct XmppConnection {
    [3.106]
    [3.31]
    struct XmppConnection {
  • replacement in src/xmpp/mod.rs at line 26
    [3.74][3.91:110]()
    inner: Client,
    [3.74]
    [3.448]
    state: XmppState,
  • replacement in src/xmpp/mod.rs at line 33
    [3.627][3.111:148]()
    inner: Some(from.inner),
    [3.627]
    [3.666]
    state: Some(from.state),
  • replacement in src/xmpp/mod.rs at line 42
    [3.527][3.149:174]()
    inner: None,
    [3.527]
    [3.499]
    state: None,
  • replacement in src/xmpp/mod.rs at line 53
    [3.1011][3.0:37]()
    F: future::Future + 'static,
    [3.1011]
    [3.1056]
    F: future::Future + Clone + 'static,
  • replacement in src/xmpp/mod.rs at line 57
    [3.690][3.175:234]()
    let MaybeXmppConnection { account, inner } = self;
    [3.690]
    [3.744]
    let MaybeXmppConnection { account, state } = self;
  • replacement in src/xmpp/mod.rs at line 59
    [3.745][3.235:340]()
    if let Some(inner) = inner {
    Box::new(future::ok(XmppConnection { account, inner }))
    [3.745]
    [3.1302]
    if let Some(state) = state {
    Box::new(future::ok(XmppConnection { account, state }))
  • edit in src/xmpp/mod.rs at line 65
    [3.1405]
    [3.1434]
    .clone()
  • replacement in src/xmpp/mod.rs at line 67
    [3.1464][3.38:99]()
    future::loop_fn(account, |account| {
    [3.1464]
    [3.1530]
    future::loop_fn(account, move |account| {
  • edit in src/xmpp/mod.rs at line 76
    [3.1477]
    [3.2164]
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
  • replacement in src/xmpp/mod.rs at line 82
    [3.435][3.341:400]()
    inner: client,
    [3.435]
    [3.494]
    state: XmppState {
    client,
    roster: HashMap::new(),
    },
  • replacement in src/xmpp/mod.rs at line 88
    [3.589][3.100:231]()
    .online()
    .and_then(XmppConnection::self_presence)
    [3.589]
    [3.720]
    .processing(XmppConnection::online, stop_future.clone())
    .map_err(|(acc, _)| acc)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
  • replacement in src/xmpp/mod.rs at line 124
    [3.3788][3.232:285]()
    fn xmpp_processing(&mut self, _event: &Event) {}
    [3.3788]
    [3.3896]
    fn xmpp_processing(&mut self, event: &Event) {
    info!("Incoming xmpp event: {:?}", event);
    }
  • replacement in src/xmpp/mod.rs at line 130
    [3.3983][3.3983:4019](),[3.3983][3.3983:4019](),[3.3983][3.3983:4019](),[3.3983][3.3983:4019]()
    /// or stop future was resolved
    [3.3983]
    [3.4019]
    /// or stop future was resolved.
    /// Return item if connection was preserved or error otherwise.
    /// Second part is a state of stop_future
  • replacement in src/xmpp/mod.rs at line 143
    [3.4375][3.408:453]()
    S: FnMut(&mut Self, &Event) -> bool,
    [3.4375]
    [3.4476]
    S: FnMut(&mut Self, Event) -> Result<bool, ()>,
  • replacement in src/xmpp/mod.rs at line 148
    [3.4612][3.401:539]()
    let XmppConnection { inner, account } = xmpp;
    inner.into_future().select2(stop_future).then(|r| match r {
    [3.4612]
    [3.534]
    let XmppConnection {
    state: XmppState { client, roster },
    account,
    } = xmpp;
    client.into_future().select2(stop_future).then(|r| match r {
  • replacement in src/xmpp/mod.rs at line 156
    [3.708][3.540:587]()
    inner: client,
    [3.708]
    [3.777]
    state: XmppState { client, roster },
  • replacement in src/xmpp/mod.rs at line 160
    [3.907][3.454:742]()
    if stop_condition(&mut xmpp, &event) {
    future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
    } else {
    future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
    [3.907]
    [3.3255]
    match stop_condition(&mut xmpp, event) {
    Ok(true) => {
    future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
    }
    Ok(false) => {
    future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
    }
    Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
  • replacement in src/xmpp/mod.rs at line 174
    [3.1548][3.588:650]()
    if let Some(inner) = a.into_inner() {
    [3.1548]
    [3.1611]
    if let Some(client) = a.into_inner() {
  • replacement in src/xmpp/mod.rs at line 176
    [3.1672][3.651:718]()
    XmppConnection { inner, account },
    [3.1672]
    [3.1874]
    XmppConnection {
    state: XmppState { client, roster },
    account,
    },
  • replacement in src/xmpp/mod.rs at line 186
    [3.2270][3.719:808]()
    Err(Either::A((_e, b))) => future::err((account, Ok(Either::A(b)))),
    [3.2270]
    [3.2270]
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
  • replacement in src/xmpp/mod.rs at line 191
    [3.2318][3.809:871]()
    if let Some(inner) = a.into_inner() {
    [3.2318]
    [3.2381]
    if let Some(client) = a.into_inner() {
  • replacement in src/xmpp/mod.rs at line 193
    [3.2442][3.872:939]()
    XmppConnection { inner, account },
    [3.2442]
    [3.743]
    XmppConnection {
    state: XmppState { client, roster },
    account,
    },
  • replacement in src/xmpp/mod.rs at line 209
    [3.6691][3.6691:6737](),[3.6737][3.844:1453]()
    /// returns error if something went wrong
    fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
    Box::new(future::loop_fn(
    (self.inner, self.account),
    |(client, account)| {
    client.into_future().then(|r| match r {
    Ok((event, client)) => match event {
    Some(Event::Online) => {
    info!("Online");
    future::ok(future::Loop::Break(XmppConnection {
    account,
    inner: client,
    }))
    [3.6691]
    [3.1453]
    /// returns error if something went wrong and xmpp connection is broken
    fn online(&mut self, event: Event) -> Result<bool, ()> {
    match event {
    Event::Online => {
    info!("Online!");
    Ok(true)
    }
    Event::Stanza(s) => {
    warn!("Stanza before online: {:?}", s);
    Ok(false)
    }
    _ => {
    error!("Disconnected while online");
    Err(())
    }
    }
    }
    fn process_initial_roster(&mut self, event: Event) -> Result<bool, ()> {
    if let Event::Stanza(s) = event {
    use try_from::TryInto;
    match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
    Ok(iq) => {
    if let Some(id) = iq.id {
    if id == ID_GET_ROSTER {
    match iq.payload {
    xmpp_parsers::iq::IqType::Error(_e) => {
    error!("Get error instead of roster");
    Err(())
    }
    xmpp_parsers::iq::IqType::Result(Some(result)) => {
    match result.try_into()
    as Result<xmpp_parsers::roster::Roster, _>
    {
    Ok(roster) => {
    info!("Got roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    }
    Ok(true)
    }
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    Err(())
    }
    }
    }
    _ => {
    error!("Unknown result of roster");
    Err(())
    }
    }
    } else {
    Ok(false)
  • replacement in src/xmpp/mod.rs at line 264
    [3.1479][3.1479:1829](),[3.5337][3.5558:5584](),[3.6802][3.5558:5584](),[3.1829][3.5558:5584](),[3.3094][3.5558:5584](),[3.8724][3.5558:5584](),[3.5558][3.5558:5584](),[3.5584][3.1830:1996]()
    Some(Event::Stanza(s)) => {
    info!("xmpp stanza: {:?}", s);
    future::ok(future::Loop::Continue((client, account)))
    }
    _ => {
    warn!("Disconnected");
    future::err(account)
    }
    },
    Err((e, _)) => {
    error!("xmpp receive error: {}", e);
    future::err(account)
    [3.1479]
    [3.8819]
    } else {
    error!("Iq stanza without id");
    Err(())
  • edit in src/xmpp/mod.rs at line 268
    [3.8841]
    [3.1997]
    }
    Err(_e) => Ok(false),
    }
    } else {
    error!("Wrong event while waiting roster");
    Err(())
    }
    }
    fn initial_roster<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E>,
    {
    let XmppConnection {
    account,
    state: XmppState { client, roster },
    } = self;
    use tokio::prelude::Sink;
    let get_roster = stanzas::make_get_roster(ID_GET_ROSTER);
    let account2 = account.clone();
    info!("Quering roster... {:?}", get_roster);
    client
    .send(get_roster)
    .map_err(move |e| {
    error!("Error on querying roster: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, roster },
    account,
    }
    .processing(XmppConnection::process_initial_roster, stop_future)
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
  • replacement in src/xmpp/mod.rs at line 312
    [3.2016][3.2016:2042]()
    },
    ))
    [3.2016]
    [3.8518]
    })
  • replacement in src/xmpp/mod.rs at line 315
    [3.8525][3.2043:2220]()
    fn self_presence(self) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
    let XmppConnection { account, inner } = self;
    let client = inner;
    [3.8525]
    [3.10624]
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E>,
    E: Into<failure::Error>,
    {
    let XmppConnection {
    account,
    state: XmppState { client, roster },
    } = self;
  • edit in src/xmpp/mod.rs at line 330
    [3.10716][3.2221:2259]()
    info!("Sending presence...");
  • replacement in src/xmpp/mod.rs at line 331
    [3.10756][3.2260:2485]()
    Box::new(
    client
    .send(presence)
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    "Cann't send self-presence".to_owned()
    [3.10756]
    [3.2485]
    info!("Sending presence... {:?}", presence);
    client
    .send(presence)
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    account2
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client, roster },
    account,
    }
    .processing(
    move |conn, event| {
    if let Event::Stanza(s) = event {
    if s.name() == "presence"
    && s.attr("from").map_or(false, |f| f == conn.account.jid)
    && s.attr("to").map_or(false, |f| f == conn.account.jid)
    {
    Ok(true)
    } else {
    Ok(false)
    }
    } else {
    error!("Wrong event while waiting self-presence");
    Err(())
    }
    },
    stop_future,
    )
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
  • replacement in src/xmpp/mod.rs at line 368
    [3.2504][3.2504:4597]()
    .and_then(move |client| {
    future::loop_fn((account2.clone(), client), |(account, client)| {
    client
    .into_future()
    .map_err(|(e, _)| {
    error!("Error on reading self-presence: {}", e);
    "Cann't read self-presence".to_owned()
    })
    .and_then(|(event, client)| match event {
    Some(event) => {
    if let tokio_xmpp::Event::Stanza(e) = event {
    info!("Get stanza: {:?}", e);
    if e.name() == "presence"
    && e.attr("from").map_or(false, |f| f == account.jid)
    && e.attr("to").map_or(false, |f| f == account.jid)
    {
    info!("Self presence");
    future::ok(future::Loop::Break(client))
    } else {
    future::ok(future::Loop::Continue((account, client)))
    }
    } else {
    future::err("Got wrong event".to_owned())
    }
    }
    None => future::err("Got closed stream".to_owned()),
    })
    })
    .map_err(|e| format!("waiting self-presence: {}", e))
    })
    .then(|r| match r {
    Err(e) => {
    error!("Self-presence waiting error: {}", e);
    future::err(account)
    }
    Ok(inner) => future::ok(XmppConnection { account, inner }),
    }),
    )
    [3.2504]
    [3.12130]
    })
  • replacement in src/xmpp/mod.rs at line 432
    [3.13049][2.54:118]()
    future::ok(future::Loop::Break(()))
    [3.13049]
    [3.13130]
    future::ok(future::Loop::Break((None, conn.into())))
  • replacement in src/xmpp/mod.rs at line 438
    [3.6342][3.4664:4726]()
    future::err(format_err!("Cmd error"))
    [3.6342]
    [3.6572]
    future::err(e)
  • edit in src/xmpp/mod.rs at line 443
    [3.900]
    [3.7379]
    .and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| {
    if let Some(cmd_recv) = opt_cmd_recv {
    // process left commands
    info!("Stop accepting commands");
    Box::new(
    cmd_recv
    .for_each(|_cmd| future::ok(()))
    .map_err(|_| format_err!("cmd receiver last error")),
    ) as Box<Future<Item = (), Error = failure::Error>>
    } else {
    Box::new(future::ok(()))
    }
    })
  • edit in Cargo.toml at line 23
    [3.7111]
    [3.8552]
    try_from = "=0.2.2" # dependency of xmpp-parsers
  • edit in Cargo.lock at line 1110
    [3.44807]
    [3.44807]
    "try_from 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",