fuchsia_component_server/lib.rs
1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Tools for providing Fuchsia services.
6
7#![deny(missing_docs)]
8
9use anyhow::Error;
10use fidl::endpoints::{
11 DiscoverableProtocolMarker, Proxy as _, RequestStream, ServerEnd, ServiceMarker, ServiceRequest,
12};
13use fuchsia_component_client::connect_channel_to_protocol;
14use futures::channel::mpsc;
15use futures::future::BoxFuture;
16use futures::{FutureExt, Stream, StreamExt};
17use log::warn;
18use pin_project::pin_project;
19use std::marker::PhantomData;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use thiserror::Error;
24use vfs::directory::entry::DirectoryEntry;
25use vfs::directory::entry_container::Directory;
26use vfs::directory::helper::DirectlyMutable;
27use vfs::directory::immutable::Simple as PseudoDir;
28use vfs::execution_scope::ExecutionScope;
29use vfs::file::vmo::VmoFile;
30use vfs::name::Name;
31use vfs::path::Path;
32use vfs::remote::remote_dir;
33use vfs::service::endpoint;
34use vfs::ObjectRequest;
35use zx::MonotonicDuration;
36use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
37
38mod service;
39pub use service::{
40 FidlService, FidlServiceMember, FidlServiceServerConnector, Service, ServiceObj,
41 ServiceObjLocal, ServiceObjTrait,
42};
43mod until_stalled;
44pub use until_stalled::{Item, StallableServiceFs};
45
46/// A filesystem which connects clients to services.
47///
48/// This type implements the `Stream` trait and will yield the values
49/// returned from calling `Service::connect` on the services it hosts.
50///
51/// This can be used to, for example, yield streams of channels, request
52/// streams, futures to run, or any other value that should be processed
53/// as the result of a request.
54#[must_use]
55#[pin_project]
56pub struct ServiceFs<ServiceObjTy: ServiceObjTrait> {
57 // The execution scope for the backing VFS.
58 scope: ExecutionScope,
59
60 // The root directory.
61 dir: Arc<PseudoDir>,
62
63 // New connections are sent via an mpsc. The tuple is (index, channel) where index is the index
64 // into the `services` member.
65 new_connection_sender: mpsc::UnboundedSender<(usize, zx::Channel)>,
66 new_connection_receiver: mpsc::UnboundedReceiver<(usize, zx::Channel)>,
67
68 // A collection of objects that are able to handle new connections and convert them into a
69 // stream of ServiceObjTy::Output requests. There will be one for each service in the
70 // filesystem (irrespective of its place in the hierarchy).
71 services: Vec<ServiceObjTy>,
72
73 // A future that completes when the VFS no longer has any connections. These connections are
74 // distinct from connections that might be to services or remotes within this filesystem.
75 shutdown: BoxFuture<'static, ()>,
76
77 // The filesystem does not start servicing any requests until ServiceFs is first polled. This
78 // preserves behaviour of ServiceFs from when it didn't use the Rust VFS, and is relied upon in
79 // some cases. The queue is used until first polled. After that, `channel_queue` will be None
80 // and requests to service channels will be actioned immediately (potentially on different
81 // threads depending on the executor).
82 channel_queue: Option<Vec<fidl::endpoints::ServerEnd<fio::DirectoryMarker>>>,
83}
84
85impl<'a, Output: 'a> ServiceFs<ServiceObjLocal<'a, Output>> {
86 /// Create a new `ServiceFs` that is singlethreaded-only and does not
87 /// require services to implement `Send`.
88 pub fn new_local() -> Self {
89 Self::new_impl()
90 }
91}
92
93impl<'a, Output: 'a> ServiceFs<ServiceObj<'a, Output>> {
94 /// Create a new `ServiceFs` that is multithreaded-capable and requires
95 /// services to implement `Send`.
96 pub fn new() -> Self {
97 Self::new_impl()
98 }
99}
100
101/// A directory within a `ServiceFs`.
102///
103/// Services and subdirectories can be added to it.
104pub struct ServiceFsDir<'a, ServiceObjTy: ServiceObjTrait> {
105 fs: &'a mut ServiceFs<ServiceObjTy>,
106 dir: Arc<PseudoDir>,
107}
108
109/// A `Service` implementation that proxies requests
110/// to the outside environment.
111///
112/// Not intended for direct use. Use the `add_proxy_service`
113/// function instead.
114#[doc(hidden)]
115pub struct Proxy<P, O>(PhantomData<(P, fn() -> O)>);
116
117impl<P: DiscoverableProtocolMarker, O> Service for Proxy<P, O> {
118 type Output = O;
119 fn connect(&mut self, channel: zx::Channel) -> Option<O> {
120 if let Err(e) = connect_channel_to_protocol::<P>(channel) {
121 eprintln!("failed to proxy request to {}: {:?}", P::PROTOCOL_NAME, e);
122 }
123 None
124 }
125}
126
127/// A `Service` implementation that proxies requests to the given component.
128///
129/// Not intended for direct use. Use the `add_proxy_service_to` function instead.
130#[doc(hidden)]
131pub struct ProxyTo<P, O> {
132 directory_request: Arc<fidl::endpoints::ClientEnd<fio::DirectoryMarker>>,
133 _phantom: PhantomData<(P, fn() -> O)>,
134}
135
136impl<P: DiscoverableProtocolMarker, O> Service for ProxyTo<P, O> {
137 type Output = O;
138 fn connect(&mut self, channel: zx::Channel) -> Option<O> {
139 if let Err(e) =
140 fdio::service_connect_at(self.directory_request.channel(), P::PROTOCOL_NAME, channel)
141 {
142 eprintln!("failed to proxy request to {}: {:?}", P::PROTOCOL_NAME, e);
143 }
144 None
145 }
146}
147
148// Not part of a trait so that clients won't have to import a trait
149// in order to call these functions.
150macro_rules! add_functions {
151 () => {
152 /// Adds a service connector to the directory.
153 ///
154 /// ```rust
155 /// let mut fs = ServiceFs::new_local();
156 /// fs
157 /// .add_service_connector(|server_end: ServerEnd<EchoMarker>| {
158 /// connect_channel_to_protocol::<EchoMarker>(
159 /// server_end.into_channel(),
160 /// )
161 /// })
162 /// .add_service_connector(|server_end: ServerEnd<CustomMarker>| {
163 /// connect_channel_to_protocol::<CustomMarker>(
164 /// server_end.into_channel(),
165 /// )
166 /// })
167 /// .take_and_serve_directory_handle()?;
168 /// ```
169 ///
170 /// The FIDL service will be hosted at the name provided by the
171 /// `[Discoverable]` annotation in the FIDL source.
172 pub fn add_service_connector<F, P>(&mut self, service: F) -> &mut Self
173 where
174 F: FnMut(ServerEnd<P>) -> ServiceObjTy::Output,
175 P: DiscoverableProtocolMarker,
176 FidlServiceServerConnector<F, P, ServiceObjTy::Output>: Into<ServiceObjTy>,
177 {
178 self.add_service_at(P::PROTOCOL_NAME, FidlServiceServerConnector::from(service))
179 }
180
181 /// Adds a service to the directory at the given path.
182 ///
183 /// The path must be a single component containing no `/` characters.
184 ///
185 /// Panics if any node has already been added at the given path.
186 pub fn add_service_at(
187 &mut self,
188 path: impl Into<String>,
189 service: impl Into<ServiceObjTy>,
190 ) -> &mut Self {
191 let index = self.fs().services.len();
192 self.fs().services.push(service.into());
193 let sender = self.fs().new_connection_sender.clone();
194 self.add_entry_at(
195 path,
196 endpoint(move |_, channel| {
197 // It's possible for this send to fail in the case where ServiceFs has been
198 // dropped. When that happens, ServiceFs will drop ExecutionScope which
199 // contains the RemoteHandle for this task which will then cause this task to be
200 // dropped but not necessarily immediately. This will only occur when ServiceFs
201 // has been dropped, so it's safe to ignore the error here.
202 let _ = sender.unbounded_send((index, channel.into()));
203 }),
204 )
205 }
206
207 /// Adds a FIDL service to the directory.
208 ///
209 /// `service` is a closure that accepts a `RequestStream`.
210 /// Each service being served must return an instance of the same type
211 /// (`ServiceObjTy::Output`). This is necessary in order to multiplex
212 /// multiple services over the same dispatcher code. The typical way
213 /// to do this is to create an `enum` with variants for each service
214 /// you want to serve.
215 ///
216 /// ```rust
217 /// enum MyServices {
218 /// EchoServer(EchoRequestStream),
219 /// CustomServer(CustomRequestStream),
220 /// // ...
221 /// }
222 /// ```
223 ///
224 /// The constructor for a variant of the `MyServices` enum can be passed
225 /// as the `service` parameter.
226 ///
227 /// ```rust
228 /// let mut fs = ServiceFs::new_local();
229 /// fs
230 /// .add_fidl_service(MyServices::EchoServer)
231 /// .add_fidl_service(MyServices::CustomServer)
232 /// .take_and_serve_directory_handle()?;
233 /// ```
234 ///
235 /// `ServiceFs` can now be treated as a `Stream` of type `MyServices`.
236 ///
237 /// ```rust
238 /// const MAX_CONCURRENT: usize = 10_000;
239 /// fs.for_each_concurrent(MAX_CONCURRENT, |request: MyServices| {
240 /// match request {
241 /// MyServices::EchoServer(request) => handle_echo(request),
242 /// MyServices::CustomServer(request) => handle_custom(request),
243 /// }
244 /// }).await;
245 /// ```
246 ///
247 /// The FIDL service will be hosted at the name provided by the
248 /// `[Discoverable]` annotation in the FIDL source.
249 pub fn add_fidl_service<F, RS>(&mut self, service: F) -> &mut Self
250 where
251 F: FnMut(RS) -> ServiceObjTy::Output,
252 RS: RequestStream,
253 RS::Protocol: DiscoverableProtocolMarker,
254 FidlService<F, RS, ServiceObjTy::Output>: Into<ServiceObjTy>,
255 {
256 self.add_fidl_service_at(RS::Protocol::PROTOCOL_NAME, service)
257 }
258
259 /// Adds a FIDL service to the directory at the given path.
260 ///
261 /// The path must be a single component containing no `/` characters.
262 ///
263 /// See [`add_fidl_service`](#method.add_fidl_service) for details.
264 pub fn add_fidl_service_at<F, RS>(
265 &mut self,
266 path: impl Into<String>,
267 service: F,
268 ) -> &mut Self
269 where
270 F: FnMut(RS) -> ServiceObjTy::Output,
271 RS: RequestStream,
272 RS::Protocol: DiscoverableProtocolMarker,
273 FidlService<F, RS, ServiceObjTy::Output>: Into<ServiceObjTy>,
274 {
275 self.add_service_at(path, FidlService::from(service))
276 }
277
278 /// Adds a named instance of a FIDL service to the directory.
279 ///
280 /// The FIDL service will be hosted at `[SERVICE_NAME]/[instance]/` where `SERVICE_NAME` is
281 /// constructed from the FIDL library path and the name of the FIDL service.
282 ///
283 /// The `instance` must be a single component containing no `/` characters.
284 ///
285 /// # Example
286 ///
287 /// For the following FIDL definition,
288 /// ```fidl
289 /// library lib.foo;
290 ///
291 /// service Bar {
292 /// ...
293 /// }
294 /// ```
295 ///
296 /// The `SERVICE_NAME` of FIDL Service `Bar` would be `lib.foo.Bar`.
297 pub fn add_fidl_service_instance<F, SR>(
298 &mut self,
299 instance: impl Into<String>,
300 service: F,
301 ) -> &mut Self
302 where
303 F: Fn(SR) -> ServiceObjTy::Output,
304 F: Clone,
305 SR: ServiceRequest,
306 FidlServiceMember<F, SR, ServiceObjTy::Output>: Into<ServiceObjTy>,
307 {
308 self.add_fidl_service_instance_at(SR::Service::SERVICE_NAME, instance, service)
309 }
310
311 /// Adds a named instance of a FIDL service to the directory at the given path.
312 ///
313 /// The FIDL service will be hosted at `[path]/[instance]/`.
314 ///
315 /// The `path` and `instance` must be single components containing no `/` characters.
316 pub fn add_fidl_service_instance_at<F, SR>(
317 &mut self,
318 path: impl Into<String>,
319 instance: impl Into<String>,
320 service: F,
321 ) -> &mut Self
322 where
323 F: Fn(SR) -> ServiceObjTy::Output,
324 F: Clone,
325 SR: ServiceRequest,
326 FidlServiceMember<F, SR, ServiceObjTy::Output>: Into<ServiceObjTy>,
327 {
328 // Create the service directory, with an instance subdirectory.
329 let mut dir = self.dir(path);
330 let mut dir = dir.dir(instance);
331
332 // Attach member protocols under the instance directory.
333 for member in SR::member_names() {
334 dir.add_service_at(*member, FidlServiceMember::new(service.clone(), member));
335 }
336 self
337 }
338
339 /// Adds a service that proxies requests to the current environment.
340 // NOTE: we'd like to be able to remove the type parameter `O` here,
341 // but unfortunately the bound `ServiceObjTy: From<Proxy<P, ServiceObjTy::Output>>`
342 // makes type checking angry.
343 pub fn add_proxy_service<P: DiscoverableProtocolMarker, O>(&mut self) -> &mut Self
344 where
345 ServiceObjTy: From<Proxy<P, O>>,
346 ServiceObjTy: ServiceObjTrait<Output = O>,
347 {
348 self.add_service_at(P::PROTOCOL_NAME, Proxy::<P, ServiceObjTy::Output>(PhantomData))
349 }
350
351 /// Adds a service that proxies requests to the given component.
352 // NOTE: we'd like to be able to remove the type parameter `O` here,
353 // but unfortunately the bound `ServiceObjTy: From<Proxy<P, ServiceObjTy::Output>>`
354 // makes type checking angry.
355 pub fn add_proxy_service_to<P: DiscoverableProtocolMarker, O>(
356 &mut self,
357 directory_request: Arc<fidl::endpoints::ClientEnd<fio::DirectoryMarker>>,
358 ) -> &mut Self
359 where
360 ServiceObjTy: From<ProxyTo<P, O>>,
361 ServiceObjTy: ServiceObjTrait<Output = O>,
362 {
363 self.add_service_at(
364 P::PROTOCOL_NAME,
365 ProxyTo::<P, ServiceObjTy::Output> { directory_request, _phantom: PhantomData },
366 )
367 }
368
369 /// Adds a VMO file to the directory at the given path.
370 ///
371 /// The path must be a single component containing no `/` characters. The vmo should have
372 /// content size set as required.
373 ///
374 /// Panics if any node has already been added at the given path.
375 pub fn add_vmo_file_at(&mut self, path: impl Into<String>, vmo: zx::Vmo) -> &mut Self {
376 self.add_entry_at(
377 path,
378 VmoFile::new(
379 vmo, /*readable*/ true, /*writable*/ false, /*executable*/ false,
380 ),
381 )
382 }
383
384 /// Adds an entry to the directory at the given path.
385 ///
386 /// The path must be a single component.
387 /// The path must be a valid `fuchsia.io` [`Name`].
388 ///
389 /// Panics if any node has already been added at the given path.
390 pub fn add_entry_at(
391 &mut self,
392 path: impl Into<String>,
393 entry: Arc<dyn DirectoryEntry>,
394 ) -> &mut Self {
395 let path: String = path.into();
396 let name: Name = path.try_into().expect("Invalid path");
397 // This will fail if the name is invalid or already exists.
398 self.dir.add_entry_impl(name, entry, false).expect("Unable to add entry");
399 self
400 }
401
402 /// Returns a reference to the subdirectory at the given path,
403 /// creating one if none exists.
404 ///
405 /// The path must be a single component.
406 /// The path must be a valid `fuchsia.io` [`Name`].
407 ///
408 /// Panics if a service has already been added at the given path.
409 pub fn dir(&mut self, path: impl Into<String>) -> ServiceFsDir<'_, ServiceObjTy> {
410 let path: String = path.into();
411 let name: Name = path.try_into().expect("Invalid path");
412 let dir = Arc::downcast(self.dir.get_or_insert(name, new_simple_dir).into_any())
413 .unwrap_or_else(|_| panic!("Not a directory"));
414 ServiceFsDir { fs: self.fs(), dir }
415 }
416
417 /// Adds a new remote directory served over the given DirectoryProxy.
418 ///
419 /// The name must be a valid `fuchsia.io` [`Name`].
420 pub fn add_remote(
421 &mut self,
422 name: impl Into<String>,
423 proxy: fio::DirectoryProxy,
424 ) -> &mut Self {
425 let name: String = name.into();
426 let name: Name = name.try_into().expect("Invalid path");
427 self.dir.add_entry_impl(name, remote_dir(proxy), false).expect("Unable to add entry");
428 self
429 }
430 };
431}
432
433impl<ServiceObjTy: ServiceObjTrait> ServiceFsDir<'_, ServiceObjTy> {
434 fn fs(&mut self) -> &mut ServiceFs<ServiceObjTy> {
435 self.fs
436 }
437
438 add_functions!();
439}
440
441impl<ServiceObjTy: ServiceObjTrait> ServiceFs<ServiceObjTy> {
442 fn new_impl() -> Self {
443 let (new_connection_sender, new_connection_receiver) = mpsc::unbounded();
444 let scope = ExecutionScope::new();
445 let dir = new_simple_dir();
446 Self {
447 scope: scope.clone(),
448 dir,
449 new_connection_sender,
450 new_connection_receiver,
451 services: Vec::new(),
452 shutdown: async move { scope.wait().await }.boxed(),
453 channel_queue: Some(Vec::new()),
454 }
455 }
456
457 fn fs(&mut self) -> &mut ServiceFs<ServiceObjTy> {
458 self
459 }
460
461 /// Get a reference to the root directory as a `ServiceFsDir`.
462 ///
463 /// This can be useful when writing code which hosts some set of services on
464 /// a directory and wants to be agnostic to whether that directory
465 /// is the root `ServiceFs` or a subdirectory.
466 ///
467 /// Such a function can take an `&mut ServiceFsDir<...>` as an argument,
468 /// allowing callers to provide either a subdirectory or `fs.root_dir()`.
469 pub fn root_dir(&mut self) -> ServiceFsDir<'_, ServiceObjTy> {
470 let dir = self.dir.clone();
471 ServiceFsDir { fs: self, dir }
472 }
473
474 add_functions!();
475
476 /// When a connection is first made to the `ServiceFs` in the absence of a parent connection,
477 /// it will be granted these rights.
478 const fn base_connection_flags() -> fio::Flags {
479 return fio::Flags::PROTOCOL_DIRECTORY
480 .union(fio::PERM_READABLE)
481 .union(fio::PERM_WRITABLE)
482 .union(fio::PERM_EXECUTABLE);
483 }
484
485 fn serve_connection_impl(&self, chan: fidl::endpoints::ServerEnd<fio::DirectoryMarker>) {
486 self.dir
487 .clone()
488 .open3(
489 self.scope.clone(),
490 Path::dot(),
491 Self::base_connection_flags(),
492 &mut ObjectRequest::new(
493 Self::base_connection_flags(),
494 &Default::default(),
495 chan.into_channel(),
496 ),
497 )
498 .expect("failed to serve root ServiceFs connection");
499 }
500
501 /// Creates a protocol connector that can access the capabilities exposed by this ServiceFs.
502 pub fn create_protocol_connector<O>(&mut self) -> Result<ProtocolConnector, Error>
503 where
504 ServiceObjTy: ServiceObjTrait<Output = O>,
505 {
506 let (directory_request, directory_server_end) = fidl::endpoints::create_endpoints();
507 self.serve_connection(directory_server_end)?;
508
509 Ok(ProtocolConnector { directory_request })
510 }
511}
512
513fn new_simple_dir() -> Arc<PseudoDir> {
514 let dir = PseudoDir::new();
515 dir.clone().set_not_found_handler(Box::new(move |path| {
516 warn!(
517 "ServiceFs received request to `{}` but has not been configured to serve this path.",
518 path
519 );
520 }));
521 dir
522}
523
524/// `ProtocolConnector` allows connecting to capabilities exposed by ServiceFs
525pub struct ProtocolConnector {
526 directory_request: fidl::endpoints::ClientEnd<fio::DirectoryMarker>,
527}
528
529impl ProtocolConnector {
530 /// Connect to a protocol provided by this environment.
531 #[inline]
532 pub fn connect_to_service<P: DiscoverableProtocolMarker>(&self) -> Result<P::Proxy, Error> {
533 self.connect_to_protocol::<P>()
534 }
535
536 /// Connect to a protocol provided by this environment.
537 #[inline]
538 pub fn connect_to_protocol<P: DiscoverableProtocolMarker>(&self) -> Result<P::Proxy, Error> {
539 let (client_channel, server_channel) = zx::Channel::create();
540 self.pass_to_protocol::<P>(server_channel)?;
541 Ok(P::Proxy::from_channel(fasync::Channel::from_channel(client_channel)))
542 }
543
544 /// Connect to a protocol by passing a channel for the server.
545 #[inline]
546 pub fn pass_to_protocol<P: DiscoverableProtocolMarker>(
547 &self,
548 server_channel: zx::Channel,
549 ) -> Result<(), Error> {
550 self.pass_to_named_protocol(P::PROTOCOL_NAME, server_channel)
551 }
552
553 /// Connect to a protocol by name.
554 #[inline]
555 pub fn pass_to_named_protocol(
556 &self,
557 protocol_name: &str,
558 server_channel: zx::Channel,
559 ) -> Result<(), Error> {
560 fdio::service_connect_at(self.directory_request.channel(), protocol_name, server_channel)?;
561 Ok(())
562 }
563}
564
/// An error indicating that the startup handle on which the FIDL server
/// attempted to start was missing.
///
/// `ServiceFs::take_and_serve_directory_handle` fails with this error when the
/// `DirectoryRequest` startup handle cannot be taken.
#[derive(Debug, Error)]
#[error("The startup handle on which the FIDL server attempted to start was missing.")]
pub struct MissingStartupHandle;
570
571impl<ServiceObjTy: ServiceObjTrait> ServiceFs<ServiceObjTy> {
572 /// Removes the `DirectoryRequest` startup handle for the current
573 /// component and adds connects it to this `ServiceFs` as a client.
574 ///
575 /// Multiple calls to this function from the same component will
576 /// result in `Err(MissingStartupHandle)`.
577 pub fn take_and_serve_directory_handle(&mut self) -> Result<&mut Self, Error> {
578 let startup_handle = fuchsia_runtime::take_startup_handle(
579 fuchsia_runtime::HandleType::DirectoryRequest.into(),
580 )
581 .ok_or(MissingStartupHandle)?;
582
583 self.serve_connection(fidl::endpoints::ServerEnd::new(zx::Channel::from(startup_handle)))
584 }
585
586 /// Add a channel to serve this `ServiceFs` filesystem on. The `ServiceFs`
587 /// will continue to be provided over previously added channels, including
588 /// the one added if `take_and_serve_directory_handle` was called.
589 pub fn serve_connection(
590 &mut self,
591 chan: fidl::endpoints::ServerEnd<fio::DirectoryMarker>,
592 ) -> Result<&mut Self, Error> {
593 if let Some(channels) = &mut self.channel_queue {
594 channels.push(chan);
595 } else {
596 self.serve_connection_impl(chan);
597 }
598 Ok(self)
599 }
600
601 /// TODO(https://fxbug.dev/326626515): this is an experimental method to run a FIDL
602 /// directory connection until stalled, with the purpose to cleanly stop a component.
603 /// We'll expect to revisit how this works to generalize to all connections later.
604 /// Try not to use this function for other purposes.
605 ///
606 /// Normally the [`ServiceFs`] stream will block until all connections are closed.
607 /// In order to escrow the outgoing directory server endpoint, you may use this
608 /// function to get a [`StallableServiceFs`] that detects when no new requests
609 /// hit the outgoing directory for `debounce_interval`, and all hosted protocols
610 /// and other VFS connections to finish, then yield back the outgoing directory handle.
611 ///
612 /// The [`ServiceFs`] stream yields [`ServiceObjTy::Output`], which could be an enum
613 /// of FIDL connection requests in a typical component. By contrast, [`StallableServiceFs`]
614 /// yields an enum of either the request, or the unbound outgoing directory endpoint,
615 /// allowing you to escrow it back to `component_manager` before exiting the component.
616 pub fn until_stalled(
617 self,
618 debounce_interval: MonotonicDuration,
619 ) -> StallableServiceFs<ServiceObjTy> {
620 StallableServiceFs::<ServiceObjTy>::new(self, debounce_interval)
621 }
622}
623
624impl<ServiceObjTy: ServiceObjTrait> Stream for ServiceFs<ServiceObjTy> {
625 type Item = ServiceObjTy::Output;
626
627 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
628 if let Some(channels) = self.channel_queue.take() {
629 for chan in channels {
630 self.serve_connection_impl(chan);
631 }
632 }
633 while let Poll::Ready(Some((index, channel))) =
634 self.new_connection_receiver.poll_next_unpin(cx)
635 {
636 if let Some(stream) = self.services[index].service().connect(channel) {
637 return Poll::Ready(Some(stream));
638 }
639 }
640 self.shutdown.poll_unpin(cx).map(|_| None)
641 }
642}