Send stanzas via send queue

[?]
Jan 2, 2019, 9:18 AM
DKXSFTDY6FVCBUAFNKOFBKKIKWQI7L7WSD3MRVEW7WFRT3R5VIFQC

Dependencies

  • [2] SA2IOFGY Add items to roster
  • [3] VS6AHRWI Move XMPP to separate dir
  • [4] UMTLHH77 Process commands in the separate function
  • [5] FWJDW3G5 Allow processing incoming XMPP stanzas with futures
  • [6] QWE26TMV update deps
  • [7] EBETRYK7 Add counter for id. Check for jid in roster
  • [8] CBWCXUZZ Prepare adding new items to roster
  • [9] XGP44R5H Rework stopping xmpp connection
  • [10] UAT5MV5O Directly use id for initial roster request
  • [11] FV6BJ5K6 Send self-presence and store account info in Rc so it will be usable by futures running in parallel
  • [12] 5IKA4GO7 Rename xmpp client field from "inner" to "client"
  • [13] 3FYEOGCI Move additional rarely changed data to separate structure
  • [14] BWDUANCV Second part of processing result is only about stop_future
  • [15] SU4DNVCB Start processing roster data

Change contents

  • replacement in src/xmpp/mod.rs at line 8
    [3.1][3.1:32]()
    use std::collections::HashMap;
    [3.1]
    [3.420]
    use std::collections::{HashMap, VecDeque};
  • edit in src/xmpp/mod.rs at line 21
    [2.670]
    [2.670]
    /// stanzas to send
    send_queue: VecDeque<minidom::Element>,
  • replacement in src/xmpp/mod.rs at line 216
    [3.1389][3.1228:1283]()
    state: XmppState { client, data },
    [3.1389]
    [3.1588]
    state: XmppState { client, mut data },
  • replacement in src/xmpp/mod.rs at line 219
    [3.1643][3.1284:2575]()
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    Box::new(xmpp.xmpp_processing(&event).then(|r| match r {
    Ok(mut xmpp) => match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    ))),
    Err(_e) => future::err((xmpp.account, Ok(Either::A(b)))),
    [3.1643]
    [3.2575]
    if let Some(send_element) = data.send_queue.pop_front() {
    use tokio::prelude::Sink;
    info!("Sending {:?}", send_element);
    Box::new(client.send(send_element).select2(stop_future).then(
    move |r| match r {
    Ok(Either::A((client, b))) => {
    Box::new(future::ok(future::Loop::Continue((
    XmppConnection {
    state: XmppState { client, data },
    account,
  • replacement in src/xmpp/mod.rs at line 232
    [3.2614][3.2614:2744]()
    Err(account) => future::err((account, Ok(Either::A(b)))),
    }))
    [3.2614]
    [3.2744]
    b,
    stop_condition,
    ))))
  • replacement in src/xmpp/mod.rs at line 236
    [3.2820][3.2820:2857]()
    } else {
    [3.2820]
    [3.2857]
    }
    Ok(Either::B((t, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Ok(Either::B(t))))
    }
    })),
    Err(Either::A((e, b))) => {
    warn!("XMPP sending error: {}", e);
  • replacement in src/xmpp/mod.rs at line 254
    [3.2970][3.2970:3319](),[3.2043][3.3609:3644](),[3.3319][3.3609:3644](),[3.3609][3.3609:3644](),[3.3644][3.3320:3402](),[3.2206][3.3726:3759](),[3.3402][3.3726:3759](),[3.3726][3.3726:3759](),[3.3759][3.3403:3610](),[3.3610][3.3857:3936](),[3.2773][3.3857:3936](),[3.3936][3.2842:2868](),[3.2842][3.2842:2868](),[3.2868][3.3611:4185]()
    }
    Ok(Either::B((t, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    }),
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    }),
    })
    [3.2970]
    [3.5580]
    Err(Either::B((e, a))) => Box::new(a.then(|r| match r {
    Ok(client) => future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    ))),
    Err(se) => {
    warn!("XMPP sending error: {}", se);
    future::err((account, Err(e)))
    }
    })),
    },
    )) as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(
    client
    .into_future()
    .select2(stop_future)
    .then(move |r| match r {
    Ok(Either::A(((event, client), b))) => {
    if let Some(event) = event {
    let xmpp = XmppConnection {
    state: XmppState { client, data },
    account,
    };
    Box::new(xmpp.xmpp_processing(&event).then(|r| match r {
    Ok(mut xmpp) => {
    match stop_condition(&mut xmpp, event) {
    Ok(true) => future::ok(future::Loop::Break((
    xmpp,
    Ok(Either::A(b)),
    ))),
    Ok(false) => {
    future::ok(future::Loop::Continue((
    xmpp,
    b,
    stop_condition,
    )))
    }
    Err(_e) => future::err((
    xmpp.account,
    Ok(Either::A(b)),
    )),
    }
    }
    Err(account) => {
    future::err((account, Ok(Either::A(b))))
    }
    }))
    as Box<dyn Future<Item = _, Error = _>>
    } else {
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    }
    Ok(Either::B((t, a))) => {
    Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Ok(Either::B(t)),
    )))
    } else {
    future::err((account, Ok(Either::B(t))))
    })
    }
    Err(Either::A((e, b))) => {
    warn!("XMPP error: {}", e.0);
    Box::new(future::err((account, Ok(Either::A(b)))))
    }
    Err(Either::B((e, a))) => {
    Box::new(if let Some(client) = a.into_inner() {
    future::ok(future::Loop::Break((
    XmppConnection {
    state: XmppState { client, data },
    account,
    },
    Err(e),
    )))
    } else {
    future::err((account, Err(e)))
    })
    }
    }),
    )
    }
  • replacement in src/xmpp/mod.rs at line 516
    [3.4553][3.4553:4591](),[3.4591][2.2940:2966](),[2.2966][3.4618:4692](),[3.4618][3.4618:4692]()
    fn process_command(
    self,
    cmd: XmppCommand,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
    [3.4553]
    [3.4692]
    fn process_command(&mut self, cmd: XmppCommand) {
  • edit in src/xmpp/mod.rs at line 520
    [3.4838][2.3032:3111]()
    Box::new(future::ok(self)) as Box<dyn Future<Item = _, Error = _>>
  • edit in src/xmpp/mod.rs at line 522
    [3.4940][2.3112:3247]()
    let XmppConnection {
    account,
    state: XmppState { client, mut data },
    } = self;
  • replacement in src/xmpp/mod.rs at line 523
    [2.3248][2.3248:3353]()
    data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", data.counter);
    [2.3248]
    [2.3353]
    self.state.data.counter += 1;
    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
  • replacement in src/xmpp/mod.rs at line 526
    [2.3445][2.3445:3489]()
    let account2 = account.clone();
    [2.3445]
    [2.3489]
    self.state
    .data
    .pending_add_roster_ids
    .insert(id_add_roster, (cmd.xmpp_to, cmd.message));
  • edit in src/xmpp/mod.rs at line 531
    [2.3552][2.3552:3590]()
    use tokio::prelude::Sink;
  • replacement in src/xmpp/mod.rs at line 532
    [2.3591][2.3591:4233]()
    Box::new(
    client
    .send(add_roster)
    .map_err(|e| {
    error!("Error on send adding to roster: {}", e);
    account2
    })
    .and_then(move |client| {
    data.pending_add_roster_ids
    .insert(id_add_roster, (cmd.xmpp_to, cmd.message));
    future::ok(XmppConnection {
    account,
    state: XmppState { client, data },
    })
    }),
    )
    [2.3591]
    [3.5454]
    self.state.data.send_queue.push_back(add_roster);
  • replacement in src/xmpp/mod.rs at line 578
    [3.5994][3.5994:6216](),[3.5015][3.7143:7225](),[3.6612][3.7143:7225](),[3.6216][3.7143:7225](),[3.7143][3.7143:7225](),[3.7225][3.6217:6300](),[3.5093][3.7302:7335](),[3.6696][3.7302:7335](),[3.6300][3.7302:7335](),[3.7302][3.7302:7335](),[3.7335][3.6301:6478]()
    Ok((conn, r)) => match r {
    Ok(Either::A(f)) => Box::new(if let Some(cmd_recv) = f.into_inner() {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: conn.into(),
    }))
    } else {
    future::err(format_err!("Command receiver is gone"))
    })
    as Box<dyn Future<Item = _, Error = _>>,
    [3.5994]
    [3.6478]
    Ok((mut conn, r)) => match r {
    Ok(Either::A(f)) => {
    if let Some(cmd_recv) = f.into_inner() {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: conn.into(),
    }))
    } else {
    future::err(format_err!("Command receiver is gone"))
    }
    }
  • replacement in src/xmpp/mod.rs at line 592
    [3.6591][2.4234:4312](),[2.4312][3.6670:7141](),[3.6670][3.6670:7141]()
    Box::new(conn.process_command(cmd).then(|r| {
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: match r {
    Ok(conn) => conn.into(),
    Err(account) => account.into(),
    },
    }))
    [3.6591]
    [3.7141]
    conn.process_command(cmd);
    future::ok(future::Loop::Continue(XmppProcessState {
    cmd_recv,
    signal,
    conn: conn.into(),
  • edit in src/xmpp/mod.rs at line 599
    [3.7177][3.7177:7253](),[3.7177][3.7177:7253]()
    as Box<dyn Future<Item = _, Error = _>>
  • replacement in src/xmpp/mod.rs at line 600
    [3.7290][3.7290:7368](),[3.7290][3.7290:7368]()
    Box::new(future::ok(future::Loop::Break(())))
    [3.7290]
    [3.7368]
    future::ok(future::Loop::Break(()))
  • replacement in src/xmpp/mod.rs at line 603
    [3.9168][3.7399:7499]()
    Err(_) => Box::new(future::err(format_err!("Command receiver is broken"))),
    [3.9168]
    [3.7499]
    Err(_) => future::err(format_err!("Command receiver is broken")),
  • replacement in src/xmpp/mod.rs at line 605
    [3.7522][3.7522:7582]()
    Err((account, r)) => Box::new(match r {
    [3.7522]
    [3.7582]
    Err((account, r)) => match r {
  • replacement in src/xmpp/mod.rs at line 630
    [3.8862][3.8862:8886]()
    }),
    [3.8862]
    [3.8886]
    },