use futures_util::future::Either;
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant};
use tokio_xmpp::SimpleClient;
use log::{error, info, warn};
use super::stanzas;
use super::element_processor;
use crate::config;
/// Commands that can be applied to a live connection through
/// `XmppConnection::process_command`.
#[derive(Debug)]
pub enum XmppCommand {
/// Queue `message` for `xmpp_to`. The message is parked in the per-contact
/// mailbox and `process_jid` decides when it can be put on the send queue.
Chat {
xmpp_to: xmpp_parsers::Jid,
message: String,
},
/// Send `message` to the chatroom registered under `muc_id`.
Chatroom { muc_id: String, message: String },
/// Send a ping. With `None` the ping carries no explicit target and every
/// domain remembered in `used_domains` is pinged as well; rooms recorded in
/// `error_mucs` get a fresh MUC presence.
Ping {
opt_xmpp_to: Option<xmpp_parsers::Jid>,
},
/// Queue a presence stanza with the given `show` value and status `message`.
Presence {
show: xmpp_parsers::presence::Show,
message: String,
},
/// Queue a presence stanza for the chatroom registered under `muc_id`.
ChatroomPresence {
muc_id: String,
show: xmpp_parsers::presence::Show,
message: String,
},
/// Expire pending IQ requests whose deadline has passed and run their
/// timeout handlers.
TimeoutCleanup,
}
/// Callback object stored in `pending_ids` for an IQ request we sent.
///
/// Each method consumes the handler. The returned `bool` is what
/// `incoming_iq_processing` reports for the stanza: `false` makes
/// `processing` stop and hand back `Err(account)`.
trait IqHandler {
/// Called when an IQ `result` with a matching id arrived before the deadline.
/// `opt_element` is the optional payload of the result.
fn result(
self: Box<Self>,
conn: &mut XmppConnection,
opt_element: Option<xmpp_parsers::Element>,
) -> bool;
/// Called when an IQ `error` with a matching id arrived.
fn error(
self: Box<Self>,
conn: &mut XmppConnection,
error: xmpp_parsers::stanza_error::StanzaError,
) -> bool;
/// Called when the result arrived after the deadline, or when the request
/// was expired by `XmppCommand::TimeoutCleanup`.
fn timeout(self: Box<Self>, conn: &mut XmppConnection) -> bool;
}
struct AddRosterIqHandler {
jid: xmpp_parsers::BareJid,
}
impl IqHandler for AddRosterIqHandler {
fn result(
self: Box<Self>,
conn: &mut XmppConnection,
opt_element: Option<xmpp_parsers::Element>,
) -> bool {
match opt_element {
Some(element) => {
warn!(
"Wrong payload when adding {} to roster: {}",
self.jid,
String::from(&element)
);
}
None => {
if conn.state.data.roster.contains_key(&self.jid) {
info!("Jid {} updated to roster", self.jid);
conn.process_jid(&self.jid);
} else {
info!("Jid {} added in roster", self.jid);
conn.state.data.roster.insert(
self.jid.clone(),
(
xmpp_parsers::roster::Subscription::None,
xmpp_parsers::roster::Ask::None,
),
);
}
}
}
true
}
fn error(
self: Box<Self>,
_conn: &mut XmppConnection,
_error: xmpp_parsers::stanza_error::StanzaError,
) -> bool {
true
}
fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
true }
}
/// Handles the answer to a ping we sent.
struct PingIqHandler {
    /// `true` when a failed or unanswered ping must not invalidate the
    /// connection.
    ignorable: bool,
}

impl IqHandler for PingIqHandler {
    /// Any result counts as a successful ping.
    fn result(
        self: Box<Self>,
        _conn: &mut XmppConnection,
        _opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
        info!("ping successed");
        true
    }

    /// An error answer is acceptable only for ignorable pings.
    fn error(
        self: Box<Self>,
        _conn: &mut XmppConnection,
        _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
        let PingIqHandler { ignorable } = *self;
        ignorable
    }

    /// A missing answer is acceptable only for ignorable pings.
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
        let PingIqHandler { ignorable } = *self;
        ignorable
    }
}
/// Handles the answer to the initial roster query.
struct InitRosterIqHandler {}

impl IqHandler for InitRosterIqHandler {
    /// Replaces the local roster with the received one and marks the roster
    /// as initialised. A missing or unparsable payload yields `false`.
    fn result(
        self: Box<Self>,
        conn: &mut XmppConnection,
        opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
        let element = match opt_element {
            Some(element) => element,
            None => {
                error!("No roster responded");
                return false;
            }
        };
        let parsed: Result<xmpp_parsers::roster::Roster, _> = element.try_into();
        match parsed {
            Err(e) => {
                error!("Cann't parse roster: {}", e);
                false
            }
            Ok(roster) => {
                let data = &mut conn.state.data;
                data.roster_init = true;
                data.roster.clear();
                info!("Got first roster:");
                for item in roster.items {
                    info!(" >>> {:?}", item);
                    data.roster.insert(item.jid, (item.subscription, item.ask));
                }
                true
            }
        }
    }

    /// The initial roster is mandatory: an error answer is fatal.
    fn error(
        self: Box<Self>,
        _conn: &mut XmppConnection,
        _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
        false
    }

    /// The initial roster is mandatory: a missing answer is fatal.
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
        false
    }
}
/// Handles the answer to the disco#info query sent to our own account.
struct SelfDiscoveryIqHandler {}

impl IqHandler for SelfDiscoveryIqHandler {
    /// Records that self discovery finished and whether one of the reported
    /// identities is `pubsub`/`pep`. A missing or unparsable payload yields
    /// `false`.
    fn result(
        self: Box<Self>,
        conn: &mut XmppConnection,
        opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
        let element = match opt_element {
            Some(element) => element,
            None => {
                error!("No self discovery");
                return false;
            }
        };
        let parsed: Result<xmpp_parsers::disco::DiscoInfoResult, _> = element.try_into();
        match parsed {
            Err(e) => {
                error!("Cann't parse self discovery: {}", e);
                false
            }
            Ok(self_discovery) => {
                let has_pep = self_discovery
                    .identities
                    .iter()
                    .any(|identity| identity.category == "pubsub" && identity.type_ == "pep");
                conn.state.data.self_discovery_init = true;
                conn.state.data.self_pubsub_pep = has_pep;
                info!(
                    "Support XEP-0163: Personal Eventing Protocol: {}",
                    conn.state.data.self_pubsub_pep
                );
                true
            }
        }
    }

    /// Self discovery is mandatory: an error answer is fatal.
    fn error(
        self: Box<Self>,
        _conn: &mut XmppConnection,
        _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
        false
    }

    /// Self discovery is mandatory: a missing answer is fatal.
    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
        false
    }
}
/// Bookkeeping for an IQ request that is waiting for its answer.
struct IqWait {
    /// Moment the entry was created.
    sent: Instant,
    /// How long after `sent` an answer is still considered on time.
    timeout: Duration,
    /// Callbacks to run once the answer (or the timeout) is known.
    handler: Box<dyn IqHandler>,
}

impl IqWait {
    /// Creates an entry that starts counting now and expires after
    /// `timeout_secs` seconds.
    pub fn new<T>(timeout_secs: u64, handler: T) -> IqWait
    where
        T: IqHandler + 'static,
    {
        let sent = Instant::now();
        let timeout = Duration::from_secs(timeout_secs);
        Self {
            sent,
            timeout,
            handler: Box::new(handler),
        }
    }
}
/// Mutable per-connection state that is independent of the transport.
#[derive(Default)]
struct XmppData {
// Known contacts with their subscription state and pending ask.
roster: HashMap<
xmpp_parsers::BareJid,
(
xmpp_parsers::roster::Subscription,
xmpp_parsers::roster::Ask,
),
>,
// Set once the initial roster result has been parsed.
roster_init: bool,
// Set once the disco#info answer for our own account has been parsed.
self_discovery_init: bool,
// Whether that answer listed a `pubsub`/`pep` identity.
self_pubsub_pep: bool,
// Set when a presence stanza coming from our own JID was received.
self_presence: bool,
// Incremented before every generated stanza id to keep ids distinct.
counter: usize,
// Stanzas waiting to be written by `processing`.
send_queue: VecDeque<minidom::Element>,
// Chat messages parked per contact as (message, recipient, stanza id).
outgoing_mailbox: HashMap<xmpp_parsers::BareJid, Vec<(String, xmpp_parsers::Jid, String)>>,
// Chatrooms whose join presence was sent, keyed by their configured id.
mucs: HashMap<String, xmpp_parsers::Jid>,
// IQ requests waiting for an answer, keyed by stanza id.
pending_ids: HashMap<String, IqWait>,
// Domains other than our own that are pinged along with the server ping.
used_domains: HashSet<String>,
// Chatrooms that answered with a presence error; rejoined on the next
// ping command without an explicit target.
error_mucs: HashSet<xmpp_parsers::Jid>,
}
/// Everything that exists only while a connection is established.
struct XmppState {
// Transport used to send stanzas and to receive incoming events.
client: SimpleClient,
// Protocol-level bookkeeping for this connection.
data: XmppData,
}
/// An established XMPP connection together with the account it belongs to.
pub struct XmppConnection {
// Account configuration; handed back to the caller when the connection fails.
account: std::rc::Rc<config::Account>,
// Live client and protocol state.
state: XmppState,
}
/// Builds the answer to an incoming IQ `get`/`set` request.
trait IqRequestHandler {
/// Consumes the handler and returns the IQ stanza to send back.
/// `id` and `from` are taken from the incoming request.
fn process(
self: Box<Self>,
conn: &mut XmppConnection,
id: String,
from: Option<xmpp_parsers::Jid>,
) -> xmpp_parsers::iq::Iq;
}
/// Fallback for IQ requests nobody else handles.
struct IqRequestUnknown {
    /// Payload of the rejected request, kept for logging.
    element: xmpp_parsers::Element,
    /// Kind of the request (`"get"` or `"set"`), used in the log line.
    type_: &'static str,
}

impl IqRequestHandler for IqRequestUnknown {
    /// Logs the request and answers with the "unsupported" error stanza.
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq {
        let IqRequestUnknown { element, type_ } = *self;
        warn!(
            "Unsupported IQ {} request from {:?}: {}",
            type_,
            from,
            String::from(&element)
        );
        let own_jid = conn.account.jid.clone();
        stanzas::make_iq_unsupported_error(id, own_jid, from)
    }
}
/// Answers a roster push after the roster has been updated.
struct IqSetRoster {}

impl IqRequestHandler for IqSetRoster {
    /// Logs the push and builds the acknowledgement stanza.
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq {
        info!("Got roster push {} from {:?}", id, from);
        let own_jid = conn.account.jid.clone();
        stanzas::make_roster_push_answer(id, own_jid, from)
    }
}
/// Answers an incoming ping.
struct IqGetPing {}

impl IqRequestHandler for IqGetPing {
    /// Logs the ping and builds the pong stanza.
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq {
        info!("Got ping {} from {:?}", id, from);
        let own_jid = conn.account.jid.clone();
        stanzas::make_pong(id, own_jid, from)
    }
}
/// Answers an incoming software version query.
struct IqGetVersion {}

impl IqRequestHandler for IqGetVersion {
    /// Logs the query and builds the version answer.
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq {
        info!("Got version query {} from {:?}", id, from);
        let own_jid = conn.account.jid.clone();
        stanzas::make_version(id, own_jid, from)
    }
}
/// Answers an incoming disco#info query without a node.
struct IqGetDiscoInfo {}

impl IqRequestHandler for IqGetDiscoInfo {
    /// Logs the query and builds the disco#info result.
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq {
        info!("Got disco query {} from {:?}", id, from);
        let own_jid = conn.account.jid.clone();
        stanzas::make_disco_info_result(id, own_jid, from)
    }
}
/// Answers an incoming disco#items query without a node.
struct IqGetDiscoItems {}

impl IqRequestHandler for IqGetDiscoItems {
    /// Logs the query and builds the disco#items result.
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq {
        info!("Got disco items query {} from {:?}", id, from);
        let own_jid = conn.account.jid.clone();
        stanzas::make_disco_items_result(id, own_jid, from)
    }
}
/// Answers a disco#items query for the ad-hoc commands node.
struct IqGetDiscoCommands {}

impl IqRequestHandler for IqGetDiscoCommands {
    /// Logs the query and builds the commands listing.
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq {
        info!("Got disco commands query {} from {:?}", id, from);
        let own_jid = conn.account.jid.clone();
        stanzas::make_disco_items_commands(id, own_jid, from)
    }
}
// Dispatch tables for incoming stanzas, built once on first use.
// NOTE(review): `element_processor::Processor` is defined elsewhere; it
// presumably tries the registered handlers against the element and falls back
// to the closure given to `new` when none applies - confirm in that module.
lazy_static::lazy_static! {
// Top-level stanzas: IQ, presence and message. The fallback logs the stanza
// and reports `true`.
static ref INCOMING: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element> = {
let mut incoming = element_processor::Processor::new(&|_, e| {
warn!("Unknown stanza {}", String::from(&e));
true
});
incoming.register(&XmppConnection::incoming_iq_processing);
incoming.register(&XmppConnection::incoming_presence_processing);
incoming.register(&XmppConnection::incoming_message_processing);
incoming
};
// Payloads of IQ `set` requests: only roster pushes are handled; the fallback
// produces an `IqRequestUnknown` handler.
static ref INCOMING_IQ_SET: element_processor::Processor<XmppConnection, Box<dyn IqRequestHandler>, xmpp_parsers::Element> = {
let mut iq_set =
element_processor::Processor::new(&|_conn: &mut XmppConnection, element| {
Box::new(IqRequestUnknown {
element,
type_: "set",
}) as Box<dyn IqRequestHandler>
});
iq_set.register(&XmppConnection::incoming_iq_processing_set_roster);
iq_set
};
// Payloads of IQ `get` requests: ping, disco#info, disco#items and version;
// the fallback produces an `IqRequestUnknown` handler.
static ref INCOMING_IQ_GET: element_processor::Processor<XmppConnection, Box<dyn IqRequestHandler>, xmpp_parsers::Element> = {
let mut iq_get =
element_processor::Processor::new(&|_conn: &mut XmppConnection, element| {
Box::new(IqRequestUnknown {
element,
type_: "get",
}) as Box<dyn IqRequestHandler>
});
iq_get.register(&XmppConnection::incoming_iq_processing_get_ping);
iq_get.register(&XmppConnection::incoming_iq_processing_get_disco_info);
iq_get.register(&XmppConnection::incoming_iq_processing_get_disco_items);
iq_get.register(&XmppConnection::incoming_iq_processing_get_version);
iq_get
};
}
/// An account that may or may not currently have an established connection.
pub struct MaybeXmppConnection {
// Account configuration, always available.
account: std::rc::Rc<config::Account>,
// `Some` while a connection exists; `None` means `connect` has to build one.
state: Option<XmppState>,
}
impl From<XmppConnection> for MaybeXmppConnection {
fn from(from: XmppConnection) -> MaybeXmppConnection {
MaybeXmppConnection {
account: from.account,
state: Some(from.state),
}
}
}
impl From<config::Account> for MaybeXmppConnection {
    /// Starts from a bare account without any connection.
    fn from(from: config::Account) -> MaybeXmppConnection {
        let account = std::rc::Rc::new(from);
        Self {
            account,
            state: None,
        }
    }
}
impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    /// Starts from a shared account without any connection.
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
        Self {
            state: None,
            account: from,
        }
    }
}
impl MaybeXmppConnection {
/// Returns an established connection, creating one if necessary.
///
/// When a state is already present it is returned as is. Otherwise a new
/// client is created and brought up in order: initial roster, own presence,
/// chatroom joins, self discovery. The whole attempt is raced against a
/// clone of `stop_future`.
///
/// Returns `Err((account, error))` when `stop_future` completes or when the
/// client cannot be created. A failure in one of the later bring-up steps
/// starts another attempt.
pub async fn connect<F>(
self,
stop_future: F,
) -> Result<XmppConnection, (std::rc::Rc<config::Account>, failure::Error)>
where
F: std::future::Future<Output = ()> + Clone + 'static,
{
info!("xmpp check connection...");
let MaybeXmppConnection { account, state } = self;
if let Some(state) = state {
Ok(XmppConnection { account, state })
} else {
loop {
// Each attempt gets its own copy of the stop signal.
let pin_stop_future = stop_future.clone();
tokio::pin!(pin_stop_future);
let connect = futures_util::future::select(
pin_stop_future,
Box::pin(async {
info!("xmpp initialization...");
let client = match SimpleClient::new_with_jid(
account.jid.clone(),
account.password.clone(),
)
.await
{
Ok(client) => client,
Err(e) => {
error!("Initialization error: {}", e);
// Reported as "stopped", so the outer match ends the loop.
return (Err(account.clone()), Either::Right(()));
}
};
let stop_future = stop_future.clone();
let (connected, stopped) = XmppConnection {
state: XmppState {
client,
data: std::default::Default::default(),
},
account: account.clone(),
}
.initial_roster(Box::pin(stop_future))
.await;
// Every following step runs only if the previous one succeeded and
// handed the still pending stop future back (`Either::Left`).
let (connected, stopped) = match (connected, stopped) {
(Ok(connection), Either::Left(stop_future)) => {
connection.self_presence(stop_future).await
}
(connected, stopped) => (connected, stopped),
};
let (connected, stopped) = match (connected, stopped) {
(Ok(connection), Either::Left(stop_future)) => {
connection.enter_mucs(stop_future).await
}
(connected, stopped) => (connected, stopped),
};
match (connected, stopped) {
(Ok(connection), Either::Left(stop_future)) => {
connection.self_discovery(stop_future).await
}
(connected, stopped) => (connected, stopped),
}
}),
)
.await;
match connect {
// The outer stop signal won the race.
Either::Left((_, _)) => {
break Err((
account.clone(),
failure::format_err!("Stop XMPP connection"),
))
}
// A bring-up step saw the stop signal, or the client could not be created.
Either::Right(((_, Either::Right(_)), _)) => {
break Err((
account.clone(),
failure::format_err!("Stop XMPP connection(2)"),
))
}
Either::Right(((result, Either::Left(_)), _)) => match result {
Ok(connection) => break Ok(connection),
// Bring-up failed without a stop request: try again.
Err(_account) => {}
},
}
}
}
}
}
impl XmppConnection {
/// Dispatches one incoming event.
///
/// A transport error is logged and reported as `false`; a stanza is cloned
/// and handed to the `INCOMING` processor, whose verdict is returned.
fn xmpp_processing(
    &mut self,
    event: &Result<xmpp_parsers::Element, tokio_xmpp::Error>,
) -> bool {
    let stanza = match event {
        Err(error) => {
            error!("Unexpected event {:?}", error);
            return false;
        }
        Ok(stanza) => stanza.clone(),
    };
    INCOMING.process(self, stanza)
}
/// Applies a roster push: existing entries get their subscription and ask
/// state replaced, unknown contacts are inserted. Returns the handler that
/// acknowledges the push.
fn incoming_iq_processing_set_roster(
    &mut self,
    roster: xmpp_parsers::roster::Roster,
) -> Box<dyn IqRequestHandler> {
    use std::collections::hash_map::Entry;

    for item in roster.items {
        let status = (item.subscription, item.ask);
        match self.state.data.roster.entry(item.jid) {
            Entry::Occupied(mut known) => {
                info!("Update {} in roster", known.key());
                *known.get_mut() = status;
            }
            Entry::Vacant(unknown) => {
                info!("Add {} to roster", unknown.key());
                unknown.insert(status);
            }
        }
    }
    Box::new(IqSetRoster {})
}
/// Picks the handler for an IQ `set` payload and lets it build the answer.
fn incoming_iq_processing_set(
    &mut self,
    id: String,
    from: Option<xmpp_parsers::Jid>,
    element: minidom::Element,
) -> xmpp_parsers::iq::Iq {
    let handler = INCOMING_IQ_SET.process(self, element);
    handler.process(self, id, from)
}
/// An incoming ping is always answered by `IqGetPing`.
fn incoming_iq_processing_get_ping(
    &mut self,
    _ping: xmpp_parsers::ping::Ping,
) -> Box<dyn IqRequestHandler> {
    let responder = IqGetPing {};
    Box::new(responder)
}
/// Selects the handler for a disco#info query: queries without a node are
/// answered, queries for any node are rejected as unsupported.
fn incoming_iq_processing_get_disco_info(
    &mut self,
    disco: xmpp_parsers::disco::DiscoInfoQuery,
) -> Box<dyn IqRequestHandler> {
    match disco.node {
        None => Box::new(IqGetDiscoInfo {}),
        Some(ref node) => {
            warn!("Unsupported node {}", node);
            Box::new(IqRequestUnknown {
                element: disco.into(),
                type_: "get",
            })
        }
    }
}
/// Selects the handler for a disco#items query: no node lists the items,
/// the ad-hoc commands node lists the commands, any other node is rejected
/// as unsupported.
fn incoming_iq_processing_get_disco_items(
    &mut self,
    disco: xmpp_parsers::disco::DiscoItemsQuery,
) -> Box<dyn IqRequestHandler> {
    if let Some(node) = &disco.node {
        if node == "http://jabber.org/protocol/commands" {
            return Box::new(IqGetDiscoCommands {});
        }
        warn!("Unsupported node {}", node);
        return Box::new(IqRequestUnknown {
            element: disco.into(),
            type_: "get",
        });
    }
    Box::new(IqGetDiscoItems {})
}
/// A software version query is always answered by `IqGetVersion`.
fn incoming_iq_processing_get_version(
    &mut self,
    _version: xmpp_parsers::version::VersionQuery,
) -> Box<dyn IqRequestHandler> {
    let responder = IqGetVersion {};
    Box::new(responder)
}
/// Picks the handler for an IQ `get` payload and lets it build the answer.
fn incoming_iq_processing_get(
    &mut self,
    id: String,
    from: Option<xmpp_parsers::Jid>,
    element: minidom::Element,
) -> xmpp_parsers::iq::Iq {
    let handler = INCOMING_IQ_GET.process(self, element);
    handler.process(self, id, from)
}
/// Handles one incoming IQ stanza.
///
/// * `get`/`set` requests: the answer is built and put on the send queue.
/// * `error`: the pending handler for that id decides; an error for an
///   unknown id is logged and reported as `false`.
/// * `result`: the pending handler's `result` runs if the answer arrived
///   before the deadline, otherwise its `timeout`; a result for an unknown
///   id is only logged.
fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
    use xmpp_parsers::iq::IqType;

    let xmpp_parsers::iq::Iq {
        id, from, payload, ..
    } = iq;
    match payload {
        IqType::Get(element) => {
            let answer = self.incoming_iq_processing_get(id, from, element);
            self.state.data.send_queue.push_back(answer.into());
            true
        }
        IqType::Set(element) => {
            let answer = self.incoming_iq_processing_set(id, from, element);
            self.state.data.send_queue.push_back(answer.into());
            true
        }
        IqType::Error(e) => match self.state.data.pending_ids.remove(&id) {
            Some(wait) => wait.handler.error(self, e),
            None => {
                error!("iq error: {:?}", e);
                false
            }
        },
        IqType::Result(opt_element) => {
            let wait = match self.state.data.pending_ids.remove(&id) {
                Some(wait) => wait,
                None => {
                    warn!(
                        "Unwanted iq result id {} from {:?}: {:?}",
                        id,
                        from,
                        opt_element.map(|e| String::from(&e))
                    );
                    return true;
                }
            };
            let now = Instant::now();
            let in_time = wait.sent + wait.timeout > now;
            if in_time {
                info!(
                    "Timeout for {} was {:?} expected {:?}",
                    id,
                    now - wait.sent,
                    wait.timeout
                );
                wait.handler.result(self, opt_element)
            } else {
                warn!(
                    "Timeout for {} was {:?} expected {:?}",
                    id,
                    now - wait.sent,
                    wait.timeout
                );
                wait.handler.timeout(self)
            }
        }
    }
}
/// Handles one incoming presence stanza. Always returns `true`.
///
/// * Presence from our own JID marks `self_presence`.
/// * An error addressed to us from a configured chatroom is remembered in
///   `error_mucs`; any other error drops the mailbox queued for the sender.
/// * `Unavailable` addressed to us from a configured chatroom queues a new
///   MUC presence for that room.
/// * Everything else is only logged.
fn incoming_presence_processing(&mut self, presence: xmpp_parsers::presence::Presence) -> bool {
if presence.from.as_ref() == Some(&self.account.jid) {
info!("Self-presence accepted");
self.state.data.self_presence = true;
} else {
match presence.type_ {
xmpp_parsers::presence::Type::Error => {
if presence.to.as_ref() == Some(&self.account.jid) {
// Is the sender exactly one of the configured chatroom JIDs?
if let Some(room) = self
.account
.chatrooms
.values()
.find(|&j| Some(j) == presence.from.as_ref())
{
error!(
"Got error from MUC {}. Try again later: {:?}",
room, presence
);
self.state.data.error_mucs.insert(room.clone());
return true;
}
}
error!("Incoming presence stanza error: {:?}", presence);
if let Some(from) = presence.from {
// The mailbox is keyed by bare JID; removing it discards the queued messages.
if let Some(ref mut mailbox) = self
.state
.data
.outgoing_mailbox
.remove(&from.clone().into())
{
if !mailbox.is_empty() {
error!(
"Removed {} undeliverable messages for {}",
mailbox.len(),
from
);
}
}
}
}
xmpp_parsers::presence::Type::Unavailable => {
if presence.to.as_ref() == Some(&self.account.jid) {
if let Some(room) = self
.account
.chatrooms
.values()
.find(|&j| Some(j) == presence.from.as_ref())
{
warn!(
"Got disconnected from MUC {}. Try again: {:?}",
room, presence
);
// Queue a fresh presence for the room right away.
self.state.data.counter += 1;
let id_muc_presence =
format!("id_muc_presence{}", self.state.data.counter);
let muc_presence = stanzas::make_muc_presence(
&id_muc_presence,
self.account.jid.clone(),
room.clone(),
None,
None,
);
self.state.data.send_queue.push_back(muc_presence);
} else {
warn!(
"Incoming Unavailable presence stanza to self: {:?}",
presence
);
}
} else {
warn!("Incoming Unavailable presence stanza: {:?}", presence);
}
}
_ => {
warn!("Incoming presence stanza: {:?}", presence);
}
}
}
true
}
/// Handles one incoming message stanza. Always returns `true`.
///
/// Messages that carry a delay payload are accepted silently; all others
/// are logged.
fn incoming_message_processing(&mut self, message: xmpp_parsers::message::Message) -> bool {
    let delayed = message.payloads.iter().any(|payload| {
        let parsed: Result<xmpp_parsers::delay::Delay, _> = payload.clone().try_into();
        parsed.is_ok()
    });
    if !delayed {
        warn!("Incoming message stanza: {:?}", message);
    }
    true
}
/// Drives the connection until `stop_condition` is satisfied, `stop_future`
/// completes, or the connection fails.
///
/// Each iteration first flushes one stanza from the send queue; only when
/// the queue is empty does it wait for the next incoming event. Both the
/// send and the receive are raced against `stop_future`.
///
/// * `stop_condition` is called after every successfully processed event:
///   `Ok(true)` ends the loop successfully, `Ok(false)` continues, `Err`
///   ends the loop with a failure.
/// * `stop_fatal` controls what happens when `stop_future` completes while a
///   stanza is being sent: `true` abandons the send, `false` finishes it.
///
/// The first tuple element is `Ok(self)` or, on failure, `Err(account)`.
/// The second is `Either::Left(stop_future)` when the stop future is still
/// pending and `Either::Right(output)` when it completed.
pub async fn processing<S, F, O>(
mut self,
mut stop_condition: S,
stop_future: F,
stop_fatal: bool,
) -> (Result<Self, std::rc::Rc<config::Account>>, Either<F, O>)
where
S: FnMut(&mut Self, Result<xmpp_parsers::Element, tokio_xmpp::Error>) -> Result<bool, ()>
+ 'static,
F: std::future::Future<Output = O> + Unpin,
{
let mut stop_future = stop_future;
loop {
if let Some(send_element) = self.state.data.send_queue.pop_front() {
info!("Sending {}", String::from(&send_element));
let stop = {
let send_future = self.state.client.send_stanza(send_element);
tokio::pin!(send_future);
match futures_util::future::select(stop_future, send_future).await {
// Stop requested while the stanza was still being written.
Either::Left((stop, send_future)) => {
if stop_fatal {
warn!("Sending interrupted!");
} else {
match send_future.await {
Ok(_) => {
info!("Sent!");
}
Err(e) => {
error!("Send stanza error: {}", e);
break (Err(self.account), Either::Right(stop));
}
}
}
stop
}
// Stanza written; take the stop future back and go on.
Either::Right((Ok(_), f)) => {
info!("Sent!");
stop_future = f;
continue;
}
Either::Right((Err(e), f)) => {
error!("Send stanza error: {}", e);
break (Err(self.account), Either::Left(f));
}
}
};
break (Ok(self), Either::Right(stop));
} else {
use futures_util::StreamExt;
let recv_future = self.state.client.next();
tokio::pin!(recv_future);
match futures_util::future::select(stop_future, recv_future).await {
Either::Left((stop, _s)) => break (Ok(self), Either::Right(stop)),
// The event stream ended.
Either::Right((None, f)) => {
break (Err(self.account), Either::Left(f));
}
Either::Right((Some(event), f)) => {
stop_future = f;
if self.xmpp_processing(&event) {
match stop_condition(&mut self, event) {
Ok(true) => break (Ok(self), Either::Left(stop_future)),
Ok(false) => {}
Err(_) => break (Err(self.account), Either::Left(stop_future)),
}
} else {
break (Err(self.account), Either::Left(stop_future));
}
}
}
}
}
}
/// Requests the roster and processes incoming stanzas until it has been
/// received.
///
/// The request is registered in `pending_ids` with a 60 second deadline
/// before it is sent. If `stop_future` completes during the send the
/// connection is returned with `Either::Right(())`; a send error yields
/// `Err(account)`.
async fn initial_roster<F>(
    mut self,
    stop_future: F,
) -> (Result<Self, std::rc::Rc<config::Account>>, Either<F, ()>)
where
    F: std::future::Future<Output = ()> + Unpin,
{
    self.state.data.counter += 1;
    let id_init_roster = format!("id_init_roster{}", self.state.data.counter);
    let get_roster = stanzas::make_get_roster(&id_init_roster);
    info!("Quering roster... {}", String::from(&get_roster));
    self.state.data.pending_ids.insert(
        id_init_roster.clone(),
        IqWait::new(60, InitRosterIqHandler {}),
    );
    // `None` means the stop future won the race against the send.
    let sent = {
        let send_future = self.state.client.send_stanza(get_roster);
        tokio::pin!(send_future);
        match futures_util::future::select(stop_future, send_future).await {
            Either::Left(((), _)) => None,
            Either::Right((outcome, pending_stop)) => Some((outcome, pending_stop)),
        }
    };
    match sent {
        None => (Ok(self), Either::Right(())),
        Some((Err(e), pending_stop)) => {
            error!("Send initial roster stanza error: {}", e);
            (Err(self.account), Either::Left(pending_stop))
        }
        Some((Ok(_), pending_stop)) => {
            self.processing(
                move |conn, _| Ok(conn.state.data.roster_init),
                pending_stop,
                true,
            )
            .await
        }
    }
}
/// Announces our presence and processes incoming stanzas until the server
/// reflects it back.
///
/// If `stop_future` completes during the send the connection is returned
/// with `Either::Right(())`; a send error yields `Err(account)`.
async fn self_presence<F>(
    mut self,
    stop_future: F,
) -> (Result<Self, std::rc::Rc<config::Account>>, Either<F, ()>)
where
    F: std::future::Future<Output = ()> + Unpin,
{
    self.state.data.counter += 1;
    let id_presence = format!("id_init_presence{}", self.state.data.counter);
    let presence = stanzas::make_presence(id_presence, None, "Online!".to_string());
    info!("Sending presence... {}", String::from(&presence));
    // `None` means the stop future won the race against the send.
    let sent = {
        let send_future = self.state.client.send_stanza(presence);
        tokio::pin!(send_future);
        match futures_util::future::select(stop_future, send_future).await {
            Either::Left(((), _send_future)) => None,
            Either::Right((outcome, pending_stop)) => Some((outcome, pending_stop)),
        }
    };
    match sent {
        None => (Ok(self), Either::Right(())),
        Some((Err(e), pending_stop)) => {
            error!("Send self-presence stanza error: {}", e);
            (Err(self.account), Either::Left(pending_stop))
        }
        Some((Ok(_), pending_stop)) => {
            self.processing(
                move |conn, _| Ok(conn.state.data.self_presence),
                pending_stop,
                true,
            )
            .await
        }
    }
}
pub fn process_command(mut self, cmd: XmppCommand) -> MaybeXmppConnection {
match cmd {
XmppCommand::Presence { show, message } => {
self.state.data.counter += 1;
let id_presence = format!("id_presence{}", self.state.data.counter);
self.state.data.send_queue.push_back(stanzas::make_presence(
id_presence,
Some(show),
message,
));
}
XmppCommand::Ping { opt_xmpp_to } => {
let ignorable = opt_xmpp_to.is_some();
self.state.data.counter += 1;
let id_ping = format!("id_ping{}", self.state.data.counter);
let ping = stanzas::make_ping(
&id_ping,
self.state.client.bound_jid().clone(),
opt_xmpp_to.clone(),
);
self.state.data.send_queue.push_back(ping);
self.state
.data
.pending_ids
.insert(id_ping, IqWait::new(30, PingIqHandler { ignorable }));
if opt_xmpp_to.is_none() {
for domains in &self.state.data.used_domains {
self.state.data.counter += 1;
let id_ping = format!("id_ping{}", self.state.data.counter);
let ping = stanzas::make_ping(
&id_ping,
self.state.client.bound_jid().clone(),
Some(xmpp_parsers::Jid::Bare(xmpp_parsers::BareJid::domain(
domains,
))),
);
self.state.data.send_queue.push_back(ping);
self.state
.data
.pending_ids
.insert(id_ping, IqWait::new(30, PingIqHandler { ignorable: true }));
}
for muc in self.state.data.error_mucs.drain() {
self.state.data.counter += 1;
let id_muc_presence = format!("id_muc_presence{}", self.state.data.counter);
let muc_presence = stanzas::make_muc_presence(
&id_muc_presence,
self.account.jid.clone(),
muc,
None,
None,
);
self.state.data.send_queue.push_back(muc_presence);
}
}
if let Some(xmpp_to) = opt_xmpp_to {
self.add_domain_to_ping(xmpp_to.into());
}
}
XmppCommand::TimeoutCleanup => {
let now = Instant::now();
let timeouted: Vec<String> = self
.state
.data
.pending_ids
.iter()
.filter_map(|(id, data)| {
if now >= data.sent + data.timeout {
warn!(
"Timeout for {} was {:?} expected {:?}",
id,
now - data.sent,
data.timeout
);
Some(id.to_string())
} else {
None
}
})
.collect();
let mut correct = true;
timeouted.into_iter().for_each(|id| {
if let Some(data) = self.state.data.pending_ids.remove(&id) {
correct &= data.handler.timeout(&mut self);
}
})
}
XmppCommand::Chat { xmpp_to, message } => {
self.state.data.counter += 1;
let id_send_message = format!("id_send_message{}", self.state.data.counter);
let bare_xmpp_to: xmpp_parsers::BareJid = xmpp_to.clone().into();
self.add_domain_to_ping(bare_xmpp_to.clone());
self.state
.data
.outgoing_mailbox
.entry(bare_xmpp_to.clone())
.or_default()
.push((message, xmpp_to, id_send_message));
self.process_jid(&bare_xmpp_to);
}
XmppCommand::Chatroom { muc_id, message } => {
if let Some(muc) = self.state.data.mucs.get(&muc_id) {
self.state
.data
.send_queue
.push_back(stanzas::make_muc_message(muc.clone(), message));
} else {
error!("Not found MUC {}", muc_id);
}
}
XmppCommand::ChatroomPresence {
muc_id,
show,
message,
} => {
if let Some(muc) = self.state.data.mucs.get(&muc_id) {
self.state.data.counter += 1;
let id_presence = format!("id_presence{}", self.state.data.counter);
self.state
.data
.send_queue
.push_back(stanzas::make_muc_presence(
&id_presence,
self.account.jid.clone(),
muc.clone(),
Some(show),
Some(message),
));
} else {
error!("Not found MUC {}", muc_id);
}
}
}
self.into()
}
/// Advances delivery of the messages parked for `xmpp_to`.
///
/// Nothing happens when the contact has no mailbox or the mailbox is empty.
/// Otherwise:
///
/// * Contact in roster with subscription `To` or `Both`: all parked
///   messages are moved to the send queue.
/// * Contact in roster, not subscribed, no ask pending: the stanza built by
///   `stanzas::make_ask_subscribe` is queued; the messages stay parked.
/// * Contact in roster in any other state: the messages are sent anyway and
///   the ask-subscribe stanza is queued again.
/// * In all roster cases, a contact without `From`/`Both` subscription also
///   gets the stanza built by `stanzas::make_allow_subscribe`.
/// * Contact not in roster: a roster-add IQ is queued and registered in
///   `pending_ids` with a 60 second deadline.
fn process_jid(&mut self, xmpp_to: &xmpp_parsers::BareJid) {
if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
if !mailbox.is_empty() {
if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
info!("Jid {} in roster", xmpp_to);
let sub_to = matches!(
rdata.0,
xmpp_parsers::roster::Subscription::To
| xmpp_parsers::roster::Subscription::Both
);
if sub_to {
info!("Subscribed to {}", xmpp_to);
// Mailbox entries are (message, recipient, id); the builder takes them reversed.
self.state
.data
.send_queue
.extend(mailbox.drain(..).map(|message| {
stanzas::make_chat_message(message.2, message.1, message.0)
}));
} else if rdata.1 == xmpp_parsers::roster::Ask::None {
info!("Not subscribed to {}", xmpp_to);
self.state.data.counter += 1;
let id_ask_subscribe =
format!("id_ask_subscribe{}", self.state.data.counter);
self.state
.data
.send_queue
.push_back(stanzas::make_ask_subscribe(
id_ask_subscribe,
xmpp_to.clone().into(),
));
} else {
warn!(
"Unsupported subscription state {:?} to {}. Try to send messages",
rdata, xmpp_to
);
self.state
.data
.send_queue
.extend(mailbox.drain(..).map(|message| {
stanzas::make_chat_message(message.2, message.1, message.0)
}));
self.state.data.counter += 1;
let id_ask_subscribe =
format!("id_ask_subscribe{}", self.state.data.counter);
self.state
.data
.send_queue
.push_back(stanzas::make_ask_subscribe(
id_ask_subscribe,
xmpp_to.clone().into(),
));
}
let sub_from = matches!(
rdata.0,
xmpp_parsers::roster::Subscription::From
| xmpp_parsers::roster::Subscription::Both
);
if !sub_from {
info!("Not subscription from {}", xmpp_to);
self.state.data.counter += 1;
let id_allow_subscribe =
format!("id_allow_subscribe{}", self.state.data.counter);
self.state
.data
.send_queue
.push_back(stanzas::make_allow_subscribe(
id_allow_subscribe,
xmpp_to.clone().into(),
));
}
} else {
info!("Jid {} not in roster", xmpp_to);
self.state.data.counter += 1;
let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
info!("Adding jid {} to roster id {}", xmpp_to, id_add_roster);
// Register the handler before the request is queued for sending.
self.state.data.pending_ids.insert(
id_add_roster,
IqWait::new(
60,
AddRosterIqHandler {
jid: xmpp_to.clone(),
},
),
);
self.state.data.send_queue.push_back(add_roster);
}
}
}
}
/// Sends a MUC presence to every chatroom configured for the account.
///
/// Each send is raced against `stop_future`. A room is recorded in `mucs`
/// as soon as its presence has been written; no answer from the room is
/// awaited here. After all rooms were handled their domains are offered to
/// `add_domain_to_ping`.
///
/// Returns `Either::Right(())` when `stop_future` completed during a send
/// and `Err(account)` on a send error.
async fn enter_mucs<F>(
mut self,
mut stop_future: F,
) -> (Result<Self, std::rc::Rc<config::Account>>, Either<F, ()>)
where
F: std::future::Future<Output = ()> + Unpin,
{
// Collected here because `self.account.chatrooms` is borrowed by the loop.
let mut domains: Vec<xmpp_parsers::BareJid> = vec![];
for chatroom in &self.account.chatrooms {
self.state.data.counter += 1;
let id_muc_presence = format!("id_muc_presence{}", self.state.data.counter);
let muc_presence = stanzas::make_muc_presence(
&id_muc_presence,
self.account.jid.clone(),
chatroom.1.clone(),
None,
None,
);
info!("Sending muc presence... {}", String::from(&muc_presence));
let opt_stop_future = {
let send_future = self.state.client.send_stanza(muc_presence);
tokio::pin!(send_future);
match futures_util::future::select(stop_future, send_future).await {
// Stop requested: the stop future is consumed, signal that with `None`.
Either::Left(((), _)) => None,
Either::Right((Ok(_), stop_future)) => {
self.state
.data
.mucs
.insert(chatroom.0.clone(), chatroom.1.clone());
domains.push(chatroom.1.clone().into());
Some(stop_future)
}
Either::Right((Err(e), f)) => {
error!("Send muc presence stanza error: {}", e);
return (Err(self.account), Either::Left(f));
}
}
};
if let Some(f) = opt_stop_future {
stop_future = f;
} else {
return (Ok(self), Either::Right(()));
}
}
domains.into_iter().for_each(|d| self.add_domain_to_ping(d));
(Ok(self), Either::Left(stop_future))
}
/// Sends a disco#info query to our own bare JID.
///
/// The request is registered in `pending_ids` with a 60 second deadline;
/// the answer is not awaited here. Returns `Either::Right(())` when
/// `stop_future` completed during the send and `Err(account)` on a send
/// error.
async fn self_discovery<F>(
    mut self,
    stop_future: F,
) -> (Result<Self, std::rc::Rc<config::Account>>, Either<F, ()>)
where
    F: std::future::Future<Output = ()> + Unpin,
{
    self.state.data.counter += 1;
    let id_self_discovery = format!("id_self_discovery{}", self.state.data.counter);
    let own_jid = self.account.jid.clone();
    let self_discovery = stanzas::make_disco_get(
        id_self_discovery.clone(),
        Some(own_jid.clone()),
        Some(xmpp_parsers::BareJid::from(own_jid).into()),
    );
    self.state.data.pending_ids.insert(
        id_self_discovery,
        IqWait::new(60, SelfDiscoveryIqHandler {}),
    );
    info!(
        "Sending self discovery... {}",
        String::from(&self_discovery)
    );
    // `None` means the stop future won the race against the send.
    let sent = {
        let send_future = self.state.client.send_stanza(self_discovery);
        tokio::pin!(send_future);
        match futures_util::future::select(stop_future, send_future).await {
            Either::Left(((), _)) => None,
            Either::Right((outcome, pending_stop)) => Some((outcome, pending_stop)),
        }
    };
    match sent {
        None => (Ok(self), Either::Right(())),
        Some((Ok(_), pending_stop)) => (Ok(self), Either::Left(pending_stop)),
        Some((Err(e), pending_stop)) => {
            error!("Send self discovery error: {}", e);
            (Err(self.account), Either::Left(pending_stop))
        }
    }
}
/// Remembers the domain of `jid` for later pings, unless it is the domain
/// of our own account.
fn add_domain_to_ping(&mut self, jid: xmpp_parsers::BareJid) {
    let is_foreign = jid.domain != self.account.jid.clone().domain();
    if !is_foreign {
        return;
    }
    self.state.data.used_domains.insert(jid.domain);
}
}