AGIW6YR3J5M3PIUP3UKFMBQT2VAK6CHDDSIMK4A3KJEXH5SKQEQAC
2L3JHRULRLBHT4K2VE4MHKDFX5RJXYUDTJJCLTUYH3DOOVCJT6WAC
OGMBXBKPWAX4A4CY2LJITNKUYHWBFE4EVBHI4NRZO5VNZ5KXM2VQC
VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC
FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC
X6L47BHQPIWFWO53QU6GMRYZC7VPFR4AFTJV7MZIJWIMZG5L65WQC
NDDQQP2PSYH5YNU5EHFRUFACLJURWAHFZQNOIQNJCWHRSB7A3N6QC
FDHRCKH5OM4TONRBQZK5MNKAXTX56LZWVLXEW3QATI5KL4JZO3QQC
BTOZT4JPYXPTMXBSKQGXZ3K46UNACE65RIKAGPLUH3OIRVV7WBKAC
TDOR5XQUFRE2LRXML25IJ7A6CIZPFSRES7FQCXDBMIOV4JE2O6OAC
QWE26TMV6A5VUQLWG5ZMKHMH4NFZIQMVVXQJMPP42JUMM66C3VOQC
FVVPKFTLD5VOGVDF7MOSSKQ3KPCT4HDGTNTTC6I6B4TWXOTPCJ7AC
5OBTKGDLRGZ4K4373CVGBF53LRECXQ7OW45PW43MJLBU622OKMTQC
EOHEZXX3TBKGJTOMXEPAPHLAJSATRXNY4AIL63BE4VG3HFNWARVQC
ZI4GJ72VMCBV6EQAO4PIKCGXV6ASUYOTGPRVSIGCKDBCDNRQNUFAC
inner: Client,
}
impl From<XmppConnection> for MaybeXmppConnection {
fn from(from: XmppConnection) -> MaybeXmppConnection {
MaybeXmppConnection {
account: from.account,
inner: Some(from.inner),
}
}
inner: Option<(stream::SplitSink<Client>, stream::SplitStream<Client>)>,
XmppConnection {
inner: client,
account,
}
.online()
.and_then(XmppConnection::self_presence)
.then(|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
})
Self::online(client.split(), account)
.and_then(Self::self_presence)
.then(|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
})
impl XmppConnection {
/// Base XMPP processing hook for a single stream event.
///
/// `processing` calls this for each received event before it evaluates
/// its stop condition. The body is currently empty, so `event` is ignored
/// and the connection state is left untouched.
fn xmpp_processing(&mut self, event: &Event) {}
/// Pumps events from the XMPP stream until a stop request is seen.
///
/// Each loop iteration races the next stream event against `stop_future`.
///
/// * `stop_condition` - called with the connection and every received
///   event (after `xmpp_processing`); returning `true` ends the loop.
/// * `stop_future` - ends the loop as soon as it resolves or fails.
///
/// The future resolves to `(connection, outcome)` while the connection is
/// still usable:
///
/// * `Ok(Either::A(stop_future))` - `stop_condition` returned `true`; the
///   still-pending stop future is handed back to the caller.
/// * `Ok(Either::B(item))` - `stop_future` resolved with `item`.
/// * `Err(error)` - `stop_future` failed with `error`.
///
/// It fails with `(account, outcome)` when the connection cannot be handed
/// back: the stream ended, the stream returned an error (that error is
/// discarded), or the stream could not be recovered from its pending read
/// after `stop_future` completed.
fn processing<S, F, T, E>(
    self,
    stop_condition: S,
    stop_future: F,
) -> impl Future<
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
>
where
    F: Future<Item = T, Error = E>,
    S: FnMut(&mut Self, &Event) -> bool,
{
    let initial_state = (self, stop_future, stop_condition);
    future::loop_fn(
        initial_state,
        |(connection, pending_stop, mut should_stop)| {
            let XmppConnection {
                inner: client,
                account,
            } = connection;
            let race = client.into_future().select2(pending_stop);
            race.then(|outcome| match outcome {
                // An event arrived before the stop future completed.
                Ok(Either::A(((Some(event), client), pending_stop))) => {
                    let mut connection = XmppConnection {
                        inner: client,
                        account,
                    };
                    connection.xmpp_processing(&event);
                    let step = if should_stop(&mut connection, &event) {
                        future::Loop::Break((connection, Ok(Either::A(pending_stop))))
                    } else {
                        future::Loop::Continue((connection, pending_stop, should_stop))
                    };
                    future::ok(step)
                }
                // The stream finished: only the account can be returned.
                Ok(Either::A(((None, _client), pending_stop))) => {
                    future::err((account, Ok(Either::A(pending_stop))))
                }
                // The stop future resolved first; try to take the stream back
                // out of the unfinished read.
                Ok(Either::B((item, pending_read))) => match pending_read.into_inner() {
                    Some(inner) => future::ok(future::Loop::Break((
                        XmppConnection { inner, account },
                        Ok(Either::B(item)),
                    ))),
                    None => future::err((account, Ok(Either::B(item)))),
                },
                // Reading from the stream failed; the stream error itself is dropped.
                Err(Either::A((_read_failure, pending_stop))) => {
                    future::err((account, Ok(Either::A(pending_stop))))
                }
                // The stop future failed; pass its error on, together with the
                // connection when the stream can still be recovered.
                Err(Either::B((stop_error, pending_read))) => match pending_read.into_inner() {
                    Some(inner) => future::ok(future::Loop::Break((
                        XmppConnection { inner, account },
                        Err(stop_error),
                    ))),
                    None => future::err((account, Err(stop_error))),
                },
            })
        },
    )
}
fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
fn online(
(sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),
account: std::rc::Rc<config::Account>,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
(self.inner, self.account),
|(client, account)| {
client.into_future().then(|r| match r {
Ok((event, client)) => match event {
(sink, stream, account),
|(sink, stream, account)| {
stream.into_future().then(|r| match r {
Ok((event, stream)) => match event {
let presence = stanzas::make_presence(&account);
info!("Sending presence...");
let account2 = account.clone();
Box::new(
client
.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
"Cann't send self-presence".to_owned()
})
.and_then(move |client| {
future::loop_fn((account2.clone(), client), |(account, client)| {
client
.into_future()
.map_err(|(e, _)| {
error!("Error on reading self-presence: {}", e);
"Cann't read self-presence".to_owned()
})
.and_then(|(event, client)| match event {
Some(event) => {
if let tokio_xmpp::Event::Stanza(e) = event {
info!("Get stanza: {:?}", e);
if e.name() == "presence"
&& e.attr("from").map_or(false, |f| f == account.jid)
&& e.attr("to").map_or(false, |f| f == account.jid)
{
info!("Self presence");
future::ok(future::Loop::Break(client))
let presence = stanzas::make_presence(&account);
info!("Sending presence...");
Box::new(
sink.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
"Cann't send self-presence".to_owned()
})
.join(
future::loop_fn((account.clone(), stream), |(account, stream)| {
stream
.into_future()
.map_err(|(e, _)| {
error!("Error on reading self-presence: {}", e);
"Cann't read self-presence".to_owned()
})
.and_then(|(event, stream)| match event {
Some(event) => {
if let tokio_xmpp::Event::Stanza(e) = event {
info!("Get stanza: {:?}", e);
if e.name() == "presence"
&& e.attr("from")
.map_or(false, |f| f == account.jid)
&& e.attr("to").map_or(false, |f| f == account.jid)
{
info!("Self presence");
future::ok(future::Loop::Break(stream))
} else {
future::ok(future::Loop::Continue((
account, stream,
)))
}
}
None => future::err("Got closed stream".to_owned()),
})
})
.map_err(|e| format!("waiting self-presence: {}", e))
})
.then(|r| match r {
Err(e) => {
error!("Self-presence waiting error: {}", e);
future::err(account)
}
Ok(inner) => future::ok(XmppConnection { account, inner }),
}),
)
None => future::err("Got closed stream".to_owned()),
})
})
.map_err(|e| format!("waiting self-presence: {}", e)),
)
.then(|r| match r {
Err(e) => {
error!("Self-presence waiting error: {}", e);
future::err(account)
}
Ok(inner) => future::ok(MaybeXmppConnection {
account,
inner: Some(inner),
}),
}),
)
} else {
warn!("Don't gen connection on self-presence");
Box::new(future::err(account)) as Box<Future<Item = _, Error = _>>
}
let result = ctrt.block_on(xmpp_process(
ctrl_c.clone().map(|_| ()),
recv,
config.account,
));
let result = ctrt.block_on(xmpp_process(ctrl_c.clone(), recv, config.account));