Manage xmpp connection in cmd processing loop

[?]
Oct 28, 2018, 1:32 PM
F5UQL2FBQIM7TRVPVCFT3JBPZ6O75SKL6IRWR7RX6BWPVNRA7PFAC

Dependencies

  • [2] 7OTSYQ4Z Process leftover commands
  • [3] FVVPKFTL Initial commit
  • [4] AVBHYHOA Stop xmpp thread by shudown
  • [5] PHPCAQ6Z Use env logger. Implement command loop
  • [6] 6DSWVNSY Launch single-thread executor on own thread

Change contents

  • edit in src/xmpp.rs at line 3
    [3.117]
    [3.117]
    pub struct XmppConnection {
    jid: String,
    password: String,
    inner: Option<()>,
    }
    impl XmppConnection {
    fn new(jid: &str, password: &str) -> XmppConnection {
    XmppConnection {
    jid: jid.to_owned(),
    password: password.to_owned(),
    inner: None,
    }
    }
  • edit in src/xmpp.rs at line 19
    [3.118]
    [3.118]
    /// connects if nothing connected
    /// Error shoud be !
    fn connect<E>(self) -> impl Future<Item = Self, Error = E> {
    let inner = self.inner.unwrap_or_else(|| ());
    future::ok(XmppConnection {
    jid: self.jid,
    password: self.password,
    inner: Some(inner),
    })
    }
    }
  • edit in src/xmpp.rs at line 37
    [3.217]
    [3.217]
    conn: XmppConnection,
  • replacement in src/xmpp.rs at line 41
    [3.243][3.243:355]()
    fn new(cmd_recv: Receiver<XmppCommand>, signal: F) -> XmppState<F> {
    XmppState { cmd_recv, signal }
    [3.243]
    [3.355]
    fn new(cmd_recv: Receiver<XmppCommand>, signal: F, conn: XmppConnection) -> XmppState<F> {
    XmppState {
    cmd_recv,
    signal,
    conn,
    }
  • replacement in src/xmpp.rs at line 50
    [3.364][3.364:388]()
    pub fn xmpp_process<F>(
    [3.364]
    [3.388]
    pub fn xmpp_process<F: 'static>(
  • edit in src/xmpp.rs at line 53
    [3.440]
    [3.440]
    jid: &str,
    password: &str,
  • replacement in src/xmpp.rs at line 62
    [3.627][3.627:771](),[3.771][2.0:44](),[2.44][3.816:858](),[3.816][3.816:858](),[3.858][2.45:113](),[2.113][3.914:1113](),[3.914][3.914:1113](),[3.1113][2.114:158](),[2.158][3.1158:1257](),[3.1158][3.1158:1257](),[3.1257][2.159:227](),[2.227][3.1313:1591](),[3.1313][3.1313:1591]()
    future::loop_fn(XmppState::new(cmd_recv, signal), |s| {
    s.signal.select2(s.cmd_recv.into_future()).then(|r| {
    match r {
    Ok(Either::A((_x, b))) => {
    // got signal, breaks
    future::ok(future::Loop::Break(b.into_inner()))
    }
    Ok(Either::B((x, a))) => {
    // got cmd, continue
    future::ok(future::Loop::Continue(XmppState::new(x.1, a)))
    }
    Err(Either::A((e, b))) => {
    // got signal error, breaks
    error!("Signal error: {}", e);
    future::ok(future::Loop::Break(b.into_inner()))
    }
    Err(Either::B((_e, _a))) => {
    // got cmd error, its bad
    future::err(tokio::io::Error::new(
    tokio::io::ErrorKind::Other,
    "Cmd error",
    ))
    [3.627]
    [3.1591]
    let conn = XmppConnection::new(jid, password);
    future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {
    let XmppState {
    cmd_recv,
    signal,
    conn,
    } = s;
    signal
    .select2(conn.connect().and_then(|conn| {
    cmd_recv
    .into_future()
    .map_err(|_| {
    error!("Got error on recv cmd");
    tokio::io::Error::new(tokio::io::ErrorKind::Other, "Receive cmd error")
    }).map(|f| (f, conn))
    })).then(|r| {
    match r {
    Ok(Either::A((_x, b))) => {
    // got signal, breaks
    Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
    as Box<Future<Item = _, Error = _>>
    }
    Ok(Either::B((x, a))) => {
    // got cmd, continue
    Box::new(future::ok(future::Loop::Continue(XmppState::new(
    (x.0).1,
    a,
    x.1,
    )))) as Box<Future<Item = _, Error = _>>
    }
    Err(Either::A((e, b))) => {
    // got signal error, breaks
    error!("Signal error: {}", e);
    Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
    as Box<Future<Item = _, Error = _>>
    }
    Err(Either::B((_e, _a))) => {
    // got cmd error, its bad
    Box::new(future::err(tokio::io::Error::new(
    tokio::io::ErrorKind::Other,
    "Cmd error",
    ))) as Box<Future<Item = _, Error = _>>
    }
  • edit in src/xmpp.rs at line 107
    [3.1609]
    [3.1609]
    })
    }).and_then(
    |(opt_cmd_recv, _conn): (Option<Receiver<XmppCommand>>, XmppConnection)| {
    if let Some(mut cmd_recv) = opt_cmd_recv {
    // process left commands
    info!("Stop accepting commands");
    cmd_recv.close();
    Box::new(cmd_recv.for_each(|_cmd| future::ok(())).map_err(|_| {
    tokio::io::Error::new(tokio::io::ErrorKind::Other, "cmd receiver last error")
    })) as Box<Future<Item = (), Error = tokio::io::Error>>
    } else {
    Box::new(future::err(tokio::io::Error::new(
    tokio::io::ErrorKind::Other,
    "cmd receiver gone",
    )))
  • replacement in src/xmpp.rs at line 123
    [3.1623][3.1623:1634](),[3.1623][3.1623:1634](),[3.1634][2.228:875](),[2.875][3.1634:1641](),[3.1634][3.1634:1641]()
    })
    }).and_then(|opt_cmd_recv: Option<Receiver<XmppCommand>>| {
    if let Some(mut cmd_recv) = opt_cmd_recv {
    // process left commands
    info!("Stop accepting commands");
    cmd_recv.close();
    Box::new(cmd_recv.for_each(|_cmd| future::ok(())).map_err(|_| {
    tokio::io::Error::new(tokio::io::ErrorKind::Other, "cmd receiver last error")
    })) as Box<Future<Item = (), Error = tokio::io::Error>>
    } else {
    Box::new(future::err(tokio::io::Error::new(
    tokio::io::ErrorKind::Other,
    "cmd receiver gone",
    )))
    }
    })
    [3.1623]
    [3.276]
    },
    )
  • replacement in src/main.rs at line 87
    [3.3038][3.3038:3112]()
    ctrt.block_on(xmpp_process(ctrl_c.clone().map(|_| ()), cmd_recv))
    [3.3038]
    [3.3112]
    ctrt.block_on(xmpp_process(
    ctrl_c.clone().map(|_| ()),
    cmd_recv,
    &config.account.jid,
    &config.account.password,
    ))