fuchsia_component_server/
lib.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Tools for providing Fuchsia services.
6
7#![deny(missing_docs)]
8
9use anyhow::Error;
10use fidl::endpoints::{
11    DiscoverableProtocolMarker, Proxy as _, RequestStream, ServerEnd, ServiceMarker, ServiceRequest,
12};
13use fuchsia_component_client::connect_channel_to_protocol;
14use futures::channel::mpsc;
15use futures::future::BoxFuture;
16use futures::{FutureExt, Stream, StreamExt};
17use log::warn;
18use pin_project::pin_project;
19use std::marker::PhantomData;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use thiserror::Error;
24use vfs::directory::entry::DirectoryEntry;
25use vfs::directory::entry_container::Directory;
26use vfs::directory::helper::DirectlyMutable;
27use vfs::directory::immutable::Simple as PseudoDir;
28use vfs::execution_scope::ExecutionScope;
29use vfs::file::vmo::VmoFile;
30use vfs::name::Name;
31use vfs::path::Path;
32use vfs::remote::remote_dir;
33use vfs::service::endpoint;
34use vfs::ObjectRequest;
35use zx::MonotonicDuration;
36use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
37
38mod service;
39pub use service::{
40    FidlService, FidlServiceMember, FidlServiceServerConnector, Service, ServiceObj,
41    ServiceObjLocal, ServiceObjTrait,
42};
43mod until_stalled;
44pub use until_stalled::{Item, StallableServiceFs};
45
/// A filesystem which connects clients to services.
///
/// This type implements the `Stream` trait and will yield the values
/// returned from calling `Service::connect` on the services it hosts.
///
/// This can be used to, for example, yield streams of channels, request
/// streams, futures to run, or any other value that should be processed
/// as the result of a request.
///
/// Directory connections added before the stream is first polled are queued and only start
/// being served on that first poll.
#[must_use]
#[pin_project]
pub struct ServiceFs<ServiceObjTy: ServiceObjTrait> {
    // The execution scope for the backing VFS.
    scope: ExecutionScope,

    // The root directory.
    dir: Arc<PseudoDir>,

    // New connections are sent via an mpsc. The tuple is (index, channel) where index is the index
    // into the `services` member.
    new_connection_sender: mpsc::UnboundedSender<(usize, zx::Channel)>,
    new_connection_receiver: mpsc::UnboundedReceiver<(usize, zx::Channel)>,

    // A collection of objects that are able to handle new connections and convert them into a
    // stream of ServiceObjTy::Output requests.  There will be one for each service in the
    // filesystem (irrespective of its place in the hierarchy).
    services: Vec<ServiceObjTy>,

    // A future that completes when the VFS no longer has any connections.  These connections are
    // distinct from connections that might be to services or remotes within this filesystem.
    // The stream ends once this future resolves.
    shutdown: BoxFuture<'static, ()>,

    // The filesystem does not start servicing any requests until ServiceFs is first polled.  This
    // preserves behaviour of ServiceFs from when it didn't use the Rust VFS, and is relied upon in
    // some cases.  The queue is used until first polled.  After that, `channel_queue` will be None
    // and requests to service channels will be actioned immediately (potentially on different
    // threads depending on the executor).
    channel_queue: Option<Vec<fidl::endpoints::ServerEnd<fio::DirectoryMarker>>>,
}
84
impl<'a, Output: 'a> ServiceFs<ServiceObjLocal<'a, Output>> {
    /// Create a new `ServiceFs` that is singlethreaded-only and does not
    /// require services to implement `Send`.
    ///
    /// The returned filesystem has a fresh root directory and no registered services.
    pub fn new_local() -> Self {
        Self::new_impl()
    }
}
92
impl<'a, Output: 'a> ServiceFs<ServiceObj<'a, Output>> {
    /// Create a new `ServiceFs` that is multithreaded-capable and requires
    /// services to implement `Send`.
    ///
    /// The returned filesystem has a fresh root directory and no registered services.
    pub fn new() -> Self {
        Self::new_impl()
    }
}
100
/// A directory within a `ServiceFs`.
///
/// Services and subdirectories can be added to it.
pub struct ServiceFsDir<'a, ServiceObjTy: ServiceObjTrait> {
    // The filesystem this directory belongs to; services added through this directory are
    // registered in its service list.
    fs: &'a mut ServiceFs<ServiceObjTy>,
    // The pseudo-directory that entries are added to.
    dir: Arc<PseudoDir>,
}
108
/// A `Service` implementation that proxies requests
/// to the outside environment.
///
/// Not intended for direct use. Use the `add_proxy_service`
/// function instead.
///
/// The type carries no data: `P` selects the protocol to connect to and `O` is the
/// `Service::Output` type.
#[doc(hidden)]
pub struct Proxy<P, O>(PhantomData<(P, fn() -> O)>);
116
117impl<P: DiscoverableProtocolMarker, O> Service for Proxy<P, O> {
118    type Output = O;
119    fn connect(&mut self, channel: zx::Channel) -> Option<O> {
120        if let Err(e) = connect_channel_to_protocol::<P>(channel) {
121            eprintln!("failed to proxy request to {}: {:?}", P::PROTOCOL_NAME, e);
122        }
123        None
124    }
125}
126
/// A `Service` implementation that proxies requests to the given component.
///
/// Not intended for direct use. Use the `add_proxy_service_to` function instead.
#[doc(hidden)]
pub struct ProxyTo<P, O> {
    // Client end of the directory in which the protocol named `P::PROTOCOL_NAME` is looked up.
    directory_request: Arc<fidl::endpoints::ClientEnd<fio::DirectoryMarker>>,
    // Ties the protocol marker `P` and the output type `O` to this value without storing them.
    _phantom: PhantomData<(P, fn() -> O)>,
}
135
136impl<P: DiscoverableProtocolMarker, O> Service for ProxyTo<P, O> {
137    type Output = O;
138    fn connect(&mut self, channel: zx::Channel) -> Option<O> {
139        if let Err(e) =
140            fdio::service_connect_at(self.directory_request.channel(), P::PROTOCOL_NAME, channel)
141        {
142            eprintln!("failed to proxy request to {}: {:?}", P::PROTOCOL_NAME, e);
143        }
144        None
145    }
146}
147
148// Not part of a trait so that clients won't have to import a trait
149// in order to call these functions.
150macro_rules! add_functions {
151    () => {
        /// Adds a service connector to the directory.
        ///
        /// `service` is a closure that accepts the `ServerEnd<P>` of a connection request and
        /// returns a `ServiceObjTy::Output`.
        ///
        /// ```rust
        /// let mut fs = ServiceFs::new_local();
        /// fs
        ///     .add_service_connector(|server_end: ServerEnd<EchoMarker>| {
        ///         connect_channel_to_protocol::<EchoMarker>(
        ///             server_end.into_channel(),
        ///         )
        ///     })
        ///     .add_service_connector(|server_end: ServerEnd<CustomMarker>| {
        ///         connect_channel_to_protocol::<CustomMarker>(
        ///             server_end.into_channel(),
        ///         )
        ///     })
        ///     .take_and_serve_directory_handle()?;
        /// ```
        ///
        /// The FIDL service will be hosted at `P::PROTOCOL_NAME`, the name provided by the
        /// `[Discoverable]` annotation in the FIDL source.
        ///
        /// Panics if any node has already been added under that name.
        pub fn add_service_connector<F, P>(&mut self, service: F) -> &mut Self
        where
            F: FnMut(ServerEnd<P>) -> ServiceObjTy::Output,
            P: DiscoverableProtocolMarker,
            FidlServiceServerConnector<F, P, ServiceObjTy::Output>: Into<ServiceObjTy>,
        {
            self.add_service_at(P::PROTOCOL_NAME, FidlServiceServerConnector::from(service))
        }
180
181        /// Adds a service to the directory at the given path.
182        ///
183        /// The path must be a single component containing no `/` characters.
184        ///
185        /// Panics if any node has already been added at the given path.
186        pub fn add_service_at(
187            &mut self,
188            path: impl Into<String>,
189            service: impl Into<ServiceObjTy>,
190        ) -> &mut Self {
191            let index = self.fs().services.len();
192            self.fs().services.push(service.into());
193            let sender = self.fs().new_connection_sender.clone();
194            self.add_entry_at(
195                path,
196                endpoint(move |_, channel| {
197                    // It's possible for this send to fail in the case where ServiceFs has been
198                    // dropped.  When that happens, ServiceFs will drop ExecutionScope which
199                    // contains the RemoteHandle for this task which will then cause this task to be
200                    // dropped but not necessarily immediately.  This will only occur when ServiceFs
201                    // has been dropped, so it's safe to ignore the error here.
202                    let _ = sender.unbounded_send((index, channel.into()));
203                }),
204            )
205        }
206
        /// Adds a FIDL service to the directory.
        ///
        /// `service` is a closure that accepts a `RequestStream`.
        /// Each service being served must return an instance of the same type
        /// (`ServiceObjTy::Output`). This is necessary in order to multiplex
        /// multiple services over the same dispatcher code. The typical way
        /// to do this is to create an `enum` with variants for each service
        /// you want to serve.
        ///
        /// ```rust
        /// enum MyServices {
        ///     EchoServer(EchoRequestStream),
        ///     CustomServer(CustomRequestStream),
        ///     // ...
        /// }
        /// ```
        ///
        /// The constructor for a variant of the `MyServices` enum can be passed
        /// as the `service` parameter.
        ///
        /// ```rust
        /// let mut fs = ServiceFs::new_local();
        /// fs
        ///     .add_fidl_service(MyServices::EchoServer)
        ///     .add_fidl_service(MyServices::CustomServer)
        ///     .take_and_serve_directory_handle()?;
        /// ```
        ///
        /// `ServiceFs` can now be treated as a `Stream` of type `MyServices`.
        ///
        /// ```rust
        /// const MAX_CONCURRENT: usize = 10_000;
        /// fs.for_each_concurrent(MAX_CONCURRENT, |request: MyServices| {
        ///     match request {
        ///         MyServices::EchoServer(request) => handle_echo(request),
        ///         MyServices::CustomServer(request) => handle_custom(request),
        ///     }
        /// }).await;
        /// ```
        ///
        /// The FIDL service will be hosted at `RS::Protocol::PROTOCOL_NAME`, the name provided
        /// by the `[Discoverable]` annotation in the FIDL source.
        ///
        /// Panics if any node has already been added under that name.
        pub fn add_fidl_service<F, RS>(&mut self, service: F) -> &mut Self
        where
            F: FnMut(RS) -> ServiceObjTy::Output,
            RS: RequestStream,
            RS::Protocol: DiscoverableProtocolMarker,
            FidlService<F, RS, ServiceObjTy::Output>: Into<ServiceObjTy>,
        {
            self.add_fidl_service_at(RS::Protocol::PROTOCOL_NAME, service)
        }
258
        /// Adds a FIDL service to the directory at the given path.
        ///
        /// The path must be a single component containing no `/` characters.
        ///
        /// Panics if any node has already been added at the given path.
        ///
        /// See [`add_fidl_service`](#method.add_fidl_service) for details.
        pub fn add_fidl_service_at<F, RS>(
            &mut self,
            path: impl Into<String>,
            service: F,
        ) -> &mut Self
        where
            F: FnMut(RS) -> ServiceObjTy::Output,
            RS: RequestStream,
            RS::Protocol: DiscoverableProtocolMarker,
            FidlService<F, RS, ServiceObjTy::Output>: Into<ServiceObjTy>,
        {
            self.add_service_at(path, FidlService::from(service))
        }
277
        /// Adds a named instance of a FIDL service to the directory.
        ///
        /// The FIDL service will be hosted at `[SERVICE_NAME]/[instance]/` where `SERVICE_NAME` is
        /// constructed from the FIDL library path and the name of the FIDL service.
        ///
        /// The `instance` must be a single component containing no `/` characters.
        ///
        /// `service` is cloned once for every member protocol of the service, which is why it
        /// must implement `Clone`.
        ///
        /// # Example
        ///
        /// For the following FIDL definition,
        /// ```fidl
        /// library lib.foo;
        ///
        /// service Bar {
        ///   ...
        /// }
        /// ```
        ///
        /// The `SERVICE_NAME` of FIDL Service `Bar` would be `lib.foo.Bar`.
        pub fn add_fidl_service_instance<F, SR>(
            &mut self,
            instance: impl Into<String>,
            service: F,
        ) -> &mut Self
        where
            F: Fn(SR) -> ServiceObjTy::Output,
            F: Clone,
            SR: ServiceRequest,
            FidlServiceMember<F, SR, ServiceObjTy::Output>: Into<ServiceObjTy>,
        {
            self.add_fidl_service_instance_at(SR::Service::SERVICE_NAME, instance, service)
        }
310
311        /// Adds a named instance of a FIDL service to the directory at the given path.
312        ///
313        /// The FIDL service will be hosted at `[path]/[instance]/`.
314        ///
315        /// The `path` and `instance` must be single components containing no `/` characters.
316        pub fn add_fidl_service_instance_at<F, SR>(
317            &mut self,
318            path: impl Into<String>,
319            instance: impl Into<String>,
320            service: F,
321        ) -> &mut Self
322        where
323            F: Fn(SR) -> ServiceObjTy::Output,
324            F: Clone,
325            SR: ServiceRequest,
326            FidlServiceMember<F, SR, ServiceObjTy::Output>: Into<ServiceObjTy>,
327        {
328            // Create the service directory, with an instance subdirectory.
329            let mut dir = self.dir(path);
330            let mut dir = dir.dir(instance);
331
332            // Attach member protocols under the instance directory.
333            for member in SR::member_names() {
334                dir.add_service_at(*member, FidlServiceMember::new(service.clone(), member));
335            }
336            self
337        }
338
        /// Adds a service that proxies requests to the current environment.
        ///
        /// The entry is registered at `P::PROTOCOL_NAME` and is backed by a [`Proxy`].
        ///
        /// Panics if any node has already been added under that name.
        // NOTE: we'd like to be able to remove the type parameter `O` here,
        //  but unfortunately the bound `ServiceObjTy: From<Proxy<P, ServiceObjTy::Output>>`
        //  makes type checking angry.
        pub fn add_proxy_service<P: DiscoverableProtocolMarker, O>(&mut self) -> &mut Self
        where
            ServiceObjTy: From<Proxy<P, O>>,
            ServiceObjTy: ServiceObjTrait<Output = O>,
        {
            self.add_service_at(P::PROTOCOL_NAME, Proxy::<P, ServiceObjTy::Output>(PhantomData))
        }
350
        /// Adds a service that proxies requests to the given component.
        ///
        /// The entry is registered at `P::PROTOCOL_NAME` and is backed by a [`ProxyTo`] holding
        /// `directory_request`, the client end of the directory in which the protocol is looked
        /// up.
        ///
        /// Panics if any node has already been added under that name.
        // NOTE: we'd like to be able to remove the type parameter `O` here,
        //  but unfortunately the bound `ServiceObjTy: From<ProxyTo<P, ServiceObjTy::Output>>`
        //  makes type checking angry.
        pub fn add_proxy_service_to<P: DiscoverableProtocolMarker, O>(
            &mut self,
            directory_request: Arc<fidl::endpoints::ClientEnd<fio::DirectoryMarker>>,
        ) -> &mut Self
        where
            ServiceObjTy: From<ProxyTo<P, O>>,
            ServiceObjTy: ServiceObjTrait<Output = O>,
        {
            self.add_service_at(
                P::PROTOCOL_NAME,
                ProxyTo::<P, ServiceObjTy::Output> { directory_request, _phantom: PhantomData },
            )
        }
368
369        /// Adds a VMO file to the directory at the given path.
370        ///
371        /// The path must be a single component containing no `/` characters. The vmo should have
372        /// content size set as required.
373        ///
374        /// Panics if any node has already been added at the given path.
375        pub fn add_vmo_file_at(&mut self, path: impl Into<String>, vmo: zx::Vmo) -> &mut Self {
376            self.add_entry_at(
377                path,
378                VmoFile::new(
379                    vmo, /*readable*/ true, /*writable*/ false, /*executable*/ false,
380                ),
381            )
382        }
383
384        /// Adds an entry to the directory at the given path.
385        ///
386        /// The path must be a single component.
387        /// The path must be a valid `fuchsia.io` [`Name`].
388        ///
389        /// Panics if any node has already been added at the given path.
390        pub fn add_entry_at(
391            &mut self,
392            path: impl Into<String>,
393            entry: Arc<dyn DirectoryEntry>,
394        ) -> &mut Self {
395            let path: String = path.into();
396            let name: Name = path.try_into().expect("Invalid path");
397            // This will fail if the name is invalid or already exists.
398            self.dir.add_entry_impl(name, entry, false).expect("Unable to add entry");
399            self
400        }
401
402        /// Returns a reference to the subdirectory at the given path,
403        /// creating one if none exists.
404        ///
405        /// The path must be a single component.
406        /// The path must be a valid `fuchsia.io` [`Name`].
407        ///
408        /// Panics if a service has already been added at the given path.
409        pub fn dir(&mut self, path: impl Into<String>) -> ServiceFsDir<'_, ServiceObjTy> {
410            let path: String = path.into();
411            let name: Name = path.try_into().expect("Invalid path");
412            let dir = Arc::downcast(self.dir.get_or_insert(name, new_simple_dir).into_any())
413                .unwrap_or_else(|_| panic!("Not a directory"));
414            ServiceFsDir { fs: self.fs(), dir }
415        }
416
417        /// Adds a new remote directory served over the given DirectoryProxy.
418        ///
419        /// The name must be a valid `fuchsia.io` [`Name`].
420        pub fn add_remote(
421            &mut self,
422            name: impl Into<String>,
423            proxy: fio::DirectoryProxy,
424        ) -> &mut Self {
425            let name: String = name.into();
426            let name: Name = name.try_into().expect("Invalid path");
427            self.dir.add_entry_impl(name, remote_dir(proxy), false).expect("Unable to add entry");
428            self
429        }
430    };
431}
432
impl<ServiceObjTy: ServiceObjTrait> ServiceFsDir<'_, ServiceObjTy> {
    // Returns the `ServiceFs` this directory belongs to.  The methods generated by
    // `add_functions!` use it to reach the service list and the connection sender.
    fn fs(&mut self) -> &mut ServiceFs<ServiceObjTy> {
        self.fs
    }

    // Expands to the `add_*`, `dir` and `add_remote` methods shared with `ServiceFs`.
    add_functions!();
}
440
441impl<ServiceObjTy: ServiceObjTrait> ServiceFs<ServiceObjTy> {
442    fn new_impl() -> Self {
443        let (new_connection_sender, new_connection_receiver) = mpsc::unbounded();
444        let scope = ExecutionScope::new();
445        let dir = new_simple_dir();
446        Self {
447            scope: scope.clone(),
448            dir,
449            new_connection_sender,
450            new_connection_receiver,
451            services: Vec::new(),
452            shutdown: async move { scope.wait().await }.boxed(),
453            channel_queue: Some(Vec::new()),
454        }
455    }
456
457    fn fs(&mut self) -> &mut ServiceFs<ServiceObjTy> {
458        self
459    }
460
461    /// Get a reference to the root directory as a `ServiceFsDir`.
462    ///
463    /// This can be useful when writing code which hosts some set of services on
464    /// a directory and wants to be agnostic to whether that directory
465    /// is the root `ServiceFs` or a subdirectory.
466    ///
467    /// Such a function can take an `&mut ServiceFsDir<...>` as an argument,
468    /// allowing callers to provide either a subdirectory or `fs.root_dir()`.
469    pub fn root_dir(&mut self) -> ServiceFsDir<'_, ServiceObjTy> {
470        let dir = self.dir.clone();
471        ServiceFsDir { fs: self, dir }
472    }
473
474    add_functions!();
475
476    /// When a connection is first made to the `ServiceFs` in the absence of a parent connection,
477    /// it will be granted these rights.
478    const fn base_connection_flags() -> fio::Flags {
479        return fio::Flags::PROTOCOL_DIRECTORY
480            .union(fio::PERM_READABLE)
481            .union(fio::PERM_WRITABLE)
482            .union(fio::PERM_EXECUTABLE);
483    }
484
485    fn serve_connection_impl(&self, chan: fidl::endpoints::ServerEnd<fio::DirectoryMarker>) {
486        self.dir
487            .clone()
488            .open3(
489                self.scope.clone(),
490                Path::dot(),
491                Self::base_connection_flags(),
492                &mut ObjectRequest::new(
493                    Self::base_connection_flags(),
494                    &Default::default(),
495                    chan.into_channel(),
496                ),
497            )
498            .expect("failed to serve root ServiceFs connection");
499    }
500
501    /// Creates a protocol connector that can access the capabilities exposed by this ServiceFs.
502    pub fn create_protocol_connector<O>(&mut self) -> Result<ProtocolConnector, Error>
503    where
504        ServiceObjTy: ServiceObjTrait<Output = O>,
505    {
506        let (directory_request, directory_server_end) = fidl::endpoints::create_endpoints();
507        self.serve_connection(directory_server_end)?;
508
509        Ok(ProtocolConnector { directory_request })
510    }
511}
512
513fn new_simple_dir() -> Arc<PseudoDir> {
514    let dir = PseudoDir::new();
515    dir.clone().set_not_found_handler(Box::new(move |path| {
516        warn!(
517            "ServiceFs received request to `{}` but has not been configured to serve this path.",
518            path
519        );
520    }));
521    dir
522}
523
/// `ProtocolConnector` allows connecting to capabilities exposed by ServiceFs
///
/// Instances are obtained from `ServiceFs::create_protocol_connector`.
pub struct ProtocolConnector {
    // Client end of a directory connection served by the `ServiceFs`; protocols are looked up by
    // name inside it.
    directory_request: fidl::endpoints::ClientEnd<fio::DirectoryMarker>,
}
528
529impl ProtocolConnector {
530    /// Connect to a protocol provided by this environment.
531    #[inline]
532    pub fn connect_to_service<P: DiscoverableProtocolMarker>(&self) -> Result<P::Proxy, Error> {
533        self.connect_to_protocol::<P>()
534    }
535
536    /// Connect to a protocol provided by this environment.
537    #[inline]
538    pub fn connect_to_protocol<P: DiscoverableProtocolMarker>(&self) -> Result<P::Proxy, Error> {
539        let (client_channel, server_channel) = zx::Channel::create();
540        self.pass_to_protocol::<P>(server_channel)?;
541        Ok(P::Proxy::from_channel(fasync::Channel::from_channel(client_channel)))
542    }
543
544    /// Connect to a protocol by passing a channel for the server.
545    #[inline]
546    pub fn pass_to_protocol<P: DiscoverableProtocolMarker>(
547        &self,
548        server_channel: zx::Channel,
549    ) -> Result<(), Error> {
550        self.pass_to_named_protocol(P::PROTOCOL_NAME, server_channel)
551    }
552
553    /// Connect to a protocol by name.
554    #[inline]
555    pub fn pass_to_named_protocol(
556        &self,
557        protocol_name: &str,
558        server_channel: zx::Channel,
559    ) -> Result<(), Error> {
560        fdio::service_connect_at(self.directory_request.channel(), protocol_name, server_channel)?;
561        Ok(())
562    }
563}
564
/// An error indicating the startup handle on which the FIDL server
/// attempted to start was missing.
///
/// Returned by `ServiceFs::take_and_serve_directory_handle` when the `DirectoryRequest`
/// startup handle cannot be taken.
#[derive(Debug, Error)]
#[error("The startup handle on which the FIDL server attempted to start was missing.")]
pub struct MissingStartupHandle;
570
571impl<ServiceObjTy: ServiceObjTrait> ServiceFs<ServiceObjTy> {
572    /// Removes the `DirectoryRequest` startup handle for the current
573    /// component and adds connects it to this `ServiceFs` as a client.
574    ///
575    /// Multiple calls to this function from the same component will
576    /// result in `Err(MissingStartupHandle)`.
577    pub fn take_and_serve_directory_handle(&mut self) -> Result<&mut Self, Error> {
578        let startup_handle = fuchsia_runtime::take_startup_handle(
579            fuchsia_runtime::HandleType::DirectoryRequest.into(),
580        )
581        .ok_or(MissingStartupHandle)?;
582
583        self.serve_connection(fidl::endpoints::ServerEnd::new(zx::Channel::from(startup_handle)))
584    }
585
586    /// Add a channel to serve this `ServiceFs` filesystem on. The `ServiceFs`
587    /// will continue to be provided over previously added channels, including
588    /// the one added if `take_and_serve_directory_handle` was called.
589    pub fn serve_connection(
590        &mut self,
591        chan: fidl::endpoints::ServerEnd<fio::DirectoryMarker>,
592    ) -> Result<&mut Self, Error> {
593        if let Some(channels) = &mut self.channel_queue {
594            channels.push(chan);
595        } else {
596            self.serve_connection_impl(chan);
597        }
598        Ok(self)
599    }
600
601    /// TODO(https://fxbug.dev/326626515): this is an experimental method to run a FIDL
602    /// directory connection until stalled, with the purpose to cleanly stop a component.
603    /// We'll expect to revisit how this works to generalize to all connections later.
604    /// Try not to use this function for other purposes.
605    ///
606    /// Normally the [`ServiceFs`] stream will block until all connections are closed.
607    /// In order to escrow the outgoing directory server endpoint, you may use this
608    /// function to get a [`StallableServiceFs`] that detects when no new requests
609    /// hit the outgoing directory for `debounce_interval`, and all hosted protocols
610    /// and other VFS connections to finish, then yield back the outgoing directory handle.
611    ///
612    /// The [`ServiceFs`] stream yields [`ServiceObjTy::Output`], which could be an enum
613    /// of FIDL connection requests in a typical component. By contrast, [`StallableServiceFs`]
614    /// yields an enum of either the request, or the unbound outgoing directory endpoint,
615    /// allowing you to escrow it back to `component_manager` before exiting the component.
616    pub fn until_stalled(
617        self,
618        debounce_interval: MonotonicDuration,
619    ) -> StallableServiceFs<ServiceObjTy> {
620        StallableServiceFs::<ServiceObjTy>::new(self, debounce_interval)
621    }
622}
623
impl<ServiceObjTy: ServiceObjTrait> Stream for ServiceFs<ServiceObjTy> {
    type Item = ServiceObjTy::Output;

    /// Yields the next value produced by one of the hosted services.
    ///
    /// The first poll starts serving every directory channel that was queued beforehand.
    /// Queued connection requests are then handed to their services one at a time; the first
    /// one whose `connect` returns `Some` is yielded. When no request produces a value the
    /// shutdown future is polled, and the stream ends with `None` once it completes.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `take()` leaves `None` behind, so this only does work on the first poll and later
        // `serve_connection` calls serve their channel immediately.
        if let Some(channels) = self.channel_queue.take() {
            for chan in channels {
                self.serve_connection_impl(chan);
            }
        }
        // Drain the ready connection requests.  `index` was assigned when the service was
        // registered and selects the matching entry in `services`.
        while let Poll::Ready(Some((index, channel))) =
            self.new_connection_receiver.poll_next_unpin(cx)
        {
            // Services that return `None` (the proxies, for example) yield nothing; move on to
            // the next request.
            if let Some(stream) = self.services[index].service().connect(channel) {
                return Poll::Ready(Some(stream));
            }
        }
        // Nothing to yield right now: the stream is finished once the shutdown future resolves
        // and pending otherwise.
        self.shutdown.poll_unpin(cx).map(|_| None)
    }
}