AYQZ2UIA3HDOJAXF7WZZBFBQGNBLB63PPCMXJYLGES4FVIPXITPQC
V5HDBSZM7GHPKTQAVSOXVF6UTSQTXSU3SJFWL7CIBFNBLJ4Q4V6QC
FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC
HOAZX2PBYRX3B7CPCOGROLLBEUJT44SYBRNYTJMAPFJSQRYDAKMQC
XGP44R5HBQAHBXEJKN4AB77OZ32B32FPUHSUILZDVNO4QLUOTW6QC
VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC
HU3NZX5ZNTZB43SBAYMY2PUUMIF2ROJ4EFAJ6HTZXZL5T7ZJ3OQAC
X6L47BHQPIWFWO53QU6GMRYZC7VPFR4AFTJV7MZIJWIMZG5L65WQC
OGMBXBKPWAX4A4CY2LJITNKUYHWBFE4EVBHI4NRZO5VNZ5KXM2VQC
IK3YDPTYYB4IQR3JFFXFLPGWTBG4HE4OCMKJ447RK72OYESHOMRQC
AGIW6YR3J5M3PIUP3UKFMBQT2VAK6CHDDSIMK4A3KJEXH5SKQEQAC
PVCRPP3BXTLRRT2VK2BCKFCRJXUOX3AEF2A6UCPOSZYEKQEMBWDQC
4LRBIGVT4GOFDT7EUBBRG7776IC6WVJYTB6NVMIBVWAFGISYGIUAC
PFC7OJQFPAFGRAF4CDWVSFR5DERUNOBAIYOW2TKLROPL3VKKSBPQC
NDDQQP2PSYH5YNU5EHFRUFACLJURWAHFZQNOIQNJCWHRSB7A3N6QC
5A5UVGNMHEYI62XUENBEPSRJ7M3MBULYITRDOEYYA5ZDUMFBB6ZQC
QWE26TMV6A5VUQLWG5ZMKHMH4NFZIQMVVXQJMPP42JUMM66C3VOQC
FVVPKFTLD5VOGVDF7MOSSKQ3KPCT4HDGTNTTC6I6B4TWXOTPCJ7AC
QYY3KRGLZFFOMYZUMEX5DUFU2DSP3H3NVFLMVTS2PEFQZ6U63AMAC
5OBTKGDLRGZ4K4373CVGBF53LRECXQ7OW45PW43MJLBU622OKMTQC
EOHEZXX3TBKGJTOMXEPAPHLAJSATRXNY4AIL63BE4VG3HFNWARVQC
ZI4GJ72VMCBV6EQAO4PIKCGXV6ASUYOTGPRVSIGCKDBCDNRQNUFAC
impl From<XmppConnection> for MaybeXmppConnection {
fn from(from: XmppConnection) -> MaybeXmppConnection {
MaybeXmppConnection {
account: from.account,
inner: Some(from.inner),
}
}
}
impl MaybeXmppConnection {
fn new(account: config::Account) -> MaybeXmppConnection {
MaybeXmppConnection {
impl XmppConnection {
fn new(account: config::Account) -> XmppConnection {
XmppConnection {
/// don't connect if stop_future resolved
fn connect<F>(
self,
stop_future: F,
) -> impl Future<Item = XmppConnection, Error = failure::Error>
where
F: future::Future + Clone + 'static,
<F as hyper::rt::Future>::Error: Into<failure::Error> + Send,
{
/// Error should be `!`
fn connect<E: 'static>(self) -> impl Future<Item = Self, Error = E> {
Box::new(
stop_future
.clone()
.select2(
future::loop_fn(account, move |account| {
info!("xmpp initialization...");
let res_client = Client::new(&account.jid, &account.password);
match res_client {
Err(_e) => Box::new(future::ok(future::Loop::Continue(account)))
as Box<Future<Item = _, Error = _>>,
Ok(client) => {
info!("xmpp initialized");
let stop_future2 = stop_future.clone();
let stop_future3 = stop_future.clone();
Box::new(future::loop_fn(account, |account| {
info!("xmpp initialization...");
let mut res_client = Client::new(&account.jid, &account.password);
while let Err(e) = res_client {
error!("Cann't init xmpp client: {}", e);
res_client = Client::new(&account.jid, &account.password);
}
let client = res_client.expect("Cann't init xmpp client");
info!("xmpp initialized");
// future to wait for online
Box::new(
XmppConnection {
inner: client,
account,
}
.processing(XmppConnection::online, stop_future.clone())
.map(|(conn, _)| conn)
.map_err(|(acc, _)| acc)
.and_then(|conn| conn.initial_roster(stop_future2))
.and_then(|conn| conn.self_presence(stop_future3))
.then(
|r| match r {
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
},
),
)
}
}
})
.map_err(|_: ()| ()),
)
// future to wait for online
Self::online(client.split(), account)
.and_then(Self::self_presence)
Ok(Either::A((_x, _b))) => future::err(format_err!("Stop XMMP connection")),
Ok(Either::B((x, _a))) => future::ok(x),
Err(Either::A((e, _b))) => future::err(e.into()),
Err(Either::B((_, _a))) => {
future::err(format_err!("Cann't initiate XMPP connection"))
}
}),
)
Ok(conn) => future::ok(future::Loop::Break(conn)),
Err(acc) => future::ok(future::Loop::Continue(acc)),
})
}))
impl XmppConnection {
/// Base XMPP processing for a single incoming event.
///
/// At present this only logs the event at `info` level using its `Debug`
/// representation; neither `self` nor `event` is modified.
fn xmpp_processing(&mut self, event: &Event) {
info!("Incoming xmpp event: {:?}", event);
}
/// process event from xmpp stream
/// returns from future when condition met
/// or stop future was resolved
fn processing<S, F, T, E>(
self,
stop_condition: S,
stop_future: F,
) -> impl Future<
Item = (Self, Result<Either<F, T>, failure::Error>),
Error = (
std::rc::Rc<config::Account>,
Result<Either<F, T>, failure::Error>,
),
>
where
F: Future<Item = T, Error = E>,
E: Into<failure::Error>,
S: FnMut(&mut Self, Event) -> Result<bool, failure::Error>,
{
future::loop_fn(
(self, stop_future, stop_condition),
|(xmpp, stop_future, mut stop_condition)| {
let XmppConnection { inner, account } = xmpp;
inner.into_future().select2(stop_future).then(|r| match r {
Ok(Either::A(((event, client), b))) => {
if let Some(event) = event {
let mut xmpp = XmppConnection {
inner: client,
/// get connection and wait for online status and set presence
/// returns error if something went wrong
fn online(
(sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),
account: std::rc::Rc<config::Account>,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
Box::new(future::loop_fn(
(sink, stream, account),
|(sink, stream, account)| {
stream.into_future().then(|r| match r {
Ok((event, stream)) => match event {
Some(Event::Online) => {
info!("Online");
future::ok(future::Loop::Break(XmppConnection {
};
xmpp.xmpp_processing(&event);
match stop_condition(&mut xmpp, event) {
Ok(true) => {
future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
}
Ok(false) => {
future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
}
Err(e) => future::err((xmpp.account, Err(e))),
}
} else {
future::err((account, Ok(Either::A(b))))
inner: Some((sink, stream)),
}))
}
Ok(Either::B((t, a))) => {
if let Some(inner) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection { inner, account },
Ok(Either::B(t)),
)))
} else {
future::err((account, Ok(Either::B(t))))
Some(Event::Stanza(s)) => {
info!("xmpp stanza: {:?}", s);
future::ok(future::Loop::Continue((sink, stream, account)))
}
Err(Either::A((_e, b))) => future::err((account, Ok(Either::A(b)))),
Err(Either::B((e, a))) => {
if let Some(inner) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection { inner, account },
Err(e.into()),
)))
} else {
future::err((account, Err(e.into())))
_ => {
warn!("Disconnected");
future::err(account)
/// Stop condition used while waiting for the connection to come online.
///
/// Returns `Ok(true)` once `Event::Online` is seen, `Ok(false)` for a stanza
/// that arrives before that (it is only logged), and an error for any other
/// event.
fn online(&mut self, event: Event) -> Result<bool, failure::Error> {
    let went_online = match event {
        Event::Stanza(stanza) => {
            warn!("Stanza before online: {:?}", stanza);
            false
        }
        Event::Online => {
            info!("Online!");
            true
        }
        _ => {
            // Anything that is neither a stanza nor `Online` aborts the wait.
            error!("Disconnected while online");
            return Err(format_err!("Disconnected while online"));
        }
    };
    Ok(went_online)
}
fn self_presence(self) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
let XmppConnection { account, inner } = self;
if let Some((sink, stream)) = inner {
use tokio::prelude::Sink;
fn initial_roster<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E>,
E: Into<failure::Error>,
{
let XmppConnection {
account,
inner: client,
} = self;
use tokio::prelude::Sink;
let id_get_roster: String = "id_get_roster0".to_string();
let get_roster = stanzas::make_get_roster(&id_get_roster);
let account2 = account.clone();
info!("Quering roster... {:?}", get_roster);
client
.send(get_roster)
.map_err(move |e| {
error!("Error on querying roster: {}", e);
(account2, Err(failure::SyncFailure::new(e).into()))
})
.and_then(move |client| {
XmppConnection {
inner: client,
account,
}
.processing(
move |_conn, event| {
if let Event::Stanza(s) = event {
use try_from::TryInto;
match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
Ok(iq) => {
if let Some(id) = iq.id {
if id == id_get_roster {
match iq.payload {
xmpp_parsers::iq::IqType::Error(_e) => {
Err(format_err!("Get error instead of roster"))
}
_ => Ok(true),
let presence = stanzas::make_presence(&account);
info!("Sending presence...");
Box::new(
sink.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
"Cann't send self-presence".to_owned()
})
.join(
future::loop_fn((account.clone(), stream), |(account, stream)| {
stream
.into_future()
.map_err(|(e, _)| {
error!("Error on reading self-presence: {}", e);
"Cann't read self-presence".to_owned()
})
.and_then(|(event, stream)| match event {
Some(event) => {
if let tokio_xmpp::Event::Stanza(e) = event {
info!("Get stanza: {:?}", e);
if e.name() == "presence"
&& e.attr("from")
.map_or(false, |f| f == account.jid)
&& e.attr("to").map_or(false, |f| f == account.jid)
{
info!("Self presence");
future::ok(future::Loop::Break(stream))
} else {
future::ok(future::Loop::Continue((
account, stream,
)))
}
Err(_e) => Ok(false),
}
} else {
Err(format_err!("Wrong event while waiting roster"))
}
},
stop_future,
)
})
.then(|r| match r {
Err((account, e)) => {
error!(
"Cann't wait roster: {}",
e.err().map_or_else(
|| std::borrow::Cow::Borrowed("None"),
|e| e.to_string().into()
)
);
future::err(account)
}
Ok((conn, _)) => future::ok(conn),
})
}
fn self_presence<F, E>(
self,
stop_future: F,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
where
F: Future<Error = E>,
E: Into<failure::Error>,
{
let XmppConnection {
account,
inner: client,
} = self;
use tokio::prelude::Sink;
let presence = stanzas::make_presence(&account);
let account2 = account.clone();
info!("Sending presence... {:?}", presence);
client
.send(presence)
.map_err(|e| {
error!("Error on send self-presence: {}", e);
(account2, Err(failure::SyncFailure::new(e).into()))
})
.and_then(move |client| {
XmppConnection {
inner: client,
account,
}
.processing(
move |conn, event| {
if let Event::Stanza(s) = event {
if s.name() == "presence"
&& s.attr("from").map_or(false, |f| f == conn.account.jid)
&& s.attr("to").map_or(false, |f| f == conn.account.jid)
{
Ok(true)
} else {
Ok(false)
}
} else {
Err(format_err!("Wrong event while waiting self-presence"))
None => future::err("Got closed stream".to_owned()),
})
})
.map_err(|e| format!("waiting self-presence: {}", e)),
)
.then(|r| match r {
Err(e) => {
error!("Self-presence waiting error: {}", e);
future::err(account)
},
stop_future,
)
})
.then(|r| match r {
Err((account, _e)) => {
error!("Cann't wait self-presence");
future::err(account)
}
Ok((conn, _)) => future::ok(conn),
})
Ok(inner) => future::ok(XmppConnection {
account,
inner: Some(inner),
}),
}),
)
} else {
warn!("Don't gen connection on self-presence");
Box::new(future::err(account)) as Box<Future<Item = _, Error = _>>
}
Ok((cmd, cmd_recv, conn)) => {
if let Some(_cmd) = cmd {
info!("Got cmd");
// got cmd, continue
future::ok(future::Loop::Continue(XmppState::new(
cmd_recv,
signal,
conn.into(),
)))
} else {
future::ok(future::Loop::Break((None, conn.into())))
}
Ok(Either::A((_x, b))) => {
info!("Got signal");
// got signal, breaks
Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
as Box<Future<Item = _, Error = _>>
}
Ok(Either::B((x, a))) => {
info!("Got cmd");
// got cmd, continue
Box::new(future::ok(future::Loop::Continue(XmppState::new(
(x.0).1,
a,
x.1,
)))) as Box<Future<Item = _, Error = _>>
/// HTTP service that forwards incoming requests as `XmppCommand`s.
struct ServiceCmd {
/// Sending half of the channel on which `XmppCommand`s are submitted.
cmd_send: tokio_channel::mpsc::Sender<XmppCommand>,
}
impl hyper::service::Service for ServiceCmd {
type ReqBody = Body;
type ResBody = Body;
type Error = Box<dyn std::error::Error + Sync + Send + 'static>;
type Future =
Box<dyn Future<Item = Response<Self::ResBody>, Error = Self::Error> + Send + 'static>;
fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future {
let xmpp_to_opt = req.headers().get("X-XMPP-To");
let xmpp_to_res: Result<jid::Jid, failure::Error> = xmpp_to_opt.map_or_else(
|| Err(format_err!("No X-XMPP-To header")),
|xmpp_to| {
std::str::from_utf8(xmpp_to.as_bytes())
.map_err(|e| e.into())
.and_then(|s| {
std::str::FromStr::from_str(s).map_err(|e: jid::JidParseError| e.into())
})
},
);
match xmpp_to_res {
Err(err) => {
warn!("Unknown destination: {}", err);
Box::new(tokio::prelude::future::result(
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from(format!("Unknown destination: {}", err)))
.map_err(|e| {
Box::new(e) as Box<dyn std::error::Error + Sync + Send + 'static>
}),
)) as Box<Future<Item = _, Error = _> + Send + 'static>
}
Ok(xmpp_to) => {
info!("Got request. Reading body...");
let cmd_send = self.cmd_send.clone();
Box::new(
req.into_body()
.map_err(|e| {
Box::new(e) as Box<dyn std::error::Error + Sync + Send + 'static>
})
.fold(String::new(), |mut acc, ch| {
std::str::from_utf8(&*ch).map(|s| {
acc.push_str(s);
acc
})
})
.and_then(move |message: String| {
if !message.is_empty() {
Box::new(
cmd_send
.clone()
.send(XmppCommand { xmpp_to, message })
.then(|r| match r {
Ok(_) => tokio::prelude::future::ok(Response::new(
Body::from("Accepted"),
)),
Err(e) => {
error!("Command sent error: {}", e);
tokio::prelude::future::result(
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from(format!(
"Command sent error: {}",
e
))),
)
}
})
.map_err(|e| {
Box::new(e)
as Box<
dyn std::error::Error + Sync + Send + 'static,
>
}),
)
} else {
warn!("Empty message");
Box::new(tokio::prelude::future::result(
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from("Empty message"))
.map_err(|e| {
Box::new(e)
as Box<
dyn std::error::Error + Sync + Send + 'static,
>
}),
))
as Box<Future<Item = _, Error = _> + Send + 'static>
}
}),
) as Box<Future<Item = _, Error = _> + Send + 'static>
}
}
}
}
/// Factory that produces a `ServiceCmd` for each `make_service` call.
struct MakeServiceCmd {
/// Command sender; a clone of it is given to every `ServiceCmd` created.
cmd_send: tokio_channel::mpsc::Sender<XmppCommand>,
}
/// Builds `ServiceCmd` instances; the context argument is ignored.
impl<Ctx> hyper::service::MakeService<Ctx> for MakeServiceCmd {
    type ReqBody = Body;
    type ResBody = Body;
    type Error = Box<dyn std::error::Error + Sync + Send + 'static>;
    type Service = ServiceCmd;
    type MakeError = hyper::http::Error;
    type Future = tokio::prelude::future::FutureResult<ServiceCmd, Self::MakeError>;

    /// Returns an already-resolved future holding a `ServiceCmd` that owns
    /// its own clone of the command sender.
    fn make_service(&mut self, _ctx: Ctx) -> Self::Future {
        let cmd_send = self.cmd_send.clone();
        tokio::prelude::future::ok(ServiceCmd { cmd_send })
    }
}
.serve(MakeServiceCmd { cmd_send })
.serve(move || {
let cmd_send = cmd_send.clone();
service_fn(move |_req: Request<Body>| {
info!("Got request");
cmd_send.clone().send(XmppCommand {}).then(|r| match r {
Ok(_) => tokio::prelude::future::ok(Response::new(Body::from("Accepted"))),
Err(e) => {
error!("Command sent error: {}", e);
tokio::prelude::future::result(
Response::builder()
.status(hyper::StatusCode::BAD_REQUEST)
.body(Body::from(format!("Command sent error: {}", e))),
)
}
})
})
})
"serde_json 1.0.33 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.34 (registry+https://github.com/rust-lang/crates.io-index)",
"checksum serde_json 1.0.33 (registry+https://github.com/rust-lang/crates.io-index)" = "c37ccd6be3ed1fdf419ee848f7c758eb31b054d7cd3ae3600e3bae0adf569811"
"checksum serde_json 1.0.34 (registry+https://github.com/rust-lang/crates.io-index)" = "bdf540260cfee6da923831f4776ddc495ada940c30117977c70f1313a6130545"