use std::future::Future;
use std::marker::PhantomData;
use std::ops::Deref;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures_core::ready;
use http::StatusCode;
use http_body::Body;
use pin_project_lite::pin_project;
use serde::{de, Deserialize};
use crate::error::{Error, ErrorCode, TwitterErrors};
use crate::traits::HttpTryFuture;
use crate::util::ConcatBody;
use crate::RateLimit;
#[derive(Debug)]
pub struct Response<T> {
pub data: T,
pub rate_limit: Option<RateLimit>,
}
pin_project! {
pub struct ResponseFuture<T, F: HttpTryFuture> {
#[pin]
raw: RawResponseFuture<F>,
marker: PhantomData<fn() -> T>,
}
}
pin_project! {
pub struct RawResponseFuture<F: HttpTryFuture> {
#[pin]
inner: ResponseFutureInner<F>,
}
}
pin_project! {
#[project = ResponseFutureInnerProj]
enum ResponseFutureInner<F: HttpTryFuture> {
Resp {
#[pin]
response: F,
},
Body {
status: StatusCode,
rate_limit: Option<RateLimit>,
#[pin]
body: ConcatBody<F::Body>,
},
}
}
impl<T> Deref for Response<T> {
type Target = T;
fn deref(&self) -> &T {
&self.data
}
}
impl<T, F: HttpTryFuture> From<RawResponseFuture<F>> for ResponseFuture<T, F> {
fn from(raw: RawResponseFuture<F>) -> Self {
ResponseFuture {
raw,
marker: PhantomData,
}
}
}
impl<T: de::DeserializeOwned, F: HttpTryFuture> Future for ResponseFuture<T, F> {
#[allow(clippy::type_complexity)]
type Output = Result<Response<T>, Error<F::Error, <F::Body as Body>::Error>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = ready!(self.project().raw.poll(cx)?);
let data = serde_json::from_slice(&res.data).map_err(Error::Deserializing)?;
Poll::Ready(Ok(Response {
data,
rate_limit: res.rate_limit,
}))
}
}
impl<F: HttpTryFuture> RawResponseFuture<F> {
pub(crate) fn new(response: F) -> Self
where
F: HttpTryFuture,
{
RawResponseFuture {
inner: ResponseFutureInner::Resp { response },
}
}
}
impl<T, F: HttpTryFuture> From<ResponseFuture<T, F>> for RawResponseFuture<F> {
fn from(future: ResponseFuture<T, F>) -> Self {
future.raw
}
}
impl<F: HttpTryFuture> Future for RawResponseFuture<F> {
#[allow(clippy::type_complexity)]
type Output = Result<Response<Bytes>, Error<F::Error, <F::Body as Body>::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let ResponseFutureInnerProj::Resp { response } = self.as_mut().project().inner.project()
{
let res = ready!(response.try_poll(cx).map_err(Error::Service))?;
self.as_mut()
.project()
.inner
.set(ResponseFutureInner::Body {
status: res.status(),
rate_limit: RateLimit::from_headers(res.headers()),
body: ConcatBody::new(res.into_body()),
});
}
if let ResponseFutureInnerProj::Body {
status,
rate_limit,
body,
} = self.project().inner.project()
{
let status = *status;
let rate_limit = *rate_limit;
let body = ready!(body.poll(cx).map_err(Error::Body))?;
let result = if let StatusCode::OK = status {
Ok(Response {
data: body,
rate_limit,
})
} else {
#[derive(Default, Deserialize)]
struct Errors {
errors: Vec<ErrorCode>,
}
serde_json::from_slice(&body)
.or_else(|_| Ok(Errors::default()))
.and_then(|errors| {
Err(Error::Twitter(TwitterErrors {
status,
errors: errors.errors,
rate_limit,
}))
})
};
Poll::Ready(result)
} else {
unreachable!();
}
}
}