Allow processing incoming XMPP stanzas with futures
[?]
Jan 1, 2019, 8:35 AM
FWJDW3G5KT66GHFS7IPPSFS3G5COH2BNN57GHTVT533FE7467QZAC
Dependencies
- [2]
UIXIQHDYWait for commands via new processing code - [3]
AGIW6YR3Use shared future for signal everywhere - [4]
QTCUURXNAdd additional requirement for command stream - [5]
IK3YDPTYUpdate deps - [6]
QYY3KRGLUse failure instead Box<dyn Error> - [7]
BWDUANCVSecond part of processing result is only about stop_future - [8]
NDDQQP2PUpdate deps - [9]
UWY5EVZ6Add dummy roster data - [10]
XGP44R5HRework stopping xmpp connection - [11]
5OBTKGDLUpdate deps - [12]
OANBCLN5Move xmpp client into XmppState - [13]
QWE26TMVUpdate deps - [14]
TDOR5XQUAccept destination - [15]
EOHEZXX3Move request processing to structure - [16]
V5HDBSZMUse jid for receiver address - [17]
4LRBIGVTShow info about xmpp errors - [18]
AYQZ2UIAUpdate deps - [19]
HU3NZX5ZProcess self-presence via new processing code - [20]
OGMBXBKPMove online to XmppConnection - [21]
PVCRPP3BSome servers don't send `to` in initial presence - [22]
CP4MZO6VLeftover commands are processed via stoppable receiver - [23]
PBRUH4BJRename optional XmppConnection to MaybeXmppConnection - [24]
5A5UVGNMMove receiver closing logic out of xmpp processing - [25]
L77O4T7MFormatting and fixes - [26]
FV6BJ5K6Send self-presence and store account info in Rc so it will be used in some future in parallel - [27]
X6L47BHQUse different structure for established xmpp connection - [28]
VS6AHRWIMove XMPP to separate dir - [29]
ZI4GJ72VAdd message to xmpp command - [30]
H7R7Y3FQUse new processing code to wait online - [31]
BTOZT4JPUse failure - [32]
ALP2YJIURename XmppState to XmppProcessState - [33]
5IKA4GO7Rename xmpp client field from "inner" to "client" - [34]
WJNXI6Z4Fill roster
Change contents
- edit in src/xmpp/mod.rs at line 7
use std::collections::HashMap; - edit in src/xmpp/mod.rs at line 16
roster: HashMap<jid::Jid, ()>, - replacement in src/xmpp/mod.rs at line 38
impl From<config::Account> for MaybeXmppConnection {fn from(from: config::Account) -> MaybeXmppConnection {impl MaybeXmppConnection {fn new(account: config::Account) -> MaybeXmppConnection { - replacement in src/xmpp/mod.rs at line 41
account: std::rc::Rc::new(from),account: std::rc::Rc::new(account), - edit in src/xmpp/mod.rs at line 45
} - edit in src/xmpp/mod.rs at line 46[3.129]→[2.233:372](∅→∅),[2.372]→[3.772:802](∅→∅),[3.772]→[3.772:802](∅→∅),[3.802]→[2.373:400](∅→∅),[2.400]→[3.578:603](∅→∅),[3.527]→[3.578:603](∅→∅),[3.603]→[3.499:515](∅→∅),[3.535]→[3.499:515](∅→∅),[3.111]→[3.499:515](∅→∅),[3.829]→[3.499:515](∅→∅),[3.174]→[3.499:515](∅→∅),[3.499]→[3.499:515](∅→∅),[3.515]→[2.401:403](∅→∅),[2.403]→[3.515:516](∅→∅),[3.515]→[3.515:516](∅→∅),[3.516]→[2.404:431](∅→∅)
impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {MaybeXmppConnection {account: from,state: None,}}}impl MaybeXmppConnection { - replacement in src/xmpp/mod.rs at line 47
/// don't connect only if stop_future resolved/// don't connect if stop_future resolved - replacement in src/xmpp/mod.rs at line 82
state: XmppState { client },state: XmppState {client,roster: HashMap::new(),}, - edit in src/xmpp/mod.rs at line 89
.map(|(conn, _)| conn) - edit in src/xmpp/mod.rs at line 90
.and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),}) - replacement in src/xmpp/mod.rs at line 124
fn xmpp_processing(&mut self, event: &Event) {fn xmpp_processing(self,event: &Event,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> { - edit in src/xmpp/mod.rs at line 129
future::ok(self) - replacement in src/xmpp/mod.rs at line 134
/// or stop future was resolved/// or stop future was resolved./// Return item if connection was preserved or error otherwise./// Second part is a state of stop_future - replacement in src/xmpp/mod.rs at line 142
Item = (Self, Result<Either<F, T>, failure::Error>),Error = (std::rc::Rc<config::Account>,Result<Either<F, T>, failure::Error>,),Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>), - replacement in src/xmpp/mod.rs at line 146[3.4335]→[3.4335:4375](∅→∅),[3.4335]→[3.4335:4375](∅→∅),[3.4335]→[3.4335:4375](∅→∅),[3.4335]→[3.4335:4375](∅→∅),[3.4335]→[3.4335:4375](∅→∅),[3.4375]→[2.842:943](∅→∅)
F: Future<Item = T, Error = E>,E: Into<failure::Error>,S: FnMut(&mut Self, Event) -> Result<bool, failure::Error>,F: Future<Item = T, Error = E> + 'static,S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,T: 'static,E: 'static, - replacement in src/xmpp/mod.rs at line 154
let XmppConnection { state, account } = xmpp;state.client.into_future().select2(stop_future).then(|r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {state: XmppState { client },account,};xmpp.xmpp_processing(&event);match stop_condition(&mut xmpp, event) {let XmppConnection {state: XmppState { client, roster },account,} = xmpp;client.into_future().select2(stop_future).then(|r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let xmpp = XmppConnection {state: XmppState { client, roster },account,};Box::new(xmpp.xmpp_processing(&event).then(|r| match r {Ok(mut xmpp) => match stop_condition(&mut xmpp, event) { - replacement in src/xmpp/mod.rs at line 175[2.2096]→[2.2096:2179](∅→∅),[2.2179]→[3.3025:3059](∅→∅),[3.3025]→[3.3025:3059](∅→∅),[3.3059]→[2.2180:2290](∅→∅),[2.2290]→[3.3255:3285](∅→∅),[3.3149]→[3.3255:3285](∅→∅),[3.742]→[3.3255:3285](∅→∅),[3.1375]→[3.3255:3285](∅→∅),[3.3255]→[3.3255:3285](∅→∅),[3.3285]→[2.2291:2940](∅→∅)
Err(e) => future::err((xmpp.account, Err(e))),}} else {future::err((account, Ok(Either::A(b))))}}Ok(Either::B((t, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),},Err(account) => future::err((account, Ok(Either::A(b)))),})) as Box<dyn Future<Item = _, Error = _>>} else {Box::new(future::err((account, Ok(Either::A(b))))) - replacement in src/xmpp/mod.rs at line 182[2.2966]→[2.2966:3076](∅→∅),[2.3076]→[3.1409:1478](∅→∅),[3.1409]→[3.1409:1478](∅→∅),[3.1478]→[3.3285:3311](∅→∅),[3.3285]→[3.3285:3311](∅→∅),[3.3311]→[2.3077:3695](∅→∅),[3.2179]→[3.4505:4531](∅→∅),[2.3695]→[3.4505:4531](∅→∅),[3.2179]→[3.4505:4531](∅→∅),[3.5636]→[3.4505:4531](∅→∅),[3.2058]→[3.4505:4531](∅→∅),[3.5335]→[3.4505:4531](∅→∅),[3.4505]→[3.4505:4531](∅→∅),[3.4531]→[2.3696:3719](∅→∅)
Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);future::err((account, Ok(Either::A(b))))}Err(Either::B((e, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client },account,},Err(e.into()),)))} else {future::err((account, Err(e.into())))}}})}Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, roster },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}),Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);Box::new(future::err((account, Ok(Either::A(b)))))}Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, roster },account,},Err(e),)))} else {future::err((account, Err(e)))}),}) - replacement in src/xmpp/mod.rs at line 215
/// returns error if something went wrongfn online(&mut self, event: Event) -> Result<bool, failure::Error> {/// returns error if something went wrong and xmpp connection is brokenfn online(&mut self, event: Event) -> Result<bool, ()> { - replacement in src/xmpp/mod.rs at line 228
Err(format_err!("Disconnected while online"))Err(()) - replacement in src/xmpp/mod.rs at line 233
fn process_initial_roster(&mut self, event: Event) -> Result<bool, failure::Error> {fn process_initial_roster(&mut self, event: Event) -> Result<bool, ()> { - replacement in src/xmpp/mod.rs at line 242
Err(format_err!("Get error instead of roster"))error!("Get error instead of roster");Err(()) - replacement in src/xmpp/mod.rs at line 256
Err(e) => Err(format_err!("Cann't parse roster: {}", e)),Err(e) => {error!("Cann't parse roster: {}", e);Err(())} - replacement in src/xmpp/mod.rs at line 262
_ => Err(format_err!("Unknown result of roster")),_ => {error!("Unknown result of roster");Err(())} - replacement in src/xmpp/mod.rs at line 271
Err(format_err!("Iq stanza without id"))error!("Iq stanza without id");Err(()) - replacement in src/xmpp/mod.rs at line 278
Err(format_err!("Wrong event while waiting roster"))error!("Wrong event while waiting roster");Err(()) - replacement in src/xmpp/mod.rs at line 288
F: Future<Error = E>,E: Into<failure::Error>,F: Future<Error = E> + 'static,E: 'static, - replacement in src/xmpp/mod.rs at line 291
let XmppConnection { account, state } = self;let XmppConnection {account,state: XmppState { client, roster },} = self; - replacement in src/xmpp/mod.rs at line 301
state.clientclient - replacement in src/xmpp/mod.rs at line 305
(account2, Err(failure::SyncFailure::new(e).into()))account2 - replacement in src/xmpp/mod.rs at line 309
state: XmppState { client },state: XmppState { client, roster }, - replacement in src/xmpp/mod.rs at line 313
}).then(|r| match r {Err((account, e)) => {error!("Cann't wait roster: {}",e.err().map_or_else(|| std::borrow::Cow::Borrowed("None"),|e| e.to_string().into()));future::err(account)}Ok((conn, _)) => future::ok(conn),.map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),}) - replacement in src/xmpp/mod.rs at line 327
F: Future<Error = E>,E: Into<failure::Error>,F: Future<Error = E> + 'static,E: Into<failure::Error> + 'static, - replacement in src/xmpp/mod.rs at line 330
let XmppConnection { account, state } = self;let XmppConnection {account,state: XmppState { client, roster },} = self; - replacement in src/xmpp/mod.rs at line 340
state.clientclient - replacement in src/xmpp/mod.rs at line 344
(account2, Err(failure::SyncFailure::new(e).into()))account2 - replacement in src/xmpp/mod.rs at line 348
state: XmppState { client },state: XmppState { client, roster }, - replacement in src/xmpp/mod.rs at line 363
Err(format_err!("Wrong event while waiting self-presence"))error!("Wrong event while waiting self-presence");Err(()) - edit in src/xmpp/mod.rs at line 369
.map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),}) - edit in src/xmpp/mod.rs at line 376
.then(|r| match r {Err((account, _e)) => {error!("Cann't wait self-presence");future::err(account)}Ok((conn, _)) => future::ok(conn),}) - replacement in src/xmpp/mod.rs at line 382
struct XmppProcessState<F, S> {struct XmppState<F, S> { - replacement in src/xmpp/mod.rs at line 388
impl<F, S> XmppProcessState<F, S> {fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppProcessState<F, S> {XmppProcessState {impl<F, S> XmppState<F, S> {fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppState<F, S> {XmppState { - replacement in src/xmpp/mod.rs at line 408
let conn = account.into();let conn = MaybeXmppConnection::new(account); - replacement in src/xmpp/mod.rs at line 410
future::loop_fn(XmppProcessState::new(cmd_recv, signal, conn), |s| {let XmppProcessState {future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {let XmppState { - replacement in src/xmpp/mod.rs at line 416
conn.connect(signal.clone()).and_then(|conn| {info!("xmpp connected!");conn.processing(|_, _| Ok(false), cmd_recv.into_future()).then(|r| match r {Ok((conn, r)) => match r {Ok(Either::A(f)) => {if let Some(cmd_recv) = f.into_inner() {future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: conn.into(),}))} else {future::err(format_err!("Command receiver is gone"))}conn.connect(signal.clone()).and_then(|conn| {info!("xmpp connected!");cmd_recv.into_future().map_err(|_| {error!("Got error on recv cmd");format_err!("Receive cmd error")}).map(|(cmd, cmd_recv)| (cmd, cmd_recv, conn))}).then(|r| {match r {Ok((cmd, cmd_recv, conn)) => {if let Some(_cmd) = cmd {info!("Got cmd");// got cmd, continuefuture::ok(future::Loop::Continue(XmppState::new(cmd_recv,signal,conn.into(),)))} else {future::ok(future::Loop::Break((None, conn.into()))) - replacement in src/xmpp/mod.rs at line 441
Ok(Either::B((cmd, cmd_recv))) => {if let Some(_cmd) = cmd {info!("Got command");future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: conn.into(),}))} else {future::ok(future::Loop::Break(()))}}Err(_) => future::err(format_err!("Command receiver is broken")),},Err((account, r)) => match r {Ok(Either::A(f)) => {if let Some(cmd_recv) = f.into_inner() {future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: account.into(),}))} else {future::err(format_err!("Command receiver is gone"))}}Ok(Either::B((cmd, cmd_recv))) => {if let Some(_cmd) = cmd {error!("Xmpp connection broken while get command");future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: account.into(),}))} else {future::ok(future::Loop::Break(()))}}Err(_) => future::err(format_err!("Command receiver is broken")),},})})}Err(e) => {// got cmd error, its baderror!("Cmd error: {}", e);future::err(e)}}})}).and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| {if let Some(cmd_recv) = opt_cmd_recv {// process left commandsinfo!("Stop accepting commands");Box::new(cmd_recv.for_each(|_cmd| future::ok(())).map_err(|_| format_err!("cmd receiver last error")),) as Box<Future<Item = (), Error = failure::Error>>} else {Box::new(future::ok(()))}