use futures_util::future::Either;
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant};
use tokio_xmpp::SimpleClient;

use log::{error, info, warn};

use super::stanzas;

use super::element_processor;

use crate::config;

#[derive(Debug)]
pub enum XmppCommand {
    /// Send message to someone by jid
    Chat {
        xmpp_to: xmpp_parsers::Jid,
        message: String,
    },
    /// Send message to MUC
    Chatroom { muc_id: String, message: String },
    /// Send ping request. By default to the server to test connection.
    Ping {
        opt_xmpp_to: Option<xmpp_parsers::Jid>,
    },
    /// Set presence status
    Presence {
        show: xmpp_parsers::presence::Show,
        message: String,
    },
    /// Send presence status to MUC
    ChatroomPresence {
        muc_id: String,
        show: xmpp_parsers::presence::Show,
        message: String,
    },
    /// Check iq requests if some have expired timeouts
    TimeoutCleanup,
}

/// trait of processing iq
/// each function consumes handlers and
/// returns false if connection should be reset
trait IqHandler {
    /// process result
    fn result(
        self: Box<Self>,
        conn: &mut XmppConnection,
        opt_element: Option<xmpp_parsers::Element>,
    ) -> bool;

    /// process error
    fn error(
        self: Box<Self>,
        conn: &mut XmppConnection,
        error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool;

    /// process tmeout
    fn timeout(self: Box<Self>, conn: &mut XmppConnection) -> bool;
}

struct AddRosterIqHandler {
    jid: xmpp_parsers::BareJid,
}

impl IqHandler for AddRosterIqHandler {
    fn result(
        self: Box<Self>,
        conn: &mut XmppConnection,
        opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
        match opt_element {
            Some(element) => {
                warn!(
                    "Wrong payload when adding {} to roster: {}",
                    self.jid,
                    String::from(&element)
                );
            }
            None => {
                if conn.state.data.roster.contains_key(&self.jid) {
                    info!("Jid {} updated to roster", self.jid);
                    conn.process_jid(&self.jid);
                } else {
                    info!("Jid {} added in roster", self.jid);
                    conn.state.data.roster.insert(
                        self.jid.clone(),
                        (
                            xmpp_parsers::roster::Subscription::None,
                            xmpp_parsers::roster::Ask::None,
                        ),
                    );
                }
            }
        }
        true
    }

    fn error(
        self: Box<Self>,
        _conn: &mut XmppConnection,
        _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
        true
    }

    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
        true // ignore
    }
}

struct PingIqHandler {
    ignorable: bool,
}

impl IqHandler for PingIqHandler {
    fn result(
        self: Box<Self>,
        _conn: &mut XmppConnection,
        _opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
        info!("ping successed");
        true
    }

    fn error(
        self: Box<Self>,
        _conn: &mut XmppConnection,
        _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
        self.ignorable
    }

    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
        self.ignorable
    }
}

struct InitRosterIqHandler {}

impl IqHandler for InitRosterIqHandler {
    fn result(
        self: Box<Self>,
        conn: &mut XmppConnection,
        opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
        if let Some(result) = opt_element {
            match result.try_into() as Result<xmpp_parsers::roster::Roster, _> {
                Ok(roster) => {
                    conn.state.data.roster_init = true;
                    conn.state.data.roster.clear();
                    info!("Got first roster:");
                    for i in roster.items {
                        info!(" >>> {:?}", i);
                        conn.state
                            .data
                            .roster
                            .insert(i.jid, (i.subscription, i.ask));
                    }
                    true
                }
                Err(e) => {
                    error!("Cann't parse roster: {}", e);
                    false
                }
            }
        } else {
            error!("No roster responded");
            false
        }
    }

    fn error(
        self: Box<Self>,
        _conn: &mut XmppConnection,
        _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
        false
    }

    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
        false
    }
}

struct SelfDiscoveryIqHandler {}

impl IqHandler for SelfDiscoveryIqHandler {
    fn result(
        self: Box<Self>,
        conn: &mut XmppConnection,
        opt_element: Option<xmpp_parsers::Element>,
    ) -> bool {
        if let Some(result) = opt_element {
            match result.try_into() as Result<xmpp_parsers::disco::DiscoInfoResult, _> {
                Ok(self_discovery) => {
                    conn.state.data.self_discovery_init = true;
                    conn.state.data.self_pubsub_pep = false;
                    for i in self_discovery.identities {
                        if i.category == "pubsub" && i.type_ == "pep" {
                            conn.state.data.self_pubsub_pep = true;
                            break;
                        }
                    }
                    info!(
                        "Support XEP-0163: Personal Eventing Protocol: {}",
                        conn.state.data.self_pubsub_pep
                    );
                    true
                }
                Err(e) => {
                    error!("Cann't parse self discovery: {}", e);
                    false
                }
            }
        } else {
            error!("No self discovery");
            false
        }
    }

    fn error(
        self: Box<Self>,
        _conn: &mut XmppConnection,
        _error: xmpp_parsers::stanza_error::StanzaError,
    ) -> bool {
        false
    }

    fn timeout(self: Box<Self>, _conn: &mut XmppConnection) -> bool {
        false
    }
}

/// data for awaiting iq answer
struct IqWait {
    /// time of sending iq
    sent: Instant,
    /// timeout
    timeout: Duration,
    /// handler for iq processing
    handler: Box<dyn IqHandler>,
}

impl IqWait {
    pub fn new<T: IqHandler + 'static>(timeout_secs: u64, handler: T) -> IqWait {
        IqWait {
            sent: Instant::now(),
            timeout: Duration::from_secs(timeout_secs),
            handler: Box::new(handler),
        }
    }
}

#[derive(Default)]
struct XmppData {
    /// known roster data
    roster: HashMap<
        xmpp_parsers::BareJid,
        (
            xmpp_parsers::roster::Subscription,
            xmpp_parsers::roster::Ask,
        ),
    >,
    /// if roster was initialized
    /// ToDo: remove it as it is used only for initialization
    roster_init: bool,
    /// if self discovery was initialized
    /// ToDo: remove it as it is used only for initialization
    self_discovery_init: bool,
    /// if XEP-0163 Personal Eventing Protocol supported
    self_pubsub_pep: bool,
    /// if self-presence accepted
    /// ToDo: remove it as it is used only for initialization
    self_presence: bool,
    /// ids counter
    counter: usize,
    /// stanzas to send
    send_queue: VecDeque<minidom::Element>,
    /// outgoing mailbox: bare jid to message, fulljid, id
    outgoing_mailbox: HashMap<xmpp_parsers::BareJid, Vec<(String, xmpp_parsers::Jid, String)>>,
    /// muc id to muc jid
    mucs: HashMap<String, xmpp_parsers::Jid>,
    /// map from iq's id to handler of this type of iqs
    pending_ids: HashMap<String, IqWait>,
    /// domains needed to pinged, add here one for mucs, messages and pings
    used_domains: HashSet<String>,
    /// errored MUCs needed to re-enter
    error_mucs: HashSet<xmpp_parsers::Jid>,
}

struct XmppState {
    client: SimpleClient,
    data: XmppData,
}

pub struct XmppConnection {
    account: std::rc::Rc<config::Account>,
    state: XmppState,
}

trait IqRequestHandler {
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq;
}

struct IqRequestUnknown {
    element: xmpp_parsers::Element,
    type_: &'static str,
}

impl IqRequestHandler for IqRequestUnknown {
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq {
        warn!(
            "Unsupported IQ {} request from {:?}: {}",
            self.type_,
            from,
            String::from(&self.element)
        );
        stanzas::make_iq_unsupported_error(id, conn.account.jid.clone(), from)
    }
}

struct IqSetRoster {}

impl IqRequestHandler for IqSetRoster {
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq {
        info!("Got roster push {} from {:?}", id, from);
        stanzas::make_roster_push_answer(id, conn.account.jid.clone(), from)
    }
}

struct IqGetPing {}

impl IqRequestHandler for IqGetPing {
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq {
        info!("Got ping {} from {:?}", id, from);
        stanzas::make_pong(id, conn.account.jid.clone(), from)
    }
}

struct IqGetVersion {}

impl IqRequestHandler for IqGetVersion {
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq {
        info!("Got version query {} from {:?}", id, from);
        stanzas::make_version(id, conn.account.jid.clone(), from)
    }
}

struct IqGetDiscoInfo {}

impl IqRequestHandler for IqGetDiscoInfo {
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq {
        info!("Got disco query {} from {:?}", id, from);
        stanzas::make_disco_info_result(id, conn.account.jid.clone(), from)
    }
}

struct IqGetDiscoItems {}

impl IqRequestHandler for IqGetDiscoItems {
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq {
        info!("Got disco items query {} from {:?}", id, from);
        stanzas::make_disco_items_result(id, conn.account.jid.clone(), from)
    }
}

struct IqGetDiscoCommands {}

impl IqRequestHandler for IqGetDiscoCommands {
    fn process(
        self: Box<Self>,
        conn: &mut XmppConnection,
        id: String,
        from: Option<xmpp_parsers::Jid>,
    ) -> xmpp_parsers::iq::Iq {
        info!("Got disco commands query {} from {:?}", id, from);
        stanzas::make_disco_items_commands(id, conn.account.jid.clone(), from)
    }
}

lazy_static::lazy_static! {
    static ref INCOMING: element_processor::Processor<XmppConnection, bool, xmpp_parsers::Element> = {
        let mut incoming = element_processor::Processor::new(&|_, e| {
            warn!("Unknown stanza {}", String::from(&e));
            true
        });
        incoming.register(&XmppConnection::incoming_iq_processing);
        incoming.register(&XmppConnection::incoming_presence_processing);
        incoming.register(&XmppConnection::incoming_message_processing);
        incoming
    };
    static ref INCOMING_IQ_SET: element_processor::Processor<XmppConnection, Box<dyn IqRequestHandler>, xmpp_parsers::Element> = {
        let mut iq_set =
            element_processor::Processor::new(&|_conn: &mut XmppConnection, element| {
                Box::new(IqRequestUnknown {
                    element,
                    type_: "set",
                }) as Box<dyn IqRequestHandler>
            });
        iq_set.register(&XmppConnection::incoming_iq_processing_set_roster);
        iq_set
    };
    static ref INCOMING_IQ_GET: element_processor::Processor<XmppConnection, Box<dyn IqRequestHandler>, xmpp_parsers::Element> = {
        let mut iq_get =
            element_processor::Processor::new(&|_conn: &mut XmppConnection, element| {
                Box::new(IqRequestUnknown {
                    element,
                    type_: "get",
                }) as Box<dyn IqRequestHandler>
            });
        iq_get.register(&XmppConnection::incoming_iq_processing_get_ping);
        iq_get.register(&XmppConnection::incoming_iq_processing_get_disco_info);
        iq_get.register(&XmppConnection::incoming_iq_processing_get_disco_items);
        iq_get.register(&XmppConnection::incoming_iq_processing_get_version);
        iq_get
    };
}

pub struct MaybeXmppConnection {
    account: std::rc::Rc<config::Account>,
    state: Option<XmppState>,
}

impl From<XmppConnection> for MaybeXmppConnection {
    fn from(from: XmppConnection) -> MaybeXmppConnection {
        MaybeXmppConnection {
            account: from.account,
            state: Some(from.state),
        }
    }
}

impl From<config::Account> for MaybeXmppConnection {
    fn from(from: config::Account) -> MaybeXmppConnection {
        MaybeXmppConnection {
            account: std::rc::Rc::new(from),
            state: None,
        }
    }
}

impl From<std::rc::Rc<config::Account>> for MaybeXmppConnection {
    fn from(from: std::rc::Rc<config::Account>) -> MaybeXmppConnection {
        MaybeXmppConnection {
            account: from,
            state: None,
        }
    }
}

impl MaybeXmppConnection {
    /// connects if nothing connected
    /// don't connect only if stop_future resolved
    pub async fn connect<F>(
        self,
        stop_future: F,
    ) -> Result<XmppConnection, (std::rc::Rc<config::Account>, failure::Error)>
    where
        F: std::future::Future<Output = ()> + Clone + 'static,
    {
        info!("xmpp check connection...");
        let MaybeXmppConnection { account, state } = self;

        if let Some(state) = state {
            Ok(XmppConnection { account, state })
        } else {
            loop {
                let pin_stop_future = stop_future.clone();
                tokio::pin!(pin_stop_future);
                let connect = futures_util::future::select(
                    pin_stop_future,
                    Box::pin(async {
                        info!("xmpp initialization...");
                        let client = match SimpleClient::new_with_jid(
                            account.jid.clone(),
                            account.password.clone(),
                        )
                        .await
                        {
                            Ok(client) => client,
                            Err(e) => {
                                error!("Initialization error: {}", e);
                                return (Err(account.clone()), Either::Right(()));
                            }
                        };

                        let stop_future = stop_future.clone();

                        let (connected, stopped) = XmppConnection {
                            state: XmppState {
                                client,
                                data: std::default::Default::default(),
                            },
                            account: account.clone(),
                        }
                        .initial_roster(Box::pin(stop_future))
                        .await;

                        let (connected, stopped) = match (connected, stopped) {
                            (Ok(connection), Either::Left(stop_future)) => {
                                connection.self_presence(stop_future).await
                            }
                            (connected, stopped) => (connected, stopped),
                        };

                        let (connected, stopped) = match (connected, stopped) {
                            (Ok(connection), Either::Left(stop_future)) => {
                                connection.enter_mucs(stop_future).await
                            }
                            (connected, stopped) => (connected, stopped),
                        };

                        match (connected, stopped) {
                            (Ok(connection), Either::Left(stop_future)) => {
                                connection.self_discovery(stop_future).await
                            }
                            (connected, stopped) => (connected, stopped),
                        }
                    }),
                )
                .await;
                match connect {
                    Either::Left((_, _)) => {
                        break Err((
                            account.clone(),
                            failure::format_err!("Stop XMPP connection"),
                        ))
                    }
                    Either::Right(((_, Either::Right(_)), _)) => {
                        break Err((
                            account.clone(),
                            failure::format_err!("Stop XMPP connection(2)"),
                        ))
                    }
                    Either::Right(((result, Either::Left(_)), _)) => match result {
                        Ok(connection) => break Ok(connection),
                        Err(_account) => {}
                    },
                }
            }
        }
    }
}

impl XmppConnection {
    /// base XMPP processing
    /// Returns false on error to disconnect
    fn xmpp_processing(
        &mut self,
        event: &Result<xmpp_parsers::Element, tokio_xmpp::Error>,
    ) -> bool {
        match event {
            Ok(stanza) => INCOMING.process(self, stanza.clone()),
            Err(error) => {
                error!("Unexpected event {:?}", error);
                false
            }
        }
    }

    /// Process roster push
    /// see RFC 6212 2.1.6. Roster Push
    fn incoming_iq_processing_set_roster(
        &mut self,
        roster: xmpp_parsers::roster::Roster,
    ) -> Box<dyn IqRequestHandler> {
        for i in roster.items {
            if let Some(ref mut rdata) = self.state.data.roster.get_mut(&i.jid) {
                info!("Update {} in roster", i.jid);
                rdata.0 = i.subscription;
                rdata.1 = i.ask;
            } else {
                info!("Add {} to roster", i.jid);
                self.state
                    .data
                    .roster
                    .insert(i.jid.clone(), (i.subscription, i.ask));
            }
        }
        Box::new(IqSetRoster {})
    }

    /// Enforce to answer to IQ "set"
    fn incoming_iq_processing_set(
        &mut self,
        id: String,
        from: Option<xmpp_parsers::Jid>,
        element: minidom::Element,
    ) -> xmpp_parsers::iq::Iq {
        INCOMING_IQ_SET
            .process(self, element)
            .process(self, id, from)
    }

    /// Process ping request
    /// see XEP-0199: XMPP Ping
    fn incoming_iq_processing_get_ping(
        &mut self,
        _ping: xmpp_parsers::ping::Ping,
    ) -> Box<dyn IqRequestHandler> {
        Box::new(IqGetPing {})
    }

    /// Process disco query
    /// see XEP-0030: Service Discovery
    fn incoming_iq_processing_get_disco_info(
        &mut self,
        disco: xmpp_parsers::disco::DiscoInfoQuery,
    ) -> Box<dyn IqRequestHandler> {
        if let Some(ref node) = disco.node {
            warn!("Unsupported node {}", node);
            Box::new(IqRequestUnknown {
                element: disco.into(),
                type_: "get",
            })
        } else {
            Box::new(IqGetDiscoInfo {})
        }
    }

    /// Process disco items query
    /// see XEP-0030: Service Discovery
    fn incoming_iq_processing_get_disco_items(
        &mut self,
        disco: xmpp_parsers::disco::DiscoItemsQuery,
    ) -> Box<dyn IqRequestHandler> {
        match &disco.node {
            Some(node) if node == "http://jabber.org/protocol/commands" => {
                Box::new(IqGetDiscoCommands {})
            }
            Some(node) => {
                warn!("Unsupported node {}", node);
                Box::new(IqRequestUnknown {
                    element: disco.into(),
                    type_: "get",
                })
            }
            None => Box::new(IqGetDiscoItems {}),
        }
    }

    /// Process version query
    /// see XEP-0092: Software Version
    fn incoming_iq_processing_get_version(
        &mut self,
        _version: xmpp_parsers::version::VersionQuery,
    ) -> Box<dyn IqRequestHandler> {
        Box::new(IqGetVersion {})
    }

    /// Enforce to answer to IQ "get"
    fn incoming_iq_processing_get(
        &mut self,
        id: String,
        from: Option<xmpp_parsers::Jid>,
        element: minidom::Element,
    ) -> xmpp_parsers::iq::Iq {
        INCOMING_IQ_GET
            .process(self, element)
            .process(self, id, from)
    }

    fn incoming_iq_processing(&mut self, iq: xmpp_parsers::iq::Iq) -> bool {
        match iq.payload {
            xmpp_parsers::iq::IqType::Set(element) => {
                let iq_answer = self.incoming_iq_processing_set(iq.id, iq.from, element);
                self.state.data.send_queue.push_back(iq_answer.into());
            }
            xmpp_parsers::iq::IqType::Error(e) => {
                if let Some((_, data)) = self.state.data.pending_ids.remove_entry(&iq.id) {
                    return data.handler.error(self, e);
                }
                error!("iq error: {:?}", e);
                return false;
            }
            xmpp_parsers::iq::IqType::Get(element) => {
                let iq_answer = self.incoming_iq_processing_get(iq.id, iq.from, element);
                self.state.data.send_queue.push_back(iq_answer.into());
            }
            xmpp_parsers::iq::IqType::Result(opt_element) => {
                if let Some((_, data)) = self.state.data.pending_ids.remove_entry(&iq.id) {
                    let now = Instant::now();
                    if data.sent + data.timeout > now {
                        info!(
                            "Timeout for {} was {:?} expected {:?}",
                            iq.id,
                            now - data.sent,
                            data.timeout
                        );
                        return data.handler.result(self, opt_element);
                    } else {
                        warn!(
                            "Timeout for {} was {:?} expected {:?}",
                            iq.id,
                            now - data.sent,
                            data.timeout
                        );
                        return data.handler.timeout(self);
                    }
                }
                warn!(
                    "Unwanted iq result id {} from {:?}: {:?}",
                    iq.id,
                    iq.from,
                    opt_element.map(|e| String::from(&e))
                );
            }
        }
        true
    }

    fn incoming_presence_processing(&mut self, presence: xmpp_parsers::presence::Presence) -> bool {
        if presence.from.as_ref() == Some(&self.account.jid) {
            info!("Self-presence accepted");
            self.state.data.self_presence = true;
        } else {
            match presence.type_ {
                xmpp_parsers::presence::Type::Error => {
                    if presence.to.as_ref() == Some(&self.account.jid) {
                        if let Some(room) = self
                            .account
                            .chatrooms
                            .values()
                            .find(|&j| Some(j) == presence.from.as_ref())
                        {
                            error!(
                                "Got error from MUC {}. Try again later: {:?}",
                                room, presence
                            );
                            self.state.data.error_mucs.insert(room.clone());
                            return true;
                        }
                    }
                    error!("Incoming presence stanza error: {:?}", presence);
                    if let Some(from) = presence.from {
                        if let Some(ref mut mailbox) = self
                            .state
                            .data
                            .outgoing_mailbox
                            .remove(&from.clone().into())
                        {
                            if !mailbox.is_empty() {
                                error!(
                                    "Removed {} undeliverable messages for {}",
                                    mailbox.len(),
                                    from
                                );
                            }
                        }
                    }
                }
                xmpp_parsers::presence::Type::Unavailable => {
                    if presence.to.as_ref() == Some(&self.account.jid) {
                        if let Some(room) = self
                            .account
                            .chatrooms
                            .values()
                            .find(|&j| Some(j) == presence.from.as_ref())
                        {
                            warn!(
                                "Got disconnected from MUC {}. Try again: {:?}",
                                room, presence
                            );
                            self.state.data.counter += 1;
                            let id_muc_presence =
                                format!("id_muc_presence{}", self.state.data.counter);
                            let muc_presence = stanzas::make_muc_presence(
                                &id_muc_presence,
                                self.account.jid.clone(),
                                room.clone(),
                                None,
                                None,
                            );
                            self.state.data.send_queue.push_back(muc_presence);
                        } else {
                            warn!(
                                "Incoming Unavailable presence stanza to self: {:?}",
                                presence
                            );
                        }
                    } else {
                        warn!("Incoming Unavailable presence stanza: {:?}", presence);
                    }
                }
                _ => {
                    warn!("Incoming presence stanza: {:?}", presence);
                }
            }
        }
        true
    }

    fn incoming_message_processing(&mut self, message: xmpp_parsers::message::Message) -> bool {
        for payload in message.payloads.iter() {
            if let Some(_delay) =
                payload.clone().try_into().ok() as Option<xmpp_parsers::delay::Delay>
            {
                return true; // ignore delayed messages
            }
        }
        warn!("Incoming message stanza: {:?}", message);
        true
    }

    /// first data: return Ok(conn) if still connected, Err(account) if disconnected
    /// second data: true if stopped
    pub async fn processing<S, F, O>(
        mut self,
        mut stop_condition: S,
        stop_future: F,
        stop_fatal: bool,
    ) -> (Result<Self, std::rc::Rc<config::Account>>, Either<F, O>)
    where
        S: FnMut(&mut Self, Result<xmpp_parsers::Element, tokio_xmpp::Error>) -> Result<bool, ()>
            + 'static,
        F: std::future::Future<Output = O> + Unpin,
    {
        let mut stop_future = stop_future;
        loop {
            if let Some(send_element) = self.state.data.send_queue.pop_front() {
                info!("Sending {}", String::from(&send_element));
                let stop = {
                    let send_future = self.state.client.send_stanza(send_element);
                    tokio::pin!(send_future);
                    match futures_util::future::select(stop_future, send_future).await {
                        Either::Left((stop, send_future)) => {
                            if stop_fatal {
                                warn!("Sending interrupted!");
                            } else {
                                match send_future.await {
                                    Ok(_) => {
                                        info!("Sent!");
                                    }
                                    Err(e) => {
                                        error!("Send stanza error: {}", e);
                                        break (Err(self.account), Either::Right(stop));
                                    }
                                }
                            }
                            stop
                        }
                        Either::Right((Ok(_), f)) => {
                            info!("Sent!");
                            stop_future = f;
                            continue;
                        }
                        Either::Right((Err(e), f)) => {
                            error!("Send stanza error: {}", e);
                            break (Err(self.account), Either::Left(f));
                        }
                    }
                };
                break (Ok(self), Either::Right(stop));
            } else {
                use futures_util::StreamExt;

                let recv_future = self.state.client.next();
                tokio::pin!(recv_future);
                match futures_util::future::select(stop_future, recv_future).await {
                    Either::Left((stop, _s)) => break (Ok(self), Either::Right(stop)),
                    Either::Right((None, f)) => {
                        break (Err(self.account), Either::Left(f));
                    }
                    Either::Right((Some(event), f)) => {
                        stop_future = f;
                        if self.xmpp_processing(&event) {
                            match stop_condition(&mut self, event) {
                                Ok(true) => break (Ok(self), Either::Left(stop_future)),
                                Ok(false) => {}
                                Err(_) => break (Err(self.account), Either::Left(stop_future)),
                            }
                        } else {
                            break (Err(self.account), Either::Left(stop_future));
                        }
                    }
                }
            }
        }
    }

    async fn initial_roster<F>(
        mut self,
        stop_future: F,
    ) -> (Result<Self, std::rc::Rc<config::Account>>, Either<F, ()>)
    where
        F: std::future::Future<Output = ()> + Unpin,
    {
        self.state.data.counter += 1;
        let id_init_roster = format!("id_init_roster{}", self.state.data.counter);
        let get_roster = stanzas::make_get_roster(&id_init_roster);
        info!("Quering roster... {}", String::from(&get_roster));
        self.state.data.pending_ids.insert(
            id_init_roster.clone(),
            IqWait::new(60, InitRosterIqHandler {}),
        );
        let stop_future = {
            let send_future = self.state.client.send_stanza(get_roster);
            tokio::pin!(send_future);
            match futures_util::future::select(stop_future, send_future).await {
                Either::Left(((), _)) => None,
                Either::Right((Ok(_), stop_future)) => Some(stop_future),
                Either::Right((Err(e), f)) => {
                    error!("Send initial roster stanza error: {}", e);
                    return (Err(self.account), Either::Left(f));
                }
            }
        };
        if let Some(stop_future) = stop_future {
            self.processing(
                move |conn, _| Ok(conn.state.data.roster_init),
                stop_future,
                true,
            )
            .await
        } else {
            (Ok(self), Either::Right(()))
        }
    }

    async fn self_presence<F>(
        mut self,
        stop_future: F,
    ) -> (Result<Self, std::rc::Rc<config::Account>>, Either<F, ()>)
    where
        F: std::future::Future<Output = ()> + Unpin,
    {
        self.state.data.counter += 1;

        let id_presence = format!("id_init_presence{}", self.state.data.counter);
        let presence = stanzas::make_presence(id_presence, None, "Online!".to_string());
        info!("Sending presence... {}", String::from(&presence));
        let stop_future = {
            let send_future = self.state.client.send_stanza(presence);
            tokio::pin!(send_future);
            match futures_util::future::select(stop_future, send_future).await {
                Either::Left(((), _send_future)) => None,
                Either::Right((Ok(_), stop_future)) => Some(stop_future),
                Either::Right((Err(e), f)) => {
                    error!("Send self-presence stanza error: {}", e);
                    return (Err(self.account), Either::Left(f));
                }
            }
        };
        if let Some(stop_future) = stop_future {
            self.processing(
                move |conn, _| Ok(conn.state.data.self_presence),
                stop_future,
                true,
            )
            .await
        } else {
            (Ok(self), Either::Right(()))
        }
    }

    pub fn process_command(mut self, cmd: XmppCommand) -> MaybeXmppConnection {
        match cmd {
            XmppCommand::Presence { show, message } => {
                self.state.data.counter += 1;
                let id_presence = format!("id_presence{}", self.state.data.counter);
                self.state.data.send_queue.push_back(stanzas::make_presence(
                    id_presence,
                    Some(show),
                    message,
                ));
            }
            XmppCommand::Ping { opt_xmpp_to } => {
                let ignorable = opt_xmpp_to.is_some();
                self.state.data.counter += 1;
                let id_ping = format!("id_ping{}", self.state.data.counter);
                let ping = stanzas::make_ping(
                    &id_ping,
                    self.state.client.bound_jid().clone(),
                    opt_xmpp_to.clone(),
                );
                self.state.data.send_queue.push_back(ping);
                self.state
                    .data
                    .pending_ids
                    .insert(id_ping, IqWait::new(30, PingIqHandler { ignorable }));
                if opt_xmpp_to.is_none() {
                    for domains in &self.state.data.used_domains {
                        self.state.data.counter += 1;
                        let id_ping = format!("id_ping{}", self.state.data.counter);
                        let ping = stanzas::make_ping(
                            &id_ping,
                            self.state.client.bound_jid().clone(),
                            Some(xmpp_parsers::Jid::Bare(xmpp_parsers::BareJid::domain(
                                domains,
                            ))),
                        );
                        self.state.data.send_queue.push_back(ping);
                        self.state
                            .data
                            .pending_ids
                            .insert(id_ping, IqWait::new(30, PingIqHandler { ignorable: true }));
                    }
                    for muc in self.state.data.error_mucs.drain() {
                        self.state.data.counter += 1;
                        let id_muc_presence = format!("id_muc_presence{}", self.state.data.counter);
                        let muc_presence = stanzas::make_muc_presence(
                            &id_muc_presence,
                            self.account.jid.clone(),
                            muc,
                            None,
                            None,
                        );
                        self.state.data.send_queue.push_back(muc_presence);
                    }
                }
                if let Some(xmpp_to) = opt_xmpp_to {
                    self.add_domain_to_ping(xmpp_to.into());
                }
            }
            XmppCommand::TimeoutCleanup => {
                let now = Instant::now();
                let timeouted: Vec<String> = self
                    .state
                    .data
                    .pending_ids
                    .iter()
                    .filter_map(|(id, data)| {
                        if now >= data.sent + data.timeout {
                            warn!(
                                "Timeout for {} was {:?} expected {:?}",
                                id,
                                now - data.sent,
                                data.timeout
                            );
                            Some(id.to_string())
                        } else {
                            None
                        }
                    })
                    .collect();
                let mut correct = true;
                timeouted.into_iter().for_each(|id| {
                    if let Some(data) = self.state.data.pending_ids.remove(&id) {
                        correct &= data.handler.timeout(&mut self);
                    }
                })
            }
            XmppCommand::Chat { xmpp_to, message } => {
                self.state.data.counter += 1;
                let id_send_message = format!("id_send_message{}", self.state.data.counter);
                let bare_xmpp_to: xmpp_parsers::BareJid = xmpp_to.clone().into();
                self.add_domain_to_ping(bare_xmpp_to.clone());
                self.state
                    .data
                    .outgoing_mailbox
                    .entry(bare_xmpp_to.clone())
                    .or_default()
                    .push((message, xmpp_to, id_send_message));
                self.process_jid(&bare_xmpp_to);
            }
            XmppCommand::Chatroom { muc_id, message } => {
                if let Some(muc) = self.state.data.mucs.get(&muc_id) {
                    self.state
                        .data
                        .send_queue
                        .push_back(stanzas::make_muc_message(muc.clone(), message));
                } else {
                    error!("Not found MUC {}", muc_id);
                }
            }
            XmppCommand::ChatroomPresence {
                muc_id,
                show,
                message,
            } => {
                if let Some(muc) = self.state.data.mucs.get(&muc_id) {
                    self.state.data.counter += 1;
                    let id_presence = format!("id_presence{}", self.state.data.counter);
                    self.state
                        .data
                        .send_queue
                        .push_back(stanzas::make_muc_presence(
                            &id_presence,
                            self.account.jid.clone(),
                            muc.clone(),
                            Some(show),
                            Some(message),
                        ));
                } else {
                    error!("Not found MUC {}", muc_id);
                }
            }
        }
        self.into()
    }

    fn process_jid(&mut self, xmpp_to: &xmpp_parsers::BareJid) {
        if let Some(ref mut mailbox) = self.state.data.outgoing_mailbox.get_mut(xmpp_to) {
            if !mailbox.is_empty() {
                if let Some(ref mut rdata) = self.state.data.roster.get_mut(xmpp_to) {
                    info!("Jid {} in roster", xmpp_to);
                    let sub_to = matches!(
                        rdata.0,
                        xmpp_parsers::roster::Subscription::To
                            | xmpp_parsers::roster::Subscription::Both
                    );
                    if sub_to {
                        info!("Subscribed to {}", xmpp_to);
                        self.state
                            .data
                            .send_queue
                            .extend(mailbox.drain(..).map(|message| {
                                stanzas::make_chat_message(message.2, message.1, message.0)
                            }));
                    } else if rdata.1 == xmpp_parsers::roster::Ask::None {
                        info!("Not subscribed to {}", xmpp_to);
                        self.state.data.counter += 1;
                        let id_ask_subscribe =
                            format!("id_ask_subscribe{}", self.state.data.counter);
                        self.state
                            .data
                            .send_queue
                            .push_back(stanzas::make_ask_subscribe(
                                id_ask_subscribe,
                                xmpp_to.clone().into(),
                            ));
                    } else {
                        warn!(
                            "Unsupported subscription state {:?} to {}. Try to send messages",
                            rdata, xmpp_to
                        );
                        self.state
                            .data
                            .send_queue
                            .extend(mailbox.drain(..).map(|message| {
                                stanzas::make_chat_message(message.2, message.1, message.0)
                            }));
                        self.state.data.counter += 1;
                        let id_ask_subscribe =
                            format!("id_ask_subscribe{}", self.state.data.counter);
                        self.state
                            .data
                            .send_queue
                            .push_back(stanzas::make_ask_subscribe(
                                id_ask_subscribe,
                                xmpp_to.clone().into(),
                            ));
                    }
                    let sub_from = matches!(
                        rdata.0,
                        xmpp_parsers::roster::Subscription::From
                            | xmpp_parsers::roster::Subscription::Both
                    );
                    if !sub_from {
                        info!("Not subscription from {}", xmpp_to);
                        self.state.data.counter += 1;
                        let id_allow_subscribe =
                            format!("id_allow_subscribe{}", self.state.data.counter);
                        self.state
                            .data
                            .send_queue
                            .push_back(stanzas::make_allow_subscribe(
                                id_allow_subscribe,
                                xmpp_to.clone().into(),
                            ));
                    }
                } else {
                    info!("Jid {} not in roster", xmpp_to);
                    self.state.data.counter += 1;
                    let id_add_roster = format!("id_add_roster{}", self.state.data.counter);
                    let add_roster = stanzas::make_add_roster(&id_add_roster, xmpp_to.clone());
                    info!("Adding jid {} to roster id {}", xmpp_to, id_add_roster);
                    self.state.data.pending_ids.insert(
                        id_add_roster,
                        IqWait::new(
                            60,
                            AddRosterIqHandler {
                                jid: xmpp_to.clone(),
                            },
                        ),
                    );
                    self.state.data.send_queue.push_back(add_roster);
                }
            }
        }
    }

    async fn enter_mucs<F>(
        mut self,
        mut stop_future: F,
    ) -> (Result<Self, std::rc::Rc<config::Account>>, Either<F, ()>)
    where
        F: std::future::Future<Output = ()> + Unpin,
    {
        let mut domains: Vec<xmpp_parsers::BareJid> = vec![];
        for chatroom in &self.account.chatrooms {
            self.state.data.counter += 1;
            let id_muc_presence = format!("id_muc_presence{}", self.state.data.counter);
            let muc_presence = stanzas::make_muc_presence(
                &id_muc_presence,
                self.account.jid.clone(),
                chatroom.1.clone(),
                None,
                None,
            );
            info!("Sending muc presence... {}", String::from(&muc_presence));
            let opt_stop_future = {
                let send_future = self.state.client.send_stanza(muc_presence);
                tokio::pin!(send_future);
                match futures_util::future::select(stop_future, send_future).await {
                    Either::Left(((), _)) => None,
                    Either::Right((Ok(_), stop_future)) => {
                        self.state
                            .data
                            .mucs
                            .insert(chatroom.0.clone(), chatroom.1.clone());
                        domains.push(chatroom.1.clone().into());
                        Some(stop_future)
                    }
                    Either::Right((Err(e), f)) => {
                        error!("Send muc presence stanza error: {}", e);
                        return (Err(self.account), Either::Left(f));
                    }
                }
            };
            if let Some(f) = opt_stop_future {
                stop_future = f;
            } else {
                return (Ok(self), Either::Right(()));
            }
        }
        domains.into_iter().for_each(|d| self.add_domain_to_ping(d));
        (Ok(self), Either::Left(stop_future))
    }

    async fn self_discovery<F>(
        mut self,
        mut stop_future: F,
    ) -> (Result<Self, std::rc::Rc<config::Account>>, Either<F, ()>)
    where
        F: std::future::Future<Output = ()> + Unpin,
    {
        self.state.data.counter += 1;
        let id_self_discovery = format!("id_self_discovery{}", self.state.data.counter);
        let self_discovery = stanzas::make_disco_get(
            id_self_discovery.clone(),
            Some(self.account.jid.clone()),
            Some(xmpp_parsers::BareJid::from(self.account.jid.clone()).into()),
        );
        self.state.data.pending_ids.insert(
            id_self_discovery,
            IqWait::new(60, SelfDiscoveryIqHandler {}),
        );
        info!(
            "Sending self discovery... {}",
            String::from(&self_discovery)
        );
        let opt_stop_future = {
            let send_future = self.state.client.send_stanza(self_discovery);
            tokio::pin!(send_future);
            match futures_util::future::select(stop_future, send_future).await {
                Either::Left(((), _)) => None,
                Either::Right((Ok(_), stop_future)) => Some(stop_future),
                Either::Right((Err(e), f)) => {
                    error!("Send self discovery error: {}", e);
                    return (Err(self.account), Either::Left(f));
                }
            }
        };
        if let Some(f) = opt_stop_future {
            stop_future = f;
        } else {
            return (Ok(self), Either::Right(()));
        }
        (Ok(self), Either::Left(stop_future))
    }

    fn add_domain_to_ping(&mut self, jid: xmpp_parsers::BareJid) {
        if jid.domain != (*self.account).jid.clone().domain() {
            self.state.data.used_domains.insert(jid.domain);
        }
    }
}