fuchsia_component_server/
until_stalled.rs1use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use detect_stall::StallableRequestStream;
12use fidl::endpoints::ServerEnd;
13use futures::channel::oneshot::{self, Canceled};
14use futures::future::FusedFuture;
15use futures::{FutureExt, Stream, StreamExt};
16use pin_project::pin_project;
17use vfs::directory::immutable::connection::ImmutableConnection;
18use vfs::directory::immutable::Simple;
19use vfs::execution_scope::{ActiveGuard, ExecutionScope};
20use vfs::ToObjectRequest;
21use zx::MonotonicDuration;
22use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
23
24use super::{ServiceFs, ServiceObjTrait};
25
26type StalledFut = Pin<Box<dyn FusedFuture<Output = Option<zx::Channel>>>>;
29
30#[pin_project]
36pub struct StallableServiceFs<ServiceObjTy: ServiceObjTrait> {
37 #[pin]
38 fs: ServiceFs<ServiceObjTy>,
39 connector: OutgoingConnector,
40 state: State,
41 debounce_interval: zx::MonotonicDuration,
42 is_terminated: bool,
43}
44
45pub enum Item<Output> {
47 Request(Output, ActiveGuard),
52
53 Stalled(zx::Channel),
57}
58
59enum State {
70 Running { stalled: StalledFut },
71 Stalled { channel: Option<fasync::OnSignals<'static, zx::Channel>> },
74}
75
76impl<ServiceObjTy: ServiceObjTrait> Stream for StallableServiceFs<ServiceObjTy> {
77 type Item = Item<ServiceObjTy::Output>;
78
79 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80 let mut this = self.project();
81 if *this.is_terminated {
82 return Poll::Ready(None);
83 }
84
85 let poll_fs = this.fs.poll_next_unpin(cx);
87 if let Poll::Ready(Some(request)) = poll_fs {
88 return Poll::Ready(Some(Item::Request(request, this.connector.scope.active_guard())));
90 }
91
92 loop {
95 match &mut this.state {
96 State::Running { stalled } => {
97 let channel = std::task::ready!(stalled.as_mut().poll(cx));
98 let channel = channel
99 .map(|c| fasync::OnSignals::new(c.into(), zx::Signals::CHANNEL_READABLE));
100 *this.state = State::Stalled { channel };
102 }
103 State::Stalled { channel } => {
104 if let Poll::Ready(None) = poll_fs {
105 *this.is_terminated = true;
107 return Poll::Ready(
108 channel.take().map(|wait| Item::Stalled(wait.take_handle().into())),
109 );
110 }
111 if channel.is_none() {
112 return Poll::Pending;
116 }
117 let readable = channel.as_mut().unwrap().poll_unpin(cx);
119 let _ = std::task::ready!(readable);
120 let wait = channel.take().unwrap();
122 let stalled =
123 this.connector.serve(wait.take_handle().into(), *this.debounce_interval);
124 *this.state = State::Running { stalled };
126 }
127 }
128 }
129 }
130}
131
132struct OutgoingConnector {
133 flags: fio::Flags,
134 scope: ExecutionScope,
135 dir: Arc<Simple>,
136}
137
138impl OutgoingConnector {
139 fn serve(
145 &mut self,
146 server_end: ServerEnd<fio::DirectoryMarker>,
147 debounce_interval: MonotonicDuration,
148 ) -> StalledFut {
149 let (unbound_sender, unbound_receiver) = oneshot::channel();
150 let object_request = self.flags.to_object_request(server_end);
151 let scope = self.scope.clone();
152 let dir = self.dir.clone();
153 let flags = self.flags;
154 scope.clone().spawn(object_request.handle_async(async move |object_request| {
155 ImmutableConnection::create_transform_stream(
156 scope,
157 dir,
158 flags,
159 object_request,
160 move |stream| {
161 StallableRequestStream::new(
162 stream,
163 debounce_interval,
164 move |maybe_channel: Option<zx::Channel>| {
167 _ = unbound_sender.send(maybe_channel);
168 },
169 )
170 },
171 )
172 .await
173 }));
174 Box::pin(
175 unbound_receiver
176 .map(|result| match result {
177 Ok(maybe_channel) => maybe_channel,
178 Err(Canceled) => None,
179 })
180 .fuse(),
181 )
182 }
183}
184
185impl<ServiceObjTy: ServiceObjTrait> StallableServiceFs<ServiceObjTy> {
186 pub(crate) fn new(
187 mut fs: ServiceFs<ServiceObjTy>,
188 debounce_interval: zx::MonotonicDuration,
189 ) -> Self {
190 let channel_queue =
191 fs.channel_queue.as_mut().expect("Must not poll the original ServiceFs");
192 assert!(
193 channel_queue.len() == 1,
194 "Must have exactly one connection to serve, \
195 e.g. did you call ServiceFs::take_and_serve_directory_handle?"
196 );
197 let server_end = std::mem::replace(channel_queue, vec![]).into_iter().next().unwrap();
198 let flags = ServiceFs::<ServiceObjTy>::base_connection_flags();
199 let scope = fs.scope.clone();
200 let dir = fs.dir.clone();
201 let mut connector = OutgoingConnector { flags, scope, dir };
202 let stalled = connector.serve(server_end, debounce_interval);
203 Self {
204 fs,
205 connector,
206 state: State::Running { stalled },
207 debounce_interval,
208 is_terminated: false,
209 }
210 }
211
212 pub fn active_guard(&self) -> ActiveGuard {
215 self.connector.scope.active_guard()
216 }
217}
218
219#[cfg(test)]
220mod tests {
221 use super::*;
222 use assert_matches::assert_matches;
223 use fasync::TestExecutor;
224 use fidl::endpoints::ClientEnd;
225 use fidl_fuchsia_component_client_test::{
226 ProtocolAMarker, ProtocolARequest, ProtocolARequestStream,
227 };
228 use fuchsia_component_client::connect_to_protocol_at_dir_svc;
229 use fuchsia_component_directory::open_directory_async;
230 use futures::future::BoxFuture;
231 use futures::{pin_mut, select, TryStreamExt};
232 use std::sync::atomic::{AtomicBool, Ordering};
233 use std::sync::Mutex;
234 use test_util::Counter;
235 use zx::AsHandleRef;
236
237 enum Requests {
238 ServiceA(ProtocolARequestStream),
239 }
240
241 #[derive(Clone)]
242 struct MockServer {
243 call_count: Arc<Counter>,
244 stalled: Arc<AtomicBool>,
245 server_end: Arc<Mutex<Option<zx::Channel>>>,
246 }
247
248 impl MockServer {
249 fn new() -> Self {
250 let call_count = Arc::new(Counter::new(0));
251 let stalled = Arc::new(AtomicBool::new(false));
252 let server_end = Arc::new(Mutex::new(None));
253 Self { call_count, stalled, server_end }
254 }
255
256 fn handle(&self, item: Item<Requests>) -> BoxFuture<'static, ()> {
257 let stalled = self.stalled.clone();
258 let call_count = self.call_count.clone();
259 let server_end = self.server_end.clone();
260 async move {
261 match item {
262 Item::Request(requests, active_guard) => {
263 let _active_guard = active_guard;
264 let Requests::ServiceA(mut request_stream) = requests;
265 while let Ok(Some(request)) = request_stream.try_next().await {
266 match request {
267 ProtocolARequest::Foo { responder } => {
268 call_count.inc();
269 let _ = responder.send();
270 }
271 }
272 }
273 }
274 Item::Stalled(channel) => {
275 *server_end.lock().unwrap() = Some(channel);
276 stalled.store(true, Ordering::SeqCst);
277 }
278 }
279 }
280 .boxed()
281 }
282
283 #[track_caller]
284 fn assert_fs_gave_back_server_end(self, client_end: ClientEnd<fio::DirectoryMarker>) {
285 let reclaimed_server_end: zx::Channel = self.server_end.lock().unwrap().take().unwrap();
286 assert_eq!(
287 client_end.get_koid().unwrap(),
288 reclaimed_server_end.basic_info().unwrap().related_koid
289 )
290 }
291 }
292
293 async fn setup_test(
295 server_end: ServerEnd<fio::DirectoryMarker>,
296 ) -> (fasync::MonotonicInstant, MockServer, impl FusedFuture<Output = ()>) {
297 let initial = fasync::MonotonicInstant::from_nanos(0);
298 TestExecutor::advance_to(initial).await;
299 const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
300
301 let mut fs = ServiceFs::new();
302 fs.serve_connection(server_end).unwrap().dir("svc").add_fidl_service(Requests::ServiceA);
303
304 let mock_server = MockServer::new();
305 let mock_server_clone = mock_server.clone();
306 let fs = fs
307 .until_stalled(IDLE_DURATION)
308 .for_each_concurrent(None, move |item| mock_server_clone.handle(item));
309
310 (initial, mock_server, fs)
311 }
312
313 #[fuchsia::test(allow_stalls = false)]
314 async fn drain_request() {
315 const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
316 const NUM_FOO_REQUESTS: usize = 10;
317 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
318 let (initial, mock_server, fs) = setup_test(server_end).await;
319 pin_mut!(fs);
320
321 let mut proxies = Vec::new();
322 for _ in 0..NUM_FOO_REQUESTS {
323 proxies.push(connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap());
324 }
325
326 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
328
329 TestExecutor::advance_to(initial + (IDLE_DURATION * 2)).await;
331 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
332
333 for proxy in proxies.iter() {
335 select! {
336 result = proxy.foo().fuse() => assert_matches!(result, Ok(_)),
337 _ = fs => unreachable!(),
338 };
339 }
340
341 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
343 drop(proxies);
344 fs.await;
345
346 assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
348 assert!(mock_server.stalled.load(Ordering::SeqCst));
349 mock_server.assert_fs_gave_back_server_end(client_end);
350 }
351
352 #[fuchsia::test(allow_stalls = false)]
353 async fn no_request() {
354 const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
355 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
356 let (initial, mock_server, fs) = setup_test(server_end).await;
357 pin_mut!(fs);
358
359 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
360 TestExecutor::advance_to(initial + IDLE_DURATION).await;
361 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
362
363 assert_eq!(mock_server.call_count.get(), 0);
364 assert!(mock_server.stalled.load(Ordering::SeqCst));
365 mock_server.assert_fs_gave_back_server_end(client_end);
366 }
367
368 #[fuchsia::test(allow_stalls = false)]
369 async fn outgoing_dir_client_closed() {
370 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
371 let (_initial, mock_server, fs) = setup_test(server_end).await;
372 pin_mut!(fs);
373
374 drop(client_end);
375 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
376
377 assert_eq!(mock_server.call_count.get(), 0);
378 assert!(!mock_server.stalled.load(Ordering::SeqCst));
379 assert!(mock_server.server_end.lock().unwrap().is_none());
380 }
381
382 #[fuchsia::test(allow_stalls = false)]
383 async fn request_then_stalled() {
384 const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
385
386 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
387 let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
388
389 let foo = proxy.foo().fuse();
390 pin_mut!(foo);
391 assert!(TestExecutor::poll_until_stalled(&mut foo).await.is_pending());
392
393 let (initial, mock_server, fs) = setup_test(server_end).await;
394 pin_mut!(fs);
395
396 assert_eq!(mock_server.call_count.get(), 0);
398 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
399 assert_eq!(mock_server.call_count.get(), 1);
400 assert_matches!(foo.await, Ok(_));
401
402 drop(proxy);
403 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
404 TestExecutor::advance_to(initial + IDLE_DURATION).await;
405 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
406
407 assert_eq!(mock_server.call_count.get(), 1);
408 assert!(mock_server.stalled.load(Ordering::SeqCst));
409 mock_server.assert_fs_gave_back_server_end(client_end);
410 }
411
412 #[fuchsia::test(allow_stalls = false)]
413 async fn stalled_then_request() {
414 const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
415 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
416 let (initial, mock_server, fs) = setup_test(server_end).await;
417 pin_mut!(fs);
418
419 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
420 TestExecutor::advance_to(initial + (IDLE_DURATION / 2)).await;
421 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
422
423 let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
424 select! {
425 result = proxy.foo().fuse() => assert_matches!(result, Ok(_)),
426 _ = fs => unreachable!(),
427 };
428 assert_eq!(mock_server.call_count.get(), 1);
429
430 drop(proxy);
431 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
432 TestExecutor::advance_to(initial + (IDLE_DURATION / 2) + IDLE_DURATION).await;
433 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_ready());
434
435 assert!(mock_server.stalled.load(Ordering::SeqCst));
436 mock_server.assert_fs_gave_back_server_end(client_end);
437 }
438
439 #[fuchsia::test(allow_stalls = false)]
445 async fn periodic_requests() {
446 const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
447 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
448 let (mut current_time, mock_server, fs) = setup_test(server_end).await;
449 let fs = fasync::Task::local(fs);
450
451 const NUM_FOO_REQUESTS: usize = 10;
453 for _ in 0..NUM_FOO_REQUESTS {
454 let request_interval = IDLE_DURATION / 2;
455 current_time += request_interval;
456 TestExecutor::advance_to(current_time).await;
457 let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
458 assert_matches!(proxy.foo().await, Ok(_));
459 }
460 assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
461
462 for _ in 0..NUM_FOO_REQUESTS {
464 let request_interval = IDLE_DURATION * 2;
465 current_time += request_interval;
466 TestExecutor::advance_to(current_time).await;
467 let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
468 let foo = proxy.foo();
469 pin_mut!(foo);
470 assert_matches!(TestExecutor::poll_until_stalled(&mut foo).await, Poll::Pending);
471 }
472 assert_eq!(mock_server.call_count.get(), NUM_FOO_REQUESTS);
473
474 fs.await;
475 mock_server.assert_fs_gave_back_server_end(client_end);
476 }
477
478 #[fuchsia::test(allow_stalls = false)]
482 async fn some_other_outgoing_dir_connection_blocks_stalling() {
483 const IDLE_DURATION: MonotonicDuration = MonotonicDuration::from_nanos(1_000_000);
484 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
485 let (initial, mock_server, fs) = setup_test(server_end).await;
486 pin_mut!(fs);
487
488 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
489
490 {
491 let svc = open_directory_async(&client_end, "svc", fio::R_STAR_DIR).unwrap();
493
494 TestExecutor::advance_to(initial + IDLE_DURATION).await;
495 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
496
497 assert_matches!(
498 fuchsia_fs::directory::readdir(&svc).await,
499 Ok(ref entries)
500 if entries.len() == 1 && entries[0].name == "fuchsia.component.client.test.ProtocolA"
501 );
502 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
503
504 TestExecutor::advance_to(initial + (IDLE_DURATION * 3)).await;
506 assert!(TestExecutor::poll_until_stalled(&mut fs).await.is_pending());
507 }
508
509 fs.await;
511 assert!(mock_server.stalled.load(Ordering::SeqCst));
512 mock_server.assert_fs_gave_back_server_end(client_end);
513 }
514
515 #[fuchsia::test(allow_stalls = false)]
519 async fn end_to_end() {
520 let initial = fasync::MonotonicInstant::from_nanos(0);
521 TestExecutor::advance_to(initial).await;
522
523 let mock_server = MockServer::new();
524 let mock_server_clone = mock_server.clone();
525
526 const MIN_REQUEST_INTERVAL: i64 = 10_000_000;
527 let idle_duration = MonotonicDuration::from_nanos(MIN_REQUEST_INTERVAL * 5);
528 let (client_end, server_end) = fidl::endpoints::create_endpoints::<fio::DirectoryMarker>();
529
530 let component_task = async move {
531 let mut server_end = Some(server_end);
532 let mut loop_count = 0;
533 loop {
534 let mut fs = ServiceFs::new();
535 fs.serve_connection(server_end.unwrap())
536 .unwrap()
537 .dir("svc")
538 .add_fidl_service(Requests::ServiceA);
539
540 let mock_server_clone = mock_server_clone.clone();
541 fs.until_stalled(idle_duration)
542 .for_each_concurrent(None, move |item| mock_server_clone.handle(item))
543 .await;
544
545 let stalled_server_end = mock_server.server_end.lock().unwrap().take();
546 let Some(stalled_server_end) = stalled_server_end else {
547 return loop_count;
549 };
550
551 fasync::OnSignals::new(
552 &stalled_server_end,
553 zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
554 )
555 .await
556 .unwrap();
557 server_end = Some(stalled_server_end.into());
558 loop_count += 1;
559 }
560 };
561 let component_task = fasync::Task::local(component_task);
562
563 let mut deadline = initial;
566 const NUM_REQUESTS: usize = 30;
567 for delay_factor in 0..NUM_REQUESTS {
568 let proxy = connect_to_protocol_at_dir_svc::<ProtocolAMarker>(&client_end).unwrap();
569 proxy.foo().await.unwrap();
570 drop(proxy);
571 deadline += MonotonicDuration::from_nanos(MIN_REQUEST_INTERVAL * (delay_factor as i64));
572 TestExecutor::advance_to(deadline).await;
573 }
574
575 drop(client_end);
576 let loop_count = component_task.await;
577 assert_eq!(loop_count, 25);
579 assert_eq!(mock_server.call_count.get(), NUM_REQUESTS);
580 assert!(mock_server.stalled.load(Ordering::SeqCst));
581 }
582}