use futures_util::future::Either;
use futures_util::stream::Stream;
use log::{error, info, warn};
use crate::config;
mod element_processor;
mod stanzas;
mod xmpp_connection;
use xmpp_connection::MaybeXmppConnection;
pub use xmpp_connection::XmppCommand;
pub async fn xmpp_process<F, S>(
signal: F,
mut cmd_recv: S,
account: config::Account,
) -> Result<(), failure::Error>
where
F: std::future::Future<Output = ()> + Clone + 'static,
S: Stream<Item = XmppCommand> + Unpin,
{
use futures_util::StreamExt;
let mut maybe_conn: MaybeXmppConnection = account.into();
let mut recv_future = cmd_recv.next();
loop {
match maybe_conn.connect(signal.clone()).await {
Ok(conn) => match conn.processing(|_, _| Ok(false), recv_future, false).await {
(Ok(conn), Either::Left(f)) => {
maybe_conn = conn.into();
recv_future = f;
}
(Ok(_conn), Either::Right(None)) => break Ok(()),
(Ok(conn), Either::Right(Some(cmd))) => {
info!("Get command: {:?}", cmd);
maybe_conn = conn.process_command(cmd);
recv_future = cmd_recv.next();
}
(Err(acc), Either::Left(f)) => {
maybe_conn = acc.into();
recv_future = f;
}
(Err(_acc), Either::Right(None)) => break Ok(()),
(Err(acc), Either::Right(Some(cmd))) => {
warn!("Get command for broken connection: {:?}", cmd);
maybe_conn = acc.into();
recv_future = cmd_recv.next();
}
},
Err((acc, e)) => {
error!("Connection error: {}", e);
maybe_conn = acc.into();
}
}
}
}