Use shared future for signal everywhere
[?]
Dec 29, 2018, 6:45 PM
AGIW6YR3J5M3PIUP3UKFMBQT2VAK6CHDDSIMK4A3KJEXH5SKQEQAC
Dependencies
- [2] 2L3JHRUL Create separate functions to process incoming XMPP stanzas
- [3] 5A5UVGNM Move receiver closing logic out of xmpp processing
- [4] FVVPKFTL Initial commit
- [5] L77O4T7M Formatting and fixes
- [6] O2GM5J4F Don't split xmpp receiving and sending
- [7] TDOR5XQU Accept destination
- [8] NDDQQP2P Update deps
- [9] OGMBXBKP Move online to XmppConnection
- [10] PBRUH4BJ Rename optional XmppConnection to MaybeXmppConnection
- [11] EOHEZXX3 Move request processing to structure
- [12] VS6AHRWI Move XMPP to separate dir
- [13] BTOZT4JP Use failure
- [14] X6L47BHQ Use different structure for established xmpp connection
- [15] ZI4GJ72V Add message to xmpp command
- [16] FV6BJ5K6 Send self-presence and store account info in Rc so it willbe used in some future in parallel
- [17] 5OBTKGDL Update deps
- [18] QWE26TMV update deps
- [19] FDHRCKH5 Unneded Box
- [20] 3GEU7TC7 Welcome to 2018!
Change contents
- edit in src/xmpp/mod.rs at line 11[3.34]→[3.34:106](∅→∅),[3.434]→[3.210:211](∅→∅),[3.106]→[3.210:211](∅→∅),[3.210]→[3.210:211](∅→∅),[3.3]→[3.3:31](∅→∅)
account: std::rc::Rc<config::Account>,inner: Option<Client>,}pub struct XmppConnection { - replacement in src/xmpp/mod.rs at line 12[3.74]→[3.107:126](∅→∅),[3.375]→[3.346:349](∅→∅),[3.126]→[3.346:349](∅→∅),[3.77]→[3.346:349](∅→∅),[3.346]→[3.346:349](∅→∅),[3.349]→[3.127:356](∅→∅)
inner: Client,}impl From<XmppConnection> for MaybeXmppConnection {fn from(from: XmppConnection) -> MaybeXmppConnection {MaybeXmppConnection {account: from.account,inner: Some(from.inner),}}inner: Option<(stream::SplitSink<Client>, stream::SplitStream<Client>)>, - replacement in src/xmpp/mod.rs at line 25
fn connect<E: 'static>(self) -> impl Future<Item = XmppConnection, Error = E> {fn connect<E: 'static>(self) -> impl Future<Item = Self, Error = E> { - replacement in src/xmpp/mod.rs at line 30
Box::new(future::ok(XmppConnection { account, inner }))as Box<Future<Item = _, Error = E>>Box::new(future::ok(MaybeXmppConnection {account,inner: Some(inner),})) as Box<Future<Item = _, Error = E>> - replacement in src/xmpp/mod.rs at line 46
XmppConnection {inner: client,account,}.online().and_then(XmppConnection::self_presence).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),})Self::online(client.split(), account).and_then(Self::self_presence).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),}) - edit in src/xmpp/mod.rs at line 55
} - edit in src/xmpp/mod.rs at line 56[3.1875]→[3.1146:1168](∅→∅),[3.1168]→[2.0:1615](∅→∅),[2.1615]→[3.533:555](∅→∅),[3.533]→[3.533:555](∅→∅),[3.555]→[2.1616:2063](∅→∅),[2.2063]→[3.736:758](∅→∅),[3.736]→[3.736:758](∅→∅),[3.758]→[2.2064:2654](∅→∅)
impl XmppConnection {/// base XMPP processingfn xmpp_processing(&mut self, event: &Event) {}/// process event from xmpp stream/// returns from future when condition met/// or stop future was resolvedfn processing<S, F, T, E>(self,stop_condition: S,stop_future: F,) -> impl Future<Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),>whereF: Future<Item = T, Error = E>,S: FnMut(&mut Self, &Event) -> bool,{future::loop_fn((self, stop_future, stop_condition),|(xmpp, stop_future, mut stop_condition)| {let XmppConnection { inner, account } = xmpp;inner.into_future().select2(stop_future).then(|r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {inner: client,account,};xmpp.xmpp_processing(&event);if stop_condition(&mut xmpp, &event) {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))} else {future::ok(future::Loop::Continue((xmpp, b, stop_condition)))}} else {future::err((account, Ok(Either::A(b))))}}Ok(Either::B((t, a))) => {if let Some(inner) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection { inner, account },Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}}Err(Either::A((_e, b))) => future::err((account, Ok(Either::A(b)))),Err(Either::B((e, a))) => {if let Some(inner) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection { inner, account },Err(e),)))} else {future::err((account, Err(e)))}}})},)} - replacement in src/xmpp/mod.rs at line 58
fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {fn online((sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),account: std::rc::Rc<config::Account>,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> { - replacement in src/xmpp/mod.rs at line 63
(self.inner, self.account),|(client, account)| {client.into_future().then(|r| match r {Ok((event, client)) => match event {(sink, stream, account),|(sink, stream, account)| {stream.into_future().then(|r| match r {Ok((event, stream)) => match event { - replacement in src/xmpp/mod.rs at line 69
future::ok(future::Loop::Break(XmppConnection {future::ok(future::Loop::Break(MaybeXmppConnection { - replacement in src/xmpp/mod.rs at line 71
inner: client,inner: Some((sink, stream)), - replacement in src/xmpp/mod.rs at line 76
future::ok(future::Loop::Continue((client, account)))future::ok(future::Loop::Continue((sink, stream, account))) - replacement in src/xmpp/mod.rs at line 93
let XmppConnection { account, inner } = self;let client = inner;use tokio::prelude::Sink;let MaybeXmppConnection { account, inner } = self;if let Some((sink, stream)) = inner {use tokio::prelude::Sink; - replacement in src/xmpp/mod.rs at line 97
let presence = stanzas::make_presence(&account);info!("Sending presence...");let account2 = account.clone();Box::new(client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()}).and_then(move |client| {future::loop_fn((account2.clone(), client), |(account, client)| {client.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, client)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(client))let presence = stanzas::make_presence(&account);info!("Sending presence...");Box::new(sink.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()}).join(future::loop_fn((account.clone(), stream), |(account, stream)| {stream.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, stream)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(stream))} else {future::ok(future::Loop::Continue((account, stream,)))} - replacement in src/xmpp/mod.rs at line 130
future::ok(future::Loop::Continue((account, client)))future::err("Got wrong event".to_owned()) - edit in src/xmpp/mod.rs at line 132
} else {future::err("Got wrong event".to_owned()) - replacement in src/xmpp/mod.rs at line 133
}None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e))}).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(XmppConnection { account, inner }),}),)None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e)),).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(MaybeXmppConnection {account,inner: Some(inner),}),}),)} else {warn!("Don't gen connection on self-presence");Box::new(future::err(account)) as Box<Future<Item = _, Error = _>>} - replacement in src/xmpp/mod.rs at line 181[3.4311]→[3.4311:4355](∅→∅),[3.4311]→[3.4311:4355](∅→∅),[3.4311]→[3.4311:4355](∅→∅),[3.4311]→[3.4311:4355](∅→∅)
F: future::Future<Item = ()> + 'static,F: future::Future + Clone + 'static,<F as hyper::rt::Future>::Error: std::fmt::Display, - edit in src/xmpp/mod.rs at line 185[3.4357]→[3.519:594](∅→∅),[3.136]→[3.4480:4481](∅→∅),[3.557]→[3.4480:4481](∅→∅),[3.594]→[3.4480:4481](∅→∅),[3.4480]→[3.4480:4481](∅→∅)
let signal = signal.map_err(|_| format_err!("Wrong shutdown signal")); - replacement in src/xmpp/mod.rs at line 209
Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1.into()))))Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1)))) - replacement in src/xmpp/mod.rs at line 218
x.1.into(),x.1, - replacement in src/xmpp/mod.rs at line 224
Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1.into()))))Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1)))) - replacement in src/main.rs at line 63
.map_err(|e| error!("Cann't get CTRL+C signal: {}", e.0)).map_err(|e| {error!("Cann't get CTRL+C signal: {}", e.0);e.0}) - replacement in src/main.rs at line 100[3.6389]→[3.6389:6536](∅→∅),[3.6389]→[3.6389:6536](∅→∅),[3.6389]→[3.6389:6536](∅→∅),[3.6389]→[3.6389:6536](∅→∅),[3.6389]→[3.6389:6536](∅→∅)
let result = ctrt.block_on(xmpp_process(ctrl_c.clone().map(|_| ()),recv,config.account,));let result = ctrt.block_on(xmpp_process(ctrl_c.clone(), recv, config.account));