Rework stopping xmpp connection
[?]
Dec 29, 2018, 8:22 PM
XGP44R5HBQAHBXEJKN4AB77OZ32B32FPUHSUILZDVNO4QLUOTW6QCDependencies
- [2]
AGIW6YR3Use shared future for signal everywhere - [3]
NDDQQP2PUpdate deps - [4]
PBRUH4BJRename optional XmppConnection to MaybeXmppConnection - [5]
3GEU7TC7Welcome to 2018! - [6]
BTOZT4JPUse failure - [7]
QWE26TMVupdate deps - [8]
O2GM5J4FDon't split xmpp receiving and sending - [9]
FV6BJ5K6Send self-presence and store account info in Rc so it willbe used in some future in parallel - [10]
OGMBXBKPMove online to XmppConnection - [11]
X6L47BHQUse different structure for established xmpp connection - [12]
2L3JHRULCreate separate functions to process incoming XMPP stanzas - [13]
TDOR5XQUAccept destination - [14]
VS6AHRWIMove XMPP to separate dir
Change contents
- edit in src/xmpp/mod.rs at line 11
account: std::rc::Rc<config::Account>,inner: Option<Client>,}pub struct XmppConnection { - replacement in src/xmpp/mod.rs at line 17
inner: Option<(stream::SplitSink<Client>, stream::SplitStream<Client>)>,inner: Client,}impl From<XmppConnection> for MaybeXmppConnection {fn from(from: XmppConnection) -> MaybeXmppConnection {MaybeXmppConnection {account: from.account,inner: Some(from.inner),}} - replacement in src/xmpp/mod.rs at line 38
/// Error shoud be !fn connect<E: 'static>(self) -> impl Future<Item = Self, Error = E> {/// don't connect if stop_future resolvedfn connect<F>(self,stop_future: F,) -> impl Future<Item = XmppConnection, Error = failure::Error>whereF: future::Future + 'static,<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,{ - replacement in src/xmpp/mod.rs at line 51
Box::new(future::ok(MaybeXmppConnection {account,inner: Some(inner),})) as Box<Future<Item = _, Error = E>>Box::new(future::ok(XmppConnection { account, inner }))as Box<Future<Item = _, Error = _>> - replacement in src/xmpp/mod.rs at line 54[3.961]→[3.961:1476](∅→∅),[3.961]→[3.961:1476](∅→∅),[3.961]→[3.961:1476](∅→∅),[3.961]→[3.961:1476](∅→∅),[3.961]→[3.961:1476](∅→∅)
Box::new(future::loop_fn(account, |account| {info!("xmpp initialization...");let mut res_client = Client::new(&account.jid, &account.password);while let Err(e) = res_client {error!("Cann't init xmpp client: {}", e);res_client = Client::new(&account.jid, &account.password);}let client = res_client.expect("Cann't init xmpp client");info!("xmpp initialized");Box::new(stop_future.select2(future::loop_fn(account, |account| {info!("xmpp initialization...");let res_client = Client::new(&account.jid, &account.password);match res_client {Err(_e) => Box::new(future::ok(future::Loop::Continue(account)))as Box<Future<Item = _, Error = _>>,Ok(client) => {info!("xmpp initialized"); - replacement in src/xmpp/mod.rs at line 66[3.1477]→[3.1477:1522](∅→∅),[3.1477]→[3.1477:1522](∅→∅),[3.1477]→[3.1477:1522](∅→∅),[3.1477]→[3.1477:1522](∅→∅),[3.1477]→[3.1477:1522](∅→∅),[3.1522]→[2.321:426](∅→∅)
// future to wait for onlineSelf::online(client.split(), account).and_then(Self::self_presence)// future to wait for onlineBox::new(XmppConnection {inner: client,account,}.online().and_then(XmppConnection::self_presence).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),},),)}}}).map_err(|_: ()| ()),) - replacement in src/xmpp/mod.rs at line 87[2.466]→[2.466:641](∅→∅),[3.1142]→[3.1842:1858](∅→∅),[2.641]→[3.1842:1858](∅→∅),[3.1842]→[3.1842:1858](∅→∅)
Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),})}))Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),Ok(Either::B((x, _a))) => future::ok(x),Err(Either::A((e, _b))) => future::err(e.into()),Err(Either::B((_, _a))) => {future::err(format_err!("Cann't initiate XMPP connection"))}}),) - edit in src/xmpp/mod.rs at line 96
}}impl XmppConnection {/// base XMPP processingfn xmpp_processing(&mut self, _event: &Event) {}/// process event from xmpp stream/// returns from future when condition met/// or stop future was resolvedfn processing<S, F, T, E>(self,stop_condition: S,stop_future: F,) -> impl Future<Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),>whereF: Future<Item = T, Error = E>,S: FnMut(&mut Self, &Event) -> bool,{future::loop_fn((self, stop_future, stop_condition),|(xmpp, stop_future, mut stop_condition)| {let XmppConnection { inner, account } = xmpp;inner.into_future().select2(stop_future).then(|r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {inner: client,account,};xmpp.xmpp_processing(&event);if stop_condition(&mut xmpp, &event) {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))} else {future::ok(future::Loop::Continue((xmpp, b, stop_condition)))}} else {future::err((account, Ok(Either::A(b))))}}Ok(Either::B((t, a))) => {if let Some(inner) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection { inner, account },Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}}Err(Either::A((_e, b))) => future::err((account, Ok(Either::A(b)))),Err(Either::B((e, a))) => {if let Some(inner) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection { inner, account },Err(e),)))} else {future::err((account, Err(e)))}}})},) - replacement in src/xmpp/mod.rs at line 167
fn online((sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),account: std::rc::Rc<config::Account>,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> { - replacement in src/xmpp/mod.rs at line 169
(sink, stream, account),|(sink, stream, account)| {stream.into_future().then(|r| match r {Ok((event, stream)) => match event {(self.inner, self.account),|(client, account)| {client.into_future().then(|r| match r {Ok((event, client)) => match event { - replacement in src/xmpp/mod.rs at line 175
future::ok(future::Loop::Break(MaybeXmppConnection {future::ok(future::Loop::Break(XmppConnection { - replacement in src/xmpp/mod.rs at line 177
inner: Some((sink, stream)),inner: client, - replacement in src/xmpp/mod.rs at line 182
future::ok(future::Loop::Continue((sink, stream, account)))future::ok(future::Loop::Continue((client, account))) - replacement in src/xmpp/mod.rs at line 199
let MaybeXmppConnection { account, inner } = self;if let Some((sink, stream)) = inner {use tokio::prelude::Sink;let XmppConnection { account, inner } = self;let client = inner;use tokio::prelude::Sink; - replacement in src/xmpp/mod.rs at line 203
let presence = stanzas::make_presence(&account);info!("Sending presence...");Box::new(sink.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()}).join(future::loop_fn((account.clone(), stream), |(account, stream)| {stream.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, stream)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(stream))} else {future::ok(future::Loop::Continue((account, stream,)))}let presence = stanzas::make_presence(&account);info!("Sending presence...");let account2 = account.clone();Box::new(client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()}).and_then(move |client| {future::loop_fn((account2.clone(), client), |(account, client)| {client.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, client)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(client)) - replacement in src/xmpp/mod.rs at line 232
future::err("Got wrong event".to_owned())future::ok(future::Loop::Continue((account, client))) - edit in src/xmpp/mod.rs at line 234
} else {future::err("Got wrong event".to_owned()) - replacement in src/xmpp/mod.rs at line 237
None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e)),).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(MaybeXmppConnection {account,inner: Some(inner),}),}),)} else {warn!("Don't gen connection on self-presence");Box::new(future::err(account)) as Box<Future<Item = _, Error = _>>}}None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e))}).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(XmppConnection { account, inner }),}),) - replacement in src/xmpp/mod.rs at line 280
<F as hyper::rt::Future>::Error: std::fmt::Display,<F as hyper::rt::Future>::Error: std::fmt::Display + Into<failure::Error> + Send, - replacement in src/xmpp/mod.rs at line 291[3.4692]→[3.4692:4761](∅→∅),[3.4692]→[3.4692:4761](∅→∅),[3.4692]→[3.4692:4761](∅→∅),[3.4692]→[3.4692:4761](∅→∅)
signal.select2(conn.connect().and_then(|conn| {conn.connect(signal.clone()).and_then(|conn| { - replacement in src/xmpp/mod.rs at line 300
.map(|f| (f, conn))})).map(|(cmd, cmd_recv)| (cmd, cmd_recv, conn))}) - replacement in src/xmpp/mod.rs at line 304[3.5146]→[3.5146:5285](∅→∅),[3.5146]→[3.5146:5285](∅→∅),[3.5146]→[3.5146:5285](∅→∅),[3.5146]→[3.5146:5285](∅→∅),[3.5285]→[2.4399:4486](∅→∅),[3.3375]→[3.5372:5743](∅→∅),[3.4283]→[3.5372:5743](∅→∅),[2.4486]→[3.5372:5743](∅→∅),[3.5034]→[3.5372:5743](∅→∅),[3.5372]→[3.5372:5743](∅→∅),[3.5743]→[2.4487:4520](∅→∅),[3.3416]→[3.5776:6018](∅→∅),[3.4324]→[3.5776:6018](∅→∅),[2.4520]→[3.5776:6018](∅→∅),[3.5068]→[3.5776:6018](∅→∅),[3.5776]→[3.5776:6018](∅→∅),[3.6018]→[2.4521:4608](∅→∅),[3.3511]→[3.6105:6169](∅→∅),[3.4419]→[3.6105:6169](∅→∅),[2.4608]→[3.6105:6169](∅→∅),[3.5156]→[3.6105:6169](∅→∅),[3.6105]→[3.6105:6169](∅→∅)
Ok(Either::A((_x, b))) => {info!("Got signal");// got signal, breaksBox::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))as Box<Future<Item = _, Error = _>>}Ok(Either::B((x, a))) => {info!("Got cmd");// got cmd, continueBox::new(future::ok(future::Loop::Continue(XmppState::new((x.0).1,a,x.1,)))) as Box<Future<Item = _, Error = _>>}Err(Either::A((e, b))) => {// got signal error, breakserror!("Signal error: {}", e);Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))as Box<Future<Item = _, Error = _>>Ok((cmd, cmd_recv, conn)) => {if let Some(_cmd) = cmd {info!("Got cmd");// got cmd, continuefuture::ok(future::Loop::Continue(XmppState::new(cmd_recv,signal,conn.into(),)))} else {future::ok(future::Loop::Break((None, conn.into())))} - replacement in src/xmpp/mod.rs at line 317[3.6191]→[3.6191:6240](∅→∅),[3.6191]→[3.6191:6240](∅→∅),[3.6191]→[3.6191:6240](∅→∅),[3.6191]→[3.6191:6240](∅→∅),[3.6191]→[3.6191:6240](∅→∅)
Err(Either::B((e, _a))) => {Err(e) => { - replacement in src/xmpp/mod.rs at line 320
Box::new(future::err(format_err!("Cmd error")))as Box<Future<Item = _, Error = _>>future::err(format_err!("Cmd error")) - replacement in src/xmpp/mod.rs at line 335[3.1354]→[3.1354:1422](∅→∅),[3.1354]→[3.1354:1422](∅→∅),[3.1354]→[3.1354:1422](∅→∅),[3.1354]→[3.1354:1422](∅→∅)
Box::new(future::err(format_err!("cmd receiver gone")))Box::new(future::ok(()))