OFLAP2G2GH52BDQ7LXXF7P3IMTNW6MS4RQPW3DX2HX4GAYVW6FLAC
CCLGGFKRUNXBM6IOGEYJOFULCSVOL6C5PFAGCKEWNXOZIKPWLLWQC
RRLRZTMR3RQMZCMPIW5DJYAH5A6GXJMJ6ACOM2DPRYXQMQAGMPVAC
FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC
OB3HA2MD7TDBGGURKXNQKLLLC5FAAD5OX5BENF3M2XNU7OTJ5HHAC
JD62RVOJGALHC4WMSGU3SZK4I2FO4LCYPRM7UD5PPXN7PNOHXDYAC
DCGEFPRCTSAZNMGCTU5ZMS7W5RVS73IN5LVWI6HDGNS2RIZKSA2QC
LL3D5CXKPWIGTQ7MFK4YXDGDZOKD6CU5WCIWV3FG6TPGQOGD75QAC
UO4WTU6UQCZOJ6VQLBXFL5NZFSEIRC3BILFXEMWOVVGL5U3XNJXQC
VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC
5GINRCKLDLAQ7J32MLENDUBXZTVSLGPJFARMHUPKSRXVBT75BG3QC
5Y6YJ6UH55XJLPQ627ZGKTHW3IUFPFS5C4AYNCS6BX4BAWPLEZQAC
FVVPKFTLD5VOGVDF7MOSSKQ3KPCT4HDGTNTTC6I6B4TWXOTPCJ7AC
X6L47BHQPIWFWO53QU6GMRYZC7VPFR4AFTJV7MZIJWIMZG5L65WQC
HKSQO7JZEW6GXKPDJD4VJSYCJJBUDLKVN7SGUU5ZEMIVGII455RAC
L3D22A5JKX3SBZQHULAKZDCEXAOYVPSU2CHPRE5G6MVVLTFVSZVAC
J7VX56FW5BPM77MEFTKK6HP2ZOWRXP6GFUTBTQKBO52EBGFTVXMQC
use tokio_xmpp::{Client, Event, Packet};
use tokio::prelude::future::{self, Either};
use tokio::prelude::stream;
use tokio::prelude::{Future, Stream};
use std::collections::{HashMap, VecDeque};
use super::XmppCommand;
use super::stanzas;
use super::element_processor;
use crate::config;
#[derive(Default)]
struct XmppData {
/// known roster data
roster: HashMap<
xmpp_parsers::Jid,
(
xmpp_parsers::roster::Subscription,
xmpp_parsers::roster::Ask,
),
>,
/// ids counter
counter: usize,
/// map from id of adding item to roster and jid of item
pending_add_roster_ids: HashMap<String, xmpp_parsers::Jid>,
/// stanzas to send
send_queue: VecDeque<minidom::Element>,
/// outgoing mailbox
outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
/// muc id to muc jid
mucs: HashMap<String, xmpp_parsers::Jid>,
}
struct XmppState {
client: Client,
data: XmppData,
}
pub struct XmppConnection {
account: std::rc::Rc<config::Account>,
state: XmppState,
}
struct XmppElementProcessor {
incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
}
impl XmppElementProcessor {
fn new() -> XmppElementProcessor {
let mut incoming = element_processor::Processor::new(&|_, e| {
warn!("Unknown stanza {:#?}", e);
true
});
incoming.register(&XmppConnection::incoming_iq_processing);
XmppElementProcessor { incoming }
}
}
pub struct MaybeXmppConnection {
account: std::rc::Rc<config::Account>,
state: Option<XmppState>,
}
impl From<XmppConnection> for MaybeXmppConnection {
fn from(from: XmppConnection) -> MaybeXmppConnection {
MaybeXmppConnection {
account: from.account,
state: Some(from.state),
}
}
}
impl From<config::Account> for MaybeXmppConnection {
fn from(from: config::Account) -> MaybeXmppConnection {
MaybeXmppConnection {
account: std::rc::Rc::new(from),
state: None,
}
}
}
impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
MaybeXmppConnection {
account: from,
state: None,
}
}
}
impl MaybeXmppConnection {
/// connects if nothing connected
/// don't connect only if stop_future resolved
pub fn connect<F>(
self,
stop_future: F,
) -> impl Future<Item = XmppConnection, Error = failure::Error>
where
F: future::Future + Clone + 'static,
<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
{
info!("xmpp connection...");
let MaybeXmppConnection { account, state } = self;
if let Some(state) = state {
Box::new(future::ok(XmppConnection { account, state }))
as Box<dyn Future<Item = _, Error = _>>
} else {
Box::new(
stop_future
.clone()
.select2(
future::loop_fn(account, move |account| {
info!("xmpp initialization...");
let client =
Client::new_with_jid(account.jid.clone(), &account.password);
info!("xmpp initialized");
let stop_future2 = stop_future.clone();
let stop_future3 = stop_future.clone();
let stop_future4 = stop_future.clone();
// future to wait for online
Box::new(
XmppConnection {
state: XmppState {
client,
data: std::default::Default::default(),
},
account,
}
.processing(XmppConnection::online, stop_future.clone())
.map_err(|(acc, _)| acc)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
.and_then(|conn| conn.initial_roster(stop_future2))
.and_then(|conn| conn.self_presence(stop_future3))
.and_then(|conn| conn.enter_mucs(stop_future4))
.then(|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
}),
)
})
.map_err(|_: ()| ()),
)
.then(|r| match r {
Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
Ok(Either::B((x, _a))) => future::ok(x),
Err(Either::A((e, _b))) => future::err(e.into()),
Err(Either::B((_, _a))) => {
future::err(format_err!("Cann't initiate XMPP connection"))
}
}),
)
}
}
}
impl XmppConnection {
/// base XMPP processing
/// Returns false on error to disconnect
fn xmpp_processing(&mut self, event: &Event) -> bool {
match event {
Event::Stanza(stanza) => {
let processors = XmppElementProcessor::new();
processors.incoming.process(self, stanza.clone())
}
Event::Online => true,
e => {
warn!("Unexpected event {:?}", e);
false
}
}
}
fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
use std::convert::TryInto;
if let Some((_, jid)) = self.state.data.pending_add_roster_ids.remove_entry(&iq.id) {
if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {
if self.state.data.roster.contains_key(&jid) {
info!("Jid {} updated to roster", jid);
} else {
info!("Jid {} added in roster", jid);
self.state.data.roster.insert(
jid.clone(),
(
xmpp_parsers::roster::Subscription::None,
xmpp_parsers::roster::Ask::None,
),
);
}
self.process_jid(&jid);
} else {
warn!(
"Wrong payload when adding {} to roster: {:?}",
jid, iq.payload
);
}
}
match iq.payload {
xmpp_parsers::iq::IqType::Set(element) => {
if let Some(roster) =
element.try_into().ok() as Option<xmpp_parsers::roster::Roster>
{
for i in roster.items {
if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
info!("Update {} in roster", i.jid);
rdata.0 = i.subscription;
rdata.1 = i.ask;
} else {
info!("Add {} to roster", i.jid);
self.state
.data
.roster
.insert(i.jid.clone(), (i.subscription, i.ask));
}
self.process_jid(&i.jid);
}
}
}
xmpp_parsers::iq::IqType::Error(e) => {
error!("iq error: {:?}", e);
return false;
}
xmpp_parsers::iq::IqType::Get(element) => {
if let Some(_ping) = element.try_into().ok() as Option<xmpp_parsers::ping::Ping> {
let pong = stanzas::make_pong(&iq.id, self.state.client.jid.clone(), iq.from);
self.state.data.send_queue.push_back(pong);
}
}
_ => (), // ignore
}
true
}
/// process event from xmpp stream
/// returns from future when condition met
/// or stop future was resolved.
/// Return item if connection was preserved or error otherwise.
/// Second part is a state of stop_future
pub fn processing<S, F, T, E>(
self,
stop_condition: S,
stop_future: F,
) -> impl Future<
Item = (Self, Result<Either<F, T>, E>),
Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
>
where
F: Future<Item = T, Error = E> + 'static,
S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
T: 'static,
E: 'static,
{
future::loop_fn(
(self, stop_future, stop_condition),
|(xmpp, stop_future, mut stop_condition)| {
let XmppConnection {
state: XmppState { client, mut data },
account,
} = xmpp;
if let Some(send_element) = data.send_queue.pop_front() {
use tokio::prelude::Sink;
info!("Sending {:?}", send_element);
Box::new(
client
.send(Packet::Stanza(send_element))
.select2(stop_future)
.then(move |r| match r {
Ok(Either::A((client, b))) => {
Box::new(future::ok(future::Loop::Continue((
XmppConnection {
state: XmppState { client, data },
account,
},
b,
stop_condition,
))))
as Box<dyn Future<Item = _, Error = _>>
}
Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
Ok(client) => future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Ok(Either::B(t)),
))),
Err(se) => {
warn!("XMPP sending error: {}", se);
future::err((account, Ok(Either::B(t))))
}
})),
Err(Either::A((e, b))) => {
warn!("XMPP sending error: {}", e);
Box::new(future::err((account, Ok(Either::A(b)))))
}
Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
Ok(client) => future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Err(e),
))),
Err(se) => {
warn!("XMPP sending error: {}", se);
future::err((account, Err(e)))
}
})),
}),
) as Box<dyn Future<Item = _, Error = _>>
} else {
Box::new(
client
.into_future()
.select2(stop_future)
.then(move |r| match r {
Ok(Either::A(((event, client), b))) => {
if let Some(event) = event {
let mut xmpp = XmppConnection {
state: XmppState { client, data },
account,
};
if xmpp.xmpp_processing(&event) {
match stop_condition(&mut xmpp, event) {
Ok(true) => future::ok(future::Loop::Break((
xmpp,
Ok(Either::A(b)),
))),
Ok(false) => future::ok(future::Loop::Continue((
xmpp,
b,
stop_condition,
))),
Err(_e) => {
future::err((xmpp.account, Ok(Either::A(b))))
}
}
} else {
future::err((xmpp.account, Ok(Either::A(b))))
}
} else {
future::err((account, Ok(Either::A(b))))
}
}
Ok(Either::B((t, a))) => {
if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Ok(Either::B(t)),
)))
} else {
future::err((account, Ok(Either::B(t))))
}
}
Err(Either::A((e, b))) => {
warn!("XMPP error: {}", e.0);
future::err((account, Ok(Either::A(b))))
}
Err(Either::B((e, a))) => {
if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Err(e),
)))
} else {
future::err((account, Err(e)))
}
}
}),
)
}
},
)
}
/// get connection and wait for online status and set presence
/// returns error if something went wrong and xmpp connection is broken
fn online(&mut self, event: Event) -> Result<bool, ()> {
match event {
Event::Online => {
info!("Online!");
Ok(true)
}
Event::Stanza(s) => {
warn!("Stanza before online: {:?}", s);
Ok(false)
}
_ => {
error!("Disconnected while online");
Err(())
}
}
}
fn process_initial_roster(&mut self, event: Event, id_init_roster: &str) -> Result<bool, ()> {
if let Event::Stanza(s) = event {
use std::convert::TryInto;
match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
Ok(iq) => {
if iq.id == id_init_roster {
match iq.payload {
xmpp_parsers::iq::IqType::Error(_e) => {
error!("Get error instead of roster");
Err(())
}
xmpp_parsers::iq::IqType::Result(Some(result)) => {
match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
Ok(roster) => {
self.state.data.roster.clear();
info!("Got first roster:");
for i in roster.items {
info!(" >>> {:?}", i);
self.state
.data
.roster
.insert(i.jid, (i.subscription, i.ask));
}
Ok(true)
}
Err(e) => {
error!("Cann't parse roster: {}", e);
Err(())
}
}
}
_ => {
error!("Unknown result of roster");
Err(())
}
}
} else {
Ok(false)
}
}
Err(_e) => Ok(false),
}
} else {
error!("Wrong event while waiting roster");
Err(())
}
}
fn initial_roster<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: 'static,
{
let XmppConnection {
account,
state: XmppState { client, mut data },
} = self;
use tokio::prelude::Sink;
data.counter += 1;
let id_init_roster = format!("id_init_roster{}", data.counter);
let get_roster = stanzas::make_get_roster(&id_init_roster);
let account2 = account.clone();
info!("Quering roster... {:?}", get_roster);
client
.send(Packet::Stanza(get_roster))
.map_err(move |e| {
error!("Error on querying roster: {}", e);
account2
})
.and_then(move |client| {
XmppConnection {
state: XmppState { client, data },
account,
}
.processing(
move |conn, event| conn.process_initial_roster(event, &id_init_roster),
stop_future,
)
.map_err(|(account, _)| account)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
})
}
fn self_presence<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: Into<failure::Error> + 'static,
{
let XmppConnection {
account,
state: XmppState { client, data },
} = self;
use tokio::prelude::Sink;
let presence = stanzas::make_presence(&account);
let account2 = account.clone();
info!("Sending presence... {:?}", presence);
client
.send(Packet::Stanza(presence))
.map_err(|e| {
error!("Error on send self-presence: {}", e);
account2
})
.and_then(move |client| {
XmppConnection {
state: XmppState { client, data },
account,
}
.processing(
move |conn, event| {
if let Event::Stanza(s) = event {
use std::convert::TryInto;
match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
Ok(presence) => {
Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
}
Err(e) => {
warn!("Not a self-presence: {}", e);
Ok(false)
}
}
} else {
error!("Wrong event while waiting self-presence");
Err(())
}
},
stop_future,
)
.map_err(|(account, _)| account)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
})
}
fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
if !mailbox.is_empty() {
if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
info!("Jid {} in roster", xmpp_to);
let sub_to = match rdata.0 {
xmpp_parsers::roster::Subscription::To => true,
xmpp_parsers::roster::Subscription::Both => true,
_ => false,
};
if sub_to {
info!("Subscribed to {}", xmpp_to);
self.state.data.send_queue.extend(
mailbox.drain(..).map(|message| {
stanzas::make_chat_message(xmpp_to.clone(), message)
}),
);
} else if rdata.1 == xmpp_parsers::roster::Ask::None {
info!("Not subscribed to {}", xmpp_to);
self.state
.data
.send_queue
.push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
}
let sub_from = match rdata.0 {
xmpp_parsers::roster::Subscription::From => true,
xmpp_parsers::roster::Subscription::Both => true,
_ => false,
};
if !sub_from {
info!("Not subscription from {}", xmpp_to);
self.state
.data
.send_queue
.push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));
}
} else {
info!("Jid {} not in roster", xmpp_to);
self.state.data.counter += 1;
let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
self.state
.data
.pending_add_roster_ids
.insert(id_add_roster, xmpp_to.clone());
info!("Adding jid to roster... {:?}", add_roster);
self.state.data.send_queue.push_back(add_roster);
}
}
}
}
pub fn process_command(&mut self, cmd: XmppCommand) {
info!("Got command");
match cmd {
XmppCommand::Chat { xmpp_to, message } => {
self.state
.data
.outgoing_mailbox
.entry(xmpp_to.clone())
.or_default()
.push(message);
self.process_jid(&xmpp_to);
}
XmppCommand::Chatroom { muc_id, message } => {
if let Some(muc) = self.state.data.mucs.get(&muc_id) {
self.state
.data
.send_queue
.push_back(stanzas::make_muc_message(muc.clone(), message));
} else {
error!("Not found MUC {}", muc_id);
}
}
XmppCommand::Ping => {
self.state.data.counter += 1;
let id_ping = format!("id_ping{}", self.state.data.counter);
let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
self.state.data.send_queue.push_back(ping);
}
}
}
pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
info!("Shutdown connection");
let XmppConnection { account, state } = self;
stream::iter_ok(
state
.data
.mucs
.values()
.map(std::clone::Clone::clone)
.collect::<Vec<_>>(),
)
.fold(state, move |XmppState { client, data }, muc_jid| {
let muc_presence =
stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());
info!("Sending muc leave presence... {:?}", muc_presence);
use tokio::prelude::Sink;
client
.send(Packet::Stanza(muc_presence))
.map_err(|e| {
error!("Error on send muc presence: {}", e);
e
})
.and_then(|client| future::ok(XmppState { client, data }))
})
.map(|_| ())
}
fn enter_mucs<F, E>(
self,
_stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: Into<failure::Error> + 'static,
{
let XmppConnection { account, state } = self;
let account2 = account.clone();
let account3 = account.clone();
stream::iter_ok(account.chatrooms.clone())
.fold(state, move |XmppState { client, mut data }, muc_jid| {
data.counter += 1;
let id_muc_presence = format!("id_muc_presence{}", data.counter);
let muc_presence = stanzas::make_muc_presence(
&id_muc_presence,
account2.jid.clone(),
muc_jid.1.clone(),
);
info!("Sending muc presence... {:?}", muc_presence);
let account4 = account2.clone();
use tokio::prelude::Sink;
client
.send(Packet::Stanza(muc_presence))
.map_err(|e| {
error!("Error on send muc presence: {}", e);
account4
})
.and_then(|client| {
data.mucs.insert(muc_jid.0, muc_jid.1);
future::ok(XmppState { client, data })
})
})
.map(|state| XmppConnection {
account: account3,
state,
})
}
}
use tokio_xmpp::{Client, Event, Packet};
use tokio::prelude::future::{self, Either};
use tokio::prelude::stream;
use tokio::prelude::{Future, Stream};
use std::collections::{HashMap, VecDeque};
use super::XmppCommand;
use super::stanzas;
use super::element_processor;
use crate::config;
#[derive(Default)]
struct XmppData {
/// known roster data
roster: HashMap<
xmpp_parsers::Jid,
(
xmpp_parsers::roster::Subscription,
xmpp_parsers::roster::Ask,
),
>,
/// ids counter
counter: usize,
/// map from id of adding item to roster and jid of item
pending_add_roster_ids: HashMap<String, xmpp_parsers::Jid>,
/// stanzas to send
send_queue: VecDeque<minidom::Element>,
/// outgoing mailbox
outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
/// muc id to muc jid
mucs: HashMap<String, xmpp_parsers::Jid>,
}
struct XmppState {
client: Client,
data: XmppData,
}
pub struct XmppConnection {
account: std::rc::Rc<config::Account>,
state: XmppState,
}
struct XmppElementProcessor {
incoming: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element>,
}
impl XmppElementProcessor {
fn new() -> XmppElementProcessor {
let mut incoming = element_processor::Processor::new(&|_, e| {
warn!("Unknown stanza {:#?}", e);
true
});
incoming.register(&XmppConnection::incoming_iq_processing);
XmppElementProcessor { incoming }
}
}
pub struct MaybeXmppConnection {
account: std::rc::Rc<config::Account>,
state: Option<XmppState>,
}
impl From<XmppConnection> for MaybeXmppConnection {
fn from(from: XmppConnection) -> MaybeXmppConnection {
MaybeXmppConnection {
account: from.account,
state: Some(from.state),
}
}
}
impl From<config::Account> for MaybeXmppConnection {
fn from(from: config::Account) -> MaybeXmppConnection {
MaybeXmppConnection {
account: std::rc::Rc::new(from),
state: None,
}
}
}
impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
MaybeXmppConnection {
account: from,
state: None,
}
}
}
impl MaybeXmppConnection {
/// connects if nothing connected
/// don't connect only if stop_future resolved
pub fn connect<F>(
self,
stop_future: F,
) -> impl Future<Item = XmppConnection, Error = failure::Error>
where
F: future::Future + Clone + 'static,
<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
{
info!("xmpp connection...");
let MaybeXmppConnection { account, state } = self;
if let Some(state) = state {
Box::new(future::ok(XmppConnection { account, state }))
as Box<dyn Future<Item = _, Error = _>>
} else {
Box::new(
stop_future
.clone()
.select2(
future::loop_fn(account, move |account| {
info!("xmpp initialization...");
let client =
Client::new_with_jid(account.jid.clone(), &account.password);
info!("xmpp initialized");
let stop_future2 = stop_future.clone();
let stop_future3 = stop_future.clone();
let stop_future4 = stop_future.clone();
// future to wait for online
Box::new(
XmppConnection {
state: XmppState {
client,
data: std::default::Default::default(),
},
account,
}
.processing(XmppConnection::online, stop_future.clone())
.map_err(|(acc, _)| acc)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
.and_then(|conn| conn.initial_roster(stop_future2))
.and_then(|conn| conn.self_presence(stop_future3))
.and_then(|conn| conn.enter_mucs(stop_future4))
.then(|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
}),
)
})
.map_err(|_: ()| ()),
)
.then(|r| match r {
Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
Ok(Either::B((x, _a))) => future::ok(x),
Err(Either::A((e, _b))) => future::err(e.into()),
Err(Either::B((_, _a))) => {
future::err(format_err!("Cann't initiate XMPP connection"))
}
}),
)
}
}
}
impl XmppConnection {
/// base XMPP processing
/// Returns false on error to disconnect
fn xmpp_processing(&mut self, event: &Event) -> bool {
match event {
Event::Stanza(stanza) => {
let processors = XmppElementProcessor::new();
processors.incoming.process(self, stanza.clone())
}
Event::Online => true,
e => {
warn!("Unexpected event {:?}", e);
false
}
}
}
fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
use std::convert::TryInto;
if let Some((_, jid)) = self.state.data.pending_add_roster_ids.remove_entry(&iq.id) {
if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {
if self.state.data.roster.contains_key(&jid) {
info!("Jid {} updated to roster", jid);
} else {
info!("Jid {} added in roster", jid);
self.state.data.roster.insert(
jid.clone(),
(
xmpp_parsers::roster::Subscription::None,
xmpp_parsers::roster::Ask::None,
),
);
}
self.process_jid(&jid);
} else {
warn!(
"Wrong payload when adding {} to roster: {:?}",
jid, iq.payload
);
}
}
match iq.payload {
xmpp_parsers::iq::IqType::Set(element) => {
if let Some(roster) =
element.try_into().ok() as Option<xmpp_parsers::roster::Roster>
{
for i in roster.items {
if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
info!("Update {} in roster", i.jid);
rdata.0 = i.subscription;
rdata.1 = i.ask;
} else {
info!("Add {} to roster", i.jid);
self.state
.data
.roster
.insert(i.jid.clone(), (i.subscription, i.ask));
}
self.process_jid(&i.jid);
}
}
}
xmpp_parsers::iq::IqType::Error(e) => {
error!("iq error: {:?}", e);
return false;
}
xmpp_parsers::iq::IqType::Get(element) => {
if let Some(_ping) = element.try_into().ok() as Option<xmpp_parsers::ping::Ping> {
let pong = stanzas::make_pong(&iq.id, self.state.client.jid.clone(), iq.from);
self.state.data.send_queue.push_back(pong);
}
}
_ => (), // ignore
}
true
}
/// process event from xmpp stream
/// returns from future when condition met
/// or stop future was resolved.
/// Return item if connection was preserved or error otherwise.
/// Second part is a state of stop_future
pub fn processing<S, F, T, E>(
self,
stop_condition: S,
stop_future: F,
) -> impl Future<
Item = (Self, Result<Either<F, T>, E>),
Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
>
where
F: Future<Item = T, Error = E> + 'static,
S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
T: 'static,
E: 'static,
{
future::loop_fn(
(self, stop_future, stop_condition),
|(xmpp, stop_future, mut stop_condition)| {
let XmppConnection {
state: XmppState { client, mut data },
account,
} = xmpp;
if let Some(send_element) = data.send_queue.pop_front() {
use tokio::prelude::Sink;
info!("Sending {:?}", send_element);
Box::new(
client
.send(Packet::Stanza(send_element))
.select2(stop_future)
.then(move |r| match r {
Ok(Either::A((client, b))) => {
Box::new(future::ok(future::Loop::Continue((
XmppConnection {
state: XmppState { client, data },
account,
},
b,
stop_condition,
))))
as Box<dyn Future<Item = _, Error = _>>
}
Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
Ok(client) => future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Ok(Either::B(t)),
))),
Err(se) => {
warn!("XMPP sending error: {}", se);
future::err((account, Ok(Either::B(t))))
}
})),
Err(Either::A((e, b))) => {
warn!("XMPP sending error: {}", e);
Box::new(future::err((account, Ok(Either::A(b)))))
}
Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
Ok(client) => future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Err(e),
))),
Err(se) => {
warn!("XMPP sending error: {}", se);
future::err((account, Err(e)))
}
})),
}),
) as Box<dyn Future<Item = _, Error = _>>
} else {
Box::new(
client
.into_future()
.select2(stop_future)
.then(move |r| match r {
Ok(Either::A(((event, client), b))) => {
if let Some(event) = event {
let mut xmpp = XmppConnection {
state: XmppState { client, data },
account,
};
if xmpp.xmpp_processing(&event) {
match stop_condition(&mut xmpp, event) {
Ok(true) => future::ok(future::Loop::Break((
xmpp,
Ok(Either::A(b)),
))),
Ok(false) => future::ok(future::Loop::Continue((
xmpp,
b,
stop_condition,
))),
Err(_e) => {
future::err((xmpp.account, Ok(Either::A(b))))
}
}
} else {
future::err((xmpp.account, Ok(Either::A(b))))
}
} else {
future::err((account, Ok(Either::A(b))))
}
}
Ok(Either::B((t, a))) => {
if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Ok(Either::B(t)),
)))
} else {
future::err((account, Ok(Either::B(t))))
}
}
Err(Either::A((e, b))) => {
warn!("XMPP error: {}", e.0);
future::err((account, Ok(Either::A(b))))
}
Err(Either::B((e, a))) => {
if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Err(e),
)))
} else {
future::err((account, Err(e)))
}
}
}),
)
}
},
)
}
/// get connection and wait for online status and set presence
/// returns error if something went wrong and xmpp connection is broken
fn online(&mut self, event: Event) -> Result<bool, ()> {
match event {
Event::Online => {
info!("Online!");
Ok(true)
}
Event::Stanza(s) => {
warn!("Stanza before online: {:?}", s);
Ok(false)
}
_ => {
error!("Disconnected while online");
Err(())
}
}
}
fn process_initial_roster(&mut self, event: Event, id_init_roster: &str) -> Result<bool, ()> {
if let Event::Stanza(s) = event {
use std::convert::TryInto;
match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
Ok(iq) => {
if iq.id == id_init_roster {
match iq.payload {
xmpp_parsers::iq::IqType::Error(_e) => {
error!("Get error instead of roster");
Err(())
}
xmpp_parsers::iq::IqType::Result(Some(result)) => {
match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
Ok(roster) => {
self.state.data.roster.clear();
info!("Got first roster:");
for i in roster.items {
info!(" >>> {:?}", i);
self.state
.data
.roster
.insert(i.jid, (i.subscription, i.ask));
}
Ok(true)
}
Err(e) => {
error!("Cann't parse roster: {}", e);
Err(())
}
}
}
_ => {
error!("Unknown result of roster");
Err(())
}
}
} else {
Ok(false)
}
}
Err(_e) => Ok(false),
}
} else {
error!("Wrong event while waiting roster");
Err(())
}
}
fn initial_roster<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: 'static,
{
let XmppConnection {
account,
state: XmppState { client, mut data },
} = self;
use tokio::prelude::Sink;
data.counter += 1;
let id_init_roster = format!("id_init_roster{}", data.counter);
let get_roster = stanzas::make_get_roster(&id_init_roster);
let account2 = account.clone();
info!("Quering roster... {:?}", get_roster);
client
.send(Packet::Stanza(get_roster))
.map_err(move |e| {
error!("Error on querying roster: {}", e);
account2
})
.and_then(move |client| {
XmppConnection {
state: XmppState { client, data },
account,
}
.processing(
move |conn, event| conn.process_initial_roster(event, &id_init_roster),
stop_future,
)
.map_err(|(account, _)| account)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
})
}
fn self_presence<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: Into<failure::Error> + 'static,
{
let XmppConnection {
account,
state: XmppState { client, data },
} = self;
use tokio::prelude::Sink;
let presence = stanzas::make_presence(&account);
let account2 = account.clone();
info!("Sending presence... {:?}", presence);
client
.send(Packet::Stanza(presence))
.map_err(|e| {
error!("Error on send self-presence: {}", e);
account2
})
.and_then(move |client| {
XmppConnection {
state: XmppState { client, data },
account,
}
.processing(
move |conn, event| {
if let Event::Stanza(s) = event {
use std::convert::TryInto;
match s.try_into() as Result<xmpp_parsers::presence::Presence, _> {
Ok(presence) => {
Ok(presence.from.as_ref() == Some(&conn.state.client.jid))
}
Err(e) => {
warn!("Not a self-presence: {}", e);
Ok(false)
}
}
} else {
error!("Wrong event while waiting self-presence");
Err(())
}
},
stop_future,
)
.map_err(|(account, _)| account)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
})
}
fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
if !mailbox.is_empty() {
if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
info!("Jid {} in roster", xmpp_to);
let sub_to = match rdata.0 {
xmpp_parsers::roster::Subscription::To => true,
xmpp_parsers::roster::Subscription::Both => true,
_ => false,
};
if sub_to {
info!("Subscribed to {}", xmpp_to);
self.state.data.send_queue.extend(
mailbox.drain(..).map(|message| {
stanzas::make_chat_message(xmpp_to.clone(), message)
}),
);
} else if rdata.1 == xmpp_parsers::roster::Ask::None {
info!("Not subscribed to {}", xmpp_to);
self.state
.data
.send_queue
.push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
}
let sub_from = match rdata.0 {
xmpp_parsers::roster::Subscription::From => true,
xmpp_parsers::roster::Subscription::Both => true,
_ => false,
};
if !sub_from {
info!("Not subscription from {}", xmpp_to);
self.state
.data
.send_queue
.push_back(stanzas::make_allow_subscribe(xmpp_to.clone()));
}
} else {
info!("Jid {} not in roster", xmpp_to);
self.state.data.counter += 1;
let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
self.state
.data
.pending_add_roster_ids
.insert(id_add_roster, xmpp_to.clone());
info!("Adding jid to roster... {:?}", add_roster);
self.state.data.send_queue.push_back(add_roster);
}
}
}
}
pub fn process_command(&mut self, cmd: XmppCommand) {
info!("Got command");
match cmd {
XmppCommand::Chat { xmpp_to, message } => {
self.state
.data
.outgoing_mailbox
.entry(xmpp_to.clone())
.or_default()
.push(message);
self.process_jid(&xmpp_to);
}
XmppCommand::Chatroom { muc_id, message } => {
if let Some(muc) = self.state.data.mucs.get(&muc_id) {
self.state
.data
.send_queue
.push_back(stanzas::make_muc_message(muc.clone(), message));
} else {
error!("Not found MUC {}", muc_id);
}
}
XmppCommand::Ping => {
self.state.data.counter += 1;
let id_ping = format!("id_ping{}", self.state.data.counter);
let ping = stanzas::make_ping(&id_ping, self.state.client.jid.clone());
self.state.data.send_queue.push_back(ping);
}
}
}
pub fn shutdown(self) -> impl Future<Item = (), Error = failure::Error> {
info!("Shutdown connection");
let XmppConnection { account, state } = self;
stream::iter_ok(
state
.data
.mucs
.values()
.map(std::clone::Clone::clone)
.collect::<Vec<_>>(),
)
.fold(state, move |XmppState { client, data }, muc_jid| {
let muc_presence =
stanzas::make_muc_presence_leave(account.jid.clone(), muc_jid.clone());
info!("Sending muc leave presence... {:?}", muc_presence);
use tokio::prelude::Sink;
client
.send(Packet::Stanza(muc_presence))
.map_err(|e| {
error!("Error on send muc presence: {}", e);
e
})
.and_then(|client| future::ok(XmppState { client, data }))
})
.map(|_| ())
}
fn enter_mucs<F, E>(
self,
_stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: Into<failure::Error> + 'static,
{
let XmppConnection { account, state } = self;
let account2 = account.clone();
let account3 = account.clone();
stream::iter_ok(account.chatrooms.clone())
.fold(state, move |XmppState { client, mut data }, muc_jid| {
data.counter += 1;
let id_muc_presence = format!("id_muc_presence{}", data.counter);
let muc_presence = stanzas::make_muc_presence(
&id_muc_presence,
account2.jid.clone(),
muc_jid.1.clone(),
);
info!("Sending muc presence... {:?}", muc_presence);
let account4 = account2.clone();
use tokio::prelude::Sink;
client
.send(Packet::Stanza(muc_presence))
.map_err(|e| {
error!("Error on send muc presence: {}", e);
account4
})
.and_then(|client| {
data.mucs.insert(muc_jid.0, muc_jid.1);
future::ok(XmppState { client, data })
})
})
.map(|state| XmppConnection {
account: account3,
state,
})
}
}
Iq::from_set(
id,
Roster {
items: vec![Item {
jid,
name: None,
subscription: xmpp_parsers::roster::Subscription::None,
ask: xmpp_parsers::roster::Ask::None,
groups: vec![],
}],
ver: None,
},
)
.into()
let mut add_roster = Iq::from_set(Roster {
items: vec![Item {
jid,
name: None,
subscription: xmpp_parsers::roster::Subscription::None,
ask: xmpp_parsers::roster::Ask::None,
groups: vec![],
}],
ver: None,
});
add_roster.id = Some(id.to_string());
add_roster.into()
}
pub fn make_muc_presence_leave(from: xmpp_parsers::Jid, to: xmpp_parsers::Jid) -> Element {
let mut presence = Presence::new(PresenceType::Unavailable);
presence.from = Some(from);
presence.to = Some(to);
presence.into()
pub fn make_ping(id: &str, from: xmpp_parsers::Jid) -> Element {
let mut ping = Iq::from_get(id, Ping);
ping.to = Some(from.clone().into_domain_jid());
ping.from = Some(from);
ping.into()
}
pub fn make_pong(id: &str, from: xmpp_parsers::Jid, to: Option<xmpp_parsers::Jid>) -> Element {
let mut pong = Iq::from_result(id, None as Option<Roster>);
pong.from = Some(from);
pong.to = to;
pong.into()
}
#[derive(Default)]
struct XmppData {
/// known roster data
roster: HashMap<
xmpp_parsers::Jid,
(
xmpp_parsers::roster::Subscription,
xmpp_parsers::roster::Ask,
),
>,
/// ids counter
counter: usize,
/// map from id of adding item to roster and jid of item
pending_add_roster_ids: HashMap<String, xmpp_parsers::Jid>,
/// stanzas to send
send_queue: VecDeque<minidom::Element>,
/// outgoing mailbox
outgoing_mailbox: HashMap<xmpp_parsers::Jid, Vec<String>>,
/// muc id to muc jid
mucs: HashMap<String, xmpp_parsers::Jid>,
}
struct XmppState {
client: Client,
data: XmppData,
}
struct MaybeXmppConnection {
account: std::rc::Rc<config::Account>,
state: Option<XmppState>,
}
struct XmppConnection {
account: std::rc::Rc<config::Account>,
state: XmppState,
}
impl From<XmppConnection> for MaybeXmppConnection {
fn from(from: XmppConnection) -> MaybeXmppConnection {
MaybeXmppConnection {
account: from.account,
state: Some(from.state),
}
}
}
impl From<config::Account> for MaybeXmppConnection {
fn from(from: config::Account) -> MaybeXmppConnection {
MaybeXmppConnection {
account: std::rc::Rc::new(from),
state: None,
}
}
}
impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
MaybeXmppConnection {
account: from,
state: None,
}
}
}
impl MaybeXmppConnection {
/// connects if nothing connected
/// don't connect only if stop_future resolved
fn connect<F>(
self,
stop_future: F,
) -> impl Future<Item = XmppConnection, Error = failure::Error>
where
F: future::Future + Clone + 'static,
<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
{
info!("xmpp connection...");
let MaybeXmppConnection { account, state } = self;
if let Some(state) = state {
Box::new(future::ok(XmppConnection { account, state }))
as Box<dyn Future<Item = _, Error = _>>
} else {
Box::new(
stop_future
.clone()
.select2(
future::loop_fn(account, move |account| {
info!("xmpp initialization...");
let client =
Client::new_with_jid(account.jid.clone(), &account.password);
info!("xmpp initialized");
let stop_future2 = stop_future.clone();
let stop_future3 = stop_future.clone();
let stop_future4 = stop_future.clone();
// future to wait for online
Box::new(
XmppConnection {
state: XmppState {
client,
data: std::default::Default::default(),
},
account,
}
.processing(XmppConnection::online, stop_future.clone())
.map_err(|(acc, _)| acc)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
.and_then(|conn| conn.initial_roster(stop_future2))
.and_then(|conn| conn.self_presence(stop_future3))
.and_then(|conn| conn.enter_mucs(stop_future4))
.then(|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
}),
)
})
.map_err(|_: ()| ()),
)
.then(|r| match r {
Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
Ok(Either::B((x, _a))) => future::ok(x),
Err(Either::A((e, _b))) => future::err(e.into()),
Err(Either::B((_, _a))) => {
future::err(format_err!("Cann't initiate XMPP connection"))
}
}),
)
}
}
}
impl XmppConnection {
/// base XMPP processing
fn xmpp_processing(
mut self,
event: &Event,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
match event {
Event::Stanza(stanza) => {
info!("Incoming xmpp event: {:?}", stanza);
let stanza = stanza.clone();
use try_from::TryInto;
if let Some(iq) = stanza.try_into().ok() as Option<xmpp_parsers::iq::Iq> {
if let Some(id) = iq.id {
if let Some((_, jid)) =
self.state.data.pending_add_roster_ids.remove_entry(&id)
{
if let xmpp_parsers::iq::IqType::Result(None) = iq.payload {
if self.state.data.roster.contains_key(&jid) {
info!("Jid {} updated to roster", jid);
} else {
info!("Jid {} added in roster", jid);
self.state.data.roster.insert(
jid.clone(),
(
xmpp_parsers::roster::Subscription::None,
xmpp_parsers::roster::Ask::None,
),
);
}
self.process_jid(&jid);
} else {
warn!(
"Wrong payload when adding {} to roster: {:?}",
jid, iq.payload
);
}
}
}
if let xmpp_parsers::iq::IqType::Set(element) = iq.payload {
if let Some(roster) =
element.try_into().ok() as Option<xmpp_parsers::roster::Roster>
{
for i in roster.items {
if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid)
{
info!("Update {} in roster", i.jid);
rdata.0 = i.subscription;
rdata.1 = i.ask;
} else {
info!("Add {} to roster", i.jid);
self.state
.data
.roster
.insert(i.jid.clone(), (i.subscription, i.ask));
}
self.process_jid(&i.jid);
}
}
}
}
future::ok(self)
}
Event::Online => future::ok(self),
e => {
warn!("Unexpected event {:?}", e);
future::err(self.account)
}
}
}
/// process event from xmpp stream
/// returns from future when condition met
/// or stop future was resolved.
/// Return item if connection was preserved or error otherwise.
/// Second part is a state of stop_future
fn processing<S, F, T, E>(
self,
stop_condition: S,
stop_future: F,
) -> impl Future<
Item = (Self, Result<Either<F, T>, E>),
Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
>
where
F: Future<Item = T, Error = E> + 'static,
S: FnMut(&mut Self, Event) -> Result<bool, ()> + 'static,
T: 'static,
E: 'static,
{
future::loop_fn(
(self, stop_future, stop_condition),
|(xmpp, stop_future, mut stop_condition)| {
let XmppConnection {
state: XmppState { client, mut data },
account,
} = xmpp;
mod xmpp_connection;
use xmpp_connection::MaybeXmppConnection;
if let Some(send_element) = data.send_queue.pop_front() {
use tokio::prelude::Sink;
info!("Sending {:?}", send_element);
Box::new(
client
.send(Packet::Stanza(send_element))
.select2(stop_future)
.then(move |r| match r {
Ok(Either::A((client, b))) => {
Box::new(future::ok(future::Loop::Continue((
XmppConnection {
state: XmppState { client, data },
account,
},
b,
stop_condition,
))))
as Box<dyn Future<Item = _, Error = _>>
}
Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
Ok(client) => future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Ok(Either::B(t)),
))),
Err(se) => {
warn!("XMPP sending error: {}", se);
future::err((account, Ok(Either::B(t))))
}
})),
Err(Either::A((e, b))) => {
warn!("XMPP sending error: {}", e);
Box::new(future::err((account, Ok(Either::A(b)))))
}
Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
Ok(client) => future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Err(e),
))),
Err(se) => {
warn!("XMPP sending error: {}", se);
future::err((account, Err(e)))
}
})),
}),
) as Box<dyn Future<Item = _, Error = _>>
} else {
Box::new(
client
.into_future()
.select2(stop_future)
.then(move |r| match r {
Ok(Either::A(((event, client), b))) => {
if let Some(event) = event {
let xmpp = XmppConnection {
state: XmppState { client, data },
account,
};
Box::new(xmpp.xmpp_processing(&event).then(|r| match r {
Ok(mut xmpp) => {
match stop_condition(&mut xmpp, event) {
Ok(true) => future::ok(future::Loop::Break((
xmpp,
Ok(Either::A(b)),
))),
Ok(false) => {
future::ok(future::Loop::Continue((
xmpp,
b,
stop_condition,
)))
}
Err(_e) => future::err((
xmpp.account,
Ok(Either::A(b)),
)),
}
}
Err(account) => {
future::err((account, Ok(Either::A(b))))
}
}))
as Box<dyn Future<Item = _, Error = _>>
} else {
Box::new(future::err((account, Ok(Either::A(b)))))
}
}
Ok(Either::B((t, a))) => {
Box::new(if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Ok(Either::B(t)),
)))
} else {
future::err((account, Ok(Either::B(t))))
})
}
Err(Either::A((e, b))) => {
warn!("XMPP error: {}", e.0);
Box::new(future::err((account, Ok(Either::A(b)))))
}
Err(Either::B((e, a))) => {
Box::new(if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Err(e),
)))
} else {
future::err((account, Err(e)))
})
}
}),
)
}
},
)
}
/// get connection and wait for online status and set presence
/// returns error if something went wrong and xmpp connection is broken
fn online(&mut self, event: Event) -> Result<bool, ()> {
match event {
Event::Online => {
info!("Online!");
Ok(true)
}
Event::Stanza(s) => {
warn!("Stanza before online: {:?}", s);
Ok(false)
}
_ => {
error!("Disconnected while online");
Err(())
}
}
}
fn process_initial_roster(&mut self, event: Event, id_init_roster: &str) -> Result<bool, ()> {
if let Event::Stanza(s) = event {
use try_from::TryInto;
match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
Ok(iq) => {
if let Some(id) = iq.id {
if id == id_init_roster {
match iq.payload {
xmpp_parsers::iq::IqType::Error(_e) => {
error!("Get error instead of roster");
Err(())
}
xmpp_parsers::iq::IqType::Result(Some(result)) => {
match result.try_into()
as Result<xmpp_parsers::roster::Roster, _>
{
Ok(roster) => {
self.state.data.roster.clear();
info!("Got first roster:");
for i in roster.items {
info!(" >>> {:?}", i);
self.state
.data
.roster
.insert(i.jid, (i.subscription, i.ask));
}
Ok(true)
}
Err(e) => {
error!("Cann't parse roster: {}", e);
Err(())
}
}
}
_ => {
error!("Unknown result of roster");
Err(())
}
}
} else {
Ok(false)
}
} else {
error!("Iq stanza without id");
Err(())
}
}
Err(_e) => Ok(false),
}
} else {
error!("Wrong event while waiting roster");
Err(())
}
}
fn initial_roster<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: 'static,
{
let XmppConnection {
account,
state: XmppState { client, mut data },
} = self;
use tokio::prelude::Sink;
data.counter += 1;
let id_init_roster = format!("id_init_roster{}", data.counter);
let get_roster = stanzas::make_get_roster(&id_init_roster);
let account2 = account.clone();
info!("Quering roster... {:?}", get_roster);
client
.send(Packet::Stanza(get_roster))
.map_err(move |e| {
error!("Error on querying roster: {}", e);
account2
})
.and_then(move |client| {
XmppConnection {
state: XmppState { client, data },
account,
}
.processing(
move |conn, event| conn.process_initial_roster(event, &id_init_roster),
stop_future,
)
.map_err(|(account, _)| account)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
})
}
fn self_presence<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: Into<failure::Error> + 'static,
{
let XmppConnection {
account,
state: XmppState { client, data },
} = self;
use tokio::prelude::Sink;
let presence = stanzas::make_presence(&account);
let account2 = account.clone();
info!("Sending presence... {:?}", presence);
client
.send(Packet::Stanza(presence))
.map_err(|e| {
error!("Error on send self-presence: {}", e);
account2
})
.and_then(move |client| {
XmppConnection {
state: XmppState { client, data },
account,
}
.processing(
move |conn, event| {
if let Event::Stanza(s) = event {
if s.name() == "presence"
&& s.attr("from").map_or(false, |f| f == conn.account.jid)
&& s.attr("to").map_or(false, |f| f == conn.account.jid)
{
Ok(true)
} else {
Ok(false)
}
} else {
error!("Wrong event while waiting self-presence");
Err(())
}
},
stop_future,
)
.map_err(|(account, _)| account)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
})
}
fn process_jid(&mut self, xmpp_to: &xmpp_parsers::Jid) {
if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
if !mailbox.is_empty() {
if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
info!("Jid {} in roster", xmpp_to);
let sub_to = match rdata.0 {
xmpp_parsers::roster::Subscription::To => true,
xmpp_parsers::roster::Subscription::Both => true,
_ => false,
};
if sub_to {
info!("Subscribed to {}", xmpp_to);
self.state.data.send_queue.extend(
mailbox.drain(..).map(|message| {
stanzas::make_chat_message(xmpp_to.clone(), message)
}),
);
} else if rdata.1 == xmpp_parsers::roster::Ask::None {
info!("Not subscribed to {}", xmpp_to);
self.state
.data
.send_queue
.push_back(stanzas::make_ask_subscribe(xmpp_to.clone()));
}
} else {
info!("Jid {} not in roster", xmpp_to);
self.state.data.counter += 1;
let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
self.state
.data
.pending_add_roster_ids
.insert(id_add_roster, xmpp_to.clone());
info!("Adding jid to roster... {:?}", add_roster);
self.state.data.send_queue.push_back(add_roster);
}
}
}
}
fn process_command(&mut self, cmd: XmppCommand) {
info!("Got command");
match cmd {
XmppCommand::Chat { xmpp_to, message } => {
self.state
.data
.outgoing_mailbox
.entry(xmpp_to.clone())
.or_default()
.push(message);
self.process_jid(&xmpp_to);
}
XmppCommand::Chatroom { muc_id, message } => {
if let Some(muc) = self.state.data.mucs.get(&muc_id) {
self.state
.data
.send_queue
.push_back(stanzas::make_muc_message(muc.clone(), message));
} else {
error!("Not found MUC {}", muc_id);
}
}
}
}
fn enter_mucs<F, E>(
self,
_stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E> + 'static,
E: Into<failure::Error> + 'static,
{
let XmppConnection { account, state } = self;
let account2 = account.clone();
let account3 = account.clone();
stream::iter_ok(account.chatrooms.clone())
.fold(state, move |XmppState { client, mut data }, muc_jid| {
data.counter += 1;
let id_muc_presence = format!("id_muc_presence{}", data.counter);
let muc_presence = stanzas::make_muc_presence(
&id_muc_presence,
account2.jid.clone(),
muc_jid.1.clone(),
);
info!("Sending muc presence... {:?}", muc_presence);
let account4 = account2.clone();
use tokio::prelude::Sink;
client
.send(Packet::Stanza(muc_presence))
.map_err(|e| {
error!("Error on send muc presence: {}", e);
account4
})
.and_then(|client| {
data.mucs.insert(muc_jid.0, muc_jid.1);
future::ok(XmppState { client, data })
})
})
.map(|state| XmppConnection {
account: account3,
state,
})
}
}
type Func<S, T, E> = dyn Fn(&mut S, E) -> T;
pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {
processors: Vec<Box<Func<S, Option<T>, E>>>,
default: &'static Func<S, T, E>,
}
impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {
pub fn new<F>(f: &'static F) -> Processor<S, T, E>
where
F: Fn(&mut S, E) -> T + 'static,
{
Processor {
processors: vec![],
default: f,
}
}
pub fn register<F, A>(&mut self, f: &'static F)
where
F: Fn(&mut S, A) -> T + 'static,
A: std::convert::TryFrom<E>,
{
self.processors.push(Box::new(move |s, e: E| {
use std::convert::TryInto;
(e.try_into().ok() as Option<A>).map(|a| f(s, a))
}));
}
pub fn process(&self, s: &mut S, e: E) -> T {
for processor in self.processors.iter() {
match processor(s, e.clone()) {
Some(t) => return t,
None => continue,
}
}
(*self.default)(s, e)
}
}
type Func<S, T, E> = dyn Fn(&mut S, E) -> T;
pub struct Processor<S: 'static, T: 'static, E: Clone + 'static> {
processors: Vec<Box<Func<S, Option<T>, E>>>,
default: &'static Func<S, T, E>,
}
impl<S: 'static, T: 'static, E: Clone + 'static> Processor<S, T, E> {
pub fn new<F>(f: &'static F) -> Processor<S, T, E>
where
F: Fn(&mut S, E) -> T + 'static,
{
Processor {
processors: vec![],
default: f,
}
}
pub fn register<F, A>(&mut self, f: &'static F)
where
F: Fn(&mut S, A) -> T + 'static,
A: std::convert::TryFrom<E>,
{
self.processors.push(Box::new(move |s, e: E| {
use std::convert::TryInto;
(e.try_into().ok() as Option<A>).map(|a| f(s, a))
}));
}
pub fn process(&self, s: &mut S, e: E) -> T {
for processor in self.processors.iter() {
match processor(s, e.clone()) {
Some(t) => return t,
None => continue,
}
}
(*self.default)(s, e)
}
}
.fold(String::new(), |mut acc, ch| {
std::str::from_utf8(&*ch).map(|s| {
acc.push_str(s);
acc
})
.fold(Vec::new(), |mut acc, ch| {
acc.extend_from_slice(&*ch);
future::ok::<_, failure::Error>(acc)
})
.and_then(|acc| {
std::str::from_utf8(&acc)
.map(std::string::ToString::to_string)
.map_err(std::convert::Into::into)
if let Some(ping) = config.account.ping {
let ping = tokio::timer::Interval::new_interval(std::time::Duration::from_secs(ping));
rt.spawn(
ping.map_err(|e| {
error!("Ping error: {}", e);
})
.for_each(move |_| {
cmd_send
.clone()
.send(XmppCommand::Ping)
.map_err(|e| {
error!("Ping command error: {}", e);
})
.map(|_| ())
})
.select(
ctrl_c
.clone()
.map(|_| ())
.map_err(|e| error!("ping server error: {}", e)),
)
.map(|_| ())
.map_err(|_| ()),
);
}
}
fn deserialize_jid_map<'de, D>(
deserializer: D,
) -> Result<HashMap<String, xmpp_parsers::Jid>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::Deserialize;
let s = HashMap::<String, String>::deserialize(deserializer)?;
let size = s.len();
s.into_iter()
.map(|(k, v)| (k, std::str::FromStr::from_str(&v)))
.take_while(|(_k, r)| r.is_ok())
.fold(Ok(HashMap::with_capacity(size)), |res, (k, r)| match res {
Ok(mut res) => match r {
Ok(v) => {
res.insert(k, v);
Ok(res)
}
Err(e) => Err(e),
},
Err(e) => Err(e),
})
.map_err(serde::de::Error::custom)
try_from = "=0.3.2" # xmpp-parsers
"cfg-if 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-demangle 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.49 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-demangle 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"cc 1.0.35 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
"cc 1.0.30 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.49 (registry+https://github.com/rust-lang/crates.io-index)",
"strsim 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"textwrap 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 0.15.27 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"h2 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"h2 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.49 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
"openssl-sys 0.9.43 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.49 (registry+https://github.com/rust-lang/crates.io-index)",
"openssl-sys 0.9.42 (registry+https://github.com/rust-lang/crates.io-index)",
"cc 1.0.35 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
"cc 1.0.30 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.49 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
"toml 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"xmpp-parsers 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)",
"toml 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"try_from 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"xmpp-parsers 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 0.15.27 (registry+https://github.com/rust-lang/crates.io-index)",
"arc-swap 0.3.10 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
"arc-swap 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.49 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.54 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.49 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 0.15.27 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.49 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.54 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.49 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.54 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.49 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-current-thread 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-current-thread 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.49 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "xmpp-parsers"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"blake2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jid 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"minidom 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
"sha-1 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"sha3 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"checksum aho-corasick 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e6f484ae0c99fec2e858eb6134949117399f222608d84cadb3f58c1f97c2364c"
"checksum aho-corasick 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "81ce3d38065e618af2d7b77e10c5ad9a069859b4be3c2250f674af3840d9c8a5"
"checksum arc-swap 0.3.10 (registry+https://github.com/rust-lang/crates.io-index)" = "a57a5698f85c6fd92f19dad87ff2d822fc4ba79dd85c13914d8c4dad589cb815"
"checksum arc-swap 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1025aeae2b664ca0ea726a89d574fe8f4e77dd712d443236ad1de00379450cf6"
"checksum backtrace 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "f106c02a3604afcdc0df5d36cc47b44b55917dbaf3d808f71c163a0ddba64637"
"checksum backtrace 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "cd5a90e2b463010cd0e0ce9a11d4a9d5d58d9f41d4a6ba3dcaf9e68b466e88b4"
"checksum bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c"
"checksum bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "40ade3d27603c2cb345eb0912aec461a6dec7e06a4ae48589904e808335c7afa"
"checksum cc 1.0.35 (registry+https://github.com/rust-lang/crates.io-index)" = "5e5f3fee5eeb60324c2781f1e41286bdee933850fff9b3c672587fed5ec58c83"
"checksum cfg-if 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "11d43355396e872eefb45ce6342e4374ed7bc2b3a502d1b28e36d6e23c05d1f4"
"checksum cc 1.0.30 (registry+https://github.com/rust-lang/crates.io-index)" = "d01c69d08ff207f231f07196e30f84c70f1c815b04f980f8b7b01ff01f05eb92"
"checksum cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "082bb9b28e00d3c9d39cc03e64ce4cea0f1bb9b3fde493f0cbc008472d22bdf4"
"checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9"
"checksum clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b957d88f4b6a63b9d70d5f454ac8011819c6efa7727858f458ab71c756ce2d3e"
"checksum env_logger 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b61fa891024a945da30a9581546e8cfaf5602c7b3f4c137a2805cf388f92075a"
"checksum env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "afb070faf94c85d17d50ca44f6ad076bce18ae92f0037d350947240a36e9d42e"
"checksum futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)" = "62941eff9507c8177d448bd83a44d9b9760856e184081d8cd79ba9f03dd24981"
"checksum futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)" = "49e7653e374fe0d0c12de4250f0bdb60680b8c80eed558c5c7538eec9c89e21b"
"checksum h2 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "85ab6286db06040ddefb71641b50017c06874614001a134b423783e2db2920bd"
"checksum h2 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "ddb2b25a33e231484694267af28fec74ac63b5ccf51ee2065a5e313b834d836e"
"checksum http 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "eed324f0f0daf6ec10c474f150505af2c143f251722bf9dbd1261bd1f2ee2c1a"
"checksum http 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "fe67e3678f2827030e89cc4b9e7ecd16d52f132c0b940ab5005f88e821500f6a"
"checksum hyper 0.12.27 (registry+https://github.com/rust-lang/crates.io-index)" = "4f2777434f26af6e4ce4fdcdccd3bed9d861d11e87bcbe72c0f51ddaca8ff848"
"checksum hyper 0.12.25 (registry+https://github.com/rust-lang/crates.io-index)" = "7d5b6658b016965ae301fa995306db965c93677880ea70765a84235a96eae896"
"checksum libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)" = "bedcc7a809076656486ffe045abeeac163da1b558e963a31e29fbfbeba916917"
"checksum linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ae91b68aebc4ddb91978b11a1b02ddd8602a05ec19002801c5666000e05e0f83"
"checksum libc 0.2.49 (registry+https://github.com/rust-lang/crates.io-index)" = "413f3dfc802c5dc91dc570b05125b6cda9855edfaa9825c9849807876376e70e"
"checksum linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7860ec297f7008ff7a1e3382d7f7e1dcd69efc94751a2284bafc3d013c2aa939"
"checksum lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c"
"checksum lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4d06ff7ff06f729ce5f4e227876cb88d10bc59cd4ae1e09fbb2bde15c850dc21"
"checksum openssl 0.10.20 (registry+https://github.com/rust-lang/crates.io-index)" = "5a0d6b781aac4ac1bd6cafe2a2f0ad8c16ae8e1dd5184822a16c50139f8838d9"
"checksum openssl 0.10.19 (registry+https://github.com/rust-lang/crates.io-index)" = "84321fb9004c3bce5611188a644d6171f895fa2889d155927d528782edb21c5d"
"checksum openssl-sys 0.9.43 (registry+https://github.com/rust-lang/crates.io-index)" = "33c86834957dd5b915623e94f2f4ab2c70dd8f6b70679824155d5ae21dbd495d"
"checksum openssl-sys 0.9.42 (registry+https://github.com/rust-lang/crates.io-index)" = "cb534d752bf98cf363b473950659ac2546517f9c6be9723771614ab3f03bbc9e"
"checksum quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)" = "faf4799c5d274f3868a4aae320a0a182cbd2baee377b378f080e16a23e9d80db"
"checksum quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)" = "cdd8e04bd9c52e0342b406469d494fcb033be4bdbe5c606016defbb1681411e1"
"checksum rand_os 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071"
"checksum rand_os 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b7c690732391ae0abafced5015ffb53656abfaec61b342290e5eb56b286a679d"
"checksum redox_syscall 0.1.54 (registry+https://github.com/rust-lang/crates.io-index)" = "12229c14a0f65c4f1cb046a3b52047cdd9da1f4b30f8a39c5063c8bae515e252"
"checksum redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)" = "423e376fffca3dfa06c9e9790a9ccd282fafb3cc6e6397d01dbf64f9bacc6b85"
"checksum regex 1.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "559008764a17de49a3146b234641644ed37d118d1ef641a0bb573d146edc6ce0"
"checksum regex-syntax 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "dcfd8681eebe297b81d98498869d4aae052137651ad7b96822f09ceb690d0a96"
"checksum regex 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "53ee8cfdddb2e0291adfb9f13d31d3bbe0a03c9a402c01b1e24188d86c35b24f"
"checksum regex-syntax 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "8c2f35eedad5295fdf00a63d7d4b238135723f92b434ec06774dad15c7ab0861"
"checksum rustc-demangle 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "ccc78bfd5acd7bf3e89cffcf899e5cb1a52d6fafa8dec2739ad70c9577a57288"
"checksum rustc-demangle 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "adacaae16d02b6ec37fdc7acfcddf365978de76d1983d3ee22afc260e1ca9619"
"checksum serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)" = "aa5f7c20820475babd2c077c3ab5f8c77a31c15e16ea38687b4c02d3e48680f4"
"checksum serde_derive 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)" = "58fc82bec244f168b23d1963b45c8bf5726e9a15a9d146a067f9081aeed2de79"
"checksum serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)" = "92514fb95f900c9b5126e32d020f5c6d40564c27a5ea6d1d7d9f157a96623560"
"checksum serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)" = "bb6eabf4b5914e88e24eea240bb7c9f9a2cbc1bbbe8d961d381975ec3c6b806c"
"checksum strsim 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
"checksum strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb4f380125926a99e52bc279241539c018323fab05ad6368b56f93d9369ff550"
"checksum syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)" = "66c8865bf5a7cbb662d8b011950060b3c8743dca141b054bf7195b20d314d8e2"
"checksum syn 0.15.27 (registry+https://github.com/rust-lang/crates.io-index)" = "525bd55255f03c816e5d7f615587bd13030c7103354fadb104993dcee6a788ec"
"checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
"checksum textwrap 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "307686869c93e71f94da64286f9a9524c0f308a9e1c87a583de8e9c9039ad3f6"
"checksum tokio 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "65641e515a437b308ab131a82ce3042ff9795bef5d6c5a9be4eb24195c417fd9"
"checksum tokio 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "fcaabb3cec70485d0df6e9454fe514393ad1c4070dee8915f11041e95630b230"
"checksum tokio-current-thread 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "d16217cad7f1b840c5a97dfb3c43b0c871fef423a6e8d2118c604e843662a443"
"checksum tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "83ea44c6c0773cc034771693711c35c677b4b5a4b21b9e7071704c54de7d555e"
"checksum tokio-current-thread 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "c756b04680eea21902a46fca4e9f410a2332c04995af590e07ff262e2193a9a3"
"checksum tokio-executor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "30c6dbf2d1ad1de300b393910e8a3aa272b724a400b6531da03eed99e329fbf0"
"checksum tokio-sync 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "fda385df506bf7546e70872767f71e81640f1f251bdf2fd8eb81a0eaec5fe022"
"checksum tokio-sync 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1bf2b9dac2a0509b5cfd1df5aa25eafacb616a42a491a13604d6bbeab4486363"
"checksum tokio-threadpool 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "ec5759cf26cf9659555f36c431b515e3d05f66831741c85b4b5d5dfb9cf1323c"
"checksum tokio-threadpool 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "742e511f6ce2298aeb86fc9ea0d8df81c2388c6ebae3dc8a7316e8c9df0df801"
"checksum toml 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "87c5890a989fa47ecdc7bcb4c63a77a82c18f306714104b1decfd722db17b39e"
"checksum toml 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "758664fc71a3a69038656bee8b6be6477d2a6c315a6b81f7081f591bffa4111f"
"checksum winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "f10e386af2b13e47c89e7236a7a14a086791a2b88ebad6df9bf42040195cf770"
"checksum winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "92c1eb33641e276cfa214a0522acad57be5c56b10cb348b3c5117db75f3ac4b0"