Move XMPP to separate dir

[?]
Oct 30, 2018, 11:25 AM
VS6AHRWIPIMQNQAFX7INQGSUB72OW2K6HM6NGOV76RBRF6COQFRQC

Dependencies

Change contents

  • file addition: xmpp (dxwrx-rx-r)
    [2.6]
  • file addition: mod.rs (-xw-x--x--)
    [0.7]
    use tokio::prelude::future::{self, Either};
    use tokio::prelude::stream;
    use tokio::prelude::{Future, Stream};
    use tokio_channel::mpsc::Receiver;
    use tokio_xmpp::{Client, Event};
    use config;
    /// An XMPP account together with, once connected, the two halves of the split client.
    pub struct XmppConnection {
    /// Account settings; `jid` and `password` are read from it when the client is created.
    account: config::Account,
    /// Sink/stream halves of the client; `None` until a connection has been seen online.
    inner: Option<(stream::SplitSink<Client>, stream::SplitStream<Client>)>,
    }
    impl XmppConnection {
    fn new(account: config::Account) -> XmppConnection {
    XmppConnection {
    account,
    inner: None,
    }
    }
    /// connects if nothing connected
    /// Error shoud be !
    fn connect<E: 'static>(self) -> impl Future<Item = Self, Error = E> {
    info!("xmpp connection...");
    let XmppConnection { account, inner } = self;
    if let Some(inner) = inner {
    Box::new(future::ok(XmppConnection {
    account,
    inner: Some(inner),
    })) as Box<Future<Item = _, Error = E>>
    } else {
    Box::new(future::loop_fn(account, |account| {
    info!("xmpp initialization...");
    let mut res_client = Client::new(&account.jid, &account.password);
    while let Err(e) = res_client {
    error!("Cann't init xmpp client: {}", e);
    res_client = Client::new(&account.jid, &account.password);
    }
    let client = res_client.expect("Cann't init xmpp client");
    info!("xmpp initialized");
    // future to wait for online
    Self::online(client.split(), account)
    .and_then(Self::self_presence)
    .then(|r| match r {
    Ok(conn) => future::ok(future::Loop::Break(conn)),
    Err(acc) => future::ok(future::Loop::Continue(acc)),
    })
    }))
    }
    }
    /// get connection and wait for online status and set presence
    /// returns error if something went wrong
    fn online(
    (sink, stream): (stream::SplitSink<Client>, stream::SplitStream<Client>),
    account: config::Account,
    ) -> impl Future<Item = Self, Error = config::Account> {
    Box::new(future::loop_fn(
    (sink, stream, account),
    |(sink, stream, account)| {
    stream.into_future().then(|r| match r {
    Ok((event, stream)) => match event {
    Some(Event::Online) => {
    info!("Online");
    future::ok(future::Loop::Break(XmppConnection {
    account,
    inner: Some((sink, stream)),
    }))
    }
    Some(Event::Stanza(s)) => {
    info!("xmpp stanza: {:?}", s);
    future::ok(future::Loop::Continue((sink, stream, account)))
    }
    _ => {
    warn!("Disconnected");
    future::err(account)
    }
    },
    Err((e, _)) => {
    error!("xmpp receive error: {}", e);
    future::err(account)
    }
    })
    },
    ))
    }
    fn self_presence(self) -> impl Future<Item = Self, Error = config::Account> {
    let XmppConnection { account, inner } = self;
    if let Some((sink, stream)) = inner {
    future::ok(XmppConnection {
    account,
    inner: Some((sink, stream)),
    })
    } else {
    warn!("Don't gen connection on self-presence");
    future::err(account)
    }
    }
    }
    /// Command delivered to `xmpp_process` through its `Receiver`; it carries no data.
    pub struct XmppCommand;
    /// State carried from one `loop_fn` iteration to the next inside `xmpp_process`.
    struct XmppState<F> {
    /// Receiving end of the command channel.
    cmd_recv: Receiver<XmppCommand>,
    /// Shutdown-signal future that each iteration races against connect/receive.
    signal: F,
    /// XMPP connection, which may or may not have a client attached yet.
    conn: XmppConnection,
    }
    impl<F> XmppState<F> {
    fn new(cmd_recv: Receiver<XmppCommand>, signal: F, conn: XmppConnection) -> XmppState<F> {
    XmppState {
    cmd_recv,
    signal,
    conn,
    }
    }
    }
    /// Builds the future that drives the XMPP side of the program.
    ///
    /// Each loop iteration races the shutdown `signal` against "connect, then
    /// receive one item from `cmd_recv`". A received item starts the next
    /// iteration; the signal (or a signal error) ends the loop, after which the
    /// command receiver is closed and whatever is still queued is drained.
    ///
    /// * `signal` - future whose completion requests shutdown; its error is
    ///   replaced by an `io::Error` ("Wrong shutdown signal").
    /// * `cmd_recv` - receiving end of the command channel.
    /// * `account` - account used to create the XMPP connection.
    ///
    /// The returned future fails with `tokio::io::Error` when receiving a
    /// command fails or when draining the receiver fails.
    pub fn xmpp_process<F>(
    signal: F,
    cmd_recv: Receiver<XmppCommand>,
    account: config::Account,
    ) -> impl future::Future<Item = (), Error = tokio::io::Error>
    where
    F: future::Future<Item = ()> + 'static,
    {
    // Give the signal the same error type as the rest of the chain.
    let signal = signal
    .map_err(|_| tokio::io::Error::new(tokio::io::ErrorKind::Other, "Wrong shutdown signal"));
    let conn = XmppConnection::new(account);
    future::loop_fn(XmppState::new(cmd_recv, signal, conn), |s| {
    let XmppState {
    cmd_recv,
    signal,
    conn,
    } = s;
    signal
    // Side A is the signal; side B is connect followed by one receive.
    .select2(conn.connect().and_then(|conn| {
    info!("xmpp connected!");
    cmd_recv
    .into_future()
    .map_err(|_| {
    error!("Got error on recv cmd");
    tokio::io::Error::new(tokio::io::ErrorKind::Other, "Receive cmd error")
    }).map(|f| (f, conn))
    // Side B therefore yields `(f, conn)`, where `f.1` is the receiver handed back.
    })).then(|r| {
    match r {
    Ok(Either::A((_x, b))) => {
    info!("Got signal");
    // Signal completed: break with `(Some(receiver), conn)`.
    // NOTE(review): the break value is produced by mapping `b`, the still
    // pending connect/receive future, so the loop appears to end only once
    // that future yields as well - confirm this is intended.
    Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
    as Box<Future<Item = _, Error = _>>
    }
    Ok(Either::B((x, a))) => {
    info!("Got cmd");
    // Receive finished first: continue with the receiver `(x.0).1`, the
    // remaining signal future `a` and the connection `x.1`.
    // NOTE(review): the received item `(x.0).0` is not used here.
    Box::new(future::ok(future::Loop::Continue(XmppState::new(
    (x.0).1,
    a,
    x.1,
    )))) as Box<Future<Item = _, Error = _>>
    }
    Err(Either::A((e, b))) => {
    // Signal failed: log it and break the same way as for a normal signal.
    error!("Signal error: {}", e);
    Box::new(b.map(|b| future::Loop::Break((Some((b.0).1), b.1))))
    as Box<Future<Item = _, Error = _>>
    }
    Err(Either::B((e, _a))) => {
    // Connect/receive failed: abort the whole loop with an error.
    error!("Cmd error: {}", e);
    Box::new(future::err(tokio::io::Error::new(
    tokio::io::ErrorKind::Other,
    "Cmd error",
    ))) as Box<Future<Item = _, Error = _>>
    }
    }
    })
    }).and_then(
    // Runs once the loop has broken; the connection is dropped here.
    |(opt_cmd_recv, _conn): (Option<Receiver<XmppCommand>>, XmppConnection)| {
    if let Some(mut cmd_recv) = opt_cmd_recv {
    // Close the receiver, then consume whatever is still queued,
    // discarding each item.
    info!("Stop accepting commands");
    cmd_recv.close();
    Box::new(cmd_recv.for_each(|_cmd| future::ok(())).map_err(|_| {
    tokio::io::Error::new(tokio::io::ErrorKind::Other, "cmd receiver last error")
    })) as Box<Future<Item = (), Error = tokio::io::Error>>
    } else {
    // NOTE(review): both `Break` sites above pass `Some(..)`, so this
    // branch looks unreachable from within this function.
    Box::new(future::err(tokio::io::Error::new(
    tokio::io::ErrorKind::Other,
    "cmd receiver gone",
    )))
    }
    },
    )
    }