Move xmpp client into XmppState

[?]
Dec 31, 2018, 6:15 PM
OANBCLN5TD5VQTSAQXSIXU6IUCXMKQSML2UJGQCKVTOCENEVABIAC

Dependencies

  • [2] ALP2YJIU Rename XmppState to XmppProcessState
  • [3] EOHEZXX3 Move request processing to structure
  • [4] TDOR5XQU Accept destination
  • [5] 5IKA4GO7 Rename xmpp client field from "inner" to "client"
  • [6] HOAZX2PB Reorganize roster processing. Output roster
  • [7] QWE26TMV update deps
  • [8] 5OBTKGDL Update deps
  • [9] O2GM5J4F Don't split xmpp receiving and sending
  • [10] AGIW6YR3 Use shared future for signal everywhere
  • [11] PFC7OJQF Query roster
  • [12] VS6AHRWI Move XMPP to separate dir
  • [13] AYQZ2UIA Update deps
  • [14] QYY3KRGL Use failure instead of Box<dyn Error>
  • [15] HU3NZX5Z Process self-presence via new processing code
  • [16] XGP44R5H Rework stopping xmpp connection
  • [17] H7R7Y3FQ Use new processing code to wait online
  • [18] PBRUH4BJ Rename optional XmppConnection to MaybeXmppConnection
  • [19] 5A5UVGNM Move receiver closing logic out of xmpp processing
  • [20] V5HDBSZM Use jid for receiver address
  • [21] IK3YDPTY Update deps
  • [22] 4LRBIGVT Show info about xmpp errors
  • [23] FV6BJ5K6 Send self-presence and store account info in Rc so it will be used in some future in parallel
  • [24] NDDQQP2P Update deps
  • [25] OGMBXBKP Move online to XmppConnection
  • [26] X6L47BHQ Use different structure for established xmpp connection
  • [27] ZI4GJ72V Add message to xmpp command
  • [28] PVCRPP3B Some servers don't send to in initial presence
  • [29] BTOZT4JP Use failure
  • [*] FVVPKFTL Initial commit

Change contents

  • edit in src/xmpp/stanzas.rs at line 2
    [3.57]
    [3.57]
    use xmpp_parsers::iq::Iq;
  • edit in src/xmpp/stanzas.rs at line 4
    [3.141]
    [3.141]
    use xmpp_parsers::roster::Roster;
  • edit in src/xmpp/stanzas.rs at line 14
    [3.419]
    /// Builds an IQ "get" stanza whose payload is an empty `Roster`
    /// (no version, no items) and converts it into an `Element`.
    ///
    /// `id` is copied into the stanza's `id` field.
    pub fn make_get_roster(id: &str) -> Element {
    let mut get_roster = Iq::from_get(Roster {
    ver: None,
    items: vec![],
    });
    // Tag the request with the caller-supplied id.
    get_roster.id = Some(id.to_string());
    get_roster.into()
    }
  • edit in src/xmpp/mod.rs at line 9
    [3.434]
    [3.320]
    const ID_GET_ROSTER: &str = "id_get_roster0";
  • replacement in src/xmpp/mod.rs at line 12
    [3.321][3.321:354]()
    pub struct MaybeXmppConnection {
    [3.321]
    [3.354]
    /// State carried by an XMPP connection; currently just the client.
    struct XmppState {
    client: Client,
    }
    struct MaybeXmppConnection {
  • replacement in src/xmpp/mod.rs at line 18
    [3.397][2.0:27]()
    inner: Option<Client>,
    [3.397]
    [3.425]
    state: Option<XmppState>,
  • replacement in src/xmpp/mod.rs at line 21
    [3.106][3.73:101](),[3.73][3.73:101]()
    pub struct XmppConnection {
    [3.106]
    [3.31]
    struct XmppConnection {
  • replacement in src/xmpp/mod.rs at line 23
    [3.74][2.28:47]()
    inner: Client,
    [3.74]
    [3.448]
    state: XmppState,
  • replacement in src/xmpp/mod.rs at line 30
    [3.627][2.48:85]()
    inner: Some(from.inner),
    [3.627]
    [3.666]
    state: Some(from.state),
  • replacement in src/xmpp/mod.rs at line 39
    [3.527][2.86:111]()
    inner: None,
    [3.527]
    [3.499]
    state: None,
  • replacement in src/xmpp/mod.rs at line 50
    [3.1011][2.112:149]()
    F: future::Future + 'static,
    [3.1011]
    [3.1056]
    F: future::Future + Clone + 'static,
  • replacement in src/xmpp/mod.rs at line 54
    [3.690][2.150:209]()
    let MaybeXmppConnection { account, inner } = self;
    [3.690]
    [3.744]
    let MaybeXmppConnection { account, state } = self;
  • replacement in src/xmpp/mod.rs at line 56
    [3.745][2.210:315]()
    if let Some(inner) = inner {
    Box::new(future::ok(XmppConnection { account, inner }))
    [3.745]
    [3.1302]
    if let Some(state) = state {
    Box::new(future::ok(XmppConnection { account, state }))
  • edit in src/xmpp/mod.rs at line 62
    [3.1405]
    [3.1434]
    .clone()
  • replacement in src/xmpp/mod.rs at line 64
    [3.1464][2.316:377]()
    future::loop_fn(account, |account| {
    [3.1464]
    [3.1530]
    future::loop_fn(account, move |account| {
  • edit in src/xmpp/mod.rs at line 72
    [3.2010]
    [3.1476]
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
  • replacement in src/xmpp/mod.rs at line 79
    [2.435][2.435:494]()
    inner: client,
    [2.435]
    [2.494]
    state: XmppState { client },
  • replacement in src/xmpp/mod.rs at line 82
    [2.589][2.589:720]()
    .online()
    .and_then(XmppConnection::self_presence)
    [2.589]
    [2.720]
    .processing(XmppConnection::online, stop_future.clone())
    .map(|(conn, _)| conn)
    .map_err(|(acc, _)| acc)
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
  • replacement in src/xmpp/mod.rs at line 114
    [3.3788][2.917:970]()
    fn xmpp_processing(&mut self, _event: &Event) {}
    [3.3788]
    [3.3896]
    fn xmpp_processing(&mut self, event: &Event) {
    info!("Incoming xmpp event: {:?}", event);
    }
  • replacement in src/xmpp/mod.rs at line 126
    [3.4137][2.971:1092]()
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    [3.4137]
    [3.4319]
    Item = (Self, Result<Either<F, T>, failure::Error>),
    Error = (
    std::rc::Rc<config::Account>,
    Result<Either<F, T>, failure::Error>,
    ),
  • replacement in src/xmpp/mod.rs at line 134
    [3.4375][2.1093:1138]()
    S: FnMut(&mut Self, &Event) -> bool,
    [3.4375]
    [3.4476]
    E: Into<failure::Error>,
    S: FnMut(&mut Self, Event) -> Result<bool, failure::Error>,
  • replacement in src/xmpp/mod.rs at line 140
    [3.4612][2.1139:1277](),[2.1277][3.4752:4866](),[3.4752][3.4752:4866](),[3.4866][2.1278:1457](),[2.1457][3.4945:5003](),[3.4945][3.4945:5003](),[3.5003][2.1458:1615]()
    let XmppConnection { inner, account } = xmpp;
    inner.into_future().select2(stop_future).then(|r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    inner: client,
    account,
    };
    xmpp.xmpp_processing(&event);
    if stop_condition(&mut xmpp, &event) {
    future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
    [3.4612]
    [2.1615]
    let XmppConnection { state, account } = xmpp;
    state
    .client
    .into_future()
    .select2(stop_future)
    .then(|r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client },
    account,
    };
    xmpp.xmpp_processing(&event);
    match stop_condition(&mut xmpp, event) {
    Ok(true) => {
    future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
    }
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(e) => future::err((xmpp.account, Err(e))),
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client },
    account,
    },
    Ok(Either::B(t)),
    )))
  • replacement in src/xmpp/mod.rs at line 178
    [2.1652][2.1652:1746]()
    future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
    [2.1652]
    [3.5504]
    future::err((account, Ok(Either::B(t))))
  • edit in src/xmpp/mod.rs at line 180
    [3.5534][3.5534:5636](),[3.5534][3.5534:5636]()
    } else {
    future::err((account, Ok(Either::A(b))))
  • replacement in src/xmpp/mod.rs at line 181
    [3.4531][3.5637:5706](),[3.5706][2.1747:1809](),[2.1809][3.5769:5830](),[3.5769][3.5769:5830](),[3.5830][2.1810:1877](),[2.1877][3.5898:6082](),[3.5898][3.5898:6082]()
    }
    Ok(Either::B((t, a))) => {
    if let Some(inner) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection { inner, account },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    [3.4531]
    [3.4974]
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
  • replacement in src/xmpp/mod.rs at line 185
    [3.5000][3.6083:6242](),[3.6242][2.1878:1940](),[2.1940][3.6305:6366](),[3.6305][3.6305:6366](),[3.6366][2.1941:2048](),[2.2048][3.6481:6546](),[3.6481][3.6481:6546](),[3.6546][2.2049:2108]()
    }
    Err(Either::A((_e, b))) => future::err((account, Ok(Either::A(b)))),
    Err(Either::B((e, a))) => {
    if let Some(inner) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection { inner, account },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    [3.5000]
    [3.5513]
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client },
    account,
    },
    Err(e.into()),
    )))
    } else {
    future::err((account, Err(e.into())))
    }
  • replacement in src/xmpp/mod.rs at line 198
    [3.2678][3.5539:5580](),[3.2678][3.5539:5580](),[3.5539][3.5539:5580]()
    }
    })
    [3.5539]
    [3.5580]
    })
  • replacement in src/xmpp/mod.rs at line 205
    [3.6737][2.2109:3094]()
    fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
    Box::new(future::loop_fn(
    (self.inner, self.account),
    |(client, account)| {
    client.into_future().then(|r| match r {
    Ok((event, client)) => match event {
    Some(Event::Online) => {
    info!("Online");
    future::ok(future::Loop::Break(XmppConnection {
    account,
    inner: client,
    }))
    }
    Some(Event::Stanza(s)) => {
    info!("xmpp stanza: {:?}", s);
    future::ok(future::Loop::Continue((client, account)))
    }
    _ => {
    warn!("Disconnected");
    future::err(account)
    [3.6737]
    [3.5558]
    /// Decides, for one incoming event, whether the connection is online.
    ///
    /// Returns `Ok(true)` on `Event::Online`, `Ok(false)` for a stanza that
    /// arrives first (logged as a warning), and an error for any other event.
    /// Its signature matches the stop-condition callback taken by `processing`.
    fn online(&mut self, event: Event) -> Result<bool, failure::Error> {
    match event {
    Event::Online => {
    info!("Online!");
    Ok(true)
    }
    Event::Stanza(s) => {
    // A stanza before the online event is not fatal: keep waiting.
    warn!("Stanza before online: {:?}", s);
    Ok(false)
    }
    _ => {
    // Any other event is reported as a disconnect.
    error!("Disconnected while online");
    Err(format_err!("Disconnected while online"))
    }
    }
    }
    fn process_initial_roster(&mut self, event: Event) -> Result<bool, failure::Error> {
    if let Event::Stanza(s) = event {
    use try_from::TryInto;
    match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
    Ok(iq) => {
    if let Some(id) = iq.id {
    if id == ID_GET_ROSTER {
    match iq.payload {
    xmpp_parsers::iq::IqType::Error(_e) => {
    Err(format_err!("Get error instead of roster"))
    }
    xmpp_parsers::iq::IqType::Result(Some(result)) => {
    match result.try_into()
    as Result<xmpp_parsers::roster::Roster, _>
    {
    Ok(roster) => {
    info!("Got roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    }
    Ok(true)
    }
    Err(e) => Err(format_err!("Cann't parse roster: {}", e)),
    }
    }
    _ => Err(format_err!("Unknown result of roster")),
    }
    } else {
    Ok(false)
  • replacement in src/xmpp/mod.rs at line 252
    [3.5584][2.3095:3261]()
    },
    Err((e, _)) => {
    error!("xmpp receive error: {}", e);
    future::err(account)
    [3.5584]
    [3.8819]
    } else {
    Err(format_err!("Iq stanza without id"))
  • replacement in src/xmpp/mod.rs at line 255
    [3.8841][2.3262:3307]()
    })
    },
    ))
    [3.8841]
    [3.3783]
    }
    Err(_e) => Ok(false),
    }
    } else {
    Err(format_err!("Wrong event while waiting roster"))
    }
  • replacement in src/xmpp/mod.rs at line 263
    [3.10352][2.3308:3485]()
    fn self_presence(self) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
    let XmppConnection { account, inner } = self;
    let client = inner;
    [3.10352]
    [3.10624]
    /// Sends a roster query and processes events until the roster reply is
    /// handled or `stop_future` resolves.
    ///
    /// The request is built with `stanzas::make_get_roster(ID_GET_ROSTER)` and
    /// sent through the client; the connection is then driven by `processing`
    /// with `XmppConnection::process_initial_roster` as the stop condition.
    ///
    /// Resolves to the connection on success. On any failure the error is
    /// logged and the future fails with the account only.
    fn initial_roster<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E>,
    E: Into<failure::Error>,
    {
    let XmppConnection { account, state } = self;
    use tokio::prelude::Sink;
    let get_roster = stanzas::make_get_roster(ID_GET_ROSTER);
    // Second handle to the account for the send-error path; the original
    // is moved into the `and_then` closure below.
    let account2 = account.clone();
    info!("Quering roster... {:?}", get_roster);
    state
    .client
    .send(get_roster)
    .map_err(move |e| {
    error!("Error on querying roster: {}", e);
    // Same (account, result) error shape that `processing` produces.
    (account2, Err(failure::SyncFailure::new(e).into()))
    })
    .and_then(move |client| {
    // Rebuild the connection around the client returned by `send`.
    XmppConnection {
    state: XmppState { client },
    account,
    }
    .processing(XmppConnection::process_initial_roster, stop_future)
    })
    .then(|r| match r {
    Err((account, e)) => {
    // Log the error text if there is one, otherwise "None".
    error!(
    "Cann't wait roster: {}",
    e.err().map_or_else(
    || std::borrow::Cow::Borrowed("None"),
    |e| e.to_string().into()
    )
    );
    future::err(account)
    }
    Ok((conn, _)) => future::ok(conn),
    })
    }
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E>,
    E: Into<failure::Error>,
    {
    let XmppConnection { account, state } = self;
  • edit in src/xmpp/mod.rs at line 319
    [3.10716][2.3486:3524]()
    info!("Sending presence...");
  • replacement in src/xmpp/mod.rs at line 320
    [3.10756][2.3525:5862]()
    Box::new(
    client
    .send(presence)
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    "Cann't send self-presence".to_owned()
    })
    .and_then(move |client| {
    future::loop_fn((account2.clone(), client), |(account, client)| {
    client
    .into_future()
    .map_err(|(e, _)| {
    error!("Error on reading self-presence: {}", e);
    "Cann't read self-presence".to_owned()
    })
    .and_then(|(event, client)| match event {
    Some(event) => {
    if let tokio_xmpp::Event::Stanza(e) = event {
    info!("Get stanza: {:?}", e);
    if e.name() == "presence"
    && e.attr("from").map_or(false, |f| f == account.jid)
    && e.attr("to").map_or(false, |f| f == account.jid)
    {
    info!("Self presence");
    future::ok(future::Loop::Break(client))
    } else {
    future::ok(future::Loop::Continue((account, client)))
    }
    } else {
    future::err("Got wrong event".to_owned())
    }
    }
    None => future::err("Got closed stream".to_owned()),
    })
    })
    .map_err(|e| format!("waiting self-presence: {}", e))
    })
    .then(|r| match r {
    Err(e) => {
    error!("Self-presence waiting error: {}", e);
    future::err(account)
    }
    Ok(inner) => future::ok(XmppConnection { account, inner }),
    }),
    )
    [3.10756]
    [3.12130]
    info!("Sending presence... {:?}", presence);
    state
    .client
    .send(presence)
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    (account2, Err(failure::SyncFailure::new(e).into()))
    })
    .and_then(move |client| {
    XmppConnection {
    state: XmppState { client },
    account,
    }
    .processing(
    move |conn, event| {
    if let Event::Stanza(s) = event {
    if s.name() == "presence"
    && s.attr("from").map_or(false, |f| f == conn.account.jid)
    && s.attr("to").map_or(false, |f| f == conn.account.jid)
    {
    Ok(true)
    } else {
    Ok(false)
    }
    } else {
    Err(format_err!("Wrong event while waiting self-presence"))
    }
    },
    stop_future,
    )
    })
    .then(|r| match r {
    Err((account, _e)) => {
    error!("Cann't wait self-presence");
    future::err(account)
    }
    Ok((conn, _)) => future::ok(conn),
    })
  • replacement in src/xmpp/mod.rs at line 365
    [3.3817][2.5863:5895]()
    struct XmppProcessState<F, S> {
    [3.3817]
    [3.86]
    struct XmppState<F, S> {
  • replacement in src/xmpp/mod.rs at line 371
    [3.3920][2.5896:6049]()
    impl<F, S> XmppProcessState<F, S> {
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppProcessState<F, S> {
    XmppProcessState {
    [3.3920]
    [3.4058]
    impl<F, S> XmppState<F, S> {
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppState<F, S> {
    XmppState {
  • replacement in src/xmpp/mod.rs at line 393
    [3.4527][2.6050:6154]()
    future::loop_fn(XmppProcessState::new(cmd_recv, signal, conn), |s| {
    let XmppProcessState {
    [3.4527]
    [3.4617]
    future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {
    let XmppState {
  • replacement in src/xmpp/mod.rs at line 416
    [3.12779][2.6155:6240]()
    future::ok(future::Loop::Continue(XmppProcessState::new(
    [3.12779]
    [3.12857]
    future::ok(future::Loop::Continue(XmppState::new(
  • replacement in src/xmpp/mod.rs at line 428
    [3.6342][2.6241:6303]()
    future::err(format_err!("Cmd error"))
    [3.6342]
    [3.6572]
    future::err(e)
  • edit in Cargo.toml at line 23
    [3.7111]
    [3.8552]
    try_from = "=0.2.2" # dependency of xmpp-parsers
  • edit in Cargo.lock at line 1110
    [3.44807]
    [3.44807]
    "try_from 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",