Add additional requirement for command stream

[?]
Dec 31, 2018, 7:15 PM
QTCUURXNMKPKDLG64QH72BIVP6SELSTQLUGGN67LKFVFN6FP2ZUQC

Dependencies

  • [2] WJNXI6Z4 Fill roster
  • [3] HU3NZX5Z Process self-presence via new processing code
  • [4] O2GM5J4F Don't split xmpp receiving and sending
  • [5] VS6AHRWI Move XMPP to separate dir
  • [6] FV6BJ5K6 Send self-presence and store account info in Rc so it willbe used in some future in parallel
  • [7] QYY3KRGL Use failure instead Box<dyn Error>
  • [8] UWY5EVZ6 Add dummy roster data
  • [9] 5A5UVGNM Move receiver closing logic out of xmpp processing
  • [10] 5OBTKGDL Update deps
  • [11] L77O4T7M Formatting and fixes
  • [12] NDDQQP2P Update deps
  • [13] HOAZX2PB Reorganize roster processing. Output roster
  • [14] PFC7OJQF Query roster
  • [15] AYQZ2UIA Update deps
  • [16] BTOZT4JP Use failure
  • [17] QWE26TMV update deps
  • [18] OGMBXBKP Move online to XmppConnection
  • [19] AGIW6YR3 Use shared future for signal everywhere
  • [20] 4LRBIGVT Show info about xmpp errors
  • [21] ALP2YJIU Rename XmppState to XmppProcessState
  • [22] OANBCLN5 Move xmpp client into XmppState
  • [23] XGP44R5H Rework stopping xmpp connection
  • [24] 3GEU7TC7 Welcome to 2018!
  • [25] IK3YDPTY Update deps
  • [26] TDOR5XQU Accept destination
  • [27] X6L47BHQ Use different structure for established xmpp connection
  • [28] ZI4GJ72V Add message to xmpp command
  • [29] 5IKA4GO7 Rename xmpp client field from "inner" to "client"
  • [30] V5HDBSZM Use jid for receiver address
  • [31] PVCRPP3B Some servers don't send to in initial presence
  • [*] FVVPKFTL Initial commit

Change contents

  • edit in src/xmpp/stanzas.rs at line 2
    [3.57][3.0:26]()
    use xmpp_parsers::iq::Iq;
  • edit in src/xmpp/stanzas.rs at line 3
    [3.141][3.27:61]()
    use xmpp_parsers::roster::Roster;
  • edit in src/xmpp/stanzas.rs at line 12
    [3.419][3.62:272]()
    pub fn make_get_roster(id: &str) -> Element {
    let mut get_roster = Iq::from_get(Roster {
    ver: None,
    items: vec![],
    });
    get_roster.id = Some(id.to_string());
    get_roster.into()
    }
  • edit in src/xmpp/mod.rs at line 9
    [3.434][3.273:320]()
    const ID_GET_ROSTER: &str = "id_get_roster0";
  • replacement in src/xmpp/mod.rs at line 44
    [3.1011][3.536:581]()
    F: future::Future + Clone + 'static,
    [3.1011]
    [3.1056]
    F: future::Future + 'static,
  • edit in src/xmpp/mod.rs at line 56
    [3.1405][3.748:777]()
    .clone()
  • replacement in src/xmpp/mod.rs at line 57
    [3.1464][3.778:844]()
    future::loop_fn(account, move |account| {
    [3.1464]
    [3.1530]
    future::loop_fn(account, |account| {
  • edit in src/xmpp/mod.rs at line 65
    [3.2010][3.845:998]()
    let stop_future2 = stop_future.clone();
    let stop_future3 = stop_future.clone();
  • replacement in src/xmpp/mod.rs at line 72
    [3.589][3.1073:1481]()
    .processing(XmppConnection::online, stop_future.clone())
    .map(|(conn, _)| conn)
    .map_err(|(acc, _)| acc)
    .and_then(|conn| conn.initial_roster(stop_future2))
    .and_then(|conn| conn.self_presence(stop_future3))
    [3.589]
    [3.720]
    .online()
    .and_then(XmppConnection::self_presence)
  • replacement in src/xmpp/mod.rs at line 101
    [3.3788][3.1482:1590]()
    fn xmpp_processing(&mut self, event: &Event) {
    info!("Incoming xmpp event: {:?}", event);
    }
    [3.3788]
    [3.3896]
    fn xmpp_processing(&mut self, _event: &Event) {}
  • replacement in src/xmpp/mod.rs at line 111
    [3.4137][3.1591:1773]()
    Item = (Self, Result<Either<F, T>, failure::Error>),
    Error = (
    std::rc::Rc<config::Account>,
    Result<Either<F, T>, failure::Error>,
    ),
    [3.4137]
    [3.4319]
    Item = (Self, Result<Either<F, T>, E>),
    Error = (std::rc::Rc<config::Account>, Result<Either<F, T>, E>),
  • replacement in src/xmpp/mod.rs at line 116
    [3.4375][3.1774:1875]()
    E: Into<failure::Error>,
    S: FnMut(&mut Self, Event) -> Result<bool, failure::Error>,
    [3.4375]
    [3.4476]
    S: FnMut(&mut Self, &Event) -> bool,
  • replacement in src/xmpp/mod.rs at line 130
    [3.907][3.907:1295](),[3.907][3.907:1295](),[3.1295][3.3111:3145](),[3.3111][3.3111:3145](),[3.3145][3.1296:1375]()
    match stop_condition(&mut xmpp, event) {
    Ok(true) => {
    future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
    }
    Ok(false) => {
    future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
    }
    Err(e) => future::err((xmpp.account, Err(e))),
    [3.907]
    [3.3255]
    if stop_condition(&mut xmpp, &event) {
    future::ok(future::Loop::Break((xmpp, Ok(Either::A(b)))))
    } else {
    future::ok(future::Loop::Continue((xmpp, b, stop_condition)))
  • replacement in src/xmpp/mod.rs at line 154
    [2.939][3.2644:2691](),[3.2644][3.2644:2691]()
    Err(e.into()),
    [2.939]
    [3.2691]
    Err(e),
  • replacement in src/xmpp/mod.rs at line 157
    [3.2756][3.2756:2822](),[3.2756][3.2756:2822]()
    future::err((account, Err(e.into())))
    [3.2756]
    [3.5513]
    future::err((account, Err(e)))
  • replacement in src/xmpp/mod.rs at line 167
    [3.6737][3.4692:6307](),[3.6307][2.940:1025](),[2.1025][3.6307:6802](),[3.6307][3.6307:6802]()
    fn online(&mut self, event: Event) -> Result<bool, failure::Error> {
    match event {
    Event::Online => {
    info!("Online!");
    Ok(true)
    }
    Event::Stanza(s) => {
    warn!("Stanza before online: {:?}", s);
    Ok(false)
    }
    _ => {
    error!("Disconnected while online");
    Err(format_err!("Disconnected while online"))
    }
    }
    }
    fn process_initial_roster(&mut self, event: Event) -> Result<bool, failure::Error> {
    if let Event::Stanza(s) = event {
    use try_from::TryInto;
    match s.try_into() as Result<xmpp_parsers::iq::Iq, _> {
    Ok(iq) => {
    if let Some(id) = iq.id {
    if id == ID_GET_ROSTER {
    match iq.payload {
    xmpp_parsers::iq::IqType::Error(_e) => {
    Err(format_err!("Get error instead of roster"))
    }
    xmpp_parsers::iq::IqType::Result(Some(result)) => {
    match result.try_into()
    as Result<xmpp_parsers::roster::Roster, _>
    {
    Ok(roster) => {
    info!("Got roster:");
    for i in roster.items {
    info!(" >>> {:?}", i);
    self.state.roster.insert(i.jid, ());
    }
    Ok(true)
    }
    Err(e) => Err(format_err!("Cann't parse roster: {}", e)),
    }
    }
    _ => Err(format_err!("Unknown result of roster")),
    }
    } else {
    Ok(false)
    [3.6737]
    [3.5558]
    fn online(self) -> impl Future<Item = XmppConnection, Error = std::rc::Rc<config::Account>> {
    Box::new(future::loop_fn(
    (self.inner, self.account),
    |(client, account)| {
    client.into_future().then(|r| match r {
    Ok((event, client)) => match event {
    Some(Event::Online) => {
    info!("Online");
    future::ok(future::Loop::Break(XmppConnection {
    account,
    inner: client,
    }))
    }
    Some(Event::Stanza(s)) => {
    info!("xmpp stanza: {:?}", s);
    future::ok(future::Loop::Continue((client, account)))
    }
    _ => {
    warn!("Disconnected");
    future::err(account)
  • replacement in src/xmpp/mod.rs at line 188
    [3.5584][3.6803:6897]()
    } else {
    Err(format_err!("Iq stanza without id"))
    [3.5584]
    [3.8819]
    },
    Err((e, _)) => {
    error!("xmpp receive error: {}", e);
    future::err(account)
  • replacement in src/xmpp/mod.rs at line 193
    [3.8841][3.6898:7060](),[3.8564][3.3783:3789](),[3.5720][3.3783:3789](),[3.7060][3.3783:3789](),[3.5926][3.3783:3789](),[3.3280][3.3783:3789](),[3.3996][3.3783:3789](),[3.5642][3.3783:3789](),[3.4300][3.3783:3789](),[3.4794][3.3783:3789](),[3.3307][3.3783:3789](),[3.10350][3.3783:3789](),[3.5692][3.3783:3789](),[3.8718][3.3783:3789](),[3.3783][3.3783:3789](),[3.3789][3.10351:10352](),[3.10352][3.7061:7279](),[3.7279][3.2865:2915](),[3.2915][2.1026:1053](),[2.1053][3.2964:2982](),[3.2964][3.2964:2982](),[3.2982][3.7333:7528](),[3.7333][3.7333:7528](),[3.7528][3.2983:2998](),[3.2998][3.7562:7838](),[3.7562][3.7562:7838](),[3.7838][2.1054:1089](),[3.3056][3.7887:8518](),[2.1089][3.7887:8518](),[3.7887][3.7887:8518]()
    }
    Err(_e) => Ok(false),
    }
    } else {
    Err(format_err!("Wrong event while waiting roster"))
    }
    }
    fn initial_roster<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E>,
    E: Into<failure::Error>,
    {
    let XmppConnection {
    account,
    inner: client,
    } = self;
    use tokio::prelude::Sink;
    let get_roster = stanzas::make_get_roster(ID_GET_ROSTER);
    let account2 = account.clone();
    info!("Quering roster... {:?}", get_roster);
    client
    .send(get_roster)
    .map_err(move |e| {
    error!("Error on querying roster: {}", e);
    (account2, Err(failure::SyncFailure::new(e).into()))
    })
    .and_then(move |client| {
    XmppConnection {
    inner: client,
    account,
    }
    .processing(XmppConnection::process_initial_roster, stop_future)
    })
    .then(|r| match r {
    Err((account, e)) => {
    error!(
    "Cann't wait roster: {}",
    e.err().map_or_else(
    || std::borrow::Cow::Borrowed("None"),
    |e| e.to_string().into()
    )
    );
    future::err(account)
    }
    Ok((conn, _)) => future::ok(conn),
    })
    [3.8841]
    [3.8518]
    })
    },
    ))
  • replacement in src/xmpp/mod.rs at line 198
    [3.8525][3.8525:8742](),[3.8525][3.8525:8742](),[3.8525][3.8525:8742](),[3.8742][3.3057:3107](),[3.3107][2.1090:1117](),[2.1117][3.3156:3174](),[3.3156][3.3156:3174]()
    fn self_presence<F, E>(
    self,
    stop_future: F,
    ) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>>
    where
    F: Future<Error = E>,
    E: Into<failure::Error>,
    {
    let XmppConnection {
    account,
    inner: client,
    } = self;
    [3.8525]
    [3.10624]
    fn self_presence(self) -> impl Future<Item = Self, Error = std::rc::Rc<config::Account>> {
    let XmppConnection { account, inner } = self;
    let client = inner;
  • edit in src/xmpp/mod.rs at line 204
    [3.10716]
    [3.10716]
    info!("Sending presence...");
  • replacement in src/xmpp/mod.rs at line 206
    [3.10756][3.8797:8851](),[3.8851][3.3175:3190](),[3.3190][3.8885:9157](),[3.8885][3.8885:9157](),[3.9157][2.1118:1153](),[3.3248][3.9206:10285](),[2.1153][3.9206:10285](),[3.9206][3.9206:10285]()
    info!("Sending presence... {:?}", presence);
    client
    .send(presence)
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    (account2, Err(failure::SyncFailure::new(e).into()))
    })
    .and_then(move |client| {
    XmppConnection {
    inner: client,
    account,
    }
    .processing(
    move |conn, event| {
    if let Event::Stanza(s) = event {
    if s.name() == "presence"
    && s.attr("from").map_or(false, |f| f == conn.account.jid)
    && s.attr("to").map_or(false, |f| f == conn.account.jid)
    {
    Ok(true)
    } else {
    Ok(false)
    }
    } else {
    Err(format_err!("Wrong event while waiting self-presence"))
    }
    },
    stop_future,
    )
    })
    .then(|r| match r {
    Err((account, _e)) => {
    error!("Cann't wait self-presence");
    future::err(account)
    }
    Ok((conn, _)) => future::ok(conn),
    })
    [3.10756]
    [3.12130]
    Box::new(
    client
    .send(presence)
    .map_err(|e| {
    error!("Error on send self-presence: {}", e);
    "Cann't send self-presence".to_owned()
    })
    .and_then(move |client| {
    future::loop_fn((account2.clone(), client), |(account, client)| {
    client
    .into_future()
    .map_err(|(e, _)| {
    error!("Error on reading self-presence: {}", e);
    "Cann't read self-presence".to_owned()
    })
    .and_then(|(event, client)| match event {
    Some(event) => {
    if let tokio_xmpp::Event::Stanza(e) = event {
    info!("Get stanza: {:?}", e);
    if e.name() == "presence"
    && e.attr("from").map_or(false, |f| f == account.jid)
    && e.attr("to").map_or(false, |f| f == account.jid)
    {
    info!("Self presence");
    future::ok(future::Loop::Break(client))
    } else {
    future::ok(future::Loop::Continue((account, client)))
    }
    } else {
    future::err("Got wrong event".to_owned())
    }
    }
    None => future::err("Got closed stream".to_owned()),
    })
    })
    .map_err(|e| format!("waiting self-presence: {}", e))
    })
    .then(|r| match r {
    Err(e) => {
    error!("Self-presence waiting error: {}", e);
    future::err(account)
    }
    Ok(inner) => future::ok(XmppConnection { account, inner }),
    }),
    )
  • replacement in src/xmpp/mod.rs at line 281
    [3.8651][3.465:518](),[3.5896][3.465:518](),[3.6054][3.465:518](),[3.5793][3.465:518](),[3.4398][3.465:518](),[3.12380][3.465:518](),[3.5737][3.465:518](),[3.8987][3.465:518](),[3.4355][3.465:518]()
    S: stream::Stream<Item = XmppCommand> + 'static,
    [3.12380]
    [3.4355]
    S: stream::Stream<Item = XmppCommand, Error = ()> + 'static,
  • replacement in src/xmpp/mod.rs at line 320
    [3.6342][3.10615:10654]()
    future::err(e)
    [3.6342]
    [3.6572]
    future::err(format_err!("Cmd error"))
  • edit in Cargo.toml at line 23
    [3.7111][3.10655:10704]()
    try_from = "=0.2.2" # dependency of xmpp-parsers
  • edit in Cargo.lock at line 1110
    [3.44807][3.10705:10780]()
    "try_from 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",