4LRBIGVT4GOFDT7EUBBRG7776IC6WVJYTB6NVMIBVWAFGISYGIUAC
HOAZX2PBYRX3B7CPCOGROLLBEUJT44SYBRNYTJMAPFJSQRYDAKMQC
FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC
PFC7OJQFPAFGRAF4CDWVSFR5DERUNOBAIYOW2TKLROPL3VKKSBPQC
VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC
HU3NZX5ZNTZB43SBAYMY2PUUMIF2ROJ4EFAJ6HTZXZL5T7ZJ3OQAC
XGP44R5HBQAHBXEJKN4AB77OZ32B32FPUHSUILZDVNO4QLUOTW6QC
AGIW6YR3J5M3PIUP3UKFMBQT2VAK6CHDDSIMK4A3KJEXH5SKQEQAC
IK3YDPTYYB4IQR3JFFXFLPGWTBG4HE4OCMKJ447RK72OYESHOMRQC
QWE26TMV6A5VUQLWG5ZMKHMH4NFZIQMVVXQJMPP42JUMM66C3VOQC
X6L47BHQPIWFWO53QU6GMRYZC7VPFR4AFTJV7MZIJWIMZG5L65WQC
FVVPKFTLD5VOGVDF7MOSSKQ3KPCT4HDGTNTTC6I6B4TWXOTPCJ7AC
/// don't connect if stop_future resolved
fn connect<F>(
self,
stop_future: F,
) -> impl Future<Item = XmppConnection, Error = failure::Error>
where
F: future::Future + Clone + 'static,
<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
{
/// Error should be `!`
fn connect<E: 'static>(self) -> impl Future<Item = XmppConnection, Error = E> {
Box::new(
stop_future
.clone()
.select2(
future::loop_fn(account, move |account| {
info!("xmpp initialization...");
let res_client = Client::new(&account.jid, &account.password);
match res_client {
Err(_e) => Box::new(future::ok(future::Loop::Continue(account)))
as Box<Future<Item = _, Error = _>>,
Ok(client) => {
info!("xmpp initialized");
Box::new(future::loop_fn(account, |account| {
info!("xmpp initialization...");
let mut res_client = Client::new(&account.jid, &account.password);
while let Err(e) = res_client {
error!("Cann't init xmpp client: {}", e);
res_client = Client::new(&account.jid, &account.password);
}
let client = res_client.expect("Cann't init xmpp client");
info!("xmpp initialized");
let stop_future2 = stop_future.clone();
let stop_future3 = stop_future.clone();
// future to wait for online
Box::new(
XmppConnection {
inner: client,
account,
}
.processing(XmppConnection::online, stop_future.clone())
.map(|(conn, _)| conn)
.map_err(|(acc, _)| acc)
.and_then(|conn| conn.initial_roster(stop_future2))
.and_then(|conn| conn.self_presence(stop_future3))
.then(
|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
},
),
)
}
}
})
.map_err(|_: ()| ()),
)
.then(|r| match r {
Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
Ok(Either::B((x, _a))) => future::ok(x),
Err(Either::A((e, _b))) => future::err(e.into()),
Err(Either::B((_, _a))) => {
future::err(format_err!("Cann't initiate XMPP connection"))
}
}),
)
// future to wait for online
XmppConnection {
inner: client,
account,
}
.online()
.and_then(XmppConnection::self_presence)
.then(|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
})
}))
match stop_condition(&mut xmpp, event) {
Ok(true) => {
future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
}
Ok(false) => {
future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
}
Err(e) => future::err((xmpp.account, Err(e))),
if stop_condition(&mut xmpp, &event) {
future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
} else {
future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
/// Classifies a single incoming `Event` while waiting for the connection to come online.
///
/// # Returns
/// * `Ok(true)` when the event is `Event::Online` (logged at info level).
/// * `Ok(false)` when a stanza arrives first; the stanza is logged as a warning.
///
/// # Errors
/// Any other event is logged at error level and reported as a `failure::Error`.
fn online(&mut self, event: Event) -> Result<bool, failure::Error> {
    let went_online = match event {
        Event::Stanza(stanza) => {
            warn!("Stanza before online: {:?}", stanza);
            false
        }
        Event::Online => {
            info!("Online!");
            true
        }
        _ => {
            // Neither online nor a stanza: bail out of the function with an error.
            error!("Disconnected while online");
            return Err(format_err!("Disconnected while online"));
        }
    };
    Ok(went_online)
}
fn process_initial_roster(&mut self, event: Event) -> Result<bool, failure::Error> {
if let Event::Stanza(s) = event {
use try_from::TryInto;
match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
Ok(iq) => {
if let Some(id) = iq.id {
if id == ID_GET_ROSTER {
match iq.payload {
xmpp_parsers::iq::IqType::Error(_e) => {
Err(format_err!("Get error instead of roster"))
}
xmpp_parsers::iq::IqType::Result(Some(result)) => {
match result.try_into()
as Result<xmpp_parsers::roster::Roster, _>
{
Ok(roster) => {
info!("Got roster:");
for i in roster.items {
info!(" >>> {:?}", i);
}
Ok(true)
}
Err(e) => Err(format_err!("Cann't parse roster: {}", e)),
}
}
_ => Err(format_err!("Unknown result of roster")),
}
} else {
Ok(false)
fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
Box::new(future::loop_fn(
(self.inner, self.account),
|(client, account)| {
client.into_future().then(|r| match r {
Ok((event, client)) => match event {
Some(Event::Online) => {
info!("Online");
future::ok(future::Loop::Break(XmppConnection {
account,
inner: client,
}))
}
Some(Event::Stanza(s)) => {
info!("xmpp stanza: {:?}", s);
future::ok(future::Loop::Continue((client, account)))
}
_ => {
warn!("Disconnected");
future::err(account)
}
Err(_e) => Ok(false),
}
} else {
Err(format_err!("Wrong event while waiting roster"))
}
}
fn initial_roster<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E>,
E: Into<failure::Error>,
{
let XmppConnection {
account,
inner: client,
} = self;
use tokio::prelude::Sink;
let get_roster = stanzas::make_get_roster(ID_GET_ROSTER);
let account2 = account.clone();
info!("Quering roster... {:?}", get_roster);
client
.send(get_roster)
.map_err(move |e| {
error!("Error on querying roster: {}", e);
(account2, Err(failure::SyncFailure::new(e).into()))
})
.and_then(move |client| {
XmppConnection {
inner: client,
account,
}
.processing(XmppConnection::process_initial_roster, stop_future)
})
.then(|r| match r {
Err((account, e)) => {
error!(
"Cann't wait roster: {}",
e.err().map_or_else(
|| std::borrow::Cow::Borrowed("None"),
|e| e.to_string().into()
)
);
future::err(account)
}
Ok((conn, _)) => future::ok(conn),
})
})
},
))
fn self_presence<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E>,
E: Into<failure::Error>,
{
let XmppConnection {
account,
inner: client,
} = self;
fn self_presence(self) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
let XmppConnection { account, inner } = self;
let client = inner;
info!("Sending presence... {:?}", presence);
client
.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
(account2, Err(failure::SyncFailure::new(e).into()))
})
.and_then(move |client| {
XmppConnection {
inner: client,
account,
}
.processing(
move |conn, event| {
if let Event::Stanza(s) = event {
if s.name() == "presence"
&& s.attr("from").map_or(false, |f| f == conn.account.jid)
&& s.attr("to").map_or(false, |f| f == conn.account.jid)
{
Ok(true)
} else {
Ok(false)
}
} else {
Err(format_err!("Wrong event while waiting self-presence"))
}
},
stop_future,
)
})
.then(|r| match r {
Err((account, _e)) => {
error!("Cann't wait self-presence");
future::err(account)
}
Ok((conn, _)) => future::ok(conn),
})
Box::new(
client
.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
"Cann't send self-presence".to_owned()
})
.and_then(move |client| {
future::loop_fn((account2.clone(), client), |(account, client)| {
client
.into_future()
.map_err(|(e, _)| {
error!("Error on reading self-presence: {}", e);
"Cann't read self-presence".to_owned()
})
.and_then(|(event, client)| match event {
Some(event) => {
if let tokio_xmpp::Event::Stanza(e) = event {
info!("Get stanza: {:?}", e);
if e.name() == "presence"
&& e.attr("from").map_or(false, |f| f == account.jid)
&& e.attr("to").map_or(false, |f| f == account.jid)
{
info!("Self presence");
future::ok(future::Loop::Break(client))
} else {
future::ok(future::Loop::Continue((account, client)))
}
} else {
future::err("Got wrong event".to_owned())
}
}
None => future::err("Got closed stream".to_owned()),
})
})
.map_err(|e| format!("waiting self-presence: {}", e))
})
.then(|r| match r {
Err(e) => {
error!("Self-presence waiting error: {}", e);
future::err(account)
}
Ok(inner) => future::ok(XmppConnection { account, inner }),
}),
)
Ok((cmd, cmd_recv, conn)) => {
if let Some(_cmd) = cmd {
info!("Got cmd");
// got cmd, continue
future::ok(future::Loop::Continue(XmppState::new(
cmd_recv,
signal,
conn.into(),
)))
} else {
future::ok(future::Loop::Break((None, conn.into())))
}
Ok(Either::A((_x, b))) => {
info!("Got signal");
// got signal, breaks
Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1.into()))))
as Box<Future<Item = _, Error = _>>
}
Ok(Either::B((x, a))) => {
info!("Got cmd");
// got cmd, continue
Box::new(future::ok(future::Loop::Continue(XmppState::new(
(x.0).1,
a,
x.1.into(),
)))) as Box<Future<Item = _, Error = _>>
}
Err(Either::A((e, b))) => {
// got signal error, breaks
error!("Signal error: {}", e);
Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1.into()))))
as Box<Future<Item = _, Error = _>>