use tokio::prelude::future::{self, Either};
use tokio::prelude::stream;
use tokio::prelude::{Future, Stream};
use tokio_channel::mpsc::Receiver;
use tokio_xmpp::{Client, Event};
use config;
/// One XMPP account together with its (optional) live client session.
pub struct XmppConnection {
/// Account settings; `jid` and `password` are read when a client is created.
account: config::Account,
/// Sink and stream halves of the split client.
/// `None` until a client has reported `Event::Online`.
inner: Option<(stream::SplitSink<Client>, stream::SplitStream<Client>)>,
}
impl XmppConnection {
fn new(account: config::Account) -> XmppConnection {
XmppConnection {
account,
inner: None,
}
}
/// connects if nothing connected
/// Error shoud be !
fn connect<E: 'static>(self) -> impl Future<Item = Self, Error = E> {
info!("xmpp connection...");
let XmppConnection { account, inner } = self;
if let Some(inner) = inner {
Box::new(future::ok(XmppConnection {
account,
inner: Some(inner),
})) as Box<Future<Item = _, Error = E>>
} else {
Box::new(future::loop_fn(account, |account| {
info!("xmpp initialization...");
let mut res_client = Client::new(&account.jid, &account.password);
while let Err(e) = res_client {
error!("Cann't init xmpp client: {}", e);
res_client = Client::new(&account.jid, &account.password);
}
let client = res_client.expect("Cann't init xmpp client");
info!("xmpp initialized");
// future to wait for online
Self::online(client.split(), account)
.and_then(Self::self_presence)
.then(|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
})
}))
}
}
/// get connection and wait for online status and set presence
/// returns error if something went wrong
fn online(
(sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),
account: config::Account,
) -> impl Future<Item = Self, Error = config::Account> {
Box::new(future::loop_fn(
(sink, stream, account),
|(sink, stream, account)| {
stream.into_future().then(|r| match r {
Ok((event, stream)) => match event {
Some(Event::Online) => {
info!("Online");
future::ok(future::Loop::Break(XmppConnection {
account,
inner: Some((sink, stream)),
}))
}
Some(Event::Stanza(s)) => {
info!("xmpp stanza: {:?}", s);
future::ok(future::Loop::Continue((sink, stream, account)))
}
_ => {
warn!("Disconnected");
future::err(account)
}
},
Err((e, _)) => {
error!("xmpp receive error: {}", e);
future::err(account)
}
})
},
))
}
fn self_presence(self) -> impl Future<Item = Self, Error = config::Account> {
let XmppConnection { account, inner } = self;
if let Some((sink, stream)) = inner {
future::ok(XmppConnection {
account,
inner: Some((sink, stream)),
})
} else {
warn!("Don't gen connection on self-presence");
future::err(account)
}
}
}
/// Command delivered to `xmpp_process` through its command channel.
/// It is a unit struct and carries no data.
pub struct XmppCommand;
/// Loop state threaded through the iterations of `xmpp_process`.
struct XmppState<F> {
/// Receiving end of the command channel.
cmd_recv: Receiver<XmppCommand>,
/// Shutdown signal; `xmpp_process` uses it as a future.
signal: F,
/// XMPP connection, which may or may not be connected yet.
conn: XmppConnection,
}
impl<F> XmppState<F> {
fn new(cmd_recv: Receiver<XmppCommand>, signal: F, conn: XmppConnection) -> XmppState<F> {
XmppState {
cmd_recv,
signal,
conn,
}
}
}
pub fn xmpp_process<F>(
signal: F,
cmd_recv: Receiver<XmppCommand>,
account: config::Account,
) -> impl future::Future<Item = (), Error = tokio::io::Error>
where
F: future::Future<Item = ()> + 'static,
{
let signal = signal
.map_err(|_| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Wrong shutdown signal"));
let conn = XmppConnection::new(account);
future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {
let XmppState {
cmd_recv,
signal,
conn,
} = s;
signal
.select2(conn.connect().and_then(|conn| {
info!("xmpp connected!");
cmd_recv
.into_future()
.map_err(|_| {
error!("Got error on recv cmd");
tokio::io::Error::new(tokio::io::ErrorKind::Other, "Receive cmd error")
}).map(|f| (f, conn))
})).then(|r| {
match r {
Ok(Either::A((_x, b))) => {
info!("Got signal");
// got signal, breaks
Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
as Box<Future<Item = _, Error = _>>
}
Ok(Either::B((x, a))) => {
info!("Got cmd");
// got cmd, continue
Box::new(future::ok(future::Loop::Continue(XmppState::new(
(x.0).1,
a,
x.1,
)))) as Box<Future<Item = _, Error = _>>
}
Err(Either::A((e, b))) => {
// got signal error, breaks
error!("Signal error: {}", e);
Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
as Box<Future<Item = _, Error = _>>
}
Err(Either::B((e, _a))) => {
// got cmd error, its bad
error!("Cmd error: {}", e);
Box::new(future::err(tokio::io::Error::new(
tokio::io::ErrorKind::Other,
"Cmd error",
))) as Box<Future<Item = _, Error = _>>
}
}
})
}).and_then(
|(opt_cmd_recv, _conn): (Option<Receiver<XmppCommand>>, XmppConnection)| {
if let Some(mut cmd_recv) = opt_cmd_recv {
// process left commands
info!("Stop accepting commands");
cmd_recv.close();
Box::new(cmd_recv.for_each(|_cmd| future::ok(())).map_err(|_| {
tokio::io::Error::new(tokio::io::ErrorKind::Other, "cmd receiver last error")
})) as Box<Future<Item = (), Error = tokio::io::Error>>
} else {
Box::new(future::err(tokio::io::Error::new(
tokio::io::ErrorKind::Other,
"cmd receiver gone",
)))
}
},
)
}