fdomain_client/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_message::TransactionHeader;
6use futures::FutureExt;
7use futures::channel::oneshot::Sender as OneshotSender;
8use futures::stream::Stream as StreamTrait;
9use std::collections::{HashMap, VecDeque};
10use std::convert::Infallible;
11use std::future::Future;
12use std::num::NonZeroU32;
13use std::pin::Pin;
14use std::sync::{Arc, LazyLock, Mutex};
15use std::task::{Context, Poll, Waker, ready};
16use {fidl_fuchsia_fdomain as proto, fuchsia_async as _};
17
18mod channel;
19mod event;
20mod event_pair;
21mod handle;
22mod responder;
23mod socket;
24
25#[cfg(test)]
26mod test;
27
28pub mod fidl;
29pub mod fidl_next;
30
31use responder::Responder;
32
33pub use channel::{
34    AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
35};
36pub use event::Event;
37pub use event_pair::Eventpair as EventPair;
38pub use handle::{AsHandleRef, Handle, HandleBased, HandleRef, OnFDomainSignals, Peered};
39pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
40pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
41
42// Unsupported handle types.
43#[rustfmt::skip]
44pub use Handle as Fifo;
45#[rustfmt::skip]
46pub use Handle as Job;
47#[rustfmt::skip]
48pub use Handle as Process;
49#[rustfmt::skip]
50pub use Handle as Resource;
51#[rustfmt::skip]
52pub use Handle as Stream;
53#[rustfmt::skip]
54pub use Handle as Thread;
55#[rustfmt::skip]
56pub use Handle as Vmar;
57#[rustfmt::skip]
58pub use Handle as Vmo;
59
60use proto::f_domain_ordinals as ordinals;
61
62fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63    match error {
64        FDomainError::TargetError(e) => {
65            let e = zx_status::Status::from_raw(*e);
66            write!(f, "Target-side error {e}")
67        }
68        FDomainError::BadHandleId(proto::BadHandleId { id }) => {
69            write!(f, "Tried to use invalid handle id {id}")
70        }
71        FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
72            f,
73            "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
74        ),
75        FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
76            write!(f, "Handle is occupied delivering streaming reads")
77        }
78        FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
79            write!(f, "No streaming read was in progress")
80        }
81        FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
82            write!(
83                f,
84                "Tried to create a handle with id {id}, which is outside the valid range for client handles"
85            )
86        }
87        FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
88            if *same_call {
89                write!(f, "Tried to create two or more new handles with the same id {id}")
90            } else {
91                write!(
92                    f,
93                    "Tried to create a new handle with id {id}, which is already the id of an existing handle"
94                )
95            }
96        }
97        FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
98            write!(f, "Tried to write a channel into itself")
99        }
100        FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
101            write!(f, "Handle closed while being read")
102        }
103        _ => todo!(),
104    }
105}
106
/// Result type alias whose error type defaults to this crate's [`Error`].
///
/// The error parameter can still be overridden, e.g. `Result<T, std::io::Error>`.
pub type Result<T, E = Error> = std::result::Result<T, E>;
109
/// Error type emitted by FDomain operations.
///
/// The per-variant descriptions below mirror the messages produced by this
/// type's `Display` implementation.
#[derive(Clone)]
pub enum Error {
    /// A socket write failed; the payload carries the underlying FDomain error
    /// and the number of bytes written successfully before the failure.
    SocketWrite(WriteSocketError),
    /// A channel write failed, either as a whole or for individual handles
    /// being written into the channel.
    ChannelWrite(WriteChannelError),
    /// An error expressed in terms of the FDomain protocol's own error type.
    FDomain(FDomainError),
    /// A FIDL protocol error.
    Protocol(::fidl::Error),
    /// The FDomain protocol does not recognize an object type.
    ProtocolObjectTypeIncompatible,
    /// The FDomain protocol does not recognize some rights.
    ProtocolRightsIncompatible,
    /// The FDomain protocol does not recognize some signals.
    ProtocolSignalsIncompatible,
    /// The FDomain protocol does not recognize a received streaming IO event.
    ProtocolStreamEventIncompatible,
    /// The underlying transport failed. The I/O error is reference-counted so
    /// the same failure can be handed to multiple recipients.
    Transport(Arc<std::io::Error>),
    /// An FDomain handle was used with a connection other than its own.
    ConnectionMismatch,
    /// The channel is no longer streaming.
    StreamingAborted,
}
125
126impl std::fmt::Display for Error {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        match self {
129            Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
130                write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
131                write_fdomain_error(error, f)
132            }
133            Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
134                write!(f, "While writing channel: ")?;
135                write_fdomain_error(error, f)
136            }
137            Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
138                write!(f, "Couldn't write all handles into a channel:")?;
139                for (pos, error) in
140                    errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
141                {
142                    write!(f, "\n  Handle in position {pos}: ")?;
143                    write_fdomain_error(error, f)?;
144                }
145                Ok(())
146            }
147            Self::ProtocolObjectTypeIncompatible => {
148                write!(f, "The FDomain protocol does not recognize an object type")
149            }
150            Self::ProtocolRightsIncompatible => {
151                write!(f, "The FDomain protocol does not recognize some rights")
152            }
153            Self::ProtocolSignalsIncompatible => {
154                write!(f, "The FDomain protocol does not recognize some signals")
155            }
156            Self::ProtocolStreamEventIncompatible => {
157                write!(f, "The FDomain protocol does not recognize a received streaming IO event")
158            }
159            Self::FDomain(e) => write_fdomain_error(e, f),
160            Self::Protocol(e) => write!(f, "Protocol error: {e}"),
161            Self::Transport(e) => write!(f, "Transport error: {e:?}"),
162            Self::ConnectionMismatch => {
163                write!(f, "Tried to use an FDomain handle from a different connection")
164            }
165            Self::StreamingAborted => write!(f, "This channel is no longer streaming"),
166        }
167    }
168}
169
170impl std::fmt::Debug for Error {
171    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172        match self {
173            Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
174            Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
175            Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
176            Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
177            Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
178            Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
179            Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
180            Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
181            Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
182            Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
183            Self::StreamingAborted => write!(f, "StreamingAborted"),
184        }
185    }
186}
187
// Marks `Error` as a standard error type. No methods are overridden, so the
// trait's default implementations apply.
impl std::error::Error for Error {}
189
190impl From<FDomainError> for Error {
191    fn from(other: FDomainError) -> Self {
192        Self::FDomain(other)
193    }
194}
195
196impl From<::fidl::Error> for Error {
197    fn from(other: ::fidl::Error) -> Self {
198        Self::Protocol(other)
199    }
200}
201
202impl From<WriteSocketError> for Error {
203    fn from(other: WriteSocketError) -> Self {
204        Self::SocketWrite(other)
205    }
206}
207
208impl From<WriteChannelError> for Error {
209    fn from(other: WriteChannelError) -> Self {
210        Self::ChannelWrite(other)
211    }
212}
213
/// An error emitted internally by the client. Similar to [`Error`] but does not
/// contain several variants which are irrelevant in the contexts where it is
/// used.
///
/// Every variant maps onto the identically named variant of [`Error`].
enum InnerError {
    /// A FIDL protocol error.
    Protocol(::fidl::Error),
    /// A streaming event was received that this client does not recognize.
    ProtocolStreamEventIncompatible,
    /// The transport failed. The I/O error is reference-counted so it can be
    /// cloned cheaply and reported to many recipients.
    Transport(Arc<std::io::Error>),
}
222
223impl Clone for InnerError {
224    fn clone(&self) -> Self {
225        match self {
226            InnerError::Protocol(a) => InnerError::Protocol(a.clone()),
227            InnerError::ProtocolStreamEventIncompatible => {
228                InnerError::ProtocolStreamEventIncompatible
229            }
230            InnerError::Transport(a) => InnerError::Transport(Arc::clone(a)),
231        }
232    }
233}
234
235impl From<InnerError> for Error {
236    fn from(other: InnerError) -> Self {
237        match other {
238            InnerError::Protocol(p) => Error::Protocol(p),
239            InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
240            InnerError::Transport(t) => Error::Transport(t),
241        }
242    }
243}
244
245impl From<::fidl::Error> for InnerError {
246    fn from(other: ::fidl::Error) -> Self {
247        InnerError::Protocol(other)
248    }
249}
250
251// TODO(399717689) Figure out if we could just use AsyncRead/Write instead of a special trait.
/// Implemented by objects which provide a transport over which we can speak the
/// FDomain protocol.
///
/// The implementer must provide two things:
/// 1) An incoming stream of messages, each presented as
///    `Result<Box<[u8]>, std::io::Error>`. This is provided via the `Stream`
///    trait, which this trait requires. An `Err` item reports a failure of
///    the transport.
/// 2) A way to send messages. This is provided by implementing the
///    `poll_send_message` method.
pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
    /// Attempt to send a message asynchronously. Messages should be sent so
    /// that they arrive at the target in order.
    ///
    /// Returns `Poll::Ready(Ok(()))` once `msg` has been accepted,
    /// `Poll::Ready(Err(_))` if the transport has failed, and `Poll::Pending`
    /// if the message cannot be accepted yet. The caller polls again with the
    /// same message after a `Pending` result.
    fn poll_send_message(
        self: Pin<&mut Self>,
        msg: &[u8],
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), std::io::Error>>;
}
269
/// Wrapper for an `FDomainTransport` implementer that:
/// 1) Provides a queue for outgoing messages so we need not have an await point
///    when we submit a message.
/// 2) Drops the transport on error, then returns the last observed error for
///    all future operations.
enum Transport {
    /// A live transport. The fields are, in order: the transport itself, the
    /// queue of outgoing messages not yet accepted by it, and the wakers to
    /// wake when a new message is queued.
    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
    /// A failed transport, holding the error that is reported to every
    /// subsequent operation.
    Error(InnerError),
}
279
280impl Transport {
281    /// Get the failure mode of the transport if it has failed.
282    fn error(&self) -> Option<InnerError> {
283        match self {
284            Transport::Transport(_, _, _) => None,
285            Transport::Error(inner_error) => Some(inner_error.clone()),
286        }
287    }
288
289    /// Enqueue a message to be sent on this transport.
290    fn push_msg(&mut self, msg: Box<[u8]>) {
291        if let Transport::Transport(_, v, w) = self {
292            v.push_back(msg);
293            w.drain(..).for_each(Waker::wake);
294        }
295    }
296
297    /// Push messages in the send queue out through the transport.
298    fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
299        match self {
300            Transport::Error(e) => Poll::Ready(e.clone()),
301            Transport::Transport(t, v, w) => {
302                while let Some(msg) = v.front() {
303                    match t.as_mut().poll_send_message(msg, ctx) {
304                        Poll::Ready(Ok(())) => {
305                            v.pop_front();
306                        }
307                        Poll::Ready(Err(e)) => {
308                            let e = Arc::new(e);
309                            *self = Transport::Error(InnerError::Transport(Arc::clone(&e)));
310                            return Poll::Ready(InnerError::Transport(e));
311                        }
312                        Poll::Pending => return Poll::Pending,
313                    }
314                }
315
316                if v.is_empty() {
317                    w.push(ctx.waker().clone());
318                } else {
319                    ctx.waker().wake_by_ref();
320                }
321                Poll::Pending
322            }
323        }
324    }
325
326    /// Get the next incoming message from the transport.
327    fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Option<Result<Box<[u8]>, InnerError>>> {
328        match self {
329            Transport::Error(e) => Poll::Ready(Some(Err(e.clone()))),
330            Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
331                Some(Ok(x)) => Poll::Ready(Some(Ok(x))),
332                Some(Err(e)) => {
333                    let e = Arc::new(e);
334                    *self = Transport::Error(InnerError::Transport(Arc::clone(&e)));
335                    Poll::Ready(Some(Err(InnerError::Transport(e))))
336                }
337                Option::None => Poll::Ready(None),
338            },
339        }
340    }
341}
342
/// State of a socket that is or has been read from.
struct SocketReadState {
    /// Tasks to wake when a new entry is added to `queued`.
    wakers: Vec<Waker>,
    /// Data and errors received for this socket, in arrival order.
    queued: VecDeque<Result<proto::SocketData, Error>>,
    /// Cleared when the response to a socket read request is handled.
    /// Presumably set by the code issuing the read request (not visible here).
    read_request_pending: bool,
    /// Set when streaming reads are started for this socket and cleared when
    /// the target reports that streaming has stopped.
    is_streaming: bool,
}
350
351impl SocketReadState {
352    /// Handle an incoming message, which is either a channel streaming event or
353    /// response to a `ChannelRead` request.
354    fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) {
355        self.queued.push_back(msg);
356        self.wakers.drain(..).for_each(Waker::wake);
357    }
358}
359
/// State of a channel that is or has been read from.
struct ChannelReadState {
    /// Tasks to wake when a new entry is added to `queued`.
    wakers: Vec<Waker>,
    /// Messages and errors received for this channel, in arrival order.
    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
    /// Cleared when the response to a `ChannelRead` request is handled.
    /// Presumably set by the code issuing the read request (not visible here).
    read_request_pending: bool,
    /// Cleared when the target reports that streaming has stopped.
    /// Presumably set when streaming is started (not visible here).
    is_streaming: bool,
}
367
368impl ChannelReadState {
369    /// Handle an incoming message, which is either a channel streaming event or
370    /// response to a `ChannelRead` request.
371    fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) {
372        self.queued.push_back(msg);
373        self.wakers.drain(..).for_each(Waker::wake);
374    }
375}
376
/// Lock-protected interior of `Client`
struct ClientInner {
    /// The connection to the FDomain, or the error it failed with.
    transport: Transport,
    /// Responders for transactions that have been sent but not yet answered,
    /// keyed by transaction ID.
    transactions: HashMap<NonZeroU32, responder::Responder>,
    /// Per-handle read state for channels, keyed by handle ID.
    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
    /// Per-handle read state for sockets, keyed by handle ID.
    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
    /// Transaction ID to assign to the next request.
    next_tx_id: u32,
    /// Handles queued for closing. They are sent to the FDomain in a single
    /// close request the next time the transport is polled.
    waiting_to_close: Vec<proto::HandleId>,
    /// Waker of the task that most recently polled the transport.
    /// Presumably woken when a handle is added to `waiting_to_close`
    /// (that code is not visible here).
    waiting_to_close_waker: Waker,
}
387
388impl ClientInner {
389    /// Serialize and enqueue a new transaction, including header and transaction ID.
390    fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
391        let tx_id = self.next_tx_id;
392
393        let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
394        let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
395        self.next_tx_id += 1;
396        assert!(
397            self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
398            "Allocated same tx id twice!"
399        );
400        self.transport.push_msg(msg.into());
401    }
402
403    /// Polls the underlying transport to ensure any incoming or outgoing
404    /// messages are processed as far as possible. Errors if the transport has failed.
405    fn try_poll_transport(
406        &mut self,
407        ctx: &mut Context<'_>,
408    ) -> Poll<Result<Infallible, InnerError>> {
409        if !self.waiting_to_close.is_empty() {
410            let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
411            // We've dropped the handle object. Nobody is going to wait to read
412            // the buffers anymore. This is a safe time to drop the read state.
413            for handle in &handles {
414                let _ = self.channel_read_states.remove(handle);
415                let _ = self.socket_read_states.remove(handle);
416            }
417            self.request(
418                ordinals::CLOSE,
419                proto::FDomainCloseRequest { handles },
420                Responder::Ignore,
421            );
422        }
423
424        self.waiting_to_close_waker = ctx.waker().clone();
425
426        loop {
427            if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
428                for state in std::mem::take(&mut self.socket_read_states).into_values() {
429                    state.wakers.into_iter().for_each(Waker::wake);
430                }
431                for (_, state) in self.channel_read_states.drain() {
432                    state.wakers.into_iter().for_each(Waker::wake);
433                }
434                return Poll::Ready(Err(e));
435            }
436            let Poll::Ready(Some(result)) = self.transport.poll_next(ctx) else {
437                return Poll::Pending;
438            };
439            let data = result?;
440            let (header, data) = match fidl_message::decode_transaction_header(&data) {
441                Ok(x) => x,
442                Err(e) => {
443                    self.transport = Transport::Error(InnerError::Protocol(e));
444                    continue;
445                }
446            };
447
448            let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
449                if let Err(e) = self.process_event(header, data) {
450                    self.transport = Transport::Error(e);
451                }
452                continue;
453            };
454
455            let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
456            match tx.handle(self, Ok((header, data))) {
457                Ok(x) => x,
458                Err(e) => {
459                    self.transport = Transport::Error(InnerError::Protocol(e));
460                    continue;
461                }
462            }
463        }
464    }
465
466    /// Process an incoming message that arose from an event rather than a transaction reply.
467    fn process_event(&mut self, header: TransactionHeader, data: &[u8]) -> Result<(), InnerError> {
468        match header.ordinal {
469            ordinals::ON_SOCKET_STREAMING_DATA => {
470                let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
471                    header, data,
472                )?;
473                let o =
474                    self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
475                        wakers: Vec::new(),
476                        queued: VecDeque::new(),
477                        is_streaming: false,
478                        read_request_pending: false,
479                    });
480                match msg.socket_message {
481                    proto::SocketMessage::Data(data) => {
482                        o.handle_incoming_message(Ok(data));
483                        Ok(())
484                    }
485                    proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
486                        if let Some(error) = error {
487                            o.handle_incoming_message(Err(Error::FDomain(*error)));
488                        }
489                        o.is_streaming = false;
490                        Ok(())
491                    }
492                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
493                }
494            }
495            ordinals::ON_CHANNEL_STREAMING_DATA => {
496                let msg = fidl_message::decode_message::<
497                    proto::ChannelOnChannelStreamingDataRequest,
498                >(header, data)?;
499                let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
500                    ChannelReadState {
501                        wakers: Vec::new(),
502                        queued: VecDeque::new(),
503                        is_streaming: false,
504                        read_request_pending: false,
505                    }
506                });
507                match msg.channel_sent {
508                    proto::ChannelSent::Message(data) => {
509                        o.handle_incoming_message(Ok(data));
510                        Ok(())
511                    }
512                    proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
513                        if let Some(error) = error {
514                            o.handle_incoming_message(Err(Error::FDomain(*error)));
515                        }
516                        o.is_streaming = false;
517                        Ok(())
518                    }
519                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
520                }
521            }
522            _ => Err(::fidl::Error::UnknownOrdinal {
523                ordinal: header.ordinal,
524                protocol_name:
525                    <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
526            }
527            .into()),
528        }
529    }
530
531    /// Polls the underlying transport to ensure any incoming or outgoing
532    /// messages are processed as far as possible. If a failure occurs, puts the
533    /// transport into an error state and fails all pending transactions.
534    fn poll_transport(&mut self, ctx: &mut Context<'_>) {
535        if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
536            for (_, v) in std::mem::take(&mut self.transactions) {
537                let _ = v.handle(self, Err(e.clone()));
538            }
539        }
540    }
541
542    /// Handles the response to a `SocketRead` protocol message.
543    pub(crate) fn handle_socket_read_response(
544        &mut self,
545        msg: Result<proto::SocketData, Error>,
546        id: proto::HandleId,
547    ) {
548        let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
549            wakers: Vec::new(),
550            queued: VecDeque::new(),
551            is_streaming: false,
552            read_request_pending: false,
553        });
554        state.handle_incoming_message(msg);
555        state.read_request_pending = false;
556    }
557
558    /// Handles the response to a `ChannelRead` protocol message.
559    pub(crate) fn handle_channel_read_response(
560        &mut self,
561        msg: Result<proto::ChannelMessage, Error>,
562        id: proto::HandleId,
563    ) {
564        let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
565            wakers: Vec::new(),
566            queued: VecDeque::new(),
567            is_streaming: false,
568            read_request_pending: false,
569        });
570        state.handle_incoming_message(msg);
571        state.read_request_pending = false;
572    }
573}
574
/// Represents a connection to an FDomain.
///
/// The client is constructed by passing it a transport object which represents
/// the raw connection to the remote FDomain. The `Client` wrapper then allows
/// us to construct and use handles which behave similarly to their counterparts
/// on a Fuchsia device.
///
/// All mutable state lives behind the contained mutex; see [`Client::new`] for
/// the future that must be polled to make progress.
pub struct Client(pub(crate) Mutex<ClientInner>);
582
583impl std::fmt::Debug for Client {
584    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
585        f.debug_tuple("Client").field(&"...").finish()
586    }
587}
588
589/// A client which is always disconnected. Handles that lose their clients
590/// connect to this client instead, which always returns a "Client Lost"
591/// transport failure.
592pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
593    Arc::new(Client(Mutex::new(ClientInner {
594        transport: Transport::Error(InnerError::Transport(Arc::new(std::io::Error::other(
595            "Client Lost",
596        )))),
597        transactions: HashMap::new(),
598        channel_read_states: HashMap::new(),
599        socket_read_states: HashMap::new(),
600        next_tx_id: 1,
601        waiting_to_close: Vec::new(),
602        waiting_to_close_waker: futures::task::noop_waker(),
603    })))
604});
605
606impl Client {
607    /// Create a new FDomain client. The `transport` argument should contain the
608    /// established connection to the target, ready to communicate the FDomain
609    /// protocol.
610    ///
611    /// The second return item is a future that must be polled to keep
612    /// transactions running.
613    pub fn new(
614        transport: impl FDomainTransport + 'static,
615    ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
616        let ret = Arc::new(Client(Mutex::new(ClientInner {
617            transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
618            transactions: HashMap::new(),
619            socket_read_states: HashMap::new(),
620            channel_read_states: HashMap::new(),
621            next_tx_id: 1,
622            waiting_to_close: Vec::new(),
623            waiting_to_close_waker: futures::task::noop_waker(),
624        })));
625
626        let client_weak = Arc::downgrade(&ret);
627        let fut = futures::future::poll_fn(move |ctx| {
628            let Some(client) = client_weak.upgrade() else {
629                return Poll::Ready(());
630            };
631
632            client.0.lock().unwrap().poll_transport(ctx);
633            Poll::Pending
634        });
635
636        (ret, fut)
637    }
638
639    /// Get the namespace for the connected FDomain. Calling this more than once is an error.
640    pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
641        let new_handle = self.new_hid();
642        self.transaction(
643            ordinals::GET_NAMESPACE,
644            proto::FDomainGetNamespaceRequest { new_handle },
645            Responder::Namespace,
646        )
647        .await?;
648        Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
649    }
650
651    /// Create a new channel in the connected FDomain.
652    pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
653        let id_a = self.new_hid();
654        let id_b = self.new_hid();
655        let fut = self.transaction(
656            ordinals::CREATE_CHANNEL,
657            proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
658            Responder::CreateChannel,
659        );
660
661        fuchsia_async::Task::spawn(async move {
662            if let Err(e) = fut.await {
663                log::debug!("FDomain channel creation failed: {e}");
664            }
665        })
666        .detach();
667
668        (
669            Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
670            Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
671        )
672    }
673
674    /// Creates client and server endpoints connected to by a channel.
675    pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
676        self: &Arc<Self>,
677    ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
678        let (client, server) = self.create_channel();
679        let client_end = crate::fidl::ClientEnd::<F>::new(client);
680        let server_end = crate::fidl::ServerEnd::new(server);
681        (client_end, server_end)
682    }
683
684    /// Creates a client proxy and a server endpoint connected by a channel.
685    pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
686        self: &Arc<Self>,
687    ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
688        let (client_end, server_end) = self.create_endpoints::<F>();
689        (client_end.into_proxy(), server_end)
690    }
691
692    /// Creates a client proxy and a server request stream connected by a channel.
693    pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
694        self: &Arc<Self>,
695    ) -> (F::Proxy, F::RequestStream) {
696        let (client_end, server_end) = self.create_endpoints::<F>();
697        (client_end.into_proxy(), server_end.into_stream())
698    }
699
700    /// Create a new socket in the connected FDomain.
701    fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
702        let id_a = self.new_hid();
703        let id_b = self.new_hid();
704        let fut = self.transaction(
705            ordinals::CREATE_SOCKET,
706            proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
707            Responder::CreateSocket,
708        );
709
710        fuchsia_async::Task::spawn(async move {
711            if let Err(e) = fut.await {
712                log::debug!("FDomain socket creation failed: {e}");
713            }
714        })
715        .detach();
716
717        (
718            Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
719            Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
720        )
721    }
722
723    /// Create a new streaming socket in the connected FDomain.
724    pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
725        self.create_socket(proto::SocketType::Stream)
726    }
727
728    /// Create a new datagram socket in the connected FDomain.
729    pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
730        self.create_socket(proto::SocketType::Datagram)
731    }
732
733    /// Create a new event pair in the connected FDomain.
734    pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
735        let id_a = self.new_hid();
736        let id_b = self.new_hid();
737        let fut = self.transaction(
738            ordinals::CREATE_EVENT_PAIR,
739            proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
740            Responder::CreateEventPair,
741        );
742
743        fuchsia_async::Task::spawn(async move {
744            if let Err(e) = fut.await {
745                log::debug!("FDomain event pair creation failed: {e}");
746            }
747        })
748        .detach();
749
750        (
751            EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
752            EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
753        )
754    }
755
756    /// Create a new event handle in the connected FDomain.
757    pub fn create_event(self: &Arc<Self>) -> Event {
758        let id = self.new_hid();
759        let fut = self.transaction(
760            ordinals::CREATE_EVENT,
761            proto::EventCreateEventRequest { handle: id },
762            Responder::CreateEvent,
763        );
764
765        fuchsia_async::Task::spawn(async move {
766            if let Err(e) = fut.await {
767                log::debug!("FDomain event creation failed: {e}");
768            }
769        })
770        .detach();
771
772        Event(Handle { id: id.id, client: Arc::downgrade(self) })
773    }
774
775    /// Allocate a new HID, which should be suitable for use with the connected FDomain.
776    pub(crate) fn new_hid(&self) -> proto::NewHandleId {
777        // TODO: On the target side we have to keep a table of these which means
778        // we can automatically detect collisions in the random value. On the
779        // client side we'd have to add a whole data structure just for that
780        // purpose. Should we?
781        proto::NewHandleId { id: rand::random::<u32>() >> 1 }
782    }
783
784    /// Create a future which sends a FIDL message to the connected FDomain and
785    /// waits for a response.
786    ///
787    /// Calling this method queues the transaction synchronously. Awaiting is
788    /// only necessary to wait for the response.
789    pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
790        self: &Arc<Self>,
791        ordinal: u64,
792        request: S,
793        f: F,
794    ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
795    where
796        F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
797    {
798        let mut inner = self.0.lock().unwrap();
799
800        let (sender, receiver) = futures::channel::oneshot::channel();
801        inner.request(ordinal, request, f(sender));
802        receiver.map(|x| x.expect("Oneshot went away without reply!"))
803    }
804
805    /// Start getting streaming events for socket reads.
806    pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
807        let mut inner = self.0.lock().unwrap();
808        if let Some(e) = inner.transport.error() {
809            return Err(e.into());
810        }
811
812        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
813            wakers: Vec::new(),
814            queued: VecDeque::new(),
815            is_streaming: false,
816            read_request_pending: false,
817        });
818
819        assert!(!state.is_streaming, "Initiated streaming twice!");
820        state.is_streaming = true;
821
822        inner.request(
823            ordinals::READ_SOCKET_STREAMING_START,
824            proto::SocketReadSocketStreamingStartRequest { handle: id },
825            Responder::Ignore,
826        );
827        Ok(())
828    }
829
830    /// Stop getting streaming events for socket reads. Doesn't return errors
831    /// because it's exclusively called in destructors where we have nothing to
832    /// do with them.
833    pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
834        let mut inner = self.0.lock().unwrap();
835        if let Some(state) = inner.socket_read_states.get_mut(&id) {
836            if state.is_streaming {
837                state.is_streaming = false;
838                // TODO: Log?
839                let _ = inner.request(
840                    ordinals::READ_SOCKET_STREAMING_STOP,
841                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
842                    Responder::Ignore,
843                );
844            }
845        }
846    }
847
848    /// Start getting streaming events for socket reads.
849    pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
850        let mut inner = self.0.lock().unwrap();
851        if let Some(e) = inner.transport.error() {
852            return Err(e.into());
853        }
854        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
855            wakers: Vec::new(),
856            queued: VecDeque::new(),
857            is_streaming: false,
858            read_request_pending: false,
859        });
860
861        assert!(!state.is_streaming, "Initiated streaming twice!");
862        state.is_streaming = true;
863
864        inner.request(
865            ordinals::READ_CHANNEL_STREAMING_START,
866            proto::ChannelReadChannelStreamingStartRequest { handle: id },
867            Responder::Ignore,
868        );
869
870        Ok(())
871    }
872
873    /// Stop getting streaming events for socket reads. Doesn't return errors
874    /// because it's exclusively called in destructors where we have nothing to
875    /// do with them.
876    pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
877        let mut inner = self.0.lock().unwrap();
878        if let Some(state) = inner.channel_read_states.get_mut(&id) {
879            if state.is_streaming {
880                state.is_streaming = false;
881                // TODO: Log?
882                let _ = inner.request(
883                    ordinals::READ_CHANNEL_STREAMING_STOP,
884                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
885                    Responder::Ignore,
886                );
887            }
888        }
889    }
890
891    /// Execute a read from a channel.
892    pub(crate) fn poll_socket(
893        &self,
894        id: proto::HandleId,
895        ctx: &mut Context<'_>,
896        out: &mut [u8],
897    ) -> Poll<Result<usize, Error>> {
898        let mut inner = self.0.lock().unwrap();
899        if let Some(error) = inner.transport.error() {
900            return Poll::Ready(Err(error.into()));
901        }
902
903        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
904            wakers: Vec::new(),
905            queued: VecDeque::new(),
906            is_streaming: false,
907            read_request_pending: false,
908        });
909
910        if let Some(got) = state.queued.front_mut() {
911            match got.as_mut() {
912                Ok(data) => {
913                    let read_size = std::cmp::min(data.data.len(), out.len());
914                    out[..read_size].copy_from_slice(&data.data[..read_size]);
915
916                    if data.data.len() > read_size && !data.is_datagram {
917                        let _ = data.data.drain(..read_size);
918                    } else {
919                        let _ = state.queued.pop_front();
920                    }
921
922                    return Poll::Ready(Ok(read_size));
923                }
924                Err(_) => {
925                    let err = state.queued.pop_front().unwrap().unwrap_err();
926                    return Poll::Ready(Err(err));
927                }
928            }
929        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
930            state.wakers.push(ctx.waker().clone());
931        }
932
933        if !state.read_request_pending && !state.is_streaming {
934            inner.request(
935                ordinals::READ_SOCKET,
936                proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
937                Responder::ReadSocket(id),
938            );
939        }
940
941        Poll::Pending
942    }
943
944    /// Execute a read from a channel.
945    pub(crate) fn poll_channel(
946        &self,
947        id: proto::HandleId,
948        ctx: &mut Context<'_>,
949        for_stream: bool,
950    ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
951        let mut inner = self.0.lock().unwrap();
952        if let Some(error) = inner.transport.error() {
953            return Poll::Ready(Some(Err(error.into())));
954        }
955
956        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
957            wakers: Vec::new(),
958            queued: VecDeque::new(),
959            is_streaming: false,
960            read_request_pending: false,
961        });
962
963        if let Some(got) = state.queued.pop_front() {
964            return Poll::Ready(Some(got));
965        } else if for_stream && !state.is_streaming {
966            return Poll::Ready(None);
967        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
968            state.wakers.push(ctx.waker().clone());
969        }
970
971        if !state.read_request_pending && !state.is_streaming {
972            inner.request(
973                ordinals::READ_CHANNEL,
974                proto::ChannelReadChannelRequest { handle: id },
975                Responder::ReadChannel(id),
976            );
977        }
978
979        Poll::Pending
980    }
981
982    /// Check whether this channel is streaming
983    pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
984        let inner = self.0.lock().unwrap();
985        let Some(state) = inner.channel_read_states.get(&id) else {
986            return false;
987        };
988        state.is_streaming
989    }
990
991    /// Check that all the given handles are safe to transfer through a channel
992    /// e.g. that there's no chance of in-flight reads getting dropped.
993    pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
994        let inner = self.0.lock().unwrap();
995        match handles {
996            proto::Handles::Handles(handles) => {
997                for handle in handles {
998                    assert!(
999                        !(inner.channel_read_states.contains_key(handle)
1000                            || inner.socket_read_states.contains_key(handle)),
1001                        "Tried to transfer handle after reading"
1002                    );
1003                }
1004            }
1005            proto::Handles::Dispositions(dispositions) => {
1006                for disposition in dispositions {
1007                    match &disposition.handle {
1008                        proto::HandleOp::Move_(handle) => assert!(
1009                            !(inner.channel_read_states.contains_key(handle)
1010                                || inner.socket_read_states.contains_key(handle)),
1011                            "Tried to transfer handle after reading"
1012                        ),
1013                        // Pretty sure this should be fine regardless of read state.
1014                        proto::HandleOp::Duplicate(_) => (),
1015                    }
1016                }
1017            }
1018        }
1019    }
1020}