Skip to main content

fdomain_client/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_message::TransactionHeader;
6use fuchsia_sync::Mutex;
7use futures::FutureExt;
8use futures::channel::oneshot::Sender as OneshotSender;
9use futures::stream::Stream as StreamTrait;
10use std::collections::{HashMap, VecDeque};
11use std::convert::Infallible;
12use std::future::Future;
13use std::num::NonZeroU32;
14use std::pin::Pin;
15use std::sync::{Arc, LazyLock};
16use std::task::{Context, Poll, Waker, ready};
17use {fidl_fuchsia_fdomain as proto, fuchsia_async as _};
18
19mod channel;
20mod event;
21mod event_pair;
22mod handle;
23mod responder;
24mod socket;
25
26#[cfg(test)]
27mod test;
28
29pub mod fidl;
30pub mod fidl_next;
31
32use responder::Responder;
33
34pub use channel::{
35    AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
36};
37pub use event::Event;
38pub use event_pair::Eventpair as EventPair;
39pub use handle::unowned::Unowned;
40pub use handle::{
41    AsHandleRef, Handle, HandleBased, HandleRef, NullableHandle, OnFDomainSignals, Peered,
42};
43pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
44pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
45
46// Unsupported handle types.
47#[rustfmt::skip]
48pub use Handle as Clock;
49#[rustfmt::skip]
50pub use Handle as Fifo;
51#[rustfmt::skip]
52pub use Handle as Job;
53#[rustfmt::skip]
54pub use Handle as Process;
55#[rustfmt::skip]
56pub use Handle as Resource;
57#[rustfmt::skip]
58pub use Handle as Stream;
59#[rustfmt::skip]
60pub use Handle as Thread;
61#[rustfmt::skip]
62pub use Handle as Vmar;
63#[rustfmt::skip]
64pub use Handle as Vmo;
65#[rustfmt::skip]
66pub use Handle as Counter;
67
68use proto::f_domain_ordinals as ordinals;
69
70fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71    match error {
72        FDomainError::TargetError(e) => {
73            let e = zx_status::Status::from_raw(*e);
74            write!(f, "Target-side error {e}")
75        }
76        FDomainError::BadHandleId(proto::BadHandleId { id }) => {
77            write!(f, "Tried to use invalid handle id {id}")
78        }
79        FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
80            f,
81            "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
82        ),
83        FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
84            write!(f, "Handle is occupied delivering streaming reads")
85        }
86        FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
87            write!(f, "No streaming read was in progress")
88        }
89        FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
90            write!(
91                f,
92                "Tried to create a handle with id {id}, which is outside the valid range for client handles"
93            )
94        }
95        FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
96            if *same_call {
97                write!(f, "Tried to create two or more new handles with the same id {id}")
98            } else {
99                write!(
100                    f,
101                    "Tried to create a new handle with id {id}, which is already the id of an existing handle"
102                )
103            }
104        }
105        FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
106            write!(f, "Tried to write a channel into itself")
107        }
108        FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
109            write!(f, "Handle closed while being read")
110        }
111        _ => todo!(),
112    }
113}
114
/// Result type alias whose error type defaults to this crate's [`Error`].
///
/// The error parameter can still be overridden, e.g. `Result<T, std::io::Error>`.
pub type Result<T, E = Error> = std::result::Result<T, E>;
117
/// Error type emitted by FDomain operations.
#[derive(Clone)]
pub enum Error {
    /// Writing to a socket failed. The payload carries the underlying error
    /// and the number of bytes that were written before it occurred.
    SocketWrite(WriteSocketError),
    /// Writing to a channel failed, either as a whole or for individual
    /// handles that were being written into the channel.
    ChannelWrite(WriteChannelError),
    /// An error carried by an [`FDomainError`] value.
    FDomain(FDomainError),
    /// A FIDL-level protocol error.
    Protocol(::fidl::Error),
    /// The FDomain protocol does not recognize an object type.
    ProtocolObjectTypeIncompatible,
    /// The FDomain protocol does not recognize some rights.
    ProtocolRightsIncompatible,
    /// The FDomain protocol does not recognize some signals.
    ProtocolSignalsIncompatible,
    /// The FDomain protocol does not recognize a received streaming IO event.
    ProtocolStreamEventIncompatible,
    /// The transport failed. `Some` carries the I/O error that was observed;
    /// `None` means the transport was closed.
    Transport(Option<Arc<std::io::Error>>),
    /// An FDomain handle was used with a different connection than its own.
    ConnectionMismatch,
    /// The channel is no longer streaming.
    StreamingAborted,
}
133
134impl std::fmt::Display for Error {
135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136        match self {
137            Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
138                write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
139                write_fdomain_error(error, f)
140            }
141            Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
142                write!(f, "While writing channel: ")?;
143                write_fdomain_error(error, f)
144            }
145            Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
146                write!(f, "Couldn't write all handles into a channel:")?;
147                for (pos, error) in
148                    errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
149                {
150                    write!(f, "\n  Handle in position {pos}: ")?;
151                    write_fdomain_error(error, f)?;
152                }
153                Ok(())
154            }
155            Self::ProtocolObjectTypeIncompatible => {
156                write!(f, "The FDomain protocol does not recognize an object type")
157            }
158            Self::ProtocolRightsIncompatible => {
159                write!(f, "The FDomain protocol does not recognize some rights")
160            }
161            Self::ProtocolSignalsIncompatible => {
162                write!(f, "The FDomain protocol does not recognize some signals")
163            }
164            Self::ProtocolStreamEventIncompatible => {
165                write!(f, "The FDomain protocol does not recognize a received streaming IO event")
166            }
167            Self::FDomain(e) => write_fdomain_error(e, f),
168            Self::Protocol(e) => write!(f, "Protocol error: {e}"),
169            Self::Transport(Some(e)) => write!(f, "Transport error: {e:?}"),
170            Self::Transport(None) => write!(f, "Transport closed"),
171            Self::ConnectionMismatch => {
172                write!(f, "Tried to use an FDomain handle from a different connection")
173            }
174            Self::StreamingAborted => write!(f, "This channel is no longer streaming"),
175        }
176    }
177}
178
179impl std::fmt::Debug for Error {
180    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181        match self {
182            Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
183            Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
184            Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
185            Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
186            Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
187            Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
188            Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
189            Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
190            Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
191            Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
192            Self::StreamingAborted => write!(f, "StreamingAborted"),
193        }
194    }
195}
196
// Marks `Error` as a standard error type. No methods are overridden, so the
// trait's default implementations apply.
impl std::error::Error for Error {}
198
199impl From<FDomainError> for Error {
200    fn from(other: FDomainError) -> Self {
201        Self::FDomain(other)
202    }
203}
204
205impl From<::fidl::Error> for Error {
206    fn from(other: ::fidl::Error) -> Self {
207        Self::Protocol(other)
208    }
209}
210
211impl From<WriteSocketError> for Error {
212    fn from(other: WriteSocketError) -> Self {
213        Self::SocketWrite(other)
214    }
215}
216
217impl From<WriteChannelError> for Error {
218    fn from(other: WriteChannelError) -> Self {
219        Self::ChannelWrite(other)
220    }
221}
222
/// An error emitted internally by the client. Similar to [`Error`] but does not
/// contain several variants which are irrelevant in the contexts where it is
/// used.
///
/// Every variant has a same-named counterpart in [`Error`]; the conversion is
/// provided by `From<InnerError> for Error`.
#[derive(Clone)]
enum InnerError {
    /// A FIDL-level protocol error (for example a message that failed to decode).
    Protocol(::fidl::Error),
    /// A streaming event carried a payload variant this client does not recognize.
    ProtocolStreamEventIncompatible,
    /// The transport failed. `Some` carries the observed I/O error; `None`
    /// means the transport was closed.
    Transport(Option<Arc<std::io::Error>>),
}
232
233impl From<InnerError> for Error {
234    fn from(other: InnerError) -> Self {
235        match other {
236            InnerError::Protocol(p) => Error::Protocol(p),
237            InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
238            InnerError::Transport(t) => Error::Transport(t),
239        }
240    }
241}
242
243impl From<::fidl::Error> for InnerError {
244    fn from(other: ::fidl::Error) -> Self {
245        InnerError::Protocol(other)
246    }
247}
248
// TODO(399717689) Figure out if we could just use AsyncRead/Write instead of a special trait.
/// Implemented by objects which provide a transport over which we can speak the
/// FDomain protocol.
///
/// The implementer must provide two things:
/// 1) An incoming stream of messages, each presented as a `Box<[u8]>` (or a
///    `std::io::Error` if receiving failed). This is provided via the `Stream`
///    trait, which this trait requires.
/// 2) A way to send messages. This is provided by implementing the
///    `poll_send_message` method.
pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
    /// Attempt to send a message asynchronously. Messages should be sent so
    /// that they arrive at the target in order.
    ///
    /// Returns `Poll::Ready(Ok(()))` once `msg` has been accepted and
    /// `Poll::Pending` if the transport cannot accept it yet. On failure the
    /// error is `Some(io_error)`, or `None`, which the client reports as the
    /// transport having been closed.
    fn poll_send_message(
        self: Pin<&mut Self>,
        msg: &[u8],
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Option<std::io::Error>>>;

    /// Optional debug information outlet.
    ///
    /// The default implementation writes nothing. Implementers that override
    /// it should also override `has_debug_fmt` to return `true`.
    fn debug_fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }

    /// Whether `debug_fmt` does anything.
    ///
    /// Defaults to `false`, matching the default no-op `debug_fmt`.
    fn has_debug_fmt(&self) -> bool {
        false
    }
}
277
/// Wrapper for an `FDomainTransport` implementer that:
/// 1) Provides a queue for outgoing messages so we need not have an await point
///    when we submit a message.
/// 2) Drops the transport on error, then returns the last observed error for
///    all future operations.
enum Transport {
    /// A live transport. The fields are, in order: the transport itself, the
    /// queue of encoded messages waiting to be sent, and the wakers to wake
    /// when a new message is queued.
    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
    /// The transport has failed; holds the error reported to later operations.
    Error(InnerError),
}
287
288impl Transport {
289    /// Get the failure mode of the transport if it has failed.
290    fn error(&self) -> Option<InnerError> {
291        match self {
292            Transport::Transport(_, _, _) => None,
293            Transport::Error(inner_error) => Some(inner_error.clone()),
294        }
295    }
296
297    /// Enqueue a message to be sent on this transport.
298    fn push_msg(&mut self, msg: Box<[u8]>) {
299        if let Transport::Transport(_, v, w) = self {
300            v.push_back(msg);
301            w.drain(..).for_each(Waker::wake);
302        }
303    }
304
305    /// Push messages in the send queue out through the transport.
306    fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
307        match self {
308            Transport::Error(e) => Poll::Ready(e.clone()),
309            Transport::Transport(t, v, w) => {
310                while let Some(msg) = v.front() {
311                    match t.as_mut().poll_send_message(msg, ctx) {
312                        Poll::Ready(Ok(())) => {
313                            v.pop_front();
314                        }
315                        Poll::Ready(Err(e)) => {
316                            let e = e.map(Arc::new);
317                            *self = Transport::Error(InnerError::Transport(e.clone()));
318                            return Poll::Ready(InnerError::Transport(e));
319                        }
320                        Poll::Pending => return Poll::Pending,
321                    }
322                }
323
324                if v.is_empty() {
325                    w.push(ctx.waker().clone());
326                } else {
327                    ctx.waker().wake_by_ref();
328                }
329                Poll::Pending
330            }
331        }
332    }
333
334    /// Get the next incoming message from the transport.
335    fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Box<[u8]>, InnerError>> {
336        match self {
337            Transport::Error(e) => Poll::Ready(Err(e.clone())),
338            Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
339                Some(Ok(x)) => Poll::Ready(Ok(x)),
340                Some(Err(e)) => {
341                    let e = Arc::new(e);
342                    *self = Transport::Error(InnerError::Transport(Some(Arc::clone(&e))));
343                    Poll::Ready(Err(InnerError::Transport(Some(e))))
344                }
345                Option::None => Poll::Ready(Err(InnerError::Transport(None))),
346            },
347        }
348    }
349}
350
351impl Drop for Transport {
352    fn drop(&mut self) {
353        if let Transport::Transport(_, _, wakers) = self {
354            wakers.drain(..).for_each(Waker::wake);
355        }
356    }
357}
358
/// State of a socket that is or has been read from.
struct SocketReadState {
    /// Wakers to wake when a new entry is pushed onto `queued`.
    wakers: Vec<Waker>,
    /// Data (or errors) received for this socket that has not been consumed yet.
    queued: VecDeque<Result<proto::SocketData, Error>>,
    /// Cleared once the response to a socket read request has been handled.
    /// Presumably set by the code that issues the read request, which is not
    /// in this part of the file.
    read_request_pending: bool,
    /// Cleared when a streaming "stopped" event arrives for this socket.
    /// Presumably set by the code that starts streaming reads.
    is_streaming: bool,
}
366
367impl SocketReadState {
368    /// Handle an incoming message, which is either a channel streaming event or
369    /// response to a `ChannelRead` request.
370    fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) {
371        self.queued.push_back(msg);
372        self.wakers.drain(..).for_each(Waker::wake);
373    }
374}
375
/// State of a channel that is or has been read from.
struct ChannelReadState {
    /// Wakers to wake when a new entry is pushed onto `queued`.
    wakers: Vec<Waker>,
    /// Messages (or errors) received for this channel that have not been consumed yet.
    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
    /// Cleared once the response to a channel read request has been handled.
    /// Presumably set by the code that issues the read request, which is not
    /// in this part of the file.
    read_request_pending: bool,
    /// Cleared when a streaming "stopped" event arrives for this channel.
    /// Presumably set by the code that starts streaming reads.
    is_streaming: bool,
}
383
384impl ChannelReadState {
385    /// Handle an incoming message, which is either a channel streaming event or
386    /// response to a `ChannelRead` request.
387    fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) {
388        self.queued.push_back(msg);
389        self.wakers.drain(..).for_each(Waker::wake);
390    }
391}
392
/// Lock-protected interior of `Client`
struct ClientInner {
    /// The connection to the FDomain, or the error it failed with.
    transport: Transport,
    /// Responders for requests that are still awaiting a reply, keyed by
    /// transaction id.
    transactions: HashMap<NonZeroU32, responder::Responder>,
    /// Read state for each channel that is or has been read from, keyed by handle id.
    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
    /// Read state for each socket that is or has been read from, keyed by handle id.
    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
    /// Transaction id that the next request will use.
    next_tx_id: u32,
    /// Handles queued for closing; they are sent in a single close request the
    /// next time the transport is polled.
    waiting_to_close: Vec<proto::HandleId>,
    /// Waker of the task that most recently polled the transport. Woken when
    /// this struct is dropped; presumably also woken by the code that queues
    /// handles in `waiting_to_close` (not in this part of the file).
    waiting_to_close_waker: Waker,
}
403
404impl ClientInner {
405    /// Serialize and enqueue a new transaction, including header and transaction ID.
406    fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
407        let tx_id = self.next_tx_id;
408
409        let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
410        let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
411        self.next_tx_id += 1;
412        assert!(
413            self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
414            "Allocated same tx id twice!"
415        );
416        self.transport.push_msg(msg.into());
417    }
418
419    /// Polls the underlying transport to ensure any incoming or outgoing
420    /// messages are processed as far as possible. Errors if the transport has failed.
421    fn try_poll_transport(
422        &mut self,
423        ctx: &mut Context<'_>,
424    ) -> Poll<Result<Infallible, InnerError>> {
425        if !self.waiting_to_close.is_empty() {
426            let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
427            // We've dropped the handle object. Nobody is going to wait to read
428            // the buffers anymore. This is a safe time to drop the read state.
429            for handle in &handles {
430                let _ = self.channel_read_states.remove(handle);
431                let _ = self.socket_read_states.remove(handle);
432            }
433            self.request(
434                ordinals::CLOSE,
435                proto::FDomainCloseRequest { handles },
436                Responder::Ignore,
437            );
438        }
439
440        self.waiting_to_close_waker = ctx.waker().clone();
441
442        loop {
443            if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
444                for state in std::mem::take(&mut self.socket_read_states).into_values() {
445                    state.wakers.into_iter().for_each(Waker::wake);
446                }
447                for (_, state) in self.channel_read_states.drain() {
448                    state.wakers.into_iter().for_each(Waker::wake);
449                }
450                return Poll::Ready(Err(e));
451            }
452            let Poll::Ready(result) = self.transport.poll_next(ctx) else {
453                return Poll::Pending;
454            };
455            let data = result?;
456            let (header, data) = match fidl_message::decode_transaction_header(&data) {
457                Ok(x) => x,
458                Err(e) => {
459                    self.transport = Transport::Error(InnerError::Protocol(e));
460                    continue;
461                }
462            };
463
464            let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
465                if let Err(e) = self.process_event(header, data) {
466                    self.transport = Transport::Error(e);
467                }
468                continue;
469            };
470
471            let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
472            match tx.handle(self, Ok((header, data))) {
473                Ok(x) => x,
474                Err(e) => {
475                    self.transport = Transport::Error(InnerError::Protocol(e));
476                    continue;
477                }
478            }
479        }
480    }
481
482    /// Process an incoming message that arose from an event rather than a transaction reply.
483    fn process_event(&mut self, header: TransactionHeader, data: &[u8]) -> Result<(), InnerError> {
484        match header.ordinal {
485            ordinals::ON_SOCKET_STREAMING_DATA => {
486                let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
487                    header, data,
488                )?;
489                let o =
490                    self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
491                        wakers: Vec::new(),
492                        queued: VecDeque::new(),
493                        is_streaming: false,
494                        read_request_pending: false,
495                    });
496                match msg.socket_message {
497                    proto::SocketMessage::Data(data) => {
498                        o.handle_incoming_message(Ok(data));
499                        Ok(())
500                    }
501                    proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
502                        if let Some(error) = error {
503                            o.handle_incoming_message(Err(Error::FDomain(*error)));
504                        }
505                        o.is_streaming = false;
506                        Ok(())
507                    }
508                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
509                }
510            }
511            ordinals::ON_CHANNEL_STREAMING_DATA => {
512                let msg = fidl_message::decode_message::<
513                    proto::ChannelOnChannelStreamingDataRequest,
514                >(header, data)?;
515                let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
516                    ChannelReadState {
517                        wakers: Vec::new(),
518                        queued: VecDeque::new(),
519                        is_streaming: false,
520                        read_request_pending: false,
521                    }
522                });
523                match msg.channel_sent {
524                    proto::ChannelSent::Message(data) => {
525                        o.handle_incoming_message(Ok(data));
526                        Ok(())
527                    }
528                    proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
529                        if let Some(error) = error {
530                            o.handle_incoming_message(Err(Error::FDomain(*error)));
531                        }
532                        o.is_streaming = false;
533                        Ok(())
534                    }
535                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
536                }
537            }
538            _ => Err(::fidl::Error::UnknownOrdinal {
539                ordinal: header.ordinal,
540                protocol_name:
541                    <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
542            }
543            .into()),
544        }
545    }
546
547    /// Polls the underlying transport to ensure any incoming or outgoing
548    /// messages are processed as far as possible. If a failure occurs, puts the
549    /// transport into an error state and fails all pending transactions.
550    fn poll_transport(&mut self, ctx: &mut Context<'_>) -> Poll<()> {
551        if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
552            for (_, v) in std::mem::take(&mut self.transactions) {
553                let _ = v.handle(self, Err(e.clone()));
554            }
555
556            Poll::Ready(())
557        } else {
558            Poll::Pending
559        }
560    }
561
562    /// Handles the response to a `SocketRead` protocol message.
563    pub(crate) fn handle_socket_read_response(
564        &mut self,
565        msg: Result<proto::SocketData, Error>,
566        id: proto::HandleId,
567    ) {
568        let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
569            wakers: Vec::new(),
570            queued: VecDeque::new(),
571            is_streaming: false,
572            read_request_pending: false,
573        });
574        state.handle_incoming_message(msg);
575        state.read_request_pending = false;
576    }
577
578    /// Handles the response to a `ChannelRead` protocol message.
579    pub(crate) fn handle_channel_read_response(
580        &mut self,
581        msg: Result<proto::ChannelMessage, Error>,
582        id: proto::HandleId,
583    ) {
584        let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
585            wakers: Vec::new(),
586            queued: VecDeque::new(),
587            is_streaming: false,
588            read_request_pending: false,
589        });
590        state.handle_incoming_message(msg);
591        state.read_request_pending = false;
592    }
593}
594
595impl Drop for ClientInner {
596    fn drop(&mut self) {
597        let responders = self.transactions.drain().map(|x| x.1).collect::<Vec<_>>();
598        for responder in responders {
599            let _ = responder.handle(self, Err(InnerError::Transport(None)));
600        }
601        for state in self.channel_read_states.values_mut() {
602            state.wakers.drain(..).for_each(Waker::wake);
603        }
604        for state in self.socket_read_states.values_mut() {
605            state.wakers.drain(..).for_each(Waker::wake);
606        }
607        self.waiting_to_close_waker.wake_by_ref();
608    }
609}
610
/// Represents a connection to an FDomain.
///
/// The client is constructed by passing it a transport object which represents
/// the raw connection to the remote FDomain. The `Client` wrapper then allows
/// us to construct and use handles which behave similarly to their counterparts
/// on a Fuchsia device.
///
/// The single field is the lock-protected `ClientInner` state.
pub struct Client(pub(crate) Mutex<ClientInner>);
618
619impl std::fmt::Debug for Client {
620    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
621        let inner = self.0.lock();
622        match &inner.transport {
623            Transport::Transport(transport, ..) if transport.has_debug_fmt() => {
624                write!(f, "Client(")?;
625                transport.debug_fmt(f)?;
626                write!(f, ")")
627            }
628            Transport::Error(error) => {
629                let error = Error::from(error.clone());
630                write!(f, "Client(Failed: {error})")
631            }
632            _ => f.debug_tuple("Client").field(&"<transport>").finish(),
633        }
634    }
635}
636
/// A client which is always disconnected. Handles that lose their clients
/// connect to this client instead. Its transport is permanently in the error
/// state with no underlying I/O error (`InnerError::Transport(None)`), which
/// is displayed as "Transport closed".
pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
    Arc::new(Client(Mutex::new(ClientInner {
        transport: Transport::Error(InnerError::Transport(None)),
        transactions: HashMap::new(),
        channel_read_states: HashMap::new(),
        socket_read_states: HashMap::new(),
        next_tx_id: 1,
        waiting_to_close: Vec::new(),
        // There is no transport task to notify, so a no-op waker is enough.
        waiting_to_close_waker: std::task::Waker::noop().clone(),
    })))
});
651
652impl Client {
653    /// Create a new FDomain client. The `transport` argument should contain the
654    /// established connection to the target, ready to communicate the FDomain
655    /// protocol.
656    ///
657    /// The second return item is a future that must be polled to keep
658    /// transactions running.
659    pub fn new(
660        transport: impl FDomainTransport + 'static,
661    ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
662        let ret = Arc::new(Client(Mutex::new(ClientInner {
663            transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
664            transactions: HashMap::new(),
665            socket_read_states: HashMap::new(),
666            channel_read_states: HashMap::new(),
667            next_tx_id: 1,
668            waiting_to_close: Vec::new(),
669            waiting_to_close_waker: std::task::Waker::noop().clone(),
670        })));
671
672        let client_weak = Arc::downgrade(&ret);
673        let fut = futures::future::poll_fn(move |ctx| {
674            let Some(client) = client_weak.upgrade() else {
675                return Poll::Ready(());
676            };
677
678            client.0.lock().poll_transport(ctx)
679        });
680
681        (ret, fut)
682    }
683
684    /// Get the namespace for the connected FDomain. Calling this more than once is an error.
685    pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
686        let new_handle = self.new_hid();
687        self.transaction(
688            ordinals::GET_NAMESPACE,
689            proto::FDomainGetNamespaceRequest { new_handle },
690            Responder::Namespace,
691        )
692        .await?;
693        Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
694    }
695
696    /// Create a new channel in the connected FDomain.
697    pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
698        let id_a = self.new_hid();
699        let id_b = self.new_hid();
700        let fut = self.transaction(
701            ordinals::CREATE_CHANNEL,
702            proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
703            Responder::CreateChannel,
704        );
705
706        fuchsia_async::Task::spawn(async move {
707            if let Err(e) = fut.await {
708                log::debug!("FDomain channel creation failed: {e}");
709            }
710        })
711        .detach();
712
713        (
714            Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
715            Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
716        )
717    }
718
719    /// Creates client and server endpoints connected to by a channel.
720    pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
721        self: &Arc<Self>,
722    ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
723        let (client, server) = self.create_channel();
724        let client_end = crate::fidl::ClientEnd::<F>::new(client);
725        let server_end = crate::fidl::ServerEnd::new(server);
726        (client_end, server_end)
727    }
728
729    /// Creates a client proxy and a server endpoint connected by a channel.
730    pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
731        self: &Arc<Self>,
732    ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
733        let (client_end, server_end) = self.create_endpoints::<F>();
734        (client_end.into_proxy(), server_end)
735    }
736
737    /// Creates a client proxy and a server request stream connected by a channel.
738    pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
739        self: &Arc<Self>,
740    ) -> (F::Proxy, F::RequestStream) {
741        let (client_end, server_end) = self.create_endpoints::<F>();
742        (client_end.into_proxy(), server_end.into_stream())
743    }
744
745    /// Creates a client end and a server request stream connected by a channel.
746    pub fn create_request_stream<F: crate::fidl::ProtocolMarker>(
747        self: &Arc<Self>,
748    ) -> (crate::fidl::ClientEnd<F>, F::RequestStream) {
749        let (client_end, server_end) = self.create_endpoints::<F>();
750        (client_end, server_end.into_stream())
751    }
752
753    /// Create a new socket in the connected FDomain.
754    fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
755        let id_a = self.new_hid();
756        let id_b = self.new_hid();
757        let fut = self.transaction(
758            ordinals::CREATE_SOCKET,
759            proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
760            Responder::CreateSocket,
761        );
762
763        fuchsia_async::Task::spawn(async move {
764            if let Err(e) = fut.await {
765                log::debug!("FDomain socket creation failed: {e}");
766            }
767        })
768        .detach();
769
770        (
771            Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
772            Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
773        )
774    }
775
776    /// Create a new streaming socket in the connected FDomain.
777    pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
778        self.create_socket(proto::SocketType::Stream)
779    }
780
781    /// Create a new datagram socket in the connected FDomain.
782    pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
783        self.create_socket(proto::SocketType::Datagram)
784    }
785
786    /// Create a new event pair in the connected FDomain.
787    pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
788        let id_a = self.new_hid();
789        let id_b = self.new_hid();
790        let fut = self.transaction(
791            ordinals::CREATE_EVENT_PAIR,
792            proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
793            Responder::CreateEventPair,
794        );
795
796        fuchsia_async::Task::spawn(async move {
797            if let Err(e) = fut.await {
798                log::debug!("FDomain event pair creation failed: {e}");
799            }
800        })
801        .detach();
802
803        (
804            EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
805            EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
806        )
807    }
808
809    /// Create a new event handle in the connected FDomain.
810    pub fn create_event(self: &Arc<Self>) -> Event {
811        let id = self.new_hid();
812        let fut = self.transaction(
813            ordinals::CREATE_EVENT,
814            proto::EventCreateEventRequest { handle: id },
815            Responder::CreateEvent,
816        );
817
818        fuchsia_async::Task::spawn(async move {
819            if let Err(e) = fut.await {
820                log::debug!("FDomain event creation failed: {e}");
821            }
822        })
823        .detach();
824
825        Event(Handle { id: id.id, client: Arc::downgrade(self) })
826    }
827
828    /// Allocate a new HID, which should be suitable for use with the connected FDomain.
829    pub(crate) fn new_hid(&self) -> proto::NewHandleId {
830        // TODO: On the target side we have to keep a table of these which means
831        // we can automatically detect collisions in the random value. On the
832        // client side we'd have to add a whole data structure just for that
833        // purpose. Should we?
834        proto::NewHandleId { id: rand::random::<u32>() >> 1 }
835    }
836
837    /// Create a future which sends a FIDL message to the connected FDomain and
838    /// waits for a response.
839    ///
840    /// Calling this method queues the transaction synchronously. Awaiting is
841    /// only necessary to wait for the response.
842    pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
843        self: &Arc<Self>,
844        ordinal: u64,
845        request: S,
846        f: F,
847    ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
848    where
849        F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
850    {
851        let mut inner = self.0.lock();
852
853        let (sender, receiver) = futures::channel::oneshot::channel();
854        inner.request(ordinal, request, f(sender));
855        receiver.map(|x| x.expect("Oneshot went away without reply!"))
856    }
857
858    /// Start getting streaming events for socket reads.
859    pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
860        let mut inner = self.0.lock();
861        if let Some(e) = inner.transport.error() {
862            return Err(e.into());
863        }
864
865        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
866            wakers: Vec::new(),
867            queued: VecDeque::new(),
868            is_streaming: false,
869            read_request_pending: false,
870        });
871
872        assert!(!state.is_streaming, "Initiated streaming twice!");
873        state.is_streaming = true;
874
875        inner.request(
876            ordinals::READ_SOCKET_STREAMING_START,
877            proto::SocketReadSocketStreamingStartRequest { handle: id },
878            Responder::Ignore,
879        );
880        Ok(())
881    }
882
883    /// Stop getting streaming events for socket reads. Doesn't return errors
884    /// because it's exclusively called in destructors where we have nothing to
885    /// do with them.
886    pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
887        let mut inner = self.0.lock();
888        if let Some(state) = inner.socket_read_states.get_mut(&id) {
889            if state.is_streaming {
890                state.is_streaming = false;
891                // TODO: Log?
892                let _ = inner.request(
893                    ordinals::READ_SOCKET_STREAMING_STOP,
894                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
895                    Responder::Ignore,
896                );
897            }
898        }
899    }
900
901    /// Start getting streaming events for socket reads.
902    pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
903        let mut inner = self.0.lock();
904        if let Some(e) = inner.transport.error() {
905            return Err(e.into());
906        }
907        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
908            wakers: Vec::new(),
909            queued: VecDeque::new(),
910            is_streaming: false,
911            read_request_pending: false,
912        });
913
914        assert!(!state.is_streaming, "Initiated streaming twice!");
915        state.is_streaming = true;
916
917        inner.request(
918            ordinals::READ_CHANNEL_STREAMING_START,
919            proto::ChannelReadChannelStreamingStartRequest { handle: id },
920            Responder::Ignore,
921        );
922
923        Ok(())
924    }
925
926    /// Stop getting streaming events for socket reads. Doesn't return errors
927    /// because it's exclusively called in destructors where we have nothing to
928    /// do with them.
929    pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
930        let mut inner = self.0.lock();
931        if let Some(state) = inner.channel_read_states.get_mut(&id) {
932            if state.is_streaming {
933                state.is_streaming = false;
934                // TODO: Log?
935                let _ = inner.request(
936                    ordinals::READ_CHANNEL_STREAMING_STOP,
937                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
938                    Responder::Ignore,
939                );
940            }
941        }
942    }
943
944    /// Execute a read from a channel.
945    pub(crate) fn poll_socket(
946        &self,
947        id: proto::HandleId,
948        ctx: &mut Context<'_>,
949        out: &mut [u8],
950    ) -> Poll<Result<usize, Error>> {
951        let mut inner = self.0.lock();
952        if let Some(error) = inner.transport.error() {
953            return Poll::Ready(Err(error.into()));
954        }
955
956        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
957            wakers: Vec::new(),
958            queued: VecDeque::new(),
959            is_streaming: false,
960            read_request_pending: false,
961        });
962
963        if let Some(got) = state.queued.front_mut() {
964            match got.as_mut() {
965                Ok(data) => {
966                    let read_size = std::cmp::min(data.data.len(), out.len());
967                    out[..read_size].copy_from_slice(&data.data[..read_size]);
968
969                    if data.data.len() > read_size && !data.is_datagram {
970                        let _ = data.data.drain(..read_size);
971                    } else {
972                        let _ = state.queued.pop_front();
973                    }
974
975                    return Poll::Ready(Ok(read_size));
976                }
977                Err(_) => {
978                    let err = state.queued.pop_front().unwrap().unwrap_err();
979                    return Poll::Ready(Err(err));
980                }
981            }
982        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
983            state.wakers.push(ctx.waker().clone());
984        }
985
986        if !state.read_request_pending && !state.is_streaming {
987            inner.request(
988                ordinals::READ_SOCKET,
989                proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
990                Responder::ReadSocket(id),
991            );
992        }
993
994        Poll::Pending
995    }
996
997    /// Execute a read from a channel.
998    pub(crate) fn poll_channel(
999        &self,
1000        id: proto::HandleId,
1001        ctx: &mut Context<'_>,
1002        for_stream: bool,
1003    ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
1004        let mut inner = self.0.lock();
1005        if let Some(error) = inner.transport.error() {
1006            return Poll::Ready(Some(Err(error.into())));
1007        }
1008
1009        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
1010            wakers: Vec::new(),
1011            queued: VecDeque::new(),
1012            is_streaming: false,
1013            read_request_pending: false,
1014        });
1015
1016        if let Some(got) = state.queued.pop_front() {
1017            return Poll::Ready(Some(got));
1018        } else if for_stream && !state.is_streaming {
1019            return Poll::Ready(None);
1020        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
1021            state.wakers.push(ctx.waker().clone());
1022        }
1023
1024        if !state.read_request_pending && !state.is_streaming {
1025            inner.request(
1026                ordinals::READ_CHANNEL,
1027                proto::ChannelReadChannelRequest { handle: id },
1028                Responder::ReadChannel(id),
1029            );
1030        }
1031
1032        Poll::Pending
1033    }
1034
1035    /// Check whether this channel is streaming
1036    pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
1037        let inner = self.0.lock();
1038        let Some(state) = inner.channel_read_states.get(&id) else {
1039            return false;
1040        };
1041        state.is_streaming
1042    }
1043
1044    /// Check that all the given handles are safe to transfer through a channel
1045    /// e.g. that there's no chance of in-flight reads getting dropped.
1046    pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
1047        let inner = self.0.lock();
1048        match handles {
1049            proto::Handles::Handles(handles) => {
1050                for handle in handles {
1051                    assert!(
1052                        !(inner.channel_read_states.contains_key(handle)
1053                            || inner.socket_read_states.contains_key(handle)),
1054                        "Tried to transfer handle after reading"
1055                    );
1056                }
1057            }
1058            proto::Handles::Dispositions(dispositions) => {
1059                for disposition in dispositions {
1060                    match &disposition.handle {
1061                        proto::HandleOp::Move_(handle) => assert!(
1062                            !(inner.channel_read_states.contains_key(handle)
1063                                || inner.socket_read_states.contains_key(handle)),
1064                            "Tried to transfer handle after reading"
1065                        ),
1066                        // Pretty sure this should be fine regardless of read state.
1067                        proto::HandleOp::Duplicate(_) => (),
1068                    }
1069                }
1070            }
1071        }
1072    }
1073}