BWDUANCV77MCLCYMRS2UNFIFW4ZC3KB2KEGEUI77FRO7KW6TUZJQC
CP4MZO6VZHSAL2ENE7MUYUS2BJFL6EIZSHFFMT66WPQD7KHOE5ZQC
FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC
QWE26TMV6A5VUQLWG5ZMKHMH4NFZIQMVVXQJMPP42JUMM66C3VOQC
VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC
5IKA4GO7DIMBBYB7OUF5S7N2LUWIZ5MKCCCTZCAYPZI2357MVFHAC
WJNXI6Z4NH5YEBDS4F6FY5AI5GNXNITC4IJZVWOESTVLOV667AHQC
HU3NZX5ZNTZB43SBAYMY2PUUMIF2ROJ4EFAJ6HTZXZL5T7ZJ3OQAC
X6L47BHQPIWFWO53QU6GMRYZC7VPFR4AFTJV7MZIJWIMZG5L65WQC
QTCUURXNMKPKDLG64QH72BIVP6SELSTQLUGGN67LKFVFN6FP2ZUQC
ALP2YJIUN45LOOJU7GZWYDY7BMLCKR3LJVFPXTTLLZFB4LSYJKMAC
OANBCLN5TD5VQTSAQXSIXU6IUCXMKQSML2UJGQCKVTOCENEVABIAC
UWY5EVZ6AGIMH3OGHECQPK2WNT3MCDIQZRFB4TSBFBAVGVVD3MRQC
AYQZ2UIA3HDOJAXF7WZZBFBQGNBLB63PPCMXJYLGES4FVIPXITPQC
PVCRPP3BXTLRRT2VK2BCKFCRJXUOX3AEF2A6UCPOSZYEKQEMBWDQC
FVVPKFTLD5VOGVDF7MOSSKQ3KPCT4HDGTNTTC6I6B4TWXOTPCJ7AC
/// Builds the stanza used to request the roster.
///
/// The request is an IQ of type "get" whose payload is an empty roster query
/// (no version, no items). The given `id` is stored as the IQ id, and the
/// finished IQ is converted into an `Element`.
pub fn make_get_roster(id: &str) -> Element {
    // Empty query: no known roster version and no items.
    let query = Roster {
        ver: None,
        items: Vec::new(),
    };
    let mut request = Iq::from_get(query);
    request.id = Some(id.to_owned());
    request.into()
}
.online()
.and_then(XmppConnection::self_presence)
.processing(XmppConnection::online, stop_future.clone())
.map_err(|(acc, _)| acc)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
})
.and_then(|conn| conn.initial_roster(stop_future2))
.and_then(|conn| conn.self_presence(stop_future3))
if stop_condition(&mut xmpp, &event) {
future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
} else {
future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
match stop_condition(&mut xmpp, event) {
Ok(true) => {
future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
}
Ok(false) => {
future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
}
Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
/// returns error if something went wrong
fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
Box::new(future::loop_fn(
(self.inner, self.account),
|(client, account)| {
client.into_future().then(|r| match r {
Ok((event, client)) => match event {
Some(Event::Online) => {
info!("Online");
future::ok(future::Loop::Break(XmppConnection {
account,
inner: client,
}))
/// Inspects a single incoming event while waiting for the connection to come online.
///
/// Returns `Ok(true)` when the event is `Event::Online`, and `Ok(false)` when a
/// stanza arrives first (the stanza is only logged). Any other event yields
/// `Err(())`, meaning something went wrong and the xmpp connection is broken.
/// The method does not read or modify `self`.
fn online(&mut self, event: Event) -> Result<bool, ()> {
    if let Event::Online = event {
        info!("Online!");
        return Ok(true);
    }
    if let Event::Stanza(stanza) = event {
        // Not fatal: keep waiting for the online notification.
        warn!("Stanza before online: {:?}", stanza);
        return Ok(false);
    }
    error!("Disconnected while online");
    Err(())
}
fn process_initial_roster(&mut self, event: Event) -> Result<bool, ()> {
if let Event::Stanza(s) = event {
use try_from::TryInto;
match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
Ok(iq) => {
if let Some(id) = iq.id {
if id == ID_GET_ROSTER {
match iq.payload {
xmpp_parsers::iq::IqType::Error(_e) => {
error!("Get error instead of roster");
Err(())
}
xmpp_parsers::iq::IqType::Result(Some(result)) => {
match result.try_into()
as Result<xmpp_parsers::roster::Roster, _>
{
Ok(roster) => {
info!("Got roster:");
for i in roster.items {
info!(" >>> {:?}", i);
}
Ok(true)
}
Err(e) => {
error!("Cann't parse roster: {}", e);
Err(())
}
}
}
_ => {
error!("Unknown result of roster");
Err(())
}
}
} else {
Ok(false)
Some(Event::Stanza(s)) => {
info!("xmpp stanza: {:?}", s);
future::ok(future::Loop::Continue((client, account)))
}
_ => {
warn!("Disconnected");
future::err(account)
}
},
Err((e, _)) => {
error!("xmpp receive error: {}", e);
future::err(account)
} else {
error!("Iq stanza without id");
Err(())
}
Err(_e) => Ok(false),
}
} else {
error!("Wrong event while waiting roster");
Err(())
}
}
fn initial_roster<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E>,
{
let XmppConnection {
account,
state: XmppState { client, roster },
} = self;
use tokio::prelude::Sink;
let get_roster = stanzas::make_get_roster(ID_GET_ROSTER);
let account2 = account.clone();
info!("Quering roster... {:?}", get_roster);
client
.send(get_roster)
.map_err(move |e| {
error!("Error on querying roster: {}", e);
account2
})
.and_then(move |client| {
XmppConnection {
state: XmppState { client, roster },
account,
}
.processing(XmppConnection::process_initial_roster, stop_future)
.map_err(|(account, _)| account)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
fn self_presence(self) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
let XmppConnection { account, inner } = self;
let client = inner;
fn self_presence<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E>,
E: Into<failure::Error>,
{
let XmppConnection {
account,
state: XmppState { client, roster },
} = self;
Box::new(
client
.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
"Cann't send self-presence".to_owned()
info!("Sending presence... {:?}", presence);
client
.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
account2
})
.and_then(move |client| {
XmppConnection {
state: XmppState { client, roster },
account,
}
.processing(
move |conn, event| {
if let Event::Stanza(s) = event {
if s.name() == "presence"
&& s.attr("from").map_or(false, |f| f == conn.account.jid)
&& s.attr("to").map_or(false, |f| f == conn.account.jid)
{
Ok(true)
} else {
Ok(false)
}
} else {
error!("Wrong event while waiting self-presence");
Err(())
}
},
stop_future,
)
.map_err(|(account, _)| account)
.and_then(|(conn, r)| match r {
Ok(Either::A(_)) => future::ok(conn),
Ok(Either::B(_)) => future::err(conn.account),
Err(_e) => future::err(conn.account),
.and_then(move |client| {
future::loop_fn((account2.clone(), client), |(account, client)| {
client
.into_future()
.map_err(|(e, _)| {
error!("Error on reading self-presence: {}", e);
"Cann't read self-presence".to_owned()
})
.and_then(|(event, client)| match event {
Some(event) => {
if let tokio_xmpp::Event::Stanza(e) = event {
info!("Get stanza: {:?}", e);
if e.name() == "presence"
&& e.attr("from").map_or(false, |f| f == account.jid)
&& e.attr("to").map_or(false, |f| f == account.jid)
{
info!("Self presence");
future::ok(future::Loop::Break(client))
} else {
future::ok(future::Loop::Continue((account, client)))
}
} else {
future::err("Got wrong event".to_owned())
}
}
None => future::err("Got closed stream".to_owned()),
})
})
.map_err(|e| format!("waiting self-presence: {}", e))
})
.then(|r| match r {
Err(e) => {
error!("Self-presence waiting error: {}", e);
future::err(account)
}
Ok(inner) => future::ok(XmppConnection { account, inner }),
}),
)
})
.and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| {
if let Some(cmd_recv) = opt_cmd_recv {
// process left commands
info!("Stop accepting commands");
Box::new(
cmd_recv
.for_each(|_cmd| future::ok(()))
.map_err(|_| format_err!("cmd receiver last error")),
) as Box<Future<Item = (), Error = failure::Error>>
} else {
Box::new(future::ok(()))
}
})