QTCUURXNMKPKDLG64QH72BIVP6SELSTQLUGGN67LKFVFN6FP2ZUQC
WJNXI6Z4NH5YEBDS4F6FY5AI5GNXNITC4IJZVWOESTVLOV667AHQC
FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC
OANBCLN5TD5VQTSAQXSIXU6IUCXMKQSML2UJGQCKVTOCENEVABIAC
VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC
5IKA4GO7DIMBBYB7OUF5S7N2LUWIZ5MKCCCTZCAYPZI2357MVFHAC
ALP2YJIUN45LOOJU7GZWYDY7BMLCKR3LJVFPXTTLLZFB4LSYJKMAC
UWY5EVZ6AGIMH3OGHECQPK2WNT3MCDIQZRFB4TSBFBAVGVVD3MRQC
XGP44R5HBQAHBXEJKN4AB77OZ32B32FPUHSUILZDVNO4QLUOTW6QC
PVCRPP3BXTLRRT2VK2BCKFCRJXUOX3AEF2A6UCPOSZYEKQEMBWDQC
AYQZ2UIA3HDOJAXF7WZZBFBQGNBLB63PPCMXJYLGES4FVIPXITPQC
X6L47BHQPIWFWO53QU6GMRYZC7VPFR4AFTJV7MZIJWIMZG5L65WQC
OGMBXBKPWAX4A4CY2LJITNKUYHWBFE4EVBHI4NRZO5VNZ5KXM2VQC
IK3YDPTYYB4IQR3JFFXFLPGWTBG4HE4OCMKJ447RK72OYESHOMRQC
AGIW6YR3J5M3PIUP3UKFMBQT2VAK6CHDDSIMK4A3KJEXH5SKQEQAC
NDDQQP2PSYH5YNU5EHFRUFACLJURWAHFZQNOIQNJCWHRSB7A3N6QC
4LRBIGVT4GOFDT7EUBBRG7776IC6WVJYTB6NVMIBVWAFGISYGIUAC
HU3NZX5ZNTZB43SBAYMY2PUUMIF2ROJ4EFAJ6HTZXZL5T7ZJ3OQAC
QWE26TMV6A5VUQLWG5ZMKHMH4NFZIQMVVXQJMPP42JUMM66C3VOQC
FVVPKFTLD5VOGVDF7MOSSKQ3KPCT4HDGTNTTC6I6B4TWXOTPCJ7AC
match stop_condition(&mut xmpp, event) {
Ok(true) => {
future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
}
Ok(false) => {
future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
}
Err(e) => future::err((xmpp.account, Err(e))),
if stop_condition(&mut xmpp, &event) {
future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
} else {
future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
fn online(&mut self, event: Event) -> Result<bool, failure::Error> {
match event {
Event::Online => {
info!("Online!");
Ok(true)
}
Event::Stanza(s) => {
warn!("Stanza before online: {:?}", s);
Ok(false)
}
_ => {
error!("Disconnected while online");
Err(format_err!("Disconnected while online"))
}
}
}
fn process_initial_roster(&mut self, event: Event) -> Result<bool, failure::Error> {
if let Event::Stanza(s) = event {
use try_from::TryInto;
match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
Ok(iq) => {
if let Some(id) = iq.id {
if id == ID_GET_ROSTER {
match iq.payload {
xmpp_parsers::iq::IqType::Error(_e) => {
Err(format_err!("Get error instead of roster"))
}
xmpp_parsers::iq::IqType::Result(Some(result)) => {
match result.try_into()
as Result<xmpp_parsers::roster::Roster, _>
{
Ok(roster) => {
info!("Got roster:");
for i in roster.items {
info!(" >>> {:?}", i);
self.state.roster.insert(i.jid, ());
}
Ok(true)
}
Err(e) => Err(format_err!("Cann't parse roster: {}", e)),
}
}
_ => Err(format_err!("Unknown result of roster")),
}
} else {
Ok(false)
fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
Box::new(future::loop_fn(
(self.inner, self.account),
|(client, account)| {
client.into_future().then(|r| match r {
Ok((event, client)) => match event {
Some(Event::Online) => {
info!("Online");
future::ok(future::Loop::Break(XmppConnection {
account,
inner: client,
}))
}
Some(Event::Stanza(s)) => {
info!("xmpp stanza: {:?}", s);
future::ok(future::Loop::Continue((client, account)))
}
_ => {
warn!("Disconnected");
future::err(account)
}
Err(_e) => Ok(false),
}
} else {
Err(format_err!("Wrong event while waiting roster"))
}
}
fn initial_roster<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E>,
E: Into<failure::Error>,
{
let XmppConnection {
account,
inner: client,
} = self;
use tokio::prelude::Sink;
let get_roster = stanzas::make_get_roster(ID_GET_ROSTER);
let account2 = account.clone();
info!("Quering roster... {:?}", get_roster);
client
.send(get_roster)
.map_err(move |e| {
error!("Error on querying roster: {}", e);
(account2, Err(failure::SyncFailure::new(e).into()))
})
.and_then(move |client| {
XmppConnection {
inner: client,
account,
}
.processing(XmppConnection::process_initial_roster, stop_future)
})
.then(|r| match r {
Err((account, e)) => {
error!(
"Cann't wait roster: {}",
e.err().map_or_else(
|| std::borrow::Cow::Borrowed("None"),
|e| e.to_string().into()
)
);
future::err(account)
}
Ok((conn, _)) => future::ok(conn),
})
})
},
))
fn self_presence<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E>,
E: Into<failure::Error>,
{
let XmppConnection {
account,
inner: client,
} = self;
fn self_presence(self) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
let XmppConnection { account, inner } = self;
let client = inner;
info!("Sending presence... {:?}", presence);
client
.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
(account2, Err(failure::SyncFailure::new(e).into()))
})
.and_then(move |client| {
XmppConnection {
inner: client,
account,
}
.processing(
move |conn, event| {
if let Event::Stanza(s) = event {
if s.name() == "presence"
&& s.attr("from").map_or(false, |f| f == conn.account.jid)
&& s.attr("to").map_or(false, |f| f == conn.account.jid)
{
Ok(true)
} else {
Ok(false)
}
} else {
Err(format_err!("Wrong event while waiting self-presence"))
}
},
stop_future,
)
})
.then(|r| match r {
Err((account, _e)) => {
error!("Cann't wait self-presence");
future::err(account)
}
Ok((conn, _)) => future::ok(conn),
})
Box::new(
client
.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
"Cann't send self-presence".to_owned()
})
.and_then(move |client| {
future::loop_fn((account2.clone(), client), |(account, client)| {
client
.into_future()
.map_err(|(e, _)| {
error!("Error on reading self-presence: {}", e);
"Cann't read self-presence".to_owned()
})
.and_then(|(event, client)| match event {
Some(event) => {
if let tokio_xmpp::Event::Stanza(e) = event {
info!("Get stanza: {:?}", e);
if e.name() == "presence"
&& e.attr("from").map_or(false, |f| f == account.jid)
&& e.attr("to").map_or(false, |f| f == account.jid)
{
info!("Self presence");
future::ok(future::Loop::Break(client))
} else {
future::ok(future::Loop::Continue((account, client)))
}
} else {
future::err("Got wrong event".to_owned())
}
}
None => future::err("Got closed stream".to_owned()),
})
})
.map_err(|e| format!("waiting self-presence: {}", e))
})
.then(|r| match r {
Err(e) => {
error!("Self-presence waiting error: {}", e);
future::err(account)
}
Ok(inner) => future::ok(XmppConnection { account, inner }),
}),
)
S: stream::Stream<Item = XmppCommand> + 'static,
S: stream::Stream<Item = XmppCommand, Error = ()> + 'static,