OX4PDDAAYMNRVC2HNEILNGDSHPQPPOMNBIBQCRW7NE6QFKFKB4XAC
use std::future::Future;
use std::marker::PhantomData;
use std::ops::Deref;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::ready;
use http::StatusCode;
use http_body::Body;
use pin_project_lite::pin_project;
use serde::{de, Deserialize};
use tower::util::{Oneshot, ServiceExt};
use crate::error::{Error, ErrorCode, TwitterErrors};
use crate::service::{HttpService, IntoService};
use crate::util::{ConcatBody, Seal};
use crate::RateLimit;
#[derive(Debug)]
pub struct Response<T> {
pub data: T,
pub rate_limit: Option<RateLimit>,
}
pin_project! {
pub struct ResponseFuture<T, S: HttpService<B>, B> {
#[pin]
inner: ResponseFutureInner<S, B>,
marker: PhantomData<fn() -> T>,
}
}
pin_project! {
#[project = ResponseFutureInnerProj]
enum ResponseFutureInner<S: HttpService<B>, B> {
Resp {
#[pin]
response: Oneshot<IntoService<S>, http::Request<B>>,
},
Body {
status: StatusCode,
rate_limit: Option<RateLimit>,
#[pin]
body: ConcatBody<S::ResponseBody>,
},
}
}
impl<T> Deref for Response<T> {
type Target = T;
fn deref(&self) -> &T {
&self.data
}
}
impl<T: de::DeserializeOwned, S, B> ResponseFuture<T, S, B>
where
S: HttpService<B>,
{
pub(crate) fn new(req: http::Request<B>, http: S) -> Self {
ResponseFuture {
inner: ResponseFutureInner::Resp {
response: http._into_service(Seal).oneshot(req),
},
marker: PhantomData,
}
}
}
impl<T: de::DeserializeOwned, S, B> Future for ResponseFuture<T, S, B>
where
S: HttpService<B>,
{
#[allow(clippy::type_complexity)]
type Output = Result<Response<T>, Error<S::Error, <S::ResponseBody as Body>::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let ResponseFutureInnerProj::Resp { response } = self.as_mut().project().inner.project()
{
let res = ready!(response.poll(cx).map_err(Error::Service))?;
self.as_mut()
.project()
.inner
.set(ResponseFutureInner::Body {
status: res.status(),
rate_limit: RateLimit::from_headers(res.headers()),
body: ConcatBody::new(res.into_body()),
});
}
if let ResponseFutureInnerProj::Body {
status,
rate_limit,
body,
} = self.project().inner.project()
{
let body = ready!(body.poll(cx).map_err(Error::Body))?;
Poll::Ready(make_response(*status, *rate_limit, &body, |body| {
serde_json::from_slice(body).map_err(Error::Deserializing)
}))
} else {
unreachable!();
}
}
}
pub(crate) fn make_response<T, F, SE, BE>(
status: StatusCode,
rate_limit: Option<RateLimit>,
body: &[u8],
parse: F,
) -> Result<Response<T>, Error<SE, BE>>
where
F: FnOnce(&[u8]) -> Result<T, Error<SE, BE>>,
{
if let StatusCode::OK = status {
parse(body).map(|data| Response { data, rate_limit })
} else {
#[derive(Default, Deserialize)]
struct Errors {
errors: Vec<ErrorCode>,
}
serde_json::from_slice(body)
.or_else(|_| Ok(Errors::default()))
.and_then(|errors| {
Err(Error::Twitter(TwitterErrors {
status,
errors: errors.errors,
rate_limit,
}))
})
}
}
use pin_project_lite::pin_project;
use serde::{de, Deserialize};
use tower::util::{Oneshot, ServiceExt};
use crate::service::{HttpService, IntoService};
use crate::util::{ConcatBody, Seal};
#[derive(Deserialize)]
pub struct AccessToken {
#[serde(flatten)]
pub credentials: Credentials,
pub user_id: i64,
}
#[derive(Debug)]
pub struct Response<T> {
pub data: T,
pub rate_limit: Option<RateLimit>,
}
pin_project! {
pub struct ResponseFuture<T, S: HttpService<B>, B> {
#[pin]
inner: ResponseFutureInner<S, B>,
marker: PhantomData<fn() -> T>,
}
}
use serde::de;
pin_project! {
#[project = ResponseFutureInnerProj]
enum ResponseFutureInner<S: HttpService<B>, B> {
Resp {
#[pin]
response: Oneshot<IntoService<S>, http::Request<B>>,
},
Body {
status: StatusCode,
rate_limit: Option<RateLimit>,
#[pin]
body: ConcatBody<S::ResponseBody>,
},
}
}
use crate::response::ResponseFuture;
use crate::service::HttpService;
pub const RATE_LIMIT_LIMIT: &str = "x-rate-limit-limit";
pub const RATE_LIMIT_REMAINING: &str = "x-rate-limit-remaining";
pub const RATE_LIMIT_RESET: &str = "x-rate-limit-reset";
impl RateLimit {
fn from_headers(headers: &http::HeaderMap) -> Option<Self> {
pub const RATE_LIMIT_LIMIT: &str = "x-rate-limit-limit";
pub const RATE_LIMIT_REMAINING: &str = "x-rate-limit-remaining";
pub const RATE_LIMIT_RESET: &str = "x-rate-limit-reset";
impl<T> Deref for Response<T> {
type Target = T;
fn deref(&self) -> &T {
&self.data
}
}
impl<T: de::DeserializeOwned, S, B> ResponseFuture<T, S, B>
where
S: HttpService<B>,
{
fn new(req: http::Request<B>, http: S) -> Self {
ResponseFuture {
inner: ResponseFutureInner::Resp {
response: http._into_service(Seal).oneshot(req),
},
marker: PhantomData,
fn header(headers: &http::HeaderMap, name: &str) -> Option<u64> {
headers
.get(name)
.and_then(|value| atoi::atoi(value.as_bytes()))
impl<T: de::DeserializeOwned, S, B> Future for ResponseFuture<T, S, B>
where
S: HttpService<B>,
{
#[allow(clippy::type_complexity)]
type Output = Result<Response<T>, Error<S::Error, <S::ResponseBody as Body>::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let ResponseFutureInnerProj::Resp { response } = self.as_mut().project().inner.project()
{
let res = ready!(response.poll(cx).map_err(Error::Service))?;
self.as_mut()
.project()
.inner
.set(ResponseFutureInner::Body {
status: res.status(),
rate_limit: rate_limit(&res),
body: ConcatBody::new(res.into_body()),
});
}
if let ResponseFutureInnerProj::Body {
status,
rate_limit,
body,
} = self.project().inner.project()
{
let body = ready!(body.poll(cx).map_err(Error::Body))?;
Poll::Ready(make_response(*status, *rate_limit, &body, |body| {
serde_json::from_slice(body).map_err(Error::Deserializing)
}))
} else {
unreachable!();
}
}
}
impl<SE: error::Error + 'static, BE: error::Error + 'static> error::Error for Error<SE, BE> {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match *self {
Error::Deserializing(ref e) => Some(e),
Error::Service(ref e) => Some(e),
Error::Body(ref e) => Some(e),
Error::Twitter(ref e) => Some(e),
Error::Unexpected => None,
}
Some(RateLimit {
limit: header(headers, RATE_LIMIT_LIMIT)?,
remaining: header(headers, RATE_LIMIT_REMAINING)?,
reset: header(headers, RATE_LIMIT_RESET)?,
})
impl<SE: Display, BE: Display> Display for Error<SE, BE> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match *self {
Error::Deserializing(_) => f.write_str("Failed to deserialize the response body."),
Error::Service(_) => f.write_str("HTTP error"),
Error::Body(_) => f.write_str("Error while reading the response body"),
Error::Twitter(_) => f.write_str("Twitter returned error(s)"),
Error::Unexpected => f.write_str("Unexpected error occured."),
}
}
}
impl TwitterErrors {
pub fn codes(&self) -> impl Iterator<Item = u32> + '_ {
self.errors.iter().map(|e| e.code)
}
}
impl Display for TwitterErrors {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "status: {}", self.status)?;
let mut errors = self.errors.iter();
if let Some(e) = errors.next() {
write!(f, "; errors: {}", e)?;
for e in errors {
write!(f, ", {}", e)?;
}
}
Ok(())
}
}
impl error::Error for TwitterErrors {}
impl ErrorCode {
pub const YOU_ARENT_ALLOWED_TO_ADD_MEMBERS_TO_THIS_LIST: u32 = 104;
pub const CANNOT_FIND_SPECIFIED_USER: u32 = 108;
pub const NO_STATUS_FOUND_WITH_THAT_ID: u32 = 144;
pub const YOU_HAVE_ALREADY_RETWEETED_THIS_TWEET: u32 = 327;
}
impl Display for ErrorCode {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{} {}", self.code, self.message)
}
}
pub async fn request_token<S, B>(
client_credentials: Credentials<&str>,
client: S,
) -> Result<Response<Credentials<Box<str>>>, Error<S::Error, <S::ResponseBody as Body>::Error>>
where
S: HttpService<B>,
B: Default + From<Vec<u8>>,
{
const URI: &str = "https://api.twitter.com/oauth/request_token";
let authorization = oauth::Builder::<_, _>::new(client_credentials, oauth::HmacSha1)
.callback("oob")
.post(URI, &());
let req = http::Request::post(Uri::from_static(URI))
.header(AUTHORIZATION, authorization)
.body(Default::default())
.unwrap();
let res = client
._into_service(Seal)
.oneshot(req)
.await
.map_err(Error::Service)?;
let status = res.status();
let rate_limit = rate_limit(&res);
let body = ConcatBody::new(res.into_body())
.await
.map_err(Error::Body)?;
make_response(status, rate_limit, &body, |body| {
serde_urlencoded::from_bytes(body).map_err(|_| Error::Unexpected)
})
}
pub async fn access_token<'a, S, B>(
oauth_verifier: &'a str,
client_credentials: Credentials<&'a str>,
temporary_credentials: Credentials<&'a str>,
client: S,
) -> Result<Response<AccessToken>, Error<S::Error, <S::ResponseBody as Body>::Error>>
where
S: HttpService<B>,
B: Default + From<Vec<u8>>,
{
const URI: &str = "https://api.twitter.com/oauth/access_token";
let authorization = oauth::Builder::new(client_credentials, oauth::HmacSha1)
.token(temporary_credentials)
.verifier(oauth_verifier)
.post(URI, &());
let req = http::Request::post(Uri::from_static(URI))
.header(AUTHORIZATION, authorization)
.body(Default::default())
.unwrap();
let res = client
._into_service(Seal)
.oneshot(req)
.await
.map_err(Error::Service)?;
let status = res.status();
let rate_limit = rate_limit(&res);
let body = ConcatBody::new(res.into_body())
.await
.map_err(Error::Body)?;
make_response(status, rate_limit, &body, |body| {
serde_urlencoded::from_bytes(body).map_err(|_| Error::Unexpected)
})
}
fn make_response<T, F, SE, BE>(
status: StatusCode,
rate_limit: Option<RateLimit>,
body: &[u8],
parse: F,
) -> Result<Response<T>, Error<SE, BE>>
where
F: FnOnce(&[u8]) -> Result<T, Error<SE, BE>>,
{
if let StatusCode::OK = status {
parse(body).map(|data| Response { data, rate_limit })
} else {
#[derive(Default, Deserialize)]
struct Errors {
errors: Vec<ErrorCode>,
}
serde_json::from_slice(body)
.or_else(|_| Ok(Errors::default()))
.and_then(|errors| {
Err(Error::Twitter(TwitterErrors {
status,
errors: errors.errors,
rate_limit,
}))
})
}
}
}
fn rate_limit<T>(res: &http::Response<T>) -> Option<RateLimit> {
Some(RateLimit {
limit: header(res, RATE_LIMIT_LIMIT)?,
remaining: header(res, RATE_LIMIT_REMAINING)?,
reset: header(res, RATE_LIMIT_RESET)?,
})
}
fn header<T>(res: &http::Response<T>, name: &str) -> Option<u64> {
res.headers()
.get(name)
.and_then(|value| atoi::atoi(value.as_bytes()))
use std::error;
use std::fmt::{self, Display, Formatter};
use http::StatusCode;
use serde::Deserialize;
#[derive(Debug)]
pub enum Error<SE, BE> {
Deserializing(serde_json::Error),
Service(SE),
Body(BE),
Twitter(TwitterErrors),
Unexpected,
}
#[derive(Debug)]
pub struct TwitterErrors {
pub status: StatusCode,
pub errors: Vec<ErrorCode>,
pub rate_limit: Option<crate::RateLimit>,
}
#[derive(Debug, Deserialize)]
pub struct ErrorCode {
pub code: u32,
pub message: String,
}
impl<SE: error::Error + 'static, BE: error::Error + 'static> error::Error for Error<SE, BE> {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match *self {
Error::Deserializing(ref e) => Some(e),
Error::Service(ref e) => Some(e),
Error::Body(ref e) => Some(e),
Error::Twitter(ref e) => Some(e),
Error::Unexpected => None,
}
}
}
impl<SE: Display, BE: Display> Display for Error<SE, BE> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match *self {
Error::Deserializing(_) => f.write_str("Failed to deserialize the response body."),
Error::Service(_) => f.write_str("HTTP error"),
Error::Body(_) => f.write_str("Error while reading the response body"),
Error::Twitter(_) => f.write_str("Twitter returned error(s)"),
Error::Unexpected => f.write_str("Unexpected error occured."),
}
}
}
impl TwitterErrors {
pub fn codes(&self) -> impl Iterator<Item = u32> + '_ {
self.errors.iter().map(|e| e.code)
}
}
impl Display for TwitterErrors {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "status: {}", self.status)?;
let mut errors = self.errors.iter();
if let Some(e) = errors.next() {
write!(f, "; errors: {}", e)?;
for e in errors {
write!(f, ", {}", e)?;
}
}
Ok(())
}
}
impl error::Error for TwitterErrors {}
impl ErrorCode {
pub const YOU_ARENT_ALLOWED_TO_ADD_MEMBERS_TO_THIS_LIST: u32 = 104;
pub const CANNOT_FIND_SPECIFIED_USER: u32 = 108;
pub const NO_STATUS_FOUND_WITH_THAT_ID: u32 = 144;
pub const YOU_HAVE_ALREADY_RETWEETED_THIS_TWEET: u32 = 327;
}
impl Display for ErrorCode {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{} {}", self.code, self.message)
}
}
use http::header::AUTHORIZATION;
use http::Uri;
use http_body::Body;
use oauth_credentials::Credentials;
use serde::Deserialize;
use tower::util::ServiceExt;
use crate::error::Error;
use crate::response;
use crate::service::HttpService;
use crate::util::{ConcatBody, Seal};
use crate::{RateLimit, Response};
#[derive(Deserialize)]
pub struct AccessToken {
#[serde(flatten)]
pub credentials: Credentials,
pub user_id: i64,
}
pub async fn request_token<S, B>(
client_credentials: Credentials<&str>,
client: S,
) -> Result<Response<Credentials<Box<str>>>, Error<S::Error, <S::ResponseBody as Body>::Error>>
where
S: HttpService<B>,
B: Default + From<Vec<u8>>,
{
const URI: &str = "https://api.twitter.com/oauth/request_token";
let authorization = oauth::Builder::<_, _>::new(client_credentials, oauth::HmacSha1)
.callback("oob")
.post(URI, &());
let req = http::Request::post(Uri::from_static(URI))
.header(AUTHORIZATION, authorization)
.body(Default::default())
.unwrap();
let res = client
._into_service(Seal)
.oneshot(req)
.await
.map_err(Error::Service)?;
let status = res.status();
let rate_limit = RateLimit::from_headers(res.headers());
let body = ConcatBody::new(res).await.map_err(Error::Body)?;
response::make_response(status, rate_limit, &body, |body| {
serde_urlencoded::from_bytes(body).map_err(|_| Error::Unexpected)
})
}
pub async fn access_token<'a, S, B>(
oauth_verifier: &'a str,
client_credentials: Credentials<&'a str>,
temporary_credentials: Credentials<&'a str>,
client: S,
) -> Result<Response<AccessToken>, Error<S::Error, <S::ResponseBody as Body>::Error>>
where
S: HttpService<B>,
B: Default + From<Vec<u8>>,
{
const URI: &str = "https://api.twitter.com/oauth/access_token";
let authorization = oauth::Builder::new(client_credentials, oauth::HmacSha1)
.token(temporary_credentials)
.verifier(oauth_verifier)
.post(URI, &());
let req = http::Request::post(Uri::from_static(URI))
.header(AUTHORIZATION, authorization)
.body(Default::default())
.unwrap();
let res = client
._into_service(Seal)
.oneshot(req)
.await
.map_err(Error::Service)?;
let status = res.status();
let rate_limit = RateLimit::from_headers(res.headers());
let body = ConcatBody::new(res).await.map_err(Error::Body)?;
response::make_response(status, rate_limit, &body, |body| {
serde_urlencoded::from_bytes(body).map_err(|_| Error::Unexpected)
})
}