Move xmpp client into XmppState
[?]
Dec 31, 2018, 6:15 PM
OANBCLN5TD5VQTSAQXSIXU6IUCXMKQSML2UJGQCKVTOCENEVABIACDependencies
- [2]
ALP2YJIURename XmppState to XmppProcessState - [3]
EOHEZXX3Move request processing to structure - [4]
TDOR5XQUAccept destination - [5]
5IKA4GO7Rename xmpp client field from "inner" to "client" - [6]
HOAZX2PBReorganize roster processing. Output roster - [7]
QWE26TMVupdate deps - [8]
5OBTKGDLUpdate deps - [9]
O2GM5J4FDon't split xmpp receiving and sending - [10]
AGIW6YR3Use shared future for signal everywhere - [11]
PFC7OJQFQuery roster - [12]
VS6AHRWIMove XMPP to separate dir - [13]
AYQZ2UIAUpdate deps - [14]
QYY3KRGLUse failure instead Box<dyn Error> - [15]
HU3NZX5ZProcess self-presence via new processing code - [16]
XGP44R5HRework stopping xmpp connection - [17]
H7R7Y3FQUse new processing code to wait online - [18]
PBRUH4BJRename optional XmppConnection to MaybeXmppConnection - [19]
5A5UVGNMMove receiver closing logic out of xmpp processing - [20]
V5HDBSZMUse jid for receiver address - [21]
IK3YDPTYUpdate deps - [22]
4LRBIGVTShow info about xmpp errors - [23]
FV6BJ5K6Send self-presence and store account info in Rc so it willbe used in some future in parallel - [24]
NDDQQP2PUpdate deps - [25]
OGMBXBKPMove online to XmppConnection - [26]
X6L47BHQUse different structure for established xmpp connection - [27]
ZI4GJ72VAdd message to xmpp command - [28]
PVCRPP3BSome servers don't send to in initial presence - [29]
BTOZT4JPUse failure - [*]
FVVPKFTLInitial commit
Change contents
- edit in src/xmpp/stanzas.rs at line 2
use xmpp_parsers::iq::Iq; - edit in src/xmpp/stanzas.rs at line 4
use xmpp_parsers::roster::Roster; - edit in src/xmpp/stanzas.rs at line 14[3.419]
pub fn make_get_roster(id: &str) -> Element {let mut get_roster = Iq::from_get(Roster {ver: None,items: vec![],});get_roster.id = Some(id.to_string());get_roster.into()} - edit in src/xmpp/mod.rs at line 9
const ID_GET_ROSTER: &str = "id_get_roster0"; - replacement in src/xmpp/mod.rs at line 12
pub struct MaybeXmppConnection {struct XmppState {client: Client,}struct MaybeXmppConnection { - replacement in src/xmpp/mod.rs at line 18
inner: Option<Client>,state: Option<XmppState>, - replacement in src/xmpp/mod.rs at line 21
pub struct XmppConnection {struct XmppConnection { - replacement in src/xmpp/mod.rs at line 23
inner: Client,state: XmppState, - replacement in src/xmpp/mod.rs at line 30
inner: Some(from.inner),state: Some(from.state), - replacement in src/xmpp/mod.rs at line 39
inner: None,state: None, - replacement in src/xmpp/mod.rs at line 50
F: future::Future + 'static,F: future::Future + Clone + 'static, - replacement in src/xmpp/mod.rs at line 54
let MaybeXmppConnection { account, inner } = self;let MaybeXmppConnection { account, state } = self; - replacement in src/xmpp/mod.rs at line 56
if let Some(inner) = inner {Box::new(future::ok(XmppConnection { account, inner }))if let Some(state) = state {Box::new(future::ok(XmppConnection { account, state })) - edit in src/xmpp/mod.rs at line 62
.clone() - replacement in src/xmpp/mod.rs at line 64
future::loop_fn(account, |account| {future::loop_fn(account, move |account| { - edit in src/xmpp/mod.rs at line 72
let stop_future2 = stop_future.clone();let stop_future3 = stop_future.clone(); - replacement in src/xmpp/mod.rs at line 79
inner: client,state: XmppState { client }, - replacement in src/xmpp/mod.rs at line 82
.online().and_then(XmppConnection::self_presence).processing(XmppConnection::online, stop_future.clone()).map(|(conn, _)| conn).map_err(|(acc, _)| acc).and_then(|conn| conn.initial_roster(stop_future2)).and_then(|conn| conn.self_presence(stop_future3)) - replacement in src/xmpp/mod.rs at line 114
fn xmpp_processing(&mut self, _event: &Event) {}fn xmpp_processing(&mut self, event: &Event) {info!("Incoming xmpp event: {:?}", event);} - replacement in src/xmpp/mod.rs at line 126
Item = (Self, Result<Either<F, T>, E>),Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),Item = (Self, Result<Either<F, T>, failure::Error>),Error = (std::rc::Rc<config::Account>,Result<Either<F, T>, failure::Error>,), - replacement in src/xmpp/mod.rs at line 134
S: FnMut(&mut Self, &Event) -> bool,E: Into<failure::Error>,S: FnMut(&mut Self, Event) -> Result<bool, failure::Error>, - replacement in src/xmpp/mod.rs at line 140[3.4612]→[2.1139:1277](∅→∅),[2.1277]→[3.4752:4866](∅→∅),[3.4752]→[3.4752:4866](∅→∅),[3.4866]→[2.1278:1457](∅→∅),[2.1457]→[3.4945:5003](∅→∅),[3.4945]→[3.4945:5003](∅→∅),[3.5003]→[2.1458:1615](∅→∅)
let XmppConnection { inner, account } = xmpp;inner.into_future().select2(stop_future).then(|r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {inner: client,account,};xmpp.xmpp_processing(&event);if stop_condition(&mut xmpp, &event) {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))let XmppConnection { state, account } = xmpp;state.client.into_future().select2(stop_future).then(|r| match r {Ok(Either::A(((event, client), b))) => {if let Some(event) = event {let mut xmpp = XmppConnection {state: XmppState { client },account,};xmpp.xmpp_processing(&event);match stop_condition(&mut xmpp, event) {Ok(true) => {future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))}Ok(false) => future::ok(future::Loop::Continue((xmpp,b,stop_condition,))),Err(e) => future::err((xmpp.account, Err(e))),}} else {future::err((account, Ok(Either::A(b))))}}Ok(Either::B((t, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client },account,},Ok(Either::B(t)),))) - replacement in src/xmpp/mod.rs at line 178
future::ok(future::Loop::Continue((xmpp, b, stop_condition)))future::err((account, Ok(Either::B(t)))) - edit in src/xmpp/mod.rs at line 180
} else {future::err((account, Ok(Either::A(b)))) - replacement in src/xmpp/mod.rs at line 181[3.4531]→[3.5637:5706](∅→∅),[3.5706]→[2.1747:1809](∅→∅),[2.1809]→[3.5769:5830](∅→∅),[3.5769]→[3.5769:5830](∅→∅),[3.5830]→[2.1810:1877](∅→∅),[2.1877]→[3.5898:6082](∅→∅),[3.5898]→[3.5898:6082](∅→∅)
}Ok(Either::B((t, a))) => {if let Some(inner) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection { inner, account },Ok(Either::B(t)),)))} else {future::err((account, Ok(Either::B(t))))Err(Either::A((e, b))) => {warn!("XMPP error: {}", e.0);future::err((account, Ok(Either::A(b)))) - replacement in src/xmpp/mod.rs at line 185[3.5000]→[3.6083:6242](∅→∅),[3.6242]→[2.1878:1940](∅→∅),[2.1940]→[3.6305:6366](∅→∅),[3.6305]→[3.6305:6366](∅→∅),[3.6366]→[2.1941:2048](∅→∅),[2.2048]→[3.6481:6546](∅→∅),[3.6481]→[3.6481:6546](∅→∅),[3.6546]→[2.2049:2108](∅→∅)
}Err(Either::A((_e, b))) => future::err((account, Ok(Either::A(b)))),Err(Either::B((e, a))) => {if let Some(inner) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection { inner, account },Err(e),)))} else {future::err((account, Err(e)))Err(Either::B((e, a))) => {if let Some(client) = a.into_inner() {future::ok(future::Loop::Break((XmppConnection {state: XmppState { client },account,},Err(e.into()),)))} else {future::err((account, Err(e.into())))} - replacement in src/xmpp/mod.rs at line 198
}})}) - replacement in src/xmpp/mod.rs at line 205
fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {Box::new(future::loop_fn((self.inner, self.account),|(client, account)| {client.into_future().then(|r| match r {Ok((event, client)) => match event {Some(Event::Online) => {info!("Online");future::ok(future::Loop::Break(XmppConnection {account,inner: client,}))}Some(Event::Stanza(s)) => {info!("xmpp stanza: {:?}", s);future::ok(future::Loop::Continue((client, account)))}_ => {warn!("Disconnected");future::err(account)fn online(&mut self, event: Event) -> Result<bool, failure::Error> {match event {Event::Online => {info!("Online!");Ok(true)}Event::Stanza(s) => {warn!("Stanza before online: {:?}", s);Ok(false)}_ => {error!("Disconnected while online");Err(format_err!("Disconnected while online"))}}}fn process_initial_roster(&mut self, event: Event) -> Result<bool, failure::Error> {if let Event::Stanza(s) = event {use try_from::TryInto;match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {Ok(iq) => {if let Some(id) = iq.id {if id == ID_GET_ROSTER {match iq.payload {xmpp_parsers::iq::IqType::Error(_e) => {Err(format_err!("Get error instead of roster"))}xmpp_parsers::iq::IqType::Result(Some(result)) => {match result.try_into()as Result<xmpp_parsers::roster::Roster, _>{Ok(roster) => {info!("Got roster:");for i in roster.items {info!(" >>> {:?}", i);}Ok(true)}Err(e) => Err(format_err!("Cann't parse roster: {}", e)),}}_ => Err(format_err!("Unknown result of roster")),}} else {Ok(false) - replacement in src/xmpp/mod.rs at line 252
},Err((e, _)) => {error!("xmpp receive error: {}", e);future::err(account)} else {Err(format_err!("Iq stanza without id")) - replacement in src/xmpp/mod.rs at line 255
})},))}Err(_e) => Ok(false),}} else {Err(format_err!("Wrong event while waiting roster"))} - replacement in src/xmpp/mod.rs at line 263
fn self_presence(self) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {let XmppConnection { account, inner } = self;let client = inner;fn initial_roster<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E>,E: Into<failure::Error>,{let XmppConnection { account, state } = self;use tokio::prelude::Sink;let get_roster = stanzas::make_get_roster(ID_GET_ROSTER);let account2 = account.clone();info!("Quering roster... {:?}", get_roster);state.client.send(get_roster).map_err(move |e| {error!("Error on querying roster: {}", e);(account2, Err(failure::SyncFailure::new(e).into()))}).and_then(move |client| {XmppConnection {state: XmppState { client },account,}.processing(XmppConnection::process_initial_roster, stop_future)}).then(|r| match r {Err((account, e)) => {error!("Cann't wait roster: {}",e.err().map_or_else(|| std::borrow::Cow::Borrowed("None"),|e| e.to_string().into()));future::err(account)}Ok((conn, _)) => future::ok(conn),})}fn self_presence<F, E>(self,stop_future: F,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>whereF: Future<Error = E>,E: Into<failure::Error>,{let XmppConnection { account, state } = self; - edit in src/xmpp/mod.rs at line 319
info!("Sending presence..."); - replacement in src/xmpp/mod.rs at line 320
Box::new(client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()}).and_then(move |client| {future::loop_fn((account2.clone(), client), |(account, client)| {client.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, client)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(client))} else {future::ok(future::Loop::Continue((account, client)))}} else {future::err("Got wrong event".to_owned())}}None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e))}).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(XmppConnection { account, inner }),}),)info!("Sending presence... {:?}", presence);state.client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);(account2, Err(failure::SyncFailure::new(e).into()))}).and_then(move |client| {XmppConnection {state: XmppState { client },account,}.processing(move |conn, event| {if let Event::Stanza(s) = event {if s.name() == "presence"&& s.attr("from").map_or(false, |f| f == conn.account.jid)&& s.attr("to").map_or(false, |f| f == conn.account.jid){Ok(true)} else {Ok(false)}} else {Err(format_err!("Wrong event while waiting self-presence"))}},stop_future,)}).then(|r| match r {Err((account, _e)) => {error!("Cann't wait self-presence");future::err(account)}Ok((conn, _)) => future::ok(conn),}) - replacement in src/xmpp/mod.rs at line 365
struct XmppProcessState<F, S> {struct XmppState<F, S> { - replacement in src/xmpp/mod.rs at line 371
impl<F, S> XmppProcessState<F, S> {fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppProcessState<F, S> {XmppProcessState {impl<F, S> XmppState<F, S> {fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppState<F, S> {XmppState { - replacement in src/xmpp/mod.rs at line 393
future::loop_fn(XmppProcessState::new(cmd_recv, signal, conn), |s| {let XmppProcessState {future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {let XmppState { - replacement in src/xmpp/mod.rs at line 416
future::ok(future::Loop::Continue(XmppProcessState::new(future::ok(future::Loop::Continue(XmppState::new( - replacement in src/xmpp/mod.rs at line 428
future::err(format_err!("Cmd error"))future::err(e) - edit in Cargo.toml at line 23
try_from = "=0.2.2" # dependency of xmpp-parsers - edit in Cargo.lock at line 1110
"try_from 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",