Move online to XmppConnection

[?]
Dec 29, 2018, 2:27 PM
OGMBXBKPWAX4A4CY2LJITNKUYHWBFE4EVBHI4NRZO5VNZ5KXM2VQC

Dependencies

  • [2] ZI4GJ72V Add message to xmpp command
  • [3] FVVPKFTL Initial commit
  • [4] O2GM5J4F Don't split xmpp receiving and sending
  • [5] BTOZT4JP Use failure
  • [6] EOHEZXX3 Move request processing to structure
  • [7] QWE26TMV update deps
  • [8] PBRUH4BJ Rename optional XmppConnection to MaybeXmppConnection
  • [9] TDOR5XQU Accept destination
  • [10] 5A5UVGNM Move receiver closing logic out of xmpp processing
  • [11] NDDQQP2P Update deps
  • [12] 5OBTKGDL Update deps
  • [13] VS6AHRWI Move XMPP to separate dir
  • [14] FV6BJ5K6 Send self-presence and store account info in Rc so it will be used in some future in parallel
  • [15] UCY2DO3D Try to read request body
  • [16] X6L47BHQ Use different structure for established xmpp connection
  • [17] ZJ4QKTJR Fixed body reading
  • [18] 3GEU7TC7 Welcome to 2018!

Change contents

  • edit in src/xmpp/mod.rs at line 9
    [3.434]
    [3.210]
    pub struct MaybeXmppConnection {
    account: std::rc::Rc<config::Account>,
    inner: Option<Client>,
    }
  • replacement in src/xmpp/mod.rs at line 17
    [3.74][3.0:77]()
    inner: Option<(stream::SplitSink<Client>, stream::SplitStream<Client>)>,
    [3.74]
    [3.346]
    inner: Client,
  • replacement in src/xmpp/mod.rs at line 20
    [3.349][3.78:182]()
    impl XmppConnection {
    fn new(account: config::Account) -> XmppConnection {
    XmppConnection {
    [3.349]
    [3.479]
    impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
    MaybeXmppConnection {
    account: from.account,
    inner: Some(from.inner),
    }
    }
    }
    impl MaybeXmppConnection {
    fn new(account: config::Account) -> MaybeXmppConnection {
    MaybeXmppConnection {
  • replacement in src/xmpp/mod.rs at line 39
    [3.579][3.183:257]()
    fn connect<E: 'static>(self) -> impl Future<Item = Self, Error = E> {
    [3.579]
    [3.653]
    fn connect<E: 'static>(self) -> impl Future<Item = XmppConnection, Error = E> {
  • replacement in src/xmpp/mod.rs at line 41
    [3.690][3.258:312]()
    let XmppConnection { account, inner } = self;
    [3.690]
    [3.744]
    let MaybeXmppConnection { account, inner } = self;
  • replacement in src/xmpp/mod.rs at line 44
    [3.782][3.313:475]()
    Box::new(future::ok(XmppConnection {
    account,
    inner: Some(inner),
    })) as Box<Future<Item = _, Error = E>>
    [3.782]
    [3.944]
    Box::new(future::ok(XmppConnection { account, inner }))
    as Box<Future<Item = _, Error = E>>
  • replacement in src/xmpp/mod.rs at line 58
    [3.1522][3.476:581](),[3.643][3.1627:1842](),[3.581][3.1627:1842](),[3.1627][3.1627:1842]()
    Self::online(client.split(), account)
    .and_then(Self::self_presence)
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    })
    [3.1522]
    [3.1842]
    XmppConnection {
    inner: client,
    account,
    }
    .online()
    .and_then(XmppConnection::self_presence)
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    })
  • edit in src/xmpp/mod.rs at line 71
    [3.1874]
    [3.1874]
    }
  • edit in src/xmpp/mod.rs at line 73
    [3.1875]
    [3.1875]
    impl XmppConnection {
  • replacement in src/xmpp/mod.rs at line 76
    [3.1988][3.1988:2003](),[3.1988][3.1988:2003](),[3.1988][3.1988:2003](),[3.2003][3.582:664](),[3.664][3.528:575](),[3.119][3.528:575](),[3.2085][3.528:575](),[3.575][3.665:739]()
    fn online(
    (sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),
    account: std::rc::Rc<config::Account>,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
    [3.1988]
    [3.739]
    fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
  • replacement in src/xmpp/mod.rs at line 78
    [3.773][3.773:963]()
    (sink, stream, account),
    |(sink, stream, account)| {
    stream.into_future().then(|r| match r {
    Ok((event, stream)) => match event {
    [3.773]
    [3.963]
    (self.inner, self.account),
    |(client, account)| {
    client.into_future().then(|r| match r {
    Ok((event, client)) => match event {
  • replacement in src/xmpp/mod.rs at line 86
    [3.1174][3.1174:1235]()
    inner: Some((sink, stream)),
    [3.1174]
    [3.1235]
    inner: client,
  • replacement in src/xmpp/mod.rs at line 91
    [3.1404][3.1404:1492]()
    future::ok(future::Loop::Continue((sink, stream, account)))
    [3.1404]
    [3.1492]
    future::ok(future::Loop::Continue((client, account)))
  • replacement in src/xmpp/mod.rs at line 109
    [3.942][3.1843:1927]()
    if let Some((sink, stream)) = inner {
    use tokio::prelude::Sink;
    [3.942]
    [3.784]
    let client = inner;
    use tokio::prelude::Sink;
  • replacement in src/xmpp/mod.rs at line 112
    [3.785][3.1928:3847]()
    let presence = stanzas::make_presence(&account);
    info!("Sending presence...");
    Box::new(
    sink.send(presence)
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    "Cann't send self-presence".to_owned()
    })
    .join(
    future::loop_fn((account.clone(), stream), |(account, stream)| {
    stream
    .into_future()
    .map_err(|(e, _)| {
    error!("Error on reading self-presence: {}", e);
    "Cann't read self-presence".to_owned()
    })
    .and_then(|(event, stream)| match event {
    Some(event) => {
    if let tokio_xmpp::Event::Stanza(e) = event {
    info!("Get stanza: {:?}", e);
    if e.name() == "presence"
    && e.attr("from")
    .map_or(false, |f| f == account.jid)
    && e.attr("to").map_or(false, |f| f == account.jid)
    {
    info!("Self presence");
    future::ok(future::Loop::Break(stream))
    } else {
    future::ok(future::Loop::Continue((
    account, stream,
    )))
    }
    [3.785]
    [3.2608]
    let presence = stanzas::make_presence(&account);
    info!("Sending presence...");
    let account2 = account.clone();
    Box::new(
    client
    .send(presence)
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    "Cann't send self-presence".to_owned()
    })
    .and_then(move |client| {
    future::loop_fn((account2.clone(), client), |(account, client)| {
    client
    .into_future()
    .map_err(|(e, _)| {
    error!("Error on reading self-presence: {}", e);
    "Cann't read self-presence".to_owned()
    })
    .and_then(|(event, client)| match event {
    Some(event) => {
    if let tokio_xmpp::Event::Stanza(e) = event {
    info!("Get stanza: {:?}", e);
    if e.name() == "presence"
    && e.attr("from").map_or(false, |f| f == account.jid)
    && e.attr("to").map_or(false, |f| f == account.jid)
    {
    info!("Self presence");
    future::ok(future::Loop::Break(client))
  • replacement in src/xmpp/mod.rs at line 141
    [3.2657][3.3848:3934]()
    future::err("Got wrong event".to_owned())
    [3.2657]
    [3.2743]
    future::ok(future::Loop::Continue((account, client)))
  • edit in src/xmpp/mod.rs at line 143
    [3.2785]
    [3.2785]
    } else {
    future::err("Got wrong event".to_owned())
  • replacement in src/xmpp/mod.rs at line 146
    [3.2823][3.3935:4794]()
    None => future::err("Got closed stream".to_owned()),
    })
    })
    .map_err(|e| format!("waiting self-presence: {}", e)),
    )
    .then(|r| match r {
    Err(e) => {
    error!("Self-presence waiting error: {}", e);
    future::err(account)
    }
    Ok(inner) => future::ok(XmppConnection {
    account,
    inner: Some(inner),
    }),
    }),
    )
    } else {
    warn!("Don't gen connection on self-presence");
    Box::new(future::err(account)) as Box<Future<Item = _, Error = _>>
    }
    [3.2823]
    [3.3783]
    }
    None => future::err("Got closed stream".to_owned()),
    })
    })
    .map_err(|e| format!("waiting self-presence: {}", e))
    })
    .then(|r| match r {
    Err(e) => {
    error!("Self-presence waiting error: {}", e);
    future::err(account)
    }
    Ok(inner) => future::ok(XmppConnection { account, inner }),
    }),
    )
  • replacement in src/xmpp/mod.rs at line 164
    [3.60][2.0:77]()
    pub struct XmppCommand {
    pub xmpp_to: String,
    pub message: String,
    }
    [3.60]
    [3.3816]
    pub struct XmppCommand;
  • replacement in src/xmpp/mod.rs at line 169
    [3.3891][3.4795:4821]()
    conn: XmppConnection,
    [3.3891]
    [3.3917]
    conn: MaybeXmppConnection,
  • replacement in src/xmpp/mod.rs at line 173
    [3.133][3.4822:4900]()
    fn new(cmd_recv: S, signal: F, conn: XmppConnection) -> XmppState<F, S> {
    [3.133]
    [3.4038]
    fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppState<F, S> {
  • replacement in src/xmpp/mod.rs at line 193
    [3.4481][3.4901:4946]()
    let conn = XmppConnection::new(account);
    [3.4481]
    [3.4526]
    let conn = MaybeXmppConnection::new(account);
  • replacement in src/xmpp/mod.rs at line 217
    [3.5285][3.4947:5034]()
    Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
    [3.5285]
    [3.5372]
    Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1.into()))))
  • replacement in src/xmpp/mod.rs at line 226
    [3.5743][3.5035:5068]()
    x.1,
    [3.5743]
    [3.5776]
    x.1.into(),
  • replacement in src/xmpp/mod.rs at line 232
    [3.6018][3.5069:5156]()
    Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
    [3.6018]
    [3.6105]
    Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1.into()))))
  • replacement in src/xmpp/mod.rs at line 244
    [3.900][3.5157:5226]()
    .and_then(|(opt_cmd_recv, _conn): (Option<S>, XmppConnection)| {
    [3.900]
    [3.969]
    .and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| {
  • edit in src/main.rs at line 22
    [3.3921]
    [3.3953]
    use hyper::service::service_fn;
  • replacement in src/main.rs at line 28
    [3.4066][2.78:122]()
    use tokio::prelude::{Future, Sink, Stream};
    [3.4066]
    [3.4092]
    use tokio::prelude::Sink;
  • edit in src/main.rs at line 38
    [3.4260][2.123:481]()
    struct ServiceCmd {
    cmd_send: tokio_channel::mpsc::Sender<XmppCommand>,
    }
    impl hyper::service::Service for ServiceCmd {
    type ReqBody = Body;
    type ResBody = Body;
    type Error = Box<dyn std::error::Error + Sync + Send + 'static>;
    type Future =
    Box<dyn Future<Item = Response<Self::ResBody>, Error = Self::Error> + Send + 'static>;
  • edit in src/main.rs at line 39
    [3.399][2.482:5634]()
    fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future {
    let xmpp_to_opt = req.headers().get("X-XMPP-To");
    let xmpp_to_res: Result<String, std::borrow::Cow<str>> = xmpp_to_opt.map_or_else(
    || Err("none".into()),
    |xmpp_to| {
    xmpp_to.to_str().map(|x| x.to_owned()).map_err(|e| {
    format!("\"{}\" {}", String::from_utf8_lossy(xmpp_to.as_bytes()), e).into()
    })
    },
    );
    match xmpp_to_res {
    Err(err) => {
    warn!("Unknown destination: {}", err);
    Box::new(tokio::prelude::future::result(
    Response::builder()
    .status(hyper::StatusCode::BAD_REQUEST)
    .body(Body::from(format!("Unknown destination: {}", err)))
    .map_err(|e| {
    Box::new(e) as Box<dyn std::error::Error + Sync + Send + 'static>
    }),
    )) as Box<Future<Item = _, Error = _> + Send + 'static>
    }
    Ok(xmpp_to) => {
    info!("Got request. Reading body...");
    let cmd_send = self.cmd_send.clone();
    Box::new(
    req.into_body()
    .map_err(|e| {
    Box::new(e) as Box<dyn std::error::Error + Sync + Send + 'static>
    })
    .fold(String::new(), |mut acc, ch| {
    std::str::from_utf8(&*ch).map(|s| {
    acc.push_str(s);
    acc
    })
    })
    .and_then(move |message: String| {
    if !message.is_empty() {
    Box::new(
    cmd_send
    .clone()
    .send(XmppCommand { xmpp_to, message })
    .then(|r| match r {
    Ok(_) => tokio::prelude::future::ok(Response::new(
    Body::from("Accepted"),
    )),
    Err(e) => {
    error!("Command sent error: {}", e);
    tokio::prelude::future::result(
    Response::builder()
    .status(hyper::StatusCode::BAD_REQUEST)
    .body(Body::from(format!(
    "Command sent error: {}",
    e
    ))),
    )
    }
    })
    .map_err(|e| {
    Box::new(e)
    as Box<
    dyn std::error::Error + Sync + Send + 'static,
    >
    }),
    )
    } else {
    warn!("Empty message");
    Box::new(tokio::prelude::future::result(
    Response::builder()
    .status(hyper::StatusCode::BAD_REQUEST)
    .body(Body::from("Empty message"))
    .map_err(|e| {
    Box::new(e)
    as Box<
    dyn std::error::Error + Sync + Send + 'static,
    >
    }),
    ))
    as Box<Future<Item = _, Error = _> + Send + 'static>
    }
    }),
    ) as Box<Future<Item = _, Error = _> + Send + 'static>
    }
    }
    }
    }
    struct MakeServiceCmd {
    cmd_send: tokio_channel::mpsc::Sender<XmppCommand>,
    }
    impl<Ctx> hyper::service::MakeService<Ctx> for MakeServiceCmd {
    type ReqBody = Body;
    type ResBody = Body;
    type Error = Box<dyn std::error::Error + Sync + Send + 'static>;
    type Service = ServiceCmd;
    type Future = tokio::prelude::future::FutureResult<ServiceCmd, Self::MakeError>;
    type MakeError = hyper::http::Error;
    fn make_service(&mut self, _ctx: Ctx) -> Self::Future {
    tokio::prelude::future::ok(ServiceCmd {
    cmd_send: self.cmd_send.clone(),
    })
    }
    }
  • replacement in src/main.rs at line 69
    [3.5155][2.5635:5679]()
    .serve(MakeServiceCmd { cmd_send })
    [3.5155]
    [3.5935]
    .serve(move || {
    let cmd_send = cmd_send.clone();
    service_fn(move |_req: Request<Body>| {
    info!("Got request");
    cmd_send.clone().send(XmppCommand {}).then(|r| match r {
    Ok(_) => tokio::prelude::future::ok(Response::new(Body::from("Accepted"))),
    Err(e) => {
    error!("Command sent error: {}", e);
    tokio::prelude::future::result(
    Response::builder()
    .status(hyper::StatusCode::BAD_REQUEST)
    .body(Body::from(format!("Command sent error: {}", e))),
    )
    }
    })
    })
    })