Move XMPP to separate dir
[?]
Oct 30, 2018, 11:25 AM
VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQCDependencies
- [2]
FVVPKFTLInitial commit
Change contents
- file addition: xmpp[2.6]
- file addition: mod.rs[0.7]
use tokio::prelude::future::{self, Either};use tokio::prelude::stream;use tokio::prelude::{Future, Stream};use tokio_channel::mpsc::Receiver;use tokio_xmpp::{Client, Event};use config;pub struct XmppConnection {account: config::Account,inner: Option<(stream::SplitSink<Client>, stream::SplitStream<Client>)>,}impl XmppConnection {fn new(account: config::Account) -> XmppConnection {XmppConnection {account,inner: None,}}/// connects if nothing connected/// Error shoud be !fn connect<E: 'static>(self) -> impl Future<Item = Self, Error = E> {info!("xmpp connection...");let XmppConnection { account, inner } = self;if let Some(inner) = inner {Box::new(future::ok(XmppConnection {account,inner: Some(inner),})) as Box<Future<Item = _, Error = E>>} else {Box::new(future::loop_fn(account, |account| {info!("xmpp initialization...");let mut res_client = Client::new(&account.jid, &account.password);while let Err(e) = res_client {error!("Cann't init xmpp client: {}", e);res_client = Client::new(&account.jid, &account.password);}let client = res_client.expect("Cann't init xmpp client");info!("xmpp initialized");// future to wait for onlineSelf::online(client.split(), account).and_then(Self::self_presence).then(|r| match r {Ok(conn) => future::ok(future::Loop::Break(conn)),Err(acc) => future::ok(future::Loop::Continue(acc)),})}))}}/// get connection and wait for online status and set presence/// returns error if something went wrongfn online((sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),account: config::Account,) -> impl Future<Item = Self, Error = config::Account> {Box::new(future::loop_fn((sink, stream, account),|(sink, stream, account)| {stream.into_future().then(|r| match r {Ok((event, stream)) => match event {Some(Event::Online) => {info!("Online");future::ok(future::Loop::Break(XmppConnection {account,inner: Some((sink, stream)),}))}Some(Event::Stanza(s)) => {info!("xmpp stanza: {:?}", s);future::ok(future::Loop::Continue((sink, stream, account)))}_ => {warn!("Disconnected");future::err(account)}},Err((e, _)) => {error!("xmpp receive error: {}", e);future::err(account)}})},))}fn self_presence(self) -> impl Future<Item = Self, Error = config::Account> {let XmppConnection { account, inner } = self;if let Some((sink, stream)) = inner {future::ok(XmppConnection {account,inner: Some((sink, stream)),})} else {warn!("Don't gen connection on self-presence");future::err(account)}}}pub struct XmppCommand;struct XmppState<F> {cmd_recv: Receiver<XmppCommand>,signal: F,conn: XmppConnection,}impl<F> XmppState<F> {fn new(cmd_recv: Receiver<XmppCommand>, signal: F, conn: XmppConnection) -> XmppState<F> {XmppState {cmd_recv,signal,conn,}}}pub fn xmpp_process<F>(signal: F,cmd_recv: Receiver<XmppCommand>,account: config::Account,) -> impl future::Future<Item = (), Error = tokio::io::Error>whereF: future::Future<Item = ()> + 'static,{let signal = signal.map_err(|_| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Wrong shutdown signal"));let conn = XmppConnection::new(account);future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {let XmppState {cmd_recv,signal,conn,} = s;signal.select2(conn.connect().and_then(|conn| {info!("xmpp connected!");cmd_recv.into_future().map_err(|_| {error!("Got error on recv cmd");tokio::io::Error::new(tokio::io::ErrorKind::Other, "Receive cmd error")}).map(|f| (f, conn))})).then(|r| {match r {Ok(Either::A((_x, b))) => {info!("Got signal");// got signal, breaksBox::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))as Box<Future<Item = _, Error = _>>}Ok(Either::B((x, a))) => {info!("Got cmd");// got cmd, continueBox::new(future::ok(future::Loop::Continue(XmppState::new((x.0).1,a,x.1,)))) as Box<Future<Item = _, Error = _>>}Err(Either::A((e, b))) => {// got signal error, breakserror!("Signal error: {}", e);Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))as Box<Future<Item = _, Error = _>>}Err(Either::B((e, _a))) => {// got cmd error, its baderror!("Cmd error: {}", e);Box::new(future::err(tokio::io::Error::new(tokio::io::ErrorKind::Other,"Cmd error",))) as Box<Future<Item = _, Error = _>>}}})}).and_then(|(opt_cmd_recv, _conn): (Option<Receiver<XmppCommand>>, XmppConnection)| {if let Some(mut cmd_recv) = opt_cmd_recv {// process left commandsinfo!("Stop accepting commands");cmd_recv.close();Box::new(cmd_recv.for_each(|_cmd| future::ok(())).map_err(|_| {tokio::io::Error::new(tokio::io::ErrorKind::Other, "cmd receiver last error")})) as Box<Future<Item = (), Error = tokio::io::Error>>} else {Box::new(future::err(tokio::io::Error::new(tokio::io::ErrorKind::Other,"cmd receiver gone",)))}},)}