Add additional requirement for command stream
[?]
Dec 31, 2018, 7:15 PM
QTCUURXNMKPKDLG64QH72BIVP6SELSTQLUGGN67LKFVFN6FP2ZUQC
Dependencies
- [2]
WJNXI6Z4Fill roster - [3]
HU3NZX5ZProcess self-presence via new processing code - [4]
O2GM5J4FDon't split xmpp receiving and sending - [5]
VS6AHRWIMove XMPP to separate dir - [6]
FV6BJ5K6Send self-presence and store account info in Rc so it will be used in some future in parallel - [7]
QYY3KRGLUse failure instead of Box<dyn Error> - [8]
UWY5EVZ6Add dummy roster data - [9]
5A5UVGNMMove receiver closing logic out of xmpp processing - [10]
5OBTKGDLUpdate deps - [11]
L77O4T7MFormatting and fixes - [12]
NDDQQP2PUpdate deps - [13]
HOAZX2PBReorganize roster processing. Output roster - [14]
PFC7OJQFQuery roster - [15]
AYQZ2UIAUpdate deps - [16]
BTOZT4JPUse failure - [17]
QWE26TMVupdate deps - [18]
OGMBXBKPMove online to XmppConnection - [19]
AGIW6YR3Use shared future for signal everywhere - [20]
4LRBIGVTShow info about xmpp errors - [21]
ALP2YJIURename XmppState to XmppProcessState - [22]
OANBCLN5Move xmpp client into XmppState - [23]
XGP44R5HRework stopping xmpp connection - [24]
3GEU7TC7Welcome to 2018! - [25]
IK3YDPTYUpdate deps - [26]
TDOR5XQUAccept destination - [27]
X6L47BHQUse different structure for established xmpp connection - [28]
ZI4GJ72VAdd message to xmpp command - [29]
5IKA4GO7Rename xmpp client field from "inner" to "client" - [30]
V5HDBSZMUse jid for receiver address - [31]
PVCRPP3BSome servers don't send the `to` attribute in initial presence - [*]
FVVPKFTLInitial commit
Change contents
- edit in src/xmpp/stanzas.rs at line 2
use xmpp_parsers::iq::Iq; - edit in src/xmpp/stanzas.rs at line 3
use xmpp_parsers::roster::Roster; - edit in src/xmpp/stanzas.rs at line 12
pub fn make_get_roster(id: &str) -> Element {let mut get_roster = Iq::from_get(Roster {ver: None,items: vec![],});get_roster.id = Some(id.to_string());get_roster.into()} - edit in src/xmpp/mod.rs at line 9
const ID_GET_ROSTER: &str = "id_get_roster0"; - replacement in src/xmpp/mod.rs at line 44
F: future::Future + Clone + 'static,F: future::Future + 'static, - edit in src/xmpp/mod.rs at line 56
.clone() - replacement in src/xmpp/mod.rs at line 57
future::loop_fn(account, move |account| {future::loop_fn(account, |account| { - edit in src/xmpp/mod.rs at line 65
let stop_future2 = stop_future.clone();let stop_future3 = stop_future.clone(); - replacement in src/xmpp/mod.rs at line 72
.processing(XmppConnection::online, stop_future.clone()).map(|(conn, _)| conn).map_err(|(acc, _)| acc).and_then(|conn| conn.initial_roster(stop_future2)).and_then(|conn| conn.self_presence(stop_future3)).online().and_then(XmppConnection::self_presence) - replacement in src/xmpp/mod.rs at line 101
fn xmpp_processing(&mut self, event: &Event) {info!("Incoming xmpp event: {:?}", event);}fn xmpp_processing(&mut self, _event: &Event) {} - replacement in src/xmpp/mod.rs at line 111
Item = (Self, Result<Either<F, T>, failure::Error>),Error = (std::rc::Rc<config::Account>,Result<Either<F, T>, failure::Error>,),Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>), - replacement in src/xmpp/mod.rs at line 116
E: Into<failure::Error>,S: FnMut(&mut Self, Event) -> Result<bool, failure::Error>,S: FnMut(&mut Self, &Event) -> bool, - replacement in src/xmpp/mod.rs at line 130[3.907]→[3.907:1295](∅→∅),[3.907]→[3.907:1295](∅→∅),[3.1295]→[3.3111:3145](∅→∅),[3.3111]→[3.3111:3145](∅→∅),[3.3145]→[3.1296:1375](∅→∅)
match stop_condition(&mut xmpp, event) {Ok(true) => {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))}Ok(false) => {future::ok(future::Loop::Continue((xmpp, b, stop_condition)))}Err(e) => future::err((xmpp.account, Err(e))),if stop_condition(&mut xmpp, &event) {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))} else {future::ok(future::Loop::Continue((xmpp, b, stop_condition))) - replacement in src/xmpp/mod.rs at line 154
Err(e.into()),Err(e), - replacement in src/xmpp/mod.rs at line 157
future::err((account, Err(e.into())))future::err((account, Err(e))) - replacement in src/xmpp/mod.rs at line 167[3.6737]→[3.4692:6307](∅→∅),[3.6307]→[2.940:1025](∅→∅),[2.1025]→[3.6307:6802](∅→∅),[3.6307]→[3.6307:6802](∅→∅)
fn online(&mut self, event: Event) -> Result<bool, failure::Error> {match event {Event::Online => {info!("Online!");Ok(true)}Event::Stanza(s) => {warn!("Stanza before online: {:?}", s);Ok(false)}_ => {error!("Disconnected while online");Err(format_err!("Disconnected while online"))}}}fn process_initial_roster(&mut self, event: Event) -> Result<bool, failure::Error> {if let Event::Stanza(s) = event {use try_from::TryInto;match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {Ok(iq) => {if let Some(id) = iq.id {if id == ID_GET_ROSTER {match iq.payload {xmpp_parsers::iq::IqType::Error(_e) => {Err(format_err!("Get error instead of roster"))}xmpp_parsers::iq::IqType::Result(Some(result)) => {match result.try_into()as Result<xmpp_parsers::roster::Roster, _>{Ok(roster) => {info!("Got roster:");for i in roster.items {info!(" >>> {:?}", i);self.state.roster.insert(i.jid, ());}Ok(true)}Err(e) => Err(format_err!("Cann't parse roster: {}", e)),}}_ => Err(format_err!("Unknown result of roster")),}} else {Ok(false)fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {Box::new(future::loop_fn((self.inner, self.account),|(client, account)| {client.into_future().then(|r| match r {Ok((event, client)) => match event {Some(Event::Online) => {info!("Online");future::ok(future::Loop::Break(XmppConnection {account,inner: client,}))}Some(Event::Stanza(s)) => {info!("xmpp stanza: {:?}", s);future::ok(future::Loop::Continue((client, account)))}_ => {warn!("Disconnected");future::err(account) - replacement in src/xmpp/mod.rs at line 188
} else {Err(format_err!("Iq stanza without id"))},Err((e, _)) => {error!("xmpp receive error: {}", e);future::err(account) - replacement in src/xmpp/mod.rs at line 193[3.8841]→[3.6898:7060](∅→∅),[3.8564]→[3.3783:3789](∅→∅),[3.5720]→[3.3783:3789](∅→∅),[3.7060]→[3.3783:3789](∅→∅),[3.5926]→[3.3783:3789](∅→∅),[3.3280]→[3.3783:3789](∅→∅),[3.3996]→[3.3783:3789](∅→∅),[3.5642]→[3.3783:3789](∅→∅),[3.4300]→[3.3783:3789](∅→∅),[3.4794]→[3.3783:3789](∅→∅),[3.3307]→[3.3783:3789](∅→∅),[3.10350]→[3.3783:3789](∅→∅),[3.5692]→[3.3783:3789](∅→∅),[3.8718]→[3.3783:3789](∅→∅),[3.3783]→[3.3783:3789](∅→∅),[3.3789]→[3.10351:10352](∅→∅),[3.10352]→[3.7061:7279](∅→∅),[3.7279]→[3.2865:2915](∅→∅),[3.2915]→[2.1026:1053](∅→∅),[2.1053]→[3.2964:2982](∅→∅),[3.2964]→[3.2964:2982](∅→∅),[3.2982]→[3.7333:7528](∅→∅),[3.7333]→[3.7333:7528](∅→∅),[3.7528]→[3.2983:2998](∅→∅),[3.2998]→[3.7562:7838](∅→∅),[3.7562]→[3.7562:7838](∅→∅),[3.7838]→[2.1054:1089](∅→∅),[3.3056]→[3.7887:8518](∅→∅),[2.1089]→[3.7887:8518](∅→∅),[3.7887]→[3.7887:8518](∅→∅)
}Err(_e) => Ok(false),}} else {Err(format_err!("Wrong event while waiting roster"))}}fn initial_roster<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E>,E: Into<failure::Error>,{let XmppConnection {account,inner: client,} = self;use tokio::prelude::Sink;let get_roster = stanzas::make_get_roster(ID_GET_ROSTER);let account2 = account.clone();info!("Quering roster... {:?}", get_roster);client.send(get_roster).map_err(move |e| {error!("Error on querying roster: {}", e);(account2, Err(failure::SyncFailure::new(e).into()))}).and_then(move |client| {XmppConnection {inner: client,account,}.processing(XmppConnection::process_initial_roster, stop_future)}).then(|r| match r {Err((account, e)) => {error!("Cann't wait roster: {}",e.err().map_or_else(|| std::borrow::Cow::Borrowed("None"),|e| e.to_string().into()));future::err(account)}Ok((conn, _)) => future::ok(conn),})})},)) - replacement in src/xmpp/mod.rs at line 198[3.8525]→[3.8525:8742](∅→∅),[3.8525]→[3.8525:8742](∅→∅),[3.8525]→[3.8525:8742](∅→∅),[3.8742]→[3.3057:3107](∅→∅),[3.3107]→[2.1090:1117](∅→∅),[2.1117]→[3.3156:3174](∅→∅),[3.3156]→[3.3156:3174](∅→∅)
fn self_presence<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E>,E: Into<failure::Error>,{let XmppConnection {account,inner: client,} = self;fn self_presence(self) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {let XmppConnection { account, inner } = self;let client = inner; - edit in src/xmpp/mod.rs at line 204
info!("Sending presence..."); - replacement in src/xmpp/mod.rs at line 206[3.10756]→[3.8797:8851](∅→∅),[3.8851]→[3.3175:3190](∅→∅),[3.3190]→[3.8885:9157](∅→∅),[3.8885]→[3.8885:9157](∅→∅),[3.9157]→[2.1118:1153](∅→∅),[3.3248]→[3.9206:10285](∅→∅),[2.1153]→[3.9206:10285](∅→∅),[3.9206]→[3.9206:10285](∅→∅)
info!("Sending presence... {:?}", presence);client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);(account2, Err(failure::SyncFailure::new(e).into()))}).and_then(move |client| {XmppConnection {inner: client,account,}.processing(move |conn, event| {if let Event::Stanza(s) = event {if s.name() == "presence"&& s.attr("from").map_or(false, |f| f == conn.account.jid)&& s.attr("to").map_or(false, |f| f == conn.account.jid){Ok(true)} else {Ok(false)}} else {Err(format_err!("Wrong event while waiting self-presence"))}},stop_future,)}).then(|r| match r {Err((account, _e)) => {error!("Cann't wait self-presence");future::err(account)}Ok((conn, _)) => future::ok(conn),})Box::new(client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()}).and_then(move |client| {future::loop_fn((account2.clone(), client), |(account, client)| {client.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, client)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(client))} else {future::ok(future::Loop::Continue((account, client)))}} else {future::err("Got wrong event".to_owned())}}None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e))}).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(XmppConnection { account, inner }),}),) - replacement in src/xmpp/mod.rs at line 
281[3.8651]→[3.465:518](∅→∅),[3.5896]→[3.465:518](∅→∅),[3.6054]→[3.465:518](∅→∅),[3.5793]→[3.465:518](∅→∅),[3.4398]→[3.465:518](∅→∅),[3.12380]→[3.465:518](∅→∅),[3.5737]→[3.465:518](∅→∅),[3.8987]→[3.465:518](∅→∅),[3.4355]→[3.465:518](∅→∅)
S: stream::Stream<Item = XmppCommand> + 'static,S: stream::Stream<Item = XmppCommand, Error = ()> + 'static, - replacement in src/xmpp/mod.rs at line 320
future::err(e)future::err(format_err!("Cmd error")) - edit in Cargo.toml at line 23
try_from = "=0.2.2" # dependency of xmpp-parsers - edit in Cargo.lock at line 1110
"try_from 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",