OX4PDDAAYMNRVC2HNEILNGDSHPQPPOMNBIBQCRW7NE6QFKFKB4XAC use std::future::Future;use std::marker::PhantomData;use std::ops::Deref;use std::pin::Pin;use std::task::{Context, Poll};use futures_core::ready;use http::StatusCode;use http_body::Body;use pin_project_lite::pin_project;use serde::{de, Deserialize};use tower::util::{Oneshot, ServiceExt};use crate::error::{Error, ErrorCode, TwitterErrors};use crate::service::{HttpService, IntoService};use crate::util::{ConcatBody, Seal};use crate::RateLimit;#[derive(Debug)]pub struct Response<T> {pub data: T,pub rate_limit: Option<RateLimit>,}pin_project! {pub struct ResponseFuture<T, S: HttpService<B>, B> {#[pin]inner: ResponseFutureInner<S, B>,marker: PhantomData<fn() -> T>,}}pin_project! {#[project = ResponseFutureInnerProj]enum ResponseFutureInner<S: HttpService<B>, B> {Resp {#[pin]response: Oneshot<IntoService<S>, http::Request<B>>,},Body {status: StatusCode,rate_limit: Option<RateLimit>,#[pin]body: ConcatBody<S::ResponseBody>,},}}impl<T> Deref for Response<T> {type Target = T;fn deref(&self) -> &T {&self.data}}impl<T: de::DeserializeOwned, S, B> ResponseFuture<T, S, B>whereS: HttpService<B>,{pub(crate) fn new(req: http::Request<B>, http: S) -> Self {ResponseFuture {inner: ResponseFutureInner::Resp {response: http._into_service(Seal).oneshot(req),},marker: PhantomData,}}}impl<T: de::DeserializeOwned, S, B> Future for ResponseFuture<T, S, B>whereS: HttpService<B>,{#[allow(clippy::type_complexity)]type Output = Result<Response<T>, Error<S::Error, <S::ResponseBody as Body>::Error>>;fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {if let ResponseFutureInnerProj::Resp { response } = self.as_mut().project().inner.project(){let res = ready!(response.poll(cx).map_err(Error::Service))?;self.as_mut().project().inner.set(ResponseFutureInner::Body {status: res.status(),rate_limit: RateLimit::from_headers(res.headers()),body: ConcatBody::new(res.into_body()),});}if let ResponseFutureInnerProj::Body {status,rate_limit,body,} = self.project().inner.project(){let body = ready!(body.poll(cx).map_err(Error::Body))?;Poll::Ready(make_response(*status, *rate_limit, &body, |body| {serde_json::from_slice(body).map_err(Error::Deserializing)}))} else {unreachable!();}}}pub(crate) fn make_response<T, F, SE, BE>(status: StatusCode,rate_limit: Option<RateLimit>,body: &[u8],parse: F,) -> Result<Response<T>, Error<SE, BE>>whereF: FnOnce(&[u8]) -> Result<T, Error<SE, BE>>,{if let StatusCode::OK = status {parse(body).map(|data| Response { data, rate_limit })} else {#[derive(Default, Deserialize)]struct Errors {errors: Vec<ErrorCode>,}serde_json::from_slice(body).or_else(|_| Ok(Errors::default())).and_then(|errors| {Err(Error::Twitter(TwitterErrors {status,errors: errors.errors,rate_limit,}))})}}
use pin_project_lite::pin_project;use serde::{de, Deserialize};use tower::util::{Oneshot, ServiceExt};use crate::service::{HttpService, IntoService};use crate::util::{ConcatBody, Seal};#[derive(Deserialize)]pub struct AccessToken {#[serde(flatten)]pub credentials: Credentials,pub user_id: i64,}#[derive(Debug)]pub struct Response<T> {pub data: T,pub rate_limit: Option<RateLimit>,}pin_project! {pub struct ResponseFuture<T, S: HttpService<B>, B> {#[pin]inner: ResponseFutureInner<S, B>,marker: PhantomData<fn() -> T>,}}
use serde::de;
pin_project! {#[project = ResponseFutureInnerProj]enum ResponseFutureInner<S: HttpService<B>, B> {Resp {#[pin]response: Oneshot<IntoService<S>, http::Request<B>>,},Body {status: StatusCode,rate_limit: Option<RateLimit>,#[pin]body: ConcatBody<S::ResponseBody>,},}}
use crate::response::ResponseFuture;use crate::service::HttpService;
pub const RATE_LIMIT_LIMIT: &str = "x-rate-limit-limit";pub const RATE_LIMIT_REMAINING: &str = "x-rate-limit-remaining";pub const RATE_LIMIT_RESET: &str = "x-rate-limit-reset";
impl RateLimit {fn from_headers(headers: &http::HeaderMap) -> Option<Self> {pub const RATE_LIMIT_LIMIT: &str = "x-rate-limit-limit";pub const RATE_LIMIT_REMAINING: &str = "x-rate-limit-remaining";pub const RATE_LIMIT_RESET: &str = "x-rate-limit-reset";
impl<T> Deref for Response<T> {type Target = T;fn deref(&self) -> &T {&self.data}}impl<T: de::DeserializeOwned, S, B> ResponseFuture<T, S, B>whereS: HttpService<B>,{fn new(req: http::Request<B>, http: S) -> Self {ResponseFuture {inner: ResponseFutureInner::Resp {response: http._into_service(Seal).oneshot(req),},marker: PhantomData,
fn header(headers: &http::HeaderMap, name: &str) -> Option<u64> {headers.get(name).and_then(|value| atoi::atoi(value.as_bytes()))
impl<T: de::DeserializeOwned, S, B> Future for ResponseFuture<T, S, B>whereS: HttpService<B>,{#[allow(clippy::type_complexity)]type Output = Result<Response<T>, Error<S::Error, <S::ResponseBody as Body>::Error>>;fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {if let ResponseFutureInnerProj::Resp { response } = self.as_mut().project().inner.project(){let res = ready!(response.poll(cx).map_err(Error::Service))?;self.as_mut().project().inner.set(ResponseFutureInner::Body {status: res.status(),rate_limit: rate_limit(&res),body: ConcatBody::new(res.into_body()),});}if let ResponseFutureInnerProj::Body {status,rate_limit,body,} = self.project().inner.project(){let body = ready!(body.poll(cx).map_err(Error::Body))?;Poll::Ready(make_response(*status, *rate_limit, &body, |body| {serde_json::from_slice(body).map_err(Error::Deserializing)}))} else {unreachable!();}}}impl<SE: error::Error + 'static, BE: error::Error + 'static> error::Error for Error<SE, BE> {fn source(&self) -> Option<&(dyn error::Error + 'static)> {match *self {Error::Deserializing(ref e) => Some(e),Error::Service(ref e) => Some(e),Error::Body(ref e) => Some(e),Error::Twitter(ref e) => Some(e),Error::Unexpected => None,}
Some(RateLimit {limit: header(headers, RATE_LIMIT_LIMIT)?,remaining: header(headers, RATE_LIMIT_REMAINING)?,reset: header(headers, RATE_LIMIT_RESET)?,})
impl<SE: Display, BE: Display> Display for Error<SE, BE> {fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {match *self {Error::Deserializing(_) => f.write_str("Failed to deserialize the response body."),Error::Service(_) => f.write_str("HTTP error"),Error::Body(_) => f.write_str("Error while reading the response body"),Error::Twitter(_) => f.write_str("Twitter returned error(s)"),Error::Unexpected => f.write_str("Unexpected error occured."),}}}impl TwitterErrors {pub fn codes(&self) -> impl Iterator<Item = u32> + '_ {self.errors.iter().map(|e| e.code)}}impl Display for TwitterErrors {fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {write!(f, "status: {}", self.status)?;let mut errors = self.errors.iter();if let Some(e) = errors.next() {write!(f, "; errors: {}", e)?;for e in errors {write!(f, ", {}", e)?;}}Ok(())}}impl error::Error for TwitterErrors {}impl ErrorCode {pub const YOU_ARENT_ALLOWED_TO_ADD_MEMBERS_TO_THIS_LIST: u32 = 104;pub const CANNOT_FIND_SPECIFIED_USER: u32 = 108;pub const NO_STATUS_FOUND_WITH_THAT_ID: u32 = 144;pub const YOU_HAVE_ALREADY_RETWEETED_THIS_TWEET: u32 = 327;}impl Display for ErrorCode {fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {write!(f, "{} {}", self.code, self.message)}}pub async fn request_token<S, B>(client_credentials: Credentials<&str>,client: S,) -> Result<Response<Credentials<Box<str>>>, Error<S::Error, <S::ResponseBody as Body>::Error>>whereS: HttpService<B>,B: Default + From<Vec<u8>>,{const URI: &str = "https://api.twitter.com/oauth/request_token";let authorization = oauth::Builder::<_, _>::new(client_credentials, oauth::HmacSha1).callback("oob").post(URI, &());let req = http::Request::post(Uri::from_static(URI)).header(AUTHORIZATION, authorization).body(Default::default()).unwrap();let res = client._into_service(Seal).oneshot(req).await.map_err(Error::Service)?;let status = res.status();let rate_limit = rate_limit(&res);let body = ConcatBody::new(res.into_body()).await.map_err(Error::Body)?;make_response(status, rate_limit, &body, |body| {serde_urlencoded::from_bytes(body).map_err(|_| Error::Unexpected)})}pub async fn access_token<'a, S, B>(oauth_verifier: &'a str,client_credentials: Credentials<&'a str>,temporary_credentials: Credentials<&'a str>,client: S,) -> Result<Response<AccessToken>, Error<S::Error, <S::ResponseBody as Body>::Error>>whereS: HttpService<B>,B: Default + From<Vec<u8>>,{const URI: &str = "https://api.twitter.com/oauth/access_token";let authorization = oauth::Builder::new(client_credentials, oauth::HmacSha1).token(temporary_credentials).verifier(oauth_verifier).post(URI, &());let req = http::Request::post(Uri::from_static(URI)).header(AUTHORIZATION, authorization).body(Default::default()).unwrap();let res = client._into_service(Seal).oneshot(req).await.map_err(Error::Service)?;let status = res.status();let rate_limit = rate_limit(&res);let body = ConcatBody::new(res.into_body()).await.map_err(Error::Body)?;make_response(status, rate_limit, &body, |body| {serde_urlencoded::from_bytes(body).map_err(|_| Error::Unexpected)})}fn make_response<T, F, SE, BE>(status: StatusCode,rate_limit: Option<RateLimit>,body: &[u8],parse: F,) -> Result<Response<T>, Error<SE, BE>>whereF: FnOnce(&[u8]) -> Result<T, Error<SE, BE>>,{if let StatusCode::OK = status {parse(body).map(|data| Response { data, rate_limit })} else {#[derive(Default, Deserialize)]struct Errors {errors: Vec<ErrorCode>,}serde_json::from_slice(body).or_else(|_| Ok(Errors::default())).and_then(|errors| {Err(Error::Twitter(TwitterErrors {status,errors: errors.errors,rate_limit,}))})}}
}fn rate_limit<T>(res: &http::Response<T>) -> Option<RateLimit> {Some(RateLimit {limit: header(res, RATE_LIMIT_LIMIT)?,remaining: header(res, RATE_LIMIT_REMAINING)?,reset: header(res, RATE_LIMIT_RESET)?,})}fn header<T>(res: &http::Response<T>, name: &str) -> Option<u64> {res.headers().get(name).and_then(|value| atoi::atoi(value.as_bytes()))
use std::error;use std::fmt::{self, Display, Formatter};use http::StatusCode;use serde::Deserialize;#[derive(Debug)]pub enum Error<SE, BE> {Deserializing(serde_json::Error),Service(SE),Body(BE),Twitter(TwitterErrors),Unexpected,}#[derive(Debug)]pub struct TwitterErrors {pub status: StatusCode,pub errors: Vec<ErrorCode>,pub rate_limit: Option<crate::RateLimit>,}#[derive(Debug, Deserialize)]pub struct ErrorCode {pub code: u32,pub message: String,}impl<SE: error::Error + 'static, BE: error::Error + 'static> error::Error for Error<SE, BE> {fn source(&self) -> Option<&(dyn error::Error + 'static)> {match *self {Error::Deserializing(ref e) => Some(e),Error::Service(ref e) => Some(e),Error::Body(ref e) => Some(e),Error::Twitter(ref e) => Some(e),Error::Unexpected => None,}}}impl<SE: Display, BE: Display> Display for Error<SE, BE> {fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {match *self {Error::Deserializing(_) => f.write_str("Failed to deserialize the response body."),Error::Service(_) => f.write_str("HTTP error"),Error::Body(_) => f.write_str("Error while reading the response body"),Error::Twitter(_) => f.write_str("Twitter returned error(s)"),Error::Unexpected => f.write_str("Unexpected error occured."),}}}impl TwitterErrors {pub fn codes(&self) -> impl Iterator<Item = u32> + '_ {self.errors.iter().map(|e| e.code)}}impl Display for TwitterErrors {fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {write!(f, "status: {}", self.status)?;let mut errors = self.errors.iter();if let Some(e) = errors.next() {write!(f, "; errors: {}", e)?;for e in errors {write!(f, ", {}", e)?;}}Ok(())}}impl error::Error for TwitterErrors {}impl ErrorCode {pub const YOU_ARENT_ALLOWED_TO_ADD_MEMBERS_TO_THIS_LIST: u32 = 104;pub const CANNOT_FIND_SPECIFIED_USER: u32 = 108;pub const NO_STATUS_FOUND_WITH_THAT_ID: u32 = 144;pub const YOU_HAVE_ALREADY_RETWEETED_THIS_TWEET: u32 = 327;}impl Display for ErrorCode {fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {write!(f, "{} {}", self.code, self.message)}}
use http::header::AUTHORIZATION;use http::Uri;use http_body::Body;use oauth_credentials::Credentials;use serde::Deserialize;use tower::util::ServiceExt;use crate::error::Error;use crate::response;use crate::service::HttpService;use crate::util::{ConcatBody, Seal};use crate::{RateLimit, Response};#[derive(Deserialize)]pub struct AccessToken {#[serde(flatten)]pub credentials: Credentials,pub user_id: i64,}pub async fn request_token<S, B>(client_credentials: Credentials<&str>,client: S,) -> Result<Response<Credentials<Box<str>>>, Error<S::Error, <S::ResponseBody as Body>::Error>>whereS: HttpService<B>,B: Default + From<Vec<u8>>,{const URI: &str = "https://api.twitter.com/oauth/request_token";let authorization = oauth::Builder::<_, _>::new(client_credentials, oauth::HmacSha1).callback("oob").post(URI, &());let req = http::Request::post(Uri::from_static(URI)).header(AUTHORIZATION, authorization).body(Default::default()).unwrap();let res = client._into_service(Seal).oneshot(req).await.map_err(Error::Service)?;let status = res.status();let rate_limit = RateLimit::from_headers(res.headers());let body = ConcatBody::new(res).await.map_err(Error::Body)?;response::make_response(status, rate_limit, &body, |body| {serde_urlencoded::from_bytes(body).map_err(|_| Error::Unexpected)})}pub async fn access_token<'a, S, B>(oauth_verifier: &'a str,client_credentials: Credentials<&'a str>,temporary_credentials: Credentials<&'a str>,client: S,) -> Result<Response<AccessToken>, Error<S::Error, <S::ResponseBody as Body>::Error>>whereS: HttpService<B>,B: Default + From<Vec<u8>>,{const URI: &str = "https://api.twitter.com/oauth/access_token";let authorization = oauth::Builder::new(client_credentials, oauth::HmacSha1).token(temporary_credentials).verifier(oauth_verifier).post(URI, &());let req = http::Request::post(Uri::from_static(URI)).header(AUTHORIZATION, authorization).body(Default::default()).unwrap();let res = client._into_service(Seal).oneshot(req).await.map_err(Error::Service)?;let status = res.status();let rate_limit = RateLimit::from_headers(res.headers());let body = ConcatBody::new(res).await.map_err(Error::Body)?;response::make_response(status, rate_limit, &body, |body| {serde_urlencoded::from_bytes(body).map_err(|_| Error::Unexpected)})}