mod wrappers;
pub use wrappers::*;
#[cfg(not(any(test, feature = "testing")))]
#[doc(inline)]
pub use iced::Task;
#[cfg(not(any(test, feature = "testing")))]
#[doc(inline)]
pub use iced::task::Handle;
#[cfg(any(test, feature = "testing"))]
pub struct Task<T> {
stream: Option<iced::futures::stream::BoxStream<'static, T>>,
}
#[cfg(any(test, feature = "testing"))]
pub async fn await_next_msg<T>(tasks: &mut Task<T>) -> T {
use iced::time::Duration;
use tokio::time::timeout;
let Task { stream } = tasks;
let stream = stream.as_mut().unwrap();
timeout(Duration::from_secs(5), async move {
iced::futures::stream::StreamExt::next(stream)
.await
.unwrap()
})
.await
.unwrap()
}
#[cfg(any(test, feature = "testing"))]
impl<T> Task<T> {
pub fn none() -> Self {
Self { stream: None }
}
pub fn is_none(&self) -> bool {
self.stream.is_none()
}
pub fn is_some(&self) -> bool {
self.stream.is_some()
}
pub fn done(value: T) -> Task<T>
where
T: Send + 'static,
{
Self::future(iced::futures::future::ready(value))
}
pub fn perform<A>(
future: impl Future<Output = A> + Send + 'static,
f: impl FnOnce(A) -> T + Send + 'static,
) -> Self
where
T: Send + 'static,
A: Send + 'static,
{
Self::future(iced::futures::FutureExt::map(future, f))
}
pub fn run<A>(
stream: impl iced::futures::stream::Stream<Item = A> + Send + 'static,
f: impl Fn(A) -> T + Send + 'static,
) -> Self
where
T: 'static,
{
Self::stream(iced::futures::stream::StreamExt::map(stream, f))
}
pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
where
T: 'static,
{
let mut select_all = iced::futures::stream::SelectAll::new();
for task in tasks.into_iter() {
if let Some(stream) = task.stream {
select_all.push(stream);
}
}
if select_all.is_empty() {
Self::none()
} else {
Self {
stream: Some(iced::futures::stream::StreamExt::boxed(
select_all,
)),
}
}
}
pub fn map<O>(self, mut f: impl FnMut(T) -> O + Send + 'static) -> Task<O>
where
T: Send + 'static,
O: Send + 'static,
{
self.then(move |output| Task::done(f(output)))
}
pub fn then<O>(
self,
mut f: impl FnMut(T) -> Task<O> + Send + 'static,
) -> Task<O>
where
T: Send + 'static,
O: Send + 'static,
{
Task {
stream: match self.stream {
None => None,
Some(stream) => Some(iced::futures::stream::StreamExt::boxed(
iced::futures::stream::StreamExt::flat_map(
stream,
move |output| {
f(output).stream.unwrap_or_else(|| {
iced::futures::stream::StreamExt::boxed(
iced::futures::stream::empty(),
)
})
},
),
)),
},
}
}
pub fn future(future: impl Future<Output = T> + Send + 'static) -> Self
where
T: 'static,
{
Self::stream(iced::futures::stream::once(future))
}
pub fn stream(
stream: impl iced::futures::stream::Stream<Item = T> + Send + 'static,
) -> Self
where
T: 'static,
{
Self {
stream: Some(iced::futures::stream::StreamExt::boxed(stream)),
}
}
pub fn abortable(self) -> (Self, Handle)
where
T: 'static,
{
let (stream, handle) = match self.stream {
Some(stream) => {
let (stream, handle) = iced::futures::stream::abortable(stream);
(
Some(iced_runtime::futures::boxed_stream(stream)),
InternalHandle::Manual(handle),
)
}
None => (
None,
InternalHandle::Manual(
iced::futures::stream::AbortHandle::new_pair().0,
),
),
};
(Self { stream }, Handle { internal: handle })
}
}
#[cfg(any(test, feature = "testing"))]
impl<T> std::fmt::Debug for Task<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct(&format!("Task<{}>", std::any::type_name::<T>()))
.finish()
}
}
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Clone)]
pub struct Handle {
internal: InternalHandle,
}
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Clone)]
enum InternalHandle {
Manual(iced::futures::stream::AbortHandle),
AbortOnDrop(std::sync::Arc<iced::futures::stream::AbortHandle>),
}
#[cfg(any(test, feature = "testing"))]
impl InternalHandle {
pub fn as_ref(&self) -> &iced::futures::stream::AbortHandle {
match self {
InternalHandle::Manual(handle) => handle,
InternalHandle::AbortOnDrop(handle) => handle.as_ref(),
}
}
}
#[cfg(any(test, feature = "testing"))]
impl Handle {
pub fn abort(&self) {
self.internal.as_ref().abort();
}
pub fn abort_on_drop(self) -> Self {
match &self.internal {
InternalHandle::Manual(handle) => Self {
internal: InternalHandle::AbortOnDrop(std::sync::Arc::new(
handle.clone(),
)),
},
InternalHandle::AbortOnDrop(_) => self,
}
}
pub fn is_aborted(&self) -> bool {
self.internal.as_ref().is_aborted()
}
}
#[cfg(any(test, feature = "testing"))]
impl Drop for Handle {
fn drop(&mut self) {
if let InternalHandle::AbortOnDrop(handle) = &mut self.internal {
let handle = std::mem::replace(
handle,
std::sync::Arc::new(
iced::futures::stream::AbortHandle::new_pair().0,
),
);
if let Some(handle) = std::sync::Arc::into_inner(handle) {
handle.abort();
}
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[tokio::test]
async fn test_run_task() {
let mut task = Task::done(123_usize);
let result = await_next_msg(&mut task).await;
assert_eq!(result, 123_usize)
}
}