Send stanzas via send queue
[?]
Jan 2, 2019, 9:18 AM
DKXSFTDY6FVCBUAFNKOFBKKIKWQI7L7WSD3MRVEW7WFRT3R5VIFQC
Dependencies
- [2] SA2IOFGY Add items to roster
- [3] VS6AHRWI Move XMPP to separate dir
- [4] UMTLHH77 Process commands in the separate function
- [5] FWJDW3G5 Allow processing incoming XMPP stanzas with futures
- [6] QWE26TMV update deps
- [7] EBETRYK7 Add counter for id. Check for jid in roster
- [8] CBWCXUZZ Prepare adding new items to roster
- [9] XGP44R5H Rework stopping xmpp connection
- [10] UAT5MV5O Directly use id for initial roster request
- [11] FV6BJ5K6 Send self-presence and store account info in Rc so it will be used in some future in parallel
- [12] 5IKA4GO7 Rename xmpp client field from "inner" to "client"
- [13] 3FYEOGCI Move additional rarely changed data to separate structure
- [14] BWDUANCV Second part of processing result is only about stop_future
- [15] SU4DNVCB Start processing roster data
Change contents
- replacement in src/xmpp/mod.rs at line 8
  - use std::collections::HashMap;
  + use std::collections::{HashMap, VecDeque};
- edit in src/xmpp/mod.rs at line 21
  + /// stanzas to send
  + send_queue: VecDeque<minidom::Element>,
- replacement in src/xmpp/mod.rs at line 216
  - state: XmppState { client, data },
  + state: XmppState { client, mut data },
- replacement in src/xmpp/mod.rs at line 219
client.into_future().select2(stop_future).then(move |r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let xmpp = XmppConnection {state: XmppState { client, data },account,};Box::new(xmpp.xmpp_processing(&event).then(|r| match r {Ok(mut xmpp) => match stop_condition(&mut xmpp, event) {Ok(true) => future::ok(future::Loop::Break((xmpp,Ok(Either::A(b)),))),Ok(false) => future::ok(future::Loop::Continue((xmpp,b,stop_condition,))),Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),if let Some(send_element) = data.send_queue.pop_front() {use tokio::prelude::Sink;info!("Sending {:?}", send_element);Box::new(client.send(send_element).select2(stop_future).then(move |r| match r {Ok(Either::A((client, b))) => {Box::new(future::ok(future::Loop::Continue((XmppConnection {state: XmppState { client, data },account, - replacement in src/xmpp/mod.rs at line 232
Err(account) => future::err((account, Ok(Either::A(b)))),}))b,stop_condition,)))) - replacement in src/xmpp/mod.rs at line 236
} else {}Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Ok(Either::B(t))))}})),Err(Either::A((e, b))) => {warn!("XMPP sending error: {}", e); - replacement in src/xmpp/mod.rs at line 254[3.2970]→[3.2970:3319](∅→∅),[3.2043]→[3.3609:3644](∅→∅),[3.3319]→[3.3609:3644](∅→∅),[3.3609]→[3.3609:3644](∅→∅),[3.3644]→[3.3320:3402](∅→∅),[3.2206]→[3.3726:3759](∅→∅),[3.3402]→[3.3726:3759](∅→∅),[3.3726]→[3.3726:3759](∅→∅),[3.3759]→[3.3403:3610](∅→∅),[3.3610]→[3.3857:3936](∅→∅),[3.2773]→[3.3857:3936](∅→∅),[3.3936]→[3.2842:2868](∅→∅),[3.2842]→[3.2842:2868](∅→∅),[3.2868]→[3.3611:4185](∅→∅)
}Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}),Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);Box::new(future::err((account, Ok(Either::A(b)))))}Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),)))} else {future::err((account, Err(e)))}),})Err(Either::B((e, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Err(e)))}})),},)) as Box<dyn Future<Item = _, Error = _>>} else {Box::new(client.into_future().select2(stop_future).then(move |r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let xmpp = XmppConnection {state: XmppState { client, data },account,};Box::new(xmpp.xmpp_processing(&event).then(|r| match r {Ok(mut xmpp) => {match stop_condition(&mut xmpp, event) {Ok(true) => future::ok(future::Loop::Break((xmpp,Ok(Either::A(b)),))),Ok(false) => {future::ok(future::Loop::Continue((xmpp,b,stop_condition,)))}Err(_e) => future::err((xmpp.account,Ok(Either::A(b)),)),}}Err(account) => {future::err((account, Ok(Either::A(b))))}}))as Box<dyn Future<Item = _, Error = _>>} else {Box::new(future::err((account, Ok(Either::A(b)))))}}Ok(Either::B((t, a))) => {Box::new(if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))})}Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);Box::new(future::err((account, Ok(Either::A(b)))))}Err(Either::B((e, a))) => {Box::new(if let Some(client) = a.into_inner() 
{future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),)))} else {future::err((account, Err(e)))})}}),)} - replacement in src/xmpp/mod.rs at line 516[3.4553]→[3.4553:4591](∅→∅),[3.4591]→[2.2940:2966](∅→∅),[2.2966]→[3.4618:4692](∅→∅),[3.4618]→[3.4618:4692](∅→∅)
fn process_command(self,cmd: XmppCommand,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {fn process_command(&mut self, cmd: XmppCommand) { - edit in src/xmpp/mod.rs at line 520
Box::new(future::ok(self)) as Box<dyn Future<Item = _, Error = _>> - edit in src/xmpp/mod.rs at line 522
let XmppConnection {account,state: XmppState { client, mut data },} = self; - replacement in src/xmpp/mod.rs at line 523
data.counter += 1;let id_add_roster = format!("id_add_roster{}", data.counter);self.state.data.counter += 1;let id_add_roster = format!("id_add_roster{}", self.state.data.counter); - replacement in src/xmpp/mod.rs at line 526
let account2 = account.clone();self.state.data.pending_add_roster_ids.insert(id_add_roster, (cmd.xmpp_to, cmd.message)); - edit in src/xmpp/mod.rs at line 531
use tokio::prelude::Sink; - replacement in src/xmpp/mod.rs at line 532
Box::new(client.send(add_roster).map_err(|e| {error!("Error on send adding to roster: {}", e);account2}).and_then(move |client| {data.pending_add_roster_ids.insert(id_add_roster, (cmd.xmpp_to, cmd.message));future::ok(XmppConnection {account,state: XmppState { client, data },})}),)self.state.data.send_queue.push_back(add_roster); - replacement in src/xmpp/mod.rs at line 578[3.5994]→[3.5994:6216](∅→∅),[3.5015]→[3.7143:7225](∅→∅),[3.6612]→[3.7143:7225](∅→∅),[3.6216]→[3.7143:7225](∅→∅),[3.7143]→[3.7143:7225](∅→∅),[3.7225]→[3.6217:6300](∅→∅),[3.5093]→[3.7302:7335](∅→∅),[3.6696]→[3.7302:7335](∅→∅),[3.6300]→[3.7302:7335](∅→∅),[3.7302]→[3.7302:7335](∅→∅),[3.7335]→[3.6301:6478](∅→∅)
Ok((conn, r)) => match r {Ok(Either::A(f)) => Box::new(if let Some(cmd_recv) = f.into_inner() {future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: conn.into(),}))} else {future::err(format_err!("Command receiver is gone"))})as Box<dyn Future<Item = _, Error = _>>,Ok((mut conn, r)) => match r {Ok(Either::A(f)) => {if let Some(cmd_recv) = f.into_inner() {future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: conn.into(),}))} else {future::err(format_err!("Command receiver is gone"))}} - replacement in src/xmpp/mod.rs at line 592
Box::new(conn.process_command(cmd).then(|r| {future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: match r {Ok(conn) => conn.into(),Err(account) => account.into(),},}))conn.process_command(cmd);future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: conn.into(), - edit in src/xmpp/mod.rs at line 599
as Box<dyn Future<Item = _, Error = _>> - replacement in src/xmpp/mod.rs at line 600
Box::new(future::ok(future::Loop::Break(())))future::ok(future::Loop::Break(())) - replacement in src/xmpp/mod.rs at line 603
Err(_) => Box::new(future::err(format_err!("Command receiver is broken"))),Err(_) => future::err(format_err!("Command receiver is broken")), - replacement in src/xmpp/mod.rs at line 605
Err((account, r)) => Box::new(match r {Err((account, r)) => match r { - replacement in src/xmpp/mod.rs at line 630
}),},