Use env logger. Implement command loop

[?]
Oct 28, 2018, 6:54 AM
PHPCAQ6ZOH64J22YO3LAHZDZ6IWRKDT4CRRC4Y2I5ADERWPBRWIQC

Dependencies

  • [2] YGC7ZD7N Use logger
  • [3] HKSQO7JZ Enable hyper http server and configuration
  • [4] 6DSWVNSY Launch single-thread executor on own thread
  • [5] AVBHYHOA Stop xmpp thread by shudown
  • [6] FVVPKFTL Initial commit

Change contents

  • replacement in src/xmpp.rs at line 0
    [3.10][3.11:39]()
    use tokio::prelude::future;
    [3.10]
    [3.39]
    use tokio::prelude::future::{self, Either};
    use tokio::prelude::{Future, Stream};
    use tokio_channel::mpsc::Receiver;
    pub struct XmppCommand;
    struct XmppState<F> {
    cmd_recv: Receiver<XmppCommand>,
    signal: F,
    }
  • replacement in src/xmpp.rs at line 11
    [3.40][3.40:134]()
    pub fn xmpp_process<F>(signal: F) -> impl future::Future<Item = (), Error = tokio::io::Error>
    [3.40]
    [3.134]
    impl<F> XmppState<F> {
    fn new(cmd_recv: Receiver<XmppCommand>, signal: F) -> XmppState<F> {
    XmppState { cmd_recv, signal }
    }
    }
    pub fn xmpp_process<F>(
    signal: F,
    cmd_recv: Receiver<XmppCommand>,
    ) -> impl future::Future<Item = (), Error = tokio::io::Error>
  • replacement in src/xmpp.rs at line 24
    [3.176][3.176:276]()
    signal.map_err(|_| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Wrong shutdown signal"))
    [3.176]
    [3.276]
    let signal = signal
    .map_err(|_| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Wrong shutdown signal"));
    future::loop_fn(XmppState::new(cmd_recv, signal), |s| {
    s.signal.select2(s.cmd_recv.into_future()).then(|r| {
    match r {
    Ok(Either::A((_x, _b))) => {
    // got signal, breaks
    future::ok(future::Loop::Break(()))
    }
    Ok(Either::B((x, a))) => {
    // got cmd, continue
    future::ok(future::Loop::Continue(XmppState::new(x.1, a)))
    }
    Err(Either::A((e, _b))) => {
    // got signal error, breaks
    error!("Signal error: {}", e);
    future::ok(future::Loop::Break(()))
    }
    Err(Either::B((_e, _a))) => {
    // got cmd error, its bad
    future::err(tokio::io::Error::new(
    tokio::io::ErrorKind::Other,
    "Cmd error",
    ))
    }
    }
    })
    })
  • edit in src/main.rs at line 1
    [3.21]
    [3.23]
    #![deny(missing_docs)]
    //! XMPP client service
  • edit in src/main.rs at line 13
    [3.121]
    [3.121]
    extern crate tokio_channel;
  • replacement in src/main.rs at line 18
    [3.193][3.193:228](),[3.193][3.193:228]()
    use hyper::service::service_fn_ok;
    [3.193]
    [3.228]
    use hyper::service::service_fn;
  • edit in src/main.rs at line 21
    [3.275]
    [3.275]
    use tokio::runtime::current_thread;
  • edit in src/main.rs at line 23
    [3.304]
    [3.304]
    use tokio::prelude::Sink;
  • replacement in src/main.rs at line 29
    [3.338][3.338:434]()
    fn hello_world(_req: Request<Body>) -> Response<Body> {
    Response::new(Body::from("Test"))
    }
    [3.280]
    [3.434]
    mod xmpp;
    use xmpp::{xmpp_process, XmppCommand};
  • edit in src/main.rs at line 57
    [3.1137]
    [3.1137]
    let (cmd_send, cmd_recv) = tokio_channel::mpsc::channel(10);
  • replacement in src/main.rs at line 61
    [3.1187][3.1187:1293](),[3.1187][3.1187:1293]()
    .serve(|| service_fn_ok(hello_world))
    .with_graceful_shutdown(ctrl_c.clone().map(|_| ()))
    [3.1187]
    [2.150]
    .serve(move || {
    let cmd_send = cmd_send.clone();
    service_fn(move |_req: Request<Body>| {
    info!("Got request");
    cmd_send.clone().send(XmppCommand {}).then(|r| match r {
    Ok(_) => tokio::prelude::future::ok(Response::new(Body::from("Accepted"))),
    Err(e) => {
    error!("Command sent error: {}", e);
    tokio::prelude::future::result(
    Response::builder()
    .status(hyper::StatusCode::BAD_REQUEST)
    .body(Body::from(format!("Command sent error: {}", e))),
    )
    }
    })
    })
    }).with_graceful_shutdown(ctrl_c.clone().map(|_| ()))
  • edit in src/main.rs at line 79
    [2.203]
    [3.176]
    let mut rt = Runtime::new().expect("Cann't start tokio");
    rt.spawn(http_server);
  • replacement in src/main.rs at line 84
    [3.177][2.204:237]()
    hyper::rt::run(http_server);
    [3.177]
    [3.61]
    let xmpp_join = std::thread::spawn(move || -> Result<(), tokio::io::Error> {
    // Launch single-threaded runtime
    let mut ctrt = current_thread::Runtime::new()?;
    ctrt.block_on(xmpp_process(ctrl_c.clone().map(|_| ()), cmd_recv))
    });
    info!("Server started");
    rt.shutdown_on_idle().wait().expect("Shutdown error");
    info!("Server stopper");
    xmpp_join
    .join()
    .expect("Join xmpp thread")
    .expect("Result xmpp thread");
  • edit in Cargo.toml at line 9
    [3.2097]
    [3.2097]
    tokio-channel = "0.1"
  • edit in Cargo.lock at line 1052
    [3.35655]
    [3.35655]
    "tokio-channel 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
  • edit in Cargo.lock at line 1325
    [3.45320]
    [3.45320]
    ]
    [[package]]
    name = "tokio-channel"
    version = "0.1.0"
    source = "registry+https://github.com/rust-lang/crates.io-index"
    dependencies = [
    "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
  • edit in Cargo.lock at line 1932
    [3.84002]
    [3.84002]
    "checksum tokio-channel 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "530d0fb87416dd531600f7cccc438bb35c5b91883065c9e6dca7cdecee991cfa"