Use shared future for signal everywhere

[?]
Dec 29, 2018, 6:45 PM
AGIW6YR3J5M3PIUP3UKFMBQT2VAK6CHDDSIMK4A3KJEXH5SKQEQAC

Dependencies

  • [2] 2L3JHRUL Create separate functions to process incoming XMPP stanzas
  • [3] 5A5UVGNM Move receiver closing logic out of xmpp processing
  • [4] FVVPKFTL Initial commit
  • [5] L77O4T7M Formatting and fixes
  • [6] O2GM5J4F Don't split xmpp receiving and sending
  • [7] TDOR5XQU Accept destination
  • [8] NDDQQP2P Update deps
  • [9] OGMBXBKP Move online to XmppConnection
  • [10] PBRUH4BJ Rename optional XmppConnection to MaybeXmppConnection
  • [11] EOHEZXX3 Move request processing to structure
  • [12] VS6AHRWI Move XMPP to separate dir
  • [13] BTOZT4JP Use failure
  • [14] X6L47BHQ Use different structure for established xmpp connection
  • [15] ZI4GJ72V Add message to xmpp command
  • [16] FV6BJ5K6 Send self-presence and store account info in Rc so it willbe used in some future in parallel
  • [17] 5OBTKGDL Update deps
  • [18] QWE26TMV update deps
  • [19] FDHRCKH5 Unneded Box
  • [20] 3GEU7TC7 Welcome to 2018!

Change contents

  • edit in src/xmpp/mod.rs at line 11
    [3.34][3.34:106](),[3.434][3.210:211](),[3.106][3.210:211](),[3.210][3.210:211](),[3.3][3.3:31]()
    account: std::rc::Rc<config::Account>,
    inner: Option<Client>,
    }
    pub struct XmppConnection {
  • replacement in src/xmpp/mod.rs at line 12
    [3.74][3.107:126](),[3.375][3.346:349](),[3.126][3.346:349](),[3.77][3.346:349](),[3.346][3.346:349](),[3.349][3.127:356]()
    inner: Client,
    }
    impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from.account,
    inner: Some(from.inner),
    }
    }
    [3.74]
    [3.356]
    inner: Option<(stream::SplitSink<Client>, stream::SplitStream<Client>)>,
  • replacement in src/xmpp/mod.rs at line 25
    [3.579][3.479:563]()
    fn connect<E: 'static>(self) -> impl Future<Item = XmppConnection, Error = E> {
    [3.579]
    [3.653]
    fn connect<E: 'static>(self) -> impl Future<Item = Self, Error = E> {
  • replacement in src/xmpp/mod.rs at line 30
    [3.782][3.624:744]()
    Box::new(future::ok(XmppConnection { account, inner }))
    as Box<Future<Item = _, Error = E>>
    [3.782]
    [3.944]
    Box::new(future::ok(MaybeXmppConnection {
    account,
    inner: Some(inner),
    })) as Box<Future<Item = _, Error = E>>
  • replacement in src/xmpp/mod.rs at line 46
    [3.1522][3.745:1142]()
    XmppConnection {
    inner: client,
    account,
    }
    .online()
    .and_then(XmppConnection::self_presence)
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    })
    [3.1522]
    [3.1842]
    Self::online(client.split(), account)
    .and_then(Self::self_presence)
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    })
  • edit in src/xmpp/mod.rs at line 55
    [3.1874][3.1143:1145]()
    }
  • edit in src/xmpp/mod.rs at line 56
    [3.1875][3.1146:1168](),[3.1168][2.0:1615](),[2.1615][3.533:555](),[3.533][3.533:555](),[3.555][2.1616:2063](),[2.2063][3.736:758](),[3.736][3.736:758](),[3.758][2.2064:2654]()
    impl XmppConnection {
    /// base XMPP processing
    fn xmpp_processing(&mut self, event: &Event) {}
    /// process event from xmpp stream
    /// returns from future when condition met
    /// or stop future was resolved
    fn processing<S, F, T, E>(
    self,
    stop_condition: S,
    stop_future: F,
    ) -> impl Future<
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
    >
    where
    F: Future<Item = T, Error = E>,
    S: FnMut(&mut Self, &Event) -> bool,
    {
    future::loop_fn(
    (self, stop_future, stop_condition),
    |(xmpp, stop_future, mut stop_condition)| {
    let XmppConnection { inner, account } = xmpp;
    inner.into_future().select2(stop_future).then(|r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    inner: client,
    account,
    };
    xmpp.xmpp_processing(&event);
    if stop_condition(&mut xmpp, &event) {
    future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
    } else {
    future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(inner) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection { inner, account },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    Err(Either::A((_e, b))) => future::err((account, Ok(Either::A(b)))),
    Err(Either::B((e, a))) => {
    if let Some(inner) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection { inner, account },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    })
    },
    )
    }
  • replacement in src/xmpp/mod.rs at line 58
    [2.2767][2.2767:2865]()
    fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
    [2.2767]
    [2.2865]
    fn online(
    (sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),
    account: std::rc::Rc<config::Account>,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
  • replacement in src/xmpp/mod.rs at line 63
    [2.2899][2.2899:3086]()
    (self.inner, self.account),
    |(client, account)| {
    client.into_future().then(|r| match r {
    Ok((event, client)) => match event {
    [2.2899]
    [2.3086]
    (sink, stream, account),
    |(sink, stream, account)| {
    stream.into_future().then(|r| match r {
    Ok((event, stream)) => match event {
  • replacement in src/xmpp/mod.rs at line 69
    [2.3180][2.3180:3256]()
    future::ok(future::Loop::Break(XmppConnection {
    [2.3180]
    [2.3256]
    future::ok(future::Loop::Break(MaybeXmppConnection {
  • replacement in src/xmpp/mod.rs at line 71
    [2.3297][2.3297:3344]()
    inner: client,
    [2.3297]
    [2.3344]
    inner: Some((sink, stream)),
  • replacement in src/xmpp/mod.rs at line 76
    [2.3513][2.3513:3595]()
    future::ok(future::Loop::Continue((client, account)))
    [2.3513]
    [2.3595]
    future::ok(future::Loop::Continue((sink, stream, account)))
  • replacement in src/xmpp/mod.rs at line 93
    [3.745][3.888:942](),[3.942][3.1587:1649]()
    let XmppConnection { account, inner } = self;
    let client = inner;
    use tokio::prelude::Sink;
    [3.745]
    [3.784]
    let MaybeXmppConnection { account, inner } = self;
    if let Some((sink, stream)) = inner {
    use tokio::prelude::Sink;
  • replacement in src/xmpp/mod.rs at line 97
    [3.785][3.1650:3187]()
    let presence = stanzas::make_presence(&account);
    info!("Sending presence...");
    let account2 = account.clone();
    Box::new(
    client
    .send(presence)
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    "Cann't send self-presence".to_owned()
    })
    .and_then(move |client| {
    future::loop_fn((account2.clone(), client), |(account, client)| {
    client
    .into_future()
    .map_err(|(e, _)| {
    error!("Error on reading self-presence: {}", e);
    "Cann't read self-presence".to_owned()
    })
    .and_then(|(event, client)| match event {
    Some(event) => {
    if let tokio_xmpp::Event::Stanza(e) = event {
    info!("Get stanza: {:?}", e);
    if e.name() == "presence"
    && e.attr("from").map_or(false, |f| f == account.jid)
    && e.attr("to").map_or(false, |f| f == account.jid)
    {
    info!("Self presence");
    future::ok(future::Loop::Break(client))
    [3.785]
    [3.2608]
    let presence = stanzas::make_presence(&account);
    info!("Sending presence...");
    Box::new(
    sink.send(presence)
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    "Cann't send self-presence".to_owned()
    })
    .join(
    future::loop_fn((account.clone(), stream), |(account, stream)| {
    stream
    .into_future()
    .map_err(|(e, _)| {
    error!("Error on reading self-presence: {}", e);
    "Cann't read self-presence".to_owned()
    })
    .and_then(|(event, stream)| match event {
    Some(event) => {
    if let tokio_xmpp::Event::Stanza(e) = event {
    info!("Get stanza: {:?}", e);
    if e.name() == "presence"
    && e.attr("from")
    .map_or(false, |f| f == account.jid)
    && e.attr("to").map_or(false, |f| f == account.jid)
    {
    info!("Self presence");
    future::ok(future::Loop::Break(stream))
    } else {
    future::ok(future::Loop::Continue((
    account, stream,
    )))
    }
  • replacement in src/xmpp/mod.rs at line 130
    [3.2657][3.3188:3286]()
    future::ok(future::Loop::Continue((account, client)))
    [3.2657]
    [3.2743]
    future::err("Got wrong event".to_owned())
  • edit in src/xmpp/mod.rs at line 132
    [3.2785][3.3287:3414]()
    } else {
    future::err("Got wrong event".to_owned())
  • replacement in src/xmpp/mod.rs at line 133
    [3.2823][3.3415:3996]()
    }
    None => future::err("Got closed stream".to_owned()),
    })
    })
    .map_err(|e| format!("waiting self-presence: {}", e))
    })
    .then(|r| match r {
    Err(e) => {
    error!("Self-presence waiting error: {}", e);
    future::err(account)
    }
    Ok(inner) => future::ok(XmppConnection { account, inner }),
    }),
    )
    [3.2823]
    [3.3783]
    None => future::err("Got closed stream".to_owned()),
    })
    })
    .map_err(|e| format!("waiting self-presence: {}", e)),
    )
    .then(|r| match r {
    Err(e) => {
    error!("Self-presence waiting error: {}", e);
    future::err(account)
    }
    Ok(inner) => future::ok(MaybeXmppConnection {
    account,
    inner: Some(inner),
    }),
    }),
    )
    } else {
    warn!("Don't gen connection on self-presence");
    Box::new(future::err(account)) as Box<Future<Item = _, Error = _>>
    }
  • replacement in src/xmpp/mod.rs at line 181
    [3.4311][3.4311:4355](),[3.4311][3.4311:4355](),[3.4311][3.4311:4355](),[3.4311][3.4311:4355]()
    F: future::Future<Item = ()> + 'static,
    [3.4311]
    [3.465]
    F: future::Future + Clone + 'static,
    <F as hyper::rt::Future>::Error: std::fmt::Display,
  • edit in src/xmpp/mod.rs at line 185
    [3.4357][3.519:594](),[3.136][3.4480:4481](),[3.557][3.4480:4481](),[3.594][3.4480:4481](),[3.4480][3.4480:4481]()
    let signal = signal.map_err(|_| format_err!("Wrong shutdown signal"));
  • replacement in src/xmpp/mod.rs at line 209
    [3.5285][3.4189:4283]()
    Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1.into()))))
    [3.5285]
    [3.5372]
    Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
  • replacement in src/xmpp/mod.rs at line 218
    [3.5743][3.4284:4324]()
    x.1.into(),
    [3.5743]
    [3.5776]
    x.1,
  • replacement in src/xmpp/mod.rs at line 224
    [3.6018][3.4325:4419]()
    Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1.into()))))
    [3.6018]
    [3.6105]
    Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
  • replacement in src/main.rs at line 63
    [3.4954][3.4954:5020]()
    .map_err(|e| error!("Cann't get CTRL+C signal: {}", e.0))
    [3.4954]
    [3.5020]
    .map_err(|e| {
    error!("Cann't get CTRL+C signal: {}", e.0);
    e.0
    })
  • replacement in src/main.rs at line 100
    [3.6389][3.6389:6536](),[3.6389][3.6389:6536](),[3.6389][3.6389:6536](),[3.6389][3.6389:6536](),[3.6389][3.6389:6536]()
    let result = ctrt.block_on(xmpp_process(
    ctrl_c.clone().map(|_| ()),
    recv,
    config.account,
    ));
    [3.6389]
    [3.6536]
    let result = ctrt.block_on(xmpp_process(ctrl_c.clone(), recv, config.account));