77USPY5IJSK7YN355CRGZQ2ZIG5WHRCD7JWKE3BVVBYIU6QLB5ZAC
XOAM22TTB4TTPABSG753EQWIADXB43ZEW6LKS64P6NWRETLDUONQC
BWDUANCV77MCLCYMRS2UNFIFW4ZC3KB2KEGEUI77FRO7KW6TUZJQC
FV6BJ5K64QG63YI2PTII44ZEBAYNJBQZ5FSBHW65EPHGBKFGUW4AC
SA2IOFGYAOWEAS2M3QUXAIUZ5PRGXLQLDYCF2JQKCBGN42K6XU6AC
EBETRYK7RNRATWD75LTFUTPF6M6KB6YPKTQUX4K232JB6CKVOZOQC
DKXSFTDY6FVCBUAFNKOFBKKIKWQI7L7WSD3MRVEW7WFRT3R5VIFQC
VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC
CBWCXUZZBPQYFROVZFCIWHW3CPNAGXO3PUARLGOF4DUF6VVCYKVQC
5IKA4GO7DIMBBYB7OUF5S7N2LUWIZ5MKCCCTZCAYPZI2357MVFHAC
XGP44R5HBQAHBXEJKN4AB77OZ32B32FPUHSUILZDVNO4QLUOTW6QC
UMTLHH77LGABTVKULH6ONVSBTMSFGH3CJ6GPNTFWH73AWNJZV6LQC
pub fn make_chat_message(jid: jid::Jid, text: String) -> Element {
let mut message = Message::new(Some(jid));
message.bodies.insert(String::new(), Body(text));
message.type_ = MessageType::Chat;
message.into()
}
self.state
.data
.roster
.extend(roster.items.into_iter().map(|i| {
for i in roster.items {
if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid)
{
info!("Update {} in roster", i.jid);
rdata.0 = i.subscription;
if !rdata.1.is_empty() {
let sub_to = match rdata.0 {
xmpp_parsers::roster::Subscription::To => true,
xmpp_parsers::roster::Subscription::Both => true,
_ => false,
};
if sub_to {
info!("Subscribed to {}", i.jid);
let jid = i.jid.clone();
self.state.data.send_queue.extend(
rdata.1.drain(..).map(|message| {
stanzas::make_chat_message(jid.clone(), message)
}),
)
} else {
info!("Not subscribed to {}", i.jid);
self.state
.data
.send_queue
.push_back(stanzas::make_ask_subscribe(i.jid));
}
}
} else {
if let Some(send_element) = data.send_queue.pop_front() {
use tokio::prelude::Sink;
info!("Sending {:?}", send_element);
Box::new(client.send(send_element).select2(stop_future).then(
move |r| match r {
Ok(Either::A((client, b))) => {
Box::new(future::ok(future::Loop::Continue((
XmppConnection {
state: XmppState { client, data },
account,
client
.into_future()
.select2(stop_future)
.then(move |r| match r {
Ok(Either::A(((event, client), b))) => {
if let Some(event) = event {
let xmpp = XmppConnection {
state: XmppState { client, data },
account,
};
Box::new(xmpp.xmpp_processing(&event).then(|r| match r {
Ok(mut xmpp) => match stop_condition(&mut xmpp, event) {
Ok(true) => future::ok(future::Loop::Break((
xmpp,
Ok(Either::A(b)),
))),
Ok(false) => future::ok(future::Loop::Continue((
xmpp,
b,
stop_condition,
))),
Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
}
Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
Ok(client) => future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Ok(Either::B(t)),
))),
Err(se) => {
warn!("XMPP sending error: {}", se);
future::err((account, Ok(Either::B(t))))
}
})),
Err(Either::A((e, b))) => {
warn!("XMPP sending error: {}", e);
} else {
Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
Ok(client) => future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Err(e),
))),
Err(se) => {
warn!("XMPP sending error: {}", se);
future::err((account, Err(e)))
}
})),
},
)) as Box<dyn Future<Item = _, Error = _>>
} else {
Box::new(
client
.into_future()
.select2(stop_future)
.then(move |r| match r {
Ok(Either::A(((event, client), b))) => {
if let Some(event) = event {
let mut xmpp = XmppConnection {
state: XmppState { client, data },
account,
};
if xmpp.xmpp_processing(&event) {
match stop_condition(&mut xmpp, event) {
Ok(true) => future::ok(future::Loop::Break((
xmpp,
Ok(Either::A(b)),
))),
Ok(false) => future::ok(future::Loop::Continue((
xmpp,
b,
stop_condition,
))),
Err(_e) => {
future::err((xmpp.account, Ok(Either::A(b))))
}
}
} else {
future::err((xmpp.account, Ok(Either::A(b))))
}
} else {
future::err((account, Ok(Either::A(b))))
}
}
Ok(Either::B((t, a))) => {
if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Ok(Either::B(t)),
)))
} else {
future::err((account, Ok(Either::B(t))))
}
}
Err(Either::A((e, b))) => {
warn!("XMPP error: {}", e.0);
future::err((account, Ok(Either::A(b))))
}
Err(Either::B((e, a))) => {
if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Err(e),
)))
} else {
future::err((account, Err(e)))
}
}
}),
)
}
}
Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Ok(Either::B(t)),
)))
} else {
future::err((account, Ok(Either::B(t))))
}),
Err(Either::A((e, b))) => {
warn!("XMPP error: {}", e.0);
Box::new(future::err((account, Ok(Either::A(b)))))
}
Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {
future::ok(future::Loop::Break((
XmppConnection {
state: XmppState { client, data },
account,
},
Err(e),
)))
} else {
future::err((account, Err(e)))
}),
})
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Box::new(future::ok(self)) as Box<dyn Future<Item = _, Error = _>>
================================
let sub_to = match rdata.0 {
xmpp_parsers::roster::Subscription::To => true,
xmpp_parsers::roster::Subscription::Both => true,
_ => false,
};
if sub_to {
info!("Subscribed to {}", cmd.xmpp_to);
self.state
.data
.send_queue
.push_back(stanzas::make_chat_message(cmd.xmpp_to, cmd.message));
} else {
info!("Not subscribed to {}", cmd.xmpp_to);
rdata.1.push(cmd.message);
self.state
.data
.send_queue
.push_back(stanzas::make_ask_subscribe(cmd.xmpp_to));
}
<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
self.state.data.send_queue.push_back(add_roster);
Box::new(
client
.send(add_roster)
.map_err(|e| {
error!("Error on send adding to roster: {}", e);
account2
})
.and_then(move |client| {
data.pending_add_roster_ids
.insert(id_add_roster, (cmd.xmpp_to, cmd.message));
future::ok(XmppConnection {
account,
state: XmppState { client, data },
})
}),
)
Ok((mut conn, r)) => match r {
Ok(Either::A(f)) => {
if let Some(cmd_recv) = f.into_inner() {
future::ok(future::Loop::Continue(XmppProcessState {
cmd_recv,
signal,
conn: conn.into(),
}))
} else {
future::err(format_err!("Command receiver is gone"))
}
}
Ok((conn, r)) => match r {
Ok(Either::A(f)) => Box::new(if let Some(cmd_recv) = f.into_inner() {
future::ok(future::Loop::Continue(XmppProcessState {
cmd_recv,
signal,
conn: conn.into(),
}))
} else {
future::err(format_err!("Command receiver is gone"))
})
as Box<dyn Future<Item = _, Error = _>>,
conn.process_command(cmd);
future::ok(future::Loop::Continue(XmppProcessState {
cmd_recv,
signal,
conn: conn.into(),
Box::new(conn.process_command(cmd).then(|r| {
future::ok(future::Loop::Continue(XmppProcessState {
cmd_recv,
signal,
conn: match r {
Ok(conn) => conn.into(),
Err(account) => account.into(),
},
}))