Skip to main content

fidl/
client.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! An implementation of a client for a fidl interface.
6
7use crate::Error;
8use crate::encoding::{
9    Decode, Decoder, DefaultFuchsiaResourceDialect, DynamicFlags, Encode, Encoder, EpitaphBody,
10    MessageBufFor, ProxyChannelBox, ProxyChannelFor, ResourceDialect, TransactionHeader,
11    TransactionMessage, TransactionMessageType, TypeMarker, decode_transaction_header,
12};
13use fuchsia_sync::Mutex;
14use futures::future::{self, FusedFuture, Future, FutureExt, Map, MaybeDone};
15use futures::ready;
16use futures::stream::{FusedStream, Stream};
17use futures::task::{Context, Poll, Waker};
18use slab::Slab;
19use std::collections::VecDeque;
20use std::mem;
21use std::ops::ControlFlow;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::task::{RawWaker, RawWakerVTable};
25use zx_status;
26
27/// Decodes the body of `buf` as the FIDL type `T`.
28#[doc(hidden)] // only exported for use in macros or generated code
29pub fn decode_transaction_body<T: TypeMarker, D: ResourceDialect, const EXPECTED_ORDINAL: u64>(
30    mut buf: D::MessageBufEtc,
31) -> Result<T::Owned, Error>
32where
33    T::Owned: Decode<T, D>,
34{
35    let (bytes, handles) = buf.split_mut();
36    let (header, body_bytes) = decode_transaction_header(bytes)?;
37    if header.ordinal != EXPECTED_ORDINAL {
38        return Err(Error::InvalidResponseOrdinal);
39    }
40    let mut output = Decode::<T, D>::new_empty();
41    Decoder::<D>::decode_into::<T>(&header, body_bytes, handles, &mut output)?;
42    Ok(output)
43}
44
45/// A FIDL client which can be used to send buffers and receive responses via a channel.
46#[derive(Debug, Clone)]
47pub struct Client<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
48    inner: Arc<ClientInner<D>>,
49}
50
51/// A future representing the decoded and transformed response to a FIDL query.
52pub type DecodedQueryResponseFut<T, D = DefaultFuchsiaResourceDialect> = Map<
53    MessageResponse<D>,
54    fn(Result<<D as ResourceDialect>::MessageBufEtc, Error>) -> Result<T, Error>,
55>;
56
57/// A future representing the result of a FIDL query, with early error detection available if the
58/// message couldn't be sent.
59#[derive(Debug)]
60#[must_use = "futures do nothing unless you `.await` or poll them"]
61pub struct QueryResponseFut<T, D: ResourceDialect = DefaultFuchsiaResourceDialect>(
62    pub MaybeDone<DecodedQueryResponseFut<T, D>>,
63);
64
65impl<T: Unpin, D: ResourceDialect> FusedFuture for QueryResponseFut<T, D> {
66    fn is_terminated(&self) -> bool {
67        matches!(self.0, MaybeDone::Gone)
68    }
69}
70
71impl<T: Unpin, D: ResourceDialect> Future for QueryResponseFut<T, D> {
72    type Output = Result<T, Error>;
73
74    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
75        ready!(self.0.poll_unpin(cx));
76        let maybe_done = Pin::new(&mut self.0);
77        Poll::Ready(maybe_done.take_output().unwrap_or(Err(Error::PollAfterCompletion)))
78    }
79}
80
81impl<T> QueryResponseFut<T> {
82    /// Check to see if the query has an error. If there was en error sending, this returns it and
83    /// the error is returned, otherwise it returns self, which can then be awaited on:
84    /// i.e. match echo_proxy.echo("something").check() {
85    ///      Err(e) => error!("Couldn't send: {}", e),
86    ///      Ok(fut) => fut.await
87    /// }
88    pub fn check(self) -> Result<Self, Error> {
89        match self.0 {
90            MaybeDone::Done(Err(e)) => Err(e),
91            x => Ok(QueryResponseFut(x)),
92        }
93    }
94}
95
/// Mask selecting the low 24 bits of a txid, which hold the interest (slab slot) id plus one.
const TXID_INTEREST_MASK: u32 = 0xFFFFFF;
/// Bit position at which the generation number starts within a txid.
const TXID_GENERATION_SHIFT: usize = 24;
/// Mask keeping 7 bits of the generation counter, so the top bit of the txid stays zero.
const TXID_GENERATION_MASK: u8 = 0x7F;

/// A FIDL transaction id. Will not be zero for a message that includes a response.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct Txid(u32);

/// A message interest id.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct InterestId(usize);

impl InterestId {
    /// Recovers the interest id (slab slot) encoded in the low 24 bits of `txid`.
    ///
    /// The txid may come straight from a message header sent by the peer, so its low 24 bits
    /// can be zero even though no locally generated txid looks like that. The subtraction
    /// wraps in that case, yielding `usize::MAX`, instead of panicking in builds with overflow
    /// checks enabled; such an id simply does not match any registered interest.
    fn from_txid(txid: Txid) -> Self {
        InterestId(((txid.0 & TXID_INTEREST_MASK) as usize).wrapping_sub(1))
    }
}

impl Txid {
    /// Builds a txid from an interest id and a generation counter.
    ///
    /// Only the low 7 bits of `generation` and the low 24 bits of `int_id + 1` are kept.
    fn from_interest_id(int_id: InterestId, generation: u8) -> Self {
        // Base the transaction id on the slab slot + 1
        // (slab slots are zero-based and txid zero is special).
        // Wrapping keeps the arithmetic identical with and without overflow checks; the
        // result is masked to 24 bits either way.
        let id = (int_id.0 as u32).wrapping_add(1) & TXID_INTEREST_MASK;
        // And a 7-bit generation number.
        let generation = (generation & TXID_GENERATION_MASK) as u32;

        // Combine them:
        //  - top bit zero to indicate a userspace generated txid.
        //  - 7 bits of generation
        //  - 24 bits based on the interest id
        Txid((generation << TXID_GENERATION_SHIFT) | id)
    }

    /// Get the raw u32 transaction ID.
    pub fn as_raw_id(&self) -> u32 {
        self.0
    }
}

impl From<u32> for Txid {
    /// Wraps a raw transaction id without any validation.
    fn from(txid: u32) -> Self {
        Self(txid)
    }
}
141
142impl<D: ResourceDialect> Client<D> {
143    /// Create a new client.
144    ///
145    /// `channel` is the asynchronous channel over which data is sent and received.
146    /// `event_ordinals` are the ordinals on which events will be received.
147    pub fn new(channel: D::ProxyChannel, protocol_name: &'static str) -> Client<D> {
148        Client {
149            inner: Arc::new(ClientInner {
150                channel: channel.boxed(),
151                interests: Mutex::default(),
152                terminal_error: Mutex::default(),
153                protocol_name,
154            }),
155        }
156    }
157
158    /// Get a reference to the client's underlying channel.
159    pub fn as_channel(&self) -> &D::ProxyChannel {
160        self.inner.channel.as_channel()
161    }
162
163    /// Attempt to convert the `Client` back into a channel.
164    ///
165    /// This will only succeed if there are no active clones of this `Client`,
166    /// no currently-alive `EventReceiver` or `MessageResponse`s that came from
167    /// this `Client`, and no outstanding messages awaiting a response, even if
168    /// that response will be discarded.
169    pub fn into_channel(self) -> Result<D::ProxyChannel, Self> {
170        // We need to check the message_interests table to make sure there are no outstanding
171        // interests, since an interest might still exist even if all EventReceivers and
172        // MessageResponses have been dropped. That would lead to returning an AsyncChannel which
173        // could then later receive the outstanding response unexpectedly.
174        //
175        // We do try_unwrap before checking the message_interests to avoid a race where another
176        // thread inserts a new value into message_interests after we check
177        // message_interests.is_empty(), but before we get to try_unwrap. This forces us to create a
178        // new Arc if message_interests isn't empty, since try_unwrap destroys the original Arc.
179        match Arc::try_unwrap(self.inner) {
180            Ok(inner) => {
181                if inner.interests.lock().messages.is_empty() || inner.channel.is_closed() {
182                    Ok(inner.channel.unbox())
183                } else {
184                    // This creates a new arc if there are outstanding interests. This will drop
185                    // weak references, and whilst we do create a weak reference to ClientInner if
186                    // we use it as a waker, it doesn't matter because if we have got this far, the
187                    // waker is obsolete: no tasks are waiting.
188                    Err(Self { inner: Arc::new(inner) })
189                }
190            }
191            Err(inner) => Err(Self { inner }),
192        }
193    }
194
195    /// Retrieve the stream of event messages for the `Client`.
196    /// Panics if the stream was already taken.
197    pub fn take_event_receiver(&self) -> EventReceiver<D> {
198        {
199            let mut lock = self.inner.interests.lock();
200
201            if let EventListener::None = lock.event_listener {
202                lock.event_listener = EventListener::WillPoll;
203            } else {
204                panic!("Event stream was already taken");
205            }
206        }
207
208        EventReceiver { inner: self.inner.clone(), state: EventReceiverState::Active }
209    }
210
211    /// Encodes and sends a request without expecting a response.
212    pub fn send<T: TypeMarker>(
213        &self,
214        body: impl Encode<T, D>,
215        ordinal: u64,
216        dynamic_flags: DynamicFlags,
217    ) -> Result<(), Error> {
218        let msg =
219            TransactionMessage { header: TransactionHeader::new(0, ordinal, dynamic_flags), body };
220        crate::encoding::with_tls_encoded::<TransactionMessageType<T>, D, ()>(
221            msg,
222            |bytes, handles| self.send_raw(bytes, handles),
223        )
224    }
225
226    /// Encodes and sends a request. Returns a future that decodes the response.
227    pub fn send_query<Request: TypeMarker, Response: TypeMarker, const ORDINAL: u64>(
228        &self,
229        body: impl Encode<Request, D>,
230        dynamic_flags: DynamicFlags,
231    ) -> QueryResponseFut<Response::Owned, D>
232    where
233        Response::Owned: Decode<Response, D>,
234    {
235        self.send_query_and_decode::<Request, Response::Owned>(
236            body,
237            ORDINAL,
238            dynamic_flags,
239            |buf| buf.and_then(decode_transaction_body::<Response, D, ORDINAL>),
240        )
241    }
242
243    /// Encodes and sends a request. Returns a future that decodes the response
244    /// using the given `decode` function.
245    pub fn send_query_and_decode<Request: TypeMarker, Output>(
246        &self,
247        body: impl Encode<Request, D>,
248        ordinal: u64,
249        dynamic_flags: DynamicFlags,
250        decode: fn(Result<D::MessageBufEtc, Error>) -> Result<Output, Error>,
251    ) -> QueryResponseFut<Output, D> {
252        let send_result = self.send_raw_query(|tx_id, bytes, handles| {
253            let msg = TransactionMessage {
254                header: TransactionHeader::new(tx_id.as_raw_id(), ordinal, dynamic_flags),
255                body,
256            };
257            Encoder::encode::<TransactionMessageType<Request>>(bytes, handles, msg)?;
258            Ok(())
259        });
260
261        QueryResponseFut(match send_result {
262            Ok(res_fut) => future::maybe_done(res_fut.map(decode)),
263            Err(e) => MaybeDone::Done(Err(e)),
264        })
265    }
266
267    /// Sends a raw message without expecting a response.
268    pub fn send_raw(
269        &self,
270        bytes: &[u8],
271        handles: &mut [<D::ProxyChannel as ProxyChannelFor<D>>::HandleDisposition],
272    ) -> Result<(), Error> {
273        match self.inner.channel.write_etc(bytes, handles) {
274            Ok(()) | Err(None) => Ok(()),
275            Err(Some(e)) => Err(Error::ClientWrite(e.into())),
276        }
277    }
278
279    /// Sends a raw query and receives a response future.
280    pub fn send_raw_query<F>(&self, encode_msg: F) -> Result<MessageResponse<D>, Error>
281    where
282        F: for<'a, 'b> FnOnce(
283            Txid,
284            &'a mut Vec<u8>,
285            &'b mut Vec<<D::ProxyChannel as ProxyChannelFor<D>>::HandleDisposition>,
286        ) -> Result<(), Error>,
287    {
288        let id = self.inner.interests.lock().register_msg_interest();
289        crate::encoding::with_tls_encode_buf::<_, D>(|bytes, handles| {
290            encode_msg(id, bytes, handles)?;
291            self.send_raw(bytes, handles)
292        })?;
293
294        Ok(MessageResponse { id, client: Some(self.inner.clone()) })
295    }
296}
297
298#[must_use]
299/// A future which polls for the response to a client message.
300#[derive(Debug)]
301pub struct MessageResponse<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
302    id: Txid,
303    // `None` if the message response has been received
304    client: Option<Arc<ClientInner<D>>>,
305}
306
307impl<D: ResourceDialect> Unpin for MessageResponse<D> {}
308
309impl<D: ResourceDialect> Future for MessageResponse<D> {
310    type Output = Result<D::MessageBufEtc, Error>;
311    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
312        let this = &mut *self;
313        let res;
314        {
315            let client = this.client.as_ref().ok_or(Error::PollAfterCompletion)?;
316            res = client.poll_recv_msg_response(this.id, cx);
317        }
318
319        // Drop the client reference if the response has been received
320        if let Poll::Ready(Ok(_)) = res {
321            this.client.take().expect("MessageResponse polled after completion");
322        }
323
324        res
325    }
326}
327
328impl<D: ResourceDialect> Drop for MessageResponse<D> {
329    fn drop(&mut self) {
330        if let Some(client) = &self.client {
331            client.interests.lock().deregister(self.id);
332        }
333    }
334}
335
336/// An enum reprenting either a resolved message interest or a task on which to alert
337/// that a response message has arrived.
338#[derive(Debug)]
339enum MessageInterest<D: ResourceDialect> {
340    /// A new `MessageInterest`
341    WillPoll,
342    /// A task is waiting to receive a response, and can be awoken with `Waker`.
343    Waiting(Waker),
344    /// A message has been received, and a task will poll to receive it.
345    Received(D::MessageBufEtc),
346    /// A message has not been received, but the person interested in the response
347    /// no longer cares about it, so the message should be discared upon arrival.
348    Discard,
349}
350
351impl<D: ResourceDialect> MessageInterest<D> {
352    /// Check if a message has been received.
353    fn is_received(&self) -> bool {
354        matches!(*self, MessageInterest::Received(_))
355    }
356
357    fn unwrap_received(self) -> D::MessageBufEtc {
358        if let MessageInterest::Received(buf) = self {
359            buf
360        } else {
361            panic!("EXPECTED received message")
362        }
363    }
364}
365
/// Lifecycle of an `EventReceiver` stream.
#[derive(Debug)]
enum EventReceiverState {
    /// The stream is live and still polls the client for events.
    Active,
    /// A terminal error has been yielded; the next poll yields `None`.
    Terminal,
    /// `None` has been yielded; polling again panics.
    Terminated,
}
372
373/// A stream of events as `MessageBufEtc`s.
374#[derive(Debug)]
375pub struct EventReceiver<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
376    inner: Arc<ClientInner<D>>,
377    state: EventReceiverState,
378}
379
380impl<D: ResourceDialect> Unpin for EventReceiver<D> {}
381
382impl<D: ResourceDialect> FusedStream for EventReceiver<D> {
383    fn is_terminated(&self) -> bool {
384        matches!(self.state, EventReceiverState::Terminated)
385    }
386}
387
388/// This implementation holds up two invariants
389///   (1) After `None` is returned, the next poll panics
390///   (2) Until this instance is dropped, no other EventReceiver may claim the
391///       event channel by calling Client::take_event_receiver.
392impl<D: ResourceDialect> Stream for EventReceiver<D> {
393    type Item = Result<D::MessageBufEtc, Error>;
394
395    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
396        match self.state {
397            EventReceiverState::Active => {}
398            EventReceiverState::Terminated => {
399                panic!("polled EventReceiver after `None`");
400            }
401            EventReceiverState::Terminal => {
402                self.state = EventReceiverState::Terminated;
403                return Poll::Ready(None);
404            }
405        }
406
407        Poll::Ready(match ready!(self.inner.poll_recv_event(cx)) {
408            Ok(x) => Some(Ok(x)),
409            Err(Error::ClientChannelClosed { status: zx_status::Status::PEER_CLOSED, .. }) => {
410                // The channel is closed, with no epitaph. Set our internal state so that on
411                // the next poll_next() we panic and is_terminated() returns an appropriate value.
412                self.state = EventReceiverState::Terminated;
413                None
414            }
415            err @ Err(_) => {
416                // We've received a terminal error. Return it and set our internal state so that on
417                // the next poll_next() we return a None and terminate the stream.
418                self.state = EventReceiverState::Terminal;
419                Some(err)
420            }
421        })
422    }
423}
424
425impl<D: ResourceDialect> Drop for EventReceiver<D> {
426    fn drop(&mut self) {
427        self.inner.interests.lock().dropped_event_listener();
428    }
429}
430
/// Who, if anyone, is listening for events.
#[derive(Debug, Default)]
enum EventListener {
    /// Nobody is listening for events.
    #[default]
    None,
    /// A listener exists but has not polled yet.
    WillPoll,
    /// A listener exists and can be woken through the `Waker`.
    Some(Waker),
}

impl EventListener {
    /// Returns `true` only when a waker is registered.
    fn is_some(&self) -> bool {
        match self {
            EventListener::Some(_) => true,
            EventListener::None | EventListener::WillPoll => false,
        }
    }
}
447
448/// A shared client channel which tracks EXPECTED and received responses
449#[derive(Debug)]
450struct ClientInner<D: ResourceDialect> {
451    /// The channel that leads to the server we are connected to.
452    channel: <D::ProxyChannel as ProxyChannelFor<D>>::Boxed,
453
454    /// Tracks the state of responses to two-way messages and events.
455    interests: Mutex<Interests<D>>,
456
457    /// A terminal error, which can be a server provided epitaph, or None if the channel is still
458    /// active.
459    terminal_error: Mutex<Option<Error>>,
460
461    /// The `ProtocolMarker::DEBUG_NAME` for the service this client connects to.
462    protocol_name: &'static str,
463}
464
465#[derive(Debug)]
466struct Interests<D: ResourceDialect> {
467    messages: Slab<MessageInterest<D>>,
468    events: VecDeque<D::MessageBufEtc>,
469    event_listener: EventListener,
470    /// The number of wakers registered waiting for either a message or an event.
471    waker_count: usize,
472    /// Txid generation.
473    /// This is incremented every time we mint a new txid (see register_msg_interest).
474    /// The lower 7 bits are incorporated into the txid.
475    /// This is so that a client repeatedly making calls will have distinct txids for each call.
476    /// Not necessary for correctness but _very_ useful for tracing and debugging.
477    generation: u8,
478}
479
480impl<D: ResourceDialect> Default for Interests<D> {
481    fn default() -> Self {
482        Interests {
483            messages: Slab::new(),
484            events: Default::default(),
485            event_listener: Default::default(),
486            waker_count: 0,
487            generation: 0,
488        }
489    }
490}
491
492impl<D: ResourceDialect> Interests<D> {
493    /// Receives an event and returns a waker, if any.
494    fn push_event(&mut self, buf: D::MessageBufEtc) -> Option<Waker> {
495        self.events.push_back(buf);
496        self.take_event_waker()
497    }
498
499    /// Returns the waker for the task waiting for events, if any.
500    fn take_event_waker(&mut self) -> Option<Waker> {
501        if self.event_listener.is_some() {
502            let EventListener::Some(waker) =
503                mem::replace(&mut self.event_listener, EventListener::WillPoll)
504            else {
505                unreachable!()
506            };
507
508            // Matches the +1 in `register_event_listener`.
509            self.waker_count -= 1;
510            Some(waker)
511        } else {
512            None
513        }
514    }
515
516    /// Returns a reference to the waker.
517    fn event_waker(&self) -> Option<&Waker> {
518        match &self.event_listener {
519            EventListener::Some(waker) => Some(waker),
520            _ => None,
521        }
522    }
523
524    /// Receive a message, waking the waiter if they are waiting to poll and `wake` is true.
525    /// Returns an error of the message isn't found.
526    fn push_message(&mut self, txid: Txid, buf: D::MessageBufEtc) -> Result<Option<Waker>, Error> {
527        let InterestId(raw_id) = InterestId::from_txid(txid);
528        // Look for a message interest with the given ID.
529        // If one is found, store the message so that it can be picked up later.
530        let Some(interest) = self.messages.get_mut(raw_id) else {
531            // TODO(https://fxbug.dev/42066009): Should close the channel.
532            return Err(Error::InvalidResponseTxid);
533        };
534
535        let mut waker = None;
536        if let MessageInterest::Discard = interest {
537            self.messages.remove(raw_id);
538        } else if let MessageInterest::Waiting(w) =
539            mem::replace(interest, MessageInterest::Received(buf))
540        {
541            waker = Some(w);
542
543            // Matches the +1 in `register`.
544            self.waker_count -= 1;
545        }
546
547        Ok(waker)
548    }
549
550    /// Registers the waker from `cx` if the message has not already been received, replacing any
551    /// previous waker registered.  Returns the message if it has been received.
552    fn register(&mut self, txid: Txid, cx: &Context<'_>) -> Option<D::MessageBufEtc> {
553        let InterestId(raw_id) = InterestId::from_txid(txid);
554        let interest = self.messages.get_mut(raw_id).expect("Polled unregistered interest");
555        match interest {
556            MessageInterest::Received(_) => {
557                return Some(self.messages.remove(raw_id).unwrap_received());
558            }
559            MessageInterest::Discard => panic!("Polled a discarded MessageReceiver?!"),
560            MessageInterest::WillPoll => self.waker_count += 1,
561            MessageInterest::Waiting(_) => {}
562        }
563        *interest = MessageInterest::Waiting(cx.waker().clone());
564        None
565    }
566
567    /// Deregisters an interest.
568    fn deregister(&mut self, txid: Txid) {
569        let InterestId(raw_id) = InterestId::from_txid(txid);
570        match self.messages[raw_id] {
571            MessageInterest::Received(_) => {
572                self.messages.remove(raw_id);
573                return;
574            }
575            MessageInterest::WillPoll => {}
576            MessageInterest::Waiting(_) => self.waker_count -= 1,
577            MessageInterest::Discard => unreachable!(),
578        }
579        self.messages[raw_id] = MessageInterest::Discard;
580    }
581
582    /// Registers an event listener.
583    fn register_event_listener(&mut self, cx: &Context<'_>) -> Option<D::MessageBufEtc> {
584        self.events.pop_front().or_else(|| {
585            if !mem::replace(&mut self.event_listener, EventListener::Some(cx.waker().clone()))
586                .is_some()
587            {
588                self.waker_count += 1;
589            }
590            None
591        })
592    }
593
594    /// Indicates the event listener has been dropped.
595    fn dropped_event_listener(&mut self) {
596        if self.event_listener.is_some() {
597            // Matches the +1 in register_event_listener.
598            self.waker_count -= 1;
599        }
600        self.event_listener = EventListener::None;
601    }
602
603    /// Registers interest in a response message.
604    ///
605    /// This function returns a new transaction ID which should be used to send a message
606    /// via the channel. Responses are then received using `poll_recv_msg_response`.
607    fn register_msg_interest(&mut self) -> Txid {
608        self.generation = self.generation.wrapping_add(1);
609        // TODO(cramertj) use `try_from` here and assert that the conversion from
610        // `usize` to `u32` hasn't overflowed.
611        Txid::from_interest_id(
612            InterestId(self.messages.insert(MessageInterest::WillPoll)),
613            self.generation,
614        )
615    }
616}
617
618impl<D: ResourceDialect> ClientInner<D> {
619    fn poll_recv_event(
620        self: &Arc<Self>,
621        cx: &Context<'_>,
622    ) -> Poll<Result<D::MessageBufEtc, Error>> {
623        // Update the EventListener with the latest waker, remove any stale WillPoll state
624        if let Some(msg_buf) = self.interests.lock().register_event_listener(cx) {
625            return Poll::Ready(Ok(msg_buf));
626        }
627
628        // Process any data on the channel, registering any tasks still waiting to wake when the
629        // channel becomes ready.
630        let maybe_terminal_error = self.recv_all(Some(Txid(0)));
631
632        let mut lock = self.interests.lock();
633
634        if let Some(msg_buf) = lock.events.pop_front() {
635            Poll::Ready(Ok(msg_buf))
636        } else {
637            maybe_terminal_error?;
638            Poll::Pending
639        }
640    }
641
642    /// Poll for the response to `txid`, registering the waker associated with `cx` to be awoken,
643    /// or returning the response buffer if it has been received.
644    fn poll_recv_msg_response(
645        self: &Arc<Self>,
646        txid: Txid,
647        cx: &Context<'_>,
648    ) -> Poll<Result<D::MessageBufEtc, Error>> {
649        // Register our waker with the interest if we haven't received a message yet.
650        if let Some(buf) = self.interests.lock().register(txid, cx) {
651            return Poll::Ready(Ok(buf));
652        }
653
654        // Process any data on the channel, registering tasks still waiting for wake when the
655        // channel becomes ready.
656        let maybe_terminal_error = self.recv_all(Some(txid));
657
658        let InterestId(raw_id) = InterestId::from_txid(txid);
659        let mut interests = self.interests.lock();
660        if interests.messages.get(raw_id).expect("Polled unregistered interest").is_received() {
661            // If we got the result remove the received buffer and return, freeing up the
662            // space for a new message.
663            let buf = interests.messages.remove(raw_id).unwrap_received();
664            Poll::Ready(Ok(buf))
665        } else {
666            maybe_terminal_error?;
667            Poll::Pending
668        }
669    }
670
671    /// Poll for the receipt of any response message or an event.
672    /// Wakers present in any MessageInterest or the EventReceiver when this is called will be
673    /// notified when their message arrives or when there is new data if the channel is empty.
674    ///
675    /// All errors are terminal, so once an error has been encountered, all subsequent calls will
676    /// produce the same error.  The error might be due to the reception of an epitaph, the peer end
677    /// of the channel being closed, a decode error or some other error.  Before using this terminal
678    /// error, callers *should* check to see if a response or event has been received as they
679    /// should normally, at least for the PEER_CLOSED case, be delivered before the terminal error.
680    fn recv_all(self: &Arc<Self>, want_txid: Option<Txid>) -> Result<(), Error> {
681        // Acquire a mutex so that only one thread can read from the underlying channel
682        // at a time. Channel is already synchronized, but we need to also decode the
683        // FIDL message header atomically so that epitaphs can be properly handled.
684        let mut terminal_error = self.terminal_error.lock();
685        if let Some(error) = terminal_error.as_ref() {
686            return Err(error.clone());
687        }
688
689        let recv_once = |waker| {
690            let cx = &mut Context::from_waker(&waker);
691
692            let mut buf = D::MessageBufEtc::new();
693            let result = self.channel.recv_etc_from(cx, &mut buf);
694            match result {
695                Poll::Ready(Ok(())) => {}
696                Poll::Ready(Err(None)) => {
697                    // The channel has been closed, and no epitaph was received.
698                    // Set the epitaph to PEER_CLOSED.
699                    return Err(Error::ClientChannelClosed {
700                        status: zx_status::Status::PEER_CLOSED,
701                        protocol_name: self.protocol_name,
702                        epitaph: None,
703                        #[cfg(not(target_os = "fuchsia"))]
704                        reason: self.channel.closed_reason(),
705                    });
706                }
707                Poll::Ready(Err(Some(e))) => return Err(Error::ClientRead(e.into())),
708                Poll::Pending => return Ok(ControlFlow::Break(())),
709            };
710
711            let (bytes, _) = buf.split_mut();
712            let (header, body_bytes) = decode_transaction_header(bytes)?;
713            if header.is_epitaph() {
714                // Received an epitaph. Record this so that everyone receives the same epitaph.
715                let handles = &mut [];
716                let mut epitaph_body = Decode::<EpitaphBody, D>::new_empty();
717                Decoder::<D>::decode_into::<EpitaphBody>(
718                    &header,
719                    body_bytes,
720                    handles,
721                    &mut epitaph_body,
722                )?;
723                return Err(Error::ClientChannelClosed {
724                    status: epitaph_body.error,
725                    protocol_name: self.protocol_name,
726                    epitaph: Some(epitaph_body.error.into_raw() as u32),
727                    #[cfg(not(target_os = "fuchsia"))]
728                    reason: self.channel.closed_reason(),
729                });
730            }
731
732            let txid = Txid(header.tx_id);
733
734            let waker = {
735                buf.shrink_bytes_to_fit();
736                let mut interests = self.interests.lock();
737                if txid == Txid(0) {
738                    interests.push_event(buf)
739                } else {
740                    interests.push_message(txid, buf)?
741                }
742            };
743
744            // Skip waking if the message was for the caller.
745            if want_txid != Some(txid)
746                && let Some(waker) = waker
747            {
748                waker.wake();
749            }
750
751            Ok(ControlFlow::Continue(()))
752        };
753
754        loop {
755            let waker = {
756                let interests = self.interests.lock();
757                if interests.waker_count == 0 {
758                    return Ok(());
759                } else if interests.waker_count == 1 {
760                    // There's only one waker, so just use the waker for the one interest.  This
761                    // is also required to allow `into_channel` to work, which relies on
762                    // `Arc::try_into` which won't always work if we use a waker based on
763                    // `ClientInner` (even if it's weak), because there can be races where the
764                    // reference count on ClientInner is > 1.
765                    if let Some(waker) = interests.event_waker() {
766                        waker.clone()
767                    } else {
768                        interests
769                            .messages
770                            .iter()
771                            .find_map(|(_, interest)| {
772                                if let MessageInterest::Waiting(waker) = interest {
773                                    Some(waker.clone())
774                                } else {
775                                    None
776                                }
777                            })
778                            .unwrap()
779                    }
780                } else {
781                    let weak = Arc::downgrade(self);
782                    let waker = ClientWaker(Arc::new(move || {
783                        if let Some(strong) = weak.upgrade() {
784                            // On host, we can't call recv_all because there are reentrancy issues; the waker is
785                            // woken whilst locks are held on the channel which recv_all needs.
786                            #[cfg(target_os = "fuchsia")]
787                            if strong.recv_all(None).is_ok() {
788                                return;
789                            }
790
791                            strong.wake_all();
792                        }
793                    }));
794                    // If there's more than one waker, use a waker that points to
795                    // `ClientInner` which will read the message and figure out which is
796                    // the correct task to wake.
797                    // SAFETY: We meet the requirements specified by RawWaker.
798                    unsafe {
799                        Waker::from_raw(RawWaker::new(
800                            Arc::into_raw(Arc::new(waker)) as *const (),
801                            &WAKER_VTABLE,
802                        ))
803                    }
804                }
805            };
806
807            match recv_once(waker) {
808                Ok(ControlFlow::Continue(())) => {}
809                Ok(ControlFlow::Break(())) => return Ok(()),
810                Err(error) => {
811                    // Broadcast all errors.
812                    self.wake_all();
813                    return Err(terminal_error.insert(error).clone());
814                }
815            }
816        }
817    }
818
819    /// Wakes all tasks that have polled on this channel.
820    fn wake_all(&self) {
821        let mut lock = self.interests.lock();
822        for (_, interest) in &mut lock.messages {
823            if let MessageInterest::Waiting(_) = interest {
824                let MessageInterest::Waiting(waker) =
825                    mem::replace(interest, MessageInterest::WillPoll)
826                else {
827                    unreachable!()
828                };
829                waker.wake();
830            }
831        }
832        if let Some(waker) = lock.take_event_waker() {
833            waker.wake();
834        }
835        lock.waker_count = 0;
836    }
837}
838
/// A callback that is invoked whenever a `Waker` built on top of [`WAKER_VTABLE`] is woken.
#[derive(Clone)]
struct ClientWaker(Arc<dyn Fn() + Send + Sync + 'static>);

/// Vtable for wakers whose data pointer was produced by `Arc::into_raw` on an
/// `Arc<ClientWaker>`.
static WAKER_VTABLE: RawWakerVTable =
    RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);

/// Clones the waker by taking an additional strong reference on the shared `ClientWaker`.
unsafe fn clone_waker(data: *const ()) -> RawWaker {
    // SAFETY: `data` originates from `Arc::into_raw` on an `Arc<ClientWaker>` that is still
    // alive, which is the contract for every pointer paired with `WAKER_VTABLE`.
    unsafe { Arc::increment_strong_count(data.cast::<ClientWaker>()) };
    RawWaker::new(data, &WAKER_VTABLE)
}

/// Runs the callback and releases the strong reference owned by the waker being consumed.
unsafe fn wake(data: *const ()) {
    // SAFETY: `data` originates from `Arc::into_raw` on an `Arc<ClientWaker>`; waking by value
    // hands the waker's reference over to us.
    let owned = unsafe { Arc::from_raw(data.cast::<ClientWaker>()) };
    (owned.0)();
}

/// Runs the callback without touching the reference count.
unsafe fn wake_by_ref(data: *const ()) {
    // SAFETY: `data` originates from `Arc::into_raw` on an `Arc<ClientWaker>`.  The reference
    // still belongs to the waker, so the reconstructed `Arc` must never be dropped here.
    let borrowed = mem::ManuallyDrop::new(unsafe { Arc::from_raw(data.cast::<ClientWaker>()) });
    (borrowed.0)();
}

/// Releases the strong reference owned by the waker being dropped.
unsafe fn drop_waker(data: *const ()) {
    // SAFETY: `data` originates from `Arc::into_raw` on an `Arc<ClientWaker>` and the waker that
    // owned this reference is going away.
    drop(unsafe { Arc::from_raw(data.cast::<ClientWaker>()) });
}
861
862#[cfg(target_os = "fuchsia")]
863pub mod sync {
864    //! Synchronous FIDL Client
865
866    use super::*;
867    use crate::endpoints::ProtocolMarker;
868    use std::mem::MaybeUninit;
869    use zx::MessageBufEtc;
870
871    /// A synchronous client for making FIDL calls.
872    #[derive(Debug)]
873    pub struct Client {
874        // Underlying channel
875        channel: zx::Channel,
876    }
877
878    impl Client {
879        /// Create a new synchronous FIDL client.
880        pub fn new(channel: zx::Channel) -> Self {
881            Client { channel }
882        }
883
884        /// Return a reference to the underlying channel for the client.
885        pub fn as_channel(&self) -> &zx::Channel {
886            &self.channel
887        }
888
889        /// Get the underlying channel out of the client.
890        pub fn into_channel(self) -> zx::Channel {
891            self.channel
892        }
893
894        /// Send a new message.
895        pub fn send<T: TypeMarker>(
896            &self,
897            body: impl Encode<T, DefaultFuchsiaResourceDialect>,
898            ordinal: u64,
899            dynamic_flags: DynamicFlags,
900        ) -> Result<(), Error> {
901            let mut write_bytes = Vec::new();
902            let mut write_handles = Vec::new();
903            let msg = TransactionMessage {
904                header: TransactionHeader::new(0, ordinal, dynamic_flags),
905                body,
906            };
907            Encoder::encode::<TransactionMessageType<T>>(
908                &mut write_bytes,
909                &mut write_handles,
910                msg,
911            )?;
912            match self.channel.write_etc(&write_bytes, &mut write_handles) {
913                Ok(()) | Err(zx_status::Status::PEER_CLOSED) => Ok(()),
914                Err(e) => Err(Error::ClientWrite(e.into())),
915            }
916        }
917
918        /// Send a new message expecting a response.
919        pub fn send_query<Request: TypeMarker, Response: TypeMarker, P: ProtocolMarker>(
920            &self,
921            body: impl Encode<Request, DefaultFuchsiaResourceDialect>,
922            ordinal: u64,
923            dynamic_flags: DynamicFlags,
924            deadline: zx::MonotonicInstant,
925        ) -> Result<Response::Owned, Error>
926        where
927            Response::Owned: Decode<Response, DefaultFuchsiaResourceDialect>,
928        {
929            let mut write_bytes = Vec::new();
930            let mut write_handles = Vec::new();
931
932            let msg = TransactionMessage {
933                header: TransactionHeader::new(0, ordinal, dynamic_flags),
934                body,
935            };
936            Encoder::encode::<TransactionMessageType<Request>>(
937                &mut write_bytes,
938                &mut write_handles,
939                msg,
940            )?;
941
942            // Heap allocate the buffer, because on the stack, all the pages would be written to by
943            // the compiler (see stack probing).
944            let mut bytes_out =
945                Vec::<MaybeUninit<u8>>::with_capacity(zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize);
946            // SAFETY: Because the type is MaybeUninit, having it use uninitialized memory
947            // is safe.
948            unsafe { bytes_out.set_len(zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize) };
949
950            // Stack-allocate these buffers to avoid the heap and reuse any populated pages from
951            // previous function calls. Use uninitialized memory so that the only writes to this
952            // array will be by the kernel for whatever's actually used for the reply.
953            let handles_out = &mut [const { MaybeUninit::<zx::HandleInfo>::uninit() };
954                zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize];
955
956            // TODO: We should be able to use the same memory to back the bytes we use for writing
957            // and reading.
958            let (bytes_out, handles_out) = self
959                .channel
960                .call_etc_uninit(
961                    deadline,
962                    &write_bytes,
963                    &mut write_handles,
964                    bytes_out.as_mut_slice(),
965                    handles_out,
966                )
967                .map_err(|e| self.wrap_error::<P, _>(Error::ClientCall, e))?;
968
969            let (header, body_bytes) = decode_transaction_header(bytes_out)?;
970            if header.ordinal != ordinal {
971                return Err(Error::InvalidResponseOrdinal);
972            }
973            let mut output = Decode::<Response, DefaultFuchsiaResourceDialect>::new_empty();
974            Decoder::<DefaultFuchsiaResourceDialect>::decode_into::<Response>(
975                &header,
976                body_bytes,
977                handles_out,
978                &mut output,
979            )?;
980            Ok(output)
981        }
982
983        /// Wait for an event to arrive on the underlying channel.
984        pub fn wait_for_event<P: ProtocolMarker>(
985            &self,
986            deadline: zx::MonotonicInstant,
987        ) -> Result<MessageBufEtc, Error> {
988            let mut buf = zx::MessageBufEtc::new();
989            buf.ensure_capacity_bytes(zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize);
990            buf.ensure_capacity_handle_infos(zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize);
991
992            loop {
993                self.channel
994                    .wait_one(
995                        zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
996                        deadline,
997                    )
998                    .map_err(|e| self.wrap_error::<P, _>(Error::ClientEvent, e))?;
999                match self.channel.read_etc(&mut buf) {
1000                    Ok(()) => {
1001                        // We succeeded in reading the message. Check that it is
1002                        // an event not a two-way method reply.
1003                        let (header, body_bytes) = decode_transaction_header(buf.bytes())
1004                            .map_err(|_| Error::InvalidHeader)?;
1005                        if header.is_epitaph() {
1006                            // Received an epitaph. For the sync bindings, epitaphs are only
1007                            // reported by wait_for_event.
1008                            let handles = &mut [];
1009                            let mut epitaph_body =
1010                                Decode::<EpitaphBody, DefaultFuchsiaResourceDialect>::new_empty();
1011                            Decoder::<DefaultFuchsiaResourceDialect>::decode_into::<EpitaphBody>(
1012                                &header,
1013                                body_bytes,
1014                                handles,
1015                                &mut epitaph_body,
1016                            )?;
1017                            return Err(Error::ClientChannelClosed {
1018                                status: epitaph_body.error,
1019                                protocol_name: P::DEBUG_NAME,
1020                                epitaph: Some(epitaph_body.error.into_raw() as u32),
1021                            });
1022                        }
1023                        if header.tx_id != 0 {
1024                            return Err(Error::UnexpectedSyncResponse);
1025                        }
1026                        return Ok(buf);
1027                    }
1028                    Err(zx::Status::SHOULD_WAIT) => {
1029                        // Some other thread read the message we woke up to read.
1030                        continue;
1031                    }
1032                    Err(e) => {
1033                        return Err(self.wrap_error::<P, _>(|x| Error::ClientRead(x.into()), e));
1034                    }
1035                }
1036            }
1037        }
1038
1039        /// Wraps an error in the given `variant` of the `Error` enum, except
1040        /// for `zx_status::Status::PEER_CLOSED`, in which case it uses the
1041        /// `Error::ClientChannelClosed` variant.
1042        fn wrap_error<P: ProtocolMarker, T: Fn(zx_status::Status) -> Error>(
1043            &self,
1044            variant: T,
1045            err: zx_status::Status,
1046        ) -> Error {
1047            if err == zx_status::Status::PEER_CLOSED {
1048                Error::ClientChannelClosed {
1049                    status: zx_status::Status::PEER_CLOSED,
1050                    protocol_name: P::DEBUG_NAME,
1051                    epitaph: None,
1052                }
1053            } else {
1054                variant(err)
1055            }
1056        }
1057    }
1058}
1059
1060#[cfg(all(test, target_os = "fuchsia"))]
1061mod tests {
1062    use super::*;
1063    use crate::encoding::MAGIC_NUMBER_INITIAL;
1064    use crate::endpoints::{ControlHandle, ProtocolMarker, Proxy, RequestStream, SynchronousProxy};
1065    use crate::epitaph::{self, ChannelEpitaphExt};
1066    use crate::{Channel, OnSignalsRef, ServeInner};
1067    use anyhow::{Context as _, Error};
1068    use assert_matches::assert_matches;
1069    use fuchsia_async as fasync;
1070    use fuchsia_async::{Channel as AsyncChannel, DurationExt, TimeoutExt};
1071    use futures::channel::oneshot;
1072    use futures::stream::{FuturesUnordered, Stream};
1073    use futures::{StreamExt, TryFutureExt, join};
1074    use futures_test::task::new_count_waker;
1075    use std::future::pending;
1076    use std::task::{Wake, Waker};
1077    use std::thread;
1078    use zx::MessageBufEtc;
1079
1080    const SEND_ORDINAL_HIGH_BYTE: u8 = 42;
1081    const SEND_ORDINAL: u64 = 42 << 32;
1082    const SEND_DATA: u8 = 55;
1083
1084    const EVENT_ORDINAL: u64 = 854 << 23;
1085
1086    struct TestProtocolMarker;
1087    impl ProtocolMarker for TestProtocolMarker {
1088        type Proxy = TestProxy;
1089        type SynchronousProxy = TestSynchronousProxy;
1090        type RequestStream = TestRequestStream;
1091        const DEBUG_NAME: &str = "test_protocol";
1092    }
1093
1094    struct TestProxy;
1095    impl Proxy for TestProxy {
1096        type Protocol = TestProtocolMarker;
1097        fn from_channel(_inner: AsyncChannel) -> Self {
1098            unimplemented!();
1099        }
1100        fn into_channel(self) -> Result<AsyncChannel, Self> {
1101            unimplemented!();
1102        }
1103        fn as_channel(&self) -> &AsyncChannel {
1104            unimplemented!();
1105        }
1106    }
1107
1108    struct TestSynchronousProxy;
1109    impl SynchronousProxy for TestSynchronousProxy {
1110        type Proxy = TestProxy;
1111        type Protocol = TestProtocolMarker;
1112        fn from_channel(_inner: Channel) -> Self {
1113            unimplemented!();
1114        }
1115        fn into_channel(self) -> Channel {
1116            unimplemented!();
1117        }
1118        fn as_channel(&self) -> &Channel {
1119            unimplemented!();
1120        }
1121    }
1122
1123    struct TestRequestStream;
1124    impl RequestStream for TestRequestStream {
1125        type Protocol = TestProtocolMarker;
1126        type ControlHandle = TestControlHandle;
1127        fn control_handle(&self) -> Self::ControlHandle {
1128            unimplemented!();
1129        }
1130        fn from_channel(_inner: AsyncChannel) -> Self {
1131            unimplemented!();
1132        }
1133        fn into_inner(self) -> (Arc<ServeInner>, bool) {
1134            unimplemented!();
1135        }
1136
1137        fn from_inner(_inner: Arc<ServeInner>, _is_terminated: bool) -> Self {
1138            unimplemented!();
1139        }
1140    }
1141    impl Stream for TestRequestStream {
1142        type Item = Result<(), crate::Error>;
1143        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1144            unimplemented!();
1145        }
1146    }
1147
1148    struct TestControlHandle;
1149    impl ControlHandle for TestControlHandle {
1150        fn shutdown(&self) {
1151            unimplemented!();
1152        }
1153        fn shutdown_with_epitaph(&self, _status: zx_status::Status) {
1154            unimplemented!();
1155        }
1156        fn is_closed(&self) -> bool {
1157            unimplemented!();
1158        }
1159        fn on_closed(&self) -> OnSignalsRef<'_> {
1160            unimplemented!();
1161        }
1162        fn signal_peer(
1163            &self,
1164            _clear_mask: zx::Signals,
1165            _set_mask: zx::Signals,
1166        ) -> Result<(), zx_status::Status> {
1167            unimplemented!();
1168        }
1169    }
1170
1171    #[rustfmt::skip]
1172    fn expected_sent_bytes(txid_index: u8, txid_generation: u8) -> [u8; 24] {
1173        [
1174            txid_index, 0, 0, txid_generation, // 32 bit tx_id
1175            2, 0, 0, // flags
1176            MAGIC_NUMBER_INITIAL,
1177            0, 0, 0, 0, // low bytes of 64 bit ordinal
1178            SEND_ORDINAL_HIGH_BYTE, 0, 0, 0, // high bytes of 64 bit ordinal
1179            SEND_DATA, // 8 bit data
1180            0, 0, 0, 0, 0, 0, 0, // 7 bytes of padding after our 1 byte of data
1181        ]
1182    }
1183
1184    fn expected_sent_bytes_oneway() -> [u8; 24] {
1185        expected_sent_bytes(0, 0)
1186    }
1187
1188    fn send_transaction(header: TransactionHeader, channel: &zx::Channel) {
1189        let (bytes, handles) = (&mut vec![], &mut vec![]);
1190        encode_transaction(header, bytes, handles);
1191        channel.write_etc(bytes, handles).expect("Server channel write failed");
1192    }
1193
1194    fn encode_transaction(
1195        header: TransactionHeader,
1196        bytes: &mut Vec<u8>,
1197        handles: &mut Vec<zx::HandleDisposition<'static>>,
1198    ) {
1199        let event = TransactionMessage { header, body: SEND_DATA };
1200        Encoder::<DefaultFuchsiaResourceDialect>::encode::<TransactionMessageType<u8>>(
1201            bytes, handles, event,
1202        )
1203        .expect("Encoding failure");
1204    }
1205
1206    #[test]
1207    fn sync_client() -> Result<(), Error> {
1208        let (client_end, server_end) = zx::Channel::create();
1209        let client = sync::Client::new(client_end);
1210        client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()).context("sending")?;
1211        let mut received = MessageBufEtc::new();
1212        server_end.read_etc(&mut received).context("reading")?;
1213        assert_eq!(received.bytes(), expected_sent_bytes_oneway());
1214        Ok(())
1215    }
1216
1217    #[test]
1218    fn sync_client_with_response() -> Result<(), Error> {
1219        let (client_end, server_end) = zx::Channel::create();
1220        let client = sync::Client::new(client_end);
1221        thread::spawn(move || {
1222            // Server
1223            let mut received = MessageBufEtc::new();
1224            server_end
1225                .wait_one(
1226                    zx::Signals::CHANNEL_READABLE,
1227                    zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1228                )
1229                .expect("failed to wait for channel readable");
1230            server_end.read_etc(&mut received).expect("failed to read on server end");
1231            let (buf, _handles) = received.split_mut();
1232            let (header, _body_bytes) = decode_transaction_header(buf).expect("server decode");
1233            assert_eq!(header.ordinal, SEND_ORDINAL);
1234            send_transaction(
1235                TransactionHeader::new(header.tx_id, header.ordinal, DynamicFlags::empty()),
1236                &server_end,
1237            );
1238        });
1239        let response_data = client
1240            .send_query::<u8, u8, TestProtocolMarker>(
1241                SEND_DATA,
1242                SEND_ORDINAL,
1243                DynamicFlags::empty(),
1244                zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1245            )
1246            .context("sending query")?;
1247        assert_eq!(SEND_DATA, response_data);
1248        Ok(())
1249    }
1250
1251    #[test]
1252    fn sync_client_with_event_and_response() -> Result<(), Error> {
1253        let (client_end, server_end) = zx::Channel::create();
1254        let client = sync::Client::new(client_end);
1255        thread::spawn(move || {
1256            // Server
1257            let mut received = MessageBufEtc::new();
1258            server_end
1259                .as_handle_ref()
1260                .wait_one(
1261                    zx::Signals::CHANNEL_READABLE,
1262                    zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1263                )
1264                .expect("failed to wait for channel readable");
1265            server_end.read_etc(&mut received).expect("failed to read on server end");
1266            let (buf, _handles) = received.split_mut();
1267            let (header, _body_bytes) = decode_transaction_header(buf).expect("server decode");
1268            assert_ne!(header.tx_id, 0);
1269            assert_eq!(header.ordinal, SEND_ORDINAL);
1270            // First, send an event.
1271            send_transaction(
1272                TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty()),
1273                &server_end,
1274            );
1275            // Then send the reply. The kernel should pick the correct message to deliver based
1276            // on the tx_id.
1277            send_transaction(
1278                TransactionHeader::new(header.tx_id, header.ordinal, DynamicFlags::empty()),
1279                &server_end,
1280            );
1281        });
1282        let response_data = client
1283            .send_query::<u8, u8, TestProtocolMarker>(
1284                SEND_DATA,
1285                SEND_ORDINAL,
1286                DynamicFlags::empty(),
1287                zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1288            )
1289            .context("sending query")?;
1290        assert_eq!(SEND_DATA, response_data);
1291
1292        let event_buf = client
1293            .wait_for_event::<TestProtocolMarker>(zx::MonotonicInstant::after(
1294                zx::MonotonicDuration::from_seconds(5),
1295            ))
1296            .context("waiting for event")?;
1297        let (bytes, _handles) = event_buf.split();
1298        let (header, _body) = decode_transaction_header(&bytes).expect("event decode");
1299        assert_eq!(header.ordinal, EVENT_ORDINAL);
1300
1301        Ok(())
1302    }
1303
1304    #[test]
1305    fn sync_client_with_racing_events() -> Result<(), Error> {
1306        let (client_end, server_end) = zx::Channel::create();
1307        let client1 = Arc::new(sync::Client::new(client_end));
1308        let client2 = client1.clone();
1309
1310        let thread1 = thread::spawn(move || {
1311            let result = client1.wait_for_event::<TestProtocolMarker>(zx::MonotonicInstant::after(
1312                zx::MonotonicDuration::from_seconds(5),
1313            ));
1314            assert!(result.is_ok());
1315        });
1316
1317        let thread2 = thread::spawn(move || {
1318            let result = client2.wait_for_event::<TestProtocolMarker>(zx::MonotonicInstant::after(
1319                zx::MonotonicDuration::from_seconds(5),
1320            ));
1321            assert!(result.is_ok());
1322        });
1323
1324        send_transaction(
1325            TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty()),
1326            &server_end,
1327        );
1328        send_transaction(
1329            TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty()),
1330            &server_end,
1331        );
1332
1333        assert!(thread1.join().is_ok());
1334        assert!(thread2.join().is_ok());
1335
1336        Ok(())
1337    }
1338
1339    #[test]
1340    fn sync_client_wait_for_event_gets_method_response() -> Result<(), Error> {
1341        let (client_end, server_end) = zx::Channel::create();
1342        let client = sync::Client::new(client_end);
1343        send_transaction(
1344            TransactionHeader::new(3902304923, SEND_ORDINAL, DynamicFlags::empty()),
1345            &server_end,
1346        );
1347        assert_matches!(
1348            client.wait_for_event::<TestProtocolMarker>(zx::MonotonicInstant::after(
1349                zx::MonotonicDuration::from_seconds(5)
1350            )),
1351            Err(crate::Error::UnexpectedSyncResponse)
1352        );
1353        Ok(())
1354    }
1355
1356    #[test]
1357    fn sync_client_one_way_call_suceeds_after_peer_closed() -> Result<(), Error> {
1358        let (client_end, server_end) = zx::Channel::create();
1359        let client = sync::Client::new(client_end);
1360        drop(server_end);
1361        assert_matches!(client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()), Ok(()));
1362        Ok(())
1363    }
1364
1365    #[test]
1366    fn sync_client_two_way_call_fails_after_peer_closed() -> Result<(), Error> {
1367        let (client_end, server_end) = zx::Channel::create();
1368        let client = sync::Client::new(client_end);
1369        drop(server_end);
1370        assert_matches!(
1371            client.send_query::<u8, u8, TestProtocolMarker>(
1372                SEND_DATA,
1373                SEND_ORDINAL,
1374                DynamicFlags::empty(),
1375                zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))
1376            ),
1377            Err(crate::Error::ClientChannelClosed {
1378                status: zx_status::Status::PEER_CLOSED,
1379                protocol_name: "test_protocol",
1380                epitaph: None,
1381            })
1382        );
1383        Ok(())
1384    }
1385
1386    // TODO(https://fxbug.dev/42153053): When the sync client supports epitaphs, rename
1387    // these tests and change the asserts to expect zx_status::Status::UNAVAILABLE.
1388    #[test]
1389    fn sync_client_send_does_not_receive_epitaphs() -> Result<(), Error> {
1390        let (client_end, server_end) = zx::Channel::create();
1391        let client = sync::Client::new(client_end);
1392        // Close the server channel with an epitaph.
1393        server_end
1394            .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1395            .expect("failed to write epitaph");
1396        assert_matches!(
1397            client.send_query::<u8, u8, TestProtocolMarker>(
1398                SEND_DATA,
1399                SEND_ORDINAL,
1400                DynamicFlags::empty(),
1401                zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))
1402            ),
1403            Err(crate::Error::ClientChannelClosed {
1404                status: zx_status::Status::PEER_CLOSED,
1405                protocol_name: "test_protocol",
1406                epitaph: None,
1407            })
1408        );
1409        Ok(())
1410    }
1411
1412    #[test]
1413    fn sync_client_wait_for_events_does_receive_epitaphs() -> Result<(), Error> {
1414        let (client_end, server_end) = zx::Channel::create();
1415        let client = sync::Client::new(client_end);
1416        // Close the server channel with an epitaph.
1417        server_end
1418            .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1419            .expect("failed to write epitaph");
1420        assert_matches!(
1421            client.wait_for_event::<TestProtocolMarker>(zx::MonotonicInstant::after(
1422                zx::MonotonicDuration::from_seconds(5)
1423            )),
1424            Err(crate::Error::ClientChannelClosed {
1425                status: zx_status::Status::UNAVAILABLE,
1426                protocol_name: "test_protocol",
1427                epitaph: Some(epitaph),
1428            }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1429        );
1430        Ok(())
1431    }
1432
1433    #[test]
1434    fn sync_client_into_channel() -> Result<(), Error> {
1435        let (client_end, _server_end) = zx::Channel::create();
1436        let client_end_raw = client_end.raw_handle();
1437        let client = sync::Client::new(client_end);
1438        assert_eq!(client.into_channel().raw_handle(), client_end_raw);
1439        Ok(())
1440    }
1441
1442    #[fasync::run_singlethreaded(test)]
1443    async fn client() {
1444        let (client_end, server_end) = zx::Channel::create();
1445        let client_end = AsyncChannel::from_channel(client_end);
1446        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1447
1448        let server = AsyncChannel::from_channel(server_end);
1449        let receiver = async move {
1450            let mut buffer = MessageBufEtc::new();
1451            server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1452            assert_eq!(buffer.bytes(), expected_sent_bytes_oneway());
1453        };
1454
1455        // add a timeout to receiver so if test is broken it doesn't take forever
1456        let receiver = receiver
1457            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1458                panic!("did not receive message in time!")
1459            });
1460
1461        client
1462            .send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty())
1463            .expect("failed to send msg");
1464
1465        receiver.await;
1466    }
1467
1468    #[fasync::run_singlethreaded(test)]
1469    async fn client_with_response() {
1470        let (client_end, server_end) = zx::Channel::create();
1471        let client_end = AsyncChannel::from_channel(client_end);
1472        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1473
1474        let server = AsyncChannel::from_channel(server_end);
1475        let mut buffer = MessageBufEtc::new();
1476        let receiver = async move {
1477            server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1478            let two_way_tx_id = 1u8;
1479            assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
1480
1481            let (bytes, handles) = (&mut vec![], &mut vec![]);
1482            let header =
1483                TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
1484            encode_transaction(header, bytes, handles);
1485            server.write_etc(bytes, handles).expect("Server channel write failed");
1486        };
1487
1488        // add a timeout to receiver so if test is broken it doesn't take forever
1489        let receiver = receiver
1490            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1491                panic!("did not receiver message in time!")
1492            });
1493
1494        let sender = client
1495            .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
1496            .map_ok(|x| assert_eq!(x, SEND_DATA))
1497            .unwrap_or_else(|e| panic!("fidl error: {e:?}"));
1498
1499        // add a timeout to receiver so if test is broken it doesn't take forever
1500        let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1501            panic!("did not receive response in time!")
1502        });
1503
1504        let ((), ()) = join!(receiver, sender);
1505    }
1506
1507    #[fasync::run_singlethreaded(test)]
1508    async fn client_with_response_receives_epitaph() {
1509        let (client_end, server_end) = zx::Channel::create();
1510        let client_end = AsyncChannel::from_channel(client_end);
1511        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1512
1513        let server = AsyncChannel::from_channel(server_end);
1514        let mut buffer = zx::MessageBufEtc::new();
1515        let receiver = async move {
1516            server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1517            server
1518                .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1519                .expect("failed to write epitaph");
1520        };
1521        // add a timeout to receiver so if test is broken it doesn't take forever
1522        let receiver = receiver
1523            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1524                panic!("did not receive message in time!")
1525            });
1526
1527        let sender = async move {
1528            const ORDINAL: u64 = 42 << 32;
1529            let result = client.send_query::<u8, u8, ORDINAL>(55, DynamicFlags::empty()).await;
1530            assert_matches!(
1531                result,
1532                Err(crate::Error::ClientChannelClosed {
1533                    status: zx_status::Status::UNAVAILABLE,
1534                    protocol_name: "test_protocol",
1535                    epitaph: Some(epitaph),
1536                }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1537            );
1538        };
1539        // add a timeout to sender so if test is broken it doesn't take forever
1540        let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1541            panic!("did not receive response in time!")
1542        });
1543
1544        let ((), ()) = join!(receiver, sender);
1545    }
1546
1547    #[fasync::run_singlethreaded(test)]
1548    #[should_panic]
1549    async fn event_cant_be_taken_twice() {
1550        let (client_end, _) = zx::Channel::create();
1551        let client_end = AsyncChannel::from_channel(client_end);
1552        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1553        let _foo = client.take_event_receiver();
1554        client.take_event_receiver();
1555    }
1556
1557    #[fasync::run_singlethreaded(test)]
1558    async fn event_can_be_taken_after_drop() {
1559        let (client_end, _) = zx::Channel::create();
1560        let client_end = AsyncChannel::from_channel(client_end);
1561        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1562        let foo = client.take_event_receiver();
1563        drop(foo);
1564        client.take_event_receiver();
1565    }
1566
1567    #[fasync::run_singlethreaded(test)]
1568    async fn receiver_termination_test() {
1569        let (client_end, _) = zx::Channel::create();
1570        let client_end = AsyncChannel::from_channel(client_end);
1571        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1572        let mut foo = client.take_event_receiver();
1573        assert!(!foo.is_terminated(), "receiver should not report terminated before being polled");
1574        let _ = foo.next().await;
1575        assert!(
1576            foo.is_terminated(),
1577            "receiver should report terminated after seeing channel is closed"
1578        );
1579    }
1580
1581    #[fasync::run_singlethreaded(test)]
1582    #[should_panic(expected = "polled EventReceiver after `None`")]
1583    async fn receiver_cant_be_polled_more_than_once_on_closed_stream() {
1584        let (client_end, _) = zx::Channel::create();
1585        let client_end = AsyncChannel::from_channel(client_end);
1586        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1587        let foo = client.take_event_receiver();
1588        drop(foo);
1589        let mut bar = client.take_event_receiver();
1590        assert!(bar.next().await.is_none(), "read on closed channel should return none");
1591        // this should panic
1592        let _ = bar.next().await;
1593    }
1594
1595    #[fasync::run_singlethreaded(test)]
1596    #[should_panic(expected = "polled EventReceiver after `None`")]
1597    async fn receiver_panics_when_polled_after_receiving_epitaph_then_none() {
1598        let (client_end, server_end) = zx::Channel::create();
1599        let client_end = AsyncChannel::from_channel(client_end);
1600        let server_end = AsyncChannel::from_channel(server_end);
1601        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1602        let mut stream = client.take_event_receiver();
1603
1604        epitaph::write_epitaph_impl(&server_end, zx_status::Status::UNAVAILABLE)
1605            .expect("wrote epitaph");
1606        drop(server_end);
1607
1608        assert_matches!(
1609            stream.next().await,
1610            Some(Err(crate::Error::ClientChannelClosed {
1611                status: zx_status::Status::UNAVAILABLE,
1612                protocol_name: "test_protocol",
1613                epitaph: Some(epitaph),
1614            })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1615        );
1616        assert_matches!(stream.next().await, None);
1617        // this should panic
1618        let _ = stream.next().await;
1619    }
1620
1621    #[fasync::run_singlethreaded(test)]
1622    async fn event_can_be_taken() {
1623        let (client_end, _) = zx::Channel::create();
1624        let client_end = AsyncChannel::from_channel(client_end);
1625        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1626        client.take_event_receiver();
1627    }
1628
1629    #[fasync::run_singlethreaded(test)]
1630    async fn event_received() {
1631        let (client_end, server_end) = zx::Channel::create();
1632        let client_end = AsyncChannel::from_channel(client_end);
1633        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1634
1635        // Send the event from the server
1636        let server = AsyncChannel::from_channel(server_end);
1637        let (bytes, handles) = (&mut vec![], &mut vec![]);
1638        const ORDINAL: u64 = 5;
1639        let header = TransactionHeader::new(0, ORDINAL, DynamicFlags::empty());
1640        encode_transaction(header, bytes, handles);
1641        server.write_etc(bytes, handles).expect("Server channel write failed");
1642        drop(server);
1643
1644        let recv = client
1645            .take_event_receiver()
1646            .into_future()
1647            .then(|(x, stream)| {
1648                let x = x.expect("should contain one element");
1649                let x = x.expect("fidl error");
1650                let x: i32 =
1651                    decode_transaction_body::<i32, DefaultFuchsiaResourceDialect, ORDINAL>(x)
1652                        .expect("failed to decode event");
1653                assert_eq!(x, 55);
1654                stream.into_future()
1655            })
1656            .map(|(x, _stream)| assert!(x.is_none(), "should have emptied"));
1657
1658        // add a timeout to receiver so if test is broken it doesn't take forever
1659        let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1660            panic!("did not receive event in time!")
1661        });
1662
1663        recv.await;
1664    }
1665
1666    /// Tests that the event receiver can be taken, the stream read to the end,
1667    /// the receiver dropped, and then a new receiver gotten from taking the
1668    /// stream again.
1669    #[fasync::run_singlethreaded(test)]
1670    async fn receiver_can_be_taken_after_end_of_stream() {
1671        let (client_end, server_end) = zx::Channel::create();
1672        let client_end = AsyncChannel::from_channel(client_end);
1673        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1674
1675        // Send the event from the server
1676        let server = AsyncChannel::from_channel(server_end);
1677        let (bytes, handles) = (&mut vec![], &mut vec![]);
1678        const ORDINAL: u64 = 5;
1679        let header = TransactionHeader::new(0, ORDINAL, DynamicFlags::empty());
1680        encode_transaction(header, bytes, handles);
1681        server.write_etc(bytes, handles).expect("Server channel write failed");
1682        drop(server);
1683
1684        // Create a block to make sure the first event receiver is dropped.
1685        // Creating the block is a bit of paranoia, because awaiting the
1686        // future moves the receiver anyway.
1687        {
1688            let recv = client
1689                .take_event_receiver()
1690                .into_future()
1691                .then(|(x, stream)| {
1692                    let x = x.expect("should contain one element");
1693                    let x = x.expect("fidl error");
1694                    let x: i32 =
1695                        decode_transaction_body::<i32, DefaultFuchsiaResourceDialect, ORDINAL>(x)
1696                            .expect("failed to decode event");
1697                    assert_eq!(x, 55);
1698                    stream.into_future()
1699                })
1700                .map(|(x, _stream)| assert!(x.is_none(), "should have emptied"));
1701
1702            // add a timeout to receiver so if test is broken it doesn't take forever
1703            let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1704                panic!("did not receive event in time!")
1705            });
1706
1707            recv.await;
1708        }
1709
1710        // if we take the event stream again, we should be able to get the next
1711        // without a panic, but that should be none
1712        let mut c = client.take_event_receiver();
1713        assert!(
1714            c.next().await.is_none(),
1715            "receiver on closed channel should return none on first call"
1716        );
1717    }
1718
1719    #[fasync::run_singlethreaded(test)]
1720    async fn event_incompatible_format() {
1721        let (client_end, server_end) = zx::Channel::create();
1722        let client_end = AsyncChannel::from_channel(client_end);
1723        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1724
1725        // Send the event from the server
1726        let server = AsyncChannel::from_channel(server_end);
1727        let (bytes, handles) = (&mut vec![], &mut vec![]);
1728        let header = TransactionHeader::new_full(
1729            0,
1730            5,
1731            crate::encoding::Context {
1732                wire_format_version: crate::encoding::WireFormatVersion::V2,
1733            },
1734            DynamicFlags::empty(),
1735            0,
1736        );
1737        encode_transaction(header, bytes, handles);
1738        server.write_etc(bytes, handles).expect("Server channel write failed");
1739        drop(server);
1740
1741        let mut event_receiver = client.take_event_receiver();
1742        let recv = event_receiver.next().map(|event| {
1743            assert_matches!(event, Some(Err(crate::Error::IncompatibleMagicNumber(0))))
1744        });
1745
1746        // add a timeout to receiver so if test is broken it doesn't take forever
1747        let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1748            panic!("did not receive event in time!")
1749        });
1750
1751        recv.await;
1752    }
1753
1754    #[test]
1755    fn client_always_wakes_pending_futures() {
1756        let mut executor = fasync::TestExecutor::new();
1757
1758        let (client_end, server_end) = zx::Channel::create();
1759        let client_end = AsyncChannel::from_channel(client_end);
1760        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1761
1762        let mut event_receiver = client.take_event_receiver();
1763
1764        // first poll on a response
1765        let (response_waker, response_waker_count) = new_count_waker();
1766        let response_cx = &mut Context::from_waker(&response_waker);
1767        let mut response_txid = Txid(0);
1768        let mut response_future = client
1769            .send_raw_query(|tx_id, bytes, handles| {
1770                response_txid = tx_id;
1771                let header = TransactionHeader::new(
1772                    response_txid.as_raw_id(),
1773                    SEND_ORDINAL,
1774                    DynamicFlags::empty(),
1775                );
1776                encode_transaction(header, bytes, handles);
1777                Ok(())
1778            })
1779            .expect("Couldn't send query");
1780        assert!(response_future.poll_unpin(response_cx).is_pending());
1781
1782        // then, poll on an event
1783        let (event_waker, event_waker_count) = new_count_waker();
1784        let event_cx = &mut Context::from_waker(&event_waker);
1785        assert!(event_receiver.poll_next_unpin(event_cx).is_pending());
1786
1787        // at this point, nothing should have been woken
1788        assert_eq!(response_waker_count.get(), 0);
1789        assert_eq!(event_waker_count.get(), 0);
1790
1791        // next, simulate an event coming in
1792        send_transaction(TransactionHeader::new(0, 5, DynamicFlags::empty()), &server_end);
1793
1794        // get event loop to deliver readiness notifications to channels
1795        let _ = executor.run_until_stalled(&mut future::pending::<()>());
1796
1797        // The event wake should be woken but not the response_waker.
1798        assert_eq!(response_waker_count.get(), 0);
1799        assert_eq!(event_waker_count.get(), 1);
1800
1801        // we'll pretend event_waker was woken, and have that poll out the event
1802        assert!(event_receiver.poll_next_unpin(event_cx).is_ready());
1803
1804        // next, simulate a response coming in
1805        send_transaction(
1806            TransactionHeader::new(response_txid.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty()),
1807            &server_end,
1808        );
1809
1810        // get event loop to deliver readiness notifications to channels
1811        let _ = executor.run_until_stalled(&mut future::pending::<()>());
1812
1813        // response waker should now get woken.
1814        assert_eq!(response_waker_count.get(), 1);
1815    }
1816
1817    #[test]
1818    fn client_always_wakes_pending_futures_on_epitaph() {
1819        let mut executor = fasync::TestExecutor::new();
1820
1821        let (client_end, server_end) = zx::Channel::create();
1822        let client_end = AsyncChannel::from_channel(client_end);
1823        let server_end = AsyncChannel::from_channel(server_end);
1824        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1825
1826        let mut event_receiver = client.take_event_receiver();
1827
1828        // first poll on a response
1829        let (response1_waker, response1_waker_count) = new_count_waker();
1830        let response1_cx = &mut Context::from_waker(&response1_waker);
1831        let mut response1_future = client
1832            .send_raw_query(|tx_id, bytes, handles| {
1833                let header =
1834                    TransactionHeader::new(tx_id.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty());
1835                encode_transaction(header, bytes, handles);
1836                Ok(())
1837            })
1838            .expect("Couldn't send query");
1839        assert!(response1_future.poll_unpin(response1_cx).is_pending());
1840
1841        // then, poll on an event
1842        let (event_waker, event_waker_count) = new_count_waker();
1843        let event_cx = &mut Context::from_waker(&event_waker);
1844        assert!(event_receiver.poll_next_unpin(event_cx).is_pending());
1845
1846        // poll on another response
1847        let (response2_waker, response2_waker_count) = new_count_waker();
1848        let response2_cx = &mut Context::from_waker(&response2_waker);
1849        let mut response2_future = client
1850            .send_raw_query(|tx_id, bytes, handles| {
1851                let header =
1852                    TransactionHeader::new(tx_id.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty());
1853                encode_transaction(header, bytes, handles);
1854                Ok(())
1855            })
1856            .expect("Couldn't send query");
1857        assert!(response2_future.poll_unpin(response2_cx).is_pending());
1858
1859        let wakers = vec![response1_waker_count, response2_waker_count, event_waker_count];
1860
1861        // get event loop to deliver readiness notifications to channels
1862        let _ = executor.run_until_stalled(&mut future::pending::<()>());
1863
1864        // at this point, nothing should have been woken
1865        assert_eq!(0, wakers.iter().fold(0, |acc, x| acc + x.get()));
1866
1867        // next, simulate an epitaph without closing
1868        epitaph::write_epitaph_impl(&server_end, zx_status::Status::UNAVAILABLE)
1869            .expect("wrote epitaph");
1870
1871        // get event loop to deliver readiness notifications to channels
1872        let _ = executor.run_until_stalled(&mut future::pending::<()>());
1873
1874        // All the wakers should be woken up because the channel is ready to read, and the message
1875        // could be for any of them.
1876        for wake_count in &wakers {
1877            assert_eq!(wake_count.get(), 1);
1878        }
1879
1880        // pretend that response1 woke and poll that to completion.
1881        assert_matches!(
1882            response1_future.poll_unpin(response1_cx),
1883            Poll::Ready(Err(crate::Error::ClientChannelClosed {
1884                status: zx_status::Status::UNAVAILABLE,
1885                protocol_name: "test_protocol",
1886                epitaph: Some(epitaph),
1887            })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1888        );
1889
1890        // get event loop to deliver readiness notifications to channels
1891        let _ = executor.run_until_stalled(&mut future::pending::<()>());
1892
1893        // poll response2 to completion.
1894        assert_matches!(
1895            response2_future.poll_unpin(response2_cx),
1896            Poll::Ready(Err(crate::Error::ClientChannelClosed {
1897                status: zx_status::Status::UNAVAILABLE,
1898                protocol_name: "test_protocol",
1899                epitaph: Some(epitaph),
1900            })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1901        );
1902
1903        // poll the event stream to completion.
1904        assert!(event_receiver.poll_next_unpin(event_cx).is_ready());
1905    }
1906
1907    #[fasync::run_singlethreaded(test)]
1908    async fn client_allows_take_event_stream_even_if_event_delivered() {
1909        let (client_end, server_end) = zx::Channel::create();
1910        let client_end = AsyncChannel::from_channel(client_end);
1911        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1912
1913        // first simulate an event coming in, even though nothing has polled
1914        send_transaction(TransactionHeader::new(0, 5, DynamicFlags::empty()), &server_end);
1915
1916        // next, poll on a response
1917        let (response_waker, _response_waker_count) = new_count_waker();
1918        let response_cx = &mut Context::from_waker(&response_waker);
1919        let mut response_future =
1920            client.send_query::<u8, u8, SEND_ORDINAL>(55, DynamicFlags::empty());
1921        assert!(response_future.poll_unpin(response_cx).is_pending());
1922
1923        // then, make sure we can still take the event receiver without panicking
1924        let mut _event_receiver = client.take_event_receiver();
1925    }
1926
1927    #[fasync::run_singlethreaded(test)]
1928    async fn client_reports_epitaph_from_all_read_actions() {
1929        #[derive(Debug, PartialEq)]
1930        enum Action {
1931            SendMsg,   // send a one-way message
1932            SendQuery, // send a two-way message and just call .check()
1933            WaitQuery, // send a two-way message and wait for the response
1934            RecvEvent, // wait to receive an event
1935        }
1936        impl Action {
1937            fn should_report_epitaph(&self) -> bool {
1938                match self {
1939                    Action::SendMsg | Action::SendQuery => false,
1940                    Action::WaitQuery | Action::RecvEvent => true,
1941                }
1942            }
1943        }
1944        use Action::*;
1945        // Test all permutations of two actions. Verify the epitaph is reported
1946        // twice (2 reads), once (1 read, 1 write), or not at all (2 writes).
1947        for two_actions in &[
1948            [SendMsg, SendMsg],
1949            [SendMsg, SendQuery],
1950            [SendMsg, WaitQuery],
1951            [SendMsg, RecvEvent],
1952            [SendQuery, SendMsg],
1953            [SendQuery, SendQuery],
1954            [SendQuery, WaitQuery],
1955            [SendQuery, RecvEvent],
1956            [WaitQuery, SendMsg],
1957            [WaitQuery, SendQuery],
1958            [WaitQuery, WaitQuery],
1959            [WaitQuery, RecvEvent],
1960            [RecvEvent, SendMsg],
1961            [RecvEvent, SendQuery],
1962            [RecvEvent, WaitQuery],
1963            // No [RecvEvent, RecvEvent] because it behaves differently: after
1964            // reporting an epitaph, the next call returns None.
1965        ] {
1966            let (client_end, server_end) = zx::Channel::create();
1967            let client_end = AsyncChannel::from_channel(client_end);
1968            let client = Client::new(client_end, "test_protocol");
1969
1970            // Immediately close the FIDL channel with an epitaph.
1971            let server_end = AsyncChannel::from_channel(server_end);
1972            server_end
1973                .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1974                .expect("failed to write epitaph");
1975
1976            let mut event_receiver = client.take_event_receiver();
1977
1978            // Assert that each action reports the epitaph.
1979            for (index, action) in two_actions.iter().enumerate() {
1980                let err = match action {
1981                    SendMsg => {
1982                        client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()).err()
1983                    }
1984                    WaitQuery => client
1985                        .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
1986                        .await
1987                        .err(),
1988                    SendQuery => client
1989                        .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
1990                        .check()
1991                        .err(),
1992                    RecvEvent => event_receiver.next().await.unwrap().err(),
1993                };
1994                let details = format!("index: {index:?}, two_actions: {two_actions:?}");
1995                match err {
1996                    None => assert!(
1997                        !action.should_report_epitaph(),
1998                        "expected epitaph, but succeeded.\n{details}"
1999                    ),
2000                    Some(crate::Error::ClientChannelClosed {
2001                        status: zx_status::Status::UNAVAILABLE,
2002                        protocol_name: "test_protocol",
2003                        epitaph: Some(epitaph),
2004                    }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32 => assert!(
2005                        action.should_report_epitaph(),
2006                        "got epitaph unexpectedly.\n{details}",
2007                    ),
2008                    Some(err) => panic!("unexpected error: {err:#?}.\n{details}"),
2009                }
2010            }
2011
2012            // If we got the epitaph from RecvEvent, the next should return None.
2013            if two_actions.contains(&RecvEvent) {
2014                assert_matches!(event_receiver.next().await, None);
2015            }
2016        }
2017    }
2018
2019    #[test]
2020    fn client_query_result_check() {
2021        let mut executor = fasync::TestExecutor::new();
2022        let (client_end, server_end) = zx::Channel::create();
2023        let client_end = AsyncChannel::from_channel(client_end);
2024        let client = Client::new(client_end, "test_protocol");
2025
2026        let server = AsyncChannel::from_channel(server_end);
2027
2028        // Sending works, and checking when a message successfully sends returns itself.
2029        let active_fut =
2030            client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
2031
2032        let mut checked_fut = active_fut.check().expect("failed to check future");
2033
2034        // Should be able to complete the query even after checking.
2035        let mut buffer = MessageBufEtc::new();
2036        executor.run_singlethreaded(server.recv_etc_msg(&mut buffer)).expect("failed to recv msg");
2037        let two_way_tx_id = 1u8;
2038        assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2039
2040        let (bytes, handles) = (&mut vec![], &mut vec![]);
2041        let header =
2042            TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2043        encode_transaction(header, bytes, handles);
2044        server.write_etc(bytes, handles).expect("Server channel write failed");
2045
2046        executor
2047            .run_singlethreaded(&mut checked_fut)
2048            .map(|x| assert_eq!(x, SEND_DATA))
2049            .unwrap_or_else(|e| panic!("fidl error: {e:?}"));
2050
2051        // Close the server channel, meaning the next query will fail.
2052        drop(server);
2053
2054        let query_fut = client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
2055
2056        // The check succeeds, because we do not expose PEER_CLOSED on writes.
2057        let mut checked_fut = query_fut.check().expect("failed to check future");
2058        // But the query will fail when it tries to read the response.
2059        assert_matches!(
2060            executor.run_singlethreaded(&mut checked_fut),
2061            Err(crate::Error::ClientChannelClosed {
2062                status: zx_status::Status::PEER_CLOSED,
2063                protocol_name: "test_protocol",
2064                epitaph: None,
2065            })
2066        );
2067    }
2068
2069    #[fasync::run_singlethreaded(test)]
2070    async fn client_into_channel() {
2071        // This test doesn't actually do any async work, but the fuchsia
2072        // executor must be set up in order to create the channel.
2073        let (client_end, _server_end) = zx::Channel::create();
2074        let client_end = AsyncChannel::from_channel(client_end);
2075        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2076
2077        assert!(client.into_channel().is_ok());
2078    }
2079
2080    #[fasync::run_singlethreaded(test)]
2081    async fn client_into_channel_outstanding_messages() {
2082        // This test doesn't actually do any async work, but the fuchsia
2083        // executor must be set up in order to create the channel.
2084        let (client_end, _server_end) = zx::Channel::create();
2085        let client_end = AsyncChannel::from_channel(client_end);
2086        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2087
2088        {
2089            // Create a send future to insert a message interest but drop it
2090            // before a response can be received.
2091            let _sender =
2092                client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
2093        }
2094
2095        assert!(client.into_channel().is_err());
2096    }
2097
2098    #[fasync::run_singlethreaded(test)]
2099    async fn client_into_channel_active_clone() {
2100        // This test doesn't actually do any async work, but the fuchsia
2101        // executor must be set up in order to create the channel.
2102        let (client_end, _server_end) = zx::Channel::create();
2103        let client_end = AsyncChannel::from_channel(client_end);
2104        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2105
2106        let _cloned_client = client.clone();
2107
2108        assert!(client.into_channel().is_err());
2109    }
2110
2111    #[fasync::run_singlethreaded(test)]
2112    async fn client_into_channel_outstanding_messages_get_received() {
2113        let (client_end, server_end) = zx::Channel::create();
2114        let client_end = AsyncChannel::from_channel(client_end);
2115        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2116
2117        let server = AsyncChannel::from_channel(server_end);
2118        let mut buffer = MessageBufEtc::new();
2119        let receiver = async move {
2120            server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
2121            let two_way_tx_id = 1u8;
2122            assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2123
2124            let (bytes, handles) = (&mut vec![], &mut vec![]);
2125            let header =
2126                TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2127            encode_transaction(header, bytes, handles);
2128            server.write_etc(bytes, handles).expect("Server channel write failed");
2129        };
2130
2131        // add a timeout to receiver so if test is broken it doesn't take forever
2132        let receiver = receiver
2133            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
2134                panic!("did not receiver message in time!")
2135            });
2136
2137        let sender = client
2138            .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
2139            .map_ok(|x| assert_eq!(x, SEND_DATA))
2140            .unwrap_or_else(|e| panic!("fidl error: {e:?}"));
2141
2142        // add a timeout to receiver so if test is broken it doesn't take forever
2143        let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
2144            panic!("did not receive response in time!")
2145        });
2146
2147        let ((), ()) = join!(receiver, sender);
2148
2149        assert!(client.into_channel().is_ok());
2150    }
2151
2152    #[fasync::run_singlethreaded(test)]
2153    async fn client_decode_errors_are_broadcast() {
2154        let (client_end, server_end) = zx::Channel::create();
2155        let client_end = AsyncChannel::from_channel(client_end);
2156        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2157
2158        let server = AsyncChannel::from_channel(server_end);
2159
2160        let _server = fasync::Task::spawn(async move {
2161            let mut buffer = MessageBufEtc::new();
2162            server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
2163            let two_way_tx_id = 1u8;
2164            assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2165
2166            let (bytes, handles) = (&mut vec![], &mut vec![]);
2167            let header =
2168                TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2169            encode_transaction(header, bytes, handles);
2170            // Zero out the at-rest flags which will give this message an invalid version.
2171            bytes[4] = 0;
2172            server.write_etc(bytes, handles).expect("Server channel write failed");
2173
2174            // Wait forever to stop the channel from being closed.
2175            pending::<()>().await;
2176        });
2177
2178        let futures = FuturesUnordered::new();
2179
2180        for _ in 0..4 {
2181            futures.push(async {
2182                assert_matches!(
2183                    client
2184                        .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
2185                        .map_ok(|x| assert_eq!(x, SEND_DATA))
2186                        .await,
2187                    Err(crate::Error::UnsupportedWireFormatVersion)
2188                );
2189            });
2190        }
2191
2192        futures
2193            .collect::<Vec<_>>()
2194            .on_timeout(zx::MonotonicDuration::from_seconds(1).after_now(), || panic!("timed out!"))
2195            .await;
2196    }
2197
2198    #[fasync::run_singlethreaded(test)]
2199    async fn into_channel_from_waker_succeeds() {
2200        let (client_end, server_end) = zx::Channel::create();
2201        let client_end = AsyncChannel::from_channel(client_end);
2202        let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2203
2204        let server = AsyncChannel::from_channel(server_end);
2205        let mut buffer = MessageBufEtc::new();
2206        let receiver = async move {
2207            server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
2208            let two_way_tx_id = 1u8;
2209            assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2210
2211            let (bytes, handles) = (&mut vec![], &mut vec![]);
2212            let header =
2213                TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2214            encode_transaction(header, bytes, handles);
2215            server.write_etc(bytes, handles).expect("Server channel write failed");
2216        };
2217
2218        struct Sender {
2219            future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>,
2220        }
2221
2222        let (done_tx, done_rx) = oneshot::channel();
2223
2224        let sender = Arc::new(Sender {
2225            future: Mutex::new(Box::pin(async move {
2226                client
2227                    .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
2228                    .map_ok(|x| assert_eq!(x, SEND_DATA))
2229                    .unwrap_or_else(|e| panic!("fidl error: {e:?}"))
2230                    .await;
2231
2232                assert!(client.into_channel().is_ok());
2233
2234                let _ = done_tx.send(());
2235            })),
2236        });
2237
2238        // This test isn't typically how this would work; normally, the future would get woken and
2239        // an executor would be responsible for running the task.  We do it this way because if this
2240        // works, then it means the case where `into_channel` is used after a response is received
2241        // on a multi-threaded executor will always work (which isn't easy to test directly).
2242        impl Wake for Sender {
2243            fn wake(self: Arc<Self>) {
2244                self.wake_by_ref();
2245            }
2246            fn wake_by_ref(self: &Arc<Self>) {
2247                assert!(
2248                    self.future
2249                        .lock()
2250                        .poll_unpin(&mut Context::from_waker(Waker::noop()))
2251                        .is_ready()
2252                );
2253            }
2254        }
2255
2256        let waker = Waker::from(sender.clone());
2257
2258        assert!(sender.future.lock().poll_unpin(&mut Context::from_waker(&waker)).is_pending());
2259
2260        receiver.await;
2261
2262        done_rx.await.unwrap();
2263    }
2264}