fuchsia_component_server/
until_stalled.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Support for running the ServiceFs until stalled.
6
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use detect_stall::StallableRequestStream;
12use fidl::endpoints::ServerEnd;
13use futures::channel::oneshot::{self, Canceled};
14use futures::future::FusedFuture;
15use futures::{FutureExt, Stream, StreamExt};
16use pin_project::pin_project;
17use vfs::directory::immutable::connection::ImmutableConnection;
18use vfs::directory::immutable::Simple;
19use vfs::execution_scope::{ActiveGuard, ExecutionScope};
20use vfs::ToObjectRequest;
21use zx::MonotonicDuration;
22use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
23
24use super::{ServiceFs, ServiceObjTrait};
25
/// The future type that resolves when an outgoing directory connection has stalled
/// for a timeout or completed.
///
/// It resolves to `Some(channel)`, carrying the unbound server endpoint, when the
/// connection stalled, and to `None` when the request stream completed instead.
type StalledFut = Pin<Box<dyn FusedFuture<Output = Option<zx::Channel>>>>;
29
/// A wrapper around the base [`ServiceFs`] that streams out capability connection requests.
/// Additionally, it will yield [`Item::Stalled`] if there is no work happening in the fs
/// and the main outgoing directory connection has not received messages for some time.
///
/// Use [`ServiceFs::until_stalled`] to produce an instance. Refer to details there.
#[pin_project]
pub struct StallableServiceFs<ServiceObjTy: ServiceObjTrait> {
    /// The wrapped service fs; polled for capability connection requests.
    #[pin]
    fs: ServiceFs<ServiceObjTy>,
    /// Used to (re)bind the main outgoing directory connection and to hand out
    /// [`ActiveGuard`]s from its execution scope.
    connector: OutgoingConnector,
    /// Current position in the stall-detection state machine.
    state: State,
    /// Idle time passed to the connector each time the outgoing directory is served.
    debounce_interval: zx::MonotonicDuration,
    /// Set once the service fs has finished while stalled; every later poll yields `None`.
    is_terminated: bool,
}
44
/// The item yielded by a [`StallableServiceFs`] stream.
///
/// `Output` is the `ServiceObjTy::Output` type of the wrapped [`ServiceFs`].
pub enum Item<Output> {
    /// A new connection request to a capability. `ServiceObjTy::Output` contains more
    /// information identifying the capability requested. The [`ActiveGuard`] should be
    /// held alive as long as you are processing the connection, or doing any other work
    /// where you would like to prevent the [`ServiceFs`] from shutting down.
    Request(Output, ActiveGuard),

    /// The [`ServiceFs`] has stalled. The unbound outgoing directory server endpoint will
    /// be returned here. The stream will complete right after this. You should typically
    /// escrow the server endpoint back to component manager, and then exit the component.
    Stalled(zx::Channel),
}
58
59// Implementation detail below
60
/// We use a state machine to detect stalling. The general structure is:
/// - When the service fs is running, wait for the outgoing directory connection to stall.
/// - If the outgoing directory stalled, unbind it and wait for readable.
/// - If it is readable, we'll add the connection back to the service fs and go back to
///   waiting for a stall.
/// - If the service fs finished while the outgoing directory is unbound, we'll
///   complete the stream and return the endpoint to the user. Note that the service fs might take
///   a while to finish even after the outgoing directory has been unbound, due to
///   [`ActiveGuard`]s held by the user or due to other long-running connections.
enum State {
    /// The outgoing directory connection is being served. `stalled` resolves once that
    /// connection has stalled or its request stream has completed.
    Running { stalled: StalledFut },
    /// The outgoing directory connection is no longer being served.
    ///
    /// When `channel` is `Some`, it waits for the unbound server endpoint to become readable.
    // If the `channel` is `None`, the outgoing directory stream completed without stalling.
    // We just need to wait for the `ServiceFs` to finish.
    Stalled { channel: Option<fasync::OnSignals<'static, zx::Channel>> },
}
75
76impl<ServiceObjTy: ServiceObjTrait> Stream for StallableServiceFs<ServiceObjTy> {
77    type Item = Item<ServiceObjTy::Output>;
78
79    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80        let mut this = self.project();
81        if *this.is_terminated {
82            return Poll::Ready(None);
83        }
84
85        // Poll the underlying service fs to handle requests.
86        let poll_fs = this.fs.poll_next_unpin(cx);
87        if let Poll::Ready(Some(request)) = poll_fs {
88            // If there is some connection request, always return that to the user first.
89            return Poll::Ready(Some(Item::Request(request, this.connector.scope.active_guard())));
90        }
91
92        // If we get here, the underlying service fs is either finished, or pending.
93        // Poll in a loop until the state no longer changes.
94        loop {
95            match &mut this.state {
96                State::Running { stalled } => {
97                    let channel = std::task::ready!(stalled.as_mut().poll(cx));
98                    let channel = channel
99                        .map(|c| fasync::OnSignals::new(c.into(), zx::Signals::CHANNEL_READABLE));
100                    // The state will be polled on the next loop iteration.
101                    *this.state = State::Stalled { channel };
102                }
103                State::Stalled { channel } => {
104                    if let Poll::Ready(None) = poll_fs {
105                        // The service fs finished. Return the channel if we have it.
106                        *this.is_terminated = true;
107                        return Poll::Ready(
108                            channel.take().map(|wait| Item::Stalled(wait.take_handle().into())),
109                        );
110                    }
111                    if channel.is_none() {
112                        // The outgoing directory FIDL stream completed (client closed or
113                        // errored) without stalling, but the service fs is processing
114                        // other requests. Simply wait for that to finish.
115                        return Poll::Pending;
116                    }
117                    // Otherwise, arrange to be polled again if the channel is readable.
118                    let readable = channel.as_mut().unwrap().poll_unpin(cx);
119                    let _ = std::task::ready!(readable);
120                    // Server endpoint is readable again. Restore the connection.
121                    let wait = channel.take().unwrap();
122                    let stalled =
123                        this.connector.serve(wait.take_handle().into(), *this.debounce_interval);
124                    // The state will be polled on the next loop iteration.
125                    *this.state = State::Running { stalled };
126                }
127            }
128        }
129    }
130}
131
/// Holds what is needed to bind (and re-bind) the main outgoing directory connection.
struct OutgoingConnector {
    /// Flags used for every outgoing directory connection created by `serve`.
    flags: fio::Flags,
    /// Scope on which the connection task is spawned; also the source of `ActiveGuard`s.
    scope: ExecutionScope,
    /// The directory that is served over the connection.
    dir: Arc<Simple>,
}
137
138impl OutgoingConnector {
139    /// Adds a stallable outgoing directory connection.
140    ///
141    /// If the request stream completed, the returned future will resolve with `None`.
142    /// If the request stream did not encounter new requests for `debounce_interval`, it will be
143    /// unbound, and the returned future will resolve with `Some(channel)`.
144    fn serve(
145        &mut self,
146        server_end: ServerEnd<fio::DirectoryMarker>,
147        debounce_interval: MonotonicDuration,
148    ) -> StalledFut {
149        let (unbound_sender, unbound_receiver) = oneshot::channel();
150        let object_request = self.flags.to_object_request(server_end);
151        let scope = self.scope.clone();
152        let dir = self.dir.clone();
153        let flags = self.flags;
154        scope.clone().spawn(object_request.handle_async(async move |object_request| {
155            ImmutableConnection::create_transform_stream(
156                scope,
157                dir,
158                flags,
159                object_request,
160                move |stream| {
161                    StallableRequestStream::new(
162                        stream,
163                        debounce_interval,
164                        // This function will be called with the server endpoint when
165                        // the directory request stream is stalled for `debounce_interval`
166                        move |maybe_channel: Option<zx::Channel>| {
167                            _ = unbound_sender.send(maybe_channel);
168                        },
169                    )
170                },
171            )
172            .await
173        }));
174        Box::pin(
175            unbound_receiver
176                .map(|result| match result {
177                    Ok(maybe_channel) => maybe_channel,
178                    Err(Canceled) => None,
179                })
180                .fuse(),
181        )
182    }
183}
184
185impl<ServiceObjTy: ServiceObjTrait> StallableServiceFs<ServiceObjTy> {
186    pub(crate) fn new(
187        mut fs: ServiceFs<ServiceObjTy>,
188        debounce_interval: zx::MonotonicDuration,
189    ) -> Self {
190        let channel_queue =
191            fs.channel_queue.as_mut().expect("Must not poll the original ServiceFs");
192        assert!(
193            channel_queue.len() == 1,
194            "Must have exactly one connection to serve, \
195            e.g. did you call ServiceFs::take_and_serve_directory_handle?"
196        );
197        let server_end = std::mem::replace(channel_queue, vec![]).into_iter().next().unwrap();
198        let flags = ServiceFs::<ServiceObjTy>::base_connection_flags();
199        let scope = fs.scope.clone();
200        let dir = fs.dir.clone();
201        let mut connector = OutgoingConnector { flags, scope, dir };
202        let stalled = connector.serve(server_end, debounce_interval);
203        Self {
204            fs,
205            connector,
206            state: State::Running { stalled },
207            debounce_interval,
208            is_terminated: false,
209        }
210    }
211
212    /// Returns an [`ActiveGuard`] that will prevent the [`ServiceFs`] from shutting down until
213    /// the [`ActiveGuard`] is dropped.
214    pub fn active_guard(&self) -> ActiveGuard {
215        self.connector.scope.active_guard()
216    }
217}
218
219#[cfg(test)]
220mod tests {
221    use super::*;
222    use assert_matches::assert_matches;
223    use fasync::TestExecutor;
224    use fidl::endpoints::ClientEnd;
225    use fidl_fuchsia_component_client_test::{
226        ProtocolAMarker, ProtocolARequest, ProtocolARequestStream,
227    };
228    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
229    use fuchsia_component_directory::open_directory_async;
230    use futures::future::BoxFuture;
231    use futures::{pin_mut, select, TryStreamExt};
232    use std::sync::atomic::{AtomicBool, Ordering};
233    use std::sync::Mutex;
234    use test_util::Counter;
235    use zx::AsHandleRef;
236
    /// The connection requests produced by the `ServiceFs` under test.
    enum Requests {
        /// A new connection to `ProtocolA`, carried as its request stream.
        ServiceA(ProtocolARequestStream),
    }
240
    /// Test handler for the items of a `StallableServiceFs`. Clones share the same state.
    #[derive(Clone)]
    struct MockServer {
        // Incremented for every `Foo` request that is handled.
        call_count: Arc<Counter>,
        // Set once an `Item::Stalled` has been observed.
        stalled: Arc<AtomicBool>,
        // The channel delivered with `Item::Stalled`, if any.
        server_end: Arc<Mutex<Option<zx::Channel>>>,
    }
247
248    impl MockServer {
249        fn new() -> Self {
250            let call_count = Arc::new(Counter::new(0));
251            let stalled = Arc::new(AtomicBool::new(false));
252            let server_end = Arc::new(Mutex::new(None));
253            Self { call_count, stalled, server_end }
254        }
255
256        fn handle(&self, item: Item<Requests>) -> BoxFuture<'static, ()> {
257            let stalled = self.stalled.clone();
258            let call_count = self.call_count.clone();
259            let server_end = self.server_end.clone();
260            async move {
261                match item {
262                    Item::Request(requests, active_guard) => {
263                        let _active_guard = active_guard;
264                        let Requests::ServiceA(mut request_stream) = requests;
265                        while let Ok(Some(request)) = request_stream.try_next().await {
266                            match request {
267                                ProtocolARequest::Foo { responder } => {
268                                    call_count.inc();
269                                    let _ = responder.send();
270                                }
271                            }
272                        }
273                    }
274                    Item::Stalled(channel) => {
275                        *server_end.lock().unwrap() = Some(channel);
276                        stalled.store(true, Ordering::SeqCst);
277                    }
278                }
279            }
280            .boxed()
281        }
282
283        #[track_caller]
284        fn assert_fs_gave_back_server_end(self, client_end: ClientEnd<fio::DirectoryMarker>) {
285            let reclaimed_server_end: zx::Channel = self.server_end.lock().unwrap().take().unwrap();
286            assert_eq!(
287                client_end.get_koid().unwrap(),
288                reclaimed_server_end.basic_info().unwrap().related_koid
289            )
290        }
291    }
292
293    /// Initializes fake time; creates VFS with a single mock server, and returns them.
294    async fn setup_test(
295        server_end: ServerEnd<fio::DirectoryMarker>,
296    ) -> (fasync::MonotonicInstant, MockServer, impl FusedFuture<Output = ()>) {
297        let initial = fasync::MonotonicInstant::from_nanos(0);
298        TestExecutor::advance_to(initial).await;
299        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
300
301        let mut fs = ServiceFs::new();
302        fs.serve_connection(server_end).unwrap().dir("svc").add_fidl_service(Requests::ServiceA);
303
304        let mock_server = MockServer::new();
305        let mock_server_clone = mock_server.clone();
306        let fs = fs
307            .until_stalled(IDLE_DURATION)
308            .for_each_concurrent(None, move |item| mock_server_clone.handle(item));
309
310        (initial, mock_server, fs)
311    }
312
313    #[fuchsia::test(allow_stalls = false)]
314    async fn drain_request() {
315        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
316        const NUM_FOO_REQUESTS: usize = 10;
317        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
318        let (initial, mock_server, fs) = setup_test(server_end).await;
319        pin_mut!(fs);
320
321        let mut proxies = Vec::new();
322        for _ in 0..NUM_FOO_REQUESTS {
323            proxies.push(connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap());
324        }
325
326        // Accept the connections.
327        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
328
329        // Active FIDL connections block idle, no matter the wait.
330        TestExecutor::advance_to(initial + (IDLE_DURATION * 2)).await;
331        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
332
333        // Make some requests.
334        for proxy in proxies.iter() {
335            select! {
336                result = proxy.foo().fuse() => assert_matches!(result, Ok(_)),
337                _ = fs => unreachable!(),
338            };
339        }
340
341        // Dropping FIDL connections free the ServiceFs to complete.
342        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
343        drop(proxies);
344        fs.await;
345
346        // Requests were handled.
347        assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
348        assert!(mock_server.stalled.load(Ordering::SeqCst));
349        mock_server.assert_fs_gave_back_server_end(client_end);
350    }
351
352    #[fuchsia::test(allow_stalls = false)]
353    async fn no_request() {
354        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
355        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
356        let (initial, mock_server, fs) = setup_test(server_end).await;
357        pin_mut!(fs);
358
359        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
360        TestExecutor::advance_to(initial + IDLE_DURATION).await;
361        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
362
363        assert_eq!(mock_server.call_count.get(), 0);
364        assert!(mock_server.stalled.load(Ordering::SeqCst));
365        mock_server.assert_fs_gave_back_server_end(client_end);
366    }
367
368    #[fuchsia::test(allow_stalls = false)]
369    async fn outgoing_dir_client_closed() {
370        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
371        let (_initial, mock_server, fs) = setup_test(server_end).await;
372        pin_mut!(fs);
373
374        drop(client_end);
375        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
376
377        assert_eq!(mock_server.call_count.get(), 0);
378        assert!(!mock_server.stalled.load(Ordering::SeqCst));
379        assert!(mock_server.server_end.lock().unwrap().is_none());
380    }
381
382    #[fuchsia::test(allow_stalls = false)]
383    async fn request_then_stalled() {
384        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
385
386        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
387        let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
388
389        let foo = proxy.foo().fuse();
390        pin_mut!(foo);
391        assert!(TestExecutor::poll_until_stalled(&mut foo).await.is_pending());
392
393        let (initial, mock_server, fs) = setup_test(server_end).await;
394        pin_mut!(fs);
395
396        // Poll the fs to process the FIDL.
397        assert_eq!(mock_server.call_count.get(), 0);
398        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
399        assert_eq!(mock_server.call_count.get(), 1);
400        assert_matches!(foo.await, Ok(_));
401
402        drop(proxy);
403        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
404        TestExecutor::advance_to(initial + IDLE_DURATION).await;
405        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
406
407        assert_eq!(mock_server.call_count.get(), 1);
408        assert!(mock_server.stalled.load(Ordering::SeqCst));
409        mock_server.assert_fs_gave_back_server_end(client_end);
410    }
411
412    #[fuchsia::test(allow_stalls = false)]
413    async fn stalled_then_request() {
414        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
415        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
416        let (initial, mock_server, fs) = setup_test(server_end).await;
417        pin_mut!(fs);
418
419        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
420        TestExecutor::advance_to(initial + (IDLE_DURATION / 2)).await;
421        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
422
423        let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
424        select! {
425            result = proxy.foo().fuse() => assert_matches!(result, Ok(_)),
426            _ = fs => unreachable!(),
427        };
428        assert_eq!(mock_server.call_count.get(), 1);
429
430        drop(proxy);
431        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
432        TestExecutor::advance_to(initial + (IDLE_DURATION / 2) + IDLE_DURATION).await;
433        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
434
435        assert!(mock_server.stalled.load(Ordering::SeqCst));
436        mock_server.assert_fs_gave_back_server_end(client_end);
437    }
438
    /// If periodic FIDL connections are made at an interval below the idle
    /// duration, the service fs should not stall.
    ///
    /// If periodic FIDL connections are made at an interval above the idle
    /// duration, the service fs should stall.
    #[fuchsia::test(allow_stalls = false)]
    async fn periodic_requests() {
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let (mut current_time, mock_server, fs) = setup_test(server_end).await;
        // Run the fs as a local task so that it makes progress while requests are awaited.
        let fs = fasync::Task::local(fs);

        // Interval below the idle duration: every request is expected to be answered.
        const NUM_FOO_REQUESTS: usize = 10;
        for _ in 0..NUM_FOO_REQUESTS {
            let request_interval = IDLE_DURATION / 2;
            current_time += request_interval;
            TestExecutor::advance_to(current_time).await;
            let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
            assert_matches!(proxy.foo().await, Ok(_));
        }
        assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);

        // Interval above the idle duration: the calls stay pending and the call count
        // below does not grow any further.
        for _ in 0..NUM_FOO_REQUESTS {
            let request_interval = IDLE_DURATION * 2;
            current_time += request_interval;
            TestExecutor::advance_to(current_time).await;
            let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
            let foo = proxy.foo();
            pin_mut!(foo);
            assert_matches!(TestExecutor::poll_until_stalled(&mut foo).await, Poll::Pending);
        }
        assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);

        // The fs task completes, and it handed back the original server endpoint.
        fs.await;
        mock_server.assert_fs_gave_back_server_end(client_end);
    }
477
    /// If there are other connections to the outgoing directory, then the fs will not return unless
    /// those connections are closed by the client. That's because we currently don't have a way to
    /// escrow those connections, so we don't want to disrupt them.
    #[fuchsia::test(allow_stalls = false)]
    async fn some_other_outgoing_dir_connection_blocks_stalling() {
        const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
        let (initial, mock_server, fs) = setup_test(server_end).await;
        pin_mut!(fs);

        assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

        {
            // We can open another connection that's not the main outgoing directory connection,
            let svc = open_directory_async(&client_end, "svc", fio::R_STAR_DIR).unwrap();

            // The fs is still pending after the idle duration while `svc` is open.
            TestExecutor::advance_to(initial + IDLE_DURATION).await;
            assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

            // The extra connection is still usable and lists the single protocol entry.
            assert_matches!(
                fuchsia_fs::directory::readdir(&svc).await,
                Ok(ref entries)
                if entries.len() == 1 && entries[0].name == "fuchsia.component.client.test.ProtocolA"
            );
            assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());

            // ... and the service fs won't stall even if we wait past the timeout.
            TestExecutor::advance_to(initial + (IDLE_DURATION * 3)).await;
            assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
        }

        // Closing that connection (`svc` went out of scope above) frees the fs to stall.
        fs.await;
        assert!(mock_server.stalled.load(Ordering::SeqCst));
        mock_server.assert_fs_gave_back_server_end(client_end);
    }
514
    /// Emulates a component that receives a bunch of requests, processes them, and then stalls.
    /// After that, if the outgoing directory is readable, serve it again. No request should be
    /// dropped, and the fs should stall a bunch of times.
    #[fuchsia::test(allow_stalls = false)]
    async fn end_to_end() {
        let initial = fasync::MonotonicInstant::from_nanos(0);
        TestExecutor::advance_to(initial).await;

        let mock_server = MockServer::new();
        let mock_server_clone = mock_server.clone();

        const MIN_REQUEST_INTERVAL: i64 = 10_000_000;
        let idle_duration = MonotonicDuration::from_nanos(MIN_REQUEST_INTERVAL * 5);
        let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();

        // The emulated component: serve until stalled, wait for the returned endpoint to
        // become readable (or closed), then serve it again. Resolves to the number of
        // times the serving loop was restarted after a stall.
        let component_task = async move {
            let mut server_end = Some(server_end);
            let mut loop_count = 0;
            loop {
                let mut fs = ServiceFs::new();
                fs.serve_connection(server_end.unwrap())
                    .unwrap()
                    .dir("svc")
                    .add_fidl_service(Requests::ServiceA);

                let mock_server_clone = mock_server_clone.clone();
                fs.until_stalled(idle_duration)
                    .for_each_concurrent(None, move |item| mock_server_clone.handle(item))
                    .await;

                // The handler stores the endpoint here when the fs reports a stall.
                let stalled_server_end = mock_server.server_end.lock().unwrap().take();
                let Some(stalled_server_end) = stalled_server_end else {
                    // Client closed.
                    return loop_count;
                };

                // Wait until there is a new message to serve, or the client went away.
                fasync::OnSignals::new(
                    &stalled_server_end,
                    zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .unwrap();
                server_end = Some(stalled_server_end.into());
                loop_count += 1;
            }
        };
        let component_task = fasync::Task::local(component_task);

        // Make connection requests at increasing intervals, starting from below the idle duration,
        // to above the idle duration.
        let mut deadline = initial;
        const NUM_REQUESTS: usize = 30;
        for delay_factor in 0..NUM_REQUESTS {
            let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
            proxy.foo().await.unwrap();
            drop(proxy);
            deadline += MonotonicDuration::from_nanos(MIN_REQUEST_INTERVAL * (delay_factor as i64));
            TestExecutor::advance_to(deadline).await;
        }

        // Closing the client lets the component task leave its loop.
        drop(client_end);
        let loop_count = component_task.await;
        // Why 25: there are 30 requests. The first 5 intervals are below the idle duration.
        assert_eq!(loop_count, 25);
        assert_eq!(mock_server.call_count.get(), NUM_REQUESTS);
        assert!(mock_server.stalled.load(Ordering::SeqCst));
    }
582}