Prepare adding new items to roster

[?]
Jan 1, 2019, 6:01 PM
CBWCXUZZBPQYFROVZFCIWHW3CPNAGXO3PUARLGOF4DUF6VVCYKVQC

Dependencies

  • [2] 3ADA5BBX Add items to roster from iq of "set" type
  • [3] NDDQQP2P Update deps
  • [4] HU3NZX5Z Process self-presence via new processing code
  • [5] ZI4GJ72V Add message to xmpp command
  • [6] FV6BJ5K6 Send self-presence and store account info in Rc so it will be used in some future in parallel
  • [7] QYY3KRGL Use failure instead of Box<dyn Error>
  • [8] QWE26TMV update deps
  • [9] PBRUH4BJ Rename optional XmppConnection to MaybeXmppConnection
  • [10] UMTLHH77 Process commands in the separate function
  • [11] HCCX7VW6 Generate ids from counter
  • [12] FWJDW3G5 Allow process xmpp incoming stanzas with futures
  • [13] AGIW6YR3 Use shared future for signal everywhere
  • [14] EBETRYK7 Add counter for id. Check for jid in roster
  • [15] OGMBXBKP Move online to XmppConnection
  • [16] L77O4T7M Formatting and fixes
  • [17] X6L47BHQ Use different structure for established xmpp connection
  • [18] BTOZT4JP Use failure
  • [19] ALP2YJIU Rename XmppState to XmppProcessState
  • [20] VS6AHRWI Move XMPP to separate dir
  • [21] 4LRBIGVT Show info about xmpp errors
  • [22] SU4DNVCB Start to processing roster data
  • [23] EOHEZXX3 Move request processing to structure
  • [24] TDOR5XQU Accept destination
  • [25] PFC7OJQF Query roster
  • [26] 3FYEOGCI Move additional rarely changed data to separate structure
  • [27] 5A5UVGNM Move receiver closing logic out of xmpp processing
  • [28] CP4MZO6V Leftover commands are processed via stoppable receiver
  • [29] 5IKA4GO7 Rename xmpp client field from "inner" to "client"
  • [30] V5HDBSZM Use jid for receiver address
  • [31] 5OBTKGDL Update deps
  • [32] XGP44R5H Rework stopping xmpp connection
  • [33] PVCRPP3B Some servers don't send `to` in initial presence
  • [34] OANBCLN5 Move xmpp client into XmppState
  • [35] QTCUURXN Add additional requirement for command stream
  • [36] UIXIQHDY Wait for commands via new processing code
  • [37] AYQZ2UIA Update deps
  • [38] BWDUANCV Second part of processing result is only about stop_future
  • [39] IK3YDPTY Update deps

Change contents

  • replacement in src/xmpp/stanzas.rs at line 4
    [3.141][3.27:61]()
    use xmpp_parsers::roster::Roster;
    [3.141]
    [3.141]
    use xmpp_parsers::roster::{Item, Roster};
  • edit in src/xmpp/stanzas.rs at line 17
    [3.156]
    [3.156]
    items: vec![],
  • edit in src/xmpp/stanzas.rs at line 19
    [3.175][3.175:198]()
    items: vec![],
  • edit in src/xmpp/stanzas.rs at line 22
    [3.270]
    [3.270]
    }
    pub fn make_add_roster(id: &str, jid: jid::Jid) -> Element {
    let mut add_roster = Iq::from_set(Roster {
    items: vec![Item {
    jid,
    name: None,
    subscription: xmpp_parsers::roster::Subscription::None,
    groups: vec![],
    }],
    ver: None,
    });
    add_roster.id = Some(id.to_string());
    add_roster.into()
  • edit in src/xmpp/mod.rs at line 13
    [3.46]
    [3.95]
    #[derive(Default)]
    struct XmppData {
    roster: HashMap<jid::Jid, ()>,
    _counter: usize,
    }
  • replacement in src/xmpp/mod.rs at line 22
    [3.393][3.47:82]()
    roster: HashMap<jid::Jid, ()>,
    [3.393]
    [3.428]
    data: XmppData,
  • replacement in src/xmpp/mod.rs at line 44
    [3.19][3.83:172]()
    impl MaybeXmppConnection {
    fn new(account: config::Account) -> MaybeXmppConnection {
    [3.19]
    [3.132]
    impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
  • replacement in src/xmpp/mod.rs at line 47
    [3.162][3.173:221]()
    account: std::rc::Rc::new(account),
    [3.162]
    [3.159]
    account: std::rc::Rc::new(from),
    state: None,
    }
    }
    }
    impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from,
  • edit in src/xmpp/mod.rs at line 60
    [3.200]
    [3.202]
    }
  • edit in src/xmpp/mod.rs at line 62
    [3.203]
    [3.516]
    impl MaybeXmppConnection {
  • replacement in src/xmpp/mod.rs at line 64
    [3.554][3.222:268]()
    /// don't connect if stop_future resolved
    [3.554]
    [3.876]
    /// don't connect only if stop_future resolved
  • replacement in src/xmpp/mod.rs at line 101
    [3.208][3.269:341]()
    roster: HashMap::new(),
    [3.208]
    [3.340]
    data: std::default::Default::default(),
  • replacement in src/xmpp/mod.rs at line 142
    [3.783][2.0:18]()
    mut self,
    [3.783]
    [3.797]
    self,
  • replacement in src/xmpp/mod.rs at line 145
    [3.894][3.342:548](),[3.548][2.19:626](),[2.626][3.641:871](),[3.641][3.641:871]()
    match event {
    Event::Stanza(stanza) => {
    info!("Incoming xmpp event: {:?}", stanza);
    let stanza = stanza.clone();
    use try_from::TryInto;
    if let Some(iq) = stanza.try_into().ok() as Option<xmpp_parsers::iq::Iq> {
    if let xmpp_parsers::iq::IqType::Set(element) = iq.payload {
    if let Some(roster) =
    element.try_into().ok() as Option<xmpp_parsers::roster::Roster>
    {
    self.state
    .data
    .roster
    .extend(roster.items.into_iter().map(|i| (i.jid, ())));
    }
    }
    }
    future::ok(self)
    }
    Event::Online => future::ok(self),
    e => {
    warn!("Unexpected event {:?}", e);
    future::err(self.account)
    }
    }
    [3.894]
    [3.2124]
    info!("Incoming xmpp event: {:?}", event);
    future::ok(self)
  • replacement in src/xmpp/mod.rs at line 172
    [3.1389][3.872:929]()
    state: XmppState { client, roster },
    [3.1389]
    [3.1588]
    state: XmppState { client, data },
  • replacement in src/xmpp/mod.rs at line 175
    [3.1643][3.930:2043]()
    client.into_future().select2(stop_future).then(|r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let xmpp = XmppConnection {
    state: XmppState { client, roster },
    account,
    };
    Box::new(xmpp.xmpp_processing(&event).then(|r| match r {
    Ok(mut xmpp) => match stop_condition(&mut xmpp, event) {
    Ok(true) => {
    future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
    }
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
    [3.1643]
    [3.3609]
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    Box::new(xmpp.xmpp_processing(&event).then(|r| match r {
    Ok(mut xmpp) => match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
    },
    Err(account) => future::err((account, Ok(Either::A(b)))),
    }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    }
    Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
  • replacement in src/xmpp/mod.rs at line 211
    [3.3644][3.2044:2206]()
    Err(account) => future::err((account, Ok(Either::A(b)))),
    })) as Box<dyn Future<Item = _, Error = _>>
    [3.3644]
    [3.3726]
    Ok(Either::B(t)),
    )))
  • edit in src/xmpp/mod.rs at line 214
    [3.3759]
    [3.3857]
    future::err((account, Ok(Either::B(t))))
    }),
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
  • replacement in src/xmpp/mod.rs at line 220
    [3.2868][3.2207:3484]()
    }
    Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, roster },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }),
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, roster },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }),
    })
    [3.2868]
    [3.5580]
    Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }),
    })
  • replacement in src/xmpp/mod.rs at line 272
    [3.5333][3.3535:3683]()
    self.state.data.roster.clear();
    info!("Got first roster:");
    [3.5333]
    [3.5399]
    info!("Got roster:");
  • edit in src/xmpp/mod.rs at line 275
    [3.5538]
    [3.5538]
    self.state.data.roster.insert(i.jid, ());
  • replacement in src/xmpp/mod.rs at line 316
    [3.5676][3.3684:3733]()
    state: XmppState { client, roster },
    [3.5676]
    [3.5827]
    state: XmppState { client, data },
  • replacement in src/xmpp/mod.rs at line 332
    [3.7312][3.3801:3858]()
    state: XmppState { client, roster },
    [3.7312]
    [3.7369]
    state: XmppState { client, data },
  • replacement in src/xmpp/mod.rs at line 355
    [3.6482][3.3859:3908]()
    state: XmppState { client, roster },
    [3.6482]
    [3.6633]
    state: XmppState { client, data },
  • replacement in src/xmpp/mod.rs at line 371
    [3.8426][3.3909:3966]()
    state: XmppState { client, roster },
    [3.8426]
    [3.8483]
    state: XmppState { client, data },
  • edit in src/xmpp/mod.rs at line 400
    [3.12136]
    [3.3789]
    fn process_command(
    self,
    cmd: &XmppCommand,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
    info!("Got command");
    if self.state.data.roster.get(&cmd.xmpp_to).is_some() {
    info!("Jid {} in roster", cmd.xmpp_to);
    future::ok(self)
    } else {
    info!("Jid {} not in roster", cmd.xmpp_to);
    let XmppConnection {
    account,
    state: XmppState { client, mut data },
    } = self;
    data.counter += 1;
    let id_add_roster = format!("id{}", data.counter);
    let add_roster = stanzas::make_add_roster(&id_add_roster, cmd.xmpp_to.clone());
    info!("Add jid to roster... {:?}", add_roster);
    future::ok(XmppConnection {
    account,
    state: XmppState { client, data },
    })
    }
    }
  • replacement in src/xmpp/mod.rs at line 432
    [3.3817][3.3967:3992]()
    struct XmppState<F, S> {
    [3.3817]
    [3.86]
    struct XmppProcessState<F, S> {
  • replacement in src/xmpp/mod.rs at line 438
    [3.3920][3.3993:4125]()
    impl<F, S> XmppState<F, S> {
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppState<F, S> {
    XmppState {
    [3.3920]
    [3.4058]
    impl<F, S> XmppProcessState<F, S> {
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppProcessState<F, S> {
    XmppProcessState {
  • replacement in src/xmpp/mod.rs at line 458
    [3.4357][3.4126:4176]()
    let conn = MaybeXmppConnection::new(account);
    [3.4357]
    [3.4526]
    let conn = account.into();
  • replacement in src/xmpp/mod.rs at line 460
    [3.4527][3.4177:4267]()
    future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {
    let XmppState {
    [3.4527]
    [3.4617]
    future::loop_fn(XmppProcessState::new(cmd_recv, signal, conn), |s| {
    let XmppProcessState {
  • replacement in src/xmpp/mod.rs at line 466
    [3.4692][3.4268:5015]()
    conn.connect(signal.clone())
    .and_then(|conn| {
    info!("xmpp connected!");
    cmd_recv
    .into_future()
    .map_err(|_| {
    error!("Got error on recv cmd");
    format_err!("Receive cmd error")
    })
    .map(|(cmd, cmd_recv)| (cmd, cmd_recv, conn))
    })
    .then(|r| {
    match r {
    Ok((cmd, cmd_recv, conn)) => {
    if let Some(_cmd) = cmd {
    info!("Got cmd");
    // got cmd, continue
    future::ok(future::Loop::Continue(XmppState::new(
    [3.4692]
    [3.7143]
    conn.connect(signal.clone()).and_then(|conn| {
    info!("xmpp connected!");
    conn.processing(|_, _| Ok(false), cmd_recv.into_future())
    .then(|r| match r {
    Ok((conn, r)) => match r {
    Ok(Either::A(f)) => Box::new(if let Some(cmd_recv) = f.into_inner() {
    future::ok(future::Loop::Continue(XmppProcessState {
  • replacement in src/xmpp/mod.rs at line 475
    [3.7225][3.5016:5093]()
    conn.into(),
    )))
    [3.7225]
    [3.7302]
    conn: conn.into(),
    }))
  • replacement in src/xmpp/mod.rs at line 478
    [3.7335][3.5094:5175]()
    future::ok(future::Loop::Break((None, conn.into())))
    [3.7335]
    [3.9142]
    future::err(format_err!("Command receiver is gone"))
    })
    as Box<dyn Future<Item = _, Error = _>>,
    Ok(Either::B((cmd, cmd_recv))) => {
    if let Some(cmd) = cmd {
    Box::new(conn.process_command(&cmd).then(|r| {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: match r {
    Ok(conn) => conn.into(),
    Err(account) => account.into(),
    },
    }))
    }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(future::ok(future::Loop::Break(())))
    }
  • replacement in src/xmpp/mod.rs at line 498
    [3.9168][3.5176:5939]()
    }
    Err(e) => {
    // got cmd error, its bad
    error!("Cmd error: {}", e);
    future::err(e)
    }
    }
    })
    })
    .and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| {
    if let Some(cmd_recv) = opt_cmd_recv {
    // process left commands
    info!("Stop accepting commands");
    Box::new(
    cmd_recv
    .for_each(|_cmd| future::ok(()))
    .map_err(|_| format_err!("cmd receiver last error")),
    ) as Box<Future<Item = (), Error = failure::Error>>
    } else {
    Box::new(future::ok(()))
    }
    [3.9168]
    [3.10245]
    Err(_) => Box::new(future::err(format_err!("Command receiver is broken"))),
    },
    Err((account, r)) => Box::new(match r {
    Ok(Either::A(f)) => {
    if let Some(cmd_recv) = f.into_inner() {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: account.into(),
    }))
    } else {
    future::err(format_err!("Command receiver is gone"))
    }
    }
    Ok(Either::B((cmd, cmd_recv))) => {
    if let Some(_cmd) = cmd {
    error!("Xmpp connection broken while get command");
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: account.into(),
    }))
    } else {
    future::ok(future::Loop::Break(()))
    }
    }
    Err(_) => future::err(format_err!("Command receiver is broken")),
    }),
    })
    })