Manage xmpp connection in cmd processing loop
[?]
Oct 28, 2018, 1:32 PM
F5UQL2FBQIM7TRVPVCFT3JBPZ6O75SKL6IRWR7RX6BWPVNRA7PFAC
Dependencies
- [2]
7OTSYQ4ZProcess leftover commands - [3]
FVVPKFTLInitial commit - [4]
AVBHYHOAStop xmpp thread by shutdown - [5]
PHPCAQ6ZUse env logger. Implement command loop - [6]
6DSWVNSYLaunch single-thread executor on own thread
Change contents
- edit in src/xmpp.rs at line 3
pub struct XmppConnection {jid: String,password: String,inner: Option<()>,}impl XmppConnection {fn new(jid: &str, password: &str) -> XmppConnection {XmppConnection {jid: jid.to_owned(),password: password.to_owned(),inner: None,}} - edit in src/xmpp.rs at line 19
/// connects if nothing connected/// Error shoud be !fn connect<E>(self) -> impl Future<Item = Self, Error = E> {let inner = self.inner.unwrap_or_else(|| ());future::ok(XmppConnection {jid: self.jid,password: self.password,inner: Some(inner),})}} - edit in src/xmpp.rs at line 37
conn: XmppConnection, - replacement in src/xmpp.rs at line 41
fn new(cmd_recv: Receiver<XmppCommand>, signal: F) -> XmppState<F> {XmppState { cmd_recv, signal }fn new(cmd_recv: Receiver<XmppCommand>, signal: F, conn: XmppConnection) -> XmppState<F> {XmppState {cmd_recv,signal,conn,} - replacement in src/xmpp.rs at line 50
pub fn xmpp_process<F>(pub fn xmpp_process<F: 'static>( - edit in src/xmpp.rs at line 53
jid: &str,password: &str, - replacement in src/xmpp.rs at line 62[3.627]→[3.627:771](∅→∅),[3.771]→[2.0:44](∅→∅),[2.44]→[3.816:858](∅→∅),[3.816]→[3.816:858](∅→∅),[3.858]→[2.45:113](∅→∅),[2.113]→[3.914:1113](∅→∅),[3.914]→[3.914:1113](∅→∅),[3.1113]→[2.114:158](∅→∅),[2.158]→[3.1158:1257](∅→∅),[3.1158]→[3.1158:1257](∅→∅),[3.1257]→[2.159:227](∅→∅),[2.227]→[3.1313:1591](∅→∅),[3.1313]→[3.1313:1591](∅→∅)
future::loop_fn(XmppState::new(cmd_recv, signal), |s| {s.signal.select2(s.cmd_recv.into_future()).then(|r| {match r {Ok(Either::A((_x, b))) => {// got signal, breaksfuture::ok(future::Loop::Break(b.into_inner()))}Ok(Either::B((x, a))) => {// got cmd, continuefuture::ok(future::Loop::Continue(XmppState::new(x.1, a)))}Err(Either::A((e, b))) => {// got signal error, breakserror!("Signal error: {}", e);future::ok(future::Loop::Break(b.into_inner()))}Err(Either::B((_e, _a))) => {// got cmd error, its badfuture::err(tokio::io::Error::new(tokio::io::ErrorKind::Other,"Cmd error",))let conn = XmppConnection::new(jid, password);future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {let XmppState {cmd_recv,signal,conn,} = s;signal.select2(conn.connect().and_then(|conn| {cmd_recv.into_future().map_err(|_| {error!("Got error on recv cmd");tokio::io::Error::new(tokio::io::ErrorKind::Other, "Receive cmd error")}).map(|f| (f, conn))})).then(|r| {match r {Ok(Either::A((_x, b))) => {// got signal, breaksBox::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))as Box<Future<Item = _, Error = _>>}Ok(Either::B((x, a))) => {// got cmd, continueBox::new(future::ok(future::Loop::Continue(XmppState::new((x.0).1,a,x.1,)))) as Box<Future<Item = _, Error = _>>}Err(Either::A((e, b))) => {// got signal error, breakserror!("Signal error: {}", e);Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))as Box<Future<Item = _, Error = _>>}Err(Either::B((_e, _a))) => {// got cmd error, its badBox::new(future::err(tokio::io::Error::new(tokio::io::ErrorKind::Other,"Cmd error",))) as Box<Future<Item = _, Error = _>>} - edit in src/xmpp.rs at line 107
})}).and_then(|(opt_cmd_recv, _conn): (Option<Receiver<XmppCommand>>, XmppConnection)| {if let Some(mut cmd_recv) = opt_cmd_recv {// process left commandsinfo!("Stop accepting commands");cmd_recv.close();Box::new(cmd_recv.for_each(|_cmd| future::ok(())).map_err(|_| {tokio::io::Error::new(tokio::io::ErrorKind::Other, "cmd receiver last error")})) as Box<Future<Item = (), Error = tokio::io::Error>>} else {Box::new(future::err(tokio::io::Error::new(tokio::io::ErrorKind::Other,"cmd receiver gone",))) - replacement in src/xmpp.rs at line 123[3.1623]→[3.1623:1634](∅→∅),[3.1623]→[3.1623:1634](∅→∅),[3.1634]→[2.228:875](∅→∅),[2.875]→[3.1634:1641](∅→∅),[3.1634]→[3.1634:1641](∅→∅)
})}).and_then(|opt_cmd_recv: Option<Receiver<XmppCommand>>| {if let Some(mut cmd_recv) = opt_cmd_recv {// process left commandsinfo!("Stop accepting commands");cmd_recv.close();Box::new(cmd_recv.for_each(|_cmd| future::ok(())).map_err(|_| {tokio::io::Error::new(tokio::io::ErrorKind::Other, "cmd receiver last error")})) as Box<Future<Item = (), Error = tokio::io::Error>>} else {Box::new(future::err(tokio::io::Error::new(tokio::io::ErrorKind::Other,"cmd receiver gone",)))}})},) - replacement in src/main.rs at line 87
ctrt.block_on(xmpp_process(ctrl_c.clone().map(|_| ()), cmd_recv))ctrt.block_on(xmpp_process(ctrl_c.clone().map(|_| ()),cmd_recv,&config.account.jid,&config.account.password,))