SU4DNVCBZYNXKBPMMNL2TAUVXYVDGBJR2P3BAZJKG7GJMYWELMKQC
HCCX7VW6A536YXENOGEHI7BPNJ6FTCAPNOYKEQFQ53F6SLVI4VRQC
5IKA4GO7DIMBBYB7OUF5S7N2LUWIZ5MKCCCTZCAYPZI2357MVFHAC
3FYEOGCIRRMOVYU2NB4MM477RSF4ZTSNJYFJ5NNXSB6WGR6MQYPQC
VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC
BWDUANCV77MCLCYMRS2UNFIFW4ZC3KB2KEGEUI77FRO7KW6TUZJQC
UIXIQHDY7E4DLLRQP4ATXF625QC4KGMHWOV24UEQ6WD7G2IPM4HAC
UMTLHH77LGABTVKULH6ONVSBTMSFGH3CJ6GPNTFWH73AWNJZV6LQC
FWJDW3G5KT66GHFS7IPPSFS3G5COH2BNN57GHTVT533FE7467QZAC
OANBCLN5TD5VQTSAQXSIXU6IUCXMKQSML2UJGQCKVTOCENEVABIAC
ALP2YJIUN45LOOJU7GZWYDY7BMLCKR3LJVFPXTTLLZFB4LSYJKMAC
WJNXI6Z4NH5YEBDS4F6FY5AI5GNXNITC4IJZVWOESTVLOV667AHQC
HU3NZX5ZNTZB43SBAYMY2PUUMIF2ROJ4EFAJ6HTZXZL5T7ZJ3OQAC
EBETRYK7RNRATWD75LTFUTPF6M6KB6YPKTQUX4K232JB6CKVOZOQC
XGP44R5HBQAHBXEJKN4AB77OZ32B32FPUHSUILZDVNO4QLUOTW6QC
5A5UVGNMHEYI62XUENBEPSRJ7M3MBULYITRDOEYYA5ZDUMFBB6ZQC
CP4MZO6VZHSAL2ENE7MUYUS2BJFL6EIZSHFFMT66WPQD7KHOE5ZQC
/// Builds a `MaybeXmppConnection` from a shared account configuration.
///
/// The given account handle is stored as-is and `state` starts out as `None`.
impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(account: std::rc::Rc<config::Account>) -> Self {
        Self {
            account,
            state: None,
        }
    }
}
impl MaybeXmppConnection {
info!("Incoming xmpp event: {:?}", event);
future::ok(self)
match event {
Event::Stanza(stanza) => {
info!("Incoming xmpp event: {:?}", stanza);
let stanza = stanza.clone();
use try_from::TryInto;
if let Some(_iq) = stanza.try_into().ok() as Option<xmpp_parsers::iq::Iq> {}
future::ok(self)
}
Event::Online => future::ok(self),
e => {
warn!("Unexpected event {:?}", e);
future::err(self.account)
}
}
client
.into_future()
.select2(stop_future)
.then(move |r| match r {
Ok(Either::A(((event, client), b))) => {
if let Some(event) = event {
let xmpp = XmppConnection {
state: XmppState { client, data },
account,
};
Box::new(xmpp.xmpp_processing(&event).then(|r| match r {
Ok(mut xmpp) => match stop_condition(&mut xmpp, event) {
Ok(true) => future::ok(future::Loop::Break((
xmpp,
Ok(Either::A(b)),
))),
Ok(false) => future::ok(future::Loop::Continue((
xmpp,
b,
stop_condition,
))),
Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
},
Err(account) => future::err((account, Ok(Either::A(b)))),
}))
as Box<dyn Future<Item = _, Error = _>>
} else {
Box::new(future::err((account, Ok(Either::A(b)))))
}
}
Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
client.into_future().select2(stop_future).then(|r| match r {
Ok(Either::A(((event, client), b))) => {
if let Some(event) = event {
let xmpp = XmppConnection {
state: XmppState { client, roster },
account,
};
Box::new(xmpp.xmpp_processing(&event).then(|r| match r {
Ok(mut xmpp) => match stop_condition(&mut xmpp, event) {
Ok(true) => {
future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
}
Ok(false) => future::ok(future::Loop::Continue((
xmpp,
b,
stop_condition,
))),
Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Err(e),
)))
} else {
future::err((account, Err(e)))
}),
})
}
Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, roster },
account,
},
Ok(Either::B(t)),
)))
} else {
future::err((account, Ok(Either::B(t))))
}),
Err(Either::A((e, b))) => {
warn!("XMPP error: {}", e.0);
Box::new(future::err((account, Ok(Either::A(b)))))
}
Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, roster },
account,
},
Err(e),
)))
} else {
future::err((account, Err(e)))
}),
})
}
fn process_command(
self,
cmd: &XmppCommand,
) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
info!("Got command");
if let Some(_jid_data) = self.state.data.roster.get(&cmd.xmpp_to) {
info!("Jid {} in roster", cmd.xmpp_to);
} else {
info!("Jid {} not in roster", cmd.xmpp_to);
}
future::ok(self)
impl<F, S> XmppProcessState<F, S> {
fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppProcessState<F, S> {
XmppProcessState {
impl<F, S> XmppState<F, S> {
fn new(cmd_recv: S, signal: F, conn: MaybeXmppConnection) -> XmppState<F, S> {
XmppState {
conn.connect(signal.clone()).and_then(|conn| {
info!("xmpp connected!");
conn.processing(|_, _| Ok(false), cmd_recv.into_future())
.then(|r| match r {
Ok((conn, r)) => match r {
Ok(Either::A(f)) => Box::new(if let Some(cmd_recv) = f.into_inner() {
future::ok(future::Loop::Continue(XmppProcessState {
conn.connect(signal.clone())
.and_then(|conn| {
info!("xmpp connected!");
cmd_recv
.into_future()
.map_err(|_| {
error!("Got error on recv cmd");
format_err!("Receive cmd error")
})
.map(|(cmd, cmd_recv)| (cmd, cmd_recv, conn))
})
.then(|r| {
match r {
Ok((cmd, cmd_recv, conn)) => {
if let Some(_cmd) = cmd {
info!("Got cmd");
// got cmd, continue
future::ok(future::Loop::Continue(XmppState::new(
future::err(format_err!("Command receiver is gone"))
})
as Box<dyn Future<Item = _, Error = _>>,
Ok(Either::B((cmd, cmd_recv))) => {
if let Some(cmd) = cmd {
Box::new(conn.process_command(&cmd).then(|r| {
future::ok(future::Loop::Continue(XmppProcessState {
cmd_recv,
signal,
conn: match r {
Ok(conn) => conn.into(),
Err(account) => account.into(),
},
}))
}))
as Box<dyn Future<Item = _, Error = _>>
} else {
Box::new(future::ok(future::Loop::Break(())))
}
}
Err(_) => Box::new(future::err(format_err!("Command receiver is broken"))),
},
Err((account, r)) => Box::new(match r {
Ok(Either::A(f)) => {
if let Some(cmd_recv) = f.into_inner() {
future::ok(future::Loop::Continue(XmppProcessState {
cmd_recv,
signal,
conn: account.into(),
}))
} else {
future::err(format_err!("Command receiver is gone"))
}
}
Ok(Either::B((cmd, cmd_recv))) => {
if let Some(_cmd) = cmd {
error!("Xmpp connection broken while get command");
future::ok(future::Loop::Continue(XmppProcessState {
cmd_recv,
signal,
conn: account.into(),
}))
} else {
future::ok(future::Loop::Break(()))
}
future::ok(future::Loop::Break((None, conn.into())))
Err(_) => future::err(format_err!("Command receiver is broken")),
}),
})
})
}
Err(e) => {
// got cmd error, its bad
error!("Cmd error: {}", e);
future::err(e)
}
}
})
})
.and_then(|(opt_cmd_recv, _conn): (Option<S>, MaybeXmppConnection)| {
if let Some(cmd_recv) = opt_cmd_recv {
// process left commands
info!("Stop accepting commands");
Box::new(
cmd_recv
.for_each(|_cmd| future::ok(()))
.map_err(|_| format_err!("cmd receiver last error")),
) as Box<Future<Item = (), Error = failure::Error>>
} else {
Box::new(future::ok(()))
}