fuchsia_component_server/
until_stalled.rs1use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use detect_stall::StallableRequestStream;
12use fidl::endpoints::ServerEnd;
13use futures::channel::oneshot::{self, Canceled};
14use futures::future::FusedFuture;
15use futures::{FutureExt, Stream, StreamExt};
16use pin_project::pin_project;
17use vfs::directory::immutable::connection::ImmutableConnection;
18use vfs::directory::immutable::Simple;
19use vfs::execution_scope::{ActiveGuard, ExecutionScope};
20use vfs::ToObjectRequest;
21use zx::MonotonicDuration;
22use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
23
24use super::{ServiceFs, ServiceObjTrait};
25
/// Boxed, fused future produced by `OutgoingConnector::serve` for one outgoing directory
/// connection.
///
/// It resolves to whatever `Option<zx::Channel>` the connection's `StallableRequestStream`
/// callback reports, or to `None` if that callback is dropped without ever being invoked.
type StalledFut = Pin<Box<dyn FusedFuture<Output = Option<zx::Channel>>>>;
29
/// A wrapper around [`ServiceFs`] that is polled as a `Stream` of [`Item`]s.
///
/// Besides forwarding the requests produced by the inner `ServiceFs`, it tracks the single
/// outgoing directory connection it was constructed with and, once the inner `ServiceFs` has
/// finished, hands the unbound server endpoint back through [`Item::Stalled`].
#[pin_project]
pub struct StallableServiceFs<ServiceObjTy: ServiceObjTrait> {
    /// The wrapped `ServiceFs`; polled first on every `poll_next`.
    #[pin]
    fs: ServiceFs<ServiceObjTy>,
    /// Holds the flags, execution scope and root directory needed to (re)serve the outgoing
    /// directory connection.
    connector: OutgoingConnector,
    /// Whether the outgoing directory connection is currently being served or is parked.
    state: State,
    /// Interval forwarded to `StallableRequestStream::new` each time the connection is served.
    debounce_interval: zx::MonotonicDuration,
    /// Set once the stream has yielded its final item; later polls return `None` immediately.
    is_terminated: bool,
}
44
/// The items yielded by a [`StallableServiceFs`] stream.
pub enum Item<Output> {
    /// A request produced by the inner `ServiceFs`, paired with an `ActiveGuard` obtained from
    /// the execution scope at the time the request was yielded.
    Request(Output, ActiveGuard),

    /// The inner `ServiceFs` has finished and the outgoing directory connection was parked;
    /// carries the channel recovered from that connection. This is the last item of the stream.
    Stalled(zx::Channel),
}
58
/// Tracks what is currently happening to the outgoing directory connection.
enum State {
    /// The connection is being served on the execution scope. `stalled` resolves when the
    /// serving task reports back (see `OutgoingConnector::serve`).
    Running { stalled: StalledFut },
    /// The connection is no longer being served. `channel` is `Some` while we hold the unbound
    /// server endpoint and wait for it to become readable; it is `None` when `stalled` resolved
    /// without handing a channel back.
    Stalled { channel: Option<fasync::OnSignals<'static, zx::Channel>> },
}
75
76impl<ServiceObjTy: ServiceObjTrait> Stream for StallableServiceFs<ServiceObjTy> {
77 type Item = Item<ServiceObjTy::Output>;
78
79 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80 let mut this = self.project();
81 if *this.is_terminated {
82 return Poll::Ready(None);
83 }
84
85 let poll_fs = this.fs.poll_next_unpin(cx);
90 if let Poll::Ready(Some(request)) = poll_fs {
91 return match this.connector.scope.try_active_guard() {
93 Some(guard) => Poll::Ready(Some(Item::Request(request, guard))),
94 None => Poll::Ready(None),
95 };
96 }
97
98 loop {
101 match &mut this.state {
102 State::Running { stalled } => {
103 let channel = std::task::ready!(stalled.as_mut().poll(cx));
104 let channel = channel
105 .map(|c| fasync::OnSignals::new(c.into(), zx::Signals::CHANNEL_READABLE));
106 *this.state = State::Stalled { channel };
108 }
109 State::Stalled { channel } => {
110 if let Poll::Ready(None) = poll_fs {
111 *this.is_terminated = true;
113
114 this.connector.scope.shutdown();
119
120 return Poll::Ready(
121 channel.take().map(|wait| Item::Stalled(wait.take_handle().into())),
122 );
123 }
124 if channel.is_none() {
125 return Poll::Pending;
129 }
130 let readable = channel.as_mut().unwrap().poll_unpin(cx);
132 let _ = std::task::ready!(readable);
133 let wait = channel.take().unwrap();
135 let stalled =
136 this.connector.serve(wait.take_handle().into(), *this.debounce_interval);
137 *this.state = State::Running { stalled };
139 }
140 }
141 }
142 }
143}
144
/// Bundles everything needed to serve a connection to the outgoing directory.
struct OutgoingConnector {
    /// Flags used to build the object request and passed to the directory connection.
    flags: fio::Flags,
    /// Execution scope on which connections are spawned and from which active guards are taken.
    scope: ExecutionScope,
    /// The directory node that connections are served from.
    dir: Arc<Simple>,
}
150
151impl OutgoingConnector {
152 fn serve(
158 &mut self,
159 server_end: ServerEnd<fio::DirectoryMarker>,
160 debounce_interval: MonotonicDuration,
161 ) -> StalledFut {
162 let (unbound_sender, unbound_receiver) = oneshot::channel();
163 let object_request = self.flags.to_object_request(server_end);
164 let scope = self.scope.clone();
165 let dir = self.dir.clone();
166 let flags = self.flags;
167 scope.clone().spawn(object_request.handle_async(async move |object_request| {
168 ImmutableConnection::create_transform_stream(
169 scope,
170 dir,
171 flags,
172 object_request,
173 move |stream| {
174 StallableRequestStream::new(
175 stream,
176 debounce_interval,
177 move |maybe_channel: Option<zx::Channel>| {
180 _ = unbound_sender.send(maybe_channel);
181 },
182 )
183 },
184 )
185 .await
186 }));
187 Box::pin(
188 unbound_receiver
189 .map(|result| match result {
190 Ok(maybe_channel) => maybe_channel,
191 Err(Canceled) => None,
192 })
193 .fuse(),
194 )
195 }
196}
197
198impl<ServiceObjTy: ServiceObjTrait> StallableServiceFs<ServiceObjTy> {
199 pub(crate) fn new(
200 mut fs: ServiceFs<ServiceObjTy>,
201 debounce_interval: zx::MonotonicDuration,
202 ) -> Self {
203 let channel_queue =
204 fs.channel_queue.as_mut().expect("Must not poll the original ServiceFs");
205 assert!(
206 channel_queue.len() == 1,
207 "Must have exactly one connection to serve, \
208 e.g. did you call ServiceFs::take_and_serve_directory_handle?"
209 );
210 let server_end = std::mem::replace(channel_queue, vec![]).into_iter().next().unwrap();
211 let flags = ServiceFs::<ServiceObjTy>::base_connection_flags();
212 let scope = fs.scope.clone();
213 let dir = fs.dir.clone();
214 let mut connector = OutgoingConnector { flags, scope, dir };
215 let stalled = connector.serve(server_end, debounce_interval);
216 Self {
217 fs,
218 connector,
219 state: State::Running { stalled },
220 debounce_interval,
221 is_terminated: false,
222 }
223 }
224
225 pub fn try_active_guard(&self) -> Option<ActiveGuard> {
229 self.connector.scope.try_active_guard()
230 }
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fasync::TestExecutor;
    use fidl::endpoints::ClientEnd;
    use fidl_fuchsia_component_client_test::{
        ProtocolAMarker, ProtocolARequest, ProtocolARequestStream,
    };
    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
    use fuchsia_component_directory::open_directory_async;
    use futures::future::BoxFuture;
    use futures::{pin_mut, select, TryStreamExt};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Mutex;
    use test_util::Counter;
    use zx::AsHandleRef;

    /// The request type produced by the `ServiceFs` under test.
    enum Requests {
        ServiceA(ProtocolARequestStream),
    }

    /// Test double that consumes `Item`s and records what it saw.
    #[derive(Clone)]
    struct MockServer {
        // Number of `Foo` calls handled.
        call_count: Arc<Counter>,
        // Set once an `Item::Stalled` has been received.
        stalled: Arc<AtomicBool>,
        // The channel carried by the last `Item::Stalled`, if any.
        server_end: Arc<Mutex<Option<zx::Channel>>>,
    }

    impl MockServer {
        /// Creates a server with a zero call count, not stalled, and no stored channel.
        fn new() -> Self {
            let call_count = Arc::new(Counter::new(0));
            let stalled = Arc::new(AtomicBool::new(false));
            let server_end = Arc::new(Mutex::new(None));
            Self { call_count, stalled, server_end }
        }

        /// Returns a future that processes a single `Item` from the stream under test.
        fn handle(&self, item: Item<Requests>) -> BoxFuture<'static, ()> {
            let stalled = self.stalled.clone();
            let call_count = self.call_count.clone();
            let server_end = self.server_end.clone();
            async move {
                match item {
                    Item::Request(requests, active_guard) => {
                        // Hold the guard for as long as this connection is being served.
                        let _active_guard = active_guard;
                        let Requests::ServiceA(mut request_stream) = requests;
                        // Serve until the stream ends or yields an error.
                        while let Ok(Some(request)) = request_stream.try_next().await {
                            match request {
                                ProtocolARequest::Foo { responder } => {
                                    call_count.inc();
                                    let _ = responder.send();
                                }
                            }
                        }
                    }
                    Item::Stalled(channel) => {
                        // Remember the returned channel and flag that the fs stalled.
                        *server_end.lock().unwrap() = Some(channel);
                        stalled.store(true, Ordering::SeqCst);
                    }
                }
            }
            .boxed()
        }

        /// Asserts that the stored channel is the peer of `client_end`, by comparing koids.
        /// Panics if no channel was stored.
        #[track_caller]
        fn assert_fs_gave_back_server_end(self, client_end: ClientEnd<fio::DirectoryMarker>) {
            let reclaimed_server_end: zx::Channel = self.server_end.lock().unwrap().take().unwrap();
            assert_eq!(
                client_end.get_koid().unwrap(),
                reclaimed_server_end.basic_info().unwrap().related_koid
            )
        }
    }

    /// Resets fake time to zero and builds a `ServiceFs` that serves `ProtocolA` under `svc`
    /// over `server_end`, wrapped with `until_stalled` and drained by a fresh `MockServer`.
    ///
    /// Returns the initial time, the mock server, and the (not yet polled) serving future.
    async fn setup_test(
        server_end: ServerEnd<fio::DirectoryMarker>,
    ) -> (fasync::MonotonicInstant, MockServer, impl FusedFuture<Output = ()>) {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);

        let mut fs = ServiceFs::new();
        fs.serve_connection(server_end).unwrap().dir("svc").add_fidl_service(Requests::ServiceA);

        let mock_server = MockServer::new();
        let mock_server_clone = mock_server.clone();
        let fs = fs
            .until_stalled(IDLE_DURATION)
            .for_each_concurrent(None, move |item| mock_server_clone.handle(item));

        (initial, mock_server, fs)
    }

    /// Open protocol connections keep the fs alive past the idle interval; once they are
    /// dropped the fs completes and hands back the server endpoint.
    #[fuchsia::test(allow_stalls = false)]
    async fn drain_request() {
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
        const NUM_FOO_REQUESTS: usize = 10;
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let (initial, mock_server, fs) = setup_test(server_end).await;
        pin_mut!(fs);

        let mut proxies = Vec::new();
        for _ in 0..NUM_FOO_REQUESTS {
            proxies.push(connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap());
        }

        // Let the fs pick up the queued connections.
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

        // Even well past the idle interval the fs must not finish while proxies are open.
        TestExecutor::advance_to(initial + (IDLE_DURATION * 2)).await;
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

        for proxy in proxies.iter() {
            // Each call must complete before the fs future does.
            select! {
                result = proxy.foo().fuse() => assert_matches!(result, Ok(_)),
                _ = fs => unreachable!(),
            };
        }

        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        // Closing every protocol connection lets the fs run to completion.
        drop(proxies);
        fs.await;

        assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
        assert!(mock_server.stalled.load(Ordering::SeqCst));
        mock_server.assert_fs_gave_back_server_end(client_end);
    }

    /// With no requests at all, the fs completes once the idle interval has elapsed.
    #[fuchsia::test(allow_stalls = false)]
    async fn no_request() {
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let (initial, mock_server, fs) = setup_test(server_end).await;
        pin_mut!(fs);

        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        TestExecutor::advance_to(initial + IDLE_DURATION).await;
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());

        assert_eq!(mock_server.call_count.get(), 0);
        assert!(mock_server.stalled.load(Ordering::SeqCst));
        mock_server.assert_fs_gave_back_server_end(client_end);
    }

    /// If the outgoing directory client end is closed, the fs completes without reporting a
    /// stall and without handing back a channel.
    #[fuchsia::test(allow_stalls = false)]
    async fn outgoing_dir_client_closed() {
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let (_initial, mock_server, fs) = setup_test(server_end).await;
        pin_mut!(fs);

        drop(client_end);
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());

        assert_eq!(mock_server.call_count.get(), 0);
        assert!(!mock_server.stalled.load(Ordering::SeqCst));
        assert!(mock_server.server_end.lock().unwrap().is_none());
    }

    /// A request issued before the fs starts is served, after which the fs stalls once the
    /// proxy is dropped and the idle interval elapses.
    #[fuchsia::test(allow_stalls = false)]
    async fn request_then_stalled() {
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);

        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();

        // The call cannot complete yet because nothing is serving the directory.
        let foo = proxy.foo().fuse();
        pin_mut!(foo);
        assert!(TestExecutor::poll_until_stalled(&mut foo).await.is_pending());

        let (initial, mock_server, fs) = setup_test(server_end).await;
        pin_mut!(fs);

        assert_eq!(mock_server.call_count.get(), 0);
        // Polling the fs handles the queued request.
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        assert_eq!(mock_server.call_count.get(), 1);
        assert_matches!(foo.await, Ok(_));

        drop(proxy);
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        TestExecutor::advance_to(initial + IDLE_DURATION).await;
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());

        assert_eq!(mock_server.call_count.get(), 1);
        assert!(mock_server.stalled.load(Ordering::SeqCst));
        mock_server.assert_fs_gave_back_server_end(client_end);
    }

    /// A request arriving halfway through the idle interval is served, and the fs only
    /// completes a full idle interval after that request.
    #[fuchsia::test(allow_stalls = false)]
    async fn stalled_then_request() {
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let (initial, mock_server, fs) = setup_test(server_end).await;
        pin_mut!(fs);

        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        TestExecutor::advance_to(initial + (IDLE_DURATION / 2)).await;
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

        let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
        select! {
            result = proxy.foo().fuse() => assert_matches!(result, Ok(_)),
            _ = fs => unreachable!(),
        };
        assert_eq!(mock_server.call_count.get(), 1);

        drop(proxy);
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        TestExecutor::advance_to(initial + (IDLE_DURATION / 2) + IDLE_DURATION).await;
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());

        assert!(mock_server.stalled.load(Ordering::SeqCst));
        mock_server.assert_fs_gave_back_server_end(client_end);
    }

    /// Requests spaced closer than the idle interval are all served; requests spaced further
    /// apart are left pending and not counted.
    #[fuchsia::test(allow_stalls = false)]
    async fn periodic_requests() {
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let (mut current_time, mock_server, fs) = setup_test(server_end).await;
        // Run the fs in the background so awaiting a proxy call can make progress.
        let fs = fasync::Task::local(fs);

        const NUM_FOO_REQUESTS: usize = 10;
        // Phase 1: one request every half idle interval.
        for _ in 0..NUM_FOO_REQUESTS {
            let request_interval = IDLE_DURATION / 2;
            current_time += request_interval;
            TestExecutor::advance_to(current_time).await;
            let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
            assert_matches!(proxy.foo().await, Ok(_));
        }
        assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);

        // Phase 2: one request every two idle intervals. These calls never complete;
        // NOTE(review): presumably because the fs has already stalled and nothing in this
        // test re-serves the returned endpoint.
        for _ in 0..NUM_FOO_REQUESTS {
            let request_interval = IDLE_DURATION * 2;
            current_time += request_interval;
            TestExecutor::advance_to(current_time).await;
            let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
            let foo = proxy.foo();
            pin_mut!(foo);
            assert_matches!(TestExecutor::poll_until_stalled(&mut foo).await, Poll::Pending);
        }
        assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);

        fs.await;
        mock_server.assert_fs_gave_back_server_end(client_end);
    }

    /// An extra open connection to a subdirectory of the outgoing directory keeps the fs from
    /// completing until that connection is dropped.
    #[fuchsia::test(allow_stalls = false)]
    async fn some_other_outgoing_dir_connection_blocks_stalling() {
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let (initial, mock_server, fs) = setup_test(server_end).await;
        pin_mut!(fs);

        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

        {
            // `svc` lives only for this scope; while it is open the fs stays pending.
            let svc = open_directory_async(&client_end, "svc", fio::R_STAR_DIR).unwrap();

            TestExecutor::advance_to(initial + IDLE_DURATION).await;
            assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

            // The directory connection is functional and lists the one served protocol.
            assert_matches!(
                fuchsia_fs::directory::readdir(&svc).await,
                Ok(ref entries)
                if entries.len() == 1 && entries[0].name == "fuchsia.component.client.test.ProtocolA"
            );
            assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

            TestExecutor::advance_to(initial + (IDLE_DURATION * 3)).await;
            assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        }

        // With `svc` dropped, the fs can now complete.
        fs.await;
        assert!(mock_server.stalled.load(Ordering::SeqCst));
        mock_server.assert_fs_gave_back_server_end(client_end);
    }

    /// Simulates a component that repeatedly serves until stalled, waits for activity on the
    /// returned endpoint, and then serves again, while a client issues requests with growing
    /// gaps between them.
    #[fuchsia::test(allow_stalls = false)]
    async fn end_to_end() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;

        let mock_server = MockServer::new();
        let mock_server_clone = mock_server.clone();

        const MIN_REQUEST_INTERVAL: i64 = 10_000_000;
        let idle_duration = MonotonicDuration::from_nanos(MIN_REQUEST_INTERVAL * 5);
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();

        // Returns how many times the component was restarted after stalling.
        let component_task = async move {
            let mut server_end = Some(server_end);
            let mut loop_count = 0;
            loop {
                let mut fs = ServiceFs::new();
                fs.serve_connection(server_end.unwrap())
                    .unwrap()
                    .dir("svc")
                    .add_fidl_service(Requests::ServiceA);

                let mock_server_clone = mock_server_clone.clone();
                fs.until_stalled(idle_duration)
                    .for_each_concurrent(None, move |item| mock_server_clone.handle(item))
                    .await;

                // No endpoint handed back means there is nothing left to serve.
                let stalled_server_end = mock_server.server_end.lock().unwrap().take();
                let Some(stalled_server_end) = stalled_server_end else {
                    return loop_count;
                };

                // Wait for a new message or for the client to close before serving again.
                fasync::OnSignals::new(
                    &stalled_server_end,
                    zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .unwrap();
                server_end = Some(stalled_server_end.into());
                loop_count += 1;
            }
        };
        let component_task = fasync::Task::local(component_task);

        let mut deadline = initial;
        const NUM_REQUESTS: usize = 30;
        // After each request, advance time by `delay_factor * MIN_REQUEST_INTERVAL`.
        for delay_factor in 0..NUM_REQUESTS {
            let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
            proxy.foo().await.unwrap();
            drop(proxy);
            deadline += MonotonicDuration::from_nanos(MIN_REQUEST_INTERVAL * (delay_factor as i64));
            TestExecutor::advance_to(deadline).await;
        }

        drop(client_end);
        let loop_count = component_task.await;
        // NOTE(review): 25 presumably corresponds to the iterations whose delay is at least
        // `idle_duration` (delay_factor 5..=29) -- confirm.
        assert_eq!(loop_count, 25);
        assert_eq!(mock_server.call_count.get(), NUM_REQUESTS);
        assert!(mock_server.stalled.load(Ordering::SeqCst));
    }
}