Wait for commands via new processing code

[?]
Dec 31, 2018, 10:41 PM
UIXIQHDY7E4DLLRQP4ATXF625QC4KGMHWOV24UEQ6WD7G2IPM4HAC

Dependencies

  • [2] BWDUANCV Second part of processing result is only about stop_future
  • [3] PFC7OJQF Query roster
  • [4] FV6BJ5K6 Send self-presence and store account info in Rc so it will be used in some future in parallel
  • [5] QYY3KRGL Use failure instead of Box<dyn Error>
  • [6] XGP44R5H Rework stopping xmpp connection
  • [7] V5HDBSZM Use jid for receiver address
  • [8] IK3YDPTY Update deps
  • [9] X6L47BHQ Use different structure for established xmpp connection
  • [10] ZI4GJ72V Add message to xmpp command
  • [11] OANBCLN5 Move xmpp client into XmppState
  • [12] PBRUH4BJ Rename optional XmppConnection to MaybeXmppConnection
  • [13] NDDQQP2P Update deps
  • [14] EOHEZXX3 Move request processing to structure
  • [15] TDOR5XQU Accept destination
  • [16] H7R7Y3FQ Use new processing code to wait online
  • [17] HU3NZX5Z Process self-presence via new processing code
  • [18] 3GEU7TC7 Welcome to 2018!
  • [19] OGMBXBKP Move online to XmppConnection
  • [20] 5IKA4GO7 Rename xmpp client field from "inner" to "client"
  • [21] CP4MZO6V Leftover commands are processed via stoppable receiver
  • [22] L77O4T7M Formatting and fixes
  • [23] WJNXI6Z4 Fill roster
  • [24] 4LRBIGVT Show info about xmpp errors
  • [25] AGIW6YR3 Use shared future for signal everywhere
  • [26] PVCRPP3B Some servers don't send `to` in initial presence
  • [27] ALP2YJIU Rename XmppState to XmppProcessState
  • [28] QWE26TMV update deps
  • [29] AYQZ2UIA Update deps
  • [30] QTCUURXN Add additional requirement for command stream
  • [31] VS6AHRWI Move XMPP to separate dir
  • [32] 5A5UVGNM Move receiver closing logic out of xmpp processing
  • [33] 5OBTKGDL Update deps
  • [34] BTOZT4JP Use failure
  • [35] UWY5EVZ6 Add dummy roster data

Change contents

  • edit in src/xmpp/mod.rs at line 7
    [3.48][2.273:305]()
    use std::collections::HashMap;
  • edit in src/xmpp/mod.rs at line 14
    [2.393][2.393:428]()
    roster: HashMap<jid::Jid, ()>,
  • edit in src/xmpp/mod.rs at line 31
    [2.577]
    [3.666]
    }
    }
    }
    impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: std::rc::Rc::new(from),
    state: None,
  • replacement in src/xmpp/mod.rs at line 44
    [3.129][3.683:772]()
    impl MaybeXmppConnection {
    fn new(account: config::Account) -> MaybeXmppConnection {
    [3.129]
    [3.772]
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
  • replacement in src/xmpp/mod.rs at line 47
    [3.182][3.479:527](),[3.478][3.479:527](),[3.182][3.479:527](),[3.153][3.479:527](),[3.182][3.479:527](),[3.802][3.479:527](),[3.478][3.479:527](),[3.453][3.479:527]()
    account: std::rc::Rc::new(account),
    [3.802]
    [2.578]
    account: from,
  • edit in src/xmpp/mod.rs at line 51
    [3.515]
    [3.515]
    }
  • edit in src/xmpp/mod.rs at line 53
    [3.516]
    [3.516]
    impl MaybeXmppConnection {
  • replacement in src/xmpp/mod.rs at line 55
    [3.554][3.830:876]()
    /// don't connect if stop_future resolved
    [3.554]
    [3.876]
    /// don't connect only if stop_future resolved
  • replacement in src/xmpp/mod.rs at line 90
    [3.435][2.1067:1305]()
    state: XmppState {
    client,
    roster: HashMap::new(),
    },
    [3.435]
    [3.494]
    state: XmppState { client },
  • edit in src/xmpp/mod.rs at line 94
    [2.1403]
    [2.1403]
    .map(|(conn, _)| conn)
  • edit in src/xmpp/mod.rs at line 96
    [2.1468][2.1468:1838]()
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
  • replacement in src/xmpp/mod.rs at line 131
    [3.3983][2.2131:2282]()
    /// or stop future was resolved.
    /// Return item if connection was preserved or error otherwise.
    /// Second part is a state of stop_future
    [3.3983]
    [3.4019]
    /// or stop future was resolved
  • replacement in src/xmpp/mod.rs at line 137
    [3.4137][3.286:407]()
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    [3.4137]
    [3.4319]
    Item = (Self, Result<Either<F, T>, failure::Error>),
    Error = (
    std::rc::Rc<config::Account>,
    Result<Either<F, T>, failure::Error>,
    ),
  • replacement in src/xmpp/mod.rs at line 145
    [3.4375][2.2283:2339]()
    S: FnMut(&mut Self, Event) -> Result<bool, ()>,
    [3.4375]
    [3.4476]
    E: Into<failure::Error>,
    S: FnMut(&mut Self, Event) -> Result<bool, failure::Error>,
  • replacement in src/xmpp/mod.rs at line 151
    [3.4612][2.2340:2566](),[2.2566][3.534:708](),[3.539][3.534:708](),[3.534][3.534:708](),[3.708][2.2567:2636](),[2.2636][3.777:907](),[3.587][3.777:907](),[3.777][3.777:907](),[3.907][2.2637:3025]()
    let XmppConnection {
    state: XmppState { client, roster },
    account,
    } = xmpp;
    client.into_future().select2(stop_future).then(|r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client, roster },
    account,
    };
    xmpp.xmpp_processing(&event);
    match stop_condition(&mut xmpp, event) {
    Ok(true) => {
    future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
    }
    Ok(false) => {
    future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
    [3.4612]
    [2.3025]
    let XmppConnection { state, account } = xmpp;
    state
    .client
    .into_future()
    .select2(stop_future)
    .then(|r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client },
    account,
    };
    xmpp.xmpp_processing(&event);
    match stop_condition(&mut xmpp, event) {
    Ok(true) => {
    future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
    }
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(e) => future::err((xmpp.account, Err(e))),
  • replacement in src/xmpp/mod.rs at line 175
    [2.3059][2.3059:3149]()
    Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
    [2.3059]
    [3.3255]
    } else {
    future::err((account, Ok(Either::A(b))))
  • replacement in src/xmpp/mod.rs at line 178
    [3.3285][3.1376:1409]()
    } else {
    [3.3285]
    [3.1409]
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
  • replacement in src/xmpp/mod.rs at line 196
    [3.3311][3.1479:1548](),[3.1548][2.3150:3213](),[2.3213][3.1611:1672](),[3.650][3.1611:1672](),[3.1611][3.1611:1672](),[3.1672][2.3214:3416](),[2.3416][3.1874:2058](),[3.718][3.1874:2058](),[3.1874][3.1874:2058]()
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, roster },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    [3.3311]
    [3.4505]
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client },
    account,
    },
    Err(e.into()),
    )))
    } else {
    future::err((account, Err(e.into())))
    }
  • replacement in src/xmpp/mod.rs at line 209
    [3.2248][3.2248:2270](),[3.2270][2.3417:3606](),[2.3606][3.2270:2318](),[3.808][3.2270:2318](),[3.2270][3.2270:2318](),[3.2318][2.3607:3670](),[2.3670][3.2381:2442](),[3.871][3.2381:2442](),[3.2381][3.2381:2442](),[3.2442][2.3671:3873](),[2.3873][3.743:783](),[3.939][3.743:783](),[3.783][3.2691:2756](),[3.2691][3.2691:2756](),[3.2756][3.784:843](),[3.2511][3.5513:5539](),[3.4667][3.5513:5539](),[3.3907][3.5513:5539](),[3.2511][3.5513:5539](),[3.843][3.5513:5539](),[3.2108][3.5513:5539](),[3.6612][3.5513:5539](),[3.2822][3.5513:5539](),[3.1938][3.5513:5539](),[3.6307][3.5513:5539](),[3.1272][3.5513:5539](),[3.5513][3.5513:5539](),[3.5539][3.2823:2864]()
    }
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, roster },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    })
    [3.4531]
    [3.5580]
    })
  • replacement in src/xmpp/mod.rs at line 215
    [3.6691][2.3874:4011]()
    /// returns error if something went wrong and xmpp connection is broken
    fn online(&mut self, event: Event) -> Result<bool, ()> {
    [3.6691]
    [2.4011]
    /// returns error if something went wrong
    fn online(&mut self, event: Event) -> Result<bool, failure::Error> {
  • replacement in src/xmpp/mod.rs at line 228
    [2.4339][2.4339:4363]()
    Err(())
    [2.4339]
    [2.4363]
    Err(format_err!("Disconnected while online"))
  • replacement in src/xmpp/mod.rs at line 233
    [2.4394][2.4394:4471]()
    fn process_initial_roster(&mut self, event: Event) -> Result<bool, ()> {
    [2.4394]
    [2.4471]
    fn process_initial_roster(&mut self, event: Event) -> Result<bool, failure::Error> {
  • replacement in src/xmpp/mod.rs at line 242
    [2.4859][2.4859:4978]()
    error!("Get error instead of roster");
    Err(())
    [2.4859]
    [2.4978]
    Err(format_err!("Get error instead of roster"))
  • edit in src/xmpp/mod.rs at line 254
    [2.5637][2.5637:5865]()
    }
    Err(e) => {
    error!("Cann't parse roster: {}", e);
    Err(())
  • edit in src/xmpp/mod.rs at line 255
    [2.5907]
    [2.5907]
    Err(e) => Err(format_err!("Cann't parse roster: {}", e)),
  • edit in src/xmpp/mod.rs at line 257
    [2.5945][2.5945:6134]()
    }
    _ => {
    error!("Unknown result of roster");
    Err(())
  • edit in src/xmpp/mod.rs at line 258
    [2.6168]
    [2.6168]
    _ => Err(format_err!("Unknown result of roster")),
  • replacement in src/xmpp/mod.rs at line 264
    [2.6299][2.6299:6387]()
    error!("Iq stanza without id");
    Err(())
    [2.6299]
    [3.8819]
    Err(format_err!("Iq stanza without id"))
  • replacement in src/xmpp/mod.rs at line 270
    [2.6475][2.6475:6551]()
    error!("Wrong event while waiting roster");
    Err(())
    [2.6475]
    [2.6551]
    Err(format_err!("Wrong event while waiting roster"))
  • edit in src/xmpp/mod.rs at line 280
    [2.6747]
    [2.6747]
    E: Into<failure::Error>,
  • replacement in src/xmpp/mod.rs at line 282
    [2.6753][2.6753:6870]()
    let XmppConnection {
    account,
    state: XmppState { client, roster },
    } = self;
    [2.6753]
    [2.6870]
    let XmppConnection { account, state } = self;
  • replacement in src/xmpp/mod.rs at line 289
    [2.7065][2.7065:7080]()
    client
    [2.7065]
    [2.7080]
    state
    .client
  • replacement in src/xmpp/mod.rs at line 294
    [2.7201][2.7201:7226]()
    account2
    [2.7201]
    [2.7226]
    (account2, Err(failure::SyncFailure::new(e).into()))
  • replacement in src/xmpp/mod.rs at line 298
    [2.7312][2.7312:7369]()
    state: XmppState { client, roster },
    [2.7312]
    [2.7369]
    state: XmppState { client },
  • replacement in src/xmpp/mod.rs at line 302
    [2.7497][2.7497:7777](),[2.7777][3.1997:2016](),[3.8841][3.1997:2016]()
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    [2.7497]
    [2.7778]
    })
    .then(|r| match r {
    Err((account, e)) => {
    error!(
    "Cann't wait roster: {}",
    e.err().map_or_else(
    || std::borrow::Cow::Borrowed("None"),
    |e| e.to_string().into()
    )
    );
    future::err(account)
    }
    Ok((conn, _)) => future::ok(conn),
  • replacement in src/xmpp/mod.rs at line 326
    [2.8011][2.8011:8128]()
    let XmppConnection {
    account,
    state: XmppState { client, roster },
    } = self;
    [2.8011]
    [3.10624]
    let XmppConnection { account, state } = self;
  • replacement in src/xmpp/mod.rs at line 333
    [2.8183][2.8183:8198]()
    client
    [2.8183]
    [2.8198]
    state
    .client
  • replacement in src/xmpp/mod.rs at line 338
    [2.8315][2.8315:8340]()
    account2
    [2.8315]
    [2.8340]
    (account2, Err(failure::SyncFailure::new(e).into()))
  • replacement in src/xmpp/mod.rs at line 342
    [2.8426][2.8426:8483]()
    state: XmppState { client, roster },
    [2.8426]
    [2.8483]
    state: XmppState { client },
  • replacement in src/xmpp/mod.rs at line 357
    [2.9105][2.9105:9220]()
    error!("Wrong event while waiting self-presence");
    Err(())
    [2.9105]
    [2.9220]
    Err(format_err!("Wrong event while waiting self-presence"))
  • replacement in src/xmpp/mod.rs at line 362
    [2.9320][2.9320:9600](),[2.9600][3.2485:2504](),[3.2485][3.2485:2504]()
    .map_err(|(account, _)| account)
    .and_then(|(conn, r)| match r {
    Ok(Either::A(_)) => future::ok(conn),
    Ok(Either::B(_)) => future::err(conn.account),
    Err(_e) => future::err(conn.account),
    })
    [2.9320]
    [2.9601]
    })
    .then(|r| match r {
    Err((account, _e)) => {
    error!("Cann't wait self-presence");
    future::err(account)
    }
    Ok((conn, _)) => future::ok(conn),
  • replacement in src/xmpp/mod.rs at line 376
    [3.3817][3.10286:10311]()
    struct XmppState<F, S> {
    [3.3817]
    [3.86]
    struct XmppProcessState<F, S> {
  • replacement in src/xmpp/mod.rs at line 382
    [3.3920][3.10312:10444]()
    impl<F, S> XmppState<F, S> {
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppState<F, S> {
    XmppState {
    [3.3920]
    [3.4058]
    impl<F, S> XmppProcessState<F, S> {
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppProcessState<F, S> {
    XmppProcessState {
  • replacement in src/xmpp/mod.rs at line 402
    [3.4357][3.12381:12431]()
    let conn = MaybeXmppConnection::new(account);
    [3.4357]
    [3.4526]
    let conn = account.into();
  • replacement in src/xmpp/mod.rs at line 404
    [3.4527][3.10445:10535]()
    future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {
    let XmppState {
    [3.4527]
    [3.4617]
    future::loop_fn(XmppProcessState::new(cmd_recv, signal, conn), |s| {
    let XmppProcessState {
  • replacement in src/xmpp/mod.rs at line 410
    [3.4692][3.12432:12500](),[3.8720][3.4761:4955](),[3.6088][3.4761:4955](),[3.6123][3.4761:4955](),[3.5985][3.4761:4955](),[3.12500][3.4761:4955](),[3.5884][3.4761:4955](),[3.9107][3.4761:4955](),[3.4761][3.4761:4955](),[3.4955][3.595:675](),[3.675][3.12501:12582](),[3.8802][3.731:755](),[3.6145][3.731:755](),[3.6205][3.731:755](),[3.6042][3.731:755](),[3.12582][3.731:755](),[3.5941][3.731:755](),[3.9189][3.731:755](),[3.731][3.731:755](),[3.723][3.5120:5146](),[3.482][3.5120:5146](),[3.755][3.5120:5146](),[3.5120][3.5120:5146](),[3.5146][3.12583:12779](),[3.12779][3.10536:10614](),[3.10614][3.12857:13049](),[3.6240][3.12857:13049](),[3.12857][3.12857:13049](),[3.13049][2.9617:9698]()
    conn.connect(signal.clone())
    .and_then(|conn| {
    info!("xmpp connected!");
    cmd_recv
    .into_future()
    .map_err(|_| {
    error!("Got error on recv cmd");
    format_err!("Receive cmd error")
    })
    .map(|(cmd, cmd_recv)| (cmd, cmd_recv, conn))
    })
    .then(|r| {
    match r {
    Ok((cmd, cmd_recv, conn)) => {
    if let Some(_cmd) = cmd {
    info!("Got cmd");
    // got cmd, continue
    future::ok(future::Loop::Continue(XmppState::new(
    cmd_recv,
    signal,
    conn.into(),
    )))
    } else {
    future::ok(future::Loop::Break((None, conn.into())))
    [3.4692]
    [3.13130]
    conn.connect(signal.clone()).and_then(|conn| {
    info!("xmpp connected!");
    conn.processing(|_, _| Ok(false), cmd_recv.into_future())
    .then(|r| match r {
    Ok((conn, r)) => match r {
    Ok(Either::A(f)) => {
    if let Some(cmd_recv) = f.into_inner() {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: conn.into(),
    }))
    } else {
    future::err(format_err!("Command receiver is gone"))
    }
  • replacement in src/xmpp/mod.rs at line 426
    [3.13156][3.7148:7170](),[3.7148][3.7148:7170](),[3.7170][3.13157:13189](),[3.9409][3.6240:6342](),[3.7219][3.6240:6342](),[3.6812][3.6240:6342](),[3.7116][3.6240:6342](),[3.13189][3.6240:6342](),[3.7036][3.6240:6342](),[3.9796][3.6240:6342](),[3.6240][3.6240:6342](),[3.6342][2.9699:9738](),[3.9472][3.6572:6627](),[3.7356][3.6572:6627](),[2.9738][3.6572:6627](),[3.10654][3.6572:6627](),[3.6875][3.6572:6627](),[3.4095][3.6572:6627](),[3.331][3.6572:6627](),[3.3926][3.6572:6627](),[3.954][3.6572:6627](),[3.7253][3.6572:6627](),[3.4726][3.6572:6627](),[3.6303][3.6572:6627](),[3.13229][3.6572:6627](),[3.7173][3.6572:6627](),[3.892][3.6572:6627](),[3.9859][3.6572:6627](),[3.6572][3.6572:6627](),[3.6627][3.893:900](),[3.900][2.9739:10245]()
    }
    Err(e) => {
    // got cmd error, its bad
    error!("Cmd error: {}", e);
    future::err(e)
    }
    }
    })
    })
    .and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| {
    if let Some(cmd_recv) = opt_cmd_recv {
    // process left commands
    info!("Stop accepting commands");
    Box::new(
    cmd_recv
    .for_each(|_cmd| future::ok(()))
    .map_err(|_| format_err!("cmd receiver last error")),
    ) as Box<Future<Item = (), Error = failure::Error>>
    } else {
    Box::new(future::ok(()))
    }
    [3.13156]
    [2.10245]
    Ok(Either::B((cmd, cmd_recv))) => {
    if let Some(_cmd) = cmd {
    info!("Got command");
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: conn.into(),
    }))
    } else {
    future::ok(future::Loop::Break(()))
    }
    }
    Err(_) => future::err(format_err!("Command receiver is broken")),
    },
    Err((account, r)) => match r {
    Ok(Either::A(f)) => {
    if let Some(cmd_recv) = f.into_inner() {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: account.into(),
    }))
    } else {
    future::err(format_err!("Command receiver is gone"))
    }
    }
    Ok(Either::B((cmd, cmd_recv))) => {
    if let Some(_cmd) = cmd {
    error!("Xmpp connection broken while get command");
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: account.into(),
    }))
    } else {
    future::ok(future::Loop::Break(()))
    }
    }
    Err(_) => future::err(format_err!("Command receiver is broken")),
    },
    })
    })