Sending messages works!

[?]
Jan 2, 2019, 7:50 PM
77USPY5IJSK7YN355CRGZQ2ZIG5WHRCD7JWKE3BVVBYIU6QLB5ZAC

Dependencies

  • [2] XOAM22TT Simplify xmpp incoming stanzas processing without futures
  • [3] BWDUANCV Second part of processing result is only about stop_future
  • [4] FV6BJ5K6 Send self-presence and store account info in Rc so it will be usable in some future in parallel
  • [5] CBWCXUZZ Prepare adding new items to roster
  • [6] OANBCLN5 Move xmpp client into XmppState
  • [7] XGP44R5H Rework stopping xmpp connection
  • [8] 5IKA4GO7 Rename xmpp client field from "inner" to "client"
  • [9] VS6AHRWI Move XMPP to separate dir
  • [10] QWE26TMV update deps
  • [11] SA2IOFGY Add items to roster
  • [12] DKXSFTDY Send stanzas via send queue
  • [13] EBETRYK7 Add counter for id. Check for jid in roster
  • [14] UMTLHH77 Process commands in the separate function
  • [15] QTCUURXN Add additional requirement for command stream
  • [16] SU4DNVCB Start processing roster data
  • [17] FWJDW3G5 Allow processing xmpp incoming stanzas with futures
  • [18] ALP2YJIU Rename XmppState to XmppProcessState
  • [19] UAT5MV5O Directly use id for initial roster request
  • [20] 3FYEOGCI Move additional rarely changed data to separate structure

Change contents

  • edit in src/xmpp/stanzas.rs at line 3
    [3.26]
    [3.57]
    use xmpp_parsers::message::{Body, Message, MessageType};
  • edit in src/xmpp/stanzas.rs at line 37
    [3.445]
    [3.270]
    }
    pub fn make_ask_subscribe(jid: jid::Jid) -> Element {
    let mut presence = Presence::new(PresenceType::Subscribe);
    presence.to = Some(jid);
    presence.into()
  • edit in src/xmpp/stanzas.rs at line 44
    [3.272]
    /// Builds a message stanza of type `MessageType::Chat` carrying `text` as its body.
    ///
    /// `jid` is handed to `Message::new` (presumably the recipient address; confirm
    /// against the xmpp_parsers API). The finished message is converted into an
    /// `Element` with `into()`.
    pub fn make_chat_message(jid: jid::Jid, text: String) -> Element {
    let mut message = Message::new(Some(jid));
    // The body is stored under an empty-string key (presumably meaning "no language
    // tag"; verify against xmpp_parsers).
    message.bodies.insert(String::new(), Body(text));
    message.type_ = MessageType::Chat;
    message.into()
    }
  • replacement in src/xmpp/mod.rs at line 8
    [3.1][3.0:43]()
    use std::collections::{HashMap, VecDeque};
    [3.1]
    [3.420]
    use std::collections::HashMap;
  • replacement in src/xmpp/mod.rs at line 15
    [3.472][3.484:519](),[3.484][3.484:519]()
    roster: HashMap<jid::Jid, ()>,
    [3.472]
    [3.473]
    roster: HashMap<jid::Jid, (xmpp_parsers::roster::Subscription, Vec<String>)>,
  • edit in src/xmpp/mod.rs at line 21
    [3.670][3.44:112]()
    /// stanzas to send
    send_queue: VecDeque<minidom::Element>,
  • replacement in src/xmpp/mod.rs at line 147
    [3.3788][2.0:90]()
    /// Returns false on error
    fn xmpp_processing(&mut self, event: &Event) -> bool {
    [3.3788]
    [3.792]
    fn xmpp_processing(
    mut self,
    event: &Event,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
  • replacement in src/xmpp/mod.rs at line 159
    [3.1135][3.1135:1195](),[3.1135][3.1135:1195]()
    if let Some((_, (jid, _message))) =
    [3.1135]
    [3.1195]
    if let Some((_, (jid, message))) =
  • edit in src/xmpp/mod.rs at line 164
    [3.1465]
    [3.1465]
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(&jid) {
    rdata.1.push(message);
    } else {
    self.state.data.roster.insert(
    jid,
    (xmpp_parsers::roster::Subscription::None, vec![message]),
    );
    }
    } else {
    warn!(
    "Wrong payload when adding {} to roster: {:?}",
    jid, iq.payload
    );
  • replacement in src/xmpp/mod.rs at line 184
    [3.1788][3.1788:1980](),[3.1788][3.1788:1980]()
    self.state
    .data
    .roster
    .extend(roster.items.into_iter().map(|i| {
    [3.1788]
    [3.1980]
    for i in roster.items {
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid)
    {
    info!("Update {} in roster", i.jid);
    rdata.0 = i.subscription;
    if !rdata.1.is_empty() {
    let sub_to = match rdata.0 {
    xmpp_parsers::roster::Subscription::To => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if sub_to {
    info!("Subscribed to {}", i.jid);
    let jid = i.jid.clone();
    self.state.data.send_queue.extend(
    rdata.1.drain(..).map(|message| {
    stanzas::make_chat_message(jid.clone(), message)
    }),
    )
    } else {
    info!("Not subscribed to {}", i.jid);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(i.jid));
    }
    }
    } else {
  • replacement in src/xmpp/mod.rs at line 213
    [3.2050][3.2050:2135](),[3.2050][3.2050:2135]()
    (i.jid, ())
    }));
    [3.2050]
    [3.2135]
    self.state
    .data
    .roster
    .insert(i.jid, (i.subscription, vec![]));
    }
    }
  • replacement in src/xmpp/mod.rs at line 222
    [3.2201][2.91:112]()
    true
    [3.2201]
    [3.2234]
    future::ok(self)
  • replacement in src/xmpp/mod.rs at line 224
    [3.2248][2.113:148]()
    Event::Online => true,
    [3.2248]
    [3.2295]
    Event::Online => future::ok(self),
  • replacement in src/xmpp/mod.rs at line 227
    [3.2365][2.149:171]()
    false
    [3.2365]
    [3.2407]
    future::err(self.account)
  • replacement in src/xmpp/mod.rs at line 255
    [3.1389][3.113:172]()
    state: XmppState { client, mut data },
    [3.1389]
    [3.1588]
    state: XmppState { client, data },
  • replacement in src/xmpp/mod.rs at line 258
    [3.1643][3.173:791]()
    if let Some(send_element) = data.send_queue.pop_front() {
    use tokio::prelude::Sink;
    info!("Sending {:?}", send_element);
    Box::new(client.send(send_element).select2(stop_future).then(
    move |r| match r {
    Ok(Either::A((client, b))) => {
    Box::new(future::ok(future::Loop::Continue((
    XmppConnection {
    state: XmppState { client, data },
    account,
    [3.1643]
    [3.2575]
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    Box::new(xmpp.xmpp_processing(&event).then(|r| match r {
    Ok(mut xmpp) => match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
  • replacement in src/xmpp/mod.rs at line 281
    [3.2614][3.792:920]()
    b,
    stop_condition,
    ))))
    [3.2614]
    [3.2744]
    Err(account) => future::err((account, Ok(Either::A(b)))),
    }))
  • replacement in src/xmpp/mod.rs at line 284
    [3.2820][3.921:1806]()
    }
    Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Ok(Either::B(t))))
    }
    })),
    Err(Either::A((e, b))) => {
    warn!("XMPP sending error: {}", e);
    [3.2820]
    [3.2857]
    } else {
  • replacement in src/xmpp/mod.rs at line 287
    [3.2970][3.1807:2979](),[3.2979][2.172:244](),[2.244][3.3047:3222](),[3.3047][3.3047:3222](),[3.3222][2.245:1169](),[2.1169][3.4515:4611](),[3.4515][3.4515:4611](),[3.4611][2.1170:1351](),[2.1351][3.4936:4981](),[3.4936][3.4936:4981](),[3.4981][2.1352:1433](),[2.1433][3.5072:5203](),[3.5072][3.5072:5203](),[3.5203][2.1434:1509](),[2.1509][3.5287:5840](),[3.5287][3.5287:5840](),[3.5840][2.1510:1548](),[2.1548][3.5879:6039](),[3.5879][3.5879:6039](),[3.6039][2.1549:1626](),[2.1626][3.6126:6220](),[3.6126][3.6126:6220](),[3.6220][2.1627:1702](),[2.1702][3.6304:6837](),[3.6304][3.6304:6837](),[3.6837][2.1703:1741](),[2.1741][3.6876:6982](),[3.6876][3.6876:6982]()
    Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Err(e)))
    }
    })),
    },
    )) as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let mut xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    if xmpp.xmpp_processing(&event) {
    match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    }
    } else {
    future::err((xmpp.account, Ok(Either::A(b))))
    }
    } else {
    future::err((account, Ok(Either::A(b))))
    }
    }
    Ok(Either::B((t, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }
    }
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    future::err((account, Ok(Either::A(b))))
    }
    Err(Either::B((e, a))) => {
    if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }
    }
    }),
    )
    }
    [3.2970]
    [3.5580]
    }
    Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }),
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }),
    })
  • replacement in src/xmpp/mod.rs at line 359
    [3.5538][3.4253:4343]()
    self.state.data.roster.insert(i.jid, ());
    [3.5538]
    [3.5538]
    self.state
    .data
    .roster
    .insert(i.jid, (i.subscription, vec![]));
  • replacement in src/xmpp/mod.rs at line 491
    [3.4553][3.6983:7037]()
    fn process_command(&mut self, cmd: XmppCommand) {
    [3.4553]
    [3.4692]
    fn process_command(
    self,
    cmd: XmppCommand,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
  • replacement in src/xmpp/mod.rs at line 496
    [3.4722][3.2967:3031]()
    if self.state.data.roster.get(&cmd.xmpp_to).is_some() {
    [3.4722]
    [3.4786]
    if let Some(ref mut rdata) = self.state.data.roster.get_mut(&cmd.xmpp_to) {
  • edit in src/xmpp/mod.rs at line 498
    [3.4838]
    [3.4867]
    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    Box::new(future::ok(self)) as Box<dyn Future<Item = _, Error = _>>
    ================================
    let sub_to = match rdata.0 {
    xmpp_parsers::roster::Subscription::To => true,
    xmpp_parsers::roster::Subscription::Both => true,
    _ => false,
    };
    if sub_to {
    info!("Subscribed to {}", cmd.xmpp_to);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_chat_message(cmd.xmpp_to, cmd.message));
    } else {
    info!("Not subscribed to {}", cmd.xmpp_to);
    rdata.1.push(cmd.message);
    self.state
    .data
    .send_queue
    .push_back(stanzas::make_ask_subscribe(cmd.xmpp_to));
    }
    <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
  • edit in src/xmpp/mod.rs at line 523
    [3.4940]
    [3.3247]
    let XmppConnection {
    account,
    state: XmppState { client, mut data },
    } = self;
  • replacement in src/xmpp/mod.rs at line 528
    [3.3248][3.7038:7165]()
    self.state.data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
    [3.3248]
    [3.3353]
    data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", data.counter);
  • replacement in src/xmpp/mod.rs at line 531
    [3.3445][3.7166:7319]()
    self.state
    .data
    .pending_add_roster_ids
    .insert(id_add_roster, (cmd.xmpp_to, cmd.message));
    [3.3445]
    [3.3489]
    let account2 = account.clone();
  • edit in src/xmpp/mod.rs at line 533
    [3.3552]
    [3.3590]
    use tokio::prelude::Sink;
  • replacement in src/xmpp/mod.rs at line 535
    [3.3591][3.7320:7382]()
    self.state.data.send_queue.push_back(add_roster);
    [3.3591]
    [3.5454]
    Box::new(
    client
    .send(add_roster)
    .map_err(|e| {
    error!("Error on send adding to roster: {}", e);
    account2
    })
    .and_then(move |client| {
    data.pending_add_roster_ids
    .insert(id_add_roster, (cmd.xmpp_to, cmd.message));
    future::ok(XmppConnection {
    account,
    state: XmppState { client, data },
    })
    }),
    )
  • replacement in src/xmpp/mod.rs at line 596
    [3.5994][3.7383:7993]()
    Ok((mut conn, r)) => match r {
    Ok(Either::A(f)) => {
    if let Some(cmd_recv) = f.into_inner() {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: conn.into(),
    }))
    } else {
    future::err(format_err!("Command receiver is gone"))
    }
    }
    [3.5994]
    [3.6478]
    Ok((conn, r)) => match r {
    Ok(Either::A(f)) => Box::new(if let Some(cmd_recv) = f.into_inner() {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: conn.into(),
    }))
    } else {
    future::err(format_err!("Command receiver is gone"))
    })
    as Box<dyn Future<Item = _, Error = _>>,
  • replacement in src/xmpp/mod.rs at line 609
    [3.6591][3.7994:8284]()
    conn.process_command(cmd);
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: conn.into(),
    [3.6591]
    [3.7141]
    Box::new(conn.process_command(cmd).then(|r| {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: match r {
    Ok(conn) => conn.into(),
    Err(account) => account.into(),
    },
    }))
  • edit in src/xmpp/mod.rs at line 619
    [3.7177]
    [3.7253]
    as Box<dyn Future<Item = _, Error = _>>
  • replacement in src/xmpp/mod.rs at line 621
    [3.7290][3.8285:8353]()
    future::ok(future::Loop::Break(()))
    [3.7290]
    [3.7368]
    Box::new(future::ok(future::Loop::Break(())))
  • replacement in src/xmpp/mod.rs at line 624
    [3.9168][3.8354:8444]()
    Err(_) => future::err(format_err!("Command receiver is broken")),
    [3.9168]
    [3.7499]
    Err(_) => Box::new(future::err(format_err!("Command receiver is broken"))),
  • replacement in src/xmpp/mod.rs at line 626
    [3.7522][3.8445:8496]()
    Err((account, r)) => match r {
    [3.7522]
    [3.7582]
    Err((account, r)) => Box::new(match r {
  • replacement in src/xmpp/mod.rs at line 651
    [3.8862][3.8497:8520]()
    },
    [3.8862]
    [3.8886]
    }),