fdomain_client/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_message::TransactionHeader;
6use futures::FutureExt;
7use futures::channel::oneshot::Sender as OneshotSender;
8use futures::stream::Stream as StreamTrait;
9use std::collections::{HashMap, VecDeque};
10use std::convert::Infallible;
11use std::future::Future;
12use std::num::NonZeroU32;
13use std::pin::Pin;
14use std::sync::{Arc, LazyLock, Mutex};
15use std::task::{Context, Poll, Waker, ready};
16use {fidl_fuchsia_fdomain as proto, fuchsia_async as _};
17
18mod channel;
19mod event;
20mod event_pair;
21mod handle;
22mod responder;
23mod socket;
24
25#[cfg(test)]
26mod test;
27
28pub mod fidl;
29pub mod fidl_next;
30
31use responder::Responder;
32
33pub use channel::{
34    AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
35};
36pub use event::Event;
37pub use event_pair::Eventpair as EventPair;
38pub use handle::{AsHandleRef, Handle, HandleBased, HandleRef, OnFDomainSignals, Peered};
39pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
40pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
41
42// Unsupported handle types.
43#[rustfmt::skip]
44pub use Handle as Fifo;
45#[rustfmt::skip]
46pub use Handle as Job;
47#[rustfmt::skip]
48pub use Handle as Process;
49#[rustfmt::skip]
50pub use Handle as Resource;
51#[rustfmt::skip]
52pub use Handle as Stream;
53#[rustfmt::skip]
54pub use Handle as Thread;
55#[rustfmt::skip]
56pub use Handle as Vmar;
57#[rustfmt::skip]
58pub use Handle as Vmo;
59
60use proto::f_domain_ordinals as ordinals;
61
62fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63    match error {
64        FDomainError::TargetError(e) => {
65            let e = zx_status::Status::from_raw(*e);
66            write!(f, "Target-side error {e}")
67        }
68        FDomainError::BadHandleId(proto::BadHandleId { id }) => {
69            write!(f, "Tried to use invalid handle id {id}")
70        }
71        FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
72            f,
73            "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
74        ),
75        FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
76            write!(f, "Handle is occupied delivering streaming reads")
77        }
78        FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
79            write!(f, "No streaming read was in progress")
80        }
81        FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
82            write!(
83                f,
84                "Tried to create a handle with id {id}, which is outside the valid range for client handles"
85            )
86        }
87        FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
88            if *same_call {
89                write!(f, "Tried to create two or more new handles with the same id {id}")
90            } else {
91                write!(
92                    f,
93                    "Tried to create a new handle with id {id}, which is already the id of an existing handle"
94                )
95            }
96        }
97        FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
98            write!(f, "Tried to write a channel into itself")
99        }
100        FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
101            write!(f, "Handle closed while being read")
102        }
103        _ => todo!(),
104    }
105}
106
/// Result type alias whose error type defaults to this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
109
/// Error type emitted by FDomain operations.
#[derive(Clone)]
pub enum Error {
    /// A socket write failed. The payload carries the underlying error and the
    /// number of bytes that were written successfully before it.
    SocketWrite(WriteSocketError),
    /// A channel write failed, either as a whole or for individual handles
    /// being written into the channel.
    ChannelWrite(WriteChannelError),
    /// An FDomain error not tied to a socket or channel write.
    FDomain(FDomainError),
    /// A FIDL protocol error.
    Protocol(::fidl::Error),
    /// The FDomain protocol does not recognize an object type.
    ProtocolObjectTypeIncompatible,
    /// The FDomain protocol does not recognize some rights.
    ProtocolRightsIncompatible,
    /// The FDomain protocol does not recognize some signals.
    ProtocolSignalsIncompatible,
    /// The FDomain protocol does not recognize a received streaming IO event.
    ProtocolStreamEventIncompatible,
    /// The transport failed. `Some` carries the I/O error; `None` means the
    /// transport was closed.
    Transport(Option<Arc<std::io::Error>>),
    /// An FDomain handle was used with a different connection than its own.
    ConnectionMismatch,
    /// The channel is no longer streaming.
    StreamingAborted,
}
125
126impl std::fmt::Display for Error {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        match self {
129            Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
130                write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
131                write_fdomain_error(error, f)
132            }
133            Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
134                write!(f, "While writing channel: ")?;
135                write_fdomain_error(error, f)
136            }
137            Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
138                write!(f, "Couldn't write all handles into a channel:")?;
139                for (pos, error) in
140                    errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
141                {
142                    write!(f, "\n  Handle in position {pos}: ")?;
143                    write_fdomain_error(error, f)?;
144                }
145                Ok(())
146            }
147            Self::ProtocolObjectTypeIncompatible => {
148                write!(f, "The FDomain protocol does not recognize an object type")
149            }
150            Self::ProtocolRightsIncompatible => {
151                write!(f, "The FDomain protocol does not recognize some rights")
152            }
153            Self::ProtocolSignalsIncompatible => {
154                write!(f, "The FDomain protocol does not recognize some signals")
155            }
156            Self::ProtocolStreamEventIncompatible => {
157                write!(f, "The FDomain protocol does not recognize a received streaming IO event")
158            }
159            Self::FDomain(e) => write_fdomain_error(e, f),
160            Self::Protocol(e) => write!(f, "Protocol error: {e}"),
161            Self::Transport(Some(e)) => write!(f, "Transport error: {e:?}"),
162            Self::Transport(None) => write!(f, "Transport closed"),
163            Self::ConnectionMismatch => {
164                write!(f, "Tried to use an FDomain handle from a different connection")
165            }
166            Self::StreamingAborted => write!(f, "This channel is no longer streaming"),
167        }
168    }
169}
170
171impl std::fmt::Debug for Error {
172    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173        match self {
174            Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
175            Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
176            Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
177            Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
178            Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
179            Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
180            Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
181            Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
182            Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
183            Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
184            Self::StreamingAborted => write!(f, "StreamingAborted"),
185        }
186    }
187}
188
// Marks `Error` as a standard error type. No methods are overridden, so
// `source()` keeps its default implementation.
impl std::error::Error for Error {}
190
191impl From<FDomainError> for Error {
192    fn from(other: FDomainError) -> Self {
193        Self::FDomain(other)
194    }
195}
196
197impl From<::fidl::Error> for Error {
198    fn from(other: ::fidl::Error) -> Self {
199        Self::Protocol(other)
200    }
201}
202
203impl From<WriteSocketError> for Error {
204    fn from(other: WriteSocketError) -> Self {
205        Self::SocketWrite(other)
206    }
207}
208
209impl From<WriteChannelError> for Error {
210    fn from(other: WriteChannelError) -> Self {
211        Self::ChannelWrite(other)
212    }
213}
214
/// An error emitted internally by the client. Similar to [`Error`] but does not
/// contain several variants which are irrelevant in the contexts where it is
/// used.
///
/// Each variant converts to the `Error` variant of the same name.
#[derive(Clone)]
enum InnerError {
    /// A FIDL protocol error, e.g. a message that failed to decode.
    Protocol(::fidl::Error),
    /// A streaming event arrived whose payload variant is not recognized.
    ProtocolStreamEventIncompatible,
    /// The transport failed. `Some` carries the I/O error; `None` means the
    /// transport was closed.
    Transport(Option<Arc<std::io::Error>>),
}
224
225impl From<InnerError> for Error {
226    fn from(other: InnerError) -> Self {
227        match other {
228            InnerError::Protocol(p) => Error::Protocol(p),
229            InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
230            InnerError::Transport(t) => Error::Transport(t),
231        }
232    }
233}
234
235impl From<::fidl::Error> for InnerError {
236    fn from(other: ::fidl::Error) -> Self {
237        InnerError::Protocol(other)
238    }
239}
240
241// TODO(399717689) Figure out if we could just use AsyncRead/Write instead of a special trait.
/// Implemented by objects which provide a transport over which we can speak the
/// FDomain protocol.
///
/// The implementer must provide two things:
/// 1) An incoming stream of messages, each presented as
///    `Result<Box<[u8]>, std::io::Error>`. This is provided via the `Stream`
///    trait, which this trait requires.
/// 2) A way to send messages. This is provided by implementing the
///    `poll_send_message` method.
pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
    /// Attempt to send a message asynchronously. Messages should be sent so
    /// that they arrive at the target in order.
    ///
    /// An `Err(Some(e))` result is surfaced to users as a transport error
    /// carrying `e`; `Err(None)` is surfaced as the transport having closed.
    fn poll_send_message(
        self: Pin<&mut Self>,
        msg: &[u8],
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Option<std::io::Error>>>;
}
259
/// Wrapper for an `FDomainTransport` implementer that:
/// 1) Provides a queue for outgoing messages so we need not have an await point
///    when we submit a message.
/// 2) Drops the transport on error, then returns the last observed error for
///    all future operations.
enum Transport {
    /// A live transport. The fields are, in order: the transport itself, the
    /// queue of messages waiting to be sent, and the wakers to wake when a new
    /// message is pushed onto that queue.
    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
    /// A failed transport, holding the error reported for every later
    /// operation.
    Error(InnerError),
}
269
270impl Transport {
271    /// Get the failure mode of the transport if it has failed.
272    fn error(&self) -> Option<InnerError> {
273        match self {
274            Transport::Transport(_, _, _) => None,
275            Transport::Error(inner_error) => Some(inner_error.clone()),
276        }
277    }
278
279    /// Enqueue a message to be sent on this transport.
280    fn push_msg(&mut self, msg: Box<[u8]>) {
281        if let Transport::Transport(_, v, w) = self {
282            v.push_back(msg);
283            w.drain(..).for_each(Waker::wake);
284        }
285    }
286
287    /// Push messages in the send queue out through the transport.
288    fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
289        match self {
290            Transport::Error(e) => Poll::Ready(e.clone()),
291            Transport::Transport(t, v, w) => {
292                while let Some(msg) = v.front() {
293                    match t.as_mut().poll_send_message(msg, ctx) {
294                        Poll::Ready(Ok(())) => {
295                            v.pop_front();
296                        }
297                        Poll::Ready(Err(e)) => {
298                            let e = e.map(Arc::new);
299                            *self = Transport::Error(InnerError::Transport(e.clone()));
300                            return Poll::Ready(InnerError::Transport(e));
301                        }
302                        Poll::Pending => return Poll::Pending,
303                    }
304                }
305
306                if v.is_empty() {
307                    w.push(ctx.waker().clone());
308                } else {
309                    ctx.waker().wake_by_ref();
310                }
311                Poll::Pending
312            }
313        }
314    }
315
316    /// Get the next incoming message from the transport.
317    fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Box<[u8]>, InnerError>> {
318        match self {
319            Transport::Error(e) => Poll::Ready(Err(e.clone())),
320            Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
321                Some(Ok(x)) => Poll::Ready(Ok(x)),
322                Some(Err(e)) => {
323                    let e = Arc::new(e);
324                    *self = Transport::Error(InnerError::Transport(Some(Arc::clone(&e))));
325                    Poll::Ready(Err(InnerError::Transport(Some(e))))
326                }
327                Option::None => Poll::Ready(Err(InnerError::Transport(None))),
328            },
329        }
330    }
331}
332
/// State of a socket that is or has been read from.
struct SocketReadState {
    /// Wakers to wake when a new entry is pushed onto `queued`.
    wakers: Vec<Waker>,
    /// Received data and errors that have not yet been consumed.
    queued: VecDeque<Result<proto::SocketData, Error>>,
    /// Cleared when the response to a `SocketRead` request is handled.
    /// Presumably set by the reader when it issues that request (not visible
    /// in this file).
    read_request_pending: bool,
    /// Set when streaming reads are started for this socket and cleared when
    /// the FDomain reports that streaming has stopped.
    is_streaming: bool,
}
340
341impl SocketReadState {
342    /// Handle an incoming message, which is either a channel streaming event or
343    /// response to a `ChannelRead` request.
344    fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) {
345        self.queued.push_back(msg);
346        self.wakers.drain(..).for_each(Waker::wake);
347    }
348}
349
/// State of a channel that is or has been read from.
struct ChannelReadState {
    /// Wakers to wake when a new entry is pushed onto `queued`.
    wakers: Vec<Waker>,
    /// Received messages and errors that have not yet been consumed.
    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
    /// Cleared when the response to a `ChannelRead` request is handled.
    /// Presumably set by the reader when it issues that request (not visible
    /// in this file).
    read_request_pending: bool,
    /// Cleared when the FDomain reports that streaming has stopped.
    /// Presumably set when streaming reads are started for this channel (not
    /// visible in this part of the file).
    is_streaming: bool,
}
357
358impl ChannelReadState {
359    /// Handle an incoming message, which is either a channel streaming event or
360    /// response to a `ChannelRead` request.
361    fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) {
362        self.queued.push_back(msg);
363        self.wakers.drain(..).for_each(Waker::wake);
364    }
365}
366
/// Lock-protected interior of `Client`
struct ClientInner {
    /// The connection to the FDomain, or the error it failed with.
    transport: Transport,
    /// Responders for requests that have been sent, keyed by transaction ID.
    transactions: HashMap<NonZeroU32, responder::Responder>,
    /// Read state for each channel that is or has been read from.
    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
    /// Read state for each socket that is or has been read from.
    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
    /// Transaction ID to use for the next request.
    next_tx_id: u32,
    /// Handles queued for closing. They are sent in a single `Close` request
    /// the next time the transport is polled.
    waiting_to_close: Vec<proto::HandleId>,
    /// Waker of the task that most recently polled the transport.
    /// NOTE(review): presumably woken when a handle is added to
    /// `waiting_to_close`; confirm against the handle drop code.
    waiting_to_close_waker: Waker,
}
377
378impl ClientInner {
379    /// Serialize and enqueue a new transaction, including header and transaction ID.
380    fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
381        let tx_id = self.next_tx_id;
382
383        let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
384        let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
385        self.next_tx_id += 1;
386        assert!(
387            self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
388            "Allocated same tx id twice!"
389        );
390        self.transport.push_msg(msg.into());
391    }
392
393    /// Polls the underlying transport to ensure any incoming or outgoing
394    /// messages are processed as far as possible. Errors if the transport has failed.
395    fn try_poll_transport(
396        &mut self,
397        ctx: &mut Context<'_>,
398    ) -> Poll<Result<Infallible, InnerError>> {
399        if !self.waiting_to_close.is_empty() {
400            let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
401            // We've dropped the handle object. Nobody is going to wait to read
402            // the buffers anymore. This is a safe time to drop the read state.
403            for handle in &handles {
404                let _ = self.channel_read_states.remove(handle);
405                let _ = self.socket_read_states.remove(handle);
406            }
407            self.request(
408                ordinals::CLOSE,
409                proto::FDomainCloseRequest { handles },
410                Responder::Ignore,
411            );
412        }
413
414        self.waiting_to_close_waker = ctx.waker().clone();
415
416        loop {
417            if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
418                for state in std::mem::take(&mut self.socket_read_states).into_values() {
419                    state.wakers.into_iter().for_each(Waker::wake);
420                }
421                for (_, state) in self.channel_read_states.drain() {
422                    state.wakers.into_iter().for_each(Waker::wake);
423                }
424                return Poll::Ready(Err(e));
425            }
426            let Poll::Ready(result) = self.transport.poll_next(ctx) else {
427                return Poll::Pending;
428            };
429            let data = result?;
430            let (header, data) = match fidl_message::decode_transaction_header(&data) {
431                Ok(x) => x,
432                Err(e) => {
433                    self.transport = Transport::Error(InnerError::Protocol(e));
434                    continue;
435                }
436            };
437
438            let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
439                if let Err(e) = self.process_event(header, data) {
440                    self.transport = Transport::Error(e);
441                }
442                continue;
443            };
444
445            let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
446            match tx.handle(self, Ok((header, data))) {
447                Ok(x) => x,
448                Err(e) => {
449                    self.transport = Transport::Error(InnerError::Protocol(e));
450                    continue;
451                }
452            }
453        }
454    }
455
456    /// Process an incoming message that arose from an event rather than a transaction reply.
457    fn process_event(&mut self, header: TransactionHeader, data: &[u8]) -> Result<(), InnerError> {
458        match header.ordinal {
459            ordinals::ON_SOCKET_STREAMING_DATA => {
460                let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
461                    header, data,
462                )?;
463                let o =
464                    self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
465                        wakers: Vec::new(),
466                        queued: VecDeque::new(),
467                        is_streaming: false,
468                        read_request_pending: false,
469                    });
470                match msg.socket_message {
471                    proto::SocketMessage::Data(data) => {
472                        o.handle_incoming_message(Ok(data));
473                        Ok(())
474                    }
475                    proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
476                        if let Some(error) = error {
477                            o.handle_incoming_message(Err(Error::FDomain(*error)));
478                        }
479                        o.is_streaming = false;
480                        Ok(())
481                    }
482                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
483                }
484            }
485            ordinals::ON_CHANNEL_STREAMING_DATA => {
486                let msg = fidl_message::decode_message::<
487                    proto::ChannelOnChannelStreamingDataRequest,
488                >(header, data)?;
489                let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
490                    ChannelReadState {
491                        wakers: Vec::new(),
492                        queued: VecDeque::new(),
493                        is_streaming: false,
494                        read_request_pending: false,
495                    }
496                });
497                match msg.channel_sent {
498                    proto::ChannelSent::Message(data) => {
499                        o.handle_incoming_message(Ok(data));
500                        Ok(())
501                    }
502                    proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
503                        if let Some(error) = error {
504                            o.handle_incoming_message(Err(Error::FDomain(*error)));
505                        }
506                        o.is_streaming = false;
507                        Ok(())
508                    }
509                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
510                }
511            }
512            _ => Err(::fidl::Error::UnknownOrdinal {
513                ordinal: header.ordinal,
514                protocol_name:
515                    <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
516            }
517            .into()),
518        }
519    }
520
521    /// Polls the underlying transport to ensure any incoming or outgoing
522    /// messages are processed as far as possible. If a failure occurs, puts the
523    /// transport into an error state and fails all pending transactions.
524    fn poll_transport(&mut self, ctx: &mut Context<'_>) {
525        if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
526            for (_, v) in std::mem::take(&mut self.transactions) {
527                let _ = v.handle(self, Err(e.clone()));
528            }
529        }
530    }
531
532    /// Handles the response to a `SocketRead` protocol message.
533    pub(crate) fn handle_socket_read_response(
534        &mut self,
535        msg: Result<proto::SocketData, Error>,
536        id: proto::HandleId,
537    ) {
538        let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
539            wakers: Vec::new(),
540            queued: VecDeque::new(),
541            is_streaming: false,
542            read_request_pending: false,
543        });
544        state.handle_incoming_message(msg);
545        state.read_request_pending = false;
546    }
547
548    /// Handles the response to a `ChannelRead` protocol message.
549    pub(crate) fn handle_channel_read_response(
550        &mut self,
551        msg: Result<proto::ChannelMessage, Error>,
552        id: proto::HandleId,
553    ) {
554        let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
555            wakers: Vec::new(),
556            queued: VecDeque::new(),
557            is_streaming: false,
558            read_request_pending: false,
559        });
560        state.handle_incoming_message(msg);
561        state.read_request_pending = false;
562    }
563}
564
565impl Drop for ClientInner {
566    fn drop(&mut self) {
567        let responders = self.transactions.drain().map(|x| x.1).collect::<Vec<_>>();
568        for responder in responders {
569            let _ = responder.handle(self, Err(InnerError::Transport(None)));
570        }
571        for state in self.channel_read_states.values_mut() {
572            state.wakers.drain(..).for_each(Waker::wake);
573        }
574        for state in self.socket_read_states.values_mut() {
575            state.wakers.drain(..).for_each(Waker::wake);
576        }
577    }
578}
579
/// Represents a connection to an FDomain.
///
/// The client is constructed by passing it a transport object which represents
/// the raw connection to the remote FDomain. The `Client` wrapper then allows
/// us to construct and use handles which behave similarly to their counterparts
/// on a Fuchsia device.
///
/// All mutable state lives in the wrapped `ClientInner`, guarded by a mutex.
pub struct Client(pub(crate) Mutex<ClientInner>);
587
588impl std::fmt::Debug for Client {
589    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
590        f.debug_tuple("Client").field(&"...").finish()
591    }
592}
593
/// A client which is always disconnected. Handles that lose their clients
/// connect to this client instead. Its transport is permanently in the
/// `InnerError::Transport(None)` error state, which is reported to users as
/// "Transport closed".
pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
    Arc::new(Client(Mutex::new(ClientInner {
        transport: Transport::Error(InnerError::Transport(None)),
        transactions: HashMap::new(),
        channel_read_states: HashMap::new(),
        socket_read_states: HashMap::new(),
        next_tx_id: 1,
        waiting_to_close: Vec::new(),
        waiting_to_close_waker: futures::task::noop_waker(),
    })))
});
608
609impl Client {
610    /// Create a new FDomain client. The `transport` argument should contain the
611    /// established connection to the target, ready to communicate the FDomain
612    /// protocol.
613    ///
614    /// The second return item is a future that must be polled to keep
615    /// transactions running.
616    pub fn new(
617        transport: impl FDomainTransport + 'static,
618    ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
619        let ret = Arc::new(Client(Mutex::new(ClientInner {
620            transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
621            transactions: HashMap::new(),
622            socket_read_states: HashMap::new(),
623            channel_read_states: HashMap::new(),
624            next_tx_id: 1,
625            waiting_to_close: Vec::new(),
626            waiting_to_close_waker: futures::task::noop_waker(),
627        })));
628
629        let client_weak = Arc::downgrade(&ret);
630        let fut = futures::future::poll_fn(move |ctx| {
631            let Some(client) = client_weak.upgrade() else {
632                return Poll::Ready(());
633            };
634
635            client.0.lock().unwrap().poll_transport(ctx);
636            Poll::Pending
637        });
638
639        (ret, fut)
640    }
641
642    /// Get the namespace for the connected FDomain. Calling this more than once is an error.
643    pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
644        let new_handle = self.new_hid();
645        self.transaction(
646            ordinals::GET_NAMESPACE,
647            proto::FDomainGetNamespaceRequest { new_handle },
648            Responder::Namespace,
649        )
650        .await?;
651        Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
652    }
653
654    /// Create a new channel in the connected FDomain.
655    pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
656        let id_a = self.new_hid();
657        let id_b = self.new_hid();
658        let fut = self.transaction(
659            ordinals::CREATE_CHANNEL,
660            proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
661            Responder::CreateChannel,
662        );
663
664        fuchsia_async::Task::spawn(async move {
665            if let Err(e) = fut.await {
666                log::debug!("FDomain channel creation failed: {e}");
667            }
668        })
669        .detach();
670
671        (
672            Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
673            Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
674        )
675    }
676
677    /// Creates client and server endpoints connected to by a channel.
678    pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
679        self: &Arc<Self>,
680    ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
681        let (client, server) = self.create_channel();
682        let client_end = crate::fidl::ClientEnd::<F>::new(client);
683        let server_end = crate::fidl::ServerEnd::new(server);
684        (client_end, server_end)
685    }
686
687    /// Creates a client proxy and a server endpoint connected by a channel.
688    pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
689        self: &Arc<Self>,
690    ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
691        let (client_end, server_end) = self.create_endpoints::<F>();
692        (client_end.into_proxy(), server_end)
693    }
694
695    /// Creates a client proxy and a server request stream connected by a channel.
696    pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
697        self: &Arc<Self>,
698    ) -> (F::Proxy, F::RequestStream) {
699        let (client_end, server_end) = self.create_endpoints::<F>();
700        (client_end.into_proxy(), server_end.into_stream())
701    }
702
703    /// Create a new socket in the connected FDomain.
704    fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
705        let id_a = self.new_hid();
706        let id_b = self.new_hid();
707        let fut = self.transaction(
708            ordinals::CREATE_SOCKET,
709            proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
710            Responder::CreateSocket,
711        );
712
713        fuchsia_async::Task::spawn(async move {
714            if let Err(e) = fut.await {
715                log::debug!("FDomain socket creation failed: {e}");
716            }
717        })
718        .detach();
719
720        (
721            Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
722            Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
723        )
724    }
725
726    /// Create a new streaming socket in the connected FDomain.
727    pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
728        self.create_socket(proto::SocketType::Stream)
729    }
730
731    /// Create a new datagram socket in the connected FDomain.
732    pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
733        self.create_socket(proto::SocketType::Datagram)
734    }
735
736    /// Create a new event pair in the connected FDomain.
737    pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
738        let id_a = self.new_hid();
739        let id_b = self.new_hid();
740        let fut = self.transaction(
741            ordinals::CREATE_EVENT_PAIR,
742            proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
743            Responder::CreateEventPair,
744        );
745
746        fuchsia_async::Task::spawn(async move {
747            if let Err(e) = fut.await {
748                log::debug!("FDomain event pair creation failed: {e}");
749            }
750        })
751        .detach();
752
753        (
754            EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
755            EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
756        )
757    }
758
759    /// Create a new event handle in the connected FDomain.
760    pub fn create_event(self: &Arc<Self>) -> Event {
761        let id = self.new_hid();
762        let fut = self.transaction(
763            ordinals::CREATE_EVENT,
764            proto::EventCreateEventRequest { handle: id },
765            Responder::CreateEvent,
766        );
767
768        fuchsia_async::Task::spawn(async move {
769            if let Err(e) = fut.await {
770                log::debug!("FDomain event creation failed: {e}");
771            }
772        })
773        .detach();
774
775        Event(Handle { id: id.id, client: Arc::downgrade(self) })
776    }
777
778    /// Allocate a new HID, which should be suitable for use with the connected FDomain.
779    pub(crate) fn new_hid(&self) -> proto::NewHandleId {
780        // TODO: On the target side we have to keep a table of these which means
781        // we can automatically detect collisions in the random value. On the
782        // client side we'd have to add a whole data structure just for that
783        // purpose. Should we?
784        proto::NewHandleId { id: rand::random::<u32>() >> 1 }
785    }
786
787    /// Create a future which sends a FIDL message to the connected FDomain and
788    /// waits for a response.
789    ///
790    /// Calling this method queues the transaction synchronously. Awaiting is
791    /// only necessary to wait for the response.
792    pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
793        self: &Arc<Self>,
794        ordinal: u64,
795        request: S,
796        f: F,
797    ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
798    where
799        F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
800    {
801        let mut inner = self.0.lock().unwrap();
802
803        let (sender, receiver) = futures::channel::oneshot::channel();
804        inner.request(ordinal, request, f(sender));
805        receiver.map(|x| x.expect("Oneshot went away without reply!"))
806    }
807
808    /// Start getting streaming events for socket reads.
809    pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
810        let mut inner = self.0.lock().unwrap();
811        if let Some(e) = inner.transport.error() {
812            return Err(e.into());
813        }
814
815        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
816            wakers: Vec::new(),
817            queued: VecDeque::new(),
818            is_streaming: false,
819            read_request_pending: false,
820        });
821
822        assert!(!state.is_streaming, "Initiated streaming twice!");
823        state.is_streaming = true;
824
825        inner.request(
826            ordinals::READ_SOCKET_STREAMING_START,
827            proto::SocketReadSocketStreamingStartRequest { handle: id },
828            Responder::Ignore,
829        );
830        Ok(())
831    }
832
833    /// Stop getting streaming events for socket reads. Doesn't return errors
834    /// because it's exclusively called in destructors where we have nothing to
835    /// do with them.
836    pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
837        let mut inner = self.0.lock().unwrap();
838        if let Some(state) = inner.socket_read_states.get_mut(&id) {
839            if state.is_streaming {
840                state.is_streaming = false;
841                // TODO: Log?
842                let _ = inner.request(
843                    ordinals::READ_SOCKET_STREAMING_STOP,
844                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
845                    Responder::Ignore,
846                );
847            }
848        }
849    }
850
851    /// Start getting streaming events for socket reads.
852    pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
853        let mut inner = self.0.lock().unwrap();
854        if let Some(e) = inner.transport.error() {
855            return Err(e.into());
856        }
857        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
858            wakers: Vec::new(),
859            queued: VecDeque::new(),
860            is_streaming: false,
861            read_request_pending: false,
862        });
863
864        assert!(!state.is_streaming, "Initiated streaming twice!");
865        state.is_streaming = true;
866
867        inner.request(
868            ordinals::READ_CHANNEL_STREAMING_START,
869            proto::ChannelReadChannelStreamingStartRequest { handle: id },
870            Responder::Ignore,
871        );
872
873        Ok(())
874    }
875
876    /// Stop getting streaming events for socket reads. Doesn't return errors
877    /// because it's exclusively called in destructors where we have nothing to
878    /// do with them.
879    pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
880        let mut inner = self.0.lock().unwrap();
881        if let Some(state) = inner.channel_read_states.get_mut(&id) {
882            if state.is_streaming {
883                state.is_streaming = false;
884                // TODO: Log?
885                let _ = inner.request(
886                    ordinals::READ_CHANNEL_STREAMING_STOP,
887                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
888                    Responder::Ignore,
889                );
890            }
891        }
892    }
893
894    /// Execute a read from a channel.
895    pub(crate) fn poll_socket(
896        &self,
897        id: proto::HandleId,
898        ctx: &mut Context<'_>,
899        out: &mut [u8],
900    ) -> Poll<Result<usize, Error>> {
901        let mut inner = self.0.lock().unwrap();
902        if let Some(error) = inner.transport.error() {
903            return Poll::Ready(Err(error.into()));
904        }
905
906        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
907            wakers: Vec::new(),
908            queued: VecDeque::new(),
909            is_streaming: false,
910            read_request_pending: false,
911        });
912
913        if let Some(got) = state.queued.front_mut() {
914            match got.as_mut() {
915                Ok(data) => {
916                    let read_size = std::cmp::min(data.data.len(), out.len());
917                    out[..read_size].copy_from_slice(&data.data[..read_size]);
918
919                    if data.data.len() > read_size && !data.is_datagram {
920                        let _ = data.data.drain(..read_size);
921                    } else {
922                        let _ = state.queued.pop_front();
923                    }
924
925                    return Poll::Ready(Ok(read_size));
926                }
927                Err(_) => {
928                    let err = state.queued.pop_front().unwrap().unwrap_err();
929                    return Poll::Ready(Err(err));
930                }
931            }
932        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
933            state.wakers.push(ctx.waker().clone());
934        }
935
936        if !state.read_request_pending && !state.is_streaming {
937            inner.request(
938                ordinals::READ_SOCKET,
939                proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
940                Responder::ReadSocket(id),
941            );
942        }
943
944        Poll::Pending
945    }
946
947    /// Execute a read from a channel.
948    pub(crate) fn poll_channel(
949        &self,
950        id: proto::HandleId,
951        ctx: &mut Context<'_>,
952        for_stream: bool,
953    ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
954        let mut inner = self.0.lock().unwrap();
955        if let Some(error) = inner.transport.error() {
956            return Poll::Ready(Some(Err(error.into())));
957        }
958
959        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
960            wakers: Vec::new(),
961            queued: VecDeque::new(),
962            is_streaming: false,
963            read_request_pending: false,
964        });
965
966        if let Some(got) = state.queued.pop_front() {
967            return Poll::Ready(Some(got));
968        } else if for_stream && !state.is_streaming {
969            return Poll::Ready(None);
970        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
971            state.wakers.push(ctx.waker().clone());
972        }
973
974        if !state.read_request_pending && !state.is_streaming {
975            inner.request(
976                ordinals::READ_CHANNEL,
977                proto::ChannelReadChannelRequest { handle: id },
978                Responder::ReadChannel(id),
979            );
980        }
981
982        Poll::Pending
983    }
984
985    /// Check whether this channel is streaming
986    pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
987        let inner = self.0.lock().unwrap();
988        let Some(state) = inner.channel_read_states.get(&id) else {
989            return false;
990        };
991        state.is_streaming
992    }
993
994    /// Check that all the given handles are safe to transfer through a channel
995    /// e.g. that there's no chance of in-flight reads getting dropped.
996    pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
997        let inner = self.0.lock().unwrap();
998        match handles {
999            proto::Handles::Handles(handles) => {
1000                for handle in handles {
1001                    assert!(
1002                        !(inner.channel_read_states.contains_key(handle)
1003                            || inner.socket_read_states.contains_key(handle)),
1004                        "Tried to transfer handle after reading"
1005                    );
1006                }
1007            }
1008            proto::Handles::Dispositions(dispositions) => {
1009                for disposition in dispositions {
1010                    match &disposition.handle {
1011                        proto::HandleOp::Move_(handle) => assert!(
1012                            !(inner.channel_read_states.contains_key(handle)
1013                                || inner.socket_read_states.contains_key(handle)),
1014                            "Tried to transfer handle after reading"
1015                        ),
1016                        // Pretty sure this should be fine regardless of read state.
1017                        proto::HandleOp::Duplicate(_) => (),
1018                    }
1019                }
1020            }
1021        }
1022    }
1023}