fdomain_client/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_message::TransactionHeader;
6use futures::channel::oneshot::Sender as OneshotSender;
7use futures::stream::Stream as StreamTrait;
8use futures::FutureExt;
9use std::collections::{HashMap, VecDeque};
10use std::convert::Infallible;
11use std::future::Future;
12use std::num::NonZeroU32;
13use std::pin::Pin;
14use std::sync::{Arc, LazyLock, Mutex};
15use std::task::{ready, Context, Poll, Waker};
16use {fidl_fuchsia_fdomain as proto, fuchsia_async as _};
17
18mod channel;
19mod event;
20mod event_pair;
21mod handle;
22mod responder;
23mod socket;
24
25#[cfg(test)]
26mod test;
27
28pub mod fidl;
29
30use responder::Responder;
31
32pub use channel::{
33    AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, MessageBuf,
34};
35pub use event::Event;
36pub use event_pair::Eventpair as EventPair;
37pub use handle::{AsHandleRef, Handle, HandleBased, HandleRef, OnFDomainSignals, Peered};
38pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
39pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
40
41// Unsupported handle types.
42#[rustfmt::skip]
43pub use Handle as Fifo;
44#[rustfmt::skip]
45pub use Handle as Job;
46#[rustfmt::skip]
47pub use Handle as Process;
48#[rustfmt::skip]
49pub use Handle as Resource;
50#[rustfmt::skip]
51pub use Handle as Stream;
52#[rustfmt::skip]
53pub use Handle as Thread;
54#[rustfmt::skip]
55pub use Handle as Vmar;
56#[rustfmt::skip]
57pub use Handle as Vmo;
58
59use proto::f_domain_ordinals as ordinals;
60
61fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62    match error {
63        FDomainError::TargetError(e) => write!(f, "Target-side error {e}"),
64        FDomainError::BadHandleId(proto::BadHandleId { id }) => {
65            write!(f, "Tried to use invalid handle id {id}")
66        }
67        FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
68            f,
69            "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
70        ),
71        FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
72            write!(f, "Handle is occupied delivering streaming reads")
73        }
74        FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
75            write!(f, "No streaming read was in progress")
76        }
77        FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
78            write!(f, "Tried to create a handle with id {id}, which is outside the valid range for client handles")
79        }
80        FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
81            if *same_call {
82                write!(f, "Tried to create two or more new handles with the same id {id}")
83            } else {
84                write!(f, "Tried to create a new handle with id {id}, which is already the id of an existing handle")
85            }
86        }
87        FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
88            write!(f, "Tried to write a channel into itself")
89        }
90        FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
91            write!(f, "Handle closed while being read")
92        }
93        _ => todo!(),
94    }
95}
96
97/// Result type alias.
98pub type Result<T, E = Error> = std::result::Result<T, E>;
99
100/// Error type emitted by FDomain operations.
101#[derive(Clone)]
102pub enum Error {
103    SocketWrite(WriteSocketError),
104    ChannelWrite(WriteChannelError),
105    FDomain(FDomainError),
106    Protocol(::fidl::Error),
107    ProtocolObjectTypeIncompatible,
108    ProtocolRightsIncompatible,
109    ProtocolSignalsIncompatible,
110    ProtocolStreamEventIncompatible,
111    Transport(Arc<std::io::Error>),
112    ConnectionMismatch,
113    StreamingAborted,
114}
115
116impl std::fmt::Display for Error {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        match self {
119            Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
120                write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
121                write_fdomain_error(error, f)
122            }
123            Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
124                write!(f, "While writing channel: ")?;
125                write_fdomain_error(error, f)
126            }
127            Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
128                write!(f, "Couldn't write all handles into a channel:")?;
129                for (pos, error) in
130                    errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
131                {
132                    write!(f, "\n  Handle in position {pos}: ")?;
133                    write_fdomain_error(error, f)?;
134                }
135                Ok(())
136            }
137            Self::ProtocolObjectTypeIncompatible => {
138                write!(f, "The FDomain protocol does not recognize an object type")
139            }
140            Self::ProtocolRightsIncompatible => {
141                write!(f, "The FDomain protocol does not recognize some rights")
142            }
143            Self::ProtocolSignalsIncompatible => {
144                write!(f, "The FDomain protocol does not recognize some signals")
145            }
146            Self::ProtocolStreamEventIncompatible => {
147                write!(f, "The FDomain protocol does not recognize a received streaming IO event")
148            }
149            Self::FDomain(e) => write_fdomain_error(e, f),
150            Self::Protocol(e) => write!(f, "Protocol error: {e}"),
151            Self::Transport(e) => write!(f, "Transport error: {e:?}"),
152            Self::ConnectionMismatch => {
153                write!(f, "Tried to use an FDomain handle from a different connection")
154            }
155            Self::StreamingAborted => write!(f, "This channel is no longer streaming"),
156        }
157    }
158}
159
160impl std::fmt::Debug for Error {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        match self {
163            Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
164            Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
165            Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
166            Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
167            Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
168            Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
169            Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
170            Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
171            Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
172            Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
173            Self::StreamingAborted => write!(f, "StreamingAborted"),
174        }
175    }
176}
177
178impl std::error::Error for Error {}
179
180impl From<FDomainError> for Error {
181    fn from(other: FDomainError) -> Self {
182        Self::FDomain(other)
183    }
184}
185
186impl From<::fidl::Error> for Error {
187    fn from(other: ::fidl::Error) -> Self {
188        Self::Protocol(other)
189    }
190}
191
192impl From<WriteSocketError> for Error {
193    fn from(other: WriteSocketError) -> Self {
194        Self::SocketWrite(other)
195    }
196}
197
198impl From<WriteChannelError> for Error {
199    fn from(other: WriteChannelError) -> Self {
200        Self::ChannelWrite(other)
201    }
202}
203
204/// An error emitted internally by the client. Similar to [`Error`] but does not
205/// contain several variants which are irrelevant in the contexts where it is
206/// used.
207enum InnerError {
208    Protocol(::fidl::Error),
209    ProtocolStreamEventIncompatible,
210    Transport(Arc<std::io::Error>),
211}
212
213impl Clone for InnerError {
214    fn clone(&self) -> Self {
215        match self {
216            InnerError::Protocol(a) => InnerError::Protocol(a.clone()),
217            InnerError::ProtocolStreamEventIncompatible => {
218                InnerError::ProtocolStreamEventIncompatible
219            }
220            InnerError::Transport(a) => InnerError::Transport(Arc::clone(a)),
221        }
222    }
223}
224
225impl From<InnerError> for Error {
226    fn from(other: InnerError) -> Self {
227        match other {
228            InnerError::Protocol(p) => Error::Protocol(p),
229            InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
230            InnerError::Transport(t) => Error::Transport(t),
231        }
232    }
233}
234
235impl From<::fidl::Error> for InnerError {
236    fn from(other: ::fidl::Error) -> Self {
237        InnerError::Protocol(other)
238    }
239}
240
241// TODO(399717689) Figure out if we could just use AsyncRead/Write instead of a special trait.
242/// Implemented by objects which provide a transport over which we can speak the
243/// FDomain protocol.
244///
245/// The implementer must provide two things:
246/// 1) An incoming stream of messages presented as `Vec<u8>`. This is provided
247///    via the `Stream` trait, which this trait requires.
248/// 2) A way to send messages. This is provided by implementing the
249///    `poll_send_message` method.
250pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
251    /// Attempt to send a message asynchronously. Messages should be sent so
252    /// that they arrive at the target in order.
253    fn poll_send_message(
254        self: Pin<&mut Self>,
255        msg: &[u8],
256        ctx: &mut Context<'_>,
257    ) -> Poll<Result<(), std::io::Error>>;
258}
259
260/// Wrapper for an `FDomainTransport` implementer that:
261/// 1) Provides a queue for outgoing messages so we need not have an await point
262///    when we submit a message.
263/// 2) Drops the transport on error, then returns the last observed error for
264///    all future operations.
265enum Transport {
266    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
267    Error(InnerError),
268}
269
270impl Transport {
271    /// Get the failure mode of the transport if it has failed.
272    fn error(&self) -> Option<InnerError> {
273        match self {
274            Transport::Transport(_, _, _) => None,
275            Transport::Error(inner_error) => Some(inner_error.clone()),
276        }
277    }
278
279    /// Enqueue a message to be sent on this transport.
280    fn push_msg(&mut self, msg: Box<[u8]>) {
281        if let Transport::Transport(_, v, w) = self {
282            v.push_back(msg);
283            w.drain(..).for_each(Waker::wake);
284        }
285    }
286
287    /// Push messages in the send queue out through the transport.
288    fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
289        match self {
290            Transport::Error(e) => Poll::Ready(e.clone()),
291            Transport::Transport(t, v, w) => {
292                while let Some(msg) = v.front() {
293                    match t.as_mut().poll_send_message(msg, ctx) {
294                        Poll::Ready(Ok(())) => {
295                            v.pop_front();
296                        }
297                        Poll::Ready(Err(e)) => {
298                            let e = Arc::new(e);
299                            *self = Transport::Error(InnerError::Transport(Arc::clone(&e)));
300                            return Poll::Ready(InnerError::Transport(e));
301                        }
302                        Poll::Pending => return Poll::Pending,
303                    }
304                }
305
306                if v.is_empty() {
307                    w.push(ctx.waker().clone());
308                } else {
309                    ctx.waker().wake_by_ref();
310                }
311                Poll::Pending
312            }
313        }
314    }
315
316    /// Get the next incoming message from the transport.
317    fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Option<Result<Box<[u8]>, InnerError>>> {
318        match self {
319            Transport::Error(e) => Poll::Ready(Some(Err(e.clone()))),
320            Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
321                Some(Ok(x)) => Poll::Ready(Some(Ok(x))),
322                Some(Err(e)) => {
323                    let e = Arc::new(e);
324                    *self = Transport::Error(InnerError::Transport(Arc::clone(&e)));
325                    Poll::Ready(Some(Err(InnerError::Transport(e))))
326                }
327                Option::None => Poll::Ready(None),
328            },
329        }
330    }
331}
332
333/// State of a socket that is or has been read from.
334struct SocketReadState {
335    wakers: Vec<Waker>,
336    queued: VecDeque<Result<proto::SocketData, Error>>,
337    read_request_pending: bool,
338    is_streaming: bool,
339}
340
341impl SocketReadState {
342    /// Handle an incoming message, which is either a channel streaming event or
343    /// response to a `ChannelRead` request.
344    fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) {
345        self.queued.push_back(msg);
346        self.wakers.drain(..).for_each(Waker::wake);
347    }
348}
349
350/// State of a channel that is or has been read from.
351struct ChannelReadState {
352    wakers: Vec<Waker>,
353    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
354    read_request_pending: bool,
355    is_streaming: bool,
356}
357
358impl ChannelReadState {
359    /// Handle an incoming message, which is either a channel streaming event or
360    /// response to a `ChannelRead` request.
361    fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) {
362        self.queued.push_back(msg);
363        self.wakers.drain(..).for_each(Waker::wake);
364    }
365}
366
367/// Lock-protected interior of `Client`
368struct ClientInner {
369    transport: Transport,
370    transactions: HashMap<NonZeroU32, responder::Responder>,
371    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
372    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
373    next_tx_id: u32,
374    waiting_to_close: Vec<proto::HandleId>,
375    waiting_to_close_waker: Waker,
376}
377
378impl ClientInner {
379    /// Serialize and enqueue a new transaction, including header and transaction ID.
380    fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
381        let tx_id = self.next_tx_id;
382
383        let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
384        let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
385        self.next_tx_id += 1;
386        assert!(
387            self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
388            "Allocated same tx id twice!"
389        );
390        self.transport.push_msg(msg.into());
391    }
392
393    /// Polls the underlying transport to ensure any incoming or outgoing
394    /// messages are processed as far as possible. Errors if the transport has failed.
395    fn try_poll_transport(
396        &mut self,
397        ctx: &mut Context<'_>,
398    ) -> Poll<Result<Infallible, InnerError>> {
399        if !self.waiting_to_close.is_empty() {
400            let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
401            // We've dropped the handle object. Nobody is going to wait to read
402            // the buffers anymore. This is a safe time to drop the read state.
403            for handle in &handles {
404                let _ = self.channel_read_states.remove(handle);
405                let _ = self.socket_read_states.remove(handle);
406            }
407            self.request(
408                ordinals::CLOSE,
409                proto::FDomainCloseRequest { handles },
410                Responder::Ignore,
411            );
412        }
413
414        self.waiting_to_close_waker = ctx.waker().clone();
415
416        loop {
417            if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
418                for state in std::mem::take(&mut self.socket_read_states).into_values() {
419                    state.wakers.into_iter().for_each(Waker::wake);
420                }
421                for (_, state) in self.channel_read_states.drain() {
422                    state.wakers.into_iter().for_each(Waker::wake);
423                }
424                return Poll::Ready(Err(e));
425            }
426            let Poll::Ready(Some(result)) = self.transport.poll_next(ctx) else {
427                return Poll::Pending;
428            };
429            let data = result?;
430            let (header, data) = match fidl_message::decode_transaction_header(&data) {
431                Ok(x) => x,
432                Err(e) => {
433                    self.transport = Transport::Error(InnerError::Protocol(e));
434                    continue;
435                }
436            };
437
438            let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
439                if let Err(e) = self.process_event(header, data) {
440                    self.transport = Transport::Error(e);
441                }
442                continue;
443            };
444
445            let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
446            match tx.handle(self, Ok((header, data))) {
447                Ok(x) => x,
448                Err(e) => {
449                    self.transport = Transport::Error(InnerError::Protocol(e));
450                    continue;
451                }
452            }
453        }
454    }
455
456    /// Process an incoming message that arose from an event rather than a transaction reply.
457    fn process_event(&mut self, header: TransactionHeader, data: &[u8]) -> Result<(), InnerError> {
458        match header.ordinal {
459            ordinals::ON_SOCKET_STREAMING_DATA => {
460                let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
461                    header, data,
462                )?;
463                let o =
464                    self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
465                        wakers: Vec::new(),
466                        queued: VecDeque::new(),
467                        is_streaming: false,
468                        read_request_pending: false,
469                    });
470                match msg.socket_message {
471                    proto::SocketMessage::Data(data) => {
472                        o.handle_incoming_message(Ok(data));
473                        Ok(())
474                    }
475                    proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
476                        if let Some(error) = error {
477                            o.handle_incoming_message(Err(Error::FDomain(*error)));
478                        }
479                        o.is_streaming = false;
480                        Ok(())
481                    }
482                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
483                }
484            }
485            ordinals::ON_CHANNEL_STREAMING_DATA => {
486                let msg = fidl_message::decode_message::<
487                    proto::ChannelOnChannelStreamingDataRequest,
488                >(header, data)?;
489                let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
490                    ChannelReadState {
491                        wakers: Vec::new(),
492                        queued: VecDeque::new(),
493                        is_streaming: false,
494                        read_request_pending: false,
495                    }
496                });
497                match msg.channel_sent {
498                    proto::ChannelSent::Message(data) => {
499                        o.handle_incoming_message(Ok(data));
500                        Ok(())
501                    }
502                    proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
503                        if let Some(error) = error {
504                            o.handle_incoming_message(Err(Error::FDomain(*error)));
505                        }
506                        o.is_streaming = false;
507                        Ok(())
508                    }
509                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
510                }
511            }
512            _ => Err(::fidl::Error::UnknownOrdinal {
513                ordinal: header.ordinal,
514                protocol_name:
515                    <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
516            }
517            .into()),
518        }
519    }
520
521    /// Polls the underlying transport to ensure any incoming or outgoing
522    /// messages are processed as far as possible. If a failure occurs, puts the
523    /// transport into an error state and fails all pending transactions.
524    fn poll_transport(&mut self, ctx: &mut Context<'_>) {
525        if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
526            for (_, v) in std::mem::take(&mut self.transactions) {
527                let _ = v.handle(self, Err(e.clone()));
528            }
529        }
530    }
531
532    /// Handles the response to a `SocketRead` protocol message.
533    pub(crate) fn handle_socket_read_response(
534        &mut self,
535        msg: Result<proto::SocketData, Error>,
536        id: proto::HandleId,
537    ) {
538        let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
539            wakers: Vec::new(),
540            queued: VecDeque::new(),
541            is_streaming: false,
542            read_request_pending: false,
543        });
544        state.handle_incoming_message(msg);
545        state.read_request_pending = false;
546    }
547
548    /// Handles the response to a `ChannelRead` protocol message.
549    pub(crate) fn handle_channel_read_response(
550        &mut self,
551        msg: Result<proto::ChannelMessage, Error>,
552        id: proto::HandleId,
553    ) {
554        let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
555            wakers: Vec::new(),
556            queued: VecDeque::new(),
557            is_streaming: false,
558            read_request_pending: false,
559        });
560        state.handle_incoming_message(msg);
561        state.read_request_pending = false;
562    }
563}
564
565/// Represents a connection to an FDomain.
566///
567/// The client is constructed by passing it a transport object which represents
568/// the raw connection to the remote FDomain. The `Client` wrapper then allows
569/// us to construct and use handles which behave similarly to their counterparts
570/// on a Fuchsia device.
571pub struct Client(Mutex<ClientInner>);
572
573impl std::fmt::Debug for Client {
574    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
575        f.debug_tuple("Client").field(&"...").finish()
576    }
577}
578
579/// A client which is always disconnected. Handles that lose their clients
580/// connect to this client instead, which always returns a "Client Lost"
581/// transport failure.
582pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
583    Arc::new(Client(Mutex::new(ClientInner {
584        transport: Transport::Error(InnerError::Transport(Arc::new(std::io::Error::other(
585            "Client Lost",
586        )))),
587        transactions: HashMap::new(),
588        channel_read_states: HashMap::new(),
589        socket_read_states: HashMap::new(),
590        next_tx_id: 1,
591        waiting_to_close: Vec::new(),
592        waiting_to_close_waker: futures::task::noop_waker(),
593    })))
594});
595
596impl Client {
597    /// Create a new FDomain client. The `transport` argument should contain the
598    /// established connection to the target, ready to communicate the FDomain
599    /// protocol.
600    ///
601    /// The second return item is a future that must be polled to keep
602    /// transactions running.
603    pub fn new(
604        transport: impl FDomainTransport + 'static,
605    ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
606        let ret = Arc::new(Client(Mutex::new(ClientInner {
607            transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
608            transactions: HashMap::new(),
609            socket_read_states: HashMap::new(),
610            channel_read_states: HashMap::new(),
611            next_tx_id: 1,
612            waiting_to_close: Vec::new(),
613            waiting_to_close_waker: futures::task::noop_waker(),
614        })));
615
616        let client_weak = Arc::downgrade(&ret);
617        let fut = futures::future::poll_fn(move |ctx| {
618            let Some(client) = client_weak.upgrade() else {
619                return Poll::Ready(());
620            };
621
622            client.0.lock().unwrap().poll_transport(ctx);
623            Poll::Pending
624        });
625
626        (ret, fut)
627    }
628
629    /// Get the namespace for the connected FDomain. Calling this more than once is an error.
630    pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
631        let new_handle = self.new_hid();
632        self.transaction(
633            ordinals::GET_NAMESPACE,
634            proto::FDomainGetNamespaceRequest { new_handle },
635            Responder::Namespace,
636        )
637        .await?;
638        Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
639    }
640
641    /// Create a new channel in the connected FDomain.
642    pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
643        let id_a = self.new_hid();
644        let id_b = self.new_hid();
645        let fut = self.transaction(
646            ordinals::CREATE_CHANNEL,
647            proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
648            Responder::CreateChannel,
649        );
650
651        fuchsia_async::Task::spawn(async move {
652            if let Err(e) = fut.await {
653                log::debug!("FDomain channel creation failed: {e}");
654            }
655        })
656        .detach();
657
658        (
659            Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
660            Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
661        )
662    }
663
664    /// Creates client and server endpoints connected to by a channel.
665    pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
666        self: &Arc<Self>,
667    ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
668        let (client, server) = self.create_channel();
669        let client_end = crate::fidl::ClientEnd::<F>::new(client);
670        let server_end = crate::fidl::ServerEnd::new(server);
671        (client_end, server_end)
672    }
673
674    /// Creates a client proxy and a server endpoint connected by a channel.
675    pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
676        self: &Arc<Self>,
677    ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
678        let (client_end, server_end) = self.create_endpoints::<F>();
679        (client_end.into_proxy(), server_end)
680    }
681
682    /// Creates a client proxy and a server request stream connected by a channel.
683    pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
684        self: &Arc<Self>,
685    ) -> (F::Proxy, F::RequestStream) {
686        let (client_end, server_end) = self.create_endpoints::<F>();
687        (client_end.into_proxy(), server_end.into_stream())
688    }
689
690    /// Create a new socket in the connected FDomain.
691    fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
692        let id_a = self.new_hid();
693        let id_b = self.new_hid();
694        let fut = self.transaction(
695            ordinals::CREATE_SOCKET,
696            proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
697            Responder::CreateSocket,
698        );
699
700        fuchsia_async::Task::spawn(async move {
701            if let Err(e) = fut.await {
702                log::debug!("FDomain socket creation failed: {e}");
703            }
704        })
705        .detach();
706
707        (
708            Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
709            Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
710        )
711    }
712
713    /// Create a new streaming socket in the connected FDomain.
714    pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
715        self.create_socket(proto::SocketType::Stream)
716    }
717
718    /// Create a new datagram socket in the connected FDomain.
719    pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
720        self.create_socket(proto::SocketType::Datagram)
721    }
722
723    /// Create a new event pair in the connected FDomain.
724    pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
725        let id_a = self.new_hid();
726        let id_b = self.new_hid();
727        let fut = self.transaction(
728            ordinals::CREATE_EVENT_PAIR,
729            proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
730            Responder::CreateEventPair,
731        );
732
733        fuchsia_async::Task::spawn(async move {
734            if let Err(e) = fut.await {
735                log::debug!("FDomain event pair creation failed: {e}");
736            }
737        })
738        .detach();
739
740        (
741            EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
742            EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
743        )
744    }
745
746    /// Create a new event handle in the connected FDomain.
747    pub fn create_event(self: &Arc<Self>) -> Event {
748        let id = self.new_hid();
749        let fut = self.transaction(
750            ordinals::CREATE_EVENT,
751            proto::EventCreateEventRequest { handle: id },
752            Responder::CreateEvent,
753        );
754
755        fuchsia_async::Task::spawn(async move {
756            if let Err(e) = fut.await {
757                log::debug!("FDomain event creation failed: {e}");
758            }
759        })
760        .detach();
761
762        Event(Handle { id: id.id, client: Arc::downgrade(self) })
763    }
764
765    /// Allocate a new HID, which should be suitable for use with the connected FDomain.
766    pub(crate) fn new_hid(&self) -> proto::NewHandleId {
767        // TODO: On the target side we have to keep a table of these which means
768        // we can automatically detect collisions in the random value. On the
769        // client side we'd have to add a whole data structure just for that
770        // purpose. Should we?
771        proto::NewHandleId { id: rand::random::<u32>() >> 1 }
772    }
773
774    /// Create a future which sends a FIDL message to the connected FDomain and
775    /// waits for a response.
776    ///
777    /// Calling this method queues the transaction synchronously. Awaiting is
778    /// only necessary to wait for the response.
779    pub(crate) fn transaction<S: fidl_message::Body, R: 'static>(
780        self: &Arc<Self>,
781        ordinal: u64,
782        request: S,
783        f: impl Fn(OneshotSender<Result<R, Error>>) -> Responder,
784    ) -> impl Future<Output = Result<R, Error>> + 'static {
785        let mut inner = self.0.lock().unwrap();
786
787        let (sender, receiver) = futures::channel::oneshot::channel();
788        inner.request(ordinal, request, f(sender));
789        receiver.map(|x| x.expect("Oneshot went away without reply!"))
790    }
791
792    /// Start getting streaming events for socket reads.
793    pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
794        let mut inner = self.0.lock().unwrap();
795        if let Some(e) = inner.transport.error() {
796            return Err(e.into());
797        }
798
799        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
800            wakers: Vec::new(),
801            queued: VecDeque::new(),
802            is_streaming: false,
803            read_request_pending: false,
804        });
805
806        assert!(!state.is_streaming, "Initiated streaming twice!");
807        state.is_streaming = true;
808
809        inner.request(
810            ordinals::READ_SOCKET_STREAMING_START,
811            proto::SocketReadSocketStreamingStartRequest { handle: id },
812            Responder::Ignore,
813        );
814        Ok(())
815    }
816
817    /// Stop getting streaming events for socket reads. Doesn't return errors
818    /// because it's exclusively called in destructors where we have nothing to
819    /// do with them.
820    pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
821        let mut inner = self.0.lock().unwrap();
822        if let Some(state) = inner.socket_read_states.get_mut(&id) {
823            if state.is_streaming {
824                state.is_streaming = false;
825                // TODO: Log?
826                let _ = inner.request(
827                    ordinals::READ_SOCKET_STREAMING_STOP,
828                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
829                    Responder::Ignore,
830                );
831            }
832        }
833    }
834
835    /// Start getting streaming events for socket reads.
836    pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
837        let mut inner = self.0.lock().unwrap();
838        if let Some(e) = inner.transport.error() {
839            return Err(e.into());
840        }
841        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
842            wakers: Vec::new(),
843            queued: VecDeque::new(),
844            is_streaming: false,
845            read_request_pending: false,
846        });
847
848        assert!(!state.is_streaming, "Initiated streaming twice!");
849        state.is_streaming = true;
850
851        inner.request(
852            ordinals::READ_CHANNEL_STREAMING_START,
853            proto::ChannelReadChannelStreamingStartRequest { handle: id },
854            Responder::Ignore,
855        );
856
857        Ok(())
858    }
859
860    /// Stop getting streaming events for socket reads. Doesn't return errors
861    /// because it's exclusively called in destructors where we have nothing to
862    /// do with them.
863    pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
864        let mut inner = self.0.lock().unwrap();
865        if let Some(state) = inner.channel_read_states.get_mut(&id) {
866            if state.is_streaming {
867                state.is_streaming = false;
868                // TODO: Log?
869                let _ = inner.request(
870                    ordinals::READ_CHANNEL_STREAMING_STOP,
871                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
872                    Responder::Ignore,
873                );
874            }
875        }
876    }
877
878    /// Execute a read from a channel.
879    pub(crate) fn poll_socket(
880        &self,
881        id: proto::HandleId,
882        ctx: &mut Context<'_>,
883        out: &mut [u8],
884    ) -> Poll<Result<usize, Error>> {
885        let mut inner = self.0.lock().unwrap();
886        if let Some(error) = inner.transport.error() {
887            return Poll::Ready(Err(error.into()));
888        }
889
890        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
891            wakers: Vec::new(),
892            queued: VecDeque::new(),
893            is_streaming: false,
894            read_request_pending: false,
895        });
896
897        if let Some(got) = state.queued.front_mut() {
898            match got.as_mut() {
899                Ok(data) => {
900                    let read_size = std::cmp::min(data.data.len(), out.len());
901                    out[..read_size].copy_from_slice(&data.data[..read_size]);
902
903                    if data.data.len() > read_size && !data.is_datagram {
904                        let _ = data.data.drain(..read_size);
905                    } else {
906                        let _ = state.queued.pop_front();
907                    }
908
909                    return Poll::Ready(Ok(read_size));
910                }
911                Err(_) => {
912                    let err = state.queued.pop_front().unwrap().unwrap_err();
913                    return Poll::Ready(Err(err));
914                }
915            }
916        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
917            state.wakers.push(ctx.waker().clone());
918        }
919
920        if !state.read_request_pending && !state.is_streaming {
921            inner.request(
922                ordinals::READ_SOCKET,
923                proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
924                Responder::ReadSocket(id),
925            );
926        }
927
928        Poll::Pending
929    }
930
931    /// Execute a read from a channel.
932    pub(crate) fn poll_channel(
933        &self,
934        id: proto::HandleId,
935        ctx: &mut Context<'_>,
936        for_stream: bool,
937    ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
938        let mut inner = self.0.lock().unwrap();
939        if let Some(error) = inner.transport.error() {
940            return Poll::Ready(Some(Err(error.into())));
941        }
942
943        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
944            wakers: Vec::new(),
945            queued: VecDeque::new(),
946            is_streaming: false,
947            read_request_pending: false,
948        });
949
950        if let Some(got) = state.queued.pop_front() {
951            return Poll::Ready(Some(got));
952        } else if for_stream && !state.is_streaming {
953            return Poll::Ready(None);
954        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
955            state.wakers.push(ctx.waker().clone());
956        }
957
958        if !state.read_request_pending && !state.is_streaming {
959            inner.request(
960                ordinals::READ_CHANNEL,
961                proto::ChannelReadChannelRequest { handle: id },
962                Responder::ReadChannel(id),
963            );
964        }
965
966        Poll::Pending
967    }
968
969    /// Check whether this channel is streaming
970    pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
971        let inner = self.0.lock().unwrap();
972        let Some(state) = inner.channel_read_states.get(&id) else {
973            return false;
974        };
975        state.is_streaming
976    }
977
978    /// Check that all the given handles are safe to transfer through a channel
979    /// e.g. that there's no chance of in-flight reads getting dropped.
980    pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
981        let inner = self.0.lock().unwrap();
982        match handles {
983            proto::Handles::Handles(handles) => {
984                for handle in handles {
985                    assert!(
986                        !(inner.channel_read_states.contains_key(handle)
987                            || inner.socket_read_states.contains_key(handle)),
988                        "Tried to transfer handle after reading"
989                    );
990                }
991            }
992            proto::Handles::Dispositions(dispositions) => {
993                for disposition in dispositions {
994                    match &disposition.handle {
995                        proto::HandleOp::Move_(handle) => assert!(
996                            !(inner.channel_read_states.contains_key(handle)
997                                || inner.socket_read_states.contains_key(handle)),
998                            "Tried to transfer handle after reading"
999                        ),
1000                        // Pretty sure this should be fine regardless of read state.
1001                        proto::HandleOp::Duplicate(_) => (),
1002                    }
1003                }
1004            }
1005        }
1006    }
1007}