Allow processing incoming XMPP stanzas with futures

[?]
Jan 1, 2019, 8:35 AM
FWJDW3G5KT66GHFS7IPPSFS3G5COH2BNN57GHTVT533FE7467QZAC

Dependencies

  • [2] UIXIQHDY Wait for commands via new processing code
  • [3] AGIW6YR3 Use shared future for signal everywhere
  • [4] QTCUURXN Add additional requirement for command stream
  • [5] IK3YDPTY Update deps
  • [6] QYY3KRGL Use failure instead of Box<dyn Error>
  • [7] BWDUANCV Second part of processing result is only about stop_future
  • [8] NDDQQP2P Update deps
  • [9] UWY5EVZ6 Add dummy roster data
  • [10] XGP44R5H Rework stopping xmpp connection
  • [11] 5OBTKGDL Update deps
  • [12] OANBCLN5 Move xmpp client into XmppState
  • [13] QWE26TMV update deps
  • [14] TDOR5XQU Accept destination
  • [15] EOHEZXX3 Move request processing to structure
  • [16] V5HDBSZM Use jid for receiver address
  • [17] 4LRBIGVT Show info about xmpp errors
  • [18] AYQZ2UIA Update deps
  • [19] HU3NZX5Z Process self-presence via new processing code
  • [20] OGMBXBKP Move online to XmppConnection
  • [21] PVCRPP3B Some servers don't send the `to` attribute in the initial presence
  • [22] CP4MZO6V Leftover commands are processed via stoppable receiver
  • [23] PBRUH4BJ Rename optional XmppConnection to MaybeXmppConnection
  • [24] 5A5UVGNM Move receiver closing logic out of xmpp processing
  • [25] L77O4T7M Formatting and fixes
  • [26] FV6BJ5K6 Send self-presence and store account info in Rc so it can be used by several futures in parallel
  • [27] X6L47BHQ Use different structure for established xmpp connection
  • [28] VS6AHRWI Move XMPP to separate dir
  • [29] ZI4GJ72V Add message to xmpp command
  • [30] H7R7Y3FQ Use new processing code to wait online
  • [31] BTOZT4JP Use failure
  • [32] ALP2YJIU Rename XmppState to XmppProcessState
  • [33] 5IKA4GO7 Rename xmpp client field from "inner" to "client"
  • [34] WJNXI6Z4 Fill roster

Change contents

  • edit in src/xmpp/mod.rs at line 7
    [3.48]
    [3.420]
    use std::collections::HashMap;
  • edit in src/xmpp/mod.rs at line 16
    [3.393]
    [3.428]
    roster: HashMap<jid::Jid, ()>,
  • replacement in src/xmpp/mod.rs at line 38
    [2.19][2.19:132]()
    impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
    [2.19]
    [2.132]
    impl MaybeXmppConnection {
    fn new(account: config::Account) -> MaybeXmppConnection {
  • replacement in src/xmpp/mod.rs at line 41
    [2.162][2.162:207]()
    account: std::rc::Rc::new(from),
    [2.162]
    [2.207]
    account: std::rc::Rc::new(account),
  • edit in src/xmpp/mod.rs at line 45
    [3.77][3.126:128](),[3.682][3.126:128](),[3.126][3.126:128]()
    }
  • edit in src/xmpp/mod.rs at line 46
    [3.129][2.233:372](),[2.372][3.772:802](),[3.772][3.772:802](),[3.802][2.373:400](),[2.400][3.578:603](),[3.527][3.578:603](),[3.603][3.499:515](),[3.535][3.499:515](),[3.111][3.499:515](),[3.829][3.499:515](),[3.174][3.499:515](),[3.499][3.499:515](),[3.515][2.401:403](),[2.403][3.515:516](),[3.515][3.515:516](),[3.516][2.404:431]()
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from,
    state: None,
    }
    }
    }
    impl MaybeXmppConnection {
  • replacement in src/xmpp/mod.rs at line 47
    [3.554][2.432:483]()
    /// don't connect only if stop_future resolved
    [3.554]
    [3.876]
    /// don't connect if stop_future resolved
  • replacement in src/xmpp/mod.rs at line 82
    [3.435][2.484:557]()
    state: XmppState { client },
    [3.435]
    [3.494]
    state: XmppState {
    client,
    roster: HashMap::new(),
    },
  • edit in src/xmpp/mod.rs at line 89
    [3.1403][2.558:621]()
    .map(|(conn, _)| conn)
  • edit in src/xmpp/mod.rs at line 90
    [3.1468]
    [3.1838]
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
  • replacement in src/xmpp/mod.rs at line 124
    [3.3788][3.2022:2073]()
    fn xmpp_processing(&mut self, event: &Event) {
    [3.3788]
    [3.2073]
    fn xmpp_processing(
    self,
    event: &Event,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
  • edit in src/xmpp/mod.rs at line 129
    [3.2124]
    [3.2124]
    future::ok(self)
  • replacement in src/xmpp/mod.rs at line 134
    [3.3983][2.622:658]()
    /// or stop future was resolved
    [3.3983]
    [3.4019]
    /// or stop future was resolved.
    /// Return item if connection was preserved or error otherwise.
    /// Second part is a state of stop_future
  • replacement in src/xmpp/mod.rs at line 142
    [3.4137][2.659:841]()
    Item = (Self, Result<Either<F, T>, failure::Error>),
    Error = (
    std::rc::Rc<config::Account>,
    Result<Either<F, T>, failure::Error>,
    ),
    [3.4137]
    [3.4319]
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
  • replacement in src/xmpp/mod.rs at line 146
    [3.4335][3.4335:4375](),[3.4335][3.4335:4375](),[3.4335][3.4335:4375](),[3.4335][3.4335:4375](),[3.4335][3.4335:4375](),[3.4375][2.842:943]()
    F: Future<Item = T, Error = E>,
    E: Into<failure::Error>,
    S: FnMut(&mut Self, Event) -> Result<bool, failure::Error>,
    [3.4335]
    [3.4476]
    F: Future<Item = T, Error = E> + 'static,
    S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
    T: 'static,
    E: 'static,
  • replacement in src/xmpp/mod.rs at line 154
    [3.4612][2.944:1639]()
    let XmppConnection { state, account } = xmpp;
    state
    .client
    .into_future()
    .select2(stop_future)
    .then(|r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client },
    account,
    };
    xmpp.xmpp_processing(&event);
    match stop_condition(&mut xmpp, event) {
    [3.4612]
    [2.1639]
    let XmppConnection {
    state: XmppState { client, roster },
    account,
    } = xmpp;
    client.into_future().select2(stop_future).then(|r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let xmpp = XmppConnection {
    state: XmppState { client, roster },
    account,
    };
    Box::new(xmpp.xmpp_processing(&event).then(|r| match r {
    Ok(mut xmpp) => match stop_condition(&mut xmpp, event) {
  • replacement in src/xmpp/mod.rs at line 175
    [2.2096][2.2096:2179](),[2.2179][3.3025:3059](),[3.3025][3.3025:3059](),[3.3059][2.2180:2290](),[2.2290][3.3255:3285](),[3.3149][3.3255:3285](),[3.742][3.3255:3285](),[3.1375][3.3255:3285](),[3.3255][3.3255:3285](),[3.3285][2.2291:2940]()
    Err(e) => future::err((xmpp.account, Err(e))),
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    [2.2096]
    [2.2940]
    Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
    },
    Err(account) => future::err((account, Ok(Either::A(b)))),
    })) as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(future::err((account, Ok(Either::A(b)))))
  • replacement in src/xmpp/mod.rs at line 182
    [2.2966][2.2966:3076](),[2.3076][3.1409:1478](),[3.1409][3.1409:1478](),[3.1478][3.3285:3311](),[3.3285][3.3285:3311](),[3.3311][2.3077:3695](),[3.2179][3.4505:4531](),[2.3695][3.4505:4531](),[3.2179][3.4505:4531](),[3.5636][3.4505:4531](),[3.2058][3.4505:4531](),[3.5335][3.4505:4531](),[3.4505][3.4505:4531](),[3.4531][2.3696:3719]()
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client },
    account,
    },
    Err(e.into()),
    )))
    } else {
    future::err((account, Err(e.into())))
    }
    }
    })
    [2.2966]
    [3.5580]
    }
    Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, roster },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }),
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, roster },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }),
    })
  • replacement in src/xmpp/mod.rs at line 215
    [3.6691][2.3720:3839]()
    /// returns error if something went wrong
    fn online(&mut self, event: Event) -> Result<bool, failure::Error> {
    [3.6691]
    [3.4011]
    /// returns error if something went wrong and xmpp connection is broken
    fn online(&mut self, event: Event) -> Result<bool, ()> {
  • replacement in src/xmpp/mod.rs at line 228
    [3.4339][2.3840:3902]()
    Err(format_err!("Disconnected while online"))
    [3.4339]
    [3.4363]
    Err(())
  • replacement in src/xmpp/mod.rs at line 233
    [3.4394][2.3903:3992]()
    fn process_initial_roster(&mut self, event: Event) -> Result<bool, failure::Error> {
    [3.4394]
    [3.4471]
    fn process_initial_roster(&mut self, event: Event) -> Result<bool, ()> {
  • replacement in src/xmpp/mod.rs at line 242
    [3.4859][2.3993:4077]()
    Err(format_err!("Get error instead of roster"))
    [3.4859]
    [3.4978]
    error!("Get error instead of roster");
    Err(())
  • replacement in src/xmpp/mod.rs at line 256
    [3.5907][2.4078:4176]()
    Err(e) => Err(format_err!("Cann't parse roster: {}", e)),
    [3.5907]
    [3.5907]
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    Err(())
    }
  • replacement in src/xmpp/mod.rs at line 262
    [3.6168][2.4177:4260]()
    _ => Err(format_err!("Unknown result of roster")),
    [3.6168]
    [3.6168]
    _ => {
    error!("Unknown result of roster");
    Err(())
    }
  • replacement in src/xmpp/mod.rs at line 271
    [3.6299][2.4261:4326]()
    Err(format_err!("Iq stanza without id"))
    [3.6299]
    [3.8819]
    error!("Iq stanza without id");
    Err(())
  • replacement in src/xmpp/mod.rs at line 278
    [3.6475][2.4327:4392]()
    Err(format_err!("Wrong event while waiting roster"))
    [3.6475]
    [3.6551]
    error!("Wrong event while waiting roster");
    Err(())
  • replacement in src/xmpp/mod.rs at line 288
    [3.6717][3.6717:6747](),[3.6717][3.6717:6747](),[3.6747][2.4393:4426]()
    F: Future<Error = E>,
    E: Into<failure::Error>,
    [3.6717]
    [3.6747]
    F: Future<Error = E> + 'static,
    E: 'static,
  • replacement in src/xmpp/mod.rs at line 291
    [3.6753][2.4427:4481]()
    let XmppConnection { account, state } = self;
    [3.6753]
    [3.6870]
    let XmppConnection {
    account,
    state: XmppState { client, roster },
    } = self;
  • replacement in src/xmpp/mod.rs at line 301
    [3.7065][2.4482:4516]()
    state
    .client
    [3.7065]
    [3.7080]
    client
  • replacement in src/xmpp/mod.rs at line 305
    [3.7201][2.4517:4586]()
    (account2, Err(failure::SyncFailure::new(e).into()))
    [3.7201]
    [3.7226]
    account2
  • replacement in src/xmpp/mod.rs at line 309
    [3.7312][2.4587:4636]()
    state: XmppState { client },
    [3.7312]
    [3.7369]
    state: XmppState { client, roster },
  • replacement in src/xmpp/mod.rs at line 313
    [3.7497][2.4637:5125]()
    })
    .then(|r| match r {
    Err((account, e)) => {
    error!(
    "Cann't wait roster: {}",
    e.err().map_or_else(
    || std::borrow::Cow::Borrowed("None"),
    |e| e.to_string().into()
    )
    );
    future::err(account)
    }
    Ok((conn, _)) => future::ok(conn),
    [3.7497]
    [3.7778]
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
  • replacement in src/xmpp/mod.rs at line 327
    [3.7942][3.7942:8005]()
    F: Future<Error = E>,
    E: Into<failure::Error>,
    [3.7942]
    [3.8005]
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
  • replacement in src/xmpp/mod.rs at line 330
    [3.8011][2.5126:5180]()
    let XmppConnection { account, state } = self;
    [3.8011]
    [3.10624]
    let XmppConnection {
    account,
    state: XmppState { client, roster },
    } = self;
  • replacement in src/xmpp/mod.rs at line 340
    [3.8183][2.5181:5215]()
    state
    .client
    [3.8183]
    [3.8198]
    client
  • replacement in src/xmpp/mod.rs at line 344
    [3.8315][2.5216:5285]()
    (account2, Err(failure::SyncFailure::new(e).into()))
    [3.8315]
    [3.8340]
    account2
  • replacement in src/xmpp/mod.rs at line 348
    [3.8426][2.5286:5335]()
    state: XmppState { client },
    [3.8426]
    [3.8483]
    state: XmppState { client, roster },
  • replacement in src/xmpp/mod.rs at line 363
    [3.9105][2.5336:5424]()
    Err(format_err!("Wrong event while waiting self-presence"))
    [3.9105]
    [3.9220]
    error!("Wrong event while waiting self-presence");
    Err(())
  • edit in src/xmpp/mod.rs at line 369
    [3.9320]
    [2.5425]
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
  • edit in src/xmpp/mod.rs at line 376
    [2.5440][2.5440:5679](),[2.5679][3.9601:9616](),[3.2504][3.9601:9616]()
    .then(|r| match r {
    Err((account, _e)) => {
    error!("Cann't wait self-presence");
    future::err(account)
    }
    Ok((conn, _)) => future::ok(conn),
    })
  • replacement in src/xmpp/mod.rs at line 382
    [3.3817][2.5680:5712]()
    struct XmppProcessState<F, S> {
    [3.3817]
    [3.86]
    struct XmppState<F, S> {
  • replacement in src/xmpp/mod.rs at line 388
    [3.3920][2.5713:5866]()
    impl<F, S> XmppProcessState<F, S> {
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppProcessState<F, S> {
    XmppProcessState {
    [3.3920]
    [3.4058]
    impl<F, S> XmppState<F, S> {
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppState<F, S> {
    XmppState {
  • replacement in src/xmpp/mod.rs at line 408
    [3.4357][2.5867:5898]()
    let conn = account.into();
    [3.4357]
    [3.4526]
    let conn = MaybeXmppConnection::new(account);
  • replacement in src/xmpp/mod.rs at line 410
    [3.4527][2.5899:6003]()
    future::loop_fn(XmppProcessState::new(cmd_recv, signal, conn), |s| {
    let XmppProcessState {
    [3.4527]
    [3.4617]
    future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {
    let XmppState {
  • replacement in src/xmpp/mod.rs at line 416
    [3.4692][2.6004:6783]()
    conn.connect(signal.clone()).and_then(|conn| {
    info!("xmpp connected!");
    conn.processing(|_, _| Ok(false), cmd_recv.into_future())
    .then(|r| match r {
    Ok((conn, r)) => match r {
    Ok(Either::A(f)) => {
    if let Some(cmd_recv) = f.into_inner() {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: conn.into(),
    }))
    } else {
    future::err(format_err!("Command receiver is gone"))
    }
    [3.4692]
    [3.13130]
    conn.connect(signal.clone())
    .and_then(|conn| {
    info!("xmpp connected!");
    cmd_recv
    .into_future()
    .map_err(|_| {
    error!("Got error on recv cmd");
    format_err!("Receive cmd error")
    })
    .map(|(cmd, cmd_recv)| (cmd, cmd_recv, conn))
    })
    .then(|r| {
    match r {
    Ok((cmd, cmd_recv, conn)) => {
    if let Some(_cmd) = cmd {
    info!("Got cmd");
    // got cmd, continue
    future::ok(future::Loop::Continue(XmppState::new(
    cmd_recv,
    signal,
    conn.into(),
    )))
    } else {
    future::ok(future::Loop::Break((None, conn.into())))
  • replacement in src/xmpp/mod.rs at line 441
    [3.13156][2.6784:8876]()
    Ok(Either::B((cmd, cmd_recv))) => {
    if let Some(_cmd) = cmd {
    info!("Got command");
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: conn.into(),
    }))
    } else {
    future::ok(future::Loop::Break(()))
    }
    }
    Err(_) => future::err(format_err!("Command receiver is broken")),
    },
    Err((account, r)) => match r {
    Ok(Either::A(f)) => {
    if let Some(cmd_recv) = f.into_inner() {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: account.into(),
    }))
    } else {
    future::err(format_err!("Command receiver is gone"))
    }
    }
    Ok(Either::B((cmd, cmd_recv))) => {
    if let Some(_cmd) = cmd {
    error!("Xmpp connection broken while get command");
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: account.into(),
    }))
    } else {
    future::ok(future::Loop::Break(()))
    }
    }
    Err(_) => future::err(format_err!("Command receiver is broken")),
    },
    })
    })
    [3.13156]
    [3.10245]
    }
    Err(e) => {
    // got cmd error, its bad
    error!("Cmd error: {}", e);
    future::err(e)
    }
    }
    })
    })
    .and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| {
    if let Some(cmd_recv) = opt_cmd_recv {
    // process left commands
    info!("Stop accepting commands");
    Box::new(
    cmd_recv
    .for_each(|_cmd| future::ok(()))
    .map_err(|_| format_err!("cmd receiver last error")),
    ) as Box<Future<Item = (), Error = failure::Error>>
    } else {
    Box::new(future::ok(()))
    }