Second part of processing result is only about stop_future
[?]
Dec 31, 2018, 8:21 PM
BWDUANCV77MCLCYMRS2UNFIFW4ZC3KB2KEGEUI77FRO7KW6TUZJQC
Dependencies
- [2]
CP4MZO6VLeftover commands are processed via stoppable receiver - [3]
V5HDBSZMUse jid for receiver address - [4]
PBRUH4BJRename optional XmppConnection to MaybeXmppConnection - [5]
AGIW6YR3Use shared future for signal everywhere - [6]
X6L47BHQUse different structure for established xmpp connection - [7]
HU3NZX5ZProcess self-presence via new processing code - [8]
PFC7OJQFQuery roster - [9]
ALP2YJIURename XmppState to XmppProcessState - [10]
VS6AHRWIMove XMPP to separate dir - [11]
QTCUURXNAdd additional requirement for command stream - [12]
BTOZT4JPUse failure - [13]
O2GM5J4FDon't split xmpp receiving and sending - [14]
5IKA4GO7Rename xmpp client field from "inner" to "client" - [15]
QWE26TMVupdate deps - [16]
TDOR5XQUAccept destination - [17]
OANBCLN5Move xmpp client into XmppState - [18]
UWY5EVZ6Add dummy roster data - [19]
NDDQQP2PUpdate deps - [20]
AYQZ2UIAUpdate deps - [21]
XGP44R5HRework stopping xmpp connection - [22]
FV6BJ5K6Send self-presence and store account info in Rc so it will be used in some future in parallel - [23]
4LRBIGVTShow info about xmpp errors - [24]
IK3YDPTYUpdate deps - [25]
WJNXI6Z4Fill roster - [26]
PVCRPP3BSome servers don't send `to` in initial presence - [27]
OGMBXBKPMove online to XmppConnection - [*]
FVVPKFTLInitial commit
Change contents
- edit in src/xmpp/stanzas.rs at line 2
use xmpp_parsers::iq::Iq; - edit in src/xmpp/stanzas.rs at line 4
use xmpp_parsers::roster::Roster; - edit in src/xmpp/stanzas.rs at line 14[3.419]
pub fn make_get_roster(id: &str) -> Element {let mut get_roster = Iq::from_get(Roster {ver: None,items: vec![],});get_roster.id = Some(id.to_string());get_roster.into()} - edit in src/xmpp/mod.rs at line 7
use std::collections::HashMap; - edit in src/xmpp/mod.rs at line 11
const ID_GET_ROSTER: &str = "id_get_roster0"; - replacement in src/xmpp/mod.rs at line 14
pub struct MaybeXmppConnection {struct XmppState {client: Client,roster: HashMap<jid::Jid, ()>,}struct MaybeXmppConnection { - replacement in src/xmpp/mod.rs at line 21
inner: Option<Client>,state: Option<XmppState>, - replacement in src/xmpp/mod.rs at line 24
pub struct XmppConnection {struct XmppConnection { - replacement in src/xmpp/mod.rs at line 26
inner: Client,state: XmppState, - replacement in src/xmpp/mod.rs at line 33
inner: Some(from.inner),state: Some(from.state), - replacement in src/xmpp/mod.rs at line 42
inner: None,state: None, - replacement in src/xmpp/mod.rs at line 53
F: future::Future + 'static,F: future::Future + Clone + 'static, - replacement in src/xmpp/mod.rs at line 57
let MaybeXmppConnection { account, inner } = self;let MaybeXmppConnection { account, state } = self; - replacement in src/xmpp/mod.rs at line 59
if let Some(inner) = inner {Box::new(future::ok(XmppConnection { account, inner }))if let Some(state) = state {Box::new(future::ok(XmppConnection { account, state })) - edit in src/xmpp/mod.rs at line 65
.clone() - replacement in src/xmpp/mod.rs at line 67
future::loop_fn(account, |account| {future::loop_fn(account, move |account| { - edit in src/xmpp/mod.rs at line 76
let stop_future2 = stop_future.clone();let stop_future3 = stop_future.clone(); - replacement in src/xmpp/mod.rs at line 82
inner: client,state: XmppState {client,roster: HashMap::new(),}, - replacement in src/xmpp/mod.rs at line 88
.online().and_then(XmppConnection::self_presence).processing(XmppConnection::online, stop_future.clone()).map_err(|(acc, _)| acc).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account),}).and_then(|conn| conn.initial_roster(stop_future2)).and_then(|conn| conn.self_presence(stop_future3)) - replacement in src/xmpp/mod.rs at line 124
fn xmpp_processing(&mut self, _event: &Event) {}fn xmpp_processing(&mut self, event: &Event) {info!("Incoming xmpp event: {:?}", event);} - replacement in src/xmpp/mod.rs at line 130[3.3983]→[3.3983:4019](∅→∅),[3.3983]→[3.3983:4019](∅→∅),[3.3983]→[3.3983:4019](∅→∅),[3.3983]→[3.3983:4019](∅→∅)
/// or stop future was resolved/// or stop future was resolved./// Return item if connection was preserved or error otherwise./// Second part is a state of stop_future - replacement in src/xmpp/mod.rs at line 143
S: FnMut(&mut Self, &Event) -> bool,S: FnMut(&mut Self, Event) -> Result<bool, ()>, - replacement in src/xmpp/mod.rs at line 148
let XmppConnection { inner, account } = xmpp;inner.into_future().select2(stop_future).then(|r| match r {let XmppConnection {state: XmppState { client, roster },account,} = xmpp;client.into_future().select2(stop_future).then(|r| match r { - replacement in src/xmpp/mod.rs at line 156
inner: client,state: XmppState { client, roster }, - replacement in src/xmpp/mod.rs at line 160
if stop_condition(&mut xmpp, &event) {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))} else {future::ok(future::Loop::Continue((xmpp, b, stop_condition)))match stop_condition(&mut xmpp, event) {Ok(true) => {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))}Ok(false) => {future::ok(future::Loop::Continue((xmpp, b, stop_condition)))}Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))), - replacement in src/xmpp/mod.rs at line 174
if let Some(inner) = a.into_inner() {if let Some(client) = a.into_inner() { - replacement in src/xmpp/mod.rs at line 176
XmppConnection { inner, account },XmppConnection {state: XmppState { client, roster },account,}, - replacement in src/xmpp/mod.rs at line 186
Err(Either::A((_e, b))) => future::err((account, Ok(Either::A(b)))),Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);future::err((account, Ok(Either::A(b))))} - replacement in src/xmpp/mod.rs at line 191
if let Some(inner) = a.into_inner() {if let Some(client) = a.into_inner() { - replacement in src/xmpp/mod.rs at line 193
XmppConnection { inner, account },XmppConnection {state: XmppState { client, roster },account,}, - replacement in src/xmpp/mod.rs at line 209
/// returns error if something went wrongfn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {Box::new(future::loop_fn((self.inner, self.account),|(client, account)| {client.into_future().then(|r| match r {Ok((event, client)) => match event {Some(Event::Online) => {info!("Online");future::ok(future::Loop::Break(XmppConnection {account,inner: client,}))/// returns error if something went wrong and xmpp connection is brokenfn online(&mut self, event: Event) -> Result<bool, ()> {match event {Event::Online => {info!("Online!");Ok(true)}Event::Stanza(s) => {warn!("Stanza before online: {:?}", s);Ok(false)}_ => {error!("Disconnected while online");Err(())}}}fn process_initial_roster(&mut self, event: Event) -> Result<bool, ()> {if let Event::Stanza(s) = event {use try_from::TryInto;match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {Ok(iq) => {if let Some(id) = iq.id {if id == ID_GET_ROSTER {match iq.payload {xmpp_parsers::iq::IqType::Error(_e) => {error!("Get error instead of roster");Err(())}xmpp_parsers::iq::IqType::Result(Some(result)) => {match result.try_into()as Result<xmpp_parsers::roster::Roster, _>{Ok(roster) => {info!("Got roster:");for i in roster.items {info!(" >>> {:?}", i);}Ok(true)}Err(e) => {error!("Cann't parse roster: {}", e);Err(())}}}_ => {error!("Unknown result of roster");Err(())}}} else {Ok(false) - replacement in src/xmpp/mod.rs at line 264[3.1479]→[3.1479:1829](∅→∅),[3.5337]→[3.5558:5584](∅→∅),[3.6802]→[3.5558:5584](∅→∅),[3.1829]→[3.5558:5584](∅→∅),[3.3094]→[3.5558:5584](∅→∅),[3.8724]→[3.5558:5584](∅→∅),[3.5558]→[3.5558:5584](∅→∅),[3.5584]→[3.1830:1996](∅→∅)
Some(Event::Stanza(s)) => {info!("xmpp stanza: {:?}", s);future::ok(future::Loop::Continue((client, account)))}_ => {warn!("Disconnected");future::err(account)}},Err((e, _)) => {error!("xmpp receive error: {}", e);future::err(account)} else {error!("Iq stanza without id");Err(()) - edit in src/xmpp/mod.rs at line 268
}Err(_e) => Ok(false),}} else {error!("Wrong event while waiting roster");Err(())}}fn initial_roster<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E>,{let XmppConnection {account,state: XmppState { client, roster },} = self;use tokio::prelude::Sink;let get_roster = stanzas::make_get_roster(ID_GET_ROSTER);let account2 = account.clone();info!("Quering roster... {:?}", get_roster);client.send(get_roster).map_err(move |e| {error!("Error on querying roster: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, roster },account,}.processing(XmppConnection::process_initial_roster, stop_future).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account), - replacement in src/xmpp/mod.rs at line 312
},))}) - replacement in src/xmpp/mod.rs at line 315
fn self_presence(self) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {let XmppConnection { account, inner } = self;let client = inner;fn self_presence<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E>,E: Into<failure::Error>,{let XmppConnection {account,state: XmppState { client, roster },} = self; - edit in src/xmpp/mod.rs at line 330
info!("Sending presence..."); - replacement in src/xmpp/mod.rs at line 331
Box::new(client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()info!("Sending presence... {:?}", presence);client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);account2}).and_then(move |client| {XmppConnection {state: XmppState { client, roster },account,}.processing(move |conn, event| {if let Event::Stanza(s) = event {if s.name() == "presence"&& s.attr("from").map_or(false, |f| f == conn.account.jid)&& s.attr("to").map_or(false, |f| f == conn.account.jid){Ok(true)} else {Ok(false)}} else {error!("Wrong event while waiting self-presence");Err(())}},stop_future,).map_err(|(account, _)| account).and_then(|(conn, r)| match r {Ok(Either::A(_)) => future::ok(conn),Ok(Either::B(_)) => future::err(conn.account),Err(_e) => future::err(conn.account), - replacement in src/xmpp/mod.rs at line 368
.and_then(move |client| {future::loop_fn((account2.clone(), client), |(account, client)| {client.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, client)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(client))} else {future::ok(future::Loop::Continue((account, client)))}} else {future::err("Got wrong event".to_owned())}}None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e))}).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(XmppConnection { account, inner }),}),)}) - replacement in src/xmpp/mod.rs at line 432
future::ok(future::Loop::Break(()))future::ok(future::Loop::Break((None, conn.into()))) - replacement in src/xmpp/mod.rs at line 438
future::err(format_err!("Cmd error"))future::err(e) - edit in src/xmpp/mod.rs at line 443
.and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| {if let Some(cmd_recv) = opt_cmd_recv {// process left commandsinfo!("Stop accepting commands");Box::new(cmd_recv.for_each(|_cmd| future::ok(())).map_err(|_| format_err!("cmd receiver last error")),) as Box<Future<Item = (), Error = failure::Error>>} else {Box::new(future::ok(()))}}) - edit in Cargo.toml at line 23
try_from = "=0.2.2" # dependency of xmpp-parsers - edit in Cargo.lock at line 1110
"try_from 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",