use std::convert::TryInto;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::{Buf, BufMut, Bytes};
use futures_core::ready;
use http_body::Body;
use pin_project_lite::pin_project;
pin_project! {
pub struct ConcatBody<B> {
#[pin]
body: B,
state: State,
}
}
enum State {
Init,
Once(Bytes),
Streaming(Vec<u8>),
}
impl<B: Body> ConcatBody<B> {
pub fn new(body: B) -> Self {
ConcatBody {
body,
state: State::Init,
}
}
}
impl<B: Body> Future for ConcatBody<B> {
type Output = Result<Bytes, B::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
while let Some(mut data) = ready!(this.body.as_mut().poll_data(cx)?) {
match this.state {
// If the body consists of only one chunk, we can just return the chunk...
State::Init => *this.state = State::Once(data.copy_to_bytes(data.remaining())),
// and if not, we have to allocate a new buffer.
State::Once(first) => {
let cap = first.remaining()
+ data.remaining()
+ this.body.size_hint().lower().try_into().unwrap_or(0);
let mut buf = Vec::with_capacity(cap);
buf.put(first);
buf.put(data);
*this.state = State::Streaming(buf);
}
State::Streaming(ref mut buf) => buf.put(data),
}
}
match mem::replace(this.state, State::Init) {
State::Init => Poll::Ready(Ok(Bytes::new())),
State::Once(buf) => Poll::Ready(Ok(buf)),
State::Streaming(buf) => Poll::Ready(Ok(buf.into())),
}
}
}