fuchsia_component_server/
lib.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Tools for providing Fuchsia services.
6
7#![deny(missing_docs)]
8
9use anyhow::Error;
10use fidl::endpoints::{
11    DiscoverableProtocolMarker, Proxy as _, RequestStream, ServerEnd, ServiceMarker, ServiceRequest,
12};
13use fuchsia_component_client::connect_channel_to_protocol;
14use futures::channel::mpsc;
15use futures::future::BoxFuture;
16use futures::{FutureExt, Stream, StreamExt};
17use log::warn;
18use pin_project::pin_project;
19use std::marker::PhantomData;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use thiserror::Error;
24use vfs::directory::entry::DirectoryEntry;
25use vfs::directory::helper::DirectlyMutable;
26use vfs::directory::immutable::Simple as PseudoDir;
27use vfs::execution_scope::ExecutionScope;
28use vfs::file::vmo::VmoFile;
29use vfs::name::Name;
30use vfs::remote::remote_dir;
31use vfs::service::endpoint;
32use zx::MonotonicDuration;
33use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
34
35mod service;
36pub use service::{
37    FidlService, FidlServiceMember, FidlServiceServerConnector, Service, ServiceObj,
38    ServiceObjLocal, ServiceObjTrait,
39};
40mod until_stalled;
41pub use until_stalled::{Item, StallableServiceFs};
42
/// A filesystem which connects clients to services.
///
/// This type implements the `Stream` trait and will yield the values
/// returned from calling `Service::connect` on the services it hosts.
/// Connections for which `Service::connect` returns `None` yield nothing.
///
/// This can be used to, for example, yield streams of channels, request
/// streams, futures to run, or any other value that should be processed
/// as the result of a request.
///
/// Directory connections handed to `serve_connection` before the stream is
/// first polled are queued and only start being served on that first poll.
#[must_use]
#[pin_project]
pub struct ServiceFs<ServiceObjTy: ServiceObjTrait> {
    // The execution scope for the backing VFS.
    scope: ExecutionScope,

    // The root directory.
    dir: Arc<PseudoDir>,

    // New connections are sent via an mpsc. The tuple is (index, channel) where index is the index
    // into the `services` member.  The sender is cloned into every service entry added to the
    // directory tree; the receiver is drained by `poll_next`.
    new_connection_sender: mpsc::UnboundedSender<(usize, zx::Channel)>,
    new_connection_receiver: mpsc::UnboundedReceiver<(usize, zx::Channel)>,

    // A collection of objects that are able to handle new connections and convert them into a
    // stream of ServiceObjTy::Output requests.  There will be one for each service in the
    // filesystem (irrespective of its place in the hierarchy).
    services: Vec<ServiceObjTy>,

    // A future that completes when the VFS no longer has any connections.  These connections are
    // distinct from connections that might be to services or remotes within this filesystem.
    // The stream yields `None` once this future resolves.
    shutdown: BoxFuture<'static, ()>,

    // The filesystem does not start servicing any requests until ServiceFs is first polled.  This
    // preserves behaviour of ServiceFs from when it didn't use the Rust VFS, and is relied upon in
    // some cases.  The queue is used until first polled.  After that, `channel_queue` will be None
    // and requests to service channels will be actioned immediately (potentially on different
    // threads depending on the executor).
    channel_queue: Option<Vec<fidl::endpoints::ServerEnd<fio::DirectoryMarker>>>,
}
81
82impl<'a, Output: 'a> ServiceFs<ServiceObjLocal<'a, Output>> {
83    /// Create a new `ServiceFs` that is singlethreaded-only and does not
84    /// require services to implement `Send`.
85    pub fn new_local() -> Self {
86        Self::new_impl()
87    }
88}
89
90impl<'a, Output: 'a> ServiceFs<ServiceObj<'a, Output>> {
91    /// Create a new `ServiceFs` that is multithreaded-capable and requires
92    /// services to implement `Send`.
93    pub fn new() -> Self {
94        Self::new_impl()
95    }
96}
97
/// A directory within a `ServiceFs`.
///
/// Services and subdirectories can be added to it.
///
/// Instances are obtained from `ServiceFs::root_dir` or from the `dir` method
/// of a `ServiceFs` or another `ServiceFsDir`, and mutably borrow the owning
/// filesystem for as long as they live.
pub struct ServiceFsDir<'a, ServiceObjTy: ServiceObjTrait> {
    // The filesystem this directory belongs to; newly added services are recorded on it.
    fs: &'a mut ServiceFs<ServiceObjTy>,
    // The VFS node backing this directory; new entries are inserted here.
    dir: Arc<PseudoDir>,
}
105
/// A `Service` implementation that proxies requests
/// to the outside environment.
///
/// Not intended for direct use. Use the `add_proxy_service`
/// function instead.
///
/// The type carries no data: `P` selects the protocol to connect to and `O`
/// is the `Service::Output` type; both appear only inside the `PhantomData`.
#[doc(hidden)]
pub struct Proxy<P, O>(PhantomData<(P, fn() -> O)>);
113
114impl<P: DiscoverableProtocolMarker, O> Service for Proxy<P, O> {
115    type Output = O;
116    fn connect(&mut self, channel: zx::Channel) -> Option<O> {
117        if let Err(e) = connect_channel_to_protocol::<P>(channel) {
118            eprintln!("failed to proxy request to {}: {:?}", P::PROTOCOL_NAME, e);
119        }
120        None
121    }
122}
123
/// A `Service` implementation that proxies requests to the given component.
///
/// Not intended for direct use. Use the `add_proxy_service_to` function instead.
#[doc(hidden)]
pub struct ProxyTo<P, O> {
    // Client end of the directory in which `P::PROTOCOL_NAME` is looked up on every connect.
    // Held in an `Arc` so that one directory can back several proxied protocols.
    directory_request: Arc<fidl::endpoints::ClientEnd<fio::DirectoryMarker>>,
    // Ties the otherwise unused `P` (protocol) and `O` (output) parameters to the type.
    _phantom: PhantomData<(P, fn() -> O)>,
}
132
133impl<P: DiscoverableProtocolMarker, O> Service for ProxyTo<P, O> {
134    type Output = O;
135    fn connect(&mut self, channel: zx::Channel) -> Option<O> {
136        if let Err(e) =
137            fdio::service_connect_at(self.directory_request.channel(), P::PROTOCOL_NAME, channel)
138        {
139            eprintln!("failed to proxy request to {}: {:?}", P::PROTOCOL_NAME, e);
140        }
141        None
142    }
143}
144
145// Not part of a trait so that clients won't have to import a trait
146// in order to call these functions.
147macro_rules! add_functions {
148    () => {
149        /// Adds a service connector to the directory.
150        ///
151        /// ```rust
152        /// let mut fs = ServiceFs::new_local();
153        /// fs
154        ///     .add_service_connector(|server_end: ServerEnd<EchoMarker>| {
155        ///         connect_channel_to_protocol::<EchoMarker>(
156        ///             server_end.into_channel(),
157        ///         )
158        ///     })
159        ///     .add_service_connector(|server_end: ServerEnd<CustomMarker>| {
160        ///         connect_channel_to_protocol::<CustomMarker>(
161        ///             server_end.into_channel(),
162        ///         )
163        ///     })
164        ///     .take_and_serve_directory_handle()?;
165        /// ```
166        ///
167        /// The FIDL service will be hosted at the name provided by the
168        /// `[Discoverable]` annotation in the FIDL source.
169        pub fn add_service_connector<F, P>(&mut self, service: F) -> &mut Self
170        where
171            F: FnMut(ServerEnd<P>) -> ServiceObjTy::Output,
172            P: DiscoverableProtocolMarker,
173            FidlServiceServerConnector<F, P, ServiceObjTy::Output>: Into<ServiceObjTy>,
174        {
175            self.add_service_at(P::PROTOCOL_NAME, FidlServiceServerConnector::from(service))
176        }
177
178        /// Adds a service to the directory at the given path.
179        ///
180        /// The path must be a single component containing no `/` characters.
181        ///
182        /// Panics if any node has already been added at the given path.
183        pub fn add_service_at(
184            &mut self,
185            path: impl Into<String>,
186            service: impl Into<ServiceObjTy>,
187        ) -> &mut Self {
188            let index = self.fs().services.len();
189            self.fs().services.push(service.into());
190            let sender = self.fs().new_connection_sender.clone();
191            self.add_entry_at(
192                path,
193                endpoint(move |_, channel| {
194                    // It's possible for this send to fail in the case where ServiceFs has been
195                    // dropped.  When that happens, ServiceFs will drop ExecutionScope which
196                    // contains the RemoteHandle for this task which will then cause this task to be
197                    // dropped but not necessarily immediately.  This will only occur when ServiceFs
198                    // has been dropped, so it's safe to ignore the error here.
199                    let _ = sender.unbounded_send((index, channel.into()));
200                }),
201            )
202        }
203
204        /// Adds a FIDL service to the directory.
205        ///
206        /// `service` is a closure that accepts a `RequestStream`.
207        /// Each service being served must return an instance of the same type
208        /// (`ServiceObjTy::Output`). This is necessary in order to multiplex
209        /// multiple services over the same dispatcher code. The typical way
210        /// to do this is to create an `enum` with variants for each service
211        /// you want to serve.
212        ///
213        /// ```rust
214        /// enum MyServices {
215        ///     EchoServer(EchoRequestStream),
216        ///     CustomServer(CustomRequestStream),
217        ///     // ...
218        /// }
219        /// ```
220        ///
221        /// The constructor for a variant of the `MyServices` enum can be passed
222        /// as the `service` parameter.
223        ///
224        /// ```rust
225        /// let mut fs = ServiceFs::new_local();
226        /// fs
227        ///     .add_fidl_service(MyServices::EchoServer)
228        ///     .add_fidl_service(MyServices::CustomServer)
229        ///     .take_and_serve_directory_handle()?;
230        /// ```
231        ///
232        /// `ServiceFs` can now be treated as a `Stream` of type `MyServices`.
233        ///
234        /// ```rust
235        /// const MAX_CONCURRENT: usize = 10_000;
236        /// fs.for_each_concurrent(MAX_CONCURRENT, |request: MyServices| {
237        ///     match request {
238        ///         MyServices::EchoServer(request) => handle_echo(request),
239        ///         MyServices::CustomServer(request) => handle_custom(request),
240        ///     }
241        /// }).await;
242        /// ```
243        ///
244        /// The FIDL service will be hosted at the name provided by the
245        /// `[Discoverable]` annotation in the FIDL source.
246        pub fn add_fidl_service<F, RS>(&mut self, service: F) -> &mut Self
247        where
248            F: FnMut(RS) -> ServiceObjTy::Output,
249            RS: RequestStream,
250            RS::Protocol: DiscoverableProtocolMarker,
251            FidlService<F, RS, ServiceObjTy::Output>: Into<ServiceObjTy>,
252        {
253            self.add_fidl_service_at(RS::Protocol::PROTOCOL_NAME, service)
254        }
255
256        /// Adds a FIDL service to the directory at the given path.
257        ///
258        /// The path must be a single component containing no `/` characters.
259        ///
260        /// See [`add_fidl_service`](#method.add_fidl_service) for details.
261        pub fn add_fidl_service_at<F, RS>(
262            &mut self,
263            path: impl Into<String>,
264            service: F,
265        ) -> &mut Self
266        where
267            F: FnMut(RS) -> ServiceObjTy::Output,
268            RS: RequestStream,
269            RS::Protocol: DiscoverableProtocolMarker,
270            FidlService<F, RS, ServiceObjTy::Output>: Into<ServiceObjTy>,
271        {
272            self.add_service_at(path, FidlService::from(service))
273        }
274
275        /// Adds a named instance of a FIDL service to the directory.
276        ///
277        /// The FIDL service will be hosted at `[SERVICE_NAME]/[instance]/` where `SERVICE_NAME` is
278        /// constructed from the FIDL library path and the name of the FIDL service.
279        ///
280        /// The `instance` must be a single component containing no `/` characters.
281        ///
282        /// # Example
283        ///
284        /// For the following FIDL definition,
285        /// ```fidl
286        /// library lib.foo;
287        ///
288        /// service Bar {
289        ///   ...
290        /// }
291        /// ```
292        ///
293        /// The `SERVICE_NAME` of FIDL Service `Bar` would be `lib.foo.Bar`.
294        pub fn add_fidl_service_instance<F, SR>(
295            &mut self,
296            instance: impl Into<String>,
297            service: F,
298        ) -> &mut Self
299        where
300            F: Fn(SR) -> ServiceObjTy::Output,
301            F: Clone,
302            SR: ServiceRequest,
303            FidlServiceMember<F, SR, ServiceObjTy::Output>: Into<ServiceObjTy>,
304        {
305            self.add_fidl_service_instance_at(SR::Service::SERVICE_NAME, instance, service)
306        }
307
308        /// Adds a named instance of a FIDL service to the directory at the given path.
309        ///
310        /// The FIDL service will be hosted at `[path]/[instance]/`.
311        ///
312        /// The `path` and `instance` must be single components containing no `/` characters.
313        pub fn add_fidl_service_instance_at<F, SR>(
314            &mut self,
315            path: impl Into<String>,
316            instance: impl Into<String>,
317            service: F,
318        ) -> &mut Self
319        where
320            F: Fn(SR) -> ServiceObjTy::Output,
321            F: Clone,
322            SR: ServiceRequest,
323            FidlServiceMember<F, SR, ServiceObjTy::Output>: Into<ServiceObjTy>,
324        {
325            // Create the service directory, with an instance subdirectory.
326            let mut dir = self.dir(path);
327            let mut dir = dir.dir(instance);
328
329            // Attach member protocols under the instance directory.
330            for member in SR::member_names() {
331                dir.add_service_at(*member, FidlServiceMember::new(service.clone(), member));
332            }
333            self
334        }
335
336        /// Adds a service that proxies requests to the current environment.
337        // NOTE: we'd like to be able to remove the type parameter `O` here,
338        //  but unfortunately the bound `ServiceObjTy: From<Proxy<P, ServiceObjTy::Output>>`
339        //  makes type checking angry.
340        pub fn add_proxy_service<P: DiscoverableProtocolMarker, O>(&mut self) -> &mut Self
341        where
342            ServiceObjTy: From<Proxy<P, O>>,
343            ServiceObjTy: ServiceObjTrait<Output = O>,
344        {
345            self.add_service_at(P::PROTOCOL_NAME, Proxy::<P, ServiceObjTy::Output>(PhantomData))
346        }
347
348        /// Adds a service that proxies requests to the given component.
349        // NOTE: we'd like to be able to remove the type parameter `O` here,
350        //  but unfortunately the bound `ServiceObjTy: From<Proxy<P, ServiceObjTy::Output>>`
351        //  makes type checking angry.
352        pub fn add_proxy_service_to<P: DiscoverableProtocolMarker, O>(
353            &mut self,
354            directory_request: Arc<fidl::endpoints::ClientEnd<fio::DirectoryMarker>>,
355        ) -> &mut Self
356        where
357            ServiceObjTy: From<ProxyTo<P, O>>,
358            ServiceObjTy: ServiceObjTrait<Output = O>,
359        {
360            self.add_service_at(
361                P::PROTOCOL_NAME,
362                ProxyTo::<P, ServiceObjTy::Output> { directory_request, _phantom: PhantomData },
363            )
364        }
365
366        /// Adds a VMO file to the directory at the given path.
367        ///
368        /// The path must be a single component containing no `/` characters. The vmo should have
369        /// content size set as required.
370        ///
371        /// Panics if any node has already been added at the given path.
372        pub fn add_vmo_file_at(&mut self, path: impl Into<String>, vmo: zx::Vmo) -> &mut Self {
373            self.add_entry_at(path, VmoFile::new(vmo))
374        }
375
376        /// Adds an entry to the directory at the given path.
377        ///
378        /// The path must be a single component.
379        /// The path must be a valid `fuchsia.io` [`Name`].
380        ///
381        /// Panics if any node has already been added at the given path.
382        pub fn add_entry_at(
383            &mut self,
384            path: impl Into<String>,
385            entry: Arc<dyn DirectoryEntry>,
386        ) -> &mut Self {
387            let path: String = path.into();
388            let name: Name = path.try_into().expect("Invalid path");
389            // This will fail if the name is invalid or already exists.
390            self.dir.add_entry_impl(name, entry, false).expect("Unable to add entry");
391            self
392        }
393
394        /// Returns a reference to the subdirectory at the given path,
395        /// creating one if none exists.
396        ///
397        /// The path must be a single component.
398        /// The path must be a valid `fuchsia.io` [`Name`].
399        ///
400        /// Panics if a service has already been added at the given path.
401        pub fn dir(&mut self, path: impl Into<String>) -> ServiceFsDir<'_, ServiceObjTy> {
402            let path: String = path.into();
403            let name: Name = path.try_into().expect("Invalid path");
404            let dir = Arc::downcast(self.dir.get_or_insert(name, new_simple_dir).into_any())
405                .unwrap_or_else(|_| panic!("Not a directory"));
406            ServiceFsDir { fs: self.fs(), dir }
407        }
408
409        /// Adds a new remote directory served over the given DirectoryProxy.
410        ///
411        /// The name must be a valid `fuchsia.io` [`Name`].
412        pub fn add_remote(
413            &mut self,
414            name: impl Into<String>,
415            proxy: fio::DirectoryProxy,
416        ) -> &mut Self {
417            let name: String = name.into();
418            let name: Name = name.try_into().expect("Invalid path");
419            self.dir.add_entry_impl(name, remote_dir(proxy), false).expect("Unable to add entry");
420            self
421        }
422    };
423}
424
425impl<ServiceObjTy: ServiceObjTrait> ServiceFsDir<'_, ServiceObjTy> {
426    fn fs(&mut self) -> &mut ServiceFs<ServiceObjTy> {
427        self.fs
428    }
429
430    add_functions!();
431}
432
433impl<ServiceObjTy: ServiceObjTrait> ServiceFs<ServiceObjTy> {
434    fn new_impl() -> Self {
435        let (new_connection_sender, new_connection_receiver) = mpsc::unbounded();
436        let scope = ExecutionScope::new();
437        let dir = new_simple_dir();
438        Self {
439            scope: scope.clone(),
440            dir,
441            new_connection_sender,
442            new_connection_receiver,
443            services: Vec::new(),
444            shutdown: async move { scope.wait().await }.boxed(),
445            channel_queue: Some(Vec::new()),
446        }
447    }
448
449    fn fs(&mut self) -> &mut ServiceFs<ServiceObjTy> {
450        self
451    }
452
453    /// Get a reference to the root directory as a `ServiceFsDir`.
454    ///
455    /// This can be useful when writing code which hosts some set of services on
456    /// a directory and wants to be agnostic to whether that directory
457    /// is the root `ServiceFs` or a subdirectory.
458    ///
459    /// Such a function can take an `&mut ServiceFsDir<...>` as an argument,
460    /// allowing callers to provide either a subdirectory or `fs.root_dir()`.
461    pub fn root_dir(&mut self) -> ServiceFsDir<'_, ServiceObjTy> {
462        let dir = self.dir.clone();
463        ServiceFsDir { fs: self, dir }
464    }
465
466    add_functions!();
467
468    /// When a connection is first made to the `ServiceFs` in the absence of a parent connection,
469    /// it will be granted these rights.
470    const fn base_connection_flags() -> fio::Flags {
471        return fio::Flags::PROTOCOL_DIRECTORY
472            .union(fio::PERM_READABLE)
473            .union(fio::PERM_WRITABLE)
474            .union(fio::PERM_EXECUTABLE);
475    }
476
477    fn serve_connection_impl(&self, chan: fidl::endpoints::ServerEnd<fio::DirectoryMarker>) {
478        vfs::directory::serve_on(
479            self.dir.clone(),
480            Self::base_connection_flags(),
481            self.scope.clone(),
482            chan,
483        );
484    }
485
486    /// Creates a protocol connector that can access the capabilities exposed by this ServiceFs.
487    pub fn create_protocol_connector<O>(&mut self) -> Result<ProtocolConnector, Error>
488    where
489        ServiceObjTy: ServiceObjTrait<Output = O>,
490    {
491        let (directory_request, directory_server_end) = fidl::endpoints::create_endpoints();
492        self.serve_connection(directory_server_end)?;
493
494        Ok(ProtocolConnector { directory_request })
495    }
496}
497
498fn new_simple_dir() -> Arc<PseudoDir> {
499    let dir = PseudoDir::new();
500    dir.clone().set_not_found_handler(Box::new(move |path| {
501        warn!(
502            "ServiceFs received request to `{}` but has not been configured to serve this path.",
503            path
504        );
505    }));
506    dir
507}
508
/// `ProtocolConnector` allows connecting to capabilities exposed by ServiceFs
///
/// Obtained from `ServiceFs::create_protocol_connector`.
pub struct ProtocolConnector {
    // Client end of a directory connection to the `ServiceFs`; protocols are opened by name
    // relative to it.
    directory_request: fidl::endpoints::ClientEnd<fio::DirectoryMarker>,
}
513
514impl ProtocolConnector {
515    /// Connect to a protocol provided by this environment.
516    #[inline]
517    pub fn connect_to_service<P: DiscoverableProtocolMarker>(&self) -> Result<P::Proxy, Error> {
518        self.connect_to_protocol::<P>()
519    }
520
521    /// Connect to a protocol provided by this environment.
522    #[inline]
523    pub fn connect_to_protocol<P: DiscoverableProtocolMarker>(&self) -> Result<P::Proxy, Error> {
524        let (client_channel, server_channel) = zx::Channel::create();
525        self.pass_to_protocol::<P>(server_channel)?;
526        Ok(P::Proxy::from_channel(fasync::Channel::from_channel(client_channel)))
527    }
528
529    /// Connect to a protocol by passing a channel for the server.
530    #[inline]
531    pub fn pass_to_protocol<P: DiscoverableProtocolMarker>(
532        &self,
533        server_channel: zx::Channel,
534    ) -> Result<(), Error> {
535        self.pass_to_named_protocol(P::PROTOCOL_NAME, server_channel)
536    }
537
538    /// Connect to a protocol by name.
539    #[inline]
540    pub fn pass_to_named_protocol(
541        &self,
542        protocol_name: &str,
543        server_channel: zx::Channel,
544    ) -> Result<(), Error> {
545        fdio::service_connect_at(self.directory_request.channel(), protocol_name, server_channel)?;
546        Ok(())
547    }
548}
549
/// An error indicating the startup handle on which the FIDL server
/// attempted to start was missing.
///
/// `ServiceFs::take_and_serve_directory_handle` returns this when the
/// `DirectoryRequest` startup handle cannot be taken.
#[derive(Debug, Error)]
#[error("The startup handle on which the FIDL server attempted to start was missing.")]
pub struct MissingStartupHandle;
555
556impl<ServiceObjTy: ServiceObjTrait> ServiceFs<ServiceObjTy> {
557    /// Removes the `DirectoryRequest` startup handle for the current
558    /// component and adds connects it to this `ServiceFs` as a client.
559    ///
560    /// Multiple calls to this function from the same component will
561    /// result in `Err(MissingStartupHandle)`.
562    pub fn take_and_serve_directory_handle(&mut self) -> Result<&mut Self, Error> {
563        let startup_handle = fuchsia_runtime::take_startup_handle(
564            fuchsia_runtime::HandleType::DirectoryRequest.into(),
565        )
566        .ok_or(MissingStartupHandle)?;
567
568        self.serve_connection(fidl::endpoints::ServerEnd::new(zx::Channel::from(startup_handle)))
569    }
570
571    /// Add a channel to serve this `ServiceFs` filesystem on. The `ServiceFs`
572    /// will continue to be provided over previously added channels, including
573    /// the one added if `take_and_serve_directory_handle` was called.
574    pub fn serve_connection(
575        &mut self,
576        chan: fidl::endpoints::ServerEnd<fio::DirectoryMarker>,
577    ) -> Result<&mut Self, Error> {
578        if let Some(channels) = &mut self.channel_queue {
579            channels.push(chan);
580        } else {
581            self.serve_connection_impl(chan);
582        }
583        Ok(self)
584    }
585
586    /// TODO(https://fxbug.dev/326626515): this is an experimental method to run a FIDL
587    /// directory connection until stalled, with the purpose to cleanly stop a component.
588    /// We'll expect to revisit how this works to generalize to all connections later.
589    /// Try not to use this function for other purposes.
590    ///
591    /// Normally the [`ServiceFs`] stream will block until all connections are closed.
592    /// In order to escrow the outgoing directory server endpoint, you may use this
593    /// function to get a [`StallableServiceFs`] that detects when no new requests
594    /// hit the outgoing directory for `debounce_interval`, and all hosted protocols
595    /// and other VFS connections to finish, then yield back the outgoing directory handle.
596    ///
597    /// The [`ServiceFs`] stream yields [`ServiceObjTy::Output`], which could be an enum
598    /// of FIDL connection requests in a typical component. By contrast, [`StallableServiceFs`]
599    /// yields an enum of either the request, or the unbound outgoing directory endpoint,
600    /// allowing you to escrow it back to `component_manager` before exiting the component.
601    pub fn until_stalled(
602        self,
603        debounce_interval: MonotonicDuration,
604    ) -> StallableServiceFs<ServiceObjTy> {
605        StallableServiceFs::<ServiceObjTy>::new(self, debounce_interval)
606    }
607}
608
609impl<ServiceObjTy: ServiceObjTrait> Stream for ServiceFs<ServiceObjTy> {
610    type Item = ServiceObjTy::Output;
611
612    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
613        // NOTE: Normally, it isn't safe to poll a stream after it returns None, but we support this
614        // and StallabkeServiceFs depends on this.
615        if let Some(channels) = self.channel_queue.take() {
616            for chan in channels {
617                self.serve_connection_impl(chan);
618            }
619        }
620        while let Poll::Ready(Some((index, channel))) =
621            self.new_connection_receiver.poll_next_unpin(cx)
622        {
623            if let Some(stream) = self.services[index].service().connect(channel) {
624                return Poll::Ready(Some(stream));
625            }
626        }
627        self.shutdown.poll_unpin(cx).map(|_| None)
628    }
629}