SYH7UQP6A62HRY2K6YR23X45TMFHJD7SW6J4T2MH4C3DJXKXVDZQC
AA2ZWGRLFD5XV3LYOFJU7ELWMVKGXTNUSH4BJNLXFWPSSTDZWO2AC
OB3HA2MD7TDBGGURKXNQKLLLC5FAAD5OX5BENF3M2XNU7OTJ5HHAC
FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC
VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC
5IKA4GO7DIMBBYB7OUF5S7N2LUWIZ5MKCCCTZCAYPZI2357MVFHAC
BWDUANCV77MCLCYMRS2UNFIFW4ZC3KB2KEGEUI77FRO7KW6TUZJQC
5Y6YJ6UH55XJLPQ627ZGKTHW3IUFPFS5C4AYNCS6BX4BAWPLEZQAC
5A5UVGNMHEYI62XUENBEPSRJ7M3MBULYITRDOEYYA5ZDUMFBB6ZQC
FVVPKFTLD5VOGVDF7MOSSKQ3KPCT4HDGTNTTC6I6B4TWXOTPCJ7AC
HKSQO7JZEW6GXKPDJD4VJSYCJJBUDLKVN7SGUU5ZEMIVGII455RAC
WBU7UOQWY3J7KORUHQGXRXY5O76RCMNOP7CG5HA3RTZUTJJNTESQC
pub fn make_muc_presence(id: &str, from: xmpp_parsers::Jid, to: xmpp_parsers::Jid) -> Element {
let mut presence = Presence::new(PresenceType::None);
presence.from = Some(from);
presence.to = Some(to);
presence.id = Some(id.to_string());
presence.add_payload(xmpp_parsers::muc::Muc::new());
presence.into()
}
let client =
Client::new_with_jid(account.jid.clone(), &account.password);
info!("xmpp initialized");
let res_client = Client::new(&account.jid, &account.password);
match res_client {
Err(_e) => Box::new(future::ok(future::Loop::Continue(account)))
as Box<dyn Future<Item = _, Error = _>>,
Ok(client) => {
info!("xmpp initialized");
// future to wait for online
Box::new(
XmppConnection {
state: XmppState {
client,
data: std::default::Default::default(),
},
account,
// future to wait for online
Box::new(
XmppConnection {
state: XmppState {
client,
data: std::default::Default::default(),
},
account,
}
.processing(XmppConnection::online, stop_future.clone())
.map_err(|(acc, _)| acc)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
.and_then(|conn| conn.initial_roster(stop_future2))
.and_then(|conn| conn.self_presence(stop_future3))
.then(
|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
},
),
)
.processing(XmppConnection::online, stop_future.clone())
.map_err(|(acc, _)| acc)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
.and_then(|conn| conn.initial_roster(stop_future2))
.and_then(|conn| conn.self_presence(stop_future3))
.and_then(|conn| conn.enter_mucs(stop_future4))
.then(|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
}),
)
}
if let Some(ref mut rdata) = self.state.data.roster.get_mut(&cmd.xmpp_to) {
info!("Jid {} in roster", cmd.xmpp_to);
let sub_to = match rdata.0 {
xmpp_parsers::roster::Subscription::To => true,
xmpp_parsers::roster::Subscription::Both => true,
_ => false,
};
if sub_to {
info!("Subscribed to {}", cmd.xmpp_to);
self.state
.data
.send_queue
.push_back(stanzas::make_chat_message(cmd.xmpp_to, cmd.message));
} else {
info!("Not subscribed to {}", cmd.xmpp_to);
rdata.1.push(cmd.message);
self.state
.data
.send_queue
.push_back(stanzas::make_ask_subscribe(cmd.xmpp_to));
}
} else {
info!("Jid {} not in roster", cmd.xmpp_to);
match cmd {
XmppCommand::Chat { xmpp_to, message } => {
if let Some(ref mut rdata) = self.state.data.roster.get_mut(&xmpp_to) {
info!("Jid {} in roster", xmpp_to);
let sub_to = match rdata.0 {
xmpp_parsers::roster::Subscription::To => true,
xmpp_parsers::roster::Subscription::Both => true,
_ => false,
};
if sub_to {
info!("Subscribed to {}", xmpp_to);
self.state
.data
.send_queue
.push_back(stanzas::make_chat_message(xmpp_to, message));
} else if rdata.1 == xmpp_parsers::roster::Ask::None {
info!("Not subscribed to {}", xmpp_to);
rdata.2.push(message);
self.state
.data
.send_queue
.push_back(stanzas::make_ask_subscribe(xmpp_to));
}
} else {
info!("Jid {} not in roster", xmpp_to);
self.state.data.counter += 1;
let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
let add_roster = stanzas::make_add_roster(&id_add_roster, cmd.xmpp_to.clone());
self.state
.data
.pending_add_roster_ids
.insert(id_add_roster, (cmd.xmpp_to, cmd.message));
info!("Adding jid to roster... {:?}", add_roster);
self.state.data.counter += 1;
let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
self.state
.data
.pending_add_roster_ids
.insert(id_add_roster, (xmpp_to, message));
info!("Adding jid to roster... {:?}", add_roster);
fn enter_mucs<F, E>(
self,
_stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: Into<failure::Error> + 'static,
{
let XmppConnection { account, state } = self;
let account2 = account.clone();
let account3 = account.clone();
stream::iter_ok(
account
.chatrooms
.values()
.map(std::clone::Clone::clone)
.collect::<Vec<_>>(),
)
.fold(state, move |XmppState { client, mut data }, muc_jid| {
data.counter += 1;
let id_muc_presence = format!("id_muc_presence{}", data.counter);
let muc_presence =
stanzas::make_muc_presence(&id_muc_presence, account2.jid.clone(), muc_jid);
info!("Sending muc presence... {:?}", muc_presence);
let account4 = account2.clone();
use tokio::prelude::Sink;
client
.send(muc_presence)
.map_err(|e| {
error!("Error on send muc presence: {}", e);
account4
})
.and_then(|client| future::ok(XmppState { client, data }))
})
.map(|state| XmppConnection {
account: account3,
state,
})
}
}
fn deserialize_jid<'de, D>(deserializer: D) -> Result<xmpp_parsers::Jid, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::Deserialize;
let s = String::deserialize(deserializer)?;
std::str::FromStr::from_str(&s).map_err(serde::de::Error::custom)