fuchsia_async/runtime/fuchsia/executor/atomic_future/
spawnable_future.rs1use super::super::common::TaskHandle;
6use super::{AtomicFutureHandle, Bomb, Meta, DONE, RESULT_TAKEN};
7use crate::scope::Spawnable;
8use crate::ScopeHandle;
9use std::future::Future;
10use std::marker::PhantomData;
11use std::pin::Pin;
12use std::task::{ready, Context, Poll};
13
14pub struct SpawnableFuture<'a, O>(AtomicFutureHandle<'a>, PhantomData<O>);
22
23impl<O> Unpin for SpawnableFuture<'_, O> {}
24
25impl<'a, O> SpawnableFuture<'a, O> {
26 pub fn new<F: Future<Output = O> + Send + 'a>(future: F) -> Self
29 where
30 O: Send + 'a,
31 {
32 Self(AtomicFutureHandle::new(None, 0, future), PhantomData)
33 }
34
35 fn meta(&mut self) -> &mut Meta {
36 unsafe { &mut *self.0 .0.as_mut() }
38 }
39}
40
41impl<O> Spawnable for SpawnableFuture<'static, O> {
42 type Output = O;
43
44 fn into_task(mut self, scope: ScopeHandle) -> TaskHandle {
45 let meta = self.meta();
46 meta.id = scope.executor().next_task_id();
47 meta.scope = Some(scope);
48 self.0
49 }
50}
51
52impl<O> Future for &mut SpawnableFuture<'_, O> {
55 type Output = O;
56
57 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
58 let bomb = Bomb;
60
61 let meta = self.meta();
62 let result = unsafe { (meta.vtable.poll)(meta.into(), cx) };
63
64 std::mem::forget(bomb);
65
66 ready!(result);
67
68 let result = unsafe { ((meta.vtable.get_result)(meta.into()) as *const O).read() };
69 *meta.state.get_mut() = 1 | DONE | RESULT_TAKEN;
70
71 Poll::Ready(result)
72 }
73}
74
75#[cfg(test)]
76mod tests {
77 use super::SpawnableFuture;
78 use crate::{Scope, SendExecutor};
79 use std::future::poll_fn;
80 use std::sync::atomic::AtomicU64;
81 use std::sync::atomic::Ordering::Relaxed;
82 use std::sync::Arc;
83 use std::task::Poll;
84
85 #[test]
86 fn test_spawnable_future() {
87 let mut executor = SendExecutor::new(2);
88 executor.run(async move {
89 let counter = Arc::new(AtomicU64::new(0));
90 let counter2 = Arc::clone(&counter);
91 let mut task1 = SpawnableFuture::new(async move {
92 let () = poll_fn(|_cx| {
93 if counter2.fetch_add(1, Relaxed) == 1 {
94 Poll::Ready(())
95 } else {
96 Poll::Pending
97 }
98 })
99 .await;
100 });
101 let old = counter.load(Relaxed);
102 assert!(futures::poll!(&mut task1).is_pending());
103 assert_eq!(counter.load(Relaxed), old + 1);
104
105 Scope::current().spawn(task1).await;
106
107 assert_eq!(counter.load(Relaxed), old + 2);
108 });
109 }
110
111 #[test]
112 fn test_drop_done_spawnable_future() {
113 futures::executor::block_on(async {
114 let mut future = SpawnableFuture::new(async {});
115 assert!(futures::poll!(&mut future).is_ready());
116 });
117 }
118}