Skip to main content

fdomain_client/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_fdomain as proto;
6use fidl_message::TransactionHeader;
7use fuchsia_sync::Mutex;
8use futures::FutureExt;
9use futures::channel::oneshot::Sender as OneshotSender;
10use futures::stream::Stream as StreamTrait;
11use std::collections::{HashMap, VecDeque};
12use std::convert::Infallible;
13use std::future::Future;
14use std::num::NonZeroU32;
15use std::pin::Pin;
16use std::sync::{Arc, LazyLock, Weak};
17use std::task::{Context, Poll, Waker, ready};
18
19mod channel;
20mod event;
21mod event_pair;
22mod handle;
23mod responder;
24mod socket;
25
26#[cfg(test)]
27mod test;
28
29pub mod fidl;
30pub mod fidl_next;
31
32use responder::Responder;
33
34pub use channel::{
35    AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
36};
37pub use event::Event;
38pub use event_pair::Eventpair as EventPair;
39pub use handle::unowned::Unowned;
40pub use handle::{
41    AsHandleRef, Handle, HandleBased, HandleRef, NullableHandle, OnFDomainSignals, Peered,
42};
43pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
44pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
45
46// Unsupported handle types.
47#[rustfmt::skip]
48pub use Handle as Clock;
49#[rustfmt::skip]
50pub use Handle as Exception;
51#[rustfmt::skip]
52pub use Handle as Fifo;
53#[rustfmt::skip]
54pub use Handle as Iob;
55#[rustfmt::skip]
56pub use Handle as Job;
57#[rustfmt::skip]
58pub use Handle as Process;
59#[rustfmt::skip]
60pub use Handle as Resource;
61#[rustfmt::skip]
62pub use Handle as Stream;
63#[rustfmt::skip]
64pub use Handle as Thread;
65#[rustfmt::skip]
66pub use Handle as Vmar;
67#[rustfmt::skip]
68pub use Handle as Vmo;
69#[rustfmt::skip]
70pub use Handle as Counter;
71
72use proto::f_domain_ordinals as ordinals;
73
74fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75    match error {
76        FDomainError::TargetError(e) => {
77            let e = zx_status::Status::from_raw(*e);
78            write!(f, "Target-side error {e}")
79        }
80        FDomainError::BadHandleId(proto::BadHandleId { id }) => {
81            write!(f, "Tried to use invalid handle id {id}")
82        }
83        FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
84            f,
85            "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
86        ),
87        FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
88            write!(f, "Handle is occupied delivering streaming reads")
89        }
90        FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
91            write!(f, "No streaming read was in progress")
92        }
93        FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
94            write!(
95                f,
96                "Tried to create a handle with id {id}, which is outside the valid range for client handles"
97            )
98        }
99        FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
100            if *same_call {
101                write!(f, "Tried to create two or more new handles with the same id {id}")
102            } else {
103                write!(
104                    f,
105                    "Tried to create a new handle with id {id}, which is already the id of an existing handle"
106                )
107            }
108        }
109        FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
110            write!(f, "Tried to write a channel into itself")
111        }
112        FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
113            write!(f, "Handle closed while being read")
114        }
115        _ => todo!(),
116    }
117}
118
119/// Result type alias.
120pub type Result<T, E = Error> = std::result::Result<T, E>;
121
122/// Error type emitted by FDomain operations.
123#[derive(Clone)]
124pub enum Error {
125    SocketWrite(WriteSocketError),
126    ChannelWrite(WriteChannelError),
127    FDomain(FDomainError),
128    Protocol(::fidl::Error),
129    ProtocolObjectTypeIncompatible,
130    ProtocolRightsIncompatible,
131    ProtocolSignalsIncompatible,
132    ProtocolStreamEventIncompatible,
133    Transport(Option<Arc<std::io::Error>>),
134    ConnectionMismatch,
135    StreamingAborted,
136}
137
138impl std::fmt::Display for Error {
139    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140        match self {
141            Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
142                write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
143                write_fdomain_error(error, f)
144            }
145            Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
146                write!(f, "While writing channel: ")?;
147                write_fdomain_error(error, f)
148            }
149            Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
150                write!(f, "Couldn't write all handles into a channel:")?;
151                for (pos, error) in
152                    errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
153                {
154                    write!(f, "\n  Handle in position {pos}: ")?;
155                    write_fdomain_error(error, f)?;
156                }
157                Ok(())
158            }
159            Self::ProtocolObjectTypeIncompatible => {
160                write!(
161                    f,
162                    "The FDomain protocol received an unrecognized or incompatible object type"
163                )
164            }
165            Self::ProtocolRightsIncompatible => {
166                write!(
167                    f,
168                    "The FDomain protocol received unrecognized or incompatible handle rights"
169                )
170            }
171            Self::ProtocolSignalsIncompatible => {
172                write!(f, "The FDomain protocol received unrecognized or incompatible signals")
173            }
174            Self::ProtocolStreamEventIncompatible => {
175                write!(
176                    f,
177                    "The FDomain protocol received an unrecognized or incompatible streaming IO event"
178                )
179            }
180            Self::FDomain(e) => write_fdomain_error(e, f),
181            Self::Protocol(e) => write!(f, "Protocol error: {e}"),
182            Self::Transport(Some(e)) => write!(f, "Transport error: {e}"),
183            Self::Transport(None) => {
184                write!(f, "Transport error: Connection to the device has been lost")
185            }
186            Self::ConnectionMismatch => {
187                write!(
188                    f,
189                    "Tried to use an FDomain handle with a different connection than the one it was created on"
190                )
191            }
192            Self::StreamingAborted => write!(f, "Streaming on this channel has been aborted"),
193        }
194    }
195}
196
197impl std::fmt::Debug for Error {
198    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199        match self {
200            Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
201            Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
202            Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
203            Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
204            Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
205            Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
206            Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
207            Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
208            Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
209            Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
210            Self::StreamingAborted => write!(f, "StreamingAborted"),
211        }
212    }
213}
214
215impl std::error::Error for Error {}
216
217impl From<FDomainError> for Error {
218    fn from(other: FDomainError) -> Self {
219        Self::FDomain(other)
220    }
221}
222
223impl From<::fidl::Error> for Error {
224    fn from(other: ::fidl::Error) -> Self {
225        Self::Protocol(other)
226    }
227}
228
229impl From<WriteSocketError> for Error {
230    fn from(other: WriteSocketError) -> Self {
231        Self::SocketWrite(other)
232    }
233}
234
235impl From<WriteChannelError> for Error {
236    fn from(other: WriteChannelError) -> Self {
237        Self::ChannelWrite(other)
238    }
239}
240
241/// An error emitted internally by the client. Similar to [`Error`] but does not
242/// contain several variants which are irrelevant in the contexts where it is
243/// used.
244#[derive(Clone)]
245enum InnerError {
246    Protocol(::fidl::Error),
247    ProtocolStreamEventIncompatible,
248    Transport(Option<Arc<std::io::Error>>),
249}
250
251impl From<InnerError> for Error {
252    fn from(other: InnerError) -> Self {
253        match other {
254            InnerError::Protocol(p) => Error::Protocol(p),
255            InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
256            InnerError::Transport(t) => Error::Transport(t),
257        }
258    }
259}
260
261impl From<::fidl::Error> for InnerError {
262    fn from(other: ::fidl::Error) -> Self {
263        InnerError::Protocol(other)
264    }
265}
266
267// TODO(399717689) Figure out if we could just use AsyncRead/Write instead of a special trait.
268/// Implemented by objects which provide a transport over which we can speak the
269/// FDomain protocol.
270///
271/// The implementer must provide two things:
272/// 1) An incoming stream of messages presented as `Vec<u8>`. This is provided
273///    via the `Stream` trait, which this trait requires.
274/// 2) A way to send messages. This is provided by implementing the
275///    `poll_send_message` method.
276pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
277    /// Attempt to send a message asynchronously. Messages should be sent so
278    /// that they arrive at the target in order.
279    fn poll_send_message(
280        self: Pin<&mut Self>,
281        msg: &[u8],
282        ctx: &mut Context<'_>,
283    ) -> Poll<Result<(), Option<std::io::Error>>>;
284
285    /// Optional debug information outlet.
286    fn debug_fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
287        Ok(())
288    }
289
290    /// Whether `debug_fmt` does anything.
291    fn has_debug_fmt(&self) -> bool {
292        false
293    }
294}
295
296/// Wrapper for an `FDomainTransport` implementer that:
297/// 1) Provides a queue for outgoing messages so we need not have an await point
298///    when we submit a message.
299/// 2) Drops the transport on error, then returns the last observed error for
300///    all future operations.
301enum Transport {
302    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
303    Error(InnerError),
304}
305
306impl Transport {
307    /// Get the failure mode of the transport if it has failed.
308    fn error(&self) -> Option<InnerError> {
309        match self {
310            Transport::Transport(_, _, _) => None,
311            Transport::Error(inner_error) => Some(inner_error.clone()),
312        }
313    }
314
315    /// Enqueue a message to be sent on this transport.
316    fn push_msg(&mut self, msg: Box<[u8]>) -> Result<(), InnerError> {
317        match self {
318            Transport::Transport(_, v, w) => {
319                v.push_back(msg);
320                w.drain(..).for_each(Waker::wake);
321                Ok(())
322            }
323            Transport::Error(e) => Err(e.clone()),
324        }
325    }
326
327    /// Push messages in the send queue out through the transport.
328    fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
329        match self {
330            Transport::Error(e) => Poll::Ready(e.clone()),
331            Transport::Transport(t, v, w) => {
332                while let Some(msg) = v.front() {
333                    match t.as_mut().poll_send_message(msg, ctx) {
334                        Poll::Ready(Ok(())) => {
335                            v.pop_front();
336                        }
337                        Poll::Ready(Err(e)) => {
338                            let e = e.map(Arc::new);
339                            return Poll::Ready(InnerError::Transport(e));
340                        }
341                        Poll::Pending => return Poll::Pending,
342                    }
343                }
344
345                if v.is_empty() {
346                    w.push(ctx.waker().clone());
347                } else {
348                    ctx.waker().wake_by_ref();
349                }
350                Poll::Pending
351            }
352        }
353    }
354
355    /// Get the next incoming message from the transport.
356    fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Box<[u8]>, InnerError>> {
357        match self {
358            Transport::Error(e) => Poll::Ready(Err(e.clone())),
359            Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
360                Some(Ok(x)) => Poll::Ready(Ok(x)),
361                Some(Err(e)) => Poll::Ready(Err(InnerError::Transport(Some(Arc::new(e))))),
362                Option::None => Poll::Ready(Err(InnerError::Transport(None))),
363            },
364        }
365    }
366}
367
368impl Drop for Transport {
369    fn drop(&mut self) {
370        if let Transport::Transport(_, _, wakers) = self {
371            wakers.drain(..).for_each(Waker::wake);
372        }
373    }
374}
375
376/// State of a socket that is or has been read from.
377struct SocketReadState {
378    wakers: Vec<Waker>,
379    queued: VecDeque<Result<proto::SocketData, Error>>,
380    read_request_pending: bool,
381    is_streaming: bool,
382}
383
384impl SocketReadState {
385    /// Handle an incoming message, which is either a channel streaming event or
386    /// response to a `ChannelRead` request.
387    fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) -> Vec<Waker> {
388        self.queued.push_back(msg);
389        std::mem::replace(&mut self.wakers, Vec::new())
390    }
391}
392
393/// State of a channel that is or has been read from.
394struct ChannelReadState {
395    wakers: Vec<Waker>,
396    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
397    read_request_pending: bool,
398    is_streaming: bool,
399}
400
401impl ChannelReadState {
402    /// Handle an incoming message, which is either a channel streaming event or
403    /// response to a `ChannelRead` request.
404    fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) -> Vec<Waker> {
405        self.queued.push_back(msg);
406        std::mem::replace(&mut self.wakers, Vec::new())
407    }
408}
409
410/// Lock-protected interior of `Client`
411struct ClientInner {
412    transport: Transport,
413    transactions: HashMap<NonZeroU32, responder::Responder>,
414    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
415    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
416    next_tx_id: u32,
417    waiting_to_close: Vec<proto::HandleId>,
418    waiting_to_close_waker: Waker,
419
420    /// There is a lock around `ClientInner`, and sometimes the FIDL bindings
421    /// give us wakers that want to do handle operations synchronously on wake,
422    /// which means we can double-take the lock if we wake a waker while we hold
423    /// it. This is a place to store wakers that we'd like to be woken as soon
424    /// as we're not holding that lock, to avoid these weird reentrancy issues.
425    wakers_to_wake: Vec<Waker>,
426}
427
428impl ClientInner {
429    /// Serialize and enqueue a new transaction, including header and transaction ID.
430    fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
431        if ordinal != ordinals::CLOSE {
432            self.process_waiting_to_close();
433        }
434        let tx_id = self.next_tx_id;
435
436        let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
437        let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
438        self.next_tx_id += 1;
439        if let Err(e) = self.transport.push_msg(msg.into()) {
440            let _ = responder.handle(self, Err(e.into()));
441        } else {
442            assert!(
443                self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
444                "Allocated same tx id twice!"
445            );
446        }
447    }
448
449    fn process_waiting_to_close(&mut self) {
450        if !self.waiting_to_close.is_empty() {
451            let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
452            // We've dropped the handle object. Nobody is going to wait to read
453            // the buffers anymore. This is a safe time to drop the read state.
454            for handle in &handles {
455                let _ = self.channel_read_states.remove(handle);
456                let _ = self.socket_read_states.remove(handle);
457            }
458            self.request(
459                ordinals::CLOSE,
460                proto::FDomainCloseRequest { handles },
461                Responder::Ignore,
462            );
463        }
464    }
465
466    /// Polls the underlying transport to ensure any incoming or outgoing
467    /// messages are processed as far as possible. Errors if the transport has failed.
468    fn try_poll_transport(
469        &mut self,
470        ctx: &mut Context<'_>,
471    ) -> Poll<Result<Infallible, InnerError>> {
472        self.process_waiting_to_close();
473
474        self.waiting_to_close_waker = ctx.waker().clone();
475
476        loop {
477            if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
478                return Poll::Ready(Err(e));
479            }
480            let Poll::Ready(result) = self.transport.poll_next(ctx) else {
481                return Poll::Pending;
482            };
483            let data = result?;
484            let (header, data) = fidl_message::decode_transaction_header(&data)?;
485
486            let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
487                let wakers = self.process_event(header, data)?;
488                self.wakers_to_wake.extend(wakers);
489                continue;
490            };
491
492            let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
493            tx.handle(self, Ok((header, data)))?;
494        }
495    }
496
497    /// Process an incoming message that arose from an event rather than a transaction reply.
498    fn process_event(
499        &mut self,
500        header: TransactionHeader,
501        data: &[u8],
502    ) -> Result<Vec<Waker>, InnerError> {
503        match header.ordinal {
504            ordinals::ON_SOCKET_STREAMING_DATA => {
505                let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
506                    header, data,
507                )?;
508                let o =
509                    self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
510                        wakers: Vec::new(),
511                        queued: VecDeque::new(),
512                        is_streaming: false,
513                        read_request_pending: false,
514                    });
515                match msg.socket_message {
516                    proto::SocketMessage::Data(data) => Ok(o.handle_incoming_message(Ok(data))),
517                    proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
518                        let ret = if let Some(error) = error {
519                            o.handle_incoming_message(Err(Error::FDomain(*error)))
520                        } else {
521                            Vec::new()
522                        };
523                        o.is_streaming = false;
524                        Ok(ret)
525                    }
526                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
527                }
528            }
529            ordinals::ON_CHANNEL_STREAMING_DATA => {
530                let msg = fidl_message::decode_message::<
531                    proto::ChannelOnChannelStreamingDataRequest,
532                >(header, data)?;
533                let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
534                    ChannelReadState {
535                        wakers: Vec::new(),
536                        queued: VecDeque::new(),
537                        is_streaming: false,
538                        read_request_pending: false,
539                    }
540                });
541                match msg.channel_sent {
542                    proto::ChannelSent::Message(data) => Ok(o.handle_incoming_message(Ok(data))),
543                    proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
544                        let ret = if let Some(error) = error {
545                            o.handle_incoming_message(Err(Error::FDomain(*error)))
546                        } else {
547                            Vec::new()
548                        };
549                        o.is_streaming = false;
550                        Ok(ret)
551                    }
552                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
553                }
554            }
555            _ => Err(::fidl::Error::UnknownOrdinal {
556                ordinal: header.ordinal,
557                protocol_name:
558                    <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
559            }
560            .into()),
561        }
562    }
563
564    /// Polls the underlying transport to ensure any incoming or outgoing
565    /// messages are processed as far as possible. If a failure occurs, puts the
566    /// transport into an error state and fails all pending transactions.
567    fn poll_transport(&mut self, ctx: &mut Context<'_>) -> Poll<()> {
568        if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
569            for (_, v) in std::mem::take(&mut self.transactions) {
570                let _ = v.handle(self, Err(e.clone()));
571            }
572            for mut state in std::mem::take(&mut self.socket_read_states).into_values() {
573                state.queued.push_back(Err(Error::from(e.clone())));
574                self.wakers_to_wake.extend(state.wakers);
575            }
576            for (_, mut state) in self.channel_read_states.drain() {
577                state.queued.push_back(Err(Error::from(e.clone())));
578                self.wakers_to_wake.extend(state.wakers);
579            }
580            if matches!(self.transport, Transport::Transport(_, _, _)) {
581                self.transport = Transport::Error(e);
582            }
583
584            Poll::Ready(())
585        } else {
586            Poll::Pending
587        }
588    }
589
590    /// Handles the response to a `SocketRead` protocol message.
591    pub(crate) fn handle_socket_read_response(
592        &mut self,
593        msg: Result<proto::SocketData, Error>,
594        id: proto::HandleId,
595    ) {
596        let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
597            wakers: Vec::new(),
598            queued: VecDeque::new(),
599            is_streaming: false,
600            read_request_pending: false,
601        });
602        let wakers = state.handle_incoming_message(msg);
603        self.wakers_to_wake.extend(wakers);
604        state.read_request_pending = false;
605    }
606
607    /// Handles the response to a `ChannelRead` protocol message.
608    pub(crate) fn handle_channel_read_response(
609        &mut self,
610        msg: Result<proto::ChannelMessage, Error>,
611        id: proto::HandleId,
612    ) {
613        let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
614            wakers: Vec::new(),
615            queued: VecDeque::new(),
616            is_streaming: false,
617            read_request_pending: false,
618        });
619        let wakers = state.handle_incoming_message(msg);
620        self.wakers_to_wake.extend(wakers);
621        state.read_request_pending = false;
622    }
623}
624
625impl Drop for ClientInner {
626    fn drop(&mut self) {
627        let responders = self.transactions.drain().map(|x| x.1).collect::<Vec<_>>();
628        for responder in responders {
629            let _ = responder.handle(self, Err(InnerError::Transport(None)));
630        }
631        for state in self.channel_read_states.values_mut() {
632            state.wakers.drain(..).for_each(Waker::wake);
633        }
634        for state in self.socket_read_states.values_mut() {
635            state.wakers.drain(..).for_each(Waker::wake);
636        }
637        self.waiting_to_close_waker.wake_by_ref();
638        self.wakers_to_wake.drain(..).for_each(Waker::wake);
639    }
640}
641
642/// Represents a connection to an FDomain.
643///
644/// The client is constructed by passing it a transport object which represents
645/// the raw connection to the remote FDomain. The `Client` wrapper then allows
646/// us to construct and use handles which behave similarly to their counterparts
647/// on a Fuchsia device.
648pub struct Client(pub(crate) Mutex<ClientInner>);
649
650impl std::fmt::Debug for Client {
651    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
652        let inner = self.0.lock();
653        match &inner.transport {
654            Transport::Transport(transport, ..) if transport.has_debug_fmt() => {
655                write!(f, "Client(")?;
656                transport.debug_fmt(f)?;
657                write!(f, ")")
658            }
659            Transport::Error(error) => {
660                let error = Error::from(error.clone());
661                write!(f, "Client(Failed: {error})")
662            }
663            _ => f.debug_tuple("Client").field(&"<transport>").finish(),
664        }
665    }
666}
667
668/// A client which is always disconnected. Handles that lose their clients
669/// connect to this client instead, which always returns a "Client Lost"
670/// transport failure.
671pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
672    Arc::new(Client(Mutex::new(ClientInner {
673        transport: Transport::Error(InnerError::Transport(None)),
674        transactions: HashMap::new(),
675        channel_read_states: HashMap::new(),
676        socket_read_states: HashMap::new(),
677        next_tx_id: 1,
678        waiting_to_close: Vec::new(),
679        waiting_to_close_waker: std::task::Waker::noop().clone(),
680        wakers_to_wake: Vec::new(),
681    })))
682});
683
684/// A wrapper around the FDomain client background future that ensures
685/// all pending transactions and reads are failed if the loop is dropped.
686///
687/// This prevents hangs when the transport is abruptly closed (e.g. during target reboot)
688/// by waking up any futures waiting for responses or data on channels/sockets.
689pub struct ClientLoop {
690    client: Weak<Client>,
691    fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
692}
693
694impl Future for ClientLoop {
695    type Output = ();
696    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
697        self.fut.as_mut().poll(cx)
698    }
699}
700
701impl Drop for ClientLoop {
702    fn drop(&mut self) {
703        let Some(client) = self.client.upgrade() else {
704            return;
705        };
706
707        let (channel_read_states, socket_read_states, deferred_wakers) = {
708            let mut inner = client.0.lock();
709            let transactions = std::mem::take(&mut inner.transactions);
710            log::debug!("ClientLoop dropped, failing {} transactions", transactions.len());
711            for (_, v) in transactions {
712                let _ = v.handle(&mut *inner, Err(InnerError::Transport(None)));
713            }
714
715            let channel_read_states = std::mem::take(&mut inner.channel_read_states);
716            let socket_read_states = std::mem::take(&mut inner.socket_read_states);
717
718            let deferred_wakers = std::mem::replace(&mut inner.wakers_to_wake, Vec::new());
719
720            (channel_read_states, socket_read_states, deferred_wakers)
721        };
722
723        log::debug!("Failing reads on {} channels", channel_read_states.len());
724        for (_, mut state) in channel_read_states {
725            state.queued.push_back(Err(Error::Transport(None)));
726            state.wakers.into_iter().for_each(Waker::wake);
727        }
728
729        log::debug!("Failing reads on {} sockets", socket_read_states.len());
730        for (_, mut state) in socket_read_states {
731            state.queued.push_back(Err(Error::Transport(None)));
732            state.wakers.into_iter().for_each(Waker::wake);
733        }
734
735        deferred_wakers.into_iter().for_each(Waker::wake);
736    }
737}
738
739impl Client {
740    pub fn transport_status(&self) -> Result<()> {
741        match &self.0.lock().transport {
742            Transport::Error(e) => Err(e.clone().into()),
743            Transport::Transport(_, _, _) => Ok(()),
744        }
745    }
746    /// Create a new FDomain client. The `transport` argument should contain the
747    /// established connection to the target, ready to communicate the FDomain
748    /// protocol.
749    ///
750    /// The second return item is a future that must be polled to keep
751    /// transactions running.
752    pub fn new(
753        transport: impl FDomainTransport + 'static,
754    ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
755        let ret = Arc::new(Client(Mutex::new(ClientInner {
756            transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
757            transactions: HashMap::new(),
758            socket_read_states: HashMap::new(),
759            channel_read_states: HashMap::new(),
760            next_tx_id: 1,
761            waiting_to_close: Vec::new(),
762            waiting_to_close_waker: std::task::Waker::noop().clone(),
763            wakers_to_wake: Vec::new(),
764        })));
765
766        let client_weak = Arc::downgrade(&ret);
767        let fut = futures::future::poll_fn(move |ctx| {
768            let Some(client) = client_weak.upgrade() else {
769                return Poll::Ready(());
770            };
771
772            let (ret, deferred_wakers) = {
773                let mut inner = client.0.lock();
774                let ret = inner.poll_transport(ctx);
775                let deferred_wakers = std::mem::replace(&mut inner.wakers_to_wake, Vec::new());
776                (ret, deferred_wakers)
777            };
778            deferred_wakers.into_iter().for_each(Waker::wake);
779            ret
780        });
781
782        let client_loop = ClientLoop { client: Arc::downgrade(&ret), fut: Box::pin(fut) };
783
784        (ret, client_loop)
785    }
786
787    /// Get the namespace for the connected FDomain. Calling this more than once is an error.
788    pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
789        let new_handle = self.new_hid();
790        self.transaction(
791            ordinals::GET_NAMESPACE,
792            proto::FDomainGetNamespaceRequest { new_handle },
793            Responder::Namespace,
794        )
795        .await?;
796        Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
797    }
798
799    /// Create a new channel in the connected FDomain.
800    pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
801        let id_a = self.new_hid();
802        let id_b = self.new_hid();
803        let fut = self.transaction(
804            ordinals::CREATE_CHANNEL,
805            proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
806            Responder::CreateChannel,
807        );
808
809        fuchsia_async::Task::spawn(async move {
810            if let Err(e) = fut.await {
811                log::debug!("FDomain channel creation failed: {e}");
812            }
813        })
814        .detach();
815
816        (
817            Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
818            Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
819        )
820    }
821
822    /// Creates client and server endpoints connected to by a channel.
823    pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
824        self: &Arc<Self>,
825    ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
826        let (client, server) = self.create_channel();
827        let client_end = crate::fidl::ClientEnd::<F>::new(client);
828        let server_end = crate::fidl::ServerEnd::new(server);
829        (client_end, server_end)
830    }
831
832    /// Creates a client proxy and a server endpoint connected by a channel.
833    pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
834        self: &Arc<Self>,
835    ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
836        let (client_end, server_end) = self.create_endpoints::<F>();
837        (client_end.into_proxy(), server_end)
838    }
839
840    /// Creates a client proxy and a server request stream connected by a channel.
841    pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
842        self: &Arc<Self>,
843    ) -> (F::Proxy, F::RequestStream) {
844        let (client_end, server_end) = self.create_endpoints::<F>();
845        (client_end.into_proxy(), server_end.into_stream())
846    }
847
848    /// Creates a client end and a server request stream connected by a channel.
849    pub fn create_request_stream<F: crate::fidl::ProtocolMarker>(
850        self: &Arc<Self>,
851    ) -> (crate::fidl::ClientEnd<F>, F::RequestStream) {
852        let (client_end, server_end) = self.create_endpoints::<F>();
853        (client_end, server_end.into_stream())
854    }
855
856    /// Create a new socket in the connected FDomain.
857    fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
858        let id_a = self.new_hid();
859        let id_b = self.new_hid();
860        let fut = self.transaction(
861            ordinals::CREATE_SOCKET,
862            proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
863            Responder::CreateSocket,
864        );
865
866        fuchsia_async::Task::spawn(async move {
867            if let Err(e) = fut.await {
868                log::debug!("FDomain socket creation failed: {e}");
869            }
870        })
871        .detach();
872
873        (
874            Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
875            Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
876        )
877    }
878
879    /// Create a new streaming socket in the connected FDomain.
880    pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
881        self.create_socket(proto::SocketType::Stream)
882    }
883
884    /// Create a new datagram socket in the connected FDomain.
885    pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
886        self.create_socket(proto::SocketType::Datagram)
887    }
888
889    /// Create a new event pair in the connected FDomain.
890    pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
891        let id_a = self.new_hid();
892        let id_b = self.new_hid();
893        let fut = self.transaction(
894            ordinals::CREATE_EVENT_PAIR,
895            proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
896            Responder::CreateEventPair,
897        );
898
899        fuchsia_async::Task::spawn(async move {
900            if let Err(e) = fut.await {
901                log::debug!("FDomain event pair creation failed: {e}");
902            }
903        })
904        .detach();
905
906        (
907            EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
908            EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
909        )
910    }
911
912    /// Create a new event handle in the connected FDomain.
913    pub fn create_event(self: &Arc<Self>) -> Event {
914        let id = self.new_hid();
915        let fut = self.transaction(
916            ordinals::CREATE_EVENT,
917            proto::EventCreateEventRequest { handle: id },
918            Responder::CreateEvent,
919        );
920
921        fuchsia_async::Task::spawn(async move {
922            if let Err(e) = fut.await {
923                log::debug!("FDomain event creation failed: {e}");
924            }
925        })
926        .detach();
927
928        Event(Handle { id: id.id, client: Arc::downgrade(self) })
929    }
930
931    /// Allocate a new HID, which should be suitable for use with the connected FDomain.
932    pub(crate) fn new_hid(&self) -> proto::NewHandleId {
933        // TODO: On the target side we have to keep a table of these which means
934        // we can automatically detect collisions in the random value. On the
935        // client side we'd have to add a whole data structure just for that
936        // purpose. Should we?
937        proto::NewHandleId { id: rand::random::<u32>() >> 1 }
938    }
939
940    /// Create a future which sends a FIDL message to the connected FDomain and
941    /// waits for a response.
942    ///
943    /// Calling this method queues the transaction synchronously. Awaiting is
944    /// only necessary to wait for the response.
945    pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
946        self: &Arc<Self>,
947        ordinal: u64,
948        request: S,
949        f: F,
950    ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
951    where
952        F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
953    {
954        let mut inner = self.0.lock();
955
956        let (sender, receiver) = futures::channel::oneshot::channel();
957        inner.request(ordinal, request, f(sender));
958        receiver.map(|x| x.expect("Oneshot went away without reply!"))
959    }
960
961    /// Start getting streaming events for socket reads.
962    pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
963        let mut inner = self.0.lock();
964        if let Some(e) = inner.transport.error() {
965            return Err(e.into());
966        }
967
968        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
969            wakers: Vec::new(),
970            queued: VecDeque::new(),
971            is_streaming: false,
972            read_request_pending: false,
973        });
974
975        assert!(!state.is_streaming, "Initiated streaming twice!");
976        state.is_streaming = true;
977
978        inner.request(
979            ordinals::READ_SOCKET_STREAMING_START,
980            proto::SocketReadSocketStreamingStartRequest { handle: id },
981            Responder::Ignore,
982        );
983        Ok(())
984    }
985
986    /// Stop getting streaming events for socket reads. Doesn't return errors
987    /// because it's exclusively called in destructors where we have nothing to
988    /// do with them.
989    pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
990        let mut inner = self.0.lock();
991        if let Some(state) = inner.socket_read_states.get_mut(&id) {
992            if state.is_streaming {
993                state.is_streaming = false;
994                // TODO: Log?
995                let _ = inner.request(
996                    ordinals::READ_SOCKET_STREAMING_STOP,
997                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
998                    Responder::Ignore,
999                );
1000            }
1001        }
1002    }
1003
1004    /// Start getting streaming events for socket reads.
1005    pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
1006        let mut inner = self.0.lock();
1007        if let Some(e) = inner.transport.error() {
1008            return Err(e.into());
1009        }
1010        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
1011            wakers: Vec::new(),
1012            queued: VecDeque::new(),
1013            is_streaming: false,
1014            read_request_pending: false,
1015        });
1016
1017        assert!(!state.is_streaming, "Initiated streaming twice!");
1018        state.is_streaming = true;
1019
1020        inner.request(
1021            ordinals::READ_CHANNEL_STREAMING_START,
1022            proto::ChannelReadChannelStreamingStartRequest { handle: id },
1023            Responder::Ignore,
1024        );
1025
1026        Ok(())
1027    }
1028
1029    /// Stop getting streaming events for socket reads. Doesn't return errors
1030    /// because it's exclusively called in destructors where we have nothing to
1031    /// do with them.
1032    pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
1033        let mut inner = self.0.lock();
1034        if let Some(state) = inner.channel_read_states.get_mut(&id) {
1035            if state.is_streaming {
1036                state.is_streaming = false;
1037                // TODO: Log?
1038                let _ = inner.request(
1039                    ordinals::READ_CHANNEL_STREAMING_STOP,
1040                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
1041                    Responder::Ignore,
1042                );
1043            }
1044        }
1045    }
1046
1047    /// Execute a read from a channel.
1048    pub(crate) fn poll_socket(
1049        &self,
1050        id: proto::HandleId,
1051        ctx: &mut Context<'_>,
1052        out: &mut [u8],
1053    ) -> Poll<Result<usize, Error>> {
1054        let mut inner = self.0.lock();
1055        if let Some(error) = inner.transport.error() {
1056            return Poll::Ready(Err(error.into()));
1057        }
1058
1059        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
1060            wakers: Vec::new(),
1061            queued: VecDeque::new(),
1062            is_streaming: false,
1063            read_request_pending: false,
1064        });
1065
1066        if let Some(got) = state.queued.front_mut() {
1067            match got.as_mut() {
1068                Ok(data) => {
1069                    let read_size = std::cmp::min(data.data.len(), out.len());
1070                    out[..read_size].copy_from_slice(&data.data[..read_size]);
1071
1072                    if data.data.len() > read_size && !data.is_datagram {
1073                        let _ = data.data.drain(..read_size);
1074                    } else {
1075                        let _ = state.queued.pop_front();
1076                    }
1077
1078                    return Poll::Ready(Ok(read_size));
1079                }
1080                Err(_) => {
1081                    let err = state.queued.pop_front().unwrap().unwrap_err();
1082                    return Poll::Ready(Err(err));
1083                }
1084            }
1085        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
1086            state.wakers.push(ctx.waker().clone());
1087        }
1088
1089        if !state.read_request_pending && !state.is_streaming {
1090            inner.request(
1091                ordinals::READ_SOCKET,
1092                proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
1093                Responder::ReadSocket(id),
1094            );
1095        }
1096
1097        Poll::Pending
1098    }
1099
1100    /// Execute a read from a channel.
1101    pub(crate) fn poll_channel(
1102        &self,
1103        id: proto::HandleId,
1104        ctx: &mut Context<'_>,
1105        for_stream: bool,
1106    ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
1107        let mut inner = self.0.lock();
1108        if let Some(error) = inner.transport.error() {
1109            return Poll::Ready(Some(Err(error.into())));
1110        }
1111
1112        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
1113            wakers: Vec::new(),
1114            queued: VecDeque::new(),
1115            is_streaming: false,
1116            read_request_pending: false,
1117        });
1118
1119        if let Some(got) = state.queued.pop_front() {
1120            return Poll::Ready(Some(got));
1121        } else if for_stream && !state.is_streaming {
1122            return Poll::Ready(None);
1123        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
1124            state.wakers.push(ctx.waker().clone());
1125        }
1126
1127        if !state.read_request_pending && !state.is_streaming {
1128            inner.request(
1129                ordinals::READ_CHANNEL,
1130                proto::ChannelReadChannelRequest { handle: id },
1131                Responder::ReadChannel(id),
1132            );
1133        }
1134
1135        Poll::Pending
1136    }
1137
1138    /// Check whether this channel is streaming
1139    pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
1140        let inner = self.0.lock();
1141        let Some(state) = inner.channel_read_states.get(&id) else {
1142            return false;
1143        };
1144        state.is_streaming
1145    }
1146
1147    /// Check that all the given handles are safe to transfer through a channel
1148    /// e.g. that there's no chance of in-flight reads getting dropped.
1149    pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
1150        let inner = self.0.lock();
1151        match handles {
1152            proto::Handles::Handles(handles) => {
1153                for handle in handles {
1154                    assert!(
1155                        !(inner.channel_read_states.contains_key(handle)
1156                            || inner.socket_read_states.contains_key(handle)),
1157                        "Tried to transfer handle after reading"
1158                    );
1159                }
1160            }
1161            proto::Handles::Dispositions(dispositions) => {
1162                for disposition in dispositions {
1163                    match &disposition.handle {
1164                        proto::HandleOp::Move_(handle) => assert!(
1165                            !(inner.channel_read_states.contains_key(handle)
1166                                || inner.socket_read_states.contains_key(handle)),
1167                            "Tried to transfer handle after reading"
1168                        ),
1169                        // Pretty sure this should be fine regardless of read state.
1170                        proto::HandleOp::Duplicate(_) => (),
1171                    }
1172                }
1173            }
1174        }
1175    }
1176}