The code is taken from my other project <https://github.com/tesaguri/pipitor.git>.
FRMT7RO7G2QNNLXMR2524IU6MV42ISMNXRISWT56JSDB2WCBQN6QC
mod concat_body;
pub use concat_body::ConcatBody;
/// Token type taken as the last argument of the hidden `HttpService` methods
/// (`_poll_ready`, `_call` and `_into_service`).
// NOTE(review): presumably this type is meant to be unnameable outside the crate (the `util`
// module is declared without `pub`), so that only this crate can call those methods — confirm.
pub struct Seal;
/// Marker supertrait of `HttpService<B>`.
///
/// It has no items; it is implemented for every `Service` that takes an `http::Request<B>` and
/// responds with an `http::Response` whose body implements `Body`.
pub trait Sealed<B> {}
use std::convert::TryInto;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::{Buf, BufMut, Bytes};
use futures_core::ready;
use http_body::Body;
use pin_project_lite::pin_project;
pin_project! {
/// A future that reads an [`http_body::Body`] to its end and resolves to all of its data
/// chunks concatenated into a single `Bytes`.
pub struct ConcatBody<B> {
// The body being read; pinned because `Body::poll_data` takes `Pin<&mut Self>`.
#[pin]
body: B,
// The data collected so far (see `State`).
state: State,
}
}
/// Accumulation state of a `ConcatBody`.
enum State {
/// No data chunk has been received yet.
Init,
/// Exactly one chunk has been received; it is held as a single `Bytes` so that no
/// concatenation buffer is needed if the body ends here.
Once(Bytes),
/// Two or more chunks have been received and are appended to one growable buffer.
Streaming(Vec<u8>),
}
impl<B: Body> ConcatBody<B> {
    /// Wraps `body` in a future that collects all of its data chunks.
    ///
    /// The returned value starts in the `Init` state; nothing is read from `body` here.
    pub fn new(body: B) -> Self {
        let state = State::Init;
        Self { body, state }
    }
}
/// Drives the wrapped body to its end and resolves to the concatenation of every data chunk.
///
/// Only `poll_data` is called on the body; trailers are never polled.
impl<B: Body> Future for ConcatBody<B> {
type Output = Result<Bytes, B::Error>;
/// Polls the body for data until it yields `None`.
///
/// Returns `Poll::Pending` whenever the body is not ready (the chunks received so far stay in
/// `state` for the next poll) and resolves with the error as soon as `poll_data` reports one.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
// `?` returns early on a body error and `ready!` returns early on `Pending`;
// the loop itself ends when the body yields `None`.
while let Some(mut data) = ready!(this.body.as_mut().poll_data(cx)?) {
match this.state {
// If the body consists of only one chunk, we can just return the chunk...
State::Init => *this.state = State::Once(data.copy_to_bytes(data.remaining())),
// and if not, we have to allocate a new buffer.
State::Once(first) => {
// Reserve room for both chunks plus the lower bound of the body's size hint;
// a lower bound that does not fit in `usize` is treated as 0.
let cap = first.remaining()
+ data.remaining()
+ this.body.size_hint().lower().try_into().unwrap_or(0);
let mut buf = Vec::with_capacity(cap);
buf.put(first);
buf.put(data);
*this.state = State::Streaming(buf);
}
// Already buffering: append the new chunk.
State::Streaming(ref mut buf) => buf.put(data),
}
}
// End of body: move the collected data out, leaving `Init` behind in `state`.
match mem::replace(this.state, State::Init) {
// The body produced no data chunk at all.
State::Init => Poll::Ready(Ok(Bytes::new())),
State::Once(buf) => Poll::Ready(Ok(buf)),
State::Streaming(buf) => Poll::Ready(Ok(buf.into())),
}
}
}
//! A trait alias for [`Service`](tower_service::Service).
pub(crate) use self::private::IntoService;
use std::future::Future;
use std::task::{Context, Poll};
use http::{Request, Response};
use http_body::Body;
use tower_service::Service;
use crate::util::{Seal, Sealed};
#[cfg_attr(not(feature = "hyper"), allow(intra_doc_link_resolution_failure))]
/// An HTTP client (like [`hyper::Client`]).
///
/// This is just an alias for [`tower_service::Service`](tower_service::Service)
/// introduced to reduce the number of type parameters on items that are generic over an HTTP
/// client. `B` is the type of the request body.
///
/// All methods are hidden from the documentation and take a `Seal` argument; a blanket
/// implementation forwards them to the corresponding `Service` methods.
pub trait HttpService<B>: Sealed<B> {
/// Body of the responses given by the service.
type ResponseBody: Body;
/// Error returned by the readiness check and by the response future.
type Error;
/// Future returned for a request; resolves to the response or to `Self::Error`.
type Future: Future<Output = Result<Response<Self::ResponseBody>, Self::Error>>;
// Counterpart of `Service::poll_ready`.
#[doc(hidden)]
fn _poll_ready(&mut self, cx: &mut Context<'_>, _seal: Seal) -> Poll<Result<(), Self::Error>>;
// Counterpart of `Service::call`.
#[doc(hidden)]
fn _call(&mut self, request: Request<B>, _seal: Seal) -> Self::Future;
// Wraps `self` in `IntoService`, which implements `Service` by calling the two methods above.
#[doc(hidden)]
fn _into_service(self, _seal: Seal) -> IntoService<Self>
where
Self: Sized,
{
IntoService(self)
}
}
/// Blanket implementation: every `Service` that accepts an `http::Request<ReqB>` and responds
/// with an `http::Response<ResB>` (where `ResB` is a `Body`) is an `HttpService<ReqB>`.
///
/// Both hidden methods simply delegate to the underlying `Service`.
impl<S, ReqB, ResB> HttpService<ReqB> for S
where
    S: Service<Request<ReqB>, Response = Response<ResB>> + ?Sized,
    ResB: Body,
{
    type ResponseBody = ResB;
    type Error = S::Error;
    type Future = S::Future;

    #[doc(hidden)]
    fn _poll_ready(&mut self, cx: &mut Context<'_>, _seal: Seal) -> Poll<Result<(), S::Error>> {
        <S as Service<Request<ReqB>>>::poll_ready(self, cx)
    }

    #[doc(hidden)]
    fn _call(&mut self, request: Request<ReqB>, _seal: Seal) -> S::Future {
        <S as Service<Request<ReqB>>>::call(self, request)
    }
}
// Implements the `Sealed` supertrait under exactly the same bounds as the blanket
// `HttpService` implementation, so that implementation satisfies its supertrait requirement.
impl<S, ReqB, ResB> Sealed<ReqB> for S
where
S: Service<Request<ReqB>, Response = Response<ResB>> + ?Sized,
ResB: Body,
{
}
impl<S, B> Service<Request<B>> for IntoService<S>
where
S: HttpService<B>,
{
type Response = Response<<S as HttpService<B>>::ResponseBody>;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
HttpService::_poll_ready(&mut self.0, cx, Seal)
}
fn call(&mut self, request: Request<B>) -> Self::Future {
HttpService::_call(&mut self.0, request, Seal)
}
}
// `IntoService` is `pub` because it appears in the signature of `HttpService::_into_service`.
// NOTE(review): presumably it lives in a private module so it cannot be named from outside the
// crate — confirm.
mod private {
/// Adapter that implements `Service` for the wrapped `HttpService`.
pub struct IntoService<S>(pub S);
}
pub mod service;
mod util;
use std::error;
use std::fmt::{self, Display, Formatter};
use std::future::Future;
use std::marker::PhantomData;
use std::ops::Deref;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::ready;
use http::header::{HeaderValue, AUTHORIZATION, CONTENT_TYPE};
use http::{Method, StatusCode, Uri};
use http_body::Body;
use oauth_credentials::Credentials;
use pin_project_lite::pin_project;
use serde::{de, Deserialize};
use tower::util::{Oneshot, ServiceExt};
use crate::service::{HttpService, IntoService};
use crate::util::{ConcatBody, Seal};
/// Data returned by `access_token` on success.
#[derive(Deserialize)]
pub struct AccessToken {
/// The credentials; their fields are read from the same level as `user_id` (flattened).
#[serde(flatten)]
pub credentials: Credentials,
/// NOTE(review): presumably the ID of the user who authorized the application — confirm.
pub user_id: i64,
}
/// A successfully parsed API response together with the rate-limit information, if any.
///
/// Dereferences to `data`.
#[derive(Debug)]
pub struct Response<T> {
/// The parsed response body.
pub data: T,
/// Rate-limit values read from the response headers; `None` unless all three headers
/// (`RATE_LIMIT_LIMIT`, `RATE_LIMIT_REMAINING`, `RATE_LIMIT_RESET`) were present and parseable.
pub rate_limit: Option<RateLimit>,
}
pin_project! {
/// Future returned by `Request::send`; resolves to a `Response<T>` or an `Error`.
pub struct ResponseFuture<T, S: HttpService<B>, B> {
// The actual state machine (waiting for the response, then reading its body).
#[pin]
inner: ResponseFutureInner<S, B>,
// Records the output data type `T` without storing a value of it.
marker: PhantomData<fn() -> T>,
}
}
pin_project! {
// The two phases of a `ResponseFuture`.
#[project = ResponseFutureInnerProj]
enum ResponseFutureInner<S: HttpService<B>, B> {
// Phase 1: waiting for the HTTP service to produce a response.
Resp {
#[pin]
response: Oneshot<IntoService<S>, http::Request<B>>,
},
// Phase 2: the response head has arrived; its status and rate-limit headers are kept
// while the body is being collected.
Body {
status: StatusCode,
rate_limit: Option<RateLimit>,
#[pin]
body: ConcatBody<S::ResponseBody>,
},
}
}
/// Rate-limit information read from the response headers.
#[derive(Clone, Copy, Debug)]
pub struct RateLimit {
/// Value of the `x-rate-limit-limit` header.
pub limit: u64,
/// Value of the `x-rate-limit-remaining` header.
pub remaining: u64,
/// Value of the `x-rate-limit-reset` header.
// NOTE(review): presumably a Unix timestamp in seconds — confirm against the Twitter API docs.
pub reset: u64,
}
/// Errors returned by this crate's requests.
///
/// `SE` is the error type of the HTTP service and `BE` is the error type of the response body.
#[derive(Debug)]
pub enum Error<SE, BE> {
/// The body of a successful response could not be deserialized as JSON.
Deserializing(serde_json::Error),
/// The HTTP service returned an error.
Service(SE),
/// Reading the response body failed.
Body(BE),
/// The response had a status other than `200 OK`.
Twitter(TwitterErrors),
/// Returned by `request_token` and `access_token` when the body of a successful response
/// could not be parsed as URL-encoded data.
Unexpected,
}
/// Details of a response whose status was not `200 OK`.
#[derive(Debug)]
pub struct TwitterErrors {
/// HTTP status of the response.
pub status: StatusCode,
/// Error entries parsed from the JSON body; empty if the body could not be parsed.
pub errors: Vec<ErrorCode>,
/// Rate-limit values read from the response headers, if available.
pub rate_limit: Option<RateLimit>,
}
/// A single entry of the `errors` array in an error response body.
#[derive(Debug, Deserialize)]
pub struct ErrorCode {
/// Numeric error code (some known values are available as associated constants).
pub code: u32,
/// Message accompanying the code.
pub message: String,
}
/// An API request that knows its HTTP method, its endpoint and the type of its response.
pub trait Request: oauth::Request {
    /// Type that the JSON body of a successful response is deserialized into.
    type Data: de::DeserializeOwned;

    /// HTTP method of the request.
    const METHOD: Method;

    /// URI of the endpoint, without the query part.
    const URI: &'static str;

    /// Signs the request with the `client` and `token` credentials and sends it through `http`.
    ///
    /// The request is built as a `Vec<u8>` body and then converted into the service's body
    /// type `B`. The returned future resolves to the parsed response.
    fn send<C, T, S, B>(
        &self,
        client: &Credentials<C>,
        token: &Credentials<T>,
        http: S,
    ) -> ResponseFuture<Self::Data, S, B>
    where
        C: AsRef<str>,
        T: AsRef<str>,
        S: HttpService<B>,
        B: From<Vec<u8>>,
    {
        let (client, token) = (client.as_ref(), token.as_ref());
        let request = prepare_request(&Self::METHOD, Self::URI, self, client, token);
        ResponseFuture::new(request.map(B::from), http)
    }
}
/// Name of the response header that `RateLimit::limit` is read from.
pub const RATE_LIMIT_LIMIT: &str = "x-rate-limit-limit";
/// Name of the response header that `RateLimit::remaining` is read from.
pub const RATE_LIMIT_REMAINING: &str = "x-rate-limit-remaining";
/// Name of the response header that `RateLimit::reset` is read from.
pub const RATE_LIMIT_RESET: &str = "x-rate-limit-reset";
impl<T> Deref for Response<T> {
type Target = T;
fn deref(&self) -> &T {
&self.data
}
}
impl<T: de::DeserializeOwned, S, B> ResponseFuture<T, S, B>
where
    S: HttpService<B>,
{
    /// Creates a future that sends `req` through `http` once and then parses the response.
    ///
    /// The future starts in the `Resp` phase, waiting for the service's response.
    fn new(req: http::Request<B>, http: S) -> Self {
        let service = http._into_service(Seal);
        let response = service.oneshot(req);
        Self {
            inner: ResponseFutureInner::Resp { response },
            marker: PhantomData,
        }
    }
}
/// Waits for the HTTP response, collects its body, and parses it into a `Response<T>`.
impl<T: de::DeserializeOwned, S, B> Future for ResponseFuture<T, S, B>
where
S: HttpService<B>,
{
#[allow(clippy::type_complexity)]
type Output = Result<Response<T>, Error<S::Error, <S::ResponseBody as Body>::Error>>;
/// Advances the two-phase state machine.
///
/// A service error resolves to `Error::Service` and a body error to `Error::Body`. Once the
/// body is complete, it is handed to `make_response` with a JSON parser whose failure maps to
/// `Error::Deserializing`.
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Phase 1: wait for the response head. `as_mut()` is used so that `self` can be projected
// again below.
if let ResponseFutureInnerProj::Resp { response } = self.as_mut().project().inner.project()
{
let res = ready!(response.poll(cx).map_err(Error::Service))?;
// Switch to phase 2, keeping the status and rate-limit headers and taking the body out
// of the response.
self.as_mut()
.project()
.inner
.set(ResponseFutureInner::Body {
status: res.status(),
rate_limit: rate_limit(&res),
body: ConcatBody::new(res.into_body()),
});
}
// Phase 2: collect the body. This is reached in the same poll right after the transition
// above, as well as in later polls.
if let ResponseFutureInnerProj::Body {
status,
rate_limit,
body,
} = self.project().inner.project()
{
let body = ready!(body.poll(cx).map_err(Error::Body))?;
Poll::Ready(make_response(*status, *rate_limit, &body, |body| {
serde_json::from_slice(body).map_err(Error::Deserializing)
}))
} else {
// The first `if` either returned early or replaced `Resp` with `Body`.
unreachable!();
}
}
}
impl<SE: error::Error + 'static, BE: error::Error + 'static> error::Error for Error<SE, BE> {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match *self {
Error::Deserializing(ref e) => Some(e),
Error::Service(ref e) => Some(e),
Error::Body(ref e) => Some(e),
Error::Twitter(ref e) => Some(e),
Error::Unexpected => None,
}
}
}
impl<SE: Display, BE: Display> Display for Error<SE, BE> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match *self {
Error::Deserializing(_) => f.write_str("Failed to deserialize the response body."),
Error::Service(_) => f.write_str("HTTP error"),
Error::Body(_) => f.write_str("Error while reading the response body"),
Error::Twitter(_) => f.write_str("Twitter returned error(s)"),
Error::Unexpected => f.write_str("Unexpected error occured."),
}
}
}
impl TwitterErrors {
pub fn codes(&self) -> impl Iterator<Item = u32> + '_ {
self.errors.iter().map(|e| e.code)
}
}
impl Display for TwitterErrors {
    /// Writes `status: <status>`, followed by `; errors: <e1>, <e2>, ...` when there is at
    /// least one error entry.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "status: {}", self.status)?;
        if let Some((first, rest)) = self.errors.split_first() {
            write!(f, "; errors: {}", first)?;
            for error in rest {
                write!(f, ", {}", error)?;
            }
        }
        Ok(())
    }
}
// Uses the default method implementations; no `source` is provided.
impl error::Error for TwitterErrors {}
/// Named values of `ErrorCode::code`; each constant's name spells out the condition it stands for.
impl ErrorCode {
pub const YOU_ARENT_ALLOWED_TO_ADD_MEMBERS_TO_THIS_LIST: u32 = 104;
pub const CANNOT_FIND_SPECIFIED_USER: u32 = 108;
pub const NO_STATUS_FOUND_WITH_THAT_ID: u32 = 144;
pub const YOU_HAVE_ALREADY_RETWEETED_THIS_TWEET: u32 = 327;
}
impl Display for ErrorCode {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{} {}", self.code, self.message)
}
}
pub async fn request_token<S, B>(
client_credentials: Credentials<&str>,
client: S,
) -> Result<Response<Credentials<Box<str>>>, Error<S::Error, <S::ResponseBody as Body>::Error>>
where
S: HttpService<B>,
B: Default + From<Vec<u8>>,
{
const URI: &str = "https://api.twitter.com/oauth/request_token";
let authorization = oauth::Builder::<_, _>::new(client_credentials, oauth::HmacSha1)
.callback("oob")
.post(URI, &());
let req = http::Request::post(Uri::from_static(URI))
.header(AUTHORIZATION, authorization)
.body(Default::default())
.unwrap();
let res = client
._into_service(Seal)
.oneshot(req)
.await
.map_err(Error::Service)?;
let status = res.status();
let rate_limit = rate_limit(&res);
let body = ConcatBody::new(res.into_body())
.await
.map_err(Error::Body)?;
make_response(status, rate_limit, &body, |body| {
serde_urlencoded::from_bytes(body).map_err(|_| Error::Unexpected)
})
}
pub async fn access_token<'a, S, B>(
oauth_verifier: &'a str,
client_credentials: Credentials<&'a str>,
temporary_credentials: Credentials<&'a str>,
client: S,
) -> Result<Response<AccessToken>, Error<S::Error, <S::ResponseBody as Body>::Error>>
where
S: HttpService<B>,
B: Default + From<Vec<u8>>,
{
const URI: &str = "https://api.twitter.com/oauth/access_token";
let authorization = oauth::Builder::new(client_credentials, oauth::HmacSha1)
.token(temporary_credentials)
.verifier(oauth_verifier)
.post(URI, &());
let req = http::Request::post(Uri::from_static(URI))
.header(AUTHORIZATION, authorization)
.body(Default::default())
.unwrap();
let res = client
._into_service(Seal)
.oneshot(req)
.await
.map_err(Error::Service)?;
let status = res.status();
let rate_limit = rate_limit(&res);
let body = ConcatBody::new(res.into_body())
.await
.map_err(Error::Body)?;
make_response(status, rate_limit, &body, |body| {
serde_urlencoded::from_bytes(body).map_err(|_| Error::Unexpected)
})
}
/// Turns a collected response into a `Response<T>` or an `Error`.
///
/// With a `200 OK` status, `body` is handed to `parse` and its result is returned, paired with
/// `rate_limit` on success. With any other status, the body is parsed as a JSON object with an
/// `errors` array and `Error::Twitter` is returned; if that parsing fails, the list of errors
/// is left empty.
fn make_response<T, F, SE, BE>(
    status: StatusCode,
    rate_limit: Option<RateLimit>,
    body: &[u8],
    parse: F,
) -> Result<Response<T>, Error<SE, BE>>
where
    F: FnOnce(&[u8]) -> Result<T, Error<SE, BE>>,
{
    /// Shape of an error response body.
    #[derive(Default, Deserialize)]
    struct Errors {
        errors: Vec<ErrorCode>,
    }

    if status == StatusCode::OK {
        let data = parse(body)?;
        return Ok(Response { data, rate_limit });
    }

    // An unparseable error body is not an error in itself: report the status alone.
    let Errors { errors } = serde_json::from_slice::<Errors>(body).unwrap_or_default();
    Err(Error::Twitter(TwitterErrors {
        status,
        errors,
        rate_limit,
    }))
}
/// Builds a signed HTTP request for `req`.
///
/// The `Authorization` header is produced from the `client` and `token` credentials. For
/// `POST`, the parameters are sent as an `application/x-www-form-urlencoded` body; for any
/// other method they are appended to `uri` as a query string and the body is left empty.
///
/// # Panics
///
/// Panics if the request cannot be built (e.g. `uri` is not a valid URI).
fn prepare_request<R>(
    method: &Method,
    uri: &'static str,
    req: &R,
    client: Credentials<&str>,
    token: Credentials<&str>,
) -> http::Request<Vec<u8>>
where
    R: oauth::Request + ?Sized,
{
    let is_post = *method == Method::POST;

    let mut signer = oauth::Builder::new(client, oauth::HmacSha1);
    signer.token(token);
    let authorization = signer.build(method.as_str(), uri, req);

    let builder = http::Request::builder()
        .method(method)
        .header(AUTHORIZATION, authorization);

    // Pick where the parameters go; the two cases differ only in URI, content type and body.
    let (builder, payload) = if is_post {
        let payload = oauth::to_form_urlencoded(req).into_bytes();
        let builder = builder.uri(Uri::from_static(uri)).header(
            CONTENT_TYPE,
            HeaderValue::from_static("application/x-www-form-urlencoded"),
        );
        (builder, payload)
    } else {
        let uri = oauth::to_uri_query(uri.to_owned(), req);
        (builder.uri(uri), Vec::default())
    };

    builder.body(payload).unwrap()
}
/// Reads the three rate-limit headers of `res`.
///
/// Returns `None` as soon as one of them is missing or cannot be parsed as a `u64`.
fn rate_limit<T>(res: &http::Response<T>) -> Option<RateLimit> {
    let limit = header(res, RATE_LIMIT_LIMIT)?;
    let remaining = header(res, RATE_LIMIT_REMAINING)?;
    let reset = header(res, RATE_LIMIT_RESET)?;
    Some(RateLimit {
        limit,
        remaining,
        reset,
    })
}
/// Looks up the header `name` in `res` and parses its value as a `u64` with `atoi`.
///
/// Returns `None` if the header is absent or `atoi` cannot parse its bytes.
fn header<T>(res: &http::Response<T>, name: &str) -> Option<u64> {
    let value = res.headers().get(name)?;
    atoi::atoi(value.as_bytes())
}
MIT License
Copyright (c) 2021 Daiki Mizukami
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
[package]
name = "twitter-client"
version = "0.0.0"
authors = ["Daiki Mizukami <tesaguriguma@gmail.com>"]
license = "MIT"
readme = "README.md"
keywords = ["twitter"]
categories = ["api-bindings"]
repository = "https://nest.pijul.com/tesaguri/twitter-client-rs"
documentation = "https://docs.rs/twitter-client/0.0.0/twitter_client/"
description = """
A thin wrapper around Twitter API.
"""
edition = "2018"
[dependencies]
atoi = "0.4"
bytes = "1"
futures-core = "0.3"
http = "0.2"
http-body = "0.4"
oauth = { version = "0.5", package = "oauth1-request" }
oauth-credentials = { version = "0.3", features = ["serde"] }
pin-project-lite = "0.2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_urlencoded = "0.7"
tower = { version = "0.4", features = ["util"] }
tower-service = "0.3"
hyper = { version = "0.14", features = ["client", "http1"], optional = true }
[features]
__intra_doc_links = ["hyper"]
/target
Cargo.lock