Move online to XmppConnection
[?]
Dec 29, 2018, 2:27 PM
OGMBXBKPWAX4A4CY2LJITNKUYHWBFE4EVBHI4NRZO5VNZ5KXM2VQC
Dependencies
- [2]
ZI4GJ72VAdd message to xmpp command - [3]
FVVPKFTLInitial commit - [4]
O2GM5J4FDon't split xmpp receiving and sending - [5]
BTOZT4JPUse failure - [6]
EOHEZXX3Move request processing to structure - [7]
QWE26TMVupdate deps - [8]
PBRUH4BJRename optional XmppConnection to MaybeXmppConnection - [9]
TDOR5XQUAccept destination - [10]
5A5UVGNMMove receiver closing logic out of xmpp processing - [11]
NDDQQP2PUpdate deps - [12]
5OBTKGDLUpdate deps - [13]
VS6AHRWIMove XMPP to separate dir - [14]
FV6BJ5K6Send self-presence and store account info in Rc so it will be used in some future in parallel - [15]
UCY2DO3DTry to read request body - [16]
X6L47BHQUse different structure for established xmpp connection - [17]
ZJ4QKTJRFixed body reading - [18]
3GEU7TC7Welcome to 2018!
Change contents
- edit in src/xmpp/mod.rs at line 9
pub struct MaybeXmppConnection {account: std::rc::Rc<config::Account>,inner: Option<Client>,} - replacement in src/xmpp/mod.rs at line 17
inner: Option<(stream::SplitSink<Client>, stream::SplitStream<Client>)>,inner: Client, - replacement in src/xmpp/mod.rs at line 20
impl XmppConnection {fn new(account: config::Account) -> XmppConnection {XmppConnection {impl From<XmppConnection> for MaybeXmppConnection {fn from(from: XmppConnection) -> MaybeXmppConnection {MaybeXmppConnection {account: from.account,inner: Some(from.inner),}}}impl MaybeXmppConnection {fn new(account: config::Account) -> MaybeXmppConnection {MaybeXmppConnection { - replacement in src/xmpp/mod.rs at line 39
fn connect<E: 'static>(self) -> impl Future<Item = Self, Error = E> {fn connect<E: 'static>(self) -> impl Future<Item = XmppConnection, Error = E> { - replacement in src/xmpp/mod.rs at line 41
let XmppConnection { account, inner } = self;let MaybeXmppConnection { account, inner } = self; - replacement in src/xmpp/mod.rs at line 44
Box::new(future::ok(XmppConnection {account,inner: Some(inner),})) as Box<Future<Item = _, Error = E>>Box::new(future::ok(XmppConnection { account, inner }))as Box<Future<Item = _, Error = E>> - replacement in src/xmpp/mod.rs at line 58[3.1522]→[3.476:581](∅→∅),[3.643]→[3.1627:1842](∅→∅),[3.581]→[3.1627:1842](∅→∅),[3.1627]→[3.1627:1842](∅→∅)
Self::online(client.split(), account).and_then(Self::self_presence).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),})XmppConnection {inner: client,account,}.online().and_then(XmppConnection::self_presence).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),}) - edit in src/xmpp/mod.rs at line 71
} - edit in src/xmpp/mod.rs at line 73
impl XmppConnection { - replacement in src/xmpp/mod.rs at line 76[3.1988]→[3.1988:2003](∅→∅),[3.1988]→[3.1988:2003](∅→∅),[3.1988]→[3.1988:2003](∅→∅),[3.2003]→[3.582:664](∅→∅),[3.664]→[3.528:575](∅→∅),[3.119]→[3.528:575](∅→∅),[3.2085]→[3.528:575](∅→∅),[3.575]→[3.665:739](∅→∅)
fn online((sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),account: std::rc::Rc<config::Account>,) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> { - replacement in src/xmpp/mod.rs at line 78
(sink, stream, account),|(sink, stream, account)| {stream.into_future().then(|r| match r {Ok((event, stream)) => match event {(self.inner, self.account),|(client, account)| {client.into_future().then(|r| match r {Ok((event, client)) => match event { - replacement in src/xmpp/mod.rs at line 86
inner: Some((sink, stream)),inner: client, - replacement in src/xmpp/mod.rs at line 91
future::ok(future::Loop::Continue((sink, stream, account)))future::ok(future::Loop::Continue((client, account))) - replacement in src/xmpp/mod.rs at line 109
if let Some((sink, stream)) = inner {use tokio::prelude::Sink;let client = inner;use tokio::prelude::Sink; - replacement in src/xmpp/mod.rs at line 112
let presence = stanzas::make_presence(&account);info!("Sending presence...");Box::new(sink.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()}).join(future::loop_fn((account.clone(), stream), |(account, stream)| {stream.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, stream)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(stream))} else {future::ok(future::Loop::Continue((account, stream,)))}let presence = stanzas::make_presence(&account);info!("Sending presence...");let account2 = account.clone();Box::new(client.send(presence).map_err(|e| {error!("Error on send self-presence: {}", e);"Cann't send self-presence".to_owned()}).and_then(move |client| {future::loop_fn((account2.clone(), client), |(account, client)| {client.into_future().map_err(|(e, _)| {error!("Error on reading self-presence: {}", e);"Cann't read self-presence".to_owned()}).and_then(|(event, client)| match event {Some(event) => {if let tokio_xmpp::Event::Stanza(e) = event {info!("Get stanza: {:?}", e);if e.name() == "presence"&& e.attr("from").map_or(false, |f| f == account.jid)&& e.attr("to").map_or(false, |f| f == account.jid){info!("Self presence");future::ok(future::Loop::Break(client)) - replacement in src/xmpp/mod.rs at line 141
future::err("Got wrong event".to_owned())future::ok(future::Loop::Continue((account, client))) - edit in src/xmpp/mod.rs at line 143
} else {future::err("Got wrong event".to_owned()) - replacement in src/xmpp/mod.rs at line 146
None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e)),).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(XmppConnection {account,inner: Some(inner),}),}),)} else {warn!("Don't gen connection on self-presence");Box::new(future::err(account)) as Box<Future<Item = _, Error = _>>}}None => future::err("Got closed stream".to_owned()),})}).map_err(|e| format!("waiting self-presence: {}", e))}).then(|r| match r {Err(e) => {error!("Self-presence waiting error: {}", e);future::err(account)}Ok(inner) => future::ok(XmppConnection { account, inner }),}),) - replacement in src/xmpp/mod.rs at line 164
pub struct XmppCommand {pub xmpp_to: String,pub message: String,}pub struct XmppCommand; - replacement in src/xmpp/mod.rs at line 169
conn: XmppConnection,conn: MaybeXmppConnection, - replacement in src/xmpp/mod.rs at line 173
fn new(cmd_recv: S, signal: F, conn: XmppConnection) -> XmppState<F, S> {fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppState<F, S> { - replacement in src/xmpp/mod.rs at line 193
let conn = XmppConnection::new(account);let conn = MaybeXmppConnection::new(account); - replacement in src/xmpp/mod.rs at line 217
Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1.into())))) - replacement in src/xmpp/mod.rs at line 226
x.1,x.1.into(), - replacement in src/xmpp/mod.rs at line 232
Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1.into())))) - replacement in src/xmpp/mod.rs at line 244
.and_then(|(opt_cmd_recv, _conn): (Option<S>, XmppConnection)| {.and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| { - edit in src/main.rs at line 22
use hyper::service::service_fn; - replacement in src/main.rs at line 28
use tokio::prelude::{Future, Sink, Stream};use tokio::prelude::Sink; - edit in src/main.rs at line 38
struct ServiceCmd {cmd_send: tokio_channel::mpsc::Sender<XmppCommand>,}impl hyper::service::Service for ServiceCmd {type ReqBody = Body;type ResBody = Body;type Error = Box<dyn std::error::Error + Sync + Send + 'static>;type Future =Box<dyn Future<Item = Response<Self::ResBody>, Error = Self::Error> + Send + 'static>; - edit in src/main.rs at line 39
fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future {let xmpp_to_opt = req.headers().get("X-XMPP-To");let xmpp_to_res: Result<String, std::borrow::Cow<str>> = xmpp_to_opt.map_or_else(|| Err("none".into()),|xmpp_to| {xmpp_to.to_str().map(|x| x.to_owned()).map_err(|e| {format!("\"{}\" {}", String::from_utf8_lossy(xmpp_to.as_bytes()), e).into()})},);match xmpp_to_res {Err(err) => {warn!("Unknown destination: {}", err);Box::new(tokio::prelude::future::result(Response::builder().status(hyper::StatusCode::BAD_REQUEST).body(Body::from(format!("Unknown destination: {}", err))).map_err(|e| {Box::new(e) as Box<dyn std::error::Error + Sync + Send + 'static>}),)) as Box<Future<Item = _, Error = _> + Send + 'static>}Ok(xmpp_to) => {info!("Got request. Reading body...");let cmd_send = self.cmd_send.clone();Box::new(req.into_body().map_err(|e| {Box::new(e) as Box<dyn std::error::Error + Sync + Send + 'static>}).fold(String::new(), |mut acc, ch| {std::str::from_utf8(&*ch).map(|s| {acc.push_str(s);acc})}).and_then(move |message: String| {if !message.is_empty() {Box::new(cmd_send.clone().send(XmppCommand { xmpp_to, message }).then(|r| match r {Ok(_) => tokio::prelude::future::ok(Response::new(Body::from("Accepted"),)),Err(e) => {error!("Command sent error: {}", e);tokio::prelude::future::result(Response::builder().status(hyper::StatusCode::BAD_REQUEST).body(Body::from(format!("Command sent error: {}",e))),)}}).map_err(|e| {Box::new(e)as Box<dyn std::error::Error + Sync + Send + 'static,>}),)} else {warn!("Empty message");Box::new(tokio::prelude::future::result(Response::builder().status(hyper::StatusCode::BAD_REQUEST).body(Body::from("Empty message")).map_err(|e| {Box::new(e)as Box<dyn std::error::Error + Sync + Send + 'static,>}),))as Box<Future<Item = _, Error = _> + Send + 'static>}}),) as Box<Future<Item = _, Error = _> + Send + 'static>}}}}struct MakeServiceCmd {cmd_send: tokio_channel::mpsc::Sender<XmppCommand>,}impl<Ctx> hyper::service::MakeService<Ctx> for 
MakeServiceCmd {type ReqBody = Body;type ResBody = Body;type Error = Box<dyn std::error::Error + Sync + Send + 'static>;type Service = ServiceCmd;type Future = tokio::prelude::future::FutureResult<ServiceCmd, Self::MakeError>;type MakeError = hyper::http::Error;fn make_service(&mut self, _ctx: Ctx) -> Self::Future {tokio::prelude::future::ok(ServiceCmd {cmd_send: self.cmd_send.clone(),})}} - replacement in src/main.rs at line 69
.serve(MakeServiceCmd { cmd_send }).serve(move || {let cmd_send = cmd_send.clone();service_fn(move |_req: Request<Body>| {info!("Got request");cmd_send.clone().send(XmppCommand {}).then(|r| match r {Ok(_) => tokio::prelude::future::ok(Response::new(Body::from("Accepted"))),Err(e) => {error!("Command sent error: {}", e);tokio::prelude::future::result(Response::builder().status(hyper::StatusCode::BAD_REQUEST).body(Body::from(format!("Command sent error: {}", e))),)}})})})