fdomain_container/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::endpoints::ClientEnd;
6use fidl::{AsHandleRef, HandleBased};
7use futures::prelude::*;
8use replace_with::replace_with;
9use std::collections::hash_map::Entry;
10use std::collections::{HashMap, VecDeque};
11use std::num::NonZeroU32;
12use std::sync::atomic::{AtomicU32, Ordering};
13use std::sync::Arc;
14use std::task::{Context, Poll, Waker};
15use {fidl_fuchsia_fdomain as proto, fidl_fuchsia_io as fio, fuchsia_async as fasync};
16
17mod handles;
18pub mod wire;
19
20#[cfg(test)]
21mod test;
22
23pub type Result<T, E = proto::Error> = std::result::Result<T, E>;
24
25use handles::{AnyHandle, HandleType as _, IsDatagramSocket};
26
/// A FIFO queue backed by a [`VecDeque`] which also remembers the [`Waker`] of
/// a task that found it empty, so that task can be woken when an element is
/// later pushed to the back.
struct Queue<T>(VecDeque<T>, Option<Waker>);

impl<T> Queue<T> {
    /// Create an empty queue with no registered waker.
    fn new() -> Self {
        Self(VecDeque::new(), None)
    }

    /// Whether the queue currently holds no elements.
    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Remove the first element of the queue and drop it.
    ///
    /// # Panics
    /// Panics if the queue is empty.
    fn destroy_front(&mut self) {
        let removed = self.0.pop_front();
        assert!(removed.is_some(), "Expected to find a value!");
    }

    /// Take the first element out of the queue. If the queue is empty, the
    /// waker from `ctx` is stored and `Poll::Pending` is returned.
    fn pop_front(&mut self, ctx: &mut Context<'_>) -> Poll<T> {
        match self.0.pop_front() {
            Some(item) => Poll::Ready(item),
            None => {
                self.1 = Some(ctx.waker().clone());
                Poll::Pending
            }
        }
    }

    /// Put an element back at the front of the queue without waking anyone;
    /// the caller is assumed to be the task that popped it in the first place.
    ///
    /// This exists for call sites that would rather use `front_mut` but cannot
    /// keep the queue borrowed for long enough.
    fn push_front_no_wake(&mut self, t: T) {
        self.0.push_front(t);
    }

    /// Append an element to the back of the queue and wake the stored waker,
    /// if there is one. The waker is consumed by this call.
    fn push_back(&mut self, t: T) {
        self.0.push_back(t);
        if let Some(waker) = self.1.take() {
            waker.wake();
        }
    }

    /// Borrow the first element of the queue mutably. If the queue is empty,
    /// the waker from `ctx` is stored and `Poll::Pending` is returned.
    fn front_mut(&mut self, ctx: &mut Context<'_>) -> Poll<&mut T> {
        match self.0.front_mut() {
            Some(item) => Poll::Ready(item),
            None => {
                self.1 = Some(ctx.waker().clone());
                Poll::Pending
            }
        }
    }
}
86
/// Maximum number of bytes requested per read when streaming data out of a
/// socket (40 KiB).
const ASYNC_READ_BUFSIZE: u64 = 40 * 1024;
89
90/// Wraps the various FIDL Event types that can be produced by an FDomain
91#[derive(Debug)]
92pub enum FDomainEvent {
93    ChannelStreamingReadStart(NonZeroU32, Result<()>),
94    ChannelStreamingReadStop(NonZeroU32, Result<()>),
95    SocketStreamingReadStart(NonZeroU32, Result<()>),
96    SocketStreamingReadStop(NonZeroU32, Result<()>),
97    WaitForSignals(NonZeroU32, Result<proto::FDomainWaitForSignalsResponse>),
98    SocketData(NonZeroU32, Result<proto::SocketData>),
99    SocketStreamingData(proto::SocketOnSocketStreamingDataRequest),
100    SocketDispositionSet(NonZeroU32, Result<()>),
101    WroteSocket(NonZeroU32, Result<proto::SocketWriteSocketResponse, proto::WriteSocketError>),
102    ChannelData(NonZeroU32, Result<proto::ChannelMessage>),
103    ChannelStreamingData(proto::ChannelOnChannelStreamingDataRequest),
104    WroteChannel(NonZeroU32, Result<(), proto::WriteChannelError>),
105    ClosedHandle(NonZeroU32, Result<()>),
106    ReplacedHandle(NonZeroU32, Result<()>),
107}
108
109/// An [`FDomainEvent`] that needs a bit more processing before it can be sent.
110/// I.e. it still contains `fidl::Handle` objects that need to be replaced with
111/// FDomain IDs.
112enum UnprocessedFDomainEvent {
113    Ready(FDomainEvent),
114    ChannelData(NonZeroU32, fidl::MessageBufEtc),
115    ChannelStreamingData(proto::HandleId, fidl::MessageBufEtc),
116}
117
118impl From<FDomainEvent> for UnprocessedFDomainEvent {
119    fn from(other: FDomainEvent) -> UnprocessedFDomainEvent {
120        UnprocessedFDomainEvent::Ready(other)
121    }
122}
123
/// A request queued on a handle's read queue. The `NonZeroU32` in each variant
/// is the ID of the transaction to answer.
enum ReadOp {
    /// Turn streaming (async) reads on a channel on (`true`) or off (`false`).
    StreamingChannel(NonZeroU32, bool),
    /// Turn streaming (async) reads on a socket on (`true`) or off (`false`).
    StreamingSocket(NonZeroU32, bool),
    /// Read once from a socket, requesting at most the given number of bytes.
    Socket(NonZeroU32, u64),
    /// Read one message from a channel.
    Channel(NonZeroU32),
}
133
/// A socket write that may be partway done. Writing to a socket can take
/// several syscalls, so this records how much has been written so far and what
/// is still outstanding.
struct SocketWrite {
    /// ID of the transaction that requested the write.
    tid: NonZeroU32,
    /// Number of bytes written so far.
    wrote: usize,
    /// Bytes not yet written; the front is drained as writes succeed.
    to_write: Vec<u8>,
}
142
143/// Operations on a handle which are processed from the write queue.
144enum WriteOp {
145    Socket(SocketWrite),
146    Channel(NonZeroU32, Vec<u8>, HandlesToWrite),
147    SetDisposition(NonZeroU32, proto::SocketDisposition, proto::SocketDisposition),
148}
149
150/// A handle which is being moved out of the FDomain by a channel write call or
151/// closure/replacement.  There may still be operations to perform on this
152/// handle, so the write should not proceed while the handle is in the `InUse`
153/// state.
154enum ShuttingDownHandle {
155    InUse(proto::HandleId, HandleState),
156    Ready(AnyHandle),
157}
158
159impl ShuttingDownHandle {
160    fn poll_ready(
161        &mut self,
162        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
163        ctx: &mut Context<'_>,
164    ) -> Poll<()> {
165        replace_with(self, |this| match this {
166            this @ ShuttingDownHandle::Ready(_) => this,
167            ShuttingDownHandle::InUse(hid, mut state) => {
168                state.poll(event_queue, ctx);
169
170                if state.write_queue.is_empty() {
171                    while let Poll::Ready(op) = state.read_queue.pop_front(ctx) {
172                        match op {
173                            ReadOp::StreamingChannel(tid, start) => {
174                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
175                                    id: hid.id,
176                                }));
177                                let event = if start {
178                                    FDomainEvent::ChannelStreamingReadStart(tid, err)
179                                } else {
180                                    FDomainEvent::ChannelStreamingReadStop(tid, err)
181                                };
182                                event_queue.push_back(event.into());
183                            }
184                            ReadOp::StreamingSocket(tid, start) => {
185                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
186                                    id: hid.id,
187                                }));
188                                let event = if start {
189                                    FDomainEvent::SocketStreamingReadStart(tid, err)
190                                } else {
191                                    FDomainEvent::SocketStreamingReadStop(tid, err)
192                                };
193                                event_queue.push_back(event.into());
194                            }
195                            ReadOp::Channel(tid) => {
196                                let err = state
197                                    .handle
198                                    .expected_type(fidl::ObjectType::CHANNEL)
199                                    .err()
200                                    .unwrap_or(proto::Error::ClosedDuringRead(
201                                        proto::ClosedDuringRead,
202                                    ));
203                                event_queue
204                                    .push_back(FDomainEvent::ChannelData(tid, Err(err)).into());
205                            }
206                            ReadOp::Socket(tid, _max_bytes) => {
207                                let err = state
208                                    .handle
209                                    .expected_type(fidl::ObjectType::SOCKET)
210                                    .err()
211                                    .unwrap_or(proto::Error::ClosedDuringRead(
212                                        proto::ClosedDuringRead,
213                                    ));
214                                event_queue
215                                    .push_back(FDomainEvent::SocketData(tid, Err(err)).into());
216                            }
217                        }
218                    }
219
220                    if state.async_read_in_progress {
221                        match &*state.handle {
222                            AnyHandle::Channel(_) => event_queue.push_back(
223                                FDomainEvent::ChannelStreamingData(
224                                    proto::ChannelOnChannelStreamingDataRequest {
225                                        handle: hid,
226                                        channel_sent: proto::ChannelSent::Stopped(
227                                            proto::AioStopped { error: None },
228                                        ),
229                                    },
230                                )
231                                .into(),
232                            ),
233                            AnyHandle::Socket(_) => event_queue.push_back(
234                                FDomainEvent::SocketStreamingData(
235                                    proto::SocketOnSocketStreamingDataRequest {
236                                        handle: hid,
237                                        socket_message: proto::SocketMessage::Stopped(
238                                            proto::AioStopped { error: None },
239                                        ),
240                                    },
241                                )
242                                .into(),
243                            ),
244                            AnyHandle::EventPair(_)
245                            | AnyHandle::Event(_)
246                            | AnyHandle::Unknown(_) => unreachable!(),
247                        }
248                    }
249
250                    state.signal_waiters.clear();
251                    state.io_waiter = None;
252
253                    ShuttingDownHandle::Ready(
254                        Arc::into_inner(state.handle).expect("Unaccounted-for handle reference!"),
255                    )
256                } else {
257                    ShuttingDownHandle::InUse(hid, state)
258                }
259            }
260        });
261
262        if matches!(self, ShuttingDownHandle::Ready(_)) {
263            Poll::Ready(())
264        } else {
265            Poll::Pending
266        }
267    }
268}
269
270/// A vector of [`ShuttingDownHandle`] paired with rights for the new handles, which
271/// can transition into being a vector of [`fidl::HandleDisposition`] when all the
272/// handles are ready.
273enum HandlesToWrite {
274    SomeInUse(Vec<(ShuttingDownHandle, fidl::Rights)>),
275    AllReady(Vec<fidl::HandleDisposition<'static>>),
276}
277
278impl HandlesToWrite {
279    fn poll_ready(
280        &mut self,
281        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
282        ctx: &mut Context<'_>,
283    ) -> Poll<&mut Vec<fidl::HandleDisposition<'static>>> {
284        match self {
285            HandlesToWrite::AllReady(s) => Poll::Ready(s),
286            HandlesToWrite::SomeInUse(handles) => {
287                let mut ready = true;
288                for (handle, _) in handles.iter_mut() {
289                    ready = ready && handle.poll_ready(event_queue, ctx).is_ready();
290                }
291
292                if !ready {
293                    return Poll::Pending;
294                }
295
296                *self = HandlesToWrite::AllReady(
297                    handles
298                        .drain(..)
299                        .map(|(handle, rights)| {
300                            let ShuttingDownHandle::Ready(handle) = handle else { unreachable!() };
301
302                            fidl::HandleDisposition::new(
303                                fidl::HandleOp::Move(handle.into()),
304                                fidl::ObjectType::NONE,
305                                rights,
306                                fidl::Status::OK,
307                            )
308                        })
309                        .collect(),
310                );
311
312                let HandlesToWrite::AllReady(s) = self else { unreachable!() };
313                Poll::Ready(s)
314            }
315        }
316    }
317}
318
319struct AnyHandleRef(Arc<AnyHandle>);
320
321impl AsHandleRef for AnyHandleRef {
322    fn as_handle_ref(&self) -> fidl::HandleRef<'_> {
323        self.0.as_handle_ref()
324    }
325}
326
327#[cfg(target_os = "fuchsia")]
328type OnSignals = fasync::OnSignals<'static, AnyHandleRef>;
329
330#[cfg(not(target_os = "fuchsia"))]
331type OnSignals = fasync::OnSignalsRef<'static>;
332
333/// Represents a `WaitForSignals` transaction from a client. When the contained
334/// `OnSignals` polls to completion we can reply to the transaction.
335struct SignalWaiter {
336    tid: NonZeroU32,
337    waiter: OnSignals,
338}
339
340/// Information about a single handle within the [`FDomain`].
341struct HandleState {
342    /// The handle itself.
343    handle: Arc<AnyHandle>,
344    /// Our handle ID.
345    hid: proto::HandleId,
346    /// Whether this is a datagram socket. We have to handle data coming out of
347    /// datagram sockets a bit differently to preserve their semantics from the
348    /// perspective of the host and avoid data loss.
349    is_datagram_socket: bool,
350    /// Indicates we are sending `On*StreamingData` events to the client
351    /// presently. It is an error for the user to try to move the handle out of
352    /// the FDomain (e.g. send it through a channel or close it) until after
353    /// they request that streaming events stop.
354    async_read_in_progress: bool,
355    /// Queue of client requests to read from the handle. We have to queue read
356    /// requests because they may block, and we don't want to block the event
357    /// loop or be unable to handle further requests while a long read request
358    /// is blocking. Also we want to retire read requests in the order they were
359    /// submitted, otherwise pipelined reads could return data in a strange order.
360    read_queue: Queue<ReadOp>,
361    /// Queue of client requests to write to the handle. We have to queue write
362    /// requests for the same reason we have to queue read requests. Since we
363    /// process the queue one at a time, we need a separate queue for writes
364    /// otherwise we'd effectively make handles half-duplex, with read requests
365    /// unable to proceed if a write request is blocked at the head of the
366    /// queue.
367    write_queue: Queue<WriteOp>,
368    /// List of outstanding `WaitForSignals` transactions.
369    signal_waiters: Vec<SignalWaiter>,
370    /// Contains a waiter on this handle for IO reading and writing. Populated
371    /// whenever we need to block on IO to service a request.
372    io_waiter: Option<OnSignals>,
373}
374
375impl HandleState {
376    fn new(handle: AnyHandle, hid: proto::HandleId) -> Result<Self, proto::Error> {
377        let is_datagram_socket = match handle.is_datagram_socket() {
378            IsDatagramSocket::Unknown => {
379                return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown {
380                    type_: proto::SocketType::unknown(),
381                }))
382            }
383            other => other.is_datagram(),
384        };
385        Ok(HandleState {
386            handle: Arc::new(handle),
387            hid,
388            async_read_in_progress: false,
389            is_datagram_socket,
390            read_queue: Queue::new(),
391            write_queue: Queue::new(),
392            signal_waiters: Vec::new(),
393            io_waiter: None,
394        })
395    }
396
397    /// Poll this handle state. Lets us handle our IO queues and wait for the
398    /// next IO event.
399    fn poll(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>, ctx: &mut Context<'_>) {
400        self.signal_waiters.retain_mut(|x| {
401            let Poll::Ready(result) = x.waiter.poll_unpin(ctx) else {
402                return true;
403            };
404
405            event_queue.push_back(
406                FDomainEvent::WaitForSignals(
407                    x.tid,
408                    result
409                        .map(|x| proto::FDomainWaitForSignalsResponse { signals: x.bits() })
410                        .map_err(|e| proto::Error::TargetError(e.into_raw())),
411                )
412                .into(),
413            );
414
415            false
416        });
417
418        let read_signals = self.handle.read_signals();
419        let write_signals = self.handle.write_signals();
420
421        loop {
422            if let Some(signal_waiter) = self.io_waiter.as_mut() {
423                if let Poll::Ready(sigs) = signal_waiter.poll_unpin(ctx) {
424                    if let Ok(sigs) = sigs {
425                        if sigs.intersects(read_signals) {
426                            self.process_read_queue(event_queue, ctx);
427                        }
428                        if sigs.intersects(write_signals) {
429                            self.process_write_queue(event_queue, ctx);
430                        }
431                    }
432                } else {
433                    let need_read = matches!(
434                        self.read_queue.front_mut(ctx),
435                        Poll::Ready(ReadOp::StreamingChannel(_, _) | ReadOp::StreamingSocket(_, _))
436                    );
437                    let need_write = matches!(
438                        self.write_queue.front_mut(ctx),
439                        Poll::Ready(WriteOp::SetDisposition(_, _, _))
440                    );
441
442                    self.process_read_queue(event_queue, ctx);
443                    self.process_write_queue(event_queue, ctx);
444
445                    if !(need_read || need_write) {
446                        break;
447                    }
448                }
449            }
450
451            let subscribed_signals =
452                if self.async_read_in_progress || !self.read_queue.is_empty() {
453                    read_signals
454                } else {
455                    fidl::Signals::NONE
456                } | if !self.write_queue.is_empty() { write_signals } else { fidl::Signals::NONE };
457
458            if !subscribed_signals.is_empty() {
459                self.io_waiter = Some(OnSignals::new(
460                    AnyHandleRef(Arc::clone(&self.handle)),
461                    subscribed_signals,
462                ));
463            } else {
464                self.io_waiter = None;
465                break;
466            }
467        }
468    }
469
470    /// Set `async_read_in_progress` to `true`. Return an error if it was already `true`.
471    fn try_enable_async_read(&mut self) -> Result<()> {
472        if self.async_read_in_progress {
473            Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress))
474        } else {
475            self.async_read_in_progress = true;
476            Ok(())
477        }
478    }
479
480    /// Set `async_read_in_progress` to `false`. Return an error if it was already `false`.
481    fn try_disable_async_read(&mut self) -> Result<()> {
482        if !self.async_read_in_progress {
483            Err(proto::Error::NoReadInProgress(proto::NoReadInProgress))
484        } else {
485            self.async_read_in_progress = false;
486            Ok(())
487        }
488    }
489
490    /// Handle events from the front of the read queue.
491    fn process_read_queue(
492        &mut self,
493        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
494        ctx: &mut Context<'_>,
495    ) {
496        while let Poll::Ready(op) = self.read_queue.front_mut(ctx) {
497            match op {
498                ReadOp::StreamingChannel(tid, true) => {
499                    let tid = *tid;
500                    let result = self.try_enable_async_read();
501                    event_queue
502                        .push_back(FDomainEvent::ChannelStreamingReadStart(tid, result).into());
503                    self.read_queue.destroy_front();
504                }
505                ReadOp::StreamingChannel(tid, false) => {
506                    let tid = *tid;
507                    let result = self.try_disable_async_read();
508                    event_queue
509                        .push_back(FDomainEvent::ChannelStreamingReadStop(tid, result).into());
510                    self.read_queue.destroy_front();
511                }
512                ReadOp::StreamingSocket(tid, true) => {
513                    let tid = *tid;
514                    let result = self.try_enable_async_read();
515                    event_queue
516                        .push_back(FDomainEvent::SocketStreamingReadStart(tid, result).into());
517                    self.read_queue.destroy_front();
518                }
519                ReadOp::StreamingSocket(tid, false) => {
520                    let tid = *tid;
521                    let result = self.try_disable_async_read();
522                    event_queue
523                        .push_back(FDomainEvent::SocketStreamingReadStop(tid, result).into());
524                    self.read_queue.destroy_front();
525                }
526                ReadOp::Socket(tid, max_bytes) => {
527                    let (tid, max_bytes) = (*tid, *max_bytes);
528                    if let Some(event) = self.do_read_socket(tid, max_bytes) {
529                        let _ = self.read_queue.pop_front(ctx);
530                        event_queue.push_back(event.into());
531                    } else {
532                        break;
533                    }
534                }
535                ReadOp::Channel(tid) => {
536                    let tid = *tid;
537                    if let Some(event) = self.do_read_channel(tid) {
538                        let _ = self.read_queue.pop_front(ctx);
539                        event_queue.push_back(event.into());
540                    } else {
541                        break;
542                    }
543                }
544            }
545        }
546
547        if self.async_read_in_progress {
548            // We should have error'd out of any blocking operations if we had a
549            // read in progress.
550            assert!(self.read_queue.is_empty());
551            self.process_async_read(event_queue);
552        }
553    }
554
555    fn process_async_read(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>) {
556        assert!(self.async_read_in_progress);
557
558        match &*self.handle {
559            AnyHandle::Channel(_) => {
560                'read_loop: while let Some(result) = self.handle.read_channel().transpose() {
561                    match result {
562                        Ok(msg) => event_queue.push_back(
563                            UnprocessedFDomainEvent::ChannelStreamingData(self.hid, msg),
564                        ),
565                        Err(e) => {
566                            event_queue.push_back(
567                                FDomainEvent::ChannelStreamingData(
568                                    proto::ChannelOnChannelStreamingDataRequest {
569                                        handle: self.hid,
570                                        channel_sent: proto::ChannelSent::Stopped(
571                                            proto::AioStopped { error: Some(Box::new(e)) },
572                                        ),
573                                    },
574                                )
575                                .into(),
576                            );
577                            self.async_read_in_progress = false;
578                            break 'read_loop;
579                        }
580                    }
581                }
582            }
583
584            AnyHandle::Socket(_) => {
585                'read_loop: while let Some(result) =
586                    self.handle.read_socket(ASYNC_READ_BUFSIZE).transpose()
587                {
588                    match result {
589                        Ok(data) => {
590                            event_queue.push_back(
591                                FDomainEvent::SocketStreamingData(
592                                    proto::SocketOnSocketStreamingDataRequest {
593                                        handle: self.hid,
594                                        socket_message: proto::SocketMessage::Data(
595                                            proto::SocketData {
596                                                data,
597                                                is_datagram: self.is_datagram_socket,
598                                            },
599                                        ),
600                                    },
601                                )
602                                .into(),
603                            );
604                        }
605                        Err(e) => {
606                            event_queue.push_back(
607                                FDomainEvent::SocketStreamingData(
608                                    proto::SocketOnSocketStreamingDataRequest {
609                                        handle: self.hid,
610                                        socket_message: proto::SocketMessage::Stopped(
611                                            proto::AioStopped { error: Some(Box::new(e)) },
612                                        ),
613                                    },
614                                )
615                                .into(),
616                            );
617                            self.async_read_in_progress = false;
618                            break 'read_loop;
619                        }
620                    }
621                }
622            }
623
624            _ => unreachable!("Processed async read for unreadable handle type!"),
625        }
626    }
627
628    /// Handle events from the front of the write queue.
629    fn process_write_queue(
630        &mut self,
631        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
632        ctx: &mut Context<'_>,
633    ) {
634        // We want to mutate and *maybe* pop the front of the write queue, but
635        // lifetime shenanigans mean we can't do that and also access `self`,
636        // which we need. So we pop the item always, and then maybe push it to
637        // the front again if we didn't actually want to pop it.
638        while let Poll::Ready(op) = self.write_queue.pop_front(ctx) {
639            match op {
640                WriteOp::Socket(mut op) => {
641                    if let Some(event) = self.do_write_socket(&mut op) {
642                        event_queue.push_back(event.into());
643                    } else {
644                        self.write_queue.push_front_no_wake(WriteOp::Socket(op));
645                        break;
646                    }
647                }
648                WriteOp::SetDisposition(tid, disposition, disposition_peer) => {
649                    let result = { self.handle.socket_disposition(disposition, disposition_peer) };
650                    event_queue.push_back(FDomainEvent::SocketDispositionSet(tid, result).into())
651                }
652                WriteOp::Channel(tid, data, mut handles) => {
653                    if self
654                        .do_write_channel(tid, &data, &mut handles, event_queue, ctx)
655                        .is_pending()
656                    {
657                        self.write_queue.push_front_no_wake(WriteOp::Channel(tid, data, handles));
658                        break;
659                    }
660                }
661            }
662        }
663    }
664
665    /// Attempt to read from the handle in this [`HandleState`] as if it were a
666    /// socket. If the read succeeds or produces an error that should not be
667    /// retried, produce an [`FDomainEvent`] containing the result.
668    fn do_read_socket(&mut self, tid: NonZeroU32, max_bytes: u64) -> Option<FDomainEvent> {
669        if self.async_read_in_progress {
670            return Some(
671                FDomainEvent::SocketData(
672                    tid,
673                    Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
674                )
675                .into(),
676            );
677        }
678
679        let max_bytes = if self.is_datagram_socket {
680            let AnyHandle::Socket(s) = &*self.handle else {
681                unreachable!("Read socket from state that wasn't for a socket!");
682            };
683            match s.info() {
684                Ok(x) => x.rx_buf_available as u64,
685                // We should always succeed. The only failures are if we don't
686                // have the rights or something's screwed up with the handle. We
687                // know we have the rights because figuring out this was a
688                // datagram socket to begin with meant calling the same call on
689                // the same handle earlier.
690                Err(e) => {
691                    return Some(FDomainEvent::SocketData(
692                        tid,
693                        Err(proto::Error::TargetError(e.into_raw())),
694                    ))
695                }
696            }
697        } else {
698            max_bytes
699        };
700        self.handle.read_socket(max_bytes).transpose().map(|x| {
701            FDomainEvent::SocketData(
702                tid,
703                x.map(|data| proto::SocketData { data, is_datagram: self.is_datagram_socket }),
704            )
705        })
706    }
707
708    /// Attempt to write to the handle in this [`HandleState`] as if it were a
709    /// socket. If the write succeeds or produces an error that should not be
710    /// retried, produce an [`FDomainEvent`] containing the result.
711    fn do_write_socket(&mut self, op: &mut SocketWrite) -> Option<FDomainEvent> {
712        match self.handle.write_socket(&op.to_write) {
713            Ok(wrote) => {
714                op.wrote += wrote;
715                op.to_write.drain(..wrote);
716
717                if op.to_write.is_empty() {
718                    Some(FDomainEvent::WroteSocket(
719                        op.tid,
720                        Ok(proto::SocketWriteSocketResponse {
721                            wrote: op.wrote.try_into().unwrap(),
722                        }),
723                    ))
724                } else {
725                    None
726                }
727            }
728            Err(error) => Some(FDomainEvent::WroteSocket(
729                op.tid,
730                Err(proto::WriteSocketError { error, wrote: op.wrote.try_into().unwrap() }),
731            )),
732        }
733    }
734
735    /// Attempt to write to the handle in this [`HandleState`] as if it were a
736    /// channel. If the write succeeds or produces an error that should not be
737    /// retried, produce an [`FDomainEvent`] containing the result.
738    fn do_write_channel(
739        &mut self,
740        tid: NonZeroU32,
741        data: &[u8],
742        handles: &mut HandlesToWrite,
743        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
744        ctx: &mut Context<'_>,
745    ) -> Poll<()> {
746        let Poll::Ready(handles) = handles.poll_ready(event_queue, ctx) else {
747            return Poll::Pending;
748        };
749
750        let ret = self.handle.write_channel(data, handles);
751        if let Some(ret) = ret {
752            event_queue.push_back(FDomainEvent::WroteChannel(tid, ret).into())
753        }
754        Poll::Ready(())
755    }
756
757    /// Attempt to read from the handle in this [`HandleState`] as if it were a
758    /// channel. If the read succeeds or produces an error that should not be
759    /// retried, produce an [`FDomainEvent`] containing the result.
760    fn do_read_channel(&mut self, tid: NonZeroU32) -> Option<UnprocessedFDomainEvent> {
761        if self.async_read_in_progress {
762            return Some(
763                FDomainEvent::ChannelData(
764                    tid,
765                    Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
766                )
767                .into(),
768            );
769        }
770        match self.handle.read_channel() {
771            Ok(x) => x.map(|x| UnprocessedFDomainEvent::ChannelData(tid, x)),
772            Err(e) => Some(FDomainEvent::ChannelData(tid, Err(e)).into()),
773        }
774    }
775}
776
/// State for a handle which is closing, but which needs its read and write
/// queues flushed first.
struct ClosingHandle {
    /// What to do with the handle once it has finished shutting down. Held in
    /// an `Arc` because a single close request shares one action between all
    /// of the handles it closes.
    action: Arc<CloseAction>,
    /// The handle that is shutting down. Becomes `None` once the handle has
    /// been handed to `action`.
    state: Option<ShuttingDownHandle>,
}
783
784impl ClosingHandle {
785    fn poll_ready(&mut self, fdomain: &mut FDomain, ctx: &mut Context<'_>) -> Poll<()> {
786        if let Some(state) = self.state.as_mut() {
787            if state.poll_ready(&mut fdomain.event_queue, ctx).is_ready() {
788                let state = self.state.take().unwrap();
789                let ShuttingDownHandle::Ready(handle) = state else {
790                    unreachable!();
791                };
792                self.action.perform(fdomain, handle);
793                Poll::Ready(())
794            } else {
795                Poll::Pending
796            }
797        } else {
798            Poll::Ready(())
799        }
800    }
801}
802
/// When the client requests a handle to be closed or moved or otherwise
/// destroyed, it goes into limbo for a bit while pending read and write actions
/// are flushed. This is how we mark what should happen to the handle after that
/// period ends.
enum CloseAction {
    /// The handle is being closed. `tid` is the transaction to answer, `count`
    /// is the number of handles from the same request that have not finished
    /// shutting down yet, and `result` is what gets reported once the last of
    /// them is done.
    Close { tid: NonZeroU32, count: AtomicU32, result: Result<()> },
    /// The handle is being replaced. `tid` is the transaction to answer,
    /// `new_hid` is the ID the replacement is registered under, and `rights`
    /// are the rights requested for the replacement.
    Replace { tid: NonZeroU32, new_hid: proto::NewHandleId, rights: fidl::Rights },
}
811
812impl CloseAction {
813    fn perform(&self, fdomain: &mut FDomain, handle: AnyHandle) {
814        match self {
815            CloseAction::Close { tid, count, result } => {
816                if count.fetch_sub(1, Ordering::Relaxed) == 1 {
817                    fdomain
818                        .event_queue
819                        .push_back(FDomainEvent::ClosedHandle(*tid, result.clone()).into());
820                }
821            }
822            CloseAction::Replace { tid, new_hid, rights } => {
823                let result = handle
824                    .replace(*rights)
825                    .and_then(|handle| fdomain.alloc_client_handles([*new_hid], [handle]));
826                fdomain.event_queue.push_back(FDomainEvent::ReplacedHandle(*tid, result).into());
827            }
828        }
829    }
830}
831
/// This is a container of handles that is manipulable via the FDomain protocol.
/// See [RFC-0228].
///
/// Most of the methods simply handle FIDL requests from the FDomain protocol.
#[pin_project::pin_project]
pub struct FDomain {
    /// Callback that produces a client end for the namespace directory. It is
    /// invoked each time the namespace is requested.
    namespace: Box<dyn Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send>,
    /// The handles currently held by this FDomain, keyed by their ID.
    handles: HashMap<proto::HandleId, HandleState>,
    /// Handles that were removed from `handles` by a close or replace request
    /// and are still shutting down.
    closing_handles: Vec<ClosingHandle>,
    /// Events waiting to be yielded by the `Stream` implementation.
    event_queue: VecDeque<UnprocessedFDomainEvent>,
    /// Waker of the task that last polled the stream and found no event.
    waker: Option<Waker>,
}
844
845impl FDomain {
846    /// Create a new FDomain. The new FDomain is empty and ready to be connected
847    /// to by a client.
848    pub fn new_empty() -> Self {
849        Self::new(|| Err(fidl::Status::NOT_FOUND))
850    }
851
852    /// Create a new FDomain populated with the given namespace entries.
853    pub fn new(
854        namespace: impl Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send + 'static,
855    ) -> Self {
856        FDomain {
857            namespace: Box::new(namespace),
858            handles: HashMap::new(),
859            closing_handles: Vec::new(),
860            event_queue: VecDeque::new(),
861            waker: None,
862        }
863    }
864
865    /// Add an event to be emitted by this FDomain.
866    fn push_event(&mut self, event: impl Into<UnprocessedFDomainEvent>) {
867        self.event_queue.push_back(event.into());
868        self.waker.take().map(Waker::wake);
869    }
870
871    /// Given a [`fidl::MessageBufEtc`], load all of the handles from it into this
872    /// FDomain and return a [`ReadChannelPayload`](proto::ReadChannelPayload)
873    /// with the same data and the IDs for the handles.
874    fn process_message(
875        &mut self,
876        message: fidl::MessageBufEtc,
877    ) -> Result<proto::ChannelMessage, proto::Error> {
878        let (data, handles) = message.split();
879        let handles = handles
880            .into_iter()
881            .map(|info| {
882                let type_ = info.object_type;
883
884                let handle = match info.object_type {
885                    fidl::ObjectType::CHANNEL => {
886                        AnyHandle::Channel(fidl::Channel::from_handle(info.handle))
887                    }
888                    fidl::ObjectType::SOCKET => {
889                        AnyHandle::Socket(fidl::Socket::from_handle(info.handle))
890                    }
891                    fidl::ObjectType::EVENTPAIR => {
892                        AnyHandle::EventPair(fidl::EventPair::from_handle(info.handle))
893                    }
894                    fidl::ObjectType::EVENT => {
895                        AnyHandle::Event(fidl::Event::from_handle(info.handle))
896                    }
897                    _ => AnyHandle::Unknown(handles::Unknown(info.handle, info.object_type)),
898                };
899
900                Ok(proto::HandleInfo {
901                    rights: info.rights,
902                    handle: self.alloc_fdomain_handle(handle)?,
903                    type_,
904                })
905            })
906            .collect::<Result<Vec<_>, proto::Error>>()?;
907
908        Ok(proto::ChannelMessage { data, handles })
909    }
910
911    /// Allocate `N` new handle IDs. These are allocated from
912    /// [`NewHandleId`](proto::NewHandleId) and are expected to follow the protocol
913    /// rules for client-allocated handle IDs.
914    ///
915    /// If any of the handles passed fail to allocate, none of the handles will
916    /// be allocated.
917    fn alloc_client_handles<const N: usize>(
918        &mut self,
919        ids: [proto::NewHandleId; N],
920        handles: [AnyHandle; N],
921    ) -> Result<(), proto::Error> {
922        for id in ids {
923            if id.id & (1 << 31) != 0 {
924                return Err(proto::Error::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange {
925                    id: id.id,
926                }));
927            }
928
929            if self.handles.contains_key(&proto::HandleId { id: id.id }) {
930                return Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
931                    id: id.id,
932                    same_call: false,
933                }));
934            }
935        }
936
937        let mut sorted_ids = ids;
938        sorted_ids.sort();
939
940        if let Some(a) = sorted_ids.windows(2).find(|x| x[0] == x[1]) {
941            Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
942                id: a[0].id,
943                same_call: true,
944            }))
945        } else {
946            let ids = ids.into_iter().map(|id| proto::HandleId { id: id.id });
947            let handles = ids
948                .zip(handles.into_iter())
949                .map(|(id, h)| HandleState::new(h, id).map(|x| (id, x)))
950                .collect::<Result<Vec<_>, proto::Error>>()?;
951
952            self.handles.extend(handles);
953
954            Ok(())
955        }
956    }
957
958    /// Allocate a new handle ID. These are allocated internally and are
959    /// expected to follow the protocol rules for FDomain-allocated handle IDs.
960    fn alloc_fdomain_handle(&mut self, handle: AnyHandle) -> Result<proto::HandleId, proto::Error> {
961        loop {
962            let id = proto::HandleId { id: rand::random::<u32>() | (1u32 << 31) };
963            if let Entry::Vacant(v) = self.handles.entry(id) {
964                v.insert(HandleState::new(handle, id)?);
965                break Ok(id);
966            }
967        }
968    }
969
970    /// If a handle exists in this FDomain, remove it.
971    fn take_handle(&mut self, handle: proto::HandleId) -> Result<HandleState, proto::Error> {
972        self.handles
973            .remove(&handle)
974            .ok_or(proto::Error::BadHandleId(proto::BadHandleId { id: handle.id }))
975    }
976
977    /// Use a handle in our handle table, if it exists.
978    fn using_handle<T>(
979        &mut self,
980        id: proto::HandleId,
981        f: impl FnOnce(&mut HandleState) -> Result<T, proto::Error>,
982    ) -> Result<T, proto::Error> {
983        if let Some(s) = self.handles.get_mut(&id) {
984            f(s)
985        } else {
986            Err(proto::Error::BadHandleId(proto::BadHandleId { id: id.id }))
987        }
988    }
989
990    pub fn get_namespace(&mut self, request: proto::FDomainGetNamespaceRequest) -> Result<()> {
991        match (self.namespace)() {
992            Ok(endpoint) => self.alloc_client_handles(
993                [request.new_handle],
994                [AnyHandle::Channel(endpoint.into_channel())],
995            ),
996            Err(e) => Err(proto::Error::TargetError(e.into_raw())),
997        }
998    }
999
1000    pub fn create_channel(&mut self, request: proto::ChannelCreateChannelRequest) -> Result<()> {
1001        let (a, b) = fidl::Channel::create();
1002        self.alloc_client_handles(request.handles, [AnyHandle::Channel(a), AnyHandle::Channel(b)])
1003    }
1004
1005    pub fn create_socket(&mut self, request: proto::SocketCreateSocketRequest) -> Result<()> {
1006        let (a, b) = match request.options {
1007            proto::SocketType::Stream => fidl::Socket::create_stream(),
1008            proto::SocketType::Datagram => fidl::Socket::create_datagram(),
1009            type_ => {
1010                return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown { type_ }))
1011            }
1012        };
1013
1014        self.alloc_client_handles(request.handles, [AnyHandle::Socket(a), AnyHandle::Socket(b)])
1015    }
1016
1017    pub fn create_event_pair(
1018        &mut self,
1019        request: proto::EventPairCreateEventPairRequest,
1020    ) -> Result<()> {
1021        let (a, b) = fidl::EventPair::create();
1022        self.alloc_client_handles(
1023            request.handles,
1024            [AnyHandle::EventPair(a), AnyHandle::EventPair(b)],
1025        )
1026    }
1027
1028    pub fn create_event(&mut self, request: proto::EventCreateEventRequest) -> Result<()> {
1029        let a = fidl::Event::create();
1030        self.alloc_client_handles([request.handle], [AnyHandle::Event(a)])
1031    }
1032
1033    pub fn set_socket_disposition(
1034        &mut self,
1035        tid: NonZeroU32,
1036        request: proto::SocketSetSocketDispositionRequest,
1037    ) {
1038        if let Err(err) = self.using_handle(request.handle, |h| {
1039            h.write_queue.push_back(WriteOp::SetDisposition(
1040                tid,
1041                request.disposition,
1042                request.disposition_peer,
1043            ));
1044            Ok(())
1045        }) {
1046            self.push_event(FDomainEvent::SocketDispositionSet(tid, Err(err)));
1047        }
1048    }
1049
1050    pub fn read_socket(&mut self, tid: NonZeroU32, request: proto::SocketReadSocketRequest) {
1051        if let Err(e) = self.using_handle(request.handle, |h| {
1052            h.read_queue.push_back(ReadOp::Socket(tid, request.max_bytes));
1053            Ok(())
1054        }) {
1055            self.push_event(FDomainEvent::SocketData(tid, Err(e)));
1056        }
1057    }
1058
1059    pub fn read_channel(&mut self, tid: NonZeroU32, request: proto::ChannelReadChannelRequest) {
1060        if let Err(e) = self.using_handle(request.handle, |h| {
1061            h.read_queue.push_back(ReadOp::Channel(tid));
1062            Ok(())
1063        }) {
1064            self.push_event(FDomainEvent::ChannelData(tid, Err(e)));
1065        }
1066    }
1067
1068    pub fn write_socket(&mut self, tid: NonZeroU32, request: proto::SocketWriteSocketRequest) {
1069        if let Err(error) = self.using_handle(request.handle, |h| {
1070            h.write_queue.push_back(WriteOp::Socket(SocketWrite {
1071                tid,
1072                wrote: 0,
1073                to_write: request.data,
1074            }));
1075            Ok(())
1076        }) {
1077            self.push_event(FDomainEvent::WroteSocket(
1078                tid,
1079                Err(proto::WriteSocketError { error, wrote: 0 }),
1080            ));
1081        }
1082    }
1083
1084    pub fn write_channel(&mut self, tid: NonZeroU32, request: proto::ChannelWriteChannelRequest) {
1085        // Go through the list of handles in the requests (which will either be
1086        // a simple list of handles or a list of HandleDispositions) and obtain
1087        // for each a `ShuttingDownHandle` which contains our handle state (the
1088        // "Shutting down" refers to the fact that we're pulling the handle out
1089        // of the FDomain in order to send it) and the rights the requester
1090        // would like the handle to have upon arrival at the other end of the
1091        // channel.
1092        let handles: Vec<Result<(ShuttingDownHandle, fidl::Rights)>> = match request.handles {
1093            proto::Handles::Handles(h) => h
1094                .into_iter()
1095                .map(|h| {
1096                    if h != request.handle {
1097                        self.take_handle(h).map(|handle_state| {
1098                            (ShuttingDownHandle::InUse(h, handle_state), fidl::Rights::SAME_RIGHTS)
1099                        })
1100                    } else {
1101                        Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1102                    }
1103                })
1104                .collect(),
1105            proto::Handles::Dispositions(d) => d
1106                .into_iter()
1107                .map(|d| {
1108                    let res = match d.handle {
1109                        proto::HandleOp::Move_(h) => {
1110                            if h != request.handle {
1111                                self.take_handle(h).map(|x| ShuttingDownHandle::InUse(h, x))
1112                            } else {
1113                                Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1114                            }
1115                        }
1116                        proto::HandleOp::Duplicate(h) => {
1117                            if h != request.handle {
1118                                // If the requester wants us to duplicate the
1119                                // handle, we do so now rather than letting
1120                                // `write_etc` do it. Otherwise we have to use a
1121                                // reference to the handle, and we get lifetime
1122                                // hell.
1123                                self.using_handle(h, |h| {
1124                                    h.handle.duplicate(fidl::Rights::SAME_RIGHTS)
1125                                })
1126                                .map(ShuttingDownHandle::Ready)
1127                            } else {
1128                                Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1129                            }
1130                        }
1131                    };
1132
1133                    res.and_then(|x| Ok((x, d.rights)))
1134                })
1135                .collect(),
1136        };
1137
1138        if handles.iter().any(|x| x.is_err()) {
1139            let e = handles.into_iter().map(|x| x.err().map(Box::new)).collect();
1140
1141            self.push_event(FDomainEvent::WroteChannel(
1142                tid,
1143                Err(proto::WriteChannelError::OpErrors(e)),
1144            ));
1145            return;
1146        }
1147
1148        let handles = handles.into_iter().map(|x| x.unwrap()).collect::<Vec<_>>();
1149
1150        if let Err(e) = self.using_handle(request.handle, |h| {
1151            h.write_queue.push_back(WriteOp::Channel(
1152                tid,
1153                request.data,
1154                HandlesToWrite::SomeInUse(handles),
1155            ));
1156            Ok(())
1157        }) {
1158            self.push_event(FDomainEvent::WroteChannel(
1159                tid,
1160                Err(proto::WriteChannelError::Error(e)),
1161            ));
1162        }
1163    }
1164
1165    pub fn wait_for_signals(
1166        &mut self,
1167        tid: NonZeroU32,
1168        request: proto::FDomainWaitForSignalsRequest,
1169    ) {
1170        let result = self.using_handle(request.handle, |h| {
1171            let signals = fidl::Signals::from_bits_retain(request.signals);
1172            h.signal_waiters.push(SignalWaiter {
1173                tid,
1174                waiter: OnSignals::new(AnyHandleRef(Arc::clone(&h.handle)), signals),
1175            });
1176            Ok(())
1177        });
1178
1179        if let Err(e) = result {
1180            self.push_event(FDomainEvent::WaitForSignals(tid, Err(e)));
1181        } else {
1182            self.waker.take().map(Waker::wake);
1183        }
1184    }
1185
1186    pub fn close(&mut self, tid: NonZeroU32, request: proto::FDomainCloseRequest) {
1187        let mut states = Vec::with_capacity(request.handles.len());
1188        let mut result = Ok(());
1189        for hid in request.handles {
1190            match self.take_handle(hid) {
1191                Ok(state) => states.push((hid, state)),
1192
1193                Err(e) => {
1194                    result = result.and(Err(e));
1195                }
1196            }
1197        }
1198
1199        let action = Arc::new(CloseAction::Close {
1200            tid,
1201            count: AtomicU32::new(states.len().try_into().unwrap()),
1202            result,
1203        });
1204
1205        for (hid, state) in states {
1206            self.closing_handles.push(ClosingHandle {
1207                action: Arc::clone(&action),
1208                state: Some(ShuttingDownHandle::InUse(hid, state)),
1209            });
1210        }
1211    }
1212
1213    pub fn duplicate(&mut self, request: proto::FDomainDuplicateRequest) -> Result<()> {
1214        let rights = request.rights;
1215        let handle = self.using_handle(request.handle, |h| h.handle.duplicate(rights));
1216        handle.and_then(|h| self.alloc_client_handles([request.new_handle], [h]))
1217    }
1218
1219    pub fn replace(
1220        &mut self,
1221        tid: NonZeroU32,
1222        request: proto::FDomainReplaceRequest,
1223    ) -> Result<()> {
1224        let rights = request.rights;
1225        let new_hid = request.new_handle;
1226        match self.take_handle(request.handle) {
1227            Ok(state) => self.closing_handles.push(ClosingHandle {
1228                action: Arc::new(CloseAction::Replace { tid, new_hid, rights }),
1229                state: Some(ShuttingDownHandle::InUse(request.handle, state)),
1230            }),
1231            Err(e) => self.event_queue.push_back(UnprocessedFDomainEvent::Ready(
1232                FDomainEvent::ReplacedHandle(tid, Err(e)),
1233            )),
1234        }
1235
1236        Ok(())
1237    }
1238
1239    pub fn signal(&mut self, request: proto::FDomainSignalRequest) -> Result<()> {
1240        let set = fidl::Signals::from_bits_retain(request.set);
1241        let clear = fidl::Signals::from_bits_retain(request.clear);
1242
1243        self.using_handle(request.handle, |h| {
1244            h.handle.signal_handle(clear, set).map_err(|e| proto::Error::TargetError(e.into_raw()))
1245        })
1246    }
1247
1248    pub fn signal_peer(&mut self, request: proto::FDomainSignalPeerRequest) -> Result<()> {
1249        let set = fidl::Signals::from_bits_retain(request.set);
1250        let clear = fidl::Signals::from_bits_retain(request.clear);
1251
1252        self.using_handle(request.handle, |h| h.handle.signal_peer(clear, set))
1253    }
1254
1255    pub fn read_channel_streaming_start(
1256        &mut self,
1257        tid: NonZeroU32,
1258        request: proto::ChannelReadChannelStreamingStartRequest,
1259    ) {
1260        if let Err(err) = self.using_handle(request.handle, |h| {
1261            h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1262            h.read_queue.push_back(ReadOp::StreamingChannel(tid, true));
1263            Ok(())
1264        }) {
1265            self.event_queue
1266                .push_back(FDomainEvent::ChannelStreamingReadStart(tid, Err(err)).into())
1267        }
1268    }
1269
1270    pub fn read_channel_streaming_stop(
1271        &mut self,
1272        tid: NonZeroU32,
1273        request: proto::ChannelReadChannelStreamingStopRequest,
1274    ) {
1275        if let Err(err) = self.using_handle(request.handle, |h| {
1276            h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1277            h.read_queue.push_back(ReadOp::StreamingChannel(tid, false));
1278            Ok(())
1279        }) {
1280            self.event_queue.push_back(FDomainEvent::ChannelStreamingReadStop(tid, Err(err)).into())
1281        }
1282    }
1283
1284    pub fn read_socket_streaming_start(
1285        &mut self,
1286        tid: NonZeroU32,
1287        request: proto::SocketReadSocketStreamingStartRequest,
1288    ) {
1289        if let Err(err) = self.using_handle(request.handle, |h| {
1290            h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1291            h.read_queue.push_back(ReadOp::StreamingSocket(tid, true));
1292            Ok(())
1293        }) {
1294            self.event_queue.push_back(FDomainEvent::SocketStreamingReadStart(tid, Err(err)).into())
1295        }
1296    }
1297
1298    pub fn read_socket_streaming_stop(
1299        &mut self,
1300        tid: NonZeroU32,
1301        request: proto::SocketReadSocketStreamingStopRequest,
1302    ) {
1303        if let Err(err) = self.using_handle(request.handle, |h| {
1304            h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1305            h.read_queue.push_back(ReadOp::StreamingSocket(tid, false));
1306            Ok(())
1307        }) {
1308            self.event_queue.push_back(FDomainEvent::SocketStreamingReadStop(tid, Err(err)).into())
1309        }
1310    }
1311}
1312
1313/// [`FDomain`] implements a stream of events, for protocol events and for
1314/// replies to long-running methods.
1315impl futures::Stream for FDomain {
1316    type Item = FDomainEvent;
1317
1318    fn poll_next(
1319        mut self: std::pin::Pin<&mut Self>,
1320        ctx: &mut Context<'_>,
1321    ) -> Poll<Option<Self::Item>> {
1322        let this = &mut *self;
1323
1324        let mut closing_handles = std::mem::replace(&mut this.closing_handles, Vec::new());
1325        closing_handles.retain_mut(|x| x.poll_ready(this, ctx).is_pending());
1326        this.closing_handles = closing_handles;
1327
1328        let handles = &mut this.handles;
1329        let event_queue = &mut this.event_queue;
1330        for state in handles.values_mut() {
1331            state.poll(event_queue, ctx);
1332        }
1333
1334        if let Some(event) = self.event_queue.pop_front() {
1335            match event {
1336                UnprocessedFDomainEvent::Ready(event) => Poll::Ready(Some(event)),
1337                UnprocessedFDomainEvent::ChannelData(tid, message) => {
1338                    Poll::Ready(Some(FDomainEvent::ChannelData(tid, self.process_message(message))))
1339                }
1340                UnprocessedFDomainEvent::ChannelStreamingData(hid, message) => {
1341                    match self.process_message(message) {
1342                        Ok(message) => Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1343                            proto::ChannelOnChannelStreamingDataRequest {
1344                                handle: hid,
1345                                channel_sent: proto::ChannelSent::Message(message),
1346                            },
1347                        ))),
1348                        Err(e) => {
1349                            self.handles.get_mut(&hid).unwrap().async_read_in_progress = false;
1350                            Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1351                                proto::ChannelOnChannelStreamingDataRequest {
1352                                    handle: hid,
1353                                    channel_sent: proto::ChannelSent::Stopped(proto::AioStopped {
1354                                        error: Some(Box::new(e)),
1355                                    }),
1356                                },
1357                            )))
1358                        }
1359                    }
1360                }
1361            }
1362        } else {
1363            self.waker = Some(ctx.waker().clone());
1364            Poll::Pending
1365        }
1366    }
1367}