use tokio_xmpp::{Client, Event, Packet};
use tokio::prelude::future::{self, Either};
use tokio::prelude::stream;
use tokio::prelude::{Future, Stream};
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
use super::stanzas;
use super::element_processor;
use crate::config;
/// Commands handled by `XmppConnection::process_command`
#[derive(Debug)]
pub enum XmppCommand {
/// Send message to someone by jid
Chat {
/// jid of the recipient
xmpp_to: xmpp_parsers::Jid,
/// text of the message
message: String,
},
/// Send message to MUC
Chatroom {
/// id of the MUC, looked up in the connection's map of entered MUCs
muc_id: String,
/// text of the message
message: String,
},
/// Send ping request to the server to test connection
Ping,
/// Check iq requests if some have expired timeouts
TimeoutCleanup,
}
/// trait of processing iq
/// each function consumes handlers and
/// returns false if connection should be reset
trait IqHandler {
/// process result
/// `opt_element` is the optional payload of the iq result
fn result(
self: Box<Self>,
conn: &mut XmppConnection,
opt_element: Option<xmpp_parsers::Element>,
) -> bool;
/// process error
/// `error` is the stanza error carried by the iq
fn error(
self: Box<Self>,
conn: &mut XmppConnection,
error: xmpp_parsers::stanza_error::StanzaError,
) -> bool;
/// process timeout
fn timeout(self: Box<Self>, conn: &mut XmppConnection) -> bool;
}
/// handler of the answer to the iq which adds a jid to the roster
struct AddRosterIqHandler {
/// jid being added to the roster
jid: xmpp_parsers::Jid,
}
impl IqHandler for AddRosterIqHandler {
    /// Handles the answer to the roster-add request.
    ///
    /// An answer without payload is the expected one: the jid is put into the
    /// local roster (unless already known) and `process_jid` is run for it so
    /// that waiting messages can make progress. An answer with a payload is
    /// only logged. Never asks for a connection reset.
    fn result(
        self: Box<Self>,
        conn: &mut XmppConnection,
        opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
        if let Some(unexpected) = opt_element {
            warn!(
                "Wrong payload when adding {} to roster: {}",
                self.jid,
                String::from(&unexpected)
            );
            return true;
        }
        let roster = &mut conn.state.data.roster;
        if !roster.contains_key(&self.jid) {
            info!("Jid {} added in roster", self.jid);
            roster.insert(
                self.jid.clone(),
                (
                    xmpp_parsers::roster::Subscription::None,
                    xmpp_parsers::roster::Ask::None,
                ),
            );
        } else {
            info!("Jid {} updated to roster", self.jid);
        }
        conn.process_jid(&self.jid);
        true
    }
    /// An error answer is ignored; the connection is kept.
    fn error(
        self: Box<Self>,
        _connection: &mut XmppConnection,
        _stanza_error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
        true
    }
    /// A missing answer is ignored; the connection is kept.
    fn timeout(self: Box<Self>, _connection: &mut XmppConnection) -> bool {
        // ignore
        true
    }
}
/// handler of the answer to the ping iq
struct PingIqHandler {}
impl IqHandler for PingIqHandler {
    /// A result needs no further action; the connection is kept.
    fn result(
        self: Box<Self>,
        _connection: &mut XmppConnection,
        _payload: Option<xmpp_parsers::Element>,
    ) -> bool {
        // ignore
        true
    }
    /// An error answer to the ping asks for a connection reset.
    fn error(
        self: Box<Self>,
        _connection: &mut XmppConnection,
        _stanza_error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
        false
    }
    /// A ping without answer asks for a connection reset.
    fn timeout(self: Box<Self>, _connection: &mut XmppConnection) -> bool {
        false
    }
}
/// handler of the answer to the initial roster query
struct InitRosterIqHandler {}
impl IqHandler for InitRosterIqHandler {
    /// Replaces the local roster with the one received from the answer and
    /// marks the roster as initialized.
    ///
    /// Returns `false` (reset the connection) when the answer has no payload
    /// or the payload cannot be parsed as a roster.
    fn result(
        self: Box<Self>,
        conn: &mut XmppConnection,
        opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
        use std::convert::TryInto;
        let payload = match opt_element {
            Some(payload) => payload,
            None => {
                error!("No roster responded");
                return false;
            }
        };
        let parsed: Result<xmpp_parsers::roster::Roster, _> = payload.try_into();
        let roster = match parsed {
            Ok(roster) => roster,
            Err(e) => {
                error!("Cann't parse roster: {}", e);
                return false;
            }
        };
        let data = &mut conn.state.data;
        data.roster_init = true;
        data.roster.clear();
        info!("Got first roster:");
        for item in roster.items {
            info!(" >>> {:?}", item);
            data.roster.insert(item.jid, (item.subscription, item.ask));
        }
        true
    }
    /// An error answer to the roster query asks for a connection reset.
    fn error(
        self: Box<Self>,
        _connection: &mut XmppConnection,
        _stanza_error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
        false
    }
    /// A roster query without answer asks for a connection reset.
    fn timeout(self: Box<Self>, _connection: &mut XmppConnection) -> bool {
        false
    }
}
/// bookkeeping data of a connection (everything except the client itself)
#[derive(Default)]
struct XmppData {
/// known roster data
/// maps jid to its subscription state and pending subscription request (ask)
roster: HashMap<
xmpp_parsers::Jid,
(
xmpp_parsers::roster::Subscription,
xmpp_parsers::roster::Ask,
),
>,
/// if roster was initialized
/// ToDo: remove it as it used only for initialization
roster_init: bool,
/// ids counter
/// incremented before every generated stanza id
counter: usize,
/// stanzas to send
send_queue: VecDeque<minidom::Element>,
/// outgoing mailbox
/// messages per recipient jid which wait until they can be sent
outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
/// muc id to muc jid
mucs: HashMap<String, xmpp_parsers::Jid>,
/// map from iq's id to handler of this type of iqs
/// the instant is the deadline after which the iq counts as timed out
pending_ids: HashMap<String, (Instant, Box<dyn IqHandler>)>,
}
/// state of an established connection
struct XmppState {
/// tokio_xmpp client used to send stanzas and receive events
client: Client,
/// roster, queues and pending iq bookkeeping
data: XmppData,
}
/// established xmpp connection of an account
pub struct XmppConnection {
/// account settings this connection belongs to
account: std::rc::Rc<config::Account>,
/// client and its bookkeeping data
state: XmppState,
}
/// set of processors for incoming stanzas
struct XmppElementProcessor {
/// processor of incoming elements; the `bool` result is returned by
/// `XmppConnection::xmpp_processing`, where false means disconnect
incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
}
impl XmppElementProcessor {
    /// Builds the processor for incoming stanzas.
    ///
    /// The fallback only logs the unknown stanza and keeps the connection;
    /// iq stanzas are routed to `XmppConnection::incoming_iq_processing`.
    fn new() -> Self {
        let mut processor = element_processor::Processor::new(&|_, element| {
            warn!("Unknown stanza {}", String::from(&element));
            true
        });
        processor.register(&XmppConnection::incoming_iq_processing);
        Self {
            incoming: processor,
        }
    }
}
/// account with an optional established connection
pub struct MaybeXmppConnection {
/// account settings
account: std::rc::Rc<config::Account>,
/// state of the connection or None if nothing is connected
state: Option<XmppState>,
}
impl From<XmppConnection> for MaybeXmppConnection {
fn from(from: XmppConnection) -> MaybeXmppConnection {
MaybeXmppConnection {
account: from.account,
state: Some(from.state),
}
}
}
impl From<config::Account> for MaybeXmppConnection {
    /// Starts from an owned account without any connection.
    fn from(from: config::Account) -> Self {
        let account = std::rc::Rc::new(from);
        Self {
            account,
            state: None,
        }
    }
}
impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    /// Starts from a shared account without any connection.
    fn from(from: std::rc::Rc<config::Account>) -> Self {
        Self {
            state: None,
            account: from,
        }
    }
}
impl MaybeXmppConnection {
/// connects if nothing connected
/// don't connect only if stop_future resolved
///
/// If a state is already present the returned future resolves immediately
/// with it. Otherwise a new client is created and driven through waiting for
/// online, initial roster, self presence and entering MUCs. If any of these
/// steps fails, the whole sequence is restarted with a new client.
/// The retry loop is raced against `stop_future`: when `stop_future`
/// resolves or fails first, the returned future fails.
pub fn connect<F>(
self,
stop_future: F,
) -> impl Future<Item = XmppConnection, Error = failure::Error>
where
F: future::Future + Clone + 'static,
<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
{
info!("xmpp connection...");
let MaybeXmppConnection { account, state } = self;
if let Some(state) = state {
// already connected: nothing to do
Box::new(future::ok(XmppConnection { account, state }))
as Box<dyn Future<Item = _, Error = _>>
} else {
Box::new(
stop_future
.clone()
.select2(
// every iteration is one connection attempt; the account is the loop state
// NOTE(review): no delay between attempts is visible here
future::loop_fn(account, move |account| {
info!("xmpp initialization...");
let client =
Client::new_with_jid(account.jid.clone(), &account.password);
info!("xmpp initialized");
// every initialization step gets its own copy of the stop future
let stop_future2 = stop_future.clone();
let stop_future3 = stop_future.clone();
let stop_future4 = stop_future.clone();
// future to wait for online
Box::new(
XmppConnection {
state: XmppState {
client,
data: std::default::Default::default(),
},
account,
}
.processing(XmppConnection::online, stop_future.clone())
.map_err(|(acc, _)| acc)
// only "stop condition met" counts as success; a resolved or failed
// stop future gives the account back as an error
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
.and_then(|conn| conn.initial_roster(stop_future2))
.and_then(|conn| conn.self_presence(stop_future3))
.and_then(|conn| conn.enter_mucs(stop_future4))
// success leaves the loop, failure retries with the returned account
.then(|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
}),
)
})
.map_err(|_: ()| ()),
)
.then(|r| match r {
// stop future resolved first
Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
// connection loop finished first
Ok(Either::B((x, _a))) => future::ok(x),
// stop future failed first
Err(Either::A((e, _b))) => future::err(e.into()),
// the loop itself never yields an error (see the `then` above)
Err(Either::B((_, _a))) => {
future::err(format_err!("Cann't initiate XMPP connection"))
}
}),
)
}
}
}
impl XmppConnection {
/// Base handling of a single xmpp event.
///
/// Stanzas are dispatched to the element processor, `Online` is accepted
/// as is, anything else is logged as unexpected.
/// Returns false on error to disconnect
fn xmpp_processing(&mut self, event: &Event) -> bool {
    if let Event::Stanza(stanza) = event {
        let processors = XmppElementProcessor::new();
        return processors.incoming.process(self, stanza.clone());
    }
    if let Event::Online = event {
        return true;
    }
    warn!("Unexpected event {:?}", event);
    false
}
/// Enforce to answer to IQ "set"
fn incoming_iq_processing_set(
&mut self,
id: String,
from: Option<xmpp_parsers::Jid>,
element: minidom::Element,
) -> xmpp_parsers::iq::Iq {
use std::convert::TryInto;
if let Some(roster) =
element.clone().try_into().ok() as Option<xmpp_parsers::roster::Roster>
{
// RFC 6212 2.1.6. Roster Push
for i in roster.items {
if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
info!("Update {} in roster", i.jid);
rdata.0 = i.subscription;
rdata.1 = i.ask;
} else {
info!("Add {} to roster", i.jid);
self.state
.data
.roster
.insert(i.jid.clone(), (i.subscription, i.ask));
}
self.process_jid(&i.jid);
}
return stanzas::make_roster_push_answer(id, self.state.client.jid.clone(), from);
}
warn!(
"Unsupported IQ set request from {:?}: {}",
from,
String::from(&element)
);
stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)
}
/// Builds the mandatory answer to an incoming IQ "get".
///
/// A ping payload is answered with a pong, any other payload with an
/// "unsupported" error.
fn incoming_iq_processing_get(
    &mut self,
    id: String,
    from: Option<xmpp_parsers::Jid>,
    element: minidom::Element,
) -> xmpp_parsers::iq::Iq {
    use std::convert::TryInto;
    // the element is cloned because it is still needed for logging on failure
    let parsed: Result<xmpp_parsers::ping::Ping, _> = element.clone().try_into();
    match parsed {
        Ok(_ping) => stanzas::make_pong(&id, self.state.client.jid.clone(), from),
        Err(_) => {
            warn!(
                "Unsupported IQ get request from {:?}: {}",
                from,
                String::from(&element)
            );
            stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)
        }
    }
}
/// Processes an incoming iq stanza.
///
/// "get" and "set" requests are answered through the send queue.
/// "result" and "error" are handed to the handler registered for the iq id
/// (the handler is removed from the pending map). An error without handler
/// asks for disconnect, a result without handler is only logged.
/// Returns false if the connection should be reset.
fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
    use xmpp_parsers::iq::IqType;
    match iq.payload {
        IqType::Get(element) => {
            let answer = self.incoming_iq_processing_get(iq.id, iq.from, element);
            self.state.data.send_queue.push_back(answer.into());
            true
        }
        IqType::Set(element) => {
            let answer = self.incoming_iq_processing_set(iq.id, iq.from, element);
            self.state.data.send_queue.push_back(answer.into());
            true
        }
        IqType::Error(e) => match self.state.data.pending_ids.remove(&iq.id) {
            Some((_, handler)) => handler.error(self, e),
            None => {
                error!("iq error: {:?}", e);
                false
            }
        },
        IqType::Result(opt_element) => match self.state.data.pending_ids.remove(&iq.id) {
            Some((_, handler)) => handler.result(self, opt_element),
            None => {
                warn!(
                    "Unwanted iq result id {} from {:?}: {:?}",
                    iq.id,
                    iq.from,
                    opt_element.map(|e| String::from(&e))
                );
                true
            }
        },
    }
}
/// process event from xmpp stream
/// returns from future when condition met
/// or stop future was resolved.
/// Return item if connection was preserved or error otherwise.
/// Second part is a state of stop_future:
/// `Ok(Either::A(f))` - stop future is still pending and is given back,
/// `Ok(Either::B(t))` - stop future resolved with `t`,
/// `Err(e)` - stop future failed with `e`.
///
/// On every iteration queued stanzas are sent first (one per iteration);
/// only with an empty send queue the next event is awaited, processed by
/// `xmpp_processing` and then passed to `stop_condition`.
pub fn processing<S, F, T, E>(
self,
stop_condition: S,
stop_future: F,
) -> impl Future<
Item = (Self, Result<Either<F, T>, E>),
Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
>
where
F: Future<Item = T, Error = E> + 'static,
S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
T: 'static,
E: 'static,
{
future::loop_fn(
(self, stop_future, stop_condition),
|(xmpp, stop_future, mut stop_condition)| {
// ToDo: check timeouts of iqs
let XmppConnection {
state: XmppState { client, mut data },
account,
} = xmpp;
if let Some(send_element) = data.send_queue.pop_front() {
// something is queued: send it before reading new events
use tokio::prelude::Sink;
info!("Sending {}", String::from(&send_element));
Box::new(
client
.send(Packet::Stanza(send_element))
.select2(stop_future)
.then(move |r| match r {
// sent while the stop future is still pending: next iteration
Ok(Either::A((client, b))) => {
Box::new(future::ok(future::Loop::Continue((
XmppConnection {
state: XmppState { client, data },
account,
},
b,
stop_condition,
))))
as Box<dyn Future<Item = _, Error = _>>
}
// stop future resolved first: finish the started send to get the
// client back, then leave the loop
Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
Ok(client) => future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Ok(Either::B(t)),
))),
Err(se) => {
warn!("XMPP sending error: {}", se);
future::err((account, Ok(Either::B(t))))
}
})),
// sending failed: the connection is lost, only the account survives
Err(Either::A((e, b))) => {
warn!("XMPP sending error: {}", e);
Box::new(future::err((account, Ok(Either::A(b)))))
}
// stop future failed first: finish the started send, then leave the
// loop with the stop future's error
Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
Ok(client) => future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Err(e),
))),
Err(se) => {
warn!("XMPP sending error: {}", se);
future::err((account, Err(e)))
}
})),
}),
) as Box<dyn Future<Item = _, Error = _>>
} else {
// nothing to send: wait for the next event or the stop future
Box::new(
client
.into_future()
.select2(stop_future)
.then(move |r| match r {
Ok(Either::A(((event, client), b))) => {
if let Some(event) = event {
let mut xmpp = XmppConnection {
state: XmppState { client, data },
account,
};
// base processing first, then the caller's stop condition
if xmpp.xmpp_processing(&event) {
match stop_condition(&mut xmpp, event) {
Ok(true) => future::ok(future::Loop::Break((
xmpp,
Ok(Either::A(b)),
))),
Ok(false) => future::ok(future::Loop::Continue((
xmpp,
b,
stop_condition,
))),
Err(_e) => {
future::err((xmpp.account, Ok(Either::A(b))))
}
}
} else {
future::err((xmpp.account, Ok(Either::A(b))))
}
} else {
// the event stream has ended
future::err((account, Ok(Either::A(b))))
}
}
// stop future resolved first: take the client back out of the
// unfinished stream future if it is still there
Ok(Either::B((t, a))) => {
if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Ok(Either::B(t)),
)))
} else {
future::err((account, Ok(Either::B(t))))
}
}
// the stream failed: the connection is lost
Err(Either::A((e, b))) => {
warn!("XMPP error: {}", e.0);
future::err((account, Ok(Either::A(b))))
}
// stop future failed first: keep the connection if the client can
// be recovered
Err(Either::B((e, a))) => {
if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Err(e),
)))
} else {
future::err((account, Err(e)))
}
}
}),
)
}
},
)
}
/// Stop condition for `processing` which waits for the online status.
///
/// Returns `Ok(true)` on `Online`, `Ok(false)` for a stanza received
/// before that and `Err(())` for any other event.
fn online(&mut self, event: Event) -> Result<bool, ()> {
    if let Event::Online = event {
        info!("Online!");
        return Ok(true);
    }
    if let Event::Stanza(stanza) = event {
        warn!("Stanza before online: {}", String::from(&stanza));
        return Ok(false);
    }
    error!("Disconnected while online");
    Err(())
}
/// Sends the roster query and processes events until the roster is initialized.
///
/// An `InitRosterIqHandler` is registered for the query id with a deadline
/// of 60 seconds from now. The returned future fails with the account if
/// sending fails, the connection breaks, or `stop_future` resolves or fails
/// before the roster arrives.
fn initial_roster<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: 'static,
{
let XmppConnection {
account,
state: XmppState { client, mut data },
} = self;
use tokio::prelude::Sink;
// unique id of the query
data.counter += 1;
let id_init_roster = format!("id_init_roster{}", data.counter);
let get_roster = stanzas::make_get_roster(&id_init_roster);
// second handle of the account for the send error path
let account2 = account.clone();
info!("Quering roster... {}", String::from(&get_roster));
// register the handler before sending the query
data.pending_ids.insert(
id_init_roster.clone(),
(
Instant::now() + Duration::from_secs(60),
Box::new(InitRosterIqHandler {}),
),
);
client
.send(Packet::Stanza(get_roster))
.map_err(move |e| {
error!("Error on querying roster: {}", e);
account2
})
.and_then(move |client| {
XmppConnection {
state: XmppState { client, data },
account,
}
// stop as soon as the iq handler has set the roster_init flag
.processing(move |conn, _| Ok(conn.state.data.roster_init), stop_future)
.map_err(|(account, _)| account)
// only "stop condition met" counts as success
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
})
}
/// Sends own presence and processes events until a presence stanza from
/// the client's own jid is received.
///
/// The returned future fails with the account if sending fails, the
/// connection breaks, a non-stanza event arrives while waiting, or
/// `stop_future` resolves or fails first.
fn self_presence<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: Into<failure::Error> + 'static,
{
let XmppConnection {
account,
state: XmppState { client, data },
} = self;
use tokio::prelude::Sink;
let presence = stanzas::make_presence(&account);
// second handle of the account for the send error path
let account2 = account.clone();
info!("Sending presence... {}", String::from(&presence));
client
.send(Packet::Stanza(presence))
.map_err(|e| {
error!("Error on send self-presence: {}", e);
account2
})
.and_then(move |client| {
XmppConnection {
state: XmppState { client, data },
account,
}
.processing(
move |conn, event| {
if let Event::Stanza(s) = event {
use std::convert::TryInto;
match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
// stop only on a presence sent from our own jid
Ok(presence) => {
Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
}
// other stanzas are skipped, keep waiting
Err(e) => {
warn!("Not a self-presence: {}", e);
Ok(false)
}
}
} else {
error!("Wrong event while waiting self-presence");
Err(())
}
},
stop_future,
)
.map_err(|(account, _)| account)
// only "stop condition met" counts as success
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
})
}
fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
if !mailbox.is_empty() {
if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
info!("Jid {} in roster", xmpp_to);
let sub_to = match rdata.0 {
xmpp_parsers::roster::Subscription::To => true,
xmpp_parsers::roster::Subscription::Both => true,
_ => false,
};
if sub_to {
info!("Subscribed to {}", xmpp_to);
self.state.data.send_queue.extend(
mailbox.drain(..).map(|message| {
stanzas::make_chat_message(xmpp_to.clone(), message)
}),
);
} else if rdata.1 == xmpp_parsers::roster::Ask::None {
info!("Not subscribed to {}", xmpp_to);
self.state
.data
.send_queue
.push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
}
let sub_from = match rdata.0 {
xmpp_parsers::roster::Subscription::From => true,
xmpp_parsers::roster::Subscription::Both => true,
_ => false,
};
if !sub_from {
info!("Not subscription from {}", xmpp_to);
self.state
.data
.send_queue
.push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));
}
} else {
info!("Jid {} not in roster", xmpp_to);
self.state.data.counter += 1;
let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
info!("Adding jid {} to roster id {}", xmpp_to, id_add_roster);
self.state.data.pending_ids.insert(
id_add_roster,
(
Instant::now() + Duration::from_secs(60),
Box::new(AddRosterIqHandler {
jid: xmpp_to.clone(),
}),
),
);
self.state.data.send_queue.push_back(add_roster);
}
}
}
}
pub fn process_command(&mut self, cmd: XmppCommand) {
info!("Got command");
match cmd {
XmppCommand::Chat { xmpp_to, message } => {
self.state
.data
.outgoing_mailbox
.entry(xmpp_to.clone())
.or_default()
.push(message);
self.process_jid(&xmpp_to);
}
XmppCommand::Chatroom { muc_id, message } => {
if let Some(muc) = self.state.data.mucs.get(&muc_id) {
self.state
.data
.send_queue
.push_back(stanzas::make_muc_message(muc.clone(), message));
} else {
error!("Not found MUC {}", muc_id);
}
}
XmppCommand::Ping => {
self.state.data.counter += 1;
let id_ping = format!("id_ping{}", self.state.data.counter);
let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
self.state.data.send_queue.push_back(ping);
self.state.data.pending_ids.insert(
id_ping,
(
Instant::now() + Duration::from_secs(30),
Box::new(PingIqHandler {}),
),
);
}
XmppCommand::TimeoutCleanup => {
let now = Instant::now();
let timeouted: Vec<String> = self
.state
.data
.pending_ids
.iter()
.filter_map(|(id, (timeout, _))| {
if now >= *timeout {
Some(id.to_string())
} else {
None
}
})
.collect();
let mut correct = true;
timeouted.into_iter().for_each(|id| {
if let Some((_, handler)) = self.state.data.pending_ids.remove(&id) {
correct &= handler.timeout(&mut self);
}
})
}
}
}
/// Consumes the connection and sends a leave presence to every entered MUC,
/// one after another.
///
/// The returned future fails with the first send error.
pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
info!("Shutdown connection");
let XmppConnection { account, state } = self;
// the jids are copied out because `state` itself is moved into the fold
stream::iter_ok(
state
.data
.mucs
.values()
.map(std::clone::Clone::clone)
.collect::<Vec<_>>(),
)
// the state is threaded through the fold as every send consumes the client
.fold(state, move |XmppState { client, data }, muc_jid| {
let muc_presence =
stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());
info!(
"Sending muc leave presence... {}",
String::from(&muc_presence)
);
use tokio::prelude::Sink;
client
.send(Packet::Stanza(muc_presence))
.map_err(|e| {
error!("Error on send muc presence: {}", e);
e
})
.and_then(|client| future::ok(XmppState { client, data }))
})
// the final state is dropped
.map(|_| ())
}
/// Sends a MUC presence for every chatroom of the account, one after
/// another, and records each chatroom in `mucs` after its presence was sent.
///
/// The returned future fails with the account on the first send error.
/// `_stop_future` is not used.
fn enter_mucs<F, E>(
self,
_stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: Into<failure::Error> + 'static,
{
let XmppConnection { account, state } = self;
// handles of the account for the fold closure and for the final result
let account2 = account.clone();
let account3 = account.clone();
stream::iter_ok(account.chatrooms.clone())
// the state is threaded through the fold as every send consumes the client
.fold(state, move |XmppState { client, mut data }, muc_jid| {
// unique id of the presence
data.counter += 1;
let id_muc_presence = format!("id_muc_presence{}", data.counter);
let muc_presence = stanzas::make_muc_presence(
&id_muc_presence,
account2.jid.clone(),
muc_jid.1.clone(),
);
info!("Sending muc presence... {}", String::from(&muc_presence));
let account4 = account2.clone();
use tokio::prelude::Sink;
client
.send(Packet::Stanza(muc_presence))
.map_err(|e| {
error!("Error on send muc presence: {}", e);
account4
})
.and_then(|client| {
// muc_jid.0 is the muc id, muc_jid.1 is the muc jid
data.mucs.insert(muc_jid.0, muc_jid.1);
future::ok(XmppState { client, data })
})
})
.map(|state| XmppConnection {
account: account3,
state,
})
}
}
use tokio_xmpp::{Client, Event, Packet};
use tokio::prelude::future::{self, Either};
use tokio::prelude::stream;
use tokio::prelude::{Future, Stream};
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
use super::stanzas;
use super::element_processor;
use crate::config;
/// Commands which can be issued to an xmpp connection
#[derive(Debug)]
pub enum XmppCommand {
/// Send message to someone by jid
Chat {
/// jid of the recipient
xmpp_to: xmpp_parsers::Jid,
/// text of the message
message: String,
},
/// Send message to MUC
Chatroom {
/// id of the MUC
muc_id: String,
/// text of the message
message: String,
},
/// Send ping request to the server to test connection
Ping,
/// Check iq requests if some have expired timeouts
TimeoutCleanup,
}
/// trait of processing iq
/// each function consumes handlers and
/// returns false if connection should be reset
trait IqHandler {
/// process result
/// `opt_element` is the optional payload of the iq result
fn result(
self: Box<Self>,
conn: &mut XmppConnection,
opt_element: Option<xmpp_parsers::Element>,
) -> bool;
/// process error
/// `error` is the stanza error carried by the iq
fn error(
self: Box<Self>,
conn: &mut XmppConnection,
error: xmpp_parsers::stanza_error::StanzaError,
) -> bool;
/// process timeout
fn timeout(self: Box<Self>, conn: &mut XmppConnection) -> bool;
}
/// handler of the answer to the iq which adds a jid to the roster
struct AddRosterIqHandler {
/// jid being added to the roster
jid: xmpp_parsers::Jid,
}
impl IqHandler for AddRosterIqHandler {
    /// Handles the answer to the roster-add request.
    ///
    /// An answer without payload is the expected one: the jid is put into the
    /// local roster (unless already known) and `process_jid` is run for it.
    /// An answer with a payload is only logged.
    /// Never asks for a connection reset.
    fn result(
        self: Box<Self>,
        conn: &mut XmppConnection,
        opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
        if let Some(unexpected) = opt_element {
            warn!(
                "Wrong payload when adding {} to roster: {}",
                self.jid,
                String::from(&unexpected)
            );
            return true;
        }
        let roster = &mut conn.state.data.roster;
        if !roster.contains_key(&self.jid) {
            info!("Jid {} added in roster", self.jid);
            roster.insert(
                self.jid.clone(),
                (
                    xmpp_parsers::roster::Subscription::None,
                    xmpp_parsers::roster::Ask::None,
                ),
            );
        } else {
            info!("Jid {} updated to roster", self.jid);
        }
        conn.process_jid(&self.jid);
        true
    }
    /// An error answer is ignored; the connection is kept.
    fn error(
        self: Box<Self>,
        _connection: &mut XmppConnection,
        _stanza_error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
        true
    }
    /// A missing answer is ignored; the connection is kept.
    fn timeout(self: Box<Self>, _connection: &mut XmppConnection) -> bool {
        // ignore
        true
    }
}
/// handler of the answer to the ping iq
struct PingIqHandler {}
impl IqHandler for PingIqHandler {
    /// A result needs no further action; the connection is kept.
    fn result(
        self: Box<Self>,
        _connection: &mut XmppConnection,
        _payload: Option<xmpp_parsers::Element>,
    ) -> bool {
        // ignore
        true
    }
    /// An error answer to the ping asks for a connection reset.
    fn error(
        self: Box<Self>,
        _connection: &mut XmppConnection,
        _stanza_error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
        false
    }
    /// A ping without answer asks for a connection reset.
    fn timeout(self: Box<Self>, _connection: &mut XmppConnection) -> bool {
        false
    }
}
/// handler of the answer to the initial roster query
struct InitRosterIqHandler {}
impl IqHandler for InitRosterIqHandler {
    /// Replaces the local roster with the one received from the answer and
    /// marks the roster as initialized.
    ///
    /// Returns `false` (reset the connection) when the answer has no payload
    /// or the payload cannot be parsed as a roster.
    fn result(
        self: Box<Self>,
        conn: &mut XmppConnection,
        opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
        use std::convert::TryInto;
        let payload = match opt_element {
            Some(payload) => payload,
            None => {
                error!("No roster responded");
                return false;
            }
        };
        let parsed: Result<xmpp_parsers::roster::Roster, _> = payload.try_into();
        let roster = match parsed {
            Ok(roster) => roster,
            Err(e) => {
                error!("Cann't parse roster: {}", e);
                return false;
            }
        };
        let data = &mut conn.state.data;
        data.roster_init = true;
        data.roster.clear();
        info!("Got first roster:");
        for item in roster.items {
            info!(" >>> {:?}", item);
            data.roster.insert(item.jid, (item.subscription, item.ask));
        }
        true
    }
    /// An error answer to the roster query asks for a connection reset.
    fn error(
        self: Box<Self>,
        _connection: &mut XmppConnection,
        _stanza_error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
        false
    }
    /// A roster query without answer asks for a connection reset.
    fn timeout(self: Box<Self>, _connection: &mut XmppConnection) -> bool {
        false
    }
}
/// data for awaiting iq answer
struct IqWait {
/// time of sending iq
/// (taken when the `IqWait` is created, see `IqWait::new`)
sent: Instant,
/// timeout
/// how long after `sent` the answer is still accepted
timeout: Duration,
/// handler for iq processing
handler: Box<dyn IqHandler>,
}
impl IqWait {
    /// Creates the waiting data for an iq, stamped with the current time.
    ///
    /// `timeout_secs` is the timeout in seconds; `handler` is boxed and
    /// stored for processing of the answer.
    pub fn new<T: IqHandler + 'static>(timeout_secs: u64, handler: T) -> IqWait {
        let sent = Instant::now();
        let timeout = Duration::from_secs(timeout_secs);
        let handler: Box<dyn IqHandler> = Box::new(handler);
        Self {
            sent,
            timeout,
            handler,
        }
    }
}
/// bookkeeping data of a connection (everything except the client itself)
#[derive(Default)]
struct XmppData {
/// known roster data
/// maps jid to its subscription state and pending subscription request (ask)
roster: HashMap<
xmpp_parsers::Jid,
(
xmpp_parsers::roster::Subscription,
xmpp_parsers::roster::Ask,
),
>,
/// if roster was initialized
/// ToDo: remove it as it used only for initialization
roster_init: bool,
/// ids counter
counter: usize,
/// stanzas to send
send_queue: VecDeque<minidom::Element>,
/// outgoing mailbox
/// messages per jid
outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
/// muc id to muc jid
mucs: HashMap<String, xmpp_parsers::Jid>,
/// map from iq's id to handler of this type of iqs
/// together with the time of sending and the timeout (see `IqWait`)
pending_ids: HashMap<String, IqWait>,
}
/// state of an established connection
struct XmppState {
/// tokio_xmpp client used to send stanzas and receive events
client: Client,
/// roster, queues and pending iq bookkeeping
data: XmppData,
}
/// established xmpp connection of an account
pub struct XmppConnection {
/// account settings this connection belongs to
account: std::rc::Rc<config::Account>,
/// client and its bookkeeping data
state: XmppState,
}
/// set of processors for incoming stanzas
struct XmppElementProcessor {
/// processor of incoming elements; the `bool` result is returned by
/// `XmppConnection::xmpp_processing`, where false means disconnect
incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
}
impl XmppElementProcessor {
    /// Builds the processor for incoming stanzas.
    ///
    /// The fallback only logs the unknown stanza and keeps the connection;
    /// iq stanzas are routed to `XmppConnection::incoming_iq_processing`.
    fn new() -> Self {
        let mut processor = element_processor::Processor::new(&|_, element| {
            warn!("Unknown stanza {}", String::from(&element));
            true
        });
        processor.register(&XmppConnection::incoming_iq_processing);
        Self {
            incoming: processor,
        }
    }
}
/// account with an optional established connection
pub struct MaybeXmppConnection {
/// account settings
account: std::rc::Rc<config::Account>,
/// state of the connection or None if nothing is connected
state: Option<XmppState>,
}
impl From<XmppConnection> for MaybeXmppConnection {
fn from(from: XmppConnection) -> MaybeXmppConnection {
MaybeXmppConnection {
account: from.account,
state: Some(from.state),
}
}
}
impl From<config::Account> for MaybeXmppConnection {
    /// Starts from an owned account without any connection.
    fn from(from: config::Account) -> Self {
        let account = std::rc::Rc::new(from);
        Self {
            account,
            state: None,
        }
    }
}
impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    /// Starts from a shared account without any connection.
    fn from(from: std::rc::Rc<config::Account>) -> Self {
        Self {
            state: None,
            account: from,
        }
    }
}
impl MaybeXmppConnection {
/// connects if nothing connected
/// don't connect only if stop_future resolved
///
/// If a state is already present the returned future resolves immediately
/// with it. Otherwise a new client is created and driven through waiting for
/// online, initial roster, self presence and entering MUCs. If any of these
/// steps fails, the whole sequence is restarted with a new client.
/// The retry loop is raced against `stop_future`: when `stop_future`
/// resolves or fails first, the returned future fails.
pub fn connect<F>(
self,
stop_future: F,
) -> impl Future<Item = XmppConnection, Error = failure::Error>
where
F: future::Future + Clone + 'static,
<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
{
info!("xmpp connection...");
let MaybeXmppConnection { account, state } = self;
if let Some(state) = state {
// already connected: nothing to do
Box::new(future::ok(XmppConnection { account, state }))
as Box<dyn Future<Item = _, Error = _>>
} else {
Box::new(
stop_future
.clone()
.select2(
// every iteration is one connection attempt; the account is the loop state
// NOTE(review): no delay between attempts is visible here
future::loop_fn(account, move |account| {
info!("xmpp initialization...");
let client =
Client::new_with_jid(account.jid.clone(), &account.password);
info!("xmpp initialized");
// every initialization step gets its own copy of the stop future
let stop_future2 = stop_future.clone();
let stop_future3 = stop_future.clone();
let stop_future4 = stop_future.clone();
// future to wait for online
Box::new(
XmppConnection {
state: XmppState {
client,
data: std::default::Default::default(),
},
account,
}
.processing(XmppConnection::online, stop_future.clone())
.map_err(|(acc, _)| acc)
// only "stop condition met" counts as success; a resolved or failed
// stop future gives the account back as an error
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
.and_then(|conn| conn.initial_roster(stop_future2))
.and_then(|conn| conn.self_presence(stop_future3))
.and_then(|conn| conn.enter_mucs(stop_future4))
// success leaves the loop, failure retries with the returned account
.then(|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
}),
)
})
.map_err(|_: ()| ()),
)
.then(|r| match r {
// stop future resolved first
Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
// connection loop finished first
Ok(Either::B((x, _a))) => future::ok(x),
// stop future failed first
Err(Either::A((e, _b))) => future::err(e.into()),
// the loop itself never yields an error (see the `then` above)
Err(Either::B((_, _a))) => {
future::err(format_err!("Cann't initiate XMPP connection"))
}
}),
)
}
}
}
impl XmppConnection {
/// Base handling of a single xmpp event.
///
/// Stanzas are dispatched to the element processor, `Online` is accepted
/// as is, anything else is logged as unexpected.
/// Returns false on error to disconnect
fn xmpp_processing(&mut self, event: &Event) -> bool {
    if let Event::Stanza(stanza) = event {
        let processors = XmppElementProcessor::new();
        return processors.incoming.process(self, stanza.clone());
    }
    if let Event::Online = event {
        return true;
    }
    warn!("Unexpected event {:?}", event);
    false
}
/// Enforce to answer to IQ "set"
fn incoming_iq_processing_set(
&mut self,
id: String,
from: Option<xmpp_parsers::Jid>,
element: minidom::Element,
) -> xmpp_parsers::iq::Iq {
use std::convert::TryInto;
if let Some(roster) =
element.clone().try_into().ok() as Option<xmpp_parsers::roster::Roster>
{
// RFC 6212 2.1.6. Roster Push
for i in roster.items {
if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
info!("Update {} in roster", i.jid);
rdata.0 = i.subscription;
rdata.1 = i.ask;
} else {
info!("Add {} to roster", i.jid);
self.state
.data
.roster
.insert(i.jid.clone(), (i.subscription, i.ask));
}
self.process_jid(&i.jid);
}
return stanzas::make_roster_push_answer(id, self.state.client.jid.clone(), from);
}
warn!(
"Unsupported IQ set request from {:?}: {}",
from,
String::from(&element)
);
stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)
}
/// Builds the mandatory answer to an incoming IQ "get".
///
/// A ping payload is answered with a pong, any other payload with an
/// "unsupported" error.
fn incoming_iq_processing_get(
    &mut self,
    id: String,
    from: Option<xmpp_parsers::Jid>,
    element: minidom::Element,
) -> xmpp_parsers::iq::Iq {
    use std::convert::TryInto;
    // the element is cloned because it is still needed for logging on failure
    let parsed: Result<xmpp_parsers::ping::Ping, _> = element.clone().try_into();
    match parsed {
        Ok(_ping) => stanzas::make_pong(&id, self.state.client.jid.clone(), from),
        Err(_) => {
            warn!(
                "Unsupported IQ get request from {:?}: {}",
                from,
                String::from(&element)
            );
            stanzas::make_iq_unsupported_error(id, self.state.client.jid.clone(), from)
        }
    }
}
/// Processes an incoming iq stanza.
///
/// "get" and "set" requests are answered through the send queue.
/// "result" and "error" are handed to the handler registered for the iq id
/// (the entry is removed from the pending map). A result which arrives
/// after its timeout is passed to the handler's `timeout` instead of
/// `result`. An error without handler asks for disconnect, a result
/// without handler is only logged.
/// Returns false if the connection should be reset.
fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
    use xmpp_parsers::iq::IqType;
    match iq.payload {
        IqType::Get(element) => {
            let answer = self.incoming_iq_processing_get(iq.id, iq.from, element);
            self.state.data.send_queue.push_back(answer.into());
            true
        }
        IqType::Set(element) => {
            let answer = self.incoming_iq_processing_set(iq.id, iq.from, element);
            self.state.data.send_queue.push_back(answer.into());
            true
        }
        IqType::Error(e) => match self.state.data.pending_ids.remove(&iq.id) {
            Some(wait) => wait.handler.error(self, e),
            None => {
                error!("iq error: {:?}", e);
                false
            }
        },
        IqType::Result(opt_element) => {
            let wait = match self.state.data.pending_ids.remove(&iq.id) {
                Some(wait) => wait,
                None => {
                    warn!(
                        "Unwanted iq result id {} from {:?}: {:?}",
                        iq.id,
                        iq.from,
                        opt_element.map(|e| String::from(&e))
                    );
                    return true;
                }
            };
            let now = Instant::now();
            let in_time = wait.sent + wait.timeout > now;
            if !in_time {
                warn!(
                    "Timeout for {} was {:?} expected {:?}",
                    iq.id,
                    now - wait.sent,
                    wait.timeout
                );
                return wait.handler.timeout(self);
            }
            info!(
                "Timeout for {} was {:?} expected {:?}",
                iq.id,
                now - wait.sent,
                wait.timeout
            );
            wait.handler.result(self, opt_element)
        }
    }
}
/// process event from xmpp stream
/// returns from future when condition met
/// or stop future was resolved.
/// Return item if connection was preserved or error otherwise.
/// Second part is a state of stop_future:
/// `Ok(Either::A(f))` - stop future is still pending and is given back,
/// `Ok(Either::B(t))` - stop future resolved with `t`,
/// `Err(e)` - stop future failed with `e`.
///
/// On every iteration queued stanzas are sent first (one per iteration);
/// only with an empty send queue the next event is awaited, processed by
/// `xmpp_processing` and then passed to `stop_condition`.
pub fn processing<S, F, T, E>(
self,
stop_condition: S,
stop_future: F,
) -> impl Future<
Item = (Self, Result<Either<F, T>, E>),
Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
>
where
F: Future<Item = T, Error = E> + 'static,
S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
T: 'static,
E: 'static,
{
future::loop_fn(
(self, stop_future, stop_condition),
|(xmpp, stop_future, mut stop_condition)| {
let XmppConnection {
state: XmppState { client, mut data },
account,
} = xmpp;
if let Some(send_element) = data.send_queue.pop_front() {
// something is queued: send it before reading new events
use tokio::prelude::Sink;
info!("Sending {}", String::from(&send_element));
Box::new(
client
.send(Packet::Stanza(send_element))
.select2(stop_future)
.then(move |r| match r {
// sent while the stop future is still pending: next iteration
Ok(Either::A((client, b))) => {
Box::new(future::ok(future::Loop::Continue((
XmppConnection {
state: XmppState { client, data },
account,
},
b,
stop_condition,
))))
as Box<dyn Future<Item = _, Error = _>>
}
// stop future resolved first: finish the started send to get the
// client back, then leave the loop
Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
Ok(client) => future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Ok(Either::B(t)),
))),
Err(se) => {
warn!("XMPP sending error: {}", se);
future::err((account, Ok(Either::B(t))))
}
})),
// sending failed: the connection is lost, only the account survives
Err(Either::A((e, b))) => {
warn!("XMPP sending error: {}", e);
Box::new(future::err((account, Ok(Either::A(b)))))
}
// stop future failed first: finish the started send, then leave the
// loop with the stop future's error
Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
Ok(client) => future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Err(e),
))),
Err(se) => {
warn!("XMPP sending error: {}", se);
future::err((account, Err(e)))
}
})),
}),
) as Box<dyn Future<Item = _, Error = _>>
} else {
// nothing to send: wait for the next event or the stop future
Box::new(
client
.into_future()
.select2(stop_future)
.then(move |r| match r {
Ok(Either::A(((event, client), b))) => {
if let Some(event) = event {
let mut xmpp = XmppConnection {
state: XmppState { client, data },
account,
};
// base processing first, then the caller's stop condition
if xmpp.xmpp_processing(&event) {
match stop_condition(&mut xmpp, event) {
Ok(true) => future::ok(future::Loop::Break((
xmpp,
Ok(Either::A(b)),
))),
Ok(false) => future::ok(future::Loop::Continue((
xmpp,
b,
stop_condition,
))),
Err(_e) => {
future::err((xmpp.account, Ok(Either::A(b))))
}
}
} else {
future::err((xmpp.account, Ok(Either::A(b))))
}
} else {
// the event stream has ended
future::err((account, Ok(Either::A(b))))
}
}
// stop future resolved first: take the client back out of the
// unfinished stream future if it is still there
Ok(Either::B((t, a))) => {
if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Ok(Either::B(t)),
)))
} else {
future::err((account, Ok(Either::B(t))))
}
}
// the stream failed: the connection is lost
Err(Either::A((e, b))) => {
warn!("XMPP error: {}", e.0);
future::err((account, Ok(Either::A(b))))
}
// stop future failed first: keep the connection if the client can
// be recovered
Err(Either::B((e, a))) => {
if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Err(e),
)))
} else {
future::err((account, Err(e)))
}
}
}),
)
}
},
)
}
/// Classifies an event received while waiting for the connection to come online.
///
/// The signature matches the `stop_condition` callback of `processing`:
/// * `Ok(true)` - the client reported `Event::Online`;
/// * `Ok(false)` - a stanza arrived first, it is only logged;
/// * `Err(())` - any other event, which is treated as a broken connection.
fn online(&mut self, event: Event) -> Result<bool, ()> {
    let early_stanza = match event {
        Event::Online => {
            info!("Online!");
            return Ok(true);
        }
        Event::Stanza(stanza) => stanza,
        _ => {
            error!("Disconnected while online");
            return Err(());
        }
    };
    warn!("Stanza before online: {}", String::from(&early_stanza));
    Ok(false)
}
fn initial_roster<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: 'static,
{
let XmppConnection {
account,
state: XmppState { client, mut data },
} = self;
use tokio::prelude::Sink;
data.counter += 1;
let id_init_roster = format!("id_init_roster{}", data.counter);
let get_roster = stanzas::make_get_roster(&id_init_roster);
let account2 = account.clone();
info!("Quering roster... {}", String::from(&get_roster));
data.pending_ids.insert(
id_init_roster.clone(),
IqWait::new(60, InitRosterIqHandler {}),
);
client
.send(Packet::Stanza(get_roster))
.map_err(move |e| {
error!("Error on querying roster: {}", e);
account2
})
.and_then(move |client| {
XmppConnection {
state: XmppState { client, data },
account,
}
.processing(move |conn, _| Ok(conn.state.data.roster_init), stop_future)
.map_err(|(account, _)| account)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
})
}
fn self_presence<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: Into<failure::Error> + 'static,
{
let XmppConnection {
account,
state: XmppState { client, data },
} = self;
use tokio::prelude::Sink;
let presence = stanzas::make_presence(&account);
let account2 = account.clone();
info!("Sending presence... {}", String::from(&presence));
client
.send(Packet::Stanza(presence))
.map_err(|e| {
error!("Error on send self-presence: {}", e);
account2
})
.and_then(move |client| {
XmppConnection {
state: XmppState { client, data },
account,
}
.processing(
move |conn, event| {
if let Event::Stanza(s) = event {
use std::convert::TryInto;
match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
Ok(presence) => {
Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
}
Err(e) => {
warn!("Not a self-presence: {}", e);
Ok(false)
}
}
} else {
error!("Wrong event while waiting self-presence");
Err(())
}
},
stop_future,
)
.map_err(|(account, _)| account)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
})
}
fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
if !mailbox.is_empty() {
if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
info!("Jid {} in roster", xmpp_to);
let sub_to = match rdata.0 {
xmpp_parsers::roster::Subscription::To => true,
xmpp_parsers::roster::Subscription::Both => true,
_ => false,
};
if sub_to {
info!("Subscribed to {}", xmpp_to);
self.state.data.send_queue.extend(
mailbox.drain(..).map(|message| {
stanzas::make_chat_message(xmpp_to.clone(), message)
}),
);
} else if rdata.1 == xmpp_parsers::roster::Ask::None {
info!("Not subscribed to {}", xmpp_to);
self.state
.data
.send_queue
.push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
}
let sub_from = match rdata.0 {
xmpp_parsers::roster::Subscription::From => true,
xmpp_parsers::roster::Subscription::Both => true,
_ => false,
};
if !sub_from {
info!("Not subscription from {}", xmpp_to);
self.state
.data
.send_queue
.push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));
}
} else {
info!("Jid {} not in roster", xmpp_to);
self.state.data.counter += 1;
let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
info!("Adding jid {} to roster id {}", xmpp_to, id_add_roster);
self.state.data.pending_ids.insert(
id_add_roster,
IqWait::new(
60,
AddRosterIqHandler {
jid: xmpp_to.clone(),
},
),
);
self.state.data.send_queue.push_back(add_roster);
}
}
}
}
/// Applies a single [`XmppCommand`] to the connection state.
///
/// Nothing is written to the network here: stanzas are only appended to
/// `send_queue`, from which `processing` pops and sends them.
///
/// * `Chat` - stores the message in the mailbox of the target jid and calls
///   `process_jid` for it.
/// * `Chatroom` - queues a MUC message when `muc_id` is a known room,
///   otherwise logs an error and drops the message.
/// * `Ping` - queues a ping iq and registers a `PingIqHandler` for its answer.
/// * `TimeoutCleanup` - removes every pending iq whose timeout has expired and
///   runs its `timeout` handler.
pub fn process_command(&mut self, cmd: XmppCommand) {
    info!("Got command");
    match cmd {
        XmppCommand::Chat { xmpp_to, message } => {
            self.state
                .data
                .outgoing_mailbox
                .entry(xmpp_to.clone())
                .or_default()
                .push(message);
            self.process_jid(&xmpp_to);
        }
        XmppCommand::Chatroom { muc_id, message } => {
            if let Some(muc) = self.state.data.mucs.get(&muc_id) {
                self.state
                    .data
                    .send_queue
                    .push_back(stanzas::make_muc_message(muc.clone(), message));
            } else {
                error!("Not found MUC {}", muc_id);
            }
        }
        XmppCommand::Ping => {
            self.state.data.counter += 1;
            let id_ping = format!("id_ping{}", self.state.data.counter);
            let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
            self.state.data.send_queue.push_back(ping);
            // 30 is the iq timeout handed to `IqWait::new` (presumably seconds - TODO confirm).
            self.state
                .data
                .pending_ids
                .insert(id_ping, IqWait::new(30, PingIqHandler {}));
        }
        XmppCommand::TimeoutCleanup => {
            let now = Instant::now();
            // Collect the expired ids first: the handlers need `&mut self`,
            // which cannot be handed out while `pending_ids` is being iterated.
            let timeouted: Vec<String> = self
                .state
                .data
                .pending_ids
                .iter()
                .filter_map(|(id, wait)| {
                    if now >= wait.sent + wait.timeout {
                        warn!(
                            "Timeout for {} was {:?} expected {:?}",
                            id,
                            now - wait.sent,
                            wait.timeout
                        );
                        Some(id.clone())
                    } else {
                        None
                    }
                })
                .collect();

            let mut correct = true;
            for id in timeouted {
                if let Some(wait) = self.state.data.pending_ids.remove(&id) {
                    // `self` is already `&mut XmppConnection`; it is reborrowed
                    // for the duration of the handler call.
                    correct &= wait.handler.timeout(self);
                }
            }
            if !correct {
                // A handler returned `false` ("connection should be reset").
                // This method returns nothing, so the request can only be reported.
                warn!("Timeout handler requested connection reset");
            }
        }
    }
}
/// Leaves all joined MUCs and consumes the connection.
///
/// For every jid stored in `data.mucs` a "leave" presence is sent, one after
/// another. The future resolves to `()` once all of them are sent and fails
/// with the first send error.
pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
    use tokio::prelude::Sink;

    info!("Shutdown connection");
    let XmppConnection { account, state } = self;
    // Copy the room jids out first: `state` itself is moved into the fold below.
    let joined_mucs: Vec<_> = state.data.mucs.values().cloned().collect();

    stream::iter_ok(joined_mucs)
        .fold(state, move |XmppState { client, data }, muc_jid| {
            let leave_presence =
                stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid);
            info!(
                "Sending muc leave presence... {}",
                String::from(&leave_presence)
            );
            client
                .send(Packet::Stanza(leave_presence))
                .map_err(|e| {
                    error!("Error on send muc presence: {}", e);
                    e
                })
                .map(move |client| XmppState { client, data })
        })
        .map(|_| ())
}
/// Joins every chatroom configured in `account.chatrooms`.
///
/// For each configured room a MUC presence with a fresh id (derived from
/// `data.counter`) is sent; once it is sent the room is recorded in
/// `data.mucs`. Rooms are processed one after another.
///
/// Resolves to the connection on success and fails with the account on the
/// first send error. `_stop_future` is accepted but not used.
fn enter_mucs<F, E>(
    self,
    _stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
    F: Future<Error = E> + 'static,
    E: Into<failure::Error> + 'static,
{
    use tokio::prelude::Sink;

    let XmppConnection { account, state } = self;
    let chatrooms = account.chatrooms.clone();
    // Handle used inside the fold; `account` itself goes into the final result.
    let fold_account = account.clone();

    stream::iter_ok(chatrooms)
        .fold(state, move |XmppState { client, mut data }, room| {
            data.counter += 1;
            let presence_id = format!("id_muc_presence{}", data.counter);
            let join_presence = stanzas::make_muc_presence(
                &presence_id,
                fold_account.jid.clone(),
                room.1.clone(),
            );
            info!("Sending muc presence... {}", String::from(&join_presence));
            let account_on_error = fold_account.clone();
            client
                .send(Packet::Stanza(join_presence))
                .map_err(move |e| {
                    error!("Error on send muc presence: {}", e);
                    account_on_error
                })
                .map(move |client| {
                    // Remember the room only after its presence was sent.
                    data.mucs.insert(room.0, room.1);
                    XmppState { client, data }
                })
        })
        .map(move |state| XmppConnection { account, state })
}
}