Process commands in the separate function
[?]
Jan 1, 2019, 9:28 AM
UMTLHH77LGABTVKULH6ONVSBTMSFGH3CJ6GPNTFWH73AWNJZV6LQCDependencies
- [2]
FWJDW3G5Allow process xmpp incoming stanzas with futures - [3]
5OBTKGDLUpdate deps - [4]
WJNXI6Z4Fill roster - [5]
FV6BJ5K6Send self-presence and store account info in Rc so it willbe used in some future in parallel - [6]
ALP2YJIURename XmppState to XmppProcessState - [7]
CP4MZO6VLeftover commands are processed via stoppable receiver - [8]
XGP44R5HRework stopping xmpp connection - [9]
OGMBXBKPMove online to XmppConnection - [10]
QYY3KRGLUse failure instead Box<dyn Error> - [11]
UWY5EVZ6Add dummy roster data - [12]
QTCUURXNAdd additional requirement for command stream - [13]
L77O4T7MFormatting and fixes - [14]
ZI4GJ72VAdd message to xmpp command - [15]
PBRUH4BJRename optional XmppConnection to MaybeXmppConnection - [16]
BTOZT4JPUse failure - [17]
NDDQQP2PUpdate deps - [18]
AYQZ2UIAUpdate deps - [19]
5A5UVGNMMove receiver closing logic out of xmpp processing - [20]
HU3NZX5ZProcess self-presence via new processing code - [21]
IK3YDPTYUpdate deps - [22]
V5HDBSZMUse jid for receiver address - [23]
4LRBIGVTShow info about xmpp errors - [24]
AGIW6YR3Use shared future for signal everywhere - [25]
TDOR5XQUAccept destination - [26]
OANBCLN5Move xmpp client into XmppState - [27]
EOHEZXX3Move request processing to structure - [28]
UIXIQHDYWait for commands via new processing code - [29]
5IKA4GO7Rename xmpp client field from "inner" to "client" - [30]
X6L47BHQUse different structure for established xmpp connection - [31]
QWE26TMVupdate deps - [32]
VS6AHRWIMove XMPP to separate dir - [33]
BWDUANCVSecond part of processing result is only about stop_future - [34]
PVCRPP3BSome servers don't send to in initial presence
Change contents
- edit in src/xmpp/mod.rs at line 7
use std::collections::HashMap; - edit in src/xmpp/mod.rs at line 14
roster: HashMap<jid::Jid, ()>, - replacement in src/xmpp/mod.rs at line 35
impl MaybeXmppConnection {fn new(account: config::Account) -> MaybeXmppConnection {impl From<config::Account> for MaybeXmppConnection {fn from(from: config::Account) -> MaybeXmppConnection { - replacement in src/xmpp/mod.rs at line 38
account: std::rc::Rc::new(account),account: std::rc::Rc::new(from),state: None,}}}impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {MaybeXmppConnection {account: from, - edit in src/xmpp/mod.rs at line 51
} - edit in src/xmpp/mod.rs at line 53
impl MaybeXmppConnection { - replacement in src/xmpp/mod.rs at line 55
/// don't connect if stop_future resolved/// don't connect only if stop_future resolved - replacement in src/xmpp/mod.rs at line 90
state: XmppState {client,roster: HashMap::new(),},state: XmppState { client }, - edit in src/xmpp/mod.rs at line 94
.map(|(conn, _)| conn) - edit in src/xmpp/mod.rs at line 96
.and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),}) - replacement in src/xmpp/mod.rs at line 125
fn xmpp_processing(self,event: &Event,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {fn xmpp_processing(&mut self, event: &Event) { - edit in src/xmpp/mod.rs at line 127
future::ok(self) - replacement in src/xmpp/mod.rs at line 131
/// or stop future was resolved./// Return item if connection was preserved or error otherwise./// Second part is a state of stop_future/// or stop future was resolved - replacement in src/xmpp/mod.rs at line 137
Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),Item = (Self, Result<Either<F, T>, failure::Error>),Error = (std::rc::Rc<config::Account>,Result<Either<F, T>, failure::Error>,), - replacement in src/xmpp/mod.rs at line 144
F: Future<Item = T, Error = E> + 'static,S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,T: 'static,E: 'static,F: Future<Item = T, Error = E>,E: Into<failure::Error>,S: FnMut(&mut Self, Event) -> Result<bool, failure::Error>, - replacement in src/xmpp/mod.rs at line 151
let XmppConnection {state: XmppState { client, roster },account,} = xmpp;client.into_future().select2(stop_future).then(|r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let xmpp = XmppConnection {state: XmppState { client, roster },account,};Box::new(xmpp.xmpp_processing(&event).then(|r| match r {Ok(mut xmpp) => match stop_condition(&mut xmpp, event) {let XmppConnection { state, account } = xmpp;state.client.into_future().select2(stop_future).then(|r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {state: XmppState { client },account,};xmpp.xmpp_processing(&event);match stop_condition(&mut xmpp, event) { - replacement in src/xmpp/mod.rs at line 173
Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),},Err(account) => future::err((account, Ok(Either::A(b)))),})) as Box<dyn Future<Item = _, Error = _>>} else {Box::new(future::err((account, Ok(Either::A(b)))))Err(e) => future::err((xmpp.account, Err(e))),}} else {future::err((account, Ok(Either::A(b))))}}Ok(Either::B((t, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}}Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);future::err((account, Ok(Either::A(b))))}Err(Either::B((e, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client },account,},Err(e.into()),)))} else {future::err((account, Err(e.into())))} - replacement in src/xmpp/mod.rs at line 209
}Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, roster },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}),Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);Box::new(future::err((account, Ok(Either::A(b)))))}Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, roster },account,},Err(e),)))} else {future::err((account, Err(e)))}),})}) - replacement in src/xmpp/mod.rs at line 215
/// returns error if something went wrong and xmpp connection is brokenfn online(&mut self, event: Event) -> Result<bool, ()> {/// returns error if something went wrongfn online(&mut self, event: Event) -> Result<bool, failure::Error> { - replacement in src/xmpp/mod.rs at line 228
Err(())Err(format_err!("Disconnected while online")) - replacement in src/xmpp/mod.rs at line 233
fn process_initial_roster(&mut self, event: Event) -> Result<bool, ()> {fn process_initial_roster(&mut self, event: Event) -> Result<bool, failure::Error> { - replacement in src/xmpp/mod.rs at line 242
error!("Get error instead of roster");Err(())Err(format_err!("Get error instead of roster")) - edit in src/xmpp/mod.rs at line 254
}Err(e) => {error!("Cann't parse roster: {}", e);Err(()) - edit in src/xmpp/mod.rs at line 255
Err(e) => Err(format_err!("Cann't parse roster: {}", e)), - edit in src/xmpp/mod.rs at line 257
}_ => {error!("Unknown result of roster");Err(()) - edit in src/xmpp/mod.rs at line 258
_ => Err(format_err!("Unknown result of roster")), - replacement in src/xmpp/mod.rs at line 264
error!("Iq stanza without id");Err(())Err(format_err!("Iq stanza without id")) - replacement in src/xmpp/mod.rs at line 270
error!("Wrong event while waiting roster");Err(())Err(format_err!("Wrong event while waiting roster")) - replacement in src/xmpp/mod.rs at line 279
F: Future<Error = E> + 'static,E: 'static,F: Future<Error = E>,E: Into<failure::Error>, - replacement in src/xmpp/mod.rs at line 282
let XmppConnection {account,state: XmppState { client, roster },} = self;let XmppConnection { account, state } = self; - replacement in src/xmpp/mod.rs at line 289
clientstate.client - replacement in src/xmpp/mod.rs at line 294
account2(account2, Err(failure::SyncFailure::new(e).into())) - replacement in src/xmpp/mod.rs at line 298
state: XmppState { client, roster },state: XmppState { client }, - replacement in src/xmpp/mod.rs at line 302
.map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})}).then(|r| match r {Err((account, e)) => {error!("Cann't wait roster: {}",e.err().map_or_else(|| std::borrow::Cow::Borrowed("None"),|e| e.to_string().into()));future::err(account)}Ok((conn, _)) => future::ok(conn), - replacement in src/xmpp/mod.rs at line 323
F: Future<Error = E> + 'static,E: Into<failure::Error> + 'static,F: Future<Error = E>,E: Into<failure::Error>, - replacement in src/xmpp/mod.rs at line 326
let XmppConnection {account,state: XmppState { client, roster },} = self;let XmppConnection { account, state } = self; - replacement in src/xmpp/mod.rs at line 333
clientstate.client - replacement in src/xmpp/mod.rs at line 338
account2(account2, Err(failure::SyncFailure::new(e).into())) - replacement in src/xmpp/mod.rs at line 342
state: XmppState { client, roster },state: XmppState { client }, - replacement in src/xmpp/mod.rs at line 357
error!("Wrong event while waiting self-presence");Err(())Err(format_err!("Wrong event while waiting self-presence")) - replacement in src/xmpp/mod.rs at line 362
.map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})})>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>.then(|r| match r {Err((account, _e)) => {error!("Cann't wait self-presence");future::err(account)}Ok((conn, _)) => future::ok(conn), - edit in src/xmpp/mod.rs at line 371
================================}fn process_command(self,_cmd: &XmppCommand,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {info!("Got command");future::ok(self)<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< - replacement in src/xmpp/mod.rs at line 387
struct XmppState<F, S> {struct XmppProcessState<F, S> { - replacement in src/xmpp/mod.rs at line 393
impl<F, S> XmppState<F, S> {fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppState<F, S> {XmppState {impl<F, S> XmppProcessState<F, S> {fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppProcessState<F, S> {XmppProcessState { - replacement in src/xmpp/mod.rs at line 413
let conn = MaybeXmppConnection::new(account);let conn = account.into(); - replacement in src/xmpp/mod.rs at line 415
future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {let XmppState {future::loop_fn(XmppProcessState::new(cmd_recv, signal, conn), |s| {let XmppProcessState { - replacement in src/xmpp/mod.rs at line 421
conn.connect(signal.clone()).and_then(|conn| {info!("xmpp connected!");cmd_recv.into_future().map_err(|_| {error!("Got error on recv cmd");format_err!("Receive cmd error")}).map(|(cmd, cmd_recv)| (cmd, cmd_recv, conn))}).then(|r| {match r {Ok((cmd, cmd_recv, conn)) => {if let Some(_cmd) = cmd {info!("Got cmd");// got cmd, continuefuture::ok(future::Loop::Continue(XmppState::new(conn.connect(signal.clone()).and_then(|conn| {info!("xmpp connected!");conn.processing(|_, _| Ok(false), cmd_recv.into_future()).then(|r| match r {Ok((conn, r)) => match r {Ok(Either::A(f)) => Box::new(if let Some(cmd_recv) = f.into_inner() {future::ok(future::Loop::Continue(XmppProcessState { - replacement in src/xmpp/mod.rs at line 430
conn.into(),)))conn: conn.into(),})) - replacement in src/xmpp/mod.rs at line 433
future::ok(future::Loop::Break((None, conn.into())))future::err(format_err!("Command receiver is gone"))})as Box<dyn Future<Item = _, Error = _>>,Ok(Either::B((cmd, cmd_recv))) => {if let Some(cmd) = cmd {Box::new(conn.process_command(&cmd).then(|r| {future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: match r {Ok(conn) => conn.into(),Err(account) => account.into(),},}))}))as Box<dyn Future<Item = _, Error = _>>} else {Box::new(future::ok(future::Loop::Break(())))}}Err(_) => Box::new(future::err(format_err!("Command receiver is broken"))),},Err((account, r)) => Box::new(match r {Ok(Either::A(f)) => {if let Some(cmd_recv) = f.into_inner() {future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: account.into(),}))} else {future::err(format_err!("Command receiver is gone"))} - replacement in src/xmpp/mod.rs at line 467
}Err(e) => {// got cmd error, its baderror!("Cmd error: {}", e);future::err(e)}}})}).and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| {if let Some(cmd_recv) = opt_cmd_recv {// process left commandsinfo!("Stop accepting commands");Box::new(cmd_recv.for_each(|_cmd| future::ok(())).map_err(|_| format_err!("cmd receiver last error")),) as Box<Future<Item = (), Error = failure::Error>>} else {Box::new(future::ok(()))}Ok(Either::B((cmd, cmd_recv))) => {if let Some(_cmd) = cmd {error!("Xmpp connection broken while get command");future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: account.into(),}))} else {future::ok(future::Loop::Break(()))}}Err(_) => future::err(format_err!("Command receiver is broken")),}),})})