Process commands in the separate function

[?]
Jan 1, 2019, 9:28 AM
UMTLHH77LGABTVKULH6ONVSBTMSFGH3CJ6GPNTFWH73AWNJZV6LQC

Dependencies

  • [2] FWJDW3G5 Allow process xmpp incoming stanzas with futures
  • [3] 5OBTKGDL Update deps
  • [4] WJNXI6Z4 Fill roster
  • [5] FV6BJ5K6 Send self-presence and store account info in Rc so it willbe used in some future in parallel
  • [6] ALP2YJIU Rename XmppState to XmppProcessState
  • [7] CP4MZO6V Leftover commands are processed via stoppable receiver
  • [8] XGP44R5H Rework stopping xmpp connection
  • [9] OGMBXBKP Move online to XmppConnection
  • [10] QYY3KRGL Use failure instead Box<dyn Error>
  • [11] UWY5EVZ6 Add dummy roster data
  • [12] QTCUURXN Add additional requirement for command stream
  • [13] L77O4T7M Formatting and fixes
  • [14] ZI4GJ72V Add message to xmpp command
  • [15] PBRUH4BJ Rename optional XmppConnection to MaybeXmppConnection
  • [16] BTOZT4JP Use failure
  • [17] NDDQQP2P Update deps
  • [18] AYQZ2UIA Update deps
  • [19] 5A5UVGNM Move receiver closing logic out of xmpp processing
  • [20] HU3NZX5Z Process self-presence via new processing code
  • [21] IK3YDPTY Update deps
  • [22] V5HDBSZM Use jid for receiver address
  • [23] 4LRBIGVT Show info about xmpp errors
  • [24] AGIW6YR3 Use shared future for signal everywhere
  • [25] TDOR5XQU Accept destination
  • [26] OANBCLN5 Move xmpp client into XmppState
  • [27] EOHEZXX3 Move request processing to structure
  • [28] UIXIQHDY Wait for commands via new processing code
  • [29] 5IKA4GO7 Rename xmpp client field from "inner" to "client"
  • [30] X6L47BHQ Use different structure for established xmpp connection
  • [31] QWE26TMV update deps
  • [32] VS6AHRWI Move XMPP to separate dir
  • [33] BWDUANCV Second part of processing result is only about stop_future
  • [34] PVCRPP3B Some servers don't send to in initial presence

Change contents

  • edit in src/xmpp/mod.rs at line 7
    [3.48][2.0:32]()
    use std::collections::HashMap;
  • edit in src/xmpp/mod.rs at line 14
    [3.393][2.33:68]()
    roster: HashMap<jid::Jid, ()>,
  • replacement in src/xmpp/mod.rs at line 35
    [3.19][2.69:158]()
    impl MaybeXmppConnection {
    fn new(account: config::Account) -> MaybeXmppConnection {
    [3.19]
    [3.132]
    impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
  • replacement in src/xmpp/mod.rs at line 38
    [3.162][2.159:207]()
    account: std::rc::Rc::new(account),
    [3.162]
    [3.207]
    account: std::rc::Rc::new(from),
    state: None,
    }
    }
    }
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from,
  • edit in src/xmpp/mod.rs at line 51
    [3.682]
    [3.128]
    }
  • edit in src/xmpp/mod.rs at line 53
    [3.129]
    [3.516]
    impl MaybeXmppConnection {
  • replacement in src/xmpp/mod.rs at line 55
    [3.554][2.208:254]()
    /// don't connect if stop_future resolved
    [3.554]
    [3.876]
    /// don't connect only if stop_future resolved
  • replacement in src/xmpp/mod.rs at line 90
    [3.435][2.255:493]()
    state: XmppState {
    client,
    roster: HashMap::new(),
    },
    [3.435]
    [3.494]
    state: XmppState { client },
  • edit in src/xmpp/mod.rs at line 94
    [3.1403]
    [3.1403]
    .map(|(conn, _)| conn)
  • edit in src/xmpp/mod.rs at line 96
    [3.1468][2.494:864]()
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
  • replacement in src/xmpp/mod.rs at line 125
    [3.3788][2.865:1000]()
    fn xmpp_processing(
    self,
    event: &Event,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
    [3.3788]
    [3.2073]
    fn xmpp_processing(&mut self, event: &Event) {
  • edit in src/xmpp/mod.rs at line 127
    [3.2124][2.1001:1026]()
    future::ok(self)
  • replacement in src/xmpp/mod.rs at line 131
    [3.3983][2.1027:1178]()
    /// or stop future was resolved.
    /// Return item if connection was preserved or error otherwise.
    /// Second part is a state of stop_future
    [3.3983]
    [3.4019]
    /// or stop future was resolved
  • replacement in src/xmpp/mod.rs at line 137
    [3.4137][2.1179:1300]()
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    [3.4137]
    [3.4319]
    Item = (Self, Result<Either<F, T>, failure::Error>),
    Error = (
    std::rc::Rc<config::Account>,
    Result<Either<F, T>, failure::Error>,
    ),
  • replacement in src/xmpp/mod.rs at line 144
    [3.4335][2.1301:1457]()
    F: Future<Item = T, Error = E> + 'static,
    S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
    T: 'static,
    E: 'static,
    [3.4335]
    [3.4476]
    F: Future<Item = T, Error = E>,
    E: Into<failure::Error>,
    S: FnMut(&mut Self, Event) -> Result<bool, failure::Error>,
  • replacement in src/xmpp/mod.rs at line 151
    [3.4612][2.1458:2169]()
    let XmppConnection {
    state: XmppState { client, roster },
    account,
    } = xmpp;
    client.into_future().select2(stop_future).then(|r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let xmpp = XmppConnection {
    state: XmppState { client, roster },
    account,
    };
    Box::new(xmpp.xmpp_processing(&event).then(|r| match r {
    Ok(mut xmpp) => match stop_condition(&mut xmpp, event) {
    [3.4612]
    [3.1639]
    let XmppConnection { state, account } = xmpp;
    state
    .client
    .into_future()
    .select2(stop_future)
    .then(|r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client },
    account,
    };
    xmpp.xmpp_processing(&event);
    match stop_condition(&mut xmpp, event) {
  • replacement in src/xmpp/mod.rs at line 173
    [3.2096][2.2170:2573]()
    Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
    },
    Err(account) => future::err((account, Ok(Either::A(b)))),
    })) as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(future::err((account, Ok(Either::A(b)))))
    [3.2096]
    [3.2940]
    Err(e) => future::err((xmpp.account, Err(e))),
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client },
    account,
    },
    Err(e.into()),
    )))
    } else {
    future::err((account, Err(e.into())))
    }
  • replacement in src/xmpp/mod.rs at line 209
    [3.2966][2.2574:3851]()
    }
    Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, roster },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }),
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, roster },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }),
    })
    [3.2966]
    [3.5580]
    })
  • replacement in src/xmpp/mod.rs at line 215
    [3.6691][2.3852:3989]()
    /// returns error if something went wrong and xmpp connection is broken
    fn online(&mut self, event: Event) -> Result<bool, ()> {
    [3.6691]
    [3.4011]
    /// returns error if something went wrong
    fn online(&mut self, event: Event) -> Result<bool, failure::Error> {
  • replacement in src/xmpp/mod.rs at line 228
    [3.4339][2.3990:4014]()
    Err(())
    [3.4339]
    [3.4363]
    Err(format_err!("Disconnected while online"))
  • replacement in src/xmpp/mod.rs at line 233
    [3.4394][2.4015:4092]()
    fn process_initial_roster(&mut self, event: Event) -> Result<bool, ()> {
    [3.4394]
    [3.4471]
    fn process_initial_roster(&mut self, event: Event) -> Result<bool, failure::Error> {
  • replacement in src/xmpp/mod.rs at line 242
    [3.4859][2.4093:4212]()
    error!("Get error instead of roster");
    Err(())
    [3.4859]
    [3.4978]
    Err(format_err!("Get error instead of roster"))
  • edit in src/xmpp/mod.rs at line 254
    [3.5865][3.5865:5907](),[3.5907][2.4213:4399]()
    }
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    Err(())
  • edit in src/xmpp/mod.rs at line 255
    [2.4441]
    [3.5907]
    Err(e) => Err(format_err!("Cann't parse roster: {}", e)),
  • edit in src/xmpp/mod.rs at line 257
    [3.6134][3.6134:6168](),[3.6168][2.4442:4597]()
    }
    _ => {
    error!("Unknown result of roster");
    Err(())
  • edit in src/xmpp/mod.rs at line 258
    [2.4631]
    [3.6168]
    _ => Err(format_err!("Unknown result of roster")),
  • replacement in src/xmpp/mod.rs at line 264
    [3.6299][2.4632:4720]()
    error!("Iq stanza without id");
    Err(())
    [3.6299]
    [3.8819]
    Err(format_err!("Iq stanza without id"))
  • replacement in src/xmpp/mod.rs at line 270
    [3.6475][2.4721:4797]()
    error!("Wrong event while waiting roster");
    Err(())
    [3.6475]
    [3.6551]
    Err(format_err!("Wrong event while waiting roster"))
  • replacement in src/xmpp/mod.rs at line 279
    [3.6717][2.4798:4858]()
    F: Future<Error = E> + 'static,
    E: 'static,
    [3.6717]
    [3.6747]
    F: Future<Error = E>,
    E: Into<failure::Error>,
  • replacement in src/xmpp/mod.rs at line 282
    [3.6753][2.4859:4976]()
    let XmppConnection {
    account,
    state: XmppState { client, roster },
    } = self;
    [3.6753]
    [3.6870]
    let XmppConnection { account, state } = self;
  • replacement in src/xmpp/mod.rs at line 289
    [3.7065][2.4977:4992]()
    client
    [3.7065]
    [3.7080]
    state
    .client
  • replacement in src/xmpp/mod.rs at line 294
    [3.7201][2.4993:5018]()
    account2
    [3.7201]
    [3.7226]
    (account2, Err(failure::SyncFailure::new(e).into()))
  • replacement in src/xmpp/mod.rs at line 298
    [3.7312][2.5019:5076]()
    state: XmppState { client, roster },
    [3.7312]
    [3.7369]
    state: XmppState { client },
  • replacement in src/xmpp/mod.rs at line 302
    [3.7497][2.5077:5376]()
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    [3.7497]
    [3.7778]
    })
    .then(|r| match r {
    Err((account, e)) => {
    error!(
    "Cann't wait roster: {}",
    e.err().map_or_else(
    || std::borrow::Cow::Borrowed("None"),
    |e| e.to_string().into()
    )
    );
    future::err(account)
    }
    Ok((conn, _)) => future::ok(conn),
  • replacement in src/xmpp/mod.rs at line 323
    [3.7942][2.5377:5460]()
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
    [3.7942]
    [3.8005]
    F: Future<Error = E>,
    E: Into<failure::Error>,
  • replacement in src/xmpp/mod.rs at line 326
    [3.8011][2.5461:5578]()
    let XmppConnection {
    account,
    state: XmppState { client, roster },
    } = self;
    [3.8011]
    [3.10624]
    let XmppConnection { account, state } = self;
  • replacement in src/xmpp/mod.rs at line 333
    [3.8183][2.5579:5594]()
    client
    [3.8183]
    [3.8198]
    state
    .client
  • replacement in src/xmpp/mod.rs at line 338
    [3.8315][2.5595:5620]()
    account2
    [3.8315]
    [3.8340]
    (account2, Err(failure::SyncFailure::new(e).into()))
  • replacement in src/xmpp/mod.rs at line 342
    [3.8426][2.5621:5678]()
    state: XmppState { client, roster },
    [3.8426]
    [3.8483]
    state: XmppState { client },
  • replacement in src/xmpp/mod.rs at line 357
    [3.9105][2.5679:5794]()
    error!("Wrong event while waiting self-presence");
    Err(())
    [3.9105]
    [3.9220]
    Err(format_err!("Wrong event while waiting self-presence"))
  • replacement in src/xmpp/mod.rs at line 362
    [3.9320][2.5795:6094]()
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    [3.9320]
    [3.5425]
    })
    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    .then(|r| match r {
    Err((account, _e)) => {
    error!("Cann't wait self-presence");
    future::err(account)
    }
    Ok((conn, _)) => future::ok(conn),
  • edit in src/xmpp/mod.rs at line 371
    [3.5440]
    [3.12130]
    ================================
    }
    fn process_command(
    self,
    _cmd: &XmppCommand,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
    info!("Got command");
    future::ok(self)
    <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
  • replacement in src/xmpp/mod.rs at line 387
    [3.3817][2.6095:6120]()
    struct XmppState<F, S> {
    [3.3817]
    [3.86]
    struct XmppProcessState<F, S> {
  • replacement in src/xmpp/mod.rs at line 393
    [3.3920][2.6121:6253]()
    impl<F, S> XmppState<F, S> {
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppState<F, S> {
    XmppState {
    [3.3920]
    [3.4058]
    impl<F, S> XmppProcessState<F, S> {
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppProcessState<F, S> {
    XmppProcessState {
  • replacement in src/xmpp/mod.rs at line 413
    [3.4357][2.6254:6304]()
    let conn = MaybeXmppConnection::new(account);
    [3.4357]
    [3.4526]
    let conn = account.into();
  • replacement in src/xmpp/mod.rs at line 415
    [3.4527][2.6305:6395]()
    future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {
    let XmppState {
    [3.4527]
    [3.4617]
    future::loop_fn(XmppProcessState::new(cmd_recv, signal, conn), |s| {
    let XmppProcessState {
  • replacement in src/xmpp/mod.rs at line 421
    [3.4692][2.6396:7143]()
    conn.connect(signal.clone())
    .and_then(|conn| {
    info!("xmpp connected!");
    cmd_recv
    .into_future()
    .map_err(|_| {
    error!("Got error on recv cmd");
    format_err!("Receive cmd error")
    })
    .map(|(cmd, cmd_recv)| (cmd, cmd_recv, conn))
    })
    .then(|r| {
    match r {
    Ok((cmd, cmd_recv, conn)) => {
    if let Some(_cmd) = cmd {
    info!("Got cmd");
    // got cmd, continue
    future::ok(future::Loop::Continue(XmppState::new(
    [3.4692]
    [2.7143]
    conn.connect(signal.clone()).and_then(|conn| {
    info!("xmpp connected!");
    conn.processing(|_, _| Ok(false), cmd_recv.into_future())
    .then(|r| match r {
    Ok((conn, r)) => match r {
    Ok(Either::A(f)) => Box::new(if let Some(cmd_recv) = f.into_inner() {
    future::ok(future::Loop::Continue(XmppProcessState {
  • replacement in src/xmpp/mod.rs at line 430
    [2.7225][2.7225:7302]()
    conn.into(),
    )))
    [2.7225]
    [2.7302]
    conn: conn.into(),
    }))
  • replacement in src/xmpp/mod.rs at line 433
    [2.7335][2.7335:7416]()
    future::ok(future::Loop::Break((None, conn.into())))
    [2.7335]
    [3.13130]
    future::err(format_err!("Command receiver is gone"))
    })
    as Box<dyn Future<Item = _, Error = _>>,
    Ok(Either::B((cmd, cmd_recv))) => {
    if let Some(cmd) = cmd {
    Box::new(conn.process_command(&cmd).then(|r| {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: match r {
    Ok(conn) => conn.into(),
    Err(account) => account.into(),
    },
    }))
    }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(future::ok(future::Loop::Break(())))
    }
    }
    Err(_) => Box::new(future::err(format_err!("Command receiver is broken"))),
    },
    Err((account, r)) => Box::new(match r {
    Ok(Either::A(f)) => {
    if let Some(cmd_recv) = f.into_inner() {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: account.into(),
    }))
    } else {
    future::err(format_err!("Command receiver is gone"))
    }
  • replacement in src/xmpp/mod.rs at line 467
    [3.13156][2.7417:8180]()
    }
    Err(e) => {
    // got cmd error, its bad
    error!("Cmd error: {}", e);
    future::err(e)
    }
    }
    })
    })
    .and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| {
    if let Some(cmd_recv) = opt_cmd_recv {
    // process left commands
    info!("Stop accepting commands");
    Box::new(
    cmd_recv
    .for_each(|_cmd| future::ok(()))
    .map_err(|_| format_err!("cmd receiver last error")),
    ) as Box<Future<Item = (), Error = failure::Error>>
    } else {
    Box::new(future::ok(()))
    }
    [3.13156]
    [3.10245]
    Ok(Either::B((cmd, cmd_recv))) => {
    if let Some(_cmd) = cmd {
    error!("Xmpp connection broken while get command");
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: account.into(),
    }))
    } else {
    future::ok(future::Loop::Break(()))
    }
    }
    Err(_) => future::err(format_err!("Command receiver is broken")),
    }),
    })
    })