Rename XmppState to XmppProcessState

[?]
Dec 31, 2018, 5:57 PM
ALP2YJIUN45LOOJU7GZWYDY7BMLCKR3LJVFPXTTLLZFB4LSYJKMAC

Dependencies

  • [2] 5IKA4GO7 Rename xmpp client field from "inner" to "client"
  • [3] TDOR5XQU Accept destination
  • [4] QWE26TMV update deps
  • [5] EOHEZXX3 Move request processing to structure
  • [6] VS6AHRWI Move XMPP to separate dir
  • [7] XGP44R5H Rework stopping xmpp connection
  • [8] 5OBTKGDL Update deps
  • [9] FV6BJ5K6 Send self-presence and store account info in Rc so it willbe used in some future in parallel
  • [10] 5A5UVGNM Move receiver closing logic out of xmpp processing
  • [11] HU3NZX5Z Process self-presence via new processing code
  • [12] PVCRPP3B Some servers don't send to in initial presence
  • [13] PBRUH4BJ Rename optional XmppConnection to MaybeXmppConnection
  • [14] NDDQQP2P Update deps
  • [15] V5HDBSZM Use jid for receiver address
  • [16] PFC7OJQF Query roster
  • [17] ZI4GJ72V Add message to xmpp command
  • [18] O2GM5J4F Don't split xmpp receiving and sending
  • [19] HOAZX2PB Reorganize roster processing. Output roster
  • [20] X6L47BHQ Use different structure for established xmpp connection
  • [21] AYQZ2UIA Update deps
  • [22] QYY3KRGL Use failure instead Box<dyn Error>
  • [23] AGIW6YR3 Use shared future for signal everywhere
  • [24] BTOZT4JP Use failure
  • [25] IK3YDPTY Update deps
  • [26] 3GEU7TC7 Welcome to 2018!
  • [27] 4LRBIGVT Show info about xmpp errors
  • [28] OGMBXBKP Move online to XmppConnection
  • [*] FVVPKFTL Initial commit

Change contents

  • edit in src/xmpp/stanzas.rs at line 2
    [3.57][2.0:26]()
    use xmpp_parsers::iq::Iq;
  • edit in src/xmpp/stanzas.rs at line 3
    [3.141][2.27:61]()
    use xmpp_parsers::roster::Roster;
  • edit in src/xmpp/stanzas.rs at line 12
    [3.419][2.62:272]()
    pub fn make_get_roster(id: &str) -> Element {
    let mut get_roster = Iq::from_get(Roster {
    ver: None,
    items: vec![],
    });
    get_roster.id = Some(id.to_string());
    get_roster.into()
    }
  • edit in src/xmpp/mod.rs at line 9
    [3.434][2.273:320]()
    const ID_GET_ROSTER: &str = "id_get_roster0";
  • replacement in src/xmpp/mod.rs at line 12
    [2.397][2.397:425]()
    client: Option<Client>,
    [2.397]
    [2.425]
    inner: Option<Client>,
  • replacement in src/xmpp/mod.rs at line 17
    [3.74][2.428:448]()
    client: Client,
    [3.74]
    [2.448]
    inner: Client,
  • replacement in src/xmpp/mod.rs at line 24
    [2.627][2.627:666]()
    client: Some(from.client),
    [2.627]
    [2.666]
    inner: Some(from.inner),
  • replacement in src/xmpp/mod.rs at line 33
    [3.527][2.803:829]()
    client: None,
    [3.527]
    [3.499]
    inner: None,
  • replacement in src/xmpp/mod.rs at line 44
    [2.1011][2.1011:1056]()
    F: future::Future + Clone + 'static,
    [2.1011]
    [2.1056]
    F: future::Future + 'static,
  • replacement in src/xmpp/mod.rs at line 48
    [3.690][2.1133:1193]()
    let MaybeXmppConnection { account, client } = self;
    [3.690]
    [3.744]
    let MaybeXmppConnection { account, inner } = self;
  • replacement in src/xmpp/mod.rs at line 50
    [3.745][2.1194:1302]()
    if let Some(client) = client {
    Box::new(future::ok(XmppConnection { account, client }))
    [3.745]
    [2.1302]
    if let Some(inner) = inner {
    Box::new(future::ok(XmppConnection { account, inner }))
  • edit in src/xmpp/mod.rs at line 56
    [2.1405][2.1405:1434]()
    .clone()
  • replacement in src/xmpp/mod.rs at line 57
    [2.1464][2.1464:1530]()
    future::loop_fn(account, move |account| {
    [2.1464]
    [2.1530]
    future::loop_fn(account, |account| {
  • edit in src/xmpp/mod.rs at line 66
    [3.1477][2.2011:2164]()
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
  • replacement in src/xmpp/mod.rs at line 68
    [2.2275][2.2275:2842]()
    XmppConnection { client, account }
    .processing(XmppConnection::online, stop_future.clone())
    .map(|(conn, _)| conn)
    .map_err(|(acc, _)| acc)
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
    .then(|r| match r {
    [2.2275]
    [2.2842]
    XmppConnection {
    inner: client,
    account,
    }
    .online()
    .and_then(XmppConnection::self_presence)
    .then(
    |r| match r {
  • replacement in src/xmpp/mod.rs at line 78
    [2.3042][2.3042:3090]()
    }),
    [2.3042]
    [2.3090]
    },
    ),
  • replacement in src/xmpp/mod.rs at line 101
    [2.3788][2.3788:3896]()
    fn xmpp_processing(&mut self, event: &Event) {
    info!("Incoming xmpp event: {:?}", event);
    }
    [2.3788]
    [2.3896]
    fn xmpp_processing(&mut self, _event: &Event) {}
  • replacement in src/xmpp/mod.rs at line 111
    [2.4137][2.4137:4319]()
    Item = (Self, Result<Either<F, T>, failure::Error>),
    Error = (
    std::rc::Rc<config::Account>,
    Result<Either<F, T>, failure::Error>,
    ),
    [2.4137]
    [2.4319]
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
  • replacement in src/xmpp/mod.rs at line 116
    [2.4375][2.4375:4476]()
    E: Into<failure::Error>,
    S: FnMut(&mut Self, Event) -> Result<bool, failure::Error>,
    [2.4375]
    [2.4476]
    S: FnMut(&mut Self, &Event) -> bool,
  • replacement in src/xmpp/mod.rs at line 121
    [2.4612][2.4612:4752]()
    let XmppConnection { client, account } = xmpp;
    client.into_future().select2(stop_future).then(|r| match r {
    [2.4612]
    [2.4752]
    let XmppConnection { inner, account } = xmpp;
    inner.into_future().select2(stop_future).then(|r| match r {
  • replacement in src/xmpp/mod.rs at line 125
    [2.4866][2.4866:4945]()
    let mut xmpp = XmppConnection { client, account };
    [2.4866]
    [2.4945]
    let mut xmpp = XmppConnection {
    inner: client,
    account,
    };
  • replacement in src/xmpp/mod.rs at line 130
    [2.5003][2.5003:5504]()
    match stop_condition(&mut xmpp, event) {
    Ok(true) => {
    future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
    }
    Ok(false) => {
    future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
    }
    Err(e) => future::err((xmpp.account, Err(e))),
    [2.5003]
    [2.5504]
    if stop_condition(&mut xmpp, &event) {
    future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
    } else {
    future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
  • replacement in src/xmpp/mod.rs at line 140
    [2.5706][2.5706:5769]()
    if let Some(client) = a.into_inner() {
    [2.5706]
    [2.5769]
    if let Some(inner) = a.into_inner() {
  • replacement in src/xmpp/mod.rs at line 142
    [2.5830][2.5830:5898]()
    XmppConnection { client, account },
    [2.5830]
    [2.5898]
    XmppConnection { inner, account },
  • replacement in src/xmpp/mod.rs at line 151
    [2.6242][2.6242:6305]()
    if let Some(client) = a.into_inner() {
    [2.6242]
    [2.6305]
    if let Some(inner) = a.into_inner() {
  • replacement in src/xmpp/mod.rs at line 153
    [2.6366][2.6366:6481]()
    XmppConnection { client, account },
    Err(e.into()),
    [2.6366]
    [2.6481]
    XmppConnection { inner, account },
    Err(e),
  • replacement in src/xmpp/mod.rs at line 157
    [2.6546][2.6546:6612]()
    future::err((account, Err(e.into())))
    [2.6546]
    [3.5513]
    future::err((account, Err(e)))
  • replacement in src/xmpp/mod.rs at line 167
    [2.6737][2.6737:7230](),[3.2924][3.3693:3694](),[2.7230][3.3693:3694](),[3.3693][3.3693:3694](),[3.3694][2.7231:8352](),[3.4798][3.2496:2542](),[2.8352][3.2496:2542](),[3.2496][3.2496:2542](),[3.2542][2.8353:8406](),[3.4885][3.2645:2687](),[2.8406][3.2645:2687](),[3.2645][3.2645:2687](),[3.2687][2.8407:8505](),[2.8505][3.2813:2851](),[3.2813][3.2813:2851](),[3.2851][2.8506:8724]()
    fn online(&mut self, event: Event) -> Result<bool, failure::Error> {
    match event {
    Event::Online => {
    info!("Online!");
    Ok(true)
    }
    Event::Stanza(s) => {
    warn!("Stanza before online: {:?}", s);
    Ok(false)
    }
    _ => {
    error!("Disconnected while online");
    Err(format_err!("Disconnected while online"))
    }
    }
    }
    fn process_initial_roster(&mut self, event: Event) -> Result<bool, failure::Error> {
    if let Event::Stanza(s) = event {
    use try_from::TryInto;
    match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
    Ok(iq) => {
    if let Some(id) = iq.id {
    if id == ID_GET_ROSTER {
    match iq.payload {
    xmpp_parsers::iq::IqType::Error(_e) => {
    Err(format_err!("Get error instead of roster"))
    }
    xmpp_parsers::iq::IqType::Result(Some(result)) => {
    match result.try_into()
    as Result<xmpp_parsers::roster::Roster, _>
    {
    Ok(roster) => {
    info!("Got roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    }
    Ok(true)
    }
    Err(e) => Err(format_err!("Cann't parse roster: {}", e)),
    }
    }
    _ => Err(format_err!("Unknown result of roster")),
    }
    } else {
    Ok(false)
    [2.6737]
    [3.5558]
    fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
    Box::new(future::loop_fn(
    (self.inner, self.account),
    |(client, account)| {
    client.into_future().then(|r| match r {
    Ok((event, client)) => match event {
    Some(Event::Online) => {
    info!("Online");
    future::ok(future::Loop::Break(XmppConnection {
    account,
    inner: client,
    }))
    }
    Some(Event::Stanza(s)) => {
    info!("xmpp stanza: {:?}", s);
    future::ok(future::Loop::Continue((client, account)))
    }
    _ => {
    warn!("Disconnected");
    future::err(account)
  • replacement in src/xmpp/mod.rs at line 188
    [3.5584][2.8725:8819]()
    } else {
    Err(format_err!("Iq stanza without id"))
    [3.5584]
    [2.8819]
    },
    Err((e, _)) => {
    error!("xmpp receive error: {}", e);
    future::err(account)
  • replacement in src/xmpp/mod.rs at line 193
    [2.8841][2.8841:8911](),[2.8911][3.5554:5571](),[3.5554][3.5554:5571](),[3.5571][2.8912:8977](),[2.8977][3.5710:5720](),[3.5710][3.5710:5720](),[3.5720][2.8978:10350]()
    }
    Err(_e) => Ok(false),
    }
    } else {
    Err(format_err!("Wrong event while waiting roster"))
    }
    }
    fn initial_roster<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E>,
    E: Into<failure::Error>,
    {
    let XmppConnection { account, client } = self;
    use tokio::prelude::Sink;
    let get_roster = stanzas::make_get_roster(ID_GET_ROSTER);
    let account2 = account.clone();
    info!("Quering roster... {:?}", get_roster);
    client
    .send(get_roster)
    .map_err(move |e| {
    error!("Error on querying roster: {}", e);
    (account2, Err(failure::SyncFailure::new(e).into()))
    })
    .and_then(move |client| {
    XmppConnection { client, account }
    .processing(XmppConnection::process_initial_roster, stop_future)
    })
    .then(|r| match r {
    Err((account, e)) => {
    error!(
    "Cann't wait roster: {}",
    e.err().map_or_else(
    || std::borrow::Cow::Borrowed("None"),
    |e| e.to_string().into()
    )
    );
    future::err(account)
    }
    Ok((conn, _)) => future::ok(conn),
    })
    [2.8841]
    [3.3783]
    })
    },
    ))
  • replacement in src/xmpp/mod.rs at line 198
    [2.10352][2.10352:10624]()
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E>,
    E: Into<failure::Error>,
    {
    let XmppConnection { account, client } = self;
    [2.10352]
    [2.10624]
    fn self_presence(self) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
    let XmppConnection { account, inner } = self;
    let client = inner;
  • edit in src/xmpp/mod.rs at line 204
    [2.10716]
    [2.10716]
    info!("Sending presence...");
  • replacement in src/xmpp/mod.rs at line 206
    [2.10756][2.10756:12130]()
    info!("Sending presence... {:?}", presence);
    client
    .send(presence)
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    (account2, Err(failure::SyncFailure::new(e).into()))
    })
    .and_then(move |client| {
    XmppConnection { client, account }.processing(
    move |conn, event| {
    if let Event::Stanza(s) = event {
    if s.name() == "presence"
    && s.attr("from").map_or(false, |f| f == conn.account.jid)
    && s.attr("to").map_or(false, |f| f == conn.account.jid)
    {
    Ok(true)
    } else {
    Ok(false)
    }
    } else {
    Err(format_err!("Wrong event while waiting self-presence"))
    }
    },
    stop_future,
    )
    })
    .then(|r| match r {
    Err((account, _e)) => {
    error!("Cann't wait self-presence");
    future::err(account)
    }
    Ok((conn, _)) => future::ok(conn),
    })
    [2.10756]
    [2.12130]
    Box::new(
    client
    .send(presence)
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    "Cann't send self-presence".to_owned()
    })
    .and_then(move |client| {
    future::loop_fn((account2.clone(), client), |(account, client)| {
    client
    .into_future()
    .map_err(|(e, _)| {
    error!("Error on reading self-presence: {}", e);
    "Cann't read self-presence".to_owned()
    })
    .and_then(|(event, client)| match event {
    Some(event) => {
    if let tokio_xmpp::Event::Stanza(e) = event {
    info!("Get stanza: {:?}", e);
    if e.name() == "presence"
    && e.attr("from").map_or(false, |f| f == account.jid)
    && e.attr("to").map_or(false, |f| f == account.jid)
    {
    info!("Self presence");
    future::ok(future::Loop::Break(client))
    } else {
    future::ok(future::Loop::Continue((account, client)))
    }
    } else {
    future::err("Got wrong event".to_owned())
    }
    }
    None => future::err("Got closed stream".to_owned()),
    })
    })
    .map_err(|e| format!("waiting self-presence: {}", e))
    })
    .then(|r| match r {
    Err(e) => {
    error!("Self-presence waiting error: {}", e);
    future::err(account)
    }
    Ok(inner) => future::ok(XmppConnection { account, inner }),
    }),
    )
  • replacement in src/xmpp/mod.rs at line 257
    [3.3817][3.61:86]()
    struct XmppState<F, S> {
    [3.3817]
    [3.86]
    struct XmppProcessState<F, S> {
  • replacement in src/xmpp/mod.rs at line 263
    [3.3920][3.104:133](),[3.133][2.12169:12252](),[3.5851][3.4038:4058](),[3.211][3.4038:4058](),[3.4137][3.4038:4058](),[3.5748][3.4038:4058](),[3.597][3.4038:4058](),[3.4900][3.4038:4058](),[2.12252][3.4038:4058](),[3.8859][3.4038:4058](),[3.4038][3.4038:4058]()
    impl<F, S> XmppState<F, S> {
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppState<F, S> {
    XmppState {
    [3.3920]
    [3.4058]
    impl<F, S> XmppProcessState<F, S> {
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppProcessState<F, S> {
    XmppProcessState {
  • replacement in src/xmpp/mod.rs at line 285
    [3.4527][3.4527:4617](),[3.4527][3.4527:4617](),[3.4527][3.4527:4617](),[3.4527][3.4527:4617](),[3.4527][3.4527:4617](),[3.4527][3.4527:4617](),[3.4527][3.4527:4617](),[3.4527][3.4527:4617]()
    future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {
    let XmppState {
    [3.4527]
    [3.4617]
    future::loop_fn(XmppProcessState::new(cmd_recv, signal, conn), |s| {
    let XmppProcessState {
  • replacement in src/xmpp/mod.rs at line 308
    [2.12779][2.12779:12857]()
    future::ok(future::Loop::Continue(XmppState::new(
    [2.12779]
    [2.12857]
    future::ok(future::Loop::Continue(XmppProcessState::new(
  • replacement in src/xmpp/mod.rs at line 320
    [3.6342][2.13190:13229]()
    future::err(e)
    [3.6342]
    [3.6572]
    future::err(format_err!("Cmd error"))
  • edit in Cargo.toml at line 23
    [3.7111][2.13540:13589]()
    try_from = "=0.2.2" # dependency of xmpp-parsers
  • edit in Cargo.lock at line 1110
    [3.44807][2.18562:18637]()
    "try_from 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",