fuchsia_component_server/
until_stalled.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Support for running the ServiceFs until stalled.
6
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use detect_stall::StallableRequestStream;
12use fidl::endpoints::ServerEnd;
13use futures::channel::oneshot::{self, Canceled};
14use futures::future::FusedFuture;
15use futures::{FutureExt, Stream, StreamExt};
16use pin_project::pin_project;
17use vfs::directory::immutable::connection::ImmutableConnection;
18use vfs::directory::immutable::Simple;
19use vfs::execution_scope::{ActiveGuard, ExecutionScope};
20use vfs::ToObjectRequest;
21use zx::MonotonicDuration;
22use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
23
24use super::{ServiceFs, ServiceObjTrait};
25
/// The future type that resolves when an outgoing directory connection has stalled
/// for a timeout or completed.
///
/// It resolves with `Some(channel)` when the connection was unbound after stalling, and with
/// `None` when the request stream completed without stalling (see [`OutgoingConnector::serve`]).
type StalledFut = Pin<Box<dyn FusedFuture<Output = Option<zx::Channel>>>>;
29
/// A wrapper around the base [`ServiceFs`] that streams out capability connection requests.
/// Additionally, it will yield [`Item::Stalled`] if there is no work happening in the fs
/// and the main outgoing directory connection has not received messages for some time.
///
/// Use [`ServiceFs::until_stalled`] to produce an instance. Refer to details there.
#[pin_project]
pub struct StallableServiceFs<ServiceObjTy: ServiceObjTrait> {
    /// The wrapped service fs; polled on every `poll_next` to surface connection requests.
    #[pin]
    fs: ServiceFs<ServiceObjTy>,
    /// Used to (re-)serve the main outgoing directory connection and to obtain active guards.
    connector: OutgoingConnector,
    /// Current position in the stall-detection state machine.
    state: State,
    /// Idle interval handed to `connector.serve` each time the connection is (re-)served.
    debounce_interval: zx::MonotonicDuration,
    /// Set once the service fs has finished while stalled; afterwards the stream only yields
    /// `None`.
    is_terminated: bool,
}
44
/// The item yielded by a [`StallableServiceFs`] stream.
///
/// `Output` is instantiated with `ServiceObjTy::Output` by the [`Stream`] implementation.
pub enum Item<Output> {
    /// A new connection request to a capability. `ServiceObjTy::Output` contains more
    /// information identifying the capability requested. The [`ActiveGuard`] should be
    /// held alive as long as you are processing the connection, or doing any other work
    /// where you would like to prevent the [`ServiceFs`] from shutting down.
    Request(Output, ActiveGuard),

    /// The [`ServiceFs`] has stalled. The unbound outgoing directory server endpoint will
    /// be returned here. The stream will complete right after this. You should typically
    /// escrow the server endpoint back to component manager, and then exit the component.
    Stalled(zx::Channel),
}
58
59// Implementation detail below
60
/// We use a state machine to detect stalling. The general structure is:
/// - When the service fs is running, wait for the outgoing directory connection to stall.
/// - If the outgoing directory stalled, unbind it and wait for it to become readable.
/// - If it is readable, we'll add the connection back to the service fs and go back to
///   waiting for a stall.
/// - If the service fs finished while the outgoing directory is unbound, we'll
///   complete the stream and return the endpoint to the user. Note that the service fs might take
///   a while to finish even after the outgoing directory has been unbound, due to
///   [`ActiveGuard`]s held by the user or due to other long-running connections.
enum State {
    /// The outgoing directory connection is being served; `stalled` resolves once that
    /// connection has stalled or completed.
    Running { stalled: StalledFut },
    /// The outgoing directory connection is no longer being served.
    // If the `channel` is `None`, the outgoing directory stream completed without stalling.
    // We just need to wait for the `ServiceFs` to finish.
    // Otherwise `channel` waits for the unbound endpoint to become readable again.
    Stalled { channel: Option<fasync::OnSignals<'static, zx::Channel>> },
}
75
76impl<ServiceObjTy: ServiceObjTrait> Stream for StallableServiceFs<ServiceObjTy> {
77    type Item = Item<ServiceObjTy::Output>;
78
79    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80        let mut this = self.project();
81        if *this.is_terminated {
82            return Poll::Ready(None);
83        }
84
85        // Poll the underlying service fs to handle requests.
86        //
87        // NOTE: Normally, it isn't safe to poll a stream after it returns None, but ServiceFs
88        // supports this.
89        let poll_fs = this.fs.poll_next_unpin(cx);
90        if let Poll::Ready(Some(request)) = poll_fs {
91            // If there is some connection request, always return that to the user first.
92            return match this.connector.scope.try_active_guard() {
93                Some(guard) => Poll::Ready(Some(Item::Request(request, guard))),
94                None => Poll::Ready(None),
95            };
96        }
97
98        // If we get here, the underlying service fs is either finished, or pending.
99        // Poll in a loop until the state no longer changes.
100        loop {
101            match &mut this.state {
102                State::Running { stalled } => {
103                    let channel = std::task::ready!(stalled.as_mut().poll(cx));
104                    let channel = channel
105                        .map(|c| fasync::OnSignals::new(c.into(), zx::Signals::CHANNEL_READABLE));
106                    // The state will be polled on the next loop iteration.
107                    *this.state = State::Stalled { channel };
108                }
109                State::Stalled { channel } => {
110                    if let Poll::Ready(None) = poll_fs {
111                        // The service fs finished. Return the channel if we have it.
112                        *this.is_terminated = true;
113
114                        // On a multithreaded executor, it is possible that some other task took an
115                        // active guard between us polling `fs` above and here, which would mean the
116                        // scope doesn't immediately shutdown. This isn't something we need to
117                        // support so we won't worry about this.
118                        this.connector.scope.shutdown();
119
120                        return Poll::Ready(
121                            channel.take().map(|wait| Item::Stalled(wait.take_handle().into())),
122                        );
123                    }
124                    if channel.is_none() {
125                        // The outgoing directory FIDL stream completed (client closed or
126                        // errored) without stalling, but the service fs is processing
127                        // other requests. Simply wait for that to finish.
128                        return Poll::Pending;
129                    }
130                    // Otherwise, arrange to be polled again if the channel is readable.
131                    let readable = channel.as_mut().unwrap().poll_unpin(cx);
132                    let _ = std::task::ready!(readable);
133                    // Server endpoint is readable again. Restore the connection.
134                    let wait = channel.take().unwrap();
135                    let stalled =
136                        this.connector.serve(wait.take_handle().into(), *this.debounce_interval);
137                    // The state will be polled on the next loop iteration.
138                    *this.state = State::Running { stalled };
139                }
140            }
141        }
142    }
143}
144
/// Bundles what is needed to serve (and later re-serve) the main outgoing directory connection.
struct OutgoingConnector {
    /// Connection flags used for every served outgoing directory connection.
    flags: fio::Flags,
    /// Scope on which connection tasks are spawned and from which active guards are taken.
    scope: ExecutionScope,
    /// The directory served over the outgoing directory connection.
    dir: Arc<Simple>,
}
150
151impl OutgoingConnector {
152    /// Adds a stallable outgoing directory connection.
153    ///
154    /// If the request stream completed, the returned future will resolve with `None`.
155    /// If the request stream did not encounter new requests for `debounce_interval`, it will be
156    /// unbound, and the returned future will resolve with `Some(channel)`.
157    fn serve(
158        &mut self,
159        server_end: ServerEnd<fio::DirectoryMarker>,
160        debounce_interval: MonotonicDuration,
161    ) -> StalledFut {
162        let (unbound_sender, unbound_receiver) = oneshot::channel();
163        let object_request = self.flags.to_object_request(server_end);
164        let scope = self.scope.clone();
165        let dir = self.dir.clone();
166        let flags = self.flags;
167        scope.clone().spawn(object_request.handle_async(async move |object_request| {
168            ImmutableConnection::create_transform_stream(
169                scope,
170                dir,
171                flags,
172                object_request,
173                move |stream| {
174                    StallableRequestStream::new(
175                        stream,
176                        debounce_interval,
177                        // This function will be called with the server endpoint when
178                        // the directory request stream is stalled for `debounce_interval`
179                        move |maybe_channel: Option<zx::Channel>| {
180                            _ = unbound_sender.send(maybe_channel);
181                        },
182                    )
183                },
184            )
185            .await
186        }));
187        Box::pin(
188            unbound_receiver
189                .map(|result| match result {
190                    Ok(maybe_channel) => maybe_channel,
191                    Err(Canceled) => None,
192                })
193                .fuse(),
194        )
195    }
196}
197
198impl<ServiceObjTy: ServiceObjTrait> StallableServiceFs<ServiceObjTy> {
199    pub(crate) fn new(
200        mut fs: ServiceFs<ServiceObjTy>,
201        debounce_interval: zx::MonotonicDuration,
202    ) -> Self {
203        let channel_queue =
204            fs.channel_queue.as_mut().expect("Must not poll the original ServiceFs");
205        assert!(
206            channel_queue.len() == 1,
207            "Must have exactly one connection to serve, \
208            e.g. did you call ServiceFs::take_and_serve_directory_handle?"
209        );
210        let server_end = std::mem::replace(channel_queue, vec![]).into_iter().next().unwrap();
211        let flags = ServiceFs::<ServiceObjTy>::base_connection_flags();
212        let scope = fs.scope.clone();
213        let dir = fs.dir.clone();
214        let mut connector = OutgoingConnector { flags, scope, dir };
215        let stalled = connector.serve(server_end, debounce_interval);
216        Self {
217            fs,
218            connector,
219            state: State::Running { stalled },
220            debounce_interval,
221            is_terminated: false,
222        }
223    }
224
225    /// Returns an [`ActiveGuard`] that will prevent the [`ServiceFs`] from shutting down until the
226    /// [`ActiveGuard`] is dropped.  This will return None if an active guard cannot be obtained
227    /// (e.g.  if the StallableServiceFs stream has terminated, or is just about to terminate).
228    pub fn try_active_guard(&self) -> Option<ActiveGuard> {
229        self.connector.scope.try_active_guard()
230    }
231}
232
233#[cfg(test)]
234mod tests {
235    use super::*;
236    use assert_matches::assert_matches;
237    use fasync::TestExecutor;
238    use fidl::endpoints::ClientEnd;
239    use fidl_fuchsia_component_client_test::{
240        ProtocolAMarker, ProtocolARequest, ProtocolARequestStream,
241    };
242    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
243    use fuchsia_component_directory::open_directory_async;
244    use futures::future::BoxFuture;
245    use futures::{pin_mut, select, TryStreamExt};
246    use std::sync::atomic::{AtomicBool, Ordering};
247    use std::sync::Mutex;
248    use test_util::Counter;
249    use zx::AsHandleRef;
250
    /// The capability connection requests exposed by the test `ServiceFs`.
    enum Requests {
        /// An incoming connection to `ProtocolA`, as a request stream.
        ServiceA(ProtocolARequestStream),
    }
254
    /// Test handler for the items yielded by a `StallableServiceFs`; records what it observed.
    /// Clones share the same underlying state.
    #[derive(Clone)]
    struct MockServer {
        /// Number of `ProtocolA.Foo` requests handled.
        call_count: Arc<Counter>,
        /// Set to `true` once an `Item::Stalled` has been handled.
        stalled: Arc<AtomicBool>,
        /// The channel received with `Item::Stalled`, if any.
        server_end: Arc<Mutex<Option<zx::Channel>>>,
    }
261
262    impl MockServer {
263        fn new() -> Self {
264            let call_count = Arc::new(Counter::new(0));
265            let stalled = Arc::new(AtomicBool::new(false));
266            let server_end = Arc::new(Mutex::new(None));
267            Self { call_count, stalled, server_end }
268        }
269
270        fn handle(&self, item: Item<Requests>) -> BoxFuture<'static, ()> {
271            let stalled = self.stalled.clone();
272            let call_count = self.call_count.clone();
273            let server_end = self.server_end.clone();
274            async move {
275                match item {
276                    Item::Request(requests, active_guard) => {
277                        let _active_guard = active_guard;
278                        let Requests::ServiceA(mut request_stream) = requests;
279                        while let Ok(Some(request)) = request_stream.try_next().await {
280                            match request {
281                                ProtocolARequest::Foo { responder } => {
282                                    call_count.inc();
283                                    let _ = responder.send();
284                                }
285                            }
286                        }
287                    }
288                    Item::Stalled(channel) => {
289                        *server_end.lock().unwrap() = Some(channel);
290                        stalled.store(true, Ordering::SeqCst);
291                    }
292                }
293            }
294            .boxed()
295        }
296
297        #[track_caller]
298        fn assert_fs_gave_back_server_end(self, client_end: ClientEnd<fio::DirectoryMarker>) {
299            let reclaimed_server_end: zx::Channel = self.server_end.lock().unwrap().take().unwrap();
300            assert_eq!(
301                client_end.get_koid().unwrap(),
302                reclaimed_server_end.basic_info().unwrap().related_koid
303            )
304        }
305    }
306
307    /// Initializes fake time; creates VFS with a single mock server, and returns them.
308    async fn setup_test(
309        server_end: ServerEnd<fio::DirectoryMarker>,
310    ) -> (fasync::MonotonicInstant, MockServer, impl FusedFuture<Output = ()>) {
311        let initial = fasync::MonotonicInstant::from_nanos(0);
312        TestExecutor::advance_to(initial).await;
313        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
314
315        let mut fs = ServiceFs::new();
316        fs.serve_connection(server_end).unwrap().dir("svc").add_fidl_service(Requests::ServiceA);
317
318        let mock_server = MockServer::new();
319        let mock_server_clone = mock_server.clone();
320        let fs = fs
321            .until_stalled(IDLE_DURATION)
322            .for_each_concurrent(None, move |item| mock_server_clone.handle(item));
323
324        (initial, mock_server, fs)
325    }
326
    /// Open protocol connections keep the fs from completing, even well past the idle duration.
    /// Once they are dropped, the fs completes and hands back the outgoing directory endpoint.
    #[fuchsia::test(allow_stalls = false)]
    async fn drain_request() {
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
        const NUM_FOO_REQUESTS: usize = 10;
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let (initial, mock_server, fs) = setup_test(server_end).await;
        pin_mut!(fs);

        let mut proxies = Vec::new();
        for _ in 0..NUM_FOO_REQUESTS {
            proxies.push(connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap());
        }

        // Accept the connections.
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

        // Active FIDL connections block idle, no matter the wait.
        TestExecutor::advance_to(initial + (IDLE_DURATION * 2)).await;
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

        // Make some requests.
        for proxy in proxies.iter() {
            select! {
                result = proxy.foo().fuse() => assert_matches!(result, Ok(_)),
                // The fs must not complete while connections are still open.
                _ = fs => unreachable!(),
            };
        }

        // Dropping FIDL connections frees the ServiceFs to complete.
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        drop(proxies);
        fs.await;

        // Requests were handled.
        assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
        assert!(mock_server.stalled.load(Ordering::SeqCst));
        mock_server.assert_fs_gave_back_server_end(client_end);
    }
365
    /// With no requests at all, the fs completes once the idle duration has elapsed and hands
    /// back the outgoing directory endpoint.
    #[fuchsia::test(allow_stalls = false)]
    async fn no_request() {
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let (initial, mock_server, fs) = setup_test(server_end).await;
        pin_mut!(fs);

        // Before the idle duration elapses the fs keeps running.
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        TestExecutor::advance_to(initial + IDLE_DURATION).await;
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());

        assert_eq!(mock_server.call_count.get(), 0);
        assert!(mock_server.stalled.load(Ordering::SeqCst));
        mock_server.assert_fs_gave_back_server_end(client_end);
    }
381
    /// If the client end of the outgoing directory is closed, the fs completes without advancing
    /// time and without reporting a stall or handing back an endpoint.
    #[fuchsia::test(allow_stalls = false)]
    async fn outgoing_dir_client_closed() {
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let (_initial, mock_server, fs) = setup_test(server_end).await;
        pin_mut!(fs);

        drop(client_end);
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());

        // No `Item::Stalled` was delivered to the mock server.
        assert_eq!(mock_server.call_count.get(), 0);
        assert!(!mock_server.stalled.load(Ordering::SeqCst));
        assert!(mock_server.server_end.lock().unwrap().is_none());
    }
395
    /// A request made before the fs starts serving is handled once the fs is polled. After the
    /// proxy is dropped and the idle duration elapses, the fs completes and hands back the
    /// endpoint.
    #[fuchsia::test(allow_stalls = false)]
    async fn request_then_stalled() {
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);

        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();

        // Issue the call before any server exists; it cannot complete yet.
        let foo = proxy.foo().fuse();
        pin_mut!(foo);
        assert!(TestExecutor::poll_until_stalled(&mut foo).await.is_pending());

        let (initial, mock_server, fs) = setup_test(server_end).await;
        pin_mut!(fs);

        // Poll the fs to process the FIDL.
        assert_eq!(mock_server.call_count.get(), 0);
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        assert_eq!(mock_server.call_count.get(), 1);
        assert_matches!(foo.await, Ok(_));

        // With the proxy gone, the fs still waits for the idle duration before completing.
        drop(proxy);
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        TestExecutor::advance_to(initial + IDLE_DURATION).await;
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());

        assert_eq!(mock_server.call_count.get(), 1);
        assert!(mock_server.stalled.load(Ordering::SeqCst));
        mock_server.assert_fs_gave_back_server_end(client_end);
    }
425
    /// The fs idles for half the idle duration, then serves a request. After the proxy is
    /// dropped and a further full idle duration has passed, the fs completes and hands back the
    /// endpoint.
    #[fuchsia::test(allow_stalls = false)]
    async fn stalled_then_request() {
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let (initial, mock_server, fs) = setup_test(server_end).await;
        pin_mut!(fs);

        // Half the idle duration is not enough for the fs to complete.
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        TestExecutor::advance_to(initial + (IDLE_DURATION / 2)).await;
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

        let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
        select! {
            result = proxy.foo().fuse() => assert_matches!(result, Ok(_)),
            // The fs must not complete while the request is being served.
            _ = fs => unreachable!(),
        };
        assert_eq!(mock_server.call_count.get(), 1);

        drop(proxy);
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        // One full idle duration after the request was made.
        TestExecutor::advance_to(initial + (IDLE_DURATION / 2) + IDLE_DURATION).await;
        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());

        assert!(mock_server.stalled.load(Ordering::SeqCst));
        mock_server.assert_fs_gave_back_server_end(client_end);
    }
452
    /// If periodic FIDL connections are made at an interval below the idle
    /// duration, the service fs should not stall.
    ///
    /// If periodic FIDL connections are made at an interval above the idle
    /// duration, the service fs should stall.
    #[fuchsia::test(allow_stalls = false)]
    async fn periodic_requests() {
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let (mut current_time, mock_server, fs) = setup_test(server_end).await;
        // Run the fs as its own local task so that awaiting the proxies below can make progress.
        let fs = fasync::Task::local(fs);

        // Interval below the idle duration.
        const NUM_FOO_REQUESTS: usize = 10;
        for _ in 0..NUM_FOO_REQUESTS {
            let request_interval = IDLE_DURATION / 2;
            current_time += request_interval;
            TestExecutor::advance_to(current_time).await;
            let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
            assert_matches!(proxy.foo().await, Ok(_));
        }
        assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);

        // Interval above the idle duration.
        for _ in 0..NUM_FOO_REQUESTS {
            let request_interval = IDLE_DURATION * 2;
            current_time += request_interval;
            TestExecutor::advance_to(current_time).await;
            let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
            let foo = proxy.foo();
            pin_mut!(foo);
            // These calls never complete: nothing answers them any more.
            assert_matches!(TestExecutor::poll_until_stalled(&mut foo).await, Poll::Pending);
        }
        // The call count is unchanged, i.e. none of the later requests were handled.
        assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);

        fs.await;
        mock_server.assert_fs_gave_back_server_end(client_end);
    }
491
    /// If there are other connections to the outgoing directory, then the fs will not return unless
    /// those connections are closed by the client. That's because we currently don't have a way to
    /// escrow those connections, so we don't want to disrupt them.
    #[fuchsia::test(allow_stalls = false)]
    async fn some_other_outgoing_dir_connection_blocks_stalling() {
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let (initial, mock_server, fs) = setup_test(server_end).await;
        pin_mut!(fs);

        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

        // `svc` lives only inside this scope; it is closed when the scope ends.
        {
            // We can open another connection that's not the main outgoing directory connection,
            let svc = open_directory_async(&client_end, "svc", fio::R_STAR_DIR).unwrap();

            TestExecutor::advance_to(initial + IDLE_DURATION).await;
            assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

            // The extra connection is still usable and lists the single registered protocol.
            assert_matches!(
                fuchsia_fs::directory::readdir(&svc).await,
                Ok(ref entries)
                if entries.len() == 1 && entries[0].name == "fuchsia.component.client.test.ProtocolA"
            );
            assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

            // ... and the service fs won't stall even if we wait past the timeout.
            TestExecutor::advance_to(initial + (IDLE_DURATION * 3)).await;
            assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        }

        // Closing that connection frees the fs to stall.
        fs.await;
        assert!(mock_server.stalled.load(Ordering::SeqCst));
        mock_server.assert_fs_gave_back_server_end(client_end);
    }
528
    /// Emulates a component that receives a bunch of requests, processes them, and then stalls.
    /// After that, if the outgoing directory is readable, serve it again. No request should be
    /// dropped, and the fs should stall a bunch of times.
    #[fuchsia::test(allow_stalls = false)]
    async fn end_to_end() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;

        let mock_server = MockServer::new();
        let mock_server_clone = mock_server.clone();

        const MIN_REQUEST_INTERVAL: i64 = 10_000_000;
        let idle_duration = MonotonicDuration::from_nanos(MIN_REQUEST_INTERVAL * 5);
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();

        // The emulated component: serve until stalled, wait for the endpoint to become readable
        // (or closed), then serve again. Resolves to the number of times it resumed serving.
        let component_task = async move {
            let mut server_end = Some(server_end);
            let mut loop_count = 0;
            loop {
                let mut fs = ServiceFs::new();
                fs.serve_connection(server_end.unwrap())
                    .unwrap()
                    .dir("svc")
                    .add_fidl_service(Requests::ServiceA);

                let mock_server_clone = mock_server_clone.clone();
                fs.until_stalled(idle_duration)
                    .for_each_concurrent(None, move |item| mock_server_clone.handle(item))
                    .await;

                // Take the endpoint so that a later iteration cannot observe a stale one.
                let stalled_server_end = mock_server.server_end.lock().unwrap().take();
                let Some(stalled_server_end) = stalled_server_end else {
                    // Client closed.
                    return loop_count;
                };

                // Stay idle until there is a new message or the client goes away.
                fasync::OnSignals::new(
                    &stalled_server_end,
                    zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .unwrap();
                server_end = Some(stalled_server_end.into());
                loop_count += 1;
            }
        };
        let component_task = fasync::Task::local(component_task);

        // Make connection requests at increasing intervals, starting from below the idle duration,
        // to above the idle duration.
        let mut deadline = initial;
        const NUM_REQUESTS: usize = 30;
        for delay_factor in 0..NUM_REQUESTS {
            let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
            proxy.foo().await.unwrap();
            drop(proxy);
            // The wait after request `delay_factor` is `delay_factor * MIN_REQUEST_INTERVAL`.
            deadline += MonotonicDuration::from_nanos(MIN_REQUEST_INTERVAL * (delay_factor as i64));
            TestExecutor::advance_to(deadline).await;
        }

        drop(client_end);
        let loop_count = component_task.await;
        // Why 25: there are 30 requests. The first 5 intervals are below the idle duration.
        assert_eq!(loop_count, 25);
        assert_eq!(mock_server.call_count.get(), NUM_REQUESTS);
        assert!(mock_server.stalled.load(Ordering::SeqCst));
    }
596}