Wait for commands via new processing code
[?]
Dec 31, 2018, 10:41 PM
UIXIQHDY7E4DLLRQP4ATXF625QC4KGMHWOV24UEQ6WD7G2IPM4HACDependencies
- [2]
BWDUANCVSecond part of processing result is only about stop_future - [3]
PFC7OJQFQuery roster - [4]
FV6BJ5K6Send self-presence and store account info in Rc so it willbe used in some future in parallel - [5]
QYY3KRGLUse failure instead Box<dyn Error> - [6]
XGP44R5HRework stopping xmpp connection - [7]
V5HDBSZMUse jid for receiver address - [8]
IK3YDPTYUpdate deps - [9]
X6L47BHQUse different structure for established xmpp connection - [10]
ZI4GJ72VAdd message to xmpp command - [11]
OANBCLN5Move xmpp client into XmppState - [12]
PBRUH4BJRename optional XmppConnection to MaybeXmppConnection - [13]
NDDQQP2PUpdate deps - [14]
EOHEZXX3Move request processing to structure - [15]
TDOR5XQUAccept destination - [16]
H7R7Y3FQUse new processing code to wait online - [17]
HU3NZX5ZProcess self-presence via new processing code - [18]
3GEU7TC7Welcome to 2018! - [19]
OGMBXBKPMove online to XmppConnection - [20]
5IKA4GO7Rename xmpp client field from "inner" to "client" - [21]
CP4MZO6VLeftover commands are processed via stoppable receiver - [22]
L77O4T7MFormatting and fixes - [23]
WJNXI6Z4Fill roster - [24]
4LRBIGVTShow info about xmpp errors - [25]
AGIW6YR3Use shared future for signal everywhere - [26]
PVCRPP3BSome servers don't send to in initial presence - [27]
ALP2YJIURename XmppState to XmppProcessState - [28]
QWE26TMVupdate deps - [29]
AYQZ2UIAUpdate deps - [30]
QTCUURXNAdd additional requirement for command stream - [31]
VS6AHRWIMove XMPP to separate dir - [32]
5A5UVGNMMove receiver closing logic out of xmpp processing - [33]
5OBTKGDLUpdate deps - [34]
BTOZT4JPUse failure - [35]
UWY5EVZ6Add dummy roster data
Change contents
- edit in src/xmpp/mod.rs at line 7
use std::collections::HashMap; - edit in src/xmpp/mod.rs at line 14
roster: HashMap<jid::Jid, ()>, - edit in src/xmpp/mod.rs at line 31
}}}impl From<config::Account> for MaybeXmppConnection {fn from(from: config::Account) -> MaybeXmppConnection {MaybeXmppConnection {account: std::rc::Rc::new(from),state: None, - replacement in src/xmpp/mod.rs at line 44
impl MaybeXmppConnection {fn new(account: config::Account) -> MaybeXmppConnection {impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection { - replacement in src/xmpp/mod.rs at line 47[3.182]→[3.479:527](∅→∅),[3.478]→[3.479:527](∅→∅),[3.182]→[3.479:527](∅→∅),[3.153]→[3.479:527](∅→∅),[3.182]→[3.479:527](∅→∅),[3.802]→[3.479:527](∅→∅),[3.478]→[3.479:527](∅→∅),[3.453]→[3.479:527](∅→∅)
account: std::rc::Rc::new(account),account: from, - edit in src/xmpp/mod.rs at line 51
} - edit in src/xmpp/mod.rs at line 53
impl MaybeXmppConnection { - replacement in src/xmpp/mod.rs at line 55
/// don't connect if stop_future resolved/// don't connect only if stop_future resolved - replacement in src/xmpp/mod.rs at line 90
state: XmppState {client,roster: HashMap::new(),},state: XmppState { client }, - edit in src/xmpp/mod.rs at line 94
.map(|(conn, _)| conn) - edit in src/xmpp/mod.rs at line 96
.and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),}) - replacement in src/xmpp/mod.rs at line 131
/// or stop future was resolved./// Return item if connection was preserved or error otherwise./// Second part is a state of stop_future/// or stop future was resolved - replacement in src/xmpp/mod.rs at line 137
Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),Item = (Self, Result<Either<F, T>, failure::Error>),Error = (std::rc::Rc<config::Account>,Result<Either<F, T>, failure::Error>,), - replacement in src/xmpp/mod.rs at line 145
S: FnMut(&mut Self, Event) -> Result<bool, ()>,E: Into<failure::Error>,S: FnMut(&mut Self, Event) -> Result<bool, failure::Error>, - replacement in src/xmpp/mod.rs at line 151[3.4612]→[2.2340:2566](∅→∅),[2.2566]→[3.534:708](∅→∅),[3.539]→[3.534:708](∅→∅),[3.534]→[3.534:708](∅→∅),[3.708]→[2.2567:2636](∅→∅),[2.2636]→[3.777:907](∅→∅),[3.587]→[3.777:907](∅→∅),[3.777]→[3.777:907](∅→∅),[3.907]→[2.2637:3025](∅→∅)
let XmppConnection {state: XmppState { client, roster },account,} = xmpp;client.into_future().select2(stop_future).then(|r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {state: XmppState { client, roster },account,};xmpp.xmpp_processing(&event);match stop_condition(&mut xmpp, event) {Ok(true) => {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))}Ok(false) => {future::ok(future::Loop::Continue((xmpp, b, stop_condition)))let XmppConnection { state, account } = xmpp;state.client.into_future().select2(stop_future).then(|r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {state: XmppState { client },account,};xmpp.xmpp_processing(&event);match stop_condition(&mut xmpp, event) {Ok(true) => {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))}Ok(false) => future::ok(future::Loop::Continue((xmpp,b,stop_condition,))),Err(e) => future::err((xmpp.account, Err(e))), - replacement in src/xmpp/mod.rs at line 175
Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),} else {future::err((account, Ok(Either::A(b)))) - replacement in src/xmpp/mod.rs at line 178
} else {}Ok(Either::B((t, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}}Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0); - replacement in src/xmpp/mod.rs at line 196[3.3311]→[3.1479:1548](∅→∅),[3.1548]→[2.3150:3213](∅→∅),[2.3213]→[3.1611:1672](∅→∅),[3.650]→[3.1611:1672](∅→∅),[3.1611]→[3.1611:1672](∅→∅),[3.1672]→[2.3214:3416](∅→∅),[2.3416]→[3.1874:2058](∅→∅),[3.718]→[3.1874:2058](∅→∅),[3.1874]→[3.1874:2058](∅→∅)
}Ok(Either::B((t, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, roster },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))Err(Either::B((e, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client },account,},Err(e.into()),)))} else {future::err((account, Err(e.into())))} - replacement in src/xmpp/mod.rs at line 209[3.2248]→[3.2248:2270](∅→∅),[3.2270]→[2.3417:3606](∅→∅),[2.3606]→[3.2270:2318](∅→∅),[3.808]→[3.2270:2318](∅→∅),[3.2270]→[3.2270:2318](∅→∅),[3.2318]→[2.3607:3670](∅→∅),[2.3670]→[3.2381:2442](∅→∅),[3.871]→[3.2381:2442](∅→∅),[3.2381]→[3.2381:2442](∅→∅),[3.2442]→[2.3671:3873](∅→∅),[2.3873]→[3.743:783](∅→∅),[3.939]→[3.743:783](∅→∅),[3.783]→[3.2691:2756](∅→∅),[3.2691]→[3.2691:2756](∅→∅),[3.2756]→[3.784:843](∅→∅),[3.2511]→[3.5513:5539](∅→∅),[3.4667]→[3.5513:5539](∅→∅),[3.3907]→[3.5513:5539](∅→∅),[3.2511]→[3.5513:5539](∅→∅),[3.843]→[3.5513:5539](∅→∅),[3.2108]→[3.5513:5539](∅→∅),[3.6612]→[3.5513:5539](∅→∅),[3.2822]→[3.5513:5539](∅→∅),[3.1938]→[3.5513:5539](∅→∅),[3.6307]→[3.5513:5539](∅→∅),[3.1272]→[3.5513:5539](∅→∅),[3.5513]→[3.5513:5539](∅→∅),[3.5539]→[3.2823:2864](∅→∅)
}Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);future::err((account, Ok(Either::A(b))))}Err(Either::B((e, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, roster },account,},Err(e),)))} else {future::err((account, Err(e)))}}})}) - replacement in src/xmpp/mod.rs at line 215
/// returns error if something went wrong and xmpp connection is brokenfn online(&mut self, event: Event) -> Result<bool, ()> {/// returns error if something went wrongfn online(&mut self, event: Event) -> Result<bool, failure::Error> { - replacement in src/xmpp/mod.rs at line 228
Err(())Err(format_err!("Disconnected while online")) - replacement in src/xmpp/mod.rs at line 233
fn process_initial_roster(&mut self, event: Event) -> Result<bool, ()> {fn process_initial_roster(&mut self, event: Event) -> Result<bool, failure::Error> { - replacement in src/xmpp/mod.rs at line 242
error!("Get error instead of roster");Err(())Err(format_err!("Get error instead of roster")) - edit in src/xmpp/mod.rs at line 254
}Err(e) => {error!("Cann't parse roster: {}", e);Err(()) - edit in src/xmpp/mod.rs at line 255
Err(e) => Err(format_err!("Cann't parse roster: {}", e)), - edit in src/xmpp/mod.rs at line 257
}_ => {error!("Unknown result of roster");Err(()) - edit in src/xmpp/mod.rs at line 258
_ => Err(format_err!("Unknown result of roster")), - replacement in src/xmpp/mod.rs at line 264
error!("Iq stanza without id");Err(())Err(format_err!("Iq stanza without id")) - replacement in src/xmpp/mod.rs at line 270
error!("Wrong event while waiting roster");Err(())Err(format_err!("Wrong event while waiting roster")) - edit in src/xmpp/mod.rs at line 280
E: Into<failure::Error>, - replacement in src/xmpp/mod.rs at line 282
let XmppConnection {account,state: XmppState { client, roster },} = self;let XmppConnection { account, state } = self; - replacement in src/xmpp/mod.rs at line 289
clientstate.client - replacement in src/xmpp/mod.rs at line 294
account2(account2, Err(failure::SyncFailure::new(e).into())) - replacement in src/xmpp/mod.rs at line 298
state: XmppState { client, roster },state: XmppState { client }, - replacement in src/xmpp/mod.rs at line 302
.map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})}).then(|r| match r {Err((account, e)) => {error!("Cann't wait roster: {}",e.err().map_or_else(|| std::borrow::Cow::Borrowed("None"),|e| e.to_string().into()));future::err(account)}Ok((conn, _)) => future::ok(conn), - replacement in src/xmpp/mod.rs at line 326
let XmppConnection {account,state: XmppState { client, roster },} = self;let XmppConnection { account, state } = self; - replacement in src/xmpp/mod.rs at line 333
clientstate.client - replacement in src/xmpp/mod.rs at line 338
account2(account2, Err(failure::SyncFailure::new(e).into())) - replacement in src/xmpp/mod.rs at line 342
state: XmppState { client, roster },state: XmppState { client }, - replacement in src/xmpp/mod.rs at line 357
error!("Wrong event while waiting self-presence");Err(())Err(format_err!("Wrong event while waiting self-presence")) - replacement in src/xmpp/mod.rs at line 362
.map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),})}).then(|r| match r {Err((account, _e)) => {error!("Cann't wait self-presence");future::err(account)}Ok((conn, _)) => future::ok(conn), - replacement in src/xmpp/mod.rs at line 376
struct XmppState<F, S> {struct XmppProcessState<F, S> { - replacement in src/xmpp/mod.rs at line 382
impl<F, S> XmppState<F, S> {fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppState<F, S> {XmppState {impl<F, S> XmppProcessState<F, S> {fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppProcessState<F, S> {XmppProcessState { - replacement in src/xmpp/mod.rs at line 402
let conn = MaybeXmppConnection::new(account);let conn = account.into(); - replacement in src/xmpp/mod.rs at line 404
future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {let XmppState {future::loop_fn(XmppProcessState::new(cmd_recv, signal, conn), |s| {let XmppProcessState { - replacement in src/xmpp/mod.rs at line 410[3.4692]→[3.12432:12500](∅→∅),[3.8720]→[3.4761:4955](∅→∅),[3.6088]→[3.4761:4955](∅→∅),[3.6123]→[3.4761:4955](∅→∅),[3.5985]→[3.4761:4955](∅→∅),[3.12500]→[3.4761:4955](∅→∅),[3.5884]→[3.4761:4955](∅→∅),[3.9107]→[3.4761:4955](∅→∅),[3.4761]→[3.4761:4955](∅→∅),[3.4955]→[3.595:675](∅→∅),[3.675]→[3.12501:12582](∅→∅),[3.8802]→[3.731:755](∅→∅),[3.6145]→[3.731:755](∅→∅),[3.6205]→[3.731:755](∅→∅),[3.6042]→[3.731:755](∅→∅),[3.12582]→[3.731:755](∅→∅),[3.5941]→[3.731:755](∅→∅),[3.9189]→[3.731:755](∅→∅),[3.731]→[3.731:755](∅→∅),[3.723]→[3.5120:5146](∅→∅),[3.482]→[3.5120:5146](∅→∅),[3.755]→[3.5120:5146](∅→∅),[3.5120]→[3.5120:5146](∅→∅),[3.5146]→[3.12583:12779](∅→∅),[3.12779]→[3.10536:10614](∅→∅),[3.10614]→[3.12857:13049](∅→∅),[3.6240]→[3.12857:13049](∅→∅),[3.12857]→[3.12857:13049](∅→∅),[3.13049]→[2.9617:9698](∅→∅)
conn.connect(signal.clone()).and_then(|conn| {info!("xmpp connected!");cmd_recv.into_future().map_err(|_| {error!("Got error on recv cmd");format_err!("Receive cmd error")}).map(|(cmd, cmd_recv)| (cmd, cmd_recv, conn))}).then(|r| {match r {Ok((cmd, cmd_recv, conn)) => {if let Some(_cmd) = cmd {info!("Got cmd");// got cmd, continuefuture::ok(future::Loop::Continue(XmppState::new(cmd_recv,signal,conn.into(),)))} else {future::ok(future::Loop::Break((None, conn.into())))conn.connect(signal.clone()).and_then(|conn| {info!("xmpp connected!");conn.processing(|_, _| Ok(false), cmd_recv.into_future()).then(|r| match r {Ok((conn, r)) => match r {Ok(Either::A(f)) => {if let Some(cmd_recv) = f.into_inner() {future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: conn.into(),}))} else {future::err(format_err!("Command receiver is gone"))} - replacement in src/xmpp/mod.rs at line 426[3.13156]→[3.7148:7170](∅→∅),[3.7148]→[3.7148:7170](∅→∅),[3.7170]→[3.13157:13189](∅→∅),[3.9409]→[3.6240:6342](∅→∅),[3.7219]→[3.6240:6342](∅→∅),[3.6812]→[3.6240:6342](∅→∅),[3.7116]→[3.6240:6342](∅→∅),[3.13189]→[3.6240:6342](∅→∅),[3.7036]→[3.6240:6342](∅→∅),[3.9796]→[3.6240:6342](∅→∅),[3.6240]→[3.6240:6342](∅→∅),[3.6342]→[2.9699:9738](∅→∅),[3.9472]→[3.6572:6627](∅→∅),[3.7356]→[3.6572:6627](∅→∅),[2.9738]→[3.6572:6627](∅→∅),[3.10654]→[3.6572:6627](∅→∅),[3.6875]→[3.6572:6627](∅→∅),[3.4095]→[3.6572:6627](∅→∅),[3.331]→[3.6572:6627](∅→∅),[3.3926]→[3.6572:6627](∅→∅),[3.954]→[3.6572:6627](∅→∅),[3.7253]→[3.6572:6627](∅→∅),[3.4726]→[3.6572:6627](∅→∅),[3.6303]→[3.6572:6627](∅→∅),[3.13229]→[3.6572:6627](∅→∅),[3.7173]→[3.6572:6627](∅→∅),[3.892]→[3.6572:6627](∅→∅),[3.9859]→[3.6572:6627](∅→∅),[3.6572]→[3.6572:6627](∅→∅),[3.6627]→[3.893:900](∅→∅),[3.900]→[2.9739:10245](∅→∅)
}Err(e) => {// got cmd error, its baderror!("Cmd error: {}", e);future::err(e)}}})}).and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| {if let Some(cmd_recv) = opt_cmd_recv {// process left commandsinfo!("Stop accepting commands");Box::new(cmd_recv.for_each(|_cmd| future::ok(())).map_err(|_| format_err!("cmd receiver last error")),) as Box<Future<Item = (), Error = failure::Error>>} else {Box::new(future::ok(()))}Ok(Either::B((cmd, cmd_recv))) => {if let Some(_cmd) = cmd {info!("Got command");future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: conn.into(),}))} else {future::ok(future::Loop::Break(()))}}Err(_) => future::err(format_err!("Command receiver is broken")),},Err((account, r)) => match r {Ok(Either::A(f)) => {if let Some(cmd_recv) = f.into_inner() {future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: account.into(),}))} else {future::err(format_err!("Command receiver is gone"))}}Ok(Either::B((cmd, cmd_recv))) => {if let Some(_cmd) = cmd {error!("Xmpp connection broken while get command");future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: account.into(),}))} else {future::ok(future::Loop::Break(()))}}Err(_) => future::err(format_err!("Command receiver is broken")),},})})