Start to processing roster data

[?]
Jan 1, 2019, 3:30 PM
SU4DNVCBZYNXKBPMMNL2TAUVXYVDGBJR2P3BAZJKG7GJMYWELMKQC

Dependencies

  • [2] HCCX7VW6 Generate ids from counter
  • [3] CP4MZO6V Leftover commands are processed via stoppable receiver
  • [4] OANBCLN5 Move xmpp client into XmppState
  • [5] FWJDW3G5 Allow process xmpp incoming stanzas with futures
  • [6] UIXIQHDY Wait for commands via new processing code
  • [7] UMTLHH77 Process commands in the separate function
  • [8] V5HDBSZM Use jid for receiver address
  • [9] X6L47BHQ Use different structure for established xmpp connection
  • [10] 5A5UVGNM Move receiver closing logic out of xmpp processing
  • [11] EBETRYK7 Add counter for id. Check for jid in roster
  • [12] BWDUANCV Second part of processing result is only about stop_future
  • [13] PVCRPP3B Some servers don't send to in initial presence
  • [14] OGMBXBKP Move online to XmppConnection
  • [15] VS6AHRWI Move XMPP to separate dir
  • [16] WJNXI6Z4 Fill roster
  • [17] QTCUURXN Add additional requirement for command stream
  • [18] 4LRBIGVT Show info about xmpp errors
  • [19] IK3YDPTY Update deps
  • [20] 3FYEOGCI Move additional rarely changed data to separate structure
  • [21] L77O4T7M Formatting and fixes
  • [22] 5OBTKGDL Update deps
  • [23] EOHEZXX3 Move request processing to structure
  • [24] ALP2YJIU Rename XmppState to XmppProcessState
  • [25] FV6BJ5K6 Send self-presence and store account info in Rc so it willbe used in some future in parallel
  • [26] HU3NZX5Z Process self-presence via new processing code
  • [27] AYQZ2UIA Update deps
  • [28] NDDQQP2P Update deps
  • [29] QYY3KRGL Use failure instead Box<dyn Error>
  • [30] TDOR5XQU Accept destination
  • [31] XGP44R5H Rework stopping xmpp connection
  • [32] ZI4GJ72V Add message to xmpp command
  • [33] 5IKA4GO7 Rename xmpp client field from "inner" to "client"
  • [34] AGIW6YR3 Use shared future for signal everywhere
  • [35] BTOZT4JP Use failure
  • [36] QWE26TMV update deps
  • [37] PBRUH4BJ Rename optional XmppConnection to MaybeXmppConnection

Change contents

  • replacement in src/xmpp/mod.rs at line 12
    [3.321][3.0:72](),[3.72][2.0:56](),[2.56][3.93:95](),[3.93][3.93:95]()
    #[derive(Default)]
    struct XmppData {
    roster: HashMap<jid::Jid, ()>,
    counter: usize,
    id_init_roster: Option<String>,
    }
    [3.321]
    [3.95]
    const ID_GET_ROSTER: &str = "id_get_roster0";
  • replacement in src/xmpp/mod.rs at line 16
    [3.393][3.97:117]()
    data: XmppData,
    [3.393]
    [3.428]
    roster: HashMap<jid::Jid, ()>,
  • replacement in src/xmpp/mod.rs at line 38
    [3.19][3.0:113]()
    impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
    [3.19]
    [3.132]
    impl MaybeXmppConnection {
    fn new(account: config::Account) -> MaybeXmppConnection {
  • replacement in src/xmpp/mod.rs at line 41
    [3.162][3.114:159]()
    account: std::rc::Rc::new(from),
    [3.162]
    [3.159]
    account: std::rc::Rc::new(account),
  • edit in src/xmpp/mod.rs at line 45
    [3.200][3.200:202]()
    }
  • edit in src/xmpp/mod.rs at line 46
    [3.203][3.203:399](),[3.399][3.207:232](),[3.207][3.207:232](),[3.207][3.207:232](),[3.232][3.666:682](),[3.577][3.666:682](),[3.509][3.666:682](),[3.85][3.666:682](),[3.148][3.666:682](),[3.666][3.666:682](),[3.682][3.400:402](),[3.402][3.128:129](),[3.128][3.128:129](),[3.129][3.403:430]()
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from,
    state: None,
    }
    }
    }
    impl MaybeXmppConnection {
  • replacement in src/xmpp/mod.rs at line 47
    [3.554][3.431:482]()
    /// don't connect only if stop_future resolved
    [3.554]
    [3.876]
    /// don't connect if stop_future resolved
  • replacement in src/xmpp/mod.rs at line 84
    [3.208][3.118:206]()
    data: std::default::Default::default(),
    [3.208]
    [3.340]
    roster: HashMap::new(),
  • replacement in src/xmpp/mod.rs at line 128
    [3.672][3.2073:2124](),[3.1000][3.2073:2124](),[3.894][3.2073:2124](),[3.2073][3.2073:2124](),[3.2124][3.895:920]()
    info!("Incoming xmpp event: {:?}", event);
    future::ok(self)
    [3.894]
    [3.2124]
    match event {
    Event::Stanza(stanza) => {
    info!("Incoming xmpp event: {:?}", stanza);
    let stanza = stanza.clone();
    use try_from::TryInto;
    if let Some(_iq) = stanza.try_into().ok() as Option<xmpp_parsers::iq::Iq> {}
    future::ok(self)
    }
    Event::Online => future::ok(self),
    e => {
    warn!("Unexpected event {:?}", e);
    future::err(self.account)
    }
    }
  • replacement in src/xmpp/mod.rs at line 168
    [3.1389][3.207:262]()
    state: XmppState { client, data },
    [3.1389]
    [3.1588]
    state: XmppState { client, roster },
  • replacement in src/xmpp/mod.rs at line 171
    [3.1643][3.1643:1666](),[3.1643][3.1643:1666](),[3.1666][3.1147:1224](),[3.1147][3.1147:1224](),[3.1224][3.1667:1712](),[3.1712][3.1264:1386](),[3.1264][3.1264:1386](),[3.1386][3.1713:1773](),[3.1773][3.263:334](),[3.334][3.1515:1595](),[3.2012][3.1515:1595](),[3.1515][3.1515:1595](),[3.1595][3.2013:3071](),[3.3071][3.1848:1885](),[3.1848][3.1848:1885](),[3.1885][3.3072:3155](),[3.3155][3.1958:2014](),[3.1958][3.1958:2014](),[3.2014][3.3156:3363](),[3.3363][3.335:406](),[3.406][3.3564:3609](),[3.2407][3.3564:3609]()
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    Box::new(xmpp.xmpp_processing(&event).then(|r| match r {
    Ok(mut xmpp) => match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
    },
    Err(account) => future::err((account, Ok(Either::A(b)))),
    }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    }
    Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    [3.1643]
    [3.3609]
    client.into_future().select2(stop_future).then(|r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let xmpp = XmppConnection {
    state: XmppState { client, roster },
    account,
    };
    Box::new(xmpp.xmpp_processing(&event).then(|r| match r {
    Ok(mut xmpp) => match stop_condition(&mut xmpp, event) {
    Ok(true) => {
    future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
    }
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
  • replacement in src/xmpp/mod.rs at line 190
    [3.3644][3.3644:3726](),[3.3644][3.3644:3726]()
    Ok(Either::B(t)),
    )))
    [3.3644]
    [3.3726]
    Err(account) => future::err((account, Ok(Either::A(b)))),
    })) as Box<dyn Future<Item = _, Error = _>>
  • edit in src/xmpp/mod.rs at line 193
    [3.3759][3.3759:3856](),[3.3759][3.3759:3856](),[3.3856][3.2663:2773](),[3.2663][3.2663:2773]()
    future::err((account, Ok(Either::B(t))))
    }),
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
  • replacement in src/xmpp/mod.rs at line 195
    [3.2868][3.3937:4145](),[3.4145][3.407:478](),[3.478][3.4346:4618](),[3.3262][3.4346:4618](),[3.4618][3.3487:3510](),[3.2966][3.3487:3510]()
    Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }),
    })
    [3.2868]
    [3.5580]
    }
    Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, roster },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }),
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, roster },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }),
    })
  • replacement in src/xmpp/mod.rs at line 252
    [3.4690][2.57:129]()
    if Some(id) == self.state.data.id_init_roster {
    [3.4690]
    [3.4739]
    if id == ID_GET_ROSTER {
  • replacement in src/xmpp/mod.rs at line 263
    [3.5333][3.5333:5399](),[3.5333][3.5333:5399](),[3.5333][3.5333:5399](),[3.5333][3.5333:5399](),[3.5333][3.5333:5399]()
    info!("Got roster:");
    [3.5333]
    [3.5399]
    self.state.data.roster.clear();
    info!("Got first roster:");
  • edit in src/xmpp/mod.rs at line 267
    [3.5538][3.479:569]()
    self.state.data.roster.insert(i.jid, ());
  • replacement in src/xmpp/mod.rs at line 307
    [3.5676][2.130:181]()
    state: XmppState { client, mut data },
    [3.5676]
    [3.5827]
    state: XmppState { client, roster },
  • replacement in src/xmpp/mod.rs at line 311
    [3.6905][2.182:389]()
    data.counter += 1;
    let id_init_roster = format!("id{}", data.counter);
    let get_roster = stanzas::make_get_roster(&id_init_roster);
    data.id_init_roster = Some(id_init_roster);
    [3.6905]
    [3.6971]
    let get_roster = stanzas::make_get_roster(ID_GET_ROSTER);
  • replacement in src/xmpp/mod.rs at line 323
    [3.7312][3.618:673]()
    state: XmppState { client, data },
    [3.7312]
    [3.7369]
    state: XmppState { client, roster },
  • replacement in src/xmpp/mod.rs at line 346
    [3.6482][3.674:721]()
    state: XmppState { client, data },
    [3.6482]
    [3.6633]
    state: XmppState { client, roster },
  • replacement in src/xmpp/mod.rs at line 362
    [3.8426][3.722:777]()
    state: XmppState { client, data },
    [3.8426]
    [3.8483]
    state: XmppState { client, roster },
  • edit in src/xmpp/mod.rs at line 390
    [3.5631][3.5631:5676](),[3.5676][3.7270:7297](),[3.7297][3.5704:5808](),[3.5704][3.5704:5808](),[3.5808][3.778:854](),[3.854][3.7369:7504](),[3.7369][3.7369:7504](),[3.7504][3.5808:5833](),[3.5808][3.5808:5833]()
    }
    fn process_command(
    self,
    cmd: &XmppCommand,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
    info!("Got command");
    if let Some(_jid_data) = self.state.data.roster.get(&cmd.xmpp_to) {
    info!("Jid {} in roster", cmd.xmpp_to);
    } else {
    info!("Jid {} not in roster", cmd.xmpp_to);
    }
    future::ok(self)
  • replacement in src/xmpp/mod.rs at line 396
    [3.3817][3.5867:5899]()
    struct XmppProcessState<F, S> {
    [3.3817]
    [3.86]
    struct XmppState<F, S> {
  • replacement in src/xmpp/mod.rs at line 402
    [3.3920][3.5900:6053]()
    impl<F, S> XmppProcessState<F, S> {
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppProcessState<F, S> {
    XmppProcessState {
    [3.3920]
    [3.4058]
    impl<F, S> XmppState<F, S> {
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppState<F, S> {
    XmppState {
  • replacement in src/xmpp/mod.rs at line 422
    [3.4357][3.6054:6085]()
    let conn = account.into();
    [3.4357]
    [3.4526]
    let conn = MaybeXmppConnection::new(account);
  • replacement in src/xmpp/mod.rs at line 424
    [3.4527][3.6086:6190]()
    future::loop_fn(XmppProcessState::new(cmd_recv, signal, conn), |s| {
    let XmppProcessState {
    [3.4527]
    [3.4617]
    future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {
    let XmppState {
  • replacement in src/xmpp/mod.rs at line 430
    [3.4692][3.6191:6612]()
    conn.connect(signal.clone()).and_then(|conn| {
    info!("xmpp connected!");
    conn.processing(|_, _| Ok(false), cmd_recv.into_future())
    .then(|r| match r {
    Ok((conn, r)) => match r {
    Ok(Either::A(f)) => Box::new(if let Some(cmd_recv) = f.into_inner() {
    future::ok(future::Loop::Continue(XmppProcessState {
    [3.4692]
    [3.7143]
    conn.connect(signal.clone())
    .and_then(|conn| {
    info!("xmpp connected!");
    cmd_recv
    .into_future()
    .map_err(|_| {
    error!("Got error on recv cmd");
    format_err!("Receive cmd error")
    })
    .map(|(cmd, cmd_recv)| (cmd, cmd_recv, conn))
    })
    .then(|r| {
    match r {
    Ok((cmd, cmd_recv, conn)) => {
    if let Some(_cmd) = cmd {
    info!("Got cmd");
    // got cmd, continue
    future::ok(future::Loop::Continue(XmppState::new(
  • replacement in src/xmpp/mod.rs at line 450
    [3.7225][3.6613:6696]()
    conn: conn.into(),
    }))
    [3.7225]
    [3.7302]
    conn.into(),
    )))
  • replacement in src/xmpp/mod.rs at line 453
    [3.7335][3.6697:8539](),[3.6783][3.13130:13156](),[3.9698][3.13130:13156](),[3.118][3.13130:13156](),[3.8539][3.13130:13156](),[3.7416][3.13130:13156](),[3.13130][3.13130:13156](),[3.13156][3.8540:9142]()
    future::err(format_err!("Command receiver is gone"))
    })
    as Box<dyn Future<Item = _, Error = _>>,
    Ok(Either::B((cmd, cmd_recv))) => {
    if let Some(cmd) = cmd {
    Box::new(conn.process_command(&cmd).then(|r| {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: match r {
    Ok(conn) => conn.into(),
    Err(account) => account.into(),
    },
    }))
    }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(future::ok(future::Loop::Break(())))
    }
    }
    Err(_) => Box::new(future::err(format_err!("Command receiver is broken"))),
    },
    Err((account, r)) => Box::new(match r {
    Ok(Either::A(f)) => {
    if let Some(cmd_recv) = f.into_inner() {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: account.into(),
    }))
    } else {
    future::err(format_err!("Command receiver is gone"))
    }
    }
    Ok(Either::B((cmd, cmd_recv))) => {
    if let Some(_cmd) = cmd {
    error!("Xmpp connection broken while get command");
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: account.into(),
    }))
    } else {
    future::ok(future::Loop::Break(()))
    }
    [3.7335]
    [3.9142]
    future::ok(future::Loop::Break((None, conn.into())))
  • replacement in src/xmpp/mod.rs at line 455
    [3.9168][3.9168:9312]()
    Err(_) => future::err(format_err!("Command receiver is broken")),
    }),
    })
    })
    [3.9168]
    [3.10245]
    }
    Err(e) => {
    // got cmd error, its bad
    error!("Cmd error: {}", e);
    future::err(e)
    }
    }
    })
    })
    .and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| {
    if let Some(cmd_recv) = opt_cmd_recv {
    // process left commands
    info!("Stop accepting commands");
    Box::new(
    cmd_recv
    .for_each(|_cmd| future::ok(()))
    .map_err(|_| format_err!("cmd receiver last error")),
    ) as Box<Future<Item = (), Error = failure::Error>>
    } else {
    Box::new(future::ok(()))
    }