Sending messages works!
[?]
Jan 2, 2019, 7:50 PM
77USPY5IJSK7YN355CRGZQ2ZIG5WHRCD7JWKE3BVVBYIU6QLB5ZACDependencies
- [2]
XOAM22TTSimplify xmpp incoming stanzas processing without futures - [3]
BWDUANCVSecond part of processing result is only about stop_future - [4]
FV6BJ5K6Send self-presence and store account info in Rc so it willbe used in some future in parallel - [5]
CBWCXUZZPrepare adding new items to roster - [6]
OANBCLN5Move xmpp client into XmppState - [7]
XGP44R5HRework stopping xmpp connection - [8]
5IKA4GO7Rename xmpp client field from "inner" to "client" - [9]
VS6AHRWIMove XMPP to separate dir - [10]
QWE26TMVupdate deps - [11]
SA2IOFGYAdd items to roster - [12]
DKXSFTDYSend stanzas via send queue - [13]
EBETRYK7Add counter for id. Check for jid in roster - [14]
UMTLHH77Process commands in the separate function - [15]
QTCUURXNAdd additional requirement for command stream - [16]
SU4DNVCBStart to processing roster data - [17]
FWJDW3G5Allow process xmpp incoming stanzas with futures - [18]
ALP2YJIURename XmppState to XmppProcessState - [19]
UAT5MV5ODirectly use id for initial roster request - [20]
3FYEOGCIMove additional rarely changed data to separate structure
Change contents
- edit in src/xmpp/stanzas.rs at line 3
use xmpp_parsers::message::{Body, Message, MessageType}; - edit in src/xmpp/stanzas.rs at line 37
}pub fn make_ask_subscribe(jid: jid::Jid) -> Element {let mut presence = Presence::new(PresenceType::Subscribe);presence.to = Some(jid);presence.into() - edit in src/xmpp/stanzas.rs at line 44[3.272]
pub fn make_chat_message(jid: jid::Jid, text: String) -> Element {let mut message = Message::new(Some(jid));message.bodies.insert(String::new(), Body(text));message.type_ = MessageType::Chat;message.into()} - replacement in src/xmpp/mod.rs at line 8
use std::collections::{HashMap, VecDeque};use std::collections::HashMap; - replacement in src/xmpp/mod.rs at line 15
roster: HashMap<jid::Jid, ()>,roster: HashMap<jid::Jid, (xmpp_parsers::roster::Subscription, Vec<String>)>, - edit in src/xmpp/mod.rs at line 21
/// stanzas to sendsend_queue: VecDeque<minidom::Element>, - replacement in src/xmpp/mod.rs at line 147
/// Returns false on errorfn xmpp_processing(&mut self, event: &Event) -> bool {fn xmpp_processing(mut self,event: &Event,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> { - replacement in src/xmpp/mod.rs at line 159
if let Some((_, (jid, _message))) =if let Some((_, (jid, message))) = - edit in src/xmpp/mod.rs at line 164
if let Some(ref mut rdata) = self.state.data.roster.get_mut(&jid) {rdata.1.push(message);} else {self.state.data.roster.insert(jid,(xmpp_parsers::roster::Subscription::None, vec![message]),);}} else {warn!("Wrong payload when adding {} to roster: {:?}",jid, iq.payload); - replacement in src/xmpp/mod.rs at line 184
self.state.data.roster.extend(roster.items.into_iter().map(|i| {for i in roster.items {if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid){info!("Update {} in roster", i.jid);rdata.0 = i.subscription;if !rdata.1.is_empty() {let sub_to = match rdata.0 {xmpp_parsers::roster::Subscription::To => true,xmpp_parsers::roster::Subscription::Both => true,_ => false,};if sub_to {info!("Subscribed to {}", i.jid);let jid = i.jid.clone();self.state.data.send_queue.extend(rdata.1.drain(..).map(|message| {stanzas::make_chat_message(jid.clone(), message)}),)} else {info!("Not subscribed to {}", i.jid);self.state.data.send_queue.push_back(stanzas::make_ask_subscribe(i.jid));}}} else { - replacement in src/xmpp/mod.rs at line 213
(i.jid, ())}));self.state.data.roster.insert(i.jid, (i.subscription, vec![]));}} - replacement in src/xmpp/mod.rs at line 222
truefuture::ok(self) - replacement in src/xmpp/mod.rs at line 224
Event::Online => true,Event::Online => future::ok(self), - replacement in src/xmpp/mod.rs at line 227
falsefuture::err(self.account) - replacement in src/xmpp/mod.rs at line 255
state: XmppState { client, mut data },state: XmppState { client, data }, - replacement in src/xmpp/mod.rs at line 258
if let Some(send_element) = data.send_queue.pop_front() {use tokio::prelude::Sink;info!("Sending {:?}", send_element);Box::new(client.send(send_element).select2(stop_future).then(move |r| match r {Ok(Either::A((client, b))) => {Box::new(future::ok(future::Loop::Continue((XmppConnection {state: XmppState { client, data },account,client.into_future().select2(stop_future).then(move |r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let xmpp = XmppConnection {state: XmppState { client, data },account,};Box::new(xmpp.xmpp_processing(&event).then(|r| match r {Ok(mut xmpp) => match stop_condition(&mut xmpp, event) {Ok(true) => future::ok(future::Loop::Break((xmpp,Ok(Either::A(b)),))),Ok(false) => future::ok(future::Loop::Continue((xmpp,b,stop_condition,))),Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))), - replacement in src/xmpp/mod.rs at line 281
b,stop_condition,))))Err(account) => future::err((account, Ok(Either::A(b)))),})) - replacement in src/xmpp/mod.rs at line 284
}Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Ok(Either::B(t))))}})),Err(Either::A((e, b))) => {warn!("XMPP sending error: {}", e);} else { - replacement in src/xmpp/mod.rs at line 287[3.2970]→[3.1807:2979](∅→∅),[3.2979]→[2.172:244](∅→∅),[2.244]→[3.3047:3222](∅→∅),[3.3047]→[3.3047:3222](∅→∅),[3.3222]→[2.245:1169](∅→∅),[2.1169]→[3.4515:4611](∅→∅),[3.4515]→[3.4515:4611](∅→∅),[3.4611]→[2.1170:1351](∅→∅),[2.1351]→[3.4936:4981](∅→∅),[3.4936]→[3.4936:4981](∅→∅),[3.4981]→[2.1352:1433](∅→∅),[2.1433]→[3.5072:5203](∅→∅),[3.5072]→[3.5072:5203](∅→∅),[3.5203]→[2.1434:1509](∅→∅),[2.1509]→[3.5287:5840](∅→∅),[3.5287]→[3.5287:5840](∅→∅),[3.5840]→[2.1510:1548](∅→∅),[2.1548]→[3.5879:6039](∅→∅),[3.5879]→[3.5879:6039](∅→∅),[3.6039]→[2.1549:1626](∅→∅),[2.1626]→[3.6126:6220](∅→∅),[3.6126]→[3.6126:6220](∅→∅),[3.6220]→[2.1627:1702](∅→∅),[2.1702]→[3.6304:6837](∅→∅),[3.6304]→[3.6304:6837](∅→∅),[3.6837]→[2.1703:1741](∅→∅),[2.1741]→[3.6876:6982](∅→∅),[3.6876]→[3.6876:6982](∅→∅)
Err(Either::B((e, a))) => Box::new(a.then(|r| match r {Ok(client) => future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),))),Err(se) => {warn!("XMPP sending error: {}", se);future::err((account, Err(e)))}})),},)) as Box<dyn Future<Item = _, Error = _>>} else {Box::new(client.into_future().select2(stop_future).then(move |r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {state: XmppState { client, data },account,};if xmpp.xmpp_processing(&event) {match stop_condition(&mut xmpp, event) {Ok(true) => future::ok(future::Loop::Break((xmpp,Ok(Either::A(b)),))),Ok(false) => future::ok(future::Loop::Continue((xmpp,b,stop_condition,))),Err(_e) => {future::err((xmpp.account, Ok(Either::A(b))))}}} else {future::err((xmpp.account, Ok(Either::A(b))))}} else {future::err((account, Ok(Either::A(b))))}}Ok(Either::B((t, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}}Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);future::err((account, Ok(Either::A(b))))}Err(Either::B((e, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),)))} else {future::err((account, Err(e)))}}}),)}}Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))}),Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);Box::new(future::err((account, Ok(Either::A(b)))))}Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client, data },account,},Err(e),)))} else {future::err((account, Err(e)))}),}) - replacement in src/xmpp/mod.rs at line 359
self.state.data.roster.insert(i.jid, ());self.state.data.roster.insert(i.jid, (i.subscription, vec![])); - replacement in src/xmpp/mod.rs at line 491
fn process_command(&mut self, cmd: XmppCommand) {fn process_command(self,cmd: XmppCommand,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> { - replacement in src/xmpp/mod.rs at line 496
if self.state.data.roster.get(&cmd.xmpp_to).is_some() {if let Some(ref mut rdata) = self.state.data.roster.get_mut(&cmd.xmpp_to) { - edit in src/xmpp/mod.rs at line 498
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Box::new(future::ok(self)) as Box<dyn Future<Item = _, Error = _>>================================let sub_to = match rdata.0 {xmpp_parsers::roster::Subscription::To => true,xmpp_parsers::roster::Subscription::Both => true,_ => false,};if sub_to {info!("Subscribed to {}", cmd.xmpp_to);self.state.data.send_queue.push_back(stanzas::make_chat_message(cmd.xmpp_to, cmd.message));} else {info!("Not subscribed to {}", cmd.xmpp_to);rdata.1.push(cmd.message);self.state.data.send_queue.push_back(stanzas::make_ask_subscribe(cmd.xmpp_to));}<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< - edit in src/xmpp/mod.rs at line 523
let XmppConnection {account,state: XmppState { client, mut data },} = self; - replacement in src/xmpp/mod.rs at line 528
self.state.data.counter += 1;let id_add_roster = format!("id_add_roster{}", self.state.data.counter);data.counter += 1;let id_add_roster = format!("id_add_roster{}", data.counter); - replacement in src/xmpp/mod.rs at line 531
self.state.data.pending_add_roster_ids.insert(id_add_roster, (cmd.xmpp_to, cmd.message));let account2 = account.clone(); - edit in src/xmpp/mod.rs at line 533
use tokio::prelude::Sink; - replacement in src/xmpp/mod.rs at line 535
self.state.data.send_queue.push_back(add_roster);Box::new(client.send(add_roster).map_err(|e| {error!("Error on send adding to roster: {}", e);account2}).and_then(move |client| {data.pending_add_roster_ids.insert(id_add_roster, (cmd.xmpp_to, cmd.message));future::ok(XmppConnection {account,state: XmppState { client, data },})}),) - replacement in src/xmpp/mod.rs at line 596
Ok((mut conn, r)) => match r {Ok(Either::A(f)) => {if let Some(cmd_recv) = f.into_inner() {future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: conn.into(),}))} else {future::err(format_err!("Command receiver is gone"))}}Ok((conn, r)) => match r {Ok(Either::A(f)) => Box::new(if let Some(cmd_recv) = f.into_inner() {future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: conn.into(),}))} else {future::err(format_err!("Command receiver is gone"))})as Box<dyn Future<Item = _, Error = _>>, - replacement in src/xmpp/mod.rs at line 609
conn.process_command(cmd);future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: conn.into(),Box::new(conn.process_command(cmd).then(|r| {future::ok(future::Loop::Continue(XmppProcessState {cmd_recv,signal,conn: match r {Ok(conn) => conn.into(),Err(account) => account.into(),},})) - edit in src/xmpp/mod.rs at line 619
as Box<dyn Future<Item = _, Error = _>> - replacement in src/xmpp/mod.rs at line 621
future::ok(future::Loop::Break(()))Box::new(future::ok(future::Loop::Break(()))) - replacement in src/xmpp/mod.rs at line 624
Err(_) => future::err(format_err!("Command receiver is broken")),Err(_) => Box::new(future::err(format_err!("Command receiver is broken"))), - replacement in src/xmpp/mod.rs at line 626
Err((account, r)) => match r {Err((account, r)) => Box::new(match r { - replacement in src/xmpp/mod.rs at line 651
},}),