// fdomain_client/lib.rs
1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_fdomain as proto;
6use fidl_message::TransactionHeader;
7use fuchsia_async as _;
8use fuchsia_sync::Mutex;
9use futures::FutureExt;
10use futures::channel::oneshot::Sender as OneshotSender;
11use futures::stream::Stream as StreamTrait;
12use std::collections::{HashMap, VecDeque};
13use std::convert::Infallible;
14use std::future::Future;
15use std::num::NonZeroU32;
16use std::pin::Pin;
17use std::sync::{Arc, LazyLock, Weak};
18use std::task::{Context, Poll, Waker, ready};
19
20mod channel;
21mod event;
22mod event_pair;
23mod handle;
24mod responder;
25mod socket;
26
27#[cfg(test)]
28mod test;
29
30pub mod fidl;
31pub mod fidl_next;
32
33use responder::Responder;
34
35pub use channel::{
36    AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
37};
38pub use event::Event;
39pub use event_pair::Eventpair as EventPair;
40pub use handle::unowned::Unowned;
41pub use handle::{
42    AsHandleRef, Handle, HandleBased, HandleRef, NullableHandle, OnFDomainSignals, Peered,
43};
44pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
45pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
46
47// Unsupported handle types.
48#[rustfmt::skip]
49pub use Handle as Clock;
50#[rustfmt::skip]
51pub use Handle as Exception;
52#[rustfmt::skip]
53pub use Handle as Fifo;
54#[rustfmt::skip]
55pub use Handle as Iob;
56#[rustfmt::skip]
57pub use Handle as Job;
58#[rustfmt::skip]
59pub use Handle as Process;
60#[rustfmt::skip]
61pub use Handle as Resource;
62#[rustfmt::skip]
63pub use Handle as Stream;
64#[rustfmt::skip]
65pub use Handle as Thread;
66#[rustfmt::skip]
67pub use Handle as Vmar;
68#[rustfmt::skip]
69pub use Handle as Vmo;
70#[rustfmt::skip]
71pub use Handle as Counter;
72
73use proto::f_domain_ordinals as ordinals;
74
75fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76    match error {
77        FDomainError::TargetError(e) => {
78            let e = zx_status::Status::from_raw(*e);
79            write!(f, "Target-side error {e}")
80        }
81        FDomainError::BadHandleId(proto::BadHandleId { id }) => {
82            write!(f, "Tried to use invalid handle id {id}")
83        }
84        FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
85            f,
86            "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
87        ),
88        FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
89            write!(f, "Handle is occupied delivering streaming reads")
90        }
91        FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
92            write!(f, "No streaming read was in progress")
93        }
94        FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
95            write!(
96                f,
97                "Tried to create a handle with id {id}, which is outside the valid range for client handles"
98            )
99        }
100        FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
101            if *same_call {
102                write!(f, "Tried to create two or more new handles with the same id {id}")
103            } else {
104                write!(
105                    f,
106                    "Tried to create a new handle with id {id}, which is already the id of an existing handle"
107                )
108            }
109        }
110        FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
111            write!(f, "Tried to write a channel into itself")
112        }
113        FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
114            write!(f, "Handle closed while being read")
115        }
116        _ => todo!(),
117    }
118}
119
/// Result type alias used throughout this crate. Defaults the error type to
/// this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
122
/// Error type emitted by FDomain operations.
#[derive(Clone)]
pub enum Error {
    /// The target reported an error while writing to a socket.
    SocketWrite(WriteSocketError),
    /// The target reported an error while writing to a channel.
    ChannelWrite(WriteChannelError),
    /// General FDomain protocol error reported by the target.
    FDomain(FDomainError),
    /// FIDL-level error while encoding or decoding protocol messages.
    Protocol(::fidl::Error),
    /// The protocol supplied an object type this client doesn't recognize.
    ProtocolObjectTypeIncompatible,
    /// The protocol supplied handle rights this client doesn't recognize.
    ProtocolRightsIncompatible,
    /// The protocol supplied signals this client doesn't recognize.
    ProtocolSignalsIncompatible,
    /// The protocol supplied a streaming IO event this client doesn't recognize.
    ProtocolStreamEventIncompatible,
    /// Transport-level failure. `None` means the connection has been lost.
    Transport(Option<Arc<std::io::Error>>),
    /// A handle was used with a different connection than it was created on.
    ConnectionMismatch,
    /// Streaming on the handle in question has been aborted.
    StreamingAborted,
}
138
impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Socket writes report how many bytes made it out before failing.
            Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
                write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
                write_fdomain_error(error, f)
            }
            Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
                write!(f, "While writing channel: ")?;
                write_fdomain_error(error, f)
            }
            // Per-handle failures: `errors` is positionally aligned with the
            // handles from the original write; only the `Some` entries failed.
            Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
                write!(f, "Couldn't write all handles into a channel:")?;
                for (pos, error) in
                    errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
                {
                    write!(f, "\n  Handle in position {pos}: ")?;
                    write_fdomain_error(error, f)?;
                }
                Ok(())
            }
            Self::ProtocolObjectTypeIncompatible => {
                write!(
                    f,
                    "The FDomain protocol received an unrecognized or incompatible object type"
                )
            }
            Self::ProtocolRightsIncompatible => {
                write!(
                    f,
                    "The FDomain protocol received unrecognized or incompatible handle rights"
                )
            }
            Self::ProtocolSignalsIncompatible => {
                write!(f, "The FDomain protocol received unrecognized or incompatible signals")
            }
            Self::ProtocolStreamEventIncompatible => {
                write!(
                    f,
                    "The FDomain protocol received an unrecognized or incompatible streaming IO event"
                )
            }
            // Delegate to the shared FDomain error formatter.
            Self::FDomain(e) => write_fdomain_error(e, f),
            Self::Protocol(e) => write!(f, "Protocol error: {e}"),
            Self::Transport(Some(e)) => write!(f, "Transport error: {e}"),
            // `Transport(None)` is the generic "connection lost" failure.
            Self::Transport(None) => {
                write!(f, "Transport error: Connection to the device has been lost")
            }
            Self::ConnectionMismatch => {
                write!(
                    f,
                    "Tried to use an FDomain handle with a different connection than the one it was created on"
                )
            }
            Self::StreamingAborted => write!(f, "Streaming on this channel has been aborted"),
        }
    }
}
197
198impl std::fmt::Debug for Error {
199    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200        match self {
201            Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
202            Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
203            Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
204            Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
205            Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
206            Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
207            Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
208            Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
209            Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
210            Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
211            Self::StreamingAborted => write!(f, "StreamingAborted"),
212        }
213    }
214}
215
// `Display` and `Debug` are implemented manually above; nothing further needed.
impl std::error::Error for Error {}

/// Wrap a protocol-level FDomain error.
impl From<FDomainError> for Error {
    fn from(other: FDomainError) -> Self {
        Self::FDomain(other)
    }
}

/// Wrap a FIDL encoding/decoding error.
impl From<::fidl::Error> for Error {
    fn from(other: ::fidl::Error) -> Self {
        Self::Protocol(other)
    }
}

/// Wrap a socket write error.
impl From<WriteSocketError> for Error {
    fn from(other: WriteSocketError) -> Self {
        Self::SocketWrite(other)
    }
}

/// Wrap a channel write error.
impl From<WriteChannelError> for Error {
    fn from(other: WriteChannelError) -> Self {
        Self::ChannelWrite(other)
    }
}
241
/// An error emitted internally by the client. Similar to [`Error`] but does not
/// contain several variants which are irrelevant in the contexts where it is
/// used.
#[derive(Clone)]
enum InnerError {
    /// FIDL-level error while encoding or decoding protocol messages.
    Protocol(::fidl::Error),
    /// Received an unrecognized or incompatible streaming IO event.
    ProtocolStreamEventIncompatible,
    /// Transport-level failure; `None` means the connection has been lost.
    Transport(Option<Arc<std::io::Error>>),
}
251
/// Widen an internal error into the public error type.
impl From<InnerError> for Error {
    fn from(other: InnerError) -> Self {
        // Each internal variant has a direct public counterpart.
        match other {
            InnerError::Protocol(p) => Error::Protocol(p),
            InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
            InnerError::Transport(t) => Error::Transport(t),
        }
    }
}

/// Wrap a FIDL encoding/decoding error for internal propagation (used by the
/// `?` operator inside the client's poll paths).
impl From<::fidl::Error> for InnerError {
    fn from(other: ::fidl::Error) -> Self {
        InnerError::Protocol(other)
    }
}
267
// TODO(399717689) Figure out if we could just use AsyncRead/Write instead of a special trait.
/// Implemented by objects which provide a transport over which we can speak the
/// FDomain protocol.
///
/// The implementer must provide two things:
/// 1) An incoming stream of messages presented as `Vec<u8>`. This is provided
///    via the `Stream` trait, which this trait requires.
/// 2) A way to send messages. This is provided by implementing the
///    `poll_send_message` method.
pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
    /// Attempt to send a message asynchronously. Messages should be sent so
    /// that they arrive at the target in order.
    ///
    /// An `Err(None)` result is treated by the client as loss of the
    /// connection with no specific underlying IO error.
    fn poll_send_message(
        self: Pin<&mut Self>,
        msg: &[u8],
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Option<std::io::Error>>>;

    /// Optional debug information outlet. Default implementation writes
    /// nothing.
    fn debug_fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }

    /// Whether `debug_fmt` does anything. `Client`'s `Debug` impl only calls
    /// `debug_fmt` when this returns `true`.
    fn has_debug_fmt(&self) -> bool {
        false
    }
}
296
/// Wrapper for an `FDomainTransport` implementer that:
/// 1) Provides a queue for outgoing messages so we need not have an await point
///    when we submit a message.
/// 2) Drops the transport on error, then returns the last observed error for
///    all future operations.
enum Transport {
    /// Live transport: the connection itself, the queue of not-yet-sent
    /// outgoing messages, and wakers to notify when new messages are enqueued.
    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
    /// The transport has failed; all further operations report this error.
    Error(InnerError),
}
306
307impl Transport {
308    /// Get the failure mode of the transport if it has failed.
309    fn error(&self) -> Option<InnerError> {
310        match self {
311            Transport::Transport(_, _, _) => None,
312            Transport::Error(inner_error) => Some(inner_error.clone()),
313        }
314    }
315
316    /// Enqueue a message to be sent on this transport.
317    fn push_msg(&mut self, msg: Box<[u8]>) -> Result<(), InnerError> {
318        match self {
319            Transport::Transport(_, v, w) => {
320                v.push_back(msg);
321                w.drain(..).for_each(Waker::wake);
322                Ok(())
323            }
324            Transport::Error(e) => Err(e.clone()),
325        }
326    }
327
328    /// Push messages in the send queue out through the transport.
329    fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
330        match self {
331            Transport::Error(e) => Poll::Ready(e.clone()),
332            Transport::Transport(t, v, w) => {
333                while let Some(msg) = v.front() {
334                    match t.as_mut().poll_send_message(msg, ctx) {
335                        Poll::Ready(Ok(())) => {
336                            v.pop_front();
337                        }
338                        Poll::Ready(Err(e)) => {
339                            let e = e.map(Arc::new);
340                            return Poll::Ready(InnerError::Transport(e));
341                        }
342                        Poll::Pending => return Poll::Pending,
343                    }
344                }
345
346                if v.is_empty() {
347                    w.push(ctx.waker().clone());
348                } else {
349                    ctx.waker().wake_by_ref();
350                }
351                Poll::Pending
352            }
353        }
354    }
355
356    /// Get the next incoming message from the transport.
357    fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Box<[u8]>, InnerError>> {
358        match self {
359            Transport::Error(e) => Poll::Ready(Err(e.clone())),
360            Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
361                Some(Ok(x)) => Poll::Ready(Ok(x)),
362                Some(Err(e)) => Poll::Ready(Err(InnerError::Transport(Some(Arc::new(e))))),
363                Option::None => Poll::Ready(Err(InnerError::Transport(None))),
364            },
365        }
366    }
367}
368
369impl Drop for Transport {
370    fn drop(&mut self) {
371        if let Transport::Transport(_, _, wakers) = self {
372            wakers.drain(..).for_each(Waker::wake);
373        }
374    }
375}
376
/// State of a socket that is or has been read from.
struct SocketReadState {
    // Tasks waiting for data on this socket; woken when `queued` grows.
    wakers: Vec<Waker>,
    // Received data (or errors) not yet consumed by a reader.
    queued: VecDeque<Result<proto::SocketData, Error>>,
    // A `SocketRead` request is outstanding; cleared when its response is
    // handled.
    read_request_pending: bool,
    // Streaming reads are active; cleared when an `AioStopped` event arrives.
    is_streaming: bool,
}
384
385impl SocketReadState {
386    /// Handle an incoming message, which is either a channel streaming event or
387    /// response to a `ChannelRead` request.
388    fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) -> Vec<Waker> {
389        self.queued.push_back(msg);
390        std::mem::replace(&mut self.wakers, Vec::new())
391    }
392}
393
/// State of a channel that is or has been read from.
struct ChannelReadState {
    // Tasks waiting for messages on this channel; woken when `queued` grows.
    wakers: Vec<Waker>,
    // Received messages (or errors) not yet consumed by a reader.
    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
    // A `ChannelRead` request is outstanding; cleared when its response is
    // handled.
    read_request_pending: bool,
    // Streaming reads are active; cleared when an `AioStopped` event arrives.
    is_streaming: bool,
}
401
402impl ChannelReadState {
403    /// Handle an incoming message, which is either a channel streaming event or
404    /// response to a `ChannelRead` request.
405    fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) -> Vec<Waker> {
406        self.queued.push_back(msg);
407        std::mem::replace(&mut self.wakers, Vec::new())
408    }
409}
410
/// Lock-protected interior of `Client`
struct ClientInner {
    /// Connection to the target, plus the outgoing message queue.
    transport: Transport,
    /// Responders for in-flight transactions, keyed by transaction id.
    transactions: HashMap<NonZeroU32, responder::Responder>,
    /// Per-handle read queues for channels.
    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
    /// Per-handle read queues for sockets.
    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
    /// Next transaction id to allocate. Starts at 1; id 0 is reserved for
    /// unsolicited events (see the `NonZeroU32` key type above).
    next_tx_id: u32,
    /// Handles whose owners have been dropped, batched up for a single
    /// `Close` request.
    waiting_to_close: Vec<proto::HandleId>,
    /// Waker for the transport-polling task, refreshed each poll, so handle
    /// drops can trigger a re-poll that flushes `waiting_to_close`.
    waiting_to_close_waker: Waker,

    /// There is a lock around `ClientInner`, and sometimes the FIDL bindings
    /// give us wakers that want to do handle operations synchronously on wake,
    /// which means we can double-take the lock if we wake a waker while we hold
    /// it. This is a place to store wakers that we'd like to be woken as soon
    /// as we're not holding that lock, to avoid these weird reentrancy issues.
    wakers_to_wake: Vec<Waker>,
}
428
429impl ClientInner {
430    /// Serialize and enqueue a new transaction, including header and transaction ID.
431    fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
432        if ordinal != ordinals::CLOSE {
433            self.process_waiting_to_close();
434        }
435        let tx_id = self.next_tx_id;
436
437        let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
438        let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
439        self.next_tx_id += 1;
440        if let Err(e) = self.transport.push_msg(msg.into()) {
441            let _ = responder.handle(self, Err(e.into()));
442        } else {
443            assert!(
444                self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
445                "Allocated same tx id twice!"
446            );
447        }
448    }
449
450    fn process_waiting_to_close(&mut self) {
451        if !self.waiting_to_close.is_empty() {
452            let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
453            // We've dropped the handle object. Nobody is going to wait to read
454            // the buffers anymore. This is a safe time to drop the read state.
455            for handle in &handles {
456                let _ = self.channel_read_states.remove(handle);
457                let _ = self.socket_read_states.remove(handle);
458            }
459            self.request(
460                ordinals::CLOSE,
461                proto::FDomainCloseRequest { handles },
462                Responder::Ignore,
463            );
464        }
465    }
466
467    /// Polls the underlying transport to ensure any incoming or outgoing
468    /// messages are processed as far as possible. Errors if the transport has failed.
469    fn try_poll_transport(
470        &mut self,
471        ctx: &mut Context<'_>,
472    ) -> Poll<Result<Infallible, InnerError>> {
473        self.process_waiting_to_close();
474
475        self.waiting_to_close_waker = ctx.waker().clone();
476
477        loop {
478            if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
479                return Poll::Ready(Err(e));
480            }
481            let Poll::Ready(result) = self.transport.poll_next(ctx) else {
482                return Poll::Pending;
483            };
484            let data = result?;
485            let (header, data) = fidl_message::decode_transaction_header(&data)?;
486
487            let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
488                let wakers = self.process_event(header, data)?;
489                self.wakers_to_wake.extend(wakers);
490                continue;
491            };
492
493            let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
494            tx.handle(self, Ok((header, data)))?;
495        }
496    }
497
498    /// Process an incoming message that arose from an event rather than a transaction reply.
499    fn process_event(
500        &mut self,
501        header: TransactionHeader,
502        data: &[u8],
503    ) -> Result<Vec<Waker>, InnerError> {
504        match header.ordinal {
505            ordinals::ON_SOCKET_STREAMING_DATA => {
506                let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
507                    header, data,
508                )?;
509                let o =
510                    self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
511                        wakers: Vec::new(),
512                        queued: VecDeque::new(),
513                        is_streaming: false,
514                        read_request_pending: false,
515                    });
516                match msg.socket_message {
517                    proto::SocketMessage::Data(data) => Ok(o.handle_incoming_message(Ok(data))),
518                    proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
519                        let ret = if let Some(error) = error {
520                            o.handle_incoming_message(Err(Error::FDomain(*error)))
521                        } else {
522                            Vec::new()
523                        };
524                        o.is_streaming = false;
525                        Ok(ret)
526                    }
527                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
528                }
529            }
530            ordinals::ON_CHANNEL_STREAMING_DATA => {
531                let msg = fidl_message::decode_message::<
532                    proto::ChannelOnChannelStreamingDataRequest,
533                >(header, data)?;
534                let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
535                    ChannelReadState {
536                        wakers: Vec::new(),
537                        queued: VecDeque::new(),
538                        is_streaming: false,
539                        read_request_pending: false,
540                    }
541                });
542                match msg.channel_sent {
543                    proto::ChannelSent::Message(data) => Ok(o.handle_incoming_message(Ok(data))),
544                    proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
545                        let ret = if let Some(error) = error {
546                            o.handle_incoming_message(Err(Error::FDomain(*error)))
547                        } else {
548                            Vec::new()
549                        };
550                        o.is_streaming = false;
551                        Ok(ret)
552                    }
553                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
554                }
555            }
556            _ => Err(::fidl::Error::UnknownOrdinal {
557                ordinal: header.ordinal,
558                protocol_name:
559                    <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
560            }
561            .into()),
562        }
563    }
564
565    /// Polls the underlying transport to ensure any incoming or outgoing
566    /// messages are processed as far as possible. If a failure occurs, puts the
567    /// transport into an error state and fails all pending transactions.
568    fn poll_transport(&mut self, ctx: &mut Context<'_>) -> Poll<()> {
569        if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
570            for (_, v) in std::mem::take(&mut self.transactions) {
571                let _ = v.handle(self, Err(e.clone()));
572            }
573            for mut state in std::mem::take(&mut self.socket_read_states).into_values() {
574                state.queued.push_back(Err(Error::from(e.clone())));
575                self.wakers_to_wake.extend(state.wakers);
576            }
577            for (_, mut state) in self.channel_read_states.drain() {
578                state.queued.push_back(Err(Error::from(e.clone())));
579                self.wakers_to_wake.extend(state.wakers);
580            }
581            if matches!(self.transport, Transport::Transport(_, _, _)) {
582                self.transport = Transport::Error(e);
583            }
584
585            Poll::Ready(())
586        } else {
587            Poll::Pending
588        }
589    }
590
591    /// Handles the response to a `SocketRead` protocol message.
592    pub(crate) fn handle_socket_read_response(
593        &mut self,
594        msg: Result<proto::SocketData, Error>,
595        id: proto::HandleId,
596    ) {
597        let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
598            wakers: Vec::new(),
599            queued: VecDeque::new(),
600            is_streaming: false,
601            read_request_pending: false,
602        });
603        let wakers = state.handle_incoming_message(msg);
604        self.wakers_to_wake.extend(wakers);
605        state.read_request_pending = false;
606    }
607
608    /// Handles the response to a `ChannelRead` protocol message.
609    pub(crate) fn handle_channel_read_response(
610        &mut self,
611        msg: Result<proto::ChannelMessage, Error>,
612        id: proto::HandleId,
613    ) {
614        let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
615            wakers: Vec::new(),
616            queued: VecDeque::new(),
617            is_streaming: false,
618            read_request_pending: false,
619        });
620        let wakers = state.handle_incoming_message(msg);
621        self.wakers_to_wake.extend(wakers);
622        state.read_request_pending = false;
623    }
624}
625
626impl Drop for ClientInner {
627    fn drop(&mut self) {
628        let responders = self.transactions.drain().map(|x| x.1).collect::<Vec<_>>();
629        for responder in responders {
630            let _ = responder.handle(self, Err(InnerError::Transport(None)));
631        }
632        for state in self.channel_read_states.values_mut() {
633            state.wakers.drain(..).for_each(Waker::wake);
634        }
635        for state in self.socket_read_states.values_mut() {
636            state.wakers.drain(..).for_each(Waker::wake);
637        }
638        self.waiting_to_close_waker.wake_by_ref();
639        self.wakers_to_wake.drain(..).for_each(Waker::wake);
640    }
641}
642
/// Represents a connection to an FDomain.
///
/// The client is constructed by passing it a transport object which represents
/// the raw connection to the remote FDomain. The `Client` wrapper then allows
/// us to construct and use handles which behave similarly to their counterparts
/// on a Fuchsia device.
// All mutable state lives behind this single mutex; see `ClientInner` for the
// reentrancy rules around holding it.
pub struct Client(pub(crate) Mutex<ClientInner>);
650
651impl std::fmt::Debug for Client {
652    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
653        let inner = self.0.lock();
654        match &inner.transport {
655            Transport::Transport(transport, ..) if transport.has_debug_fmt() => {
656                write!(f, "Client(")?;
657                transport.debug_fmt(f)?;
658                write!(f, ")")
659            }
660            Transport::Error(error) => {
661                let error = Error::from(error.clone());
662                write!(f, "Client(Failed: {error})")
663            }
664            _ => f.debug_tuple("Client").field(&"<transport>").finish(),
665        }
666    }
667}
668
/// A client which is always disconnected. Handles that lose their clients
/// connect to this client instead, which always returns a "Client Lost"
/// transport failure.
pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
    Arc::new(Client(Mutex::new(ClientInner {
        // Permanently failed: `Transport(None)` displays as connection lost.
        transport: Transport::Error(InnerError::Transport(None)),
        transactions: HashMap::new(),
        channel_read_states: HashMap::new(),
        socket_read_states: HashMap::new(),
        next_tx_id: 1,
        waiting_to_close: Vec::new(),
        // Nothing ever polls this client, so a no-op waker suffices.
        waiting_to_close_waker: std::task::Waker::noop().clone(),
        wakers_to_wake: Vec::new(),
    })))
});
684
/// A wrapper around the FDomain client background future that ensures
/// all pending transactions and reads are failed if the loop is dropped.
///
/// This prevents hangs when the transport is abruptly closed (e.g. during target reboot)
/// by waking up any futures waiting for responses or data on channels/sockets.
pub struct ClientLoop {
    // Weak so the loop itself doesn't keep the client alive.
    client: Weak<Client>,
    // The actual background future; polled by the `Future` impl below.
    fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
}
694
695impl Future for ClientLoop {
696    type Output = ();
697    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
698        self.fut.as_mut().poll(cx)
699    }
700}
701
702impl Drop for ClientLoop {
703    fn drop(&mut self) {
704        let Some(client) = self.client.upgrade() else {
705            return;
706        };
707
708        let (channel_read_states, socket_read_states, deferred_wakers) = {
709            let mut inner = client.0.lock();
710            let transactions = std::mem::take(&mut inner.transactions);
711            log::debug!("ClientLoop dropped, failing {} transactions", transactions.len());
712            for (_, v) in transactions {
713                let _ = v.handle(&mut *inner, Err(InnerError::Transport(None)));
714            }
715
716            let channel_read_states = std::mem::take(&mut inner.channel_read_states);
717            let socket_read_states = std::mem::take(&mut inner.socket_read_states);
718
719            let deferred_wakers = std::mem::replace(&mut inner.wakers_to_wake, Vec::new());
720
721            (channel_read_states, socket_read_states, deferred_wakers)
722        };
723
724        log::debug!("Failing reads on {} channels", channel_read_states.len());
725        for (_, mut state) in channel_read_states {
726            state.queued.push_back(Err(Error::Transport(None)));
727            state.wakers.into_iter().for_each(Waker::wake);
728        }
729
730        log::debug!("Failing reads on {} sockets", socket_read_states.len());
731        for (_, mut state) in socket_read_states {
732            state.queued.push_back(Err(Error::Transport(None)));
733            state.wakers.into_iter().for_each(Waker::wake);
734        }
735
736        deferred_wakers.into_iter().for_each(Waker::wake);
737    }
738}
739
740impl Client {
741    pub fn transport_status(&self) -> Result<()> {
742        match &self.0.lock().transport {
743            Transport::Error(e) => Err(e.clone().into()),
744            Transport::Transport(_, _, _) => Ok(()),
745        }
746    }
    /// Create a new FDomain client. The `transport` argument should contain the
    /// established connection to the target, ready to communicate the FDomain
    /// protocol.
    ///
    /// The second return item is a future that must be polled to keep
    /// transactions running.
    pub fn new(
        transport: impl FDomainTransport + 'static,
    ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
        let ret = Arc::new(Client(Mutex::new(ClientInner {
            transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
            transactions: HashMap::new(),
            socket_read_states: HashMap::new(),
            channel_read_states: HashMap::new(),
            // Transaction ids start at 1; 0 is reserved for events.
            next_tx_id: 1,
            waiting_to_close: Vec::new(),
            // Placeholder; replaced with the real waker on first poll.
            waiting_to_close_waker: std::task::Waker::noop().clone(),
            wakers_to_wake: Vec::new(),
        })));

        // The background future holds only a weak reference, so dropping all
        // handles to the client terminates the loop.
        let client_weak = Arc::downgrade(&ret);
        let fut = futures::future::poll_fn(move |ctx| {
            let Some(client) = client_weak.upgrade() else {
                return Poll::Ready(());
            };

            // Poll inside the lock, but wake deferred wakers only after
            // releasing it to avoid lock reentrancy (see `wakers_to_wake`).
            let (ret, deferred_wakers) = {
                let mut inner = client.0.lock();
                let ret = inner.poll_transport(ctx);
                let deferred_wakers = std::mem::replace(&mut inner.wakers_to_wake, Vec::new());
                (ret, deferred_wakers)
            };
            deferred_wakers.into_iter().for_each(Waker::wake);
            ret
        });

        // Wrap in `ClientLoop` so dropping the loop fails pending work.
        let client_loop = ClientLoop { client: Arc::downgrade(&ret), fut: Box::pin(fut) };

        (ret, client_loop)
    }
787
    /// Get the namespace for the connected FDomain. Calling this more than once is an error.
    pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
        // Allocate a client-side handle id for the namespace channel.
        // (`new_hid` is defined elsewhere in this impl — presumably it
        // allocates ids in the client-reserved range; confirm there.)
        let new_handle = self.new_hid();
        self.transaction(
            ordinals::GET_NAMESPACE,
            proto::FDomainGetNamespaceRequest { new_handle },
            Responder::Namespace,
        )
        .await?;
        // Handles hold only a weak reference so they don't keep the
        // connection alive.
        Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
    }
799
800    /// Create a new channel in the connected FDomain.
801    pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
802        let id_a = self.new_hid();
803        let id_b = self.new_hid();
804        let fut = self.transaction(
805            ordinals::CREATE_CHANNEL,
806            proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
807            Responder::CreateChannel,
808        );
809
810        fuchsia_async::Task::spawn(async move {
811            if let Err(e) = fut.await {
812                log::debug!("FDomain channel creation failed: {e}");
813            }
814        })
815        .detach();
816
817        (
818            Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
819            Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
820        )
821    }
822
823    /// Creates client and server endpoints connected to by a channel.
824    pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
825        self: &Arc<Self>,
826    ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
827        let (client, server) = self.create_channel();
828        let client_end = crate::fidl::ClientEnd::<F>::new(client);
829        let server_end = crate::fidl::ServerEnd::new(server);
830        (client_end, server_end)
831    }
832
833    /// Creates a client proxy and a server endpoint connected by a channel.
834    pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
835        self: &Arc<Self>,
836    ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
837        let (client_end, server_end) = self.create_endpoints::<F>();
838        (client_end.into_proxy(), server_end)
839    }
840
841    /// Creates a client proxy and a server request stream connected by a channel.
842    pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
843        self: &Arc<Self>,
844    ) -> (F::Proxy, F::RequestStream) {
845        let (client_end, server_end) = self.create_endpoints::<F>();
846        (client_end.into_proxy(), server_end.into_stream())
847    }
848
849    /// Creates a client end and a server request stream connected by a channel.
850    pub fn create_request_stream<F: crate::fidl::ProtocolMarker>(
851        self: &Arc<Self>,
852    ) -> (crate::fidl::ClientEnd<F>, F::RequestStream) {
853        let (client_end, server_end) = self.create_endpoints::<F>();
854        (client_end, server_end.into_stream())
855    }
856
857    /// Create a new socket in the connected FDomain.
858    fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
859        let id_a = self.new_hid();
860        let id_b = self.new_hid();
861        let fut = self.transaction(
862            ordinals::CREATE_SOCKET,
863            proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
864            Responder::CreateSocket,
865        );
866
867        fuchsia_async::Task::spawn(async move {
868            if let Err(e) = fut.await {
869                log::debug!("FDomain socket creation failed: {e}");
870            }
871        })
872        .detach();
873
874        (
875            Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
876            Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
877        )
878    }
879
    /// Create a new streaming socket in the connected FDomain.
    ///
    /// Thin wrapper around [`Client::create_socket`] with
    /// [`proto::SocketType::Stream`].
    pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
        self.create_socket(proto::SocketType::Stream)
    }
884
    /// Create a new datagram socket in the connected FDomain.
    ///
    /// Thin wrapper around [`Client::create_socket`] with
    /// [`proto::SocketType::Datagram`].
    pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
        self.create_socket(proto::SocketType::Datagram)
    }
889
890    /// Create a new event pair in the connected FDomain.
891    pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
892        let id_a = self.new_hid();
893        let id_b = self.new_hid();
894        let fut = self.transaction(
895            ordinals::CREATE_EVENT_PAIR,
896            proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
897            Responder::CreateEventPair,
898        );
899
900        fuchsia_async::Task::spawn(async move {
901            if let Err(e) = fut.await {
902                log::debug!("FDomain event pair creation failed: {e}");
903            }
904        })
905        .detach();
906
907        (
908            EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
909            EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
910        )
911    }
912
913    /// Create a new event handle in the connected FDomain.
914    pub fn create_event(self: &Arc<Self>) -> Event {
915        let id = self.new_hid();
916        let fut = self.transaction(
917            ordinals::CREATE_EVENT,
918            proto::EventCreateEventRequest { handle: id },
919            Responder::CreateEvent,
920        );
921
922        fuchsia_async::Task::spawn(async move {
923            if let Err(e) = fut.await {
924                log::debug!("FDomain event creation failed: {e}");
925            }
926        })
927        .detach();
928
929        Event(Handle { id: id.id, client: Arc::downgrade(self) })
930    }
931
    /// Allocate a new HID, which should be suitable for use with the connected FDomain.
    ///
    /// The id is random with the most significant bit shifted out, so it is
    /// always in the low half of the `u32` range — presumably ids with the
    /// high bit set are reserved by the protocol; TODO(review): confirm
    /// against the FDomain protocol definition.
    pub(crate) fn new_hid(&self) -> proto::NewHandleId {
        // TODO: On the target side we have to keep a table of these which means
        // we can automatically detect collisions in the random value. On the
        // client side we'd have to add a whole data structure just for that
        // purpose. Should we?
        proto::NewHandleId { id: rand::random::<u32>() >> 1 }
    }
940
941    /// Create a future which sends a FIDL message to the connected FDomain and
942    /// waits for a response.
943    ///
944    /// Calling this method queues the transaction synchronously. Awaiting is
945    /// only necessary to wait for the response.
946    pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
947        self: &Arc<Self>,
948        ordinal: u64,
949        request: S,
950        f: F,
951    ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
952    where
953        F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
954    {
955        let mut inner = self.0.lock();
956
957        let (sender, receiver) = futures::channel::oneshot::channel();
958        inner.request(ordinal, request, f(sender));
959        receiver.map(|x| x.expect("Oneshot went away without reply!"))
960    }
961
962    /// Start getting streaming events for socket reads.
963    pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
964        let mut inner = self.0.lock();
965        if let Some(e) = inner.transport.error() {
966            return Err(e.into());
967        }
968
969        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
970            wakers: Vec::new(),
971            queued: VecDeque::new(),
972            is_streaming: false,
973            read_request_pending: false,
974        });
975
976        assert!(!state.is_streaming, "Initiated streaming twice!");
977        state.is_streaming = true;
978
979        inner.request(
980            ordinals::READ_SOCKET_STREAMING_START,
981            proto::SocketReadSocketStreamingStartRequest { handle: id },
982            Responder::Ignore,
983        );
984        Ok(())
985    }
986
987    /// Stop getting streaming events for socket reads. Doesn't return errors
988    /// because it's exclusively called in destructors where we have nothing to
989    /// do with them.
990    pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
991        let mut inner = self.0.lock();
992        if let Some(state) = inner.socket_read_states.get_mut(&id) {
993            if state.is_streaming {
994                state.is_streaming = false;
995                // TODO: Log?
996                let _ = inner.request(
997                    ordinals::READ_SOCKET_STREAMING_STOP,
998                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
999                    Responder::Ignore,
1000                );
1001            }
1002        }
1003    }
1004
1005    /// Start getting streaming events for socket reads.
1006    pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
1007        let mut inner = self.0.lock();
1008        if let Some(e) = inner.transport.error() {
1009            return Err(e.into());
1010        }
1011        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
1012            wakers: Vec::new(),
1013            queued: VecDeque::new(),
1014            is_streaming: false,
1015            read_request_pending: false,
1016        });
1017
1018        assert!(!state.is_streaming, "Initiated streaming twice!");
1019        state.is_streaming = true;
1020
1021        inner.request(
1022            ordinals::READ_CHANNEL_STREAMING_START,
1023            proto::ChannelReadChannelStreamingStartRequest { handle: id },
1024            Responder::Ignore,
1025        );
1026
1027        Ok(())
1028    }
1029
1030    /// Stop getting streaming events for socket reads. Doesn't return errors
1031    /// because it's exclusively called in destructors where we have nothing to
1032    /// do with them.
1033    pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
1034        let mut inner = self.0.lock();
1035        if let Some(state) = inner.channel_read_states.get_mut(&id) {
1036            if state.is_streaming {
1037                state.is_streaming = false;
1038                // TODO: Log?
1039                let _ = inner.request(
1040                    ordinals::READ_CHANNEL_STREAMING_STOP,
1041                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
1042                    Responder::Ignore,
1043                );
1044            }
1045        }
1046    }
1047
    /// Execute a read from a socket.
    ///
    /// Copies up to `out.len()` bytes of already-queued data into `out` and
    /// returns the number of bytes copied. If nothing is queued, registers
    /// the caller's waker and, when neither a one-shot read nor streaming is
    /// active, queues a `ReadSocket` request to the target.
    pub(crate) fn poll_socket(
        &self,
        id: proto::HandleId,
        ctx: &mut Context<'_>,
        out: &mut [u8],
    ) -> Poll<Result<usize, Error>> {
        let mut inner = self.0.lock();
        // A dead transport fails all reads immediately.
        if let Some(error) = inner.transport.error() {
            return Poll::Ready(Err(error.into()));
        }

        // Lazily create the per-handle read state on first poll.
        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
            wakers: Vec::new(),
            queued: VecDeque::new(),
            is_streaming: false,
            read_request_pending: false,
        });

        if let Some(got) = state.queued.front_mut() {
            match got.as_mut() {
                Ok(data) => {
                    let read_size = std::cmp::min(data.data.len(), out.len());
                    out[..read_size].copy_from_slice(&data.data[..read_size]);

                    // Stream data that didn't fit stays queued for the next
                    // read; a partially-read datagram is discarded whole (its
                    // remaining bytes are dropped).
                    if data.data.len() > read_size && !data.is_datagram {
                        let _ = data.data.drain(..read_size);
                    } else {
                        let _ = state.queued.pop_front();
                    }

                    return Poll::Ready(Ok(read_size));
                }
                Err(_) => {
                    // Pop the queued error and surface it to the caller.
                    let err = state.queued.pop_front().unwrap().unwrap_err();
                    return Poll::Ready(Err(err));
                }
            }
        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
            // Register the waker at most once per task.
            state.wakers.push(ctx.waker().clone());
        }

        // NOTE(review): `read_request_pending` is never set to true in this
        // method — presumably `ClientInner::request` updates it when given a
        // `Responder::ReadSocket`; confirm, otherwise repeated polls could
        // queue duplicate read requests.
        if !state.read_request_pending && !state.is_streaming {
            inner.request(
                ordinals::READ_SOCKET,
                proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
                Responder::ReadSocket(id),
            );
        }

        Poll::Pending
    }
1100
    /// Execute a read from a channel.
    ///
    /// Returns the next queued message (or queued error) if any; otherwise
    /// registers the caller's waker. When `for_stream` is set and the channel
    /// is not streaming, reports end-of-stream (`Ready(None)`) instead of
    /// pending.
    pub(crate) fn poll_channel(
        &self,
        id: proto::HandleId,
        ctx: &mut Context<'_>,
        for_stream: bool,
    ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
        let mut inner = self.0.lock();
        // A dead transport fails all reads immediately.
        if let Some(error) = inner.transport.error() {
            return Poll::Ready(Some(Err(error.into())));
        }

        // Lazily create the per-handle read state on first poll.
        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
            wakers: Vec::new(),
            queued: VecDeque::new(),
            is_streaming: false,
            read_request_pending: false,
        });

        if let Some(got) = state.queued.pop_front() {
            return Poll::Ready(Some(got));
        } else if for_stream && !state.is_streaming {
            // Stream-mode polling of a non-streaming channel signals
            // stream termination rather than waiting.
            return Poll::Ready(None);
        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
            // Register the waker at most once per task.
            state.wakers.push(ctx.waker().clone());
        }

        // NOTE(review): `read_request_pending` is never set to true in this
        // method — presumably `ClientInner::request` updates it when given a
        // `Responder::ReadChannel`; confirm, otherwise repeated polls could
        // queue duplicate read requests.
        if !state.read_request_pending && !state.is_streaming {
            inner.request(
                ordinals::READ_CHANNEL,
                proto::ChannelReadChannelRequest { handle: id },
                Responder::ReadChannel(id),
            );
        }

        Poll::Pending
    }
1138
1139    /// Check whether this channel is streaming
1140    pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
1141        let inner = self.0.lock();
1142        let Some(state) = inner.channel_read_states.get(&id) else {
1143            return false;
1144        };
1145        state.is_streaming
1146    }
1147
1148    /// Check that all the given handles are safe to transfer through a channel
1149    /// e.g. that there's no chance of in-flight reads getting dropped.
1150    pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
1151        let inner = self.0.lock();
1152        match handles {
1153            proto::Handles::Handles(handles) => {
1154                for handle in handles {
1155                    assert!(
1156                        !(inner.channel_read_states.contains_key(handle)
1157                            || inner.socket_read_states.contains_key(handle)),
1158                        "Tried to transfer handle after reading"
1159                    );
1160                }
1161            }
1162            proto::Handles::Dispositions(dispositions) => {
1163                for disposition in dispositions {
1164                    match &disposition.handle {
1165                        proto::HandleOp::Move_(handle) => assert!(
1166                            !(inner.channel_read_states.contains_key(handle)
1167                                || inner.socket_read_states.contains_key(handle)),
1168                            "Tried to transfer handle after reading"
1169                        ),
1170                        // Pretty sure this should be fine regardless of read state.
1171                        proto::HandleOp::Duplicate(_) => (),
1172                    }
1173                }
1174            }
1175        }
1176    }
1177}