Rework stopping xmpp connection

[?]
Dec 29, 2018, 8:22 PM
XGP44R5HBQAHBXEJKN4AB77OZ32B32FPUHSUILZDVNO4QLUOTW6QC

Dependencies

  • [2] AGIW6YR3 Use shared future for signal everywhere
  • [3] NDDQQP2P Update deps
  • [4] PBRUH4BJ Rename optional XmppConnection to MaybeXmppConnection
  • [5] 3GEU7TC7 Welcome to 2018!
  • [6] BTOZT4JP Use failure
  • [7] QWE26TMV update deps
  • [8] O2GM5J4F Don't split xmpp receiving and sending
  • [9] FV6BJ5K6 Send self-presence and store account info in Rc so it will be used in some future in parallel
  • [10] OGMBXBKP Move online to XmppConnection
  • [11] X6L47BHQ Use different structure for established xmpp connection
  • [12] 2L3JHRUL Create separate functions to process incoming XMPP stanzas
  • [13] TDOR5XQU Accept destination
  • [14] VS6AHRWI Move XMPP to separate dir

Change contents

  • edit in src/xmpp/mod.rs at line 11
    [3.34]
    [3.31]
    account: std::rc::Rc<config::Account>,
    inner: Option<Client>,
    }
    pub struct XmppConnection {
  • replacement in src/xmpp/mod.rs at line 17
    [3.74][2.0:77]()
    inner: Option<(stream::SplitSink<Client>, stream::SplitStream<Client>)>,
    [3.74]
    [3.356]
    inner: Client,
    }
    impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from.account,
    inner: Some(from.inner),
    }
    }
  • replacement in src/xmpp/mod.rs at line 38
    [3.554][3.554:579](),[3.554][3.554:579](),[3.579][2.78:152]()
    /// Error shoud be !
    fn connect<E: 'static>(self) -> impl Future<Item = Self, Error = E> {
    [3.554]
    [3.653]
    /// don't connect if stop_future resolved
    fn connect<F>(
    self,
    stop_future: F,
    ) -> impl Future<Item = XmppConnection, Error = failure::Error>
    where
    F: future::Future + 'static,
    <F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
    {
  • replacement in src/xmpp/mod.rs at line 51
    [3.782][2.153:320]()
    Box::new(future::ok(MaybeXmppConnection {
    account,
    inner: Some(inner),
    })) as Box<Future<Item = _, Error = E>>
    [3.782]
    [3.944]
    Box::new(future::ok(XmppConnection { account, inner }))
    as Box<Future<Item = _, Error = _>>
  • replacement in src/xmpp/mod.rs at line 54
    [3.961][3.961:1476](),[3.961][3.961:1476](),[3.961][3.961:1476](),[3.961][3.961:1476](),[3.961][3.961:1476]()
    Box::new(future::loop_fn(account, |account| {
    info!("xmpp initialization...");
    let mut res_client = Client::new(&account.jid, &account.password);
    while let Err(e) = res_client {
    error!("Cann't init xmpp client: {}", e);
    res_client = Client::new(&account.jid, &account.password);
    }
    let client = res_client.expect("Cann't init xmpp client");
    info!("xmpp initialized");
    [3.961]
    [3.1476]
    Box::new(
    stop_future
    .select2(
    future::loop_fn(account, |account| {
    info!("xmpp initialization...");
    let res_client = Client::new(&account.jid, &account.password);
    match res_client {
    Err(_e) => Box::new(future::ok(future::Loop::Continue(account)))
    as Box<Future<Item = _, Error = _>>,
    Ok(client) => {
    info!("xmpp initialized");
  • replacement in src/xmpp/mod.rs at line 66
    [3.1477][3.1477:1522](),[3.1477][3.1477:1522](),[3.1477][3.1477:1522](),[3.1477][3.1477:1522](),[3.1477][3.1477:1522](),[3.1522][2.321:426]()
    // future to wait for online
    Self::online(client.split(), account)
    .and_then(Self::self_presence)
    [3.1477]
    [2.426]
    // future to wait for online
    Box::new(
    XmppConnection {
    inner: client,
    account,
    }
    .online()
    .and_then(XmppConnection::self_presence)
    .then(
    |r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    },
    ),
    )
    }
    }
    })
    .map_err(|_: ()| ()),
    )
  • replacement in src/xmpp/mod.rs at line 87
    [2.466][2.466:641](),[3.1142][3.1842:1858](),[2.641][3.1842:1858](),[3.1842][3.1842:1858]()
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    })
    }))
    [2.466]
    [3.1858]
    Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
    Ok(Either::B((x, _a))) => future::ok(x),
    Err(Either::A((e, _b))) => future::err(e.into()),
    Err(Either::B((_, _a))) => {
    future::err(format_err!("Cann't initiate XMPP connection"))
    }
    }),
    )
  • edit in src/xmpp/mod.rs at line 96
    [3.1868]
    [3.1868]
    }
    }
    impl XmppConnection {
    /// base XMPP processing
    fn xmpp_processing(&mut self, _event: &Event) {}
    /// process event from xmpp stream
    /// returns from future when condition met
    /// or stop future was resolved
    fn processing<S, F, T, E>(
    self,
    stop_condition: S,
    stop_future: F,
    ) -> impl Future<
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    >
    where
    F: Future<Item = T, Error = E>,
    S: FnMut(&mut Self, &Event) -> bool,
    {
    future::loop_fn(
    (self, stop_future, stop_condition),
    |(xmpp, stop_future, mut stop_condition)| {
    let XmppConnection { inner, account } = xmpp;
    inner.into_future().select2(stop_future).then(|r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    inner: client,
    account,
    };
    xmpp.xmpp_processing(&event);
    if stop_condition(&mut xmpp, &event) {
    future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
    } else {
    future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(inner) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection { inner, account },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    Err(Either::A((_e, b))) => future::err((account, Ok(Either::A(b)))),
    Err(Either::B((e, a))) => {
    if let Some(inner) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection { inner, account },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    })
    },
    )
  • replacement in src/xmpp/mod.rs at line 167
    [3.2767][2.642:860]()
    fn online(
    (sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),
    account: std::rc::Rc<config::Account>,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
    [3.2767]
    [3.2865]
    fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
  • replacement in src/xmpp/mod.rs at line 169
    [3.2899][2.861:1051]()
    (sink, stream, account),
    |(sink, stream, account)| {
    stream.into_future().then(|r| match r {
    Ok((event, stream)) => match event {
    [3.2899]
    [3.3086]
    (self.inner, self.account),
    |(client, account)| {
    client.into_future().then(|r| match r {
    Ok((event, client)) => match event {
  • replacement in src/xmpp/mod.rs at line 175
    [3.3180][2.1052:1133]()
    future::ok(future::Loop::Break(MaybeXmppConnection {
    [3.3180]
    [3.3256]
    future::ok(future::Loop::Break(XmppConnection {
  • replacement in src/xmpp/mod.rs at line 177
    [3.3297][2.1134:1195]()
    inner: Some((sink, stream)),
    [3.3297]
    [3.3344]
    inner: client,
  • replacement in src/xmpp/mod.rs at line 182
    [3.3513][2.1196:1284]()
    future::ok(future::Loop::Continue((sink, stream, account)))
    [3.3513]
    [3.3595]
    future::ok(future::Loop::Continue((client, account)))
  • replacement in src/xmpp/mod.rs at line 199
    [3.745][2.1285:1428]()
    let MaybeXmppConnection { account, inner } = self;
    if let Some((sink, stream)) = inner {
    use tokio::prelude::Sink;
    [3.745]
    [3.784]
    let XmppConnection { account, inner } = self;
    let client = inner;
    use tokio::prelude::Sink;
  • replacement in src/xmpp/mod.rs at line 203
    [3.785][2.1429:3348]()
    let presence = stanzas::make_presence(&account);
    info!("Sending presence...");
    Box::new(
    sink.send(presence)
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    "Cann't send self-presence".to_owned()
    })
    .join(
    future::loop_fn((account.clone(), stream), |(account, stream)| {
    stream
    .into_future()
    .map_err(|(e, _)| {
    error!("Error on reading self-presence: {}", e);
    "Cann't read self-presence".to_owned()
    })
    .and_then(|(event, stream)| match event {
    Some(event) => {
    if let tokio_xmpp::Event::Stanza(e) = event {
    info!("Get stanza: {:?}", e);
    if e.name() == "presence"
    && e.attr("from")
    .map_or(false, |f| f == account.jid)
    && e.attr("to").map_or(false, |f| f == account.jid)
    {
    info!("Self presence");
    future::ok(future::Loop::Break(stream))
    } else {
    future::ok(future::Loop::Continue((
    account, stream,
    )))
    }
    [3.785]
    [3.2608]
    let presence = stanzas::make_presence(&account);
    info!("Sending presence...");
    let account2 = account.clone();
    Box::new(
    client
    .send(presence)
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    "Cann't send self-presence".to_owned()
    })
    .and_then(move |client| {
    future::loop_fn((account2.clone(), client), |(account, client)| {
    client
    .into_future()
    .map_err(|(e, _)| {
    error!("Error on reading self-presence: {}", e);
    "Cann't read self-presence".to_owned()
    })
    .and_then(|(event, client)| match event {
    Some(event) => {
    if let tokio_xmpp::Event::Stanza(e) = event {
    info!("Get stanza: {:?}", e);
    if e.name() == "presence"
    && e.attr("from").map_or(false, |f| f == account.jid)
    && e.attr("to").map_or(false, |f| f == account.jid)
    {
    info!("Self presence");
    future::ok(future::Loop::Break(client))
  • replacement in src/xmpp/mod.rs at line 232
    [3.2657][2.3349:3435]()
    future::err("Got wrong event".to_owned())
    [3.2657]
    [3.2743]
    future::ok(future::Loop::Continue((account, client)))
  • edit in src/xmpp/mod.rs at line 234
    [3.2785]
    [3.2785]
    } else {
    future::err("Got wrong event".to_owned())
  • replacement in src/xmpp/mod.rs at line 237
    [3.2823][2.3436:4300]()
    None => future::err("Got closed stream".to_owned()),
    })
    })
    .map_err(|e| format!("waiting self-presence: {}", e)),
    )
    .then(|r| match r {
    Err(e) => {
    error!("Self-presence waiting error: {}", e);
    future::err(account)
    }
    Ok(inner) => future::ok(MaybeXmppConnection {
    account,
    inner: Some(inner),
    }),
    }),
    )
    } else {
    warn!("Don't gen connection on self-presence");
    Box::new(future::err(account)) as Box<Future<Item = _, Error = _>>
    }
    [3.2823]
    [3.3783]
    }
    None => future::err("Got closed stream".to_owned()),
    })
    })
    .map_err(|e| format!("waiting self-presence: {}", e))
    })
    .then(|r| match r {
    Err(e) => {
    error!("Self-presence waiting error: {}", e);
    future::err(account)
    }
    Ok(inner) => future::ok(XmppConnection { account, inner }),
    }),
    )
  • replacement in src/xmpp/mod.rs at line 280
    [2.4342][2.4342:4398]()
    <F as hyper::rt::Future>::Error: std::fmt::Display,
    [2.4342]
    [3.465]
    <F as hyper::rt::Future>::Error: std::fmt::Display + Into<failure::Error> + Send,
  • replacement in src/xmpp/mod.rs at line 291
    [3.4692][3.4692:4761](),[3.4692][3.4692:4761](),[3.4692][3.4692:4761](),[3.4692][3.4692:4761]()
    signal
    .select2(conn.connect().and_then(|conn| {
    [3.4692]
    [3.4761]
    conn.connect(signal.clone())
    .and_then(|conn| {
  • replacement in src/xmpp/mod.rs at line 300
    [3.675][3.675:731]()
    .map(|f| (f, conn))
    }))
    [3.675]
    [3.731]
    .map(|(cmd, cmd_recv)| (cmd, cmd_recv, conn))
    })
  • replacement in src/xmpp/mod.rs at line 304
    [3.5146][3.5146:5285](),[3.5146][3.5146:5285](),[3.5146][3.5146:5285](),[3.5146][3.5146:5285](),[3.5285][2.4399:4486](),[3.3375][3.5372:5743](),[3.4283][3.5372:5743](),[2.4486][3.5372:5743](),[3.5034][3.5372:5743](),[3.5372][3.5372:5743](),[3.5743][2.4487:4520](),[3.3416][3.5776:6018](),[3.4324][3.5776:6018](),[2.4520][3.5776:6018](),[3.5068][3.5776:6018](),[3.5776][3.5776:6018](),[3.6018][2.4521:4608](),[3.3511][3.6105:6169](),[3.4419][3.6105:6169](),[2.4608][3.6105:6169](),[3.5156][3.6105:6169](),[3.6105][3.6105:6169]()
    Ok(Either::A((_x, b))) => {
    info!("Got signal");
    // got signal, breaks
    Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
    as Box<Future<Item = _, Error = _>>
    }
    Ok(Either::B((x, a))) => {
    info!("Got cmd");
    // got cmd, continue
    Box::new(future::ok(future::Loop::Continue(XmppState::new(
    (x.0).1,
    a,
    x.1,
    )))) as Box<Future<Item = _, Error = _>>
    }
    Err(Either::A((e, b))) => {
    // got signal error, breaks
    error!("Signal error: {}", e);
    Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
    as Box<Future<Item = _, Error = _>>
    [3.5146]
    [3.6169]
    Ok((cmd, cmd_recv, conn)) => {
    if let Some(_cmd) = cmd {
    info!("Got cmd");
    // got cmd, continue
    future::ok(future::Loop::Continue(XmppState::new(
    cmd_recv,
    signal,
    conn.into(),
    )))
    } else {
    future::ok(future::Loop::Break((None, conn.into())))
    }
  • replacement in src/xmpp/mod.rs at line 317
    [3.6191][3.6191:6240](),[3.6191][3.6191:6240](),[3.6191][3.6191:6240](),[3.6191][3.6191:6240](),[3.6191][3.6191:6240]()
    Err(Either::B((e, _a))) => {
    [3.6191]
    [3.6240]
    Err(e) => {
  • replacement in src/xmpp/mod.rs at line 320
    [3.6342][3.756:892]()
    Box::new(future::err(format_err!("Cmd error")))
    as Box<Future<Item = _, Error = _>>
    [3.6342]
    [3.6572]
    future::err(format_err!("Cmd error"))
  • replacement in src/xmpp/mod.rs at line 335
    [3.1354][3.1354:1422](),[3.1354][3.1354:1422](),[3.1354][3.1354:1422](),[3.1354][3.1354:1422]()
    Box::new(future::err(format_err!("cmd receiver gone")))
    [3.1354]
    [3.1422]
    Box::new(future::ok(()))