1use crate::Error;
8use crate::encoding::{
9 Decode, Decoder, DefaultFuchsiaResourceDialect, DynamicFlags, Encode, Encoder, EpitaphBody,
10 MessageBufFor, ProxyChannelBox, ProxyChannelFor, ResourceDialect, TransactionHeader,
11 TransactionMessage, TransactionMessageType, TypeMarker, decode_transaction_header,
12};
13use fuchsia_sync::Mutex;
14use futures::future::{self, FusedFuture, Future, FutureExt, Map, MaybeDone};
15use futures::ready;
16use futures::stream::{FusedStream, Stream};
17use futures::task::{Context, Poll, Waker};
18use slab::Slab;
19use std::collections::VecDeque;
20use std::mem;
21use std::ops::ControlFlow;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::task::{RawWaker, RawWakerVTable};
25use zx_status;
26
27#[doc(hidden)] pub fn decode_transaction_body<T: TypeMarker, D: ResourceDialect, const EXPECTED_ORDINAL: u64>(
30 mut buf: D::MessageBufEtc,
31) -> Result<T::Owned, Error>
32where
33 T::Owned: Decode<T, D>,
34{
35 let (bytes, handles) = buf.split_mut();
36 let (header, body_bytes) = decode_transaction_header(bytes)?;
37 if header.ordinal != EXPECTED_ORDINAL {
38 return Err(Error::InvalidResponseOrdinal);
39 }
40 let mut output = Decode::<T, D>::new_empty();
41 Decoder::<D>::decode_into::<T>(&header, body_bytes, handles, &mut output)?;
42 Ok(output)
43}
44
/// An asynchronous FIDL client that sends messages and receives responses
/// over a channel.
///
/// Cloning is cheap: every clone shares the same `ClientInner` through an
/// `Arc`.
#[derive(Debug, Clone)]
pub struct Client<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
    // Shared channel plus bookkeeping for pending responses and events.
    inner: Arc<ClientInner<D>>,
}
50
/// A [`MessageResponse`] future whose raw output (a received message buffer or
/// an error) is mapped through a plain function pointer into `Result<T, Error>`.
pub type DecodedQueryResponseFut<T, D = DefaultFuchsiaResourceDialect> = Map<
    MessageResponse<D>,
    fn(Result<<D as ResourceDialect>::MessageBufEtc, Error>) -> Result<T, Error>,
>;
56
/// The future returned by two-way calls on [`Client`].
///
/// The inner `MaybeDone` lets a call that failed while sending be represented
/// as an already-completed future holding the error (see
/// `Client::send_query_and_decode`).
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct QueryResponseFut<T, D: ResourceDialect = DefaultFuchsiaResourceDialect>(
    pub MaybeDone<DecodedQueryResponseFut<T, D>>,
);
64
65impl<T: Unpin, D: ResourceDialect> FusedFuture for QueryResponseFut<T, D> {
66 fn is_terminated(&self) -> bool {
67 matches!(self.0, MaybeDone::Gone)
68 }
69}
70
71impl<T: Unpin, D: ResourceDialect> Future for QueryResponseFut<T, D> {
72 type Output = Result<T, Error>;
73
74 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
75 ready!(self.0.poll_unpin(cx));
76 let maybe_done = Pin::new(&mut self.0);
77 Poll::Ready(maybe_done.take_output().unwrap_or(Err(Error::PollAfterCompletion)))
78 }
79}
80
81impl<T> QueryResponseFut<T> {
82 pub fn check(self) -> Result<Self, Error> {
89 match self.0 {
90 MaybeDone::Done(Err(e)) => Err(e),
91 x => Ok(QueryResponseFut(x)),
92 }
93 }
94}
95
/// Mask selecting the low 24 bits of a transaction id, which hold the
/// interest id plus one.
const TXID_INTEREST_MASK: u32 = 0xFFFFFF;
/// Number of bits the generation counter is shifted left inside a
/// transaction id.
const TXID_GENERATION_SHIFT: usize = 24;
/// Mask keeping 7 bits of the generation counter, which leaves the top bit of
/// the transaction id clear.
const TXID_GENERATION_MASK: u8 = 0x7F;

/// A FIDL transaction id.
///
/// Ids produced by [`Txid::from_interest_id`] are never zero in their low 24
/// bits for realistic slab sizes; id zero is used for events and one-way
/// messages elsewhere in this file.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct Txid(u32);

/// Key of a message interest in the `Interests::messages` slab.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct InterestId(usize);

impl InterestId {
    /// Recovers the slab key encoded in the low 24 bits of `txid`.
    ///
    /// The encoded value is the slab key plus one, so one is subtracted here.
    /// The transaction id may come straight from a peer-supplied header, and
    /// its low 24 bits can be zero even when the whole id is not. The
    /// subtraction therefore wraps instead of overflowing: such ids map to
    /// `usize::MAX`, a key no realistic slab contains, so the lookup simply
    /// misses instead of panicking in builds with overflow checks.
    fn from_txid(txid: Txid) -> Self {
        InterestId(((txid.0 & TXID_INTEREST_MASK) as usize).wrapping_sub(1))
    }
}

impl Txid {
    /// Builds a transaction id from a slab key and a generation counter.
    ///
    /// Layout, from the most significant bit down:
    ///  - 1 bit left as zero,
    ///  - 7 bits of `generation`,
    ///  - 24 bits holding the slab key plus one (slab keys are zero-based and
    ///    a zero transaction id is reserved).
    fn from_interest_id(int_id: InterestId, generation: u8) -> Self {
        let id = (int_id.0 as u32 + 1) & TXID_INTEREST_MASK;
        let generation = (generation & TXID_GENERATION_MASK) as u32;

        let txid = (generation << TXID_GENERATION_SHIFT) | id;

        Txid(txid)
    }

    /// Returns the raw `u32` transaction id.
    pub fn as_raw_id(&self) -> u32 {
        self.0
    }
}

impl From<u32> for Txid {
    /// Wraps a raw transaction id without any validation.
    fn from(txid: u32) -> Self {
        Self(txid)
    }
}
141
142impl<D: ResourceDialect> Client<D> {
143 pub fn new(channel: D::ProxyChannel, protocol_name: &'static str) -> Client<D> {
148 Client {
149 inner: Arc::new(ClientInner {
150 channel: channel.boxed(),
151 interests: Mutex::default(),
152 terminal_error: Mutex::default(),
153 protocol_name,
154 }),
155 }
156 }
157
158 pub fn as_channel(&self) -> &D::ProxyChannel {
160 self.inner.channel.as_channel()
161 }
162
163 pub fn into_channel(self) -> Result<D::ProxyChannel, Self> {
170 match Arc::try_unwrap(self.inner) {
180 Ok(inner) => {
181 if inner.interests.lock().messages.is_empty() || inner.channel.is_closed() {
182 Ok(inner.channel.unbox())
183 } else {
184 Err(Self { inner: Arc::new(inner) })
189 }
190 }
191 Err(inner) => Err(Self { inner }),
192 }
193 }
194
195 pub fn take_event_receiver(&self) -> EventReceiver<D> {
198 {
199 let mut lock = self.inner.interests.lock();
200
201 if let EventListener::None = lock.event_listener {
202 lock.event_listener = EventListener::WillPoll;
203 } else {
204 panic!("Event stream was already taken");
205 }
206 }
207
208 EventReceiver { inner: self.inner.clone(), state: EventReceiverState::Active }
209 }
210
211 pub fn send<T: TypeMarker>(
213 &self,
214 body: impl Encode<T, D>,
215 ordinal: u64,
216 dynamic_flags: DynamicFlags,
217 ) -> Result<(), Error> {
218 let msg =
219 TransactionMessage { header: TransactionHeader::new(0, ordinal, dynamic_flags), body };
220 crate::encoding::with_tls_encoded::<TransactionMessageType<T>, D, ()>(
221 msg,
222 |bytes, handles| self.send_raw(bytes, handles),
223 )
224 }
225
226 pub fn send_query<Request: TypeMarker, Response: TypeMarker, const ORDINAL: u64>(
228 &self,
229 body: impl Encode<Request, D>,
230 dynamic_flags: DynamicFlags,
231 ) -> QueryResponseFut<Response::Owned, D>
232 where
233 Response::Owned: Decode<Response, D>,
234 {
235 self.send_query_and_decode::<Request, Response::Owned>(
236 body,
237 ORDINAL,
238 dynamic_flags,
239 |buf| buf.and_then(decode_transaction_body::<Response, D, ORDINAL>),
240 )
241 }
242
243 pub fn send_query_and_decode<Request: TypeMarker, Output>(
246 &self,
247 body: impl Encode<Request, D>,
248 ordinal: u64,
249 dynamic_flags: DynamicFlags,
250 decode: fn(Result<D::MessageBufEtc, Error>) -> Result<Output, Error>,
251 ) -> QueryResponseFut<Output, D> {
252 let send_result = self.send_raw_query(|tx_id, bytes, handles| {
253 let msg = TransactionMessage {
254 header: TransactionHeader::new(tx_id.as_raw_id(), ordinal, dynamic_flags),
255 body,
256 };
257 Encoder::encode::<TransactionMessageType<Request>>(bytes, handles, msg)?;
258 Ok(())
259 });
260
261 QueryResponseFut(match send_result {
262 Ok(res_fut) => future::maybe_done(res_fut.map(decode)),
263 Err(e) => MaybeDone::Done(Err(e)),
264 })
265 }
266
267 pub fn send_raw(
269 &self,
270 bytes: &[u8],
271 handles: &mut [<D::ProxyChannel as ProxyChannelFor<D>>::HandleDisposition],
272 ) -> Result<(), Error> {
273 match self.inner.channel.write_etc(bytes, handles) {
274 Ok(()) | Err(None) => Ok(()),
275 Err(Some(e)) => Err(Error::ClientWrite(e.into())),
276 }
277 }
278
279 pub fn send_raw_query<F>(&self, encode_msg: F) -> Result<MessageResponse<D>, Error>
281 where
282 F: for<'a, 'b> FnOnce(
283 Txid,
284 &'a mut Vec<u8>,
285 &'b mut Vec<<D::ProxyChannel as ProxyChannelFor<D>>::HandleDisposition>,
286 ) -> Result<(), Error>,
287 {
288 let id = self.inner.interests.lock().register_msg_interest();
289 crate::encoding::with_tls_encode_buf::<_, D>(|bytes, handles| {
290 encode_msg(id, bytes, handles)?;
291 self.send_raw(bytes, handles)
292 })?;
293
294 Ok(MessageResponse { id, client: Some(self.inner.clone()) })
295 }
296}
297
/// A future that resolves to the raw response buffer for one transaction.
#[must_use]
#[derive(Debug)]
pub struct MessageResponse<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
    // Transaction id whose response this future waits for.
    id: Txid,
    // Shared client state. Set to `None` once the response has been returned
    // successfully, which also stops `Drop` from deregistering the interest.
    client: Option<Arc<ClientInner<D>>>,
}
306
// Explicitly mark `MessageResponse` as `Unpin` for every dialect `D`.
impl<D: ResourceDialect> Unpin for MessageResponse<D> {}
308
309impl<D: ResourceDialect> Future for MessageResponse<D> {
310 type Output = Result<D::MessageBufEtc, Error>;
311 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
312 let this = &mut *self;
313 let res;
314 {
315 let client = this.client.as_ref().ok_or(Error::PollAfterCompletion)?;
316 res = client.poll_recv_msg_response(this.id, cx);
317 }
318
319 if let Poll::Ready(Ok(_)) = res {
321 this.client.take().expect("MessageResponse polled after completion");
322 }
323
324 res
325 }
326}
327
328impl<D: ResourceDialect> Drop for MessageResponse<D> {
329 fn drop(&mut self) {
330 if let Some(client) = &self.client {
331 client.interests.lock().deregister(self.id);
332 }
333 }
334}
335
/// State of a single registered interest in a response message.
#[derive(Debug)]
enum MessageInterest<D: ResourceDialect> {
    /// Registered, but no waker has been stored for it yet.
    WillPoll,
    /// A task polled for the response and left its waker.
    Waiting(Waker),
    /// The response arrived and has not been taken yet.
    Received(D::MessageBufEtc),
    /// The `MessageResponse` was dropped before the response arrived; the
    /// slot is removed when that response is eventually received.
    Discard,
}
350
351impl<D: ResourceDialect> MessageInterest<D> {
352 fn is_received(&self) -> bool {
354 matches!(*self, MessageInterest::Received(_))
355 }
356
357 fn unwrap_received(self) -> D::MessageBufEtc {
358 if let MessageInterest::Received(buf) = self {
359 buf
360 } else {
361 panic!("EXPECTED received message")
362 }
363 }
364}
365
/// Lifecycle of an [`EventReceiver`] stream.
#[derive(Debug)]
enum EventReceiverState {
    /// Events are still being polled from the channel.
    Active,
    /// An error item was yielded; the next poll yields `None`.
    Terminal,
    /// `None` was yielded; polling again panics.
    Terminated,
}
372
/// A stream of events (messages with transaction id zero) received by a
/// [`Client`].
#[derive(Debug)]
pub struct EventReceiver<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
    // Shared client state the events are read from.
    inner: Arc<ClientInner<D>>,
    // Where this stream is in its lifecycle.
    state: EventReceiverState,
}
379
// Explicitly mark `EventReceiver` as `Unpin` for every dialect `D`.
impl<D: ResourceDialect> Unpin for EventReceiver<D> {}
381
382impl<D: ResourceDialect> FusedStream for EventReceiver<D> {
383 fn is_terminated(&self) -> bool {
384 matches!(self.state, EventReceiverState::Terminated)
385 }
386}
387
388impl<D: ResourceDialect> Stream for EventReceiver<D> {
393 type Item = Result<D::MessageBufEtc, Error>;
394
395 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
396 match self.state {
397 EventReceiverState::Active => {}
398 EventReceiverState::Terminated => {
399 panic!("polled EventReceiver after `None`");
400 }
401 EventReceiverState::Terminal => {
402 self.state = EventReceiverState::Terminated;
403 return Poll::Ready(None);
404 }
405 }
406
407 Poll::Ready(match ready!(self.inner.poll_recv_event(cx)) {
408 Ok(x) => Some(Ok(x)),
409 Err(Error::ClientChannelClosed { status: zx_status::Status::PEER_CLOSED, .. }) => {
410 self.state = EventReceiverState::Terminated;
413 None
414 }
415 err @ Err(_) => {
416 self.state = EventReceiverState::Terminal;
419 Some(err)
420 }
421 })
422 }
423}
424
425impl<D: ResourceDialect> Drop for EventReceiver<D> {
426 fn drop(&mut self) {
427 self.inner.interests.lock().dropped_event_listener();
428 }
429}
430
/// State of the (at most one) event listener attached to a client.
#[derive(Debug, Default)]
enum EventListener {
    /// No `EventReceiver` is outstanding.
    #[default]
    None,
    /// An `EventReceiver` exists but has no waker stored.
    WillPoll,
    /// The `EventReceiver` polled and left this waker.
    Some(Waker),
}
441
442impl EventListener {
443 fn is_some(&self) -> bool {
444 matches!(self, EventListener::Some(_))
445 }
446}
447
/// State shared between a [`Client`], its clones, its `EventReceiver` and its
/// `MessageResponse` futures.
#[derive(Debug)]
struct ClientInner<D: ResourceDialect> {
    /// The boxed channel messages are written to and read from.
    channel: <D::ProxyChannel as ProxyChannelFor<D>>::Boxed,

    /// Registered response interests, queued events and the event listener.
    interests: Mutex<Interests<D>>,

    /// The first error hit while receiving. Once set, `recv_all` returns a
    /// clone of it on every later call.
    terminal_error: Mutex<Option<Error>>,

    /// Protocol name reported in channel-closed errors.
    protocol_name: &'static str,
}
464
/// Bookkeeping for everything that is waiting on incoming messages.
#[derive(Debug)]
struct Interests<D: ResourceDialect> {
    /// One entry per outstanding two-way call, keyed by interest id.
    messages: Slab<MessageInterest<D>>,
    /// Events received but not yet handed to the event listener.
    events: VecDeque<D::MessageBufEtc>,
    /// State of the event listener, including its waker if one is stored.
    event_listener: EventListener,
    /// Number of wakers currently stored: `Waiting` message interests plus
    /// the event listener's waker, if any.
    waker_count: usize,
    /// Counter folded into each new transaction id; bumped (wrapping) every
    /// time an interest is registered.
    generation: u8,
}
479
480impl<D: ResourceDialect> Default for Interests<D> {
481 fn default() -> Self {
482 Interests {
483 messages: Slab::new(),
484 events: Default::default(),
485 event_listener: Default::default(),
486 waker_count: 0,
487 generation: 0,
488 }
489 }
490}
491
492impl<D: ResourceDialect> Interests<D> {
493 fn push_event(&mut self, buf: D::MessageBufEtc) -> Option<Waker> {
495 self.events.push_back(buf);
496 self.take_event_waker()
497 }
498
499 fn take_event_waker(&mut self) -> Option<Waker> {
501 if self.event_listener.is_some() {
502 let EventListener::Some(waker) =
503 mem::replace(&mut self.event_listener, EventListener::WillPoll)
504 else {
505 unreachable!()
506 };
507
508 self.waker_count -= 1;
510 Some(waker)
511 } else {
512 None
513 }
514 }
515
516 fn event_waker(&self) -> Option<&Waker> {
518 match &self.event_listener {
519 EventListener::Some(waker) => Some(waker),
520 _ => None,
521 }
522 }
523
524 fn push_message(&mut self, txid: Txid, buf: D::MessageBufEtc) -> Result<Option<Waker>, Error> {
527 let InterestId(raw_id) = InterestId::from_txid(txid);
528 let Some(interest) = self.messages.get_mut(raw_id) else {
531 return Err(Error::InvalidResponseTxid);
533 };
534
535 let mut waker = None;
536 if let MessageInterest::Discard = interest {
537 self.messages.remove(raw_id);
538 } else if let MessageInterest::Waiting(w) =
539 mem::replace(interest, MessageInterest::Received(buf))
540 {
541 waker = Some(w);
542
543 self.waker_count -= 1;
545 }
546
547 Ok(waker)
548 }
549
550 fn register(&mut self, txid: Txid, cx: &Context<'_>) -> Option<D::MessageBufEtc> {
553 let InterestId(raw_id) = InterestId::from_txid(txid);
554 let interest = self.messages.get_mut(raw_id).expect("Polled unregistered interest");
555 match interest {
556 MessageInterest::Received(_) => {
557 return Some(self.messages.remove(raw_id).unwrap_received());
558 }
559 MessageInterest::Discard => panic!("Polled a discarded MessageReceiver?!"),
560 MessageInterest::WillPoll => self.waker_count += 1,
561 MessageInterest::Waiting(_) => {}
562 }
563 *interest = MessageInterest::Waiting(cx.waker().clone());
564 None
565 }
566
567 fn deregister(&mut self, txid: Txid) {
569 let InterestId(raw_id) = InterestId::from_txid(txid);
570 match self.messages[raw_id] {
571 MessageInterest::Received(_) => {
572 self.messages.remove(raw_id);
573 return;
574 }
575 MessageInterest::WillPoll => {}
576 MessageInterest::Waiting(_) => self.waker_count -= 1,
577 MessageInterest::Discard => unreachable!(),
578 }
579 self.messages[raw_id] = MessageInterest::Discard;
580 }
581
582 fn register_event_listener(&mut self, cx: &Context<'_>) -> Option<D::MessageBufEtc> {
584 self.events.pop_front().or_else(|| {
585 if !mem::replace(&mut self.event_listener, EventListener::Some(cx.waker().clone()))
586 .is_some()
587 {
588 self.waker_count += 1;
589 }
590 None
591 })
592 }
593
594 fn dropped_event_listener(&mut self) {
596 if self.event_listener.is_some() {
597 self.waker_count -= 1;
599 }
600 self.event_listener = EventListener::None;
601 }
602
603 fn register_msg_interest(&mut self) -> Txid {
608 self.generation = self.generation.wrapping_add(1);
609 Txid::from_interest_id(
612 InterestId(self.messages.insert(MessageInterest::WillPoll)),
613 self.generation,
614 )
615 }
616}
617
impl<D: ResourceDialect> ClientInner<D> {
    /// Polls for the next event.
    ///
    /// Returns a queued event if there is one. Otherwise stores the waker,
    /// drains the channel via `recv_all`, and checks the queue again. A
    /// terminal error is only reported once no event is left to deliver.
    fn poll_recv_event(
        self: &Arc<Self>,
        cx: &Context<'_>,
    ) -> Poll<Result<D::MessageBufEtc, Error>> {
        // Fast path: an event is already queued. The lock guard is a temporary
        // released at the end of this `if let`, before `recv_all` locks again.
        if let Some(msg_buf) = self.interests.lock().register_event_listener(cx) {
            return Poll::Ready(Ok(msg_buf));
        }

        // Txid 0 identifies events, so the event waker is not woken for
        // messages that this very poll is about to pick up.
        let maybe_terminal_error = self.recv_all(Some(Txid(0)));

        let mut lock = self.interests.lock();

        if let Some(msg_buf) = lock.events.pop_front() {
            Poll::Ready(Ok(msg_buf))
        } else {
            maybe_terminal_error?;
            Poll::Pending
        }
    }

    /// Polls for the response to the transaction `txid`.
    ///
    /// Returns the response if it already arrived. Otherwise stores the waker,
    /// drains the channel via `recv_all`, and checks again. A terminal error
    /// is only reported when the response is still missing.
    ///
    /// # Panics
    ///
    /// Panics if no interest is registered for `txid`.
    fn poll_recv_msg_response(
        self: &Arc<Self>,
        txid: Txid,
        cx: &Context<'_>,
    ) -> Poll<Result<D::MessageBufEtc, Error>> {
        // Fast path: the response is already stored.
        if let Some(buf) = self.interests.lock().register(txid, cx) {
            return Poll::Ready(Ok(buf));
        }

        // Passing our own txid suppresses waking ourselves for the response
        // we are about to look for below.
        let maybe_terminal_error = self.recv_all(Some(txid));

        let InterestId(raw_id) = InterestId::from_txid(txid);
        let mut interests = self.interests.lock();
        if interests.messages.get(raw_id).expect("Polled unregistered interest").is_received() {
            let buf = interests.messages.remove(raw_id).unwrap_received();
            Poll::Ready(Ok(buf))
        } else {
            maybe_terminal_error?;
            Poll::Pending
        }
    }

    /// Reads messages from the channel until it would block, dispatching each
    /// one to the event queue (txid 0) or to its registered message interest.
    ///
    /// `want_txid` is the transaction the caller is itself polling for; a
    /// stored waker for exactly that transaction is not woken. Wakers of all
    /// other recipients are woken as their messages arrive.
    ///
    /// Returns immediately with `Ok(())` when no wakers are stored.
    ///
    /// # Errors
    ///
    /// Any read, decode or dispatch error, as well as a received epitaph, is
    /// recorded as the terminal error, all stored wakers are woken, and the
    /// error is returned. Every later call returns a clone of that error.
    fn recv_all(self: &Arc<Self>, want_txid: Option<Txid>) -> Result<(), Error> {
        // This guard is held for the rest of the function, so only one caller
        // at a time reads from the channel.
        let mut terminal_error = self.terminal_error.lock();
        if let Some(error) = terminal_error.as_ref() {
            return Err(error.clone());
        }

        // Reads and dispatches a single message using `waker` as the channel's
        // read waker. `Break` means the channel had nothing ready.
        let recv_once = |waker| {
            let cx = &mut Context::from_waker(&waker);

            let mut buf = D::MessageBufEtc::new();
            let result = self.channel.recv_etc_from(cx, &mut buf);
            match result {
                Poll::Ready(Ok(())) => {}
                Poll::Ready(Err(None)) => {
                    // `Err(None)` is reported as the peer having closed.
                    return Err(Error::ClientChannelClosed {
                        status: zx_status::Status::PEER_CLOSED,
                        protocol_name: self.protocol_name,
                        epitaph: None,
                        #[cfg(not(target_os = "fuchsia"))]
                        reason: self.channel.closed_reason(),
                    });
                }
                Poll::Ready(Err(Some(e))) => return Err(Error::ClientRead(e.into())),
                Poll::Pending => return Ok(ControlFlow::Break(())),
            };

            let (bytes, _) = buf.split_mut();
            let (header, body_bytes) = decode_transaction_header(bytes)?;
            if header.is_epitaph() {
                // The epitaph body is decoded with no handles; its status is
                // surfaced as a channel-closed error.
                let handles = &mut [];
                let mut epitaph_body = Decode::<EpitaphBody, D>::new_empty();
                Decoder::<D>::decode_into::<EpitaphBody>(
                    &header,
                    body_bytes,
                    handles,
                    &mut epitaph_body,
                )?;
                return Err(Error::ClientChannelClosed {
                    status: epitaph_body.error,
                    protocol_name: self.protocol_name,
                    epitaph: Some(epitaph_body.error.into_raw() as u32),
                    #[cfg(not(target_os = "fuchsia"))]
                    reason: self.channel.closed_reason(),
                });
            }

            let txid = Txid(header.tx_id);

            // Dispatch under the interests lock, but wake only after the lock
            // is released at the end of this block.
            let waker = {
                buf.shrink_bytes_to_fit();
                let mut interests = self.interests.lock();
                if txid == Txid(0) {
                    interests.push_event(buf)
                } else {
                    interests.push_message(txid, buf)?
                }
            };

            // Skip the wake-up when the message is for the caller itself.
            if want_txid != Some(txid)
                && let Some(waker) = waker
            {
                waker.wake();
            }

            Ok(ControlFlow::Continue(()))
        };

        loop {
            // Choose the waker to register with the channel for this read.
            let waker = {
                let interests = self.interests.lock();
                if interests.waker_count == 0 {
                    return Ok(());
                } else if interests.waker_count == 1 {
                    // Exactly one waker is stored: use it directly rather than
                    // allocating a shared one.
                    if let Some(waker) = interests.event_waker() {
                        waker.clone()
                    } else {
                        interests
                            .messages
                            .iter()
                            .find_map(|(_, interest)| {
                                if let MessageInterest::Waiting(waker) = interest {
                                    Some(waker.clone())
                                } else {
                                    None
                                }
                            })
                            .unwrap()
                    }
                } else {
                    // Several wakers are stored: build one shared waker that
                    // holds only a weak reference to this client.
                    let weak = Arc::downgrade(self);
                    let waker = ClientWaker(Arc::new(move || {
                        if let Some(strong) = weak.upgrade() {
                            // On Fuchsia, first try to dispatch the messages
                            // directly; if that succeeds the recipients were
                            // already woken individually.
                            // NOTE(review): presumably skipped on other targets
                            // to avoid re-entrancy while locks are held; confirm.
                            #[cfg(target_os = "fuchsia")]
                            if strong.recv_all(None).is_ok() {
                                return;
                            }

                            strong.wake_all();
                        }
                    }));
                    // SAFETY: the data pointer comes from `Arc::into_raw` on an
                    // `Arc<ClientWaker>`, which is exactly what the functions
                    // in `WAKER_VTABLE` cast it back to.
                    unsafe {
                        Waker::from_raw(RawWaker::new(
                            Arc::into_raw(Arc::new(waker)) as *const (),
                            &WAKER_VTABLE,
                        ))
                    }
                }
            };

            match recv_once(waker) {
                Ok(ControlFlow::Continue(())) => {}
                Ok(ControlFlow::Break(())) => return Ok(()),
                Err(error) => {
                    // Wake everyone so they observe the terminal error.
                    self.wake_all();
                    return Err(terminal_error.insert(error).clone());
                }
            }
        }
    }

    /// Wakes every stored waker (all `Waiting` message interests and the event
    /// listener), resetting those interests to `WillPoll` and the waker count
    /// to zero.
    fn wake_all(&self) {
        let mut lock = self.interests.lock();
        for (_, interest) in &mut lock.messages {
            if let MessageInterest::Waiting(_) = interest {
                let MessageInterest::Waiting(waker) =
                    mem::replace(interest, MessageInterest::WillPoll)
                else {
                    unreachable!()
                };
                waker.wake();
            }
        }
        if let Some(waker) = lock.take_event_waker() {
            waker.wake();
        }
        lock.waker_count = 0;
    }
}
838
/// Payload behind the hand-built `Waker` created in `recv_all`: a shared
/// closure that is invoked whenever the waker is woken.
#[derive(Clone)]
struct ClientWaker(Arc<dyn Fn() + Send + Sync + 'static>);
841
/// Vtable for wakers whose data pointer is a raw `Arc<ClientWaker>`.
static WAKER_VTABLE: RawWakerVTable =
    RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);
844
/// Vtable `clone` entry: adds one strong reference and returns a new
/// `RawWaker` over the same pointer.
///
/// # Safety
///
/// `data` must be a pointer obtained from `Arc::into_raw` on an
/// `Arc<ClientWaker>` that is still alive.
unsafe fn clone_waker(data: *const ()) -> RawWaker {
    // SAFETY: per this function's contract, `data` points to a live
    // `Arc<ClientWaker>` allocation.
    unsafe { Arc::increment_strong_count(data as *const ClientWaker) };
    RawWaker::new(data, &WAKER_VTABLE)
}
849
/// Vtable `wake` entry: runs the closure and consumes this waker's strong
/// reference.
///
/// # Safety
///
/// `data` must be a pointer obtained from `Arc::into_raw` on an
/// `Arc<ClientWaker>`, and the reference it represents must not be used again.
unsafe fn wake(data: *const ()) {
    // SAFETY: per this function's contract; the reconstructed `Arc` is a
    // temporary dropped at the end of the statement, after the call.
    unsafe { Arc::from_raw(data as *const ClientWaker) }.0();
}
853
/// Vtable `wake_by_ref` entry: runs the closure without giving up the
/// waker's strong reference.
///
/// # Safety
///
/// `data` must be a pointer obtained from `Arc::into_raw` on an
/// `Arc<ClientWaker>` that is still alive.
unsafe fn wake_by_ref(data: *const ()) {
    // SAFETY: per this function's contract. `ManuallyDrop` keeps the
    // reconstructed `Arc` from decrementing the reference count.
    mem::ManuallyDrop::new(unsafe { Arc::from_raw(data as *const ClientWaker) }).0();
}
857
/// Vtable `drop` entry: releases this waker's strong reference.
///
/// # Safety
///
/// `data` must be a pointer obtained from `Arc::into_raw` on an
/// `Arc<ClientWaker>`, and the reference it represents must not be used again.
unsafe fn drop_waker(data: *const ()) {
    // SAFETY: per this function's contract; dropping the reconstructed `Arc`
    // performs the decrement.
    unsafe { Arc::from_raw(data as *const ClientWaker) };
}
861
/// Blocking (synchronous) FIDL client, available on Fuchsia only.
#[cfg(target_os = "fuchsia")]
pub mod sync {
    use super::*;
    use crate::endpoints::ProtocolMarker;
    use std::mem::MaybeUninit;
    use zx::MessageBufEtc;

    /// A synchronous client for making FIDL calls over a `zx::Channel`.
    #[derive(Debug)]
    pub struct Client {
        // The channel all calls are made on.
        channel: zx::Channel,
    }

    impl Client {
        /// Creates a synchronous client around `channel`.
        pub fn new(channel: zx::Channel) -> Self {
            Self { channel }
        }

        /// Returns a reference to the underlying channel.
        pub fn as_channel(&self) -> &zx::Channel {
            &self.channel
        }

        /// Consumes the client and returns the underlying channel.
        pub fn into_channel(self) -> zx::Channel {
            let Client { channel } = self;
            channel
        }

        /// Encodes and sends a message that expects no response.
        ///
        /// A write that fails with `PEER_CLOSED` is reported as success.
        pub fn send<T: TypeMarker>(
            &self,
            body: impl Encode<T, DefaultFuchsiaResourceDialect>,
            ordinal: u64,
            dynamic_flags: DynamicFlags,
        ) -> Result<(), Error> {
            let (mut out_bytes, mut out_handles) = (Vec::new(), Vec::new());
            let header = TransactionHeader::new(0, ordinal, dynamic_flags);
            Encoder::encode::<TransactionMessageType<T>>(
                &mut out_bytes,
                &mut out_handles,
                TransactionMessage { header, body },
            )?;
            let written = self.channel.write_etc(&out_bytes, &mut out_handles);
            match written {
                Err(status) if status != zx_status::Status::PEER_CLOSED => {
                    Err(Error::ClientWrite(status.into()))
                }
                Ok(()) | Err(_) => Ok(()),
            }
        }

        /// Sends a request and blocks until its response arrives or `deadline`
        /// passes, then decodes the response as `Response`.
        ///
        /// # Errors
        ///
        /// Returns `Error::InvalidResponseOrdinal` if the response's ordinal
        /// differs from `ordinal`, a channel-closed error if the call fails
        /// with `PEER_CLOSED`, `Error::ClientCall` for other call failures,
        /// and any encoding or decoding error.
        pub fn send_query<Request: TypeMarker, Response: TypeMarker, P: ProtocolMarker>(
            &self,
            body: impl Encode<Request, DefaultFuchsiaResourceDialect>,
            ordinal: u64,
            dynamic_flags: DynamicFlags,
            deadline: zx::MonotonicInstant,
        ) -> Result<Response::Owned, Error>
        where
            Response::Owned: Decode<Response, DefaultFuchsiaResourceDialect>,
        {
            let (mut request_bytes, mut request_handles) = (Vec::new(), Vec::new());

            // The header is encoded with transaction id zero here.
            let request = TransactionMessage {
                header: TransactionHeader::new(0, ordinal, dynamic_flags),
                body,
            };
            Encoder::encode::<TransactionMessageType<Request>>(
                &mut request_bytes,
                &mut request_handles,
                request,
            )?;

            // Reply storage is left uninitialized for the call to fill.
            let mut reply_storage =
                Vec::<MaybeUninit<u8>>::with_capacity(zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize);
            // SAFETY: the new length equals the capacity requested above, and
            // `MaybeUninit<u8>` elements do not need to be initialized.
            unsafe { reply_storage.set_len(zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize) };

            let handle_storage = &mut [const { MaybeUninit::<zx::HandleInfo>::uninit() };
                zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize];

            let call_result = self.channel.call_etc_uninit(
                deadline,
                &request_bytes,
                &mut request_handles,
                reply_storage.as_mut_slice(),
                handle_storage,
            );
            let (reply_bytes, reply_handles) = match call_result {
                Ok(parts) => parts,
                Err(status) => return Err(self.wrap_error::<P, _>(Error::ClientCall, status)),
            };

            let (header, body_bytes) = decode_transaction_header(reply_bytes)?;
            if header.ordinal != ordinal {
                return Err(Error::InvalidResponseOrdinal);
            }
            let mut decoded = Decode::<Response, DefaultFuchsiaResourceDialect>::new_empty();
            Decoder::<DefaultFuchsiaResourceDialect>::decode_into::<Response>(
                &header,
                body_bytes,
                reply_handles,
                &mut decoded,
            )?;
            Ok(decoded)
        }

        /// Blocks until an event arrives or `deadline` passes and returns the
        /// raw event message.
        ///
        /// # Errors
        ///
        /// Returns a channel-closed error when an epitaph is received or the
        /// peer is closed, `Error::UnexpectedSyncResponse` when the message
        /// carries a non-zero transaction id, `Error::InvalidHeader` when the
        /// header cannot be decoded, and `Error::ClientEvent` /
        /// `Error::ClientRead` for other wait / read failures.
        pub fn wait_for_event<P: ProtocolMarker>(
            &self,
            deadline: zx::MonotonicInstant,
        ) -> Result<MessageBufEtc, Error> {
            let mut buf = zx::MessageBufEtc::new();
            buf.ensure_capacity_bytes(zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize);
            buf.ensure_capacity_handle_infos(zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize);

            loop {
                self.channel
                    .wait_one(
                        zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
                        deadline,
                    )
                    .map_err(|e| self.wrap_error::<P, _>(Error::ClientEvent, e))?;

                match self.channel.read_etc(&mut buf) {
                    // The read had nothing for us after all; wait again.
                    Err(zx::Status::SHOULD_WAIT) => continue,
                    Err(e) => {
                        return Err(self.wrap_error::<P, _>(|x| Error::ClientRead(x.into()), e));
                    }
                    Ok(()) => {}
                }

                let (header, body_bytes) = decode_transaction_header(buf.bytes())
                    .map_err(|_| Error::InvalidHeader)?;
                if header.is_epitaph() {
                    // The epitaph body is decoded with no handles.
                    let handles = &mut [];
                    let mut epitaph_body =
                        Decode::<EpitaphBody, DefaultFuchsiaResourceDialect>::new_empty();
                    Decoder::<DefaultFuchsiaResourceDialect>::decode_into::<EpitaphBody>(
                        &header,
                        body_bytes,
                        handles,
                        &mut epitaph_body,
                    )?;
                    return Err(Error::ClientChannelClosed {
                        status: epitaph_body.error,
                        protocol_name: P::DEBUG_NAME,
                        epitaph: Some(epitaph_body.error.into_raw() as u32),
                    });
                }
                if header.tx_id != 0 {
                    return Err(Error::UnexpectedSyncResponse);
                }
                return Ok(buf);
            }
        }

        /// Converts a status into an `Error`: `PEER_CLOSED` becomes a
        /// channel-closed error naming protocol `P`, anything else goes
        /// through `variant`.
        fn wrap_error<P: ProtocolMarker, T: Fn(zx_status::Status) -> Error>(
            &self,
            variant: T,
            err: zx_status::Status,
        ) -> Error {
            if err != zx_status::Status::PEER_CLOSED {
                return variant(err);
            }
            Error::ClientChannelClosed {
                status: zx_status::Status::PEER_CLOSED,
                protocol_name: P::DEBUG_NAME,
                epitaph: None,
            }
        }
    }
}
1059
1060#[cfg(all(test, target_os = "fuchsia"))]
1061mod tests {
1062 use super::*;
1063 use crate::encoding::MAGIC_NUMBER_INITIAL;
1064 use crate::endpoints::{ControlHandle, ProtocolMarker, Proxy, RequestStream, SynchronousProxy};
1065 use crate::epitaph::{self, ChannelEpitaphExt};
1066 use crate::{Channel, OnSignalsRef, ServeInner};
1067 use anyhow::{Context as _, Error};
1068 use assert_matches::assert_matches;
1069 use fuchsia_async as fasync;
1070 use fuchsia_async::{Channel as AsyncChannel, DurationExt, TimeoutExt};
1071 use futures::channel::oneshot;
1072 use futures::stream::{FuturesUnordered, Stream};
1073 use futures::{StreamExt, TryFutureExt, join};
1074 use futures_test::task::new_count_waker;
1075 use std::future::pending;
1076 use std::task::{Wake, Waker};
1077 use std::thread;
1078 use zx::MessageBufEtc;
1079
// Value of the fifth little-endian byte of `SEND_ORDINAL`.
const SEND_ORDINAL_HIGH_BYTE: u8 = 42;
// Ordinal used for requests sent by the tests.
const SEND_ORDINAL: u64 = 42 << 32;
// One-byte body used for every test message.
const SEND_DATA: u8 = 55;

// Ordinal used for events sent by the tests.
const EVENT_ORDINAL: u64 = 854 << 23;
1085
// Protocol marker used only to name the protocol in the tests below.
struct TestProtocolMarker;
impl ProtocolMarker for TestProtocolMarker {
    type Proxy = TestProxy;
    type SynchronousProxy = TestSynchronousProxy;
    type RequestStream = TestRequestStream;
    // Name reported in channel-closed errors.
    const DEBUG_NAME: &str = "test_protocol";
}
1093
// Placeholder proxy that exists to satisfy `ProtocolMarker`; every method is
// unimplemented.
struct TestProxy;
impl Proxy for TestProxy {
    type Protocol = TestProtocolMarker;
    fn from_channel(_inner: AsyncChannel) -> Self {
        unimplemented!();
    }
    fn into_channel(self) -> Result<AsyncChannel, Self> {
        unimplemented!();
    }
    fn as_channel(&self) -> &AsyncChannel {
        unimplemented!();
    }
}
1107
// Placeholder synchronous proxy that exists to satisfy `ProtocolMarker`;
// every method is unimplemented.
struct TestSynchronousProxy;
impl SynchronousProxy for TestSynchronousProxy {
    type Proxy = TestProxy;
    type Protocol = TestProtocolMarker;
    fn from_channel(_inner: Channel) -> Self {
        unimplemented!();
    }
    fn into_channel(self) -> Channel {
        unimplemented!();
    }
    fn as_channel(&self) -> &Channel {
        unimplemented!();
    }
}
1122
// Placeholder request stream that exists to satisfy `ProtocolMarker`; every
// method is unimplemented.
struct TestRequestStream;
impl RequestStream for TestRequestStream {
    type Protocol = TestProtocolMarker;
    type ControlHandle = TestControlHandle;
    fn control_handle(&self) -> Self::ControlHandle {
        unimplemented!();
    }
    fn from_channel(_inner: AsyncChannel) -> Self {
        unimplemented!();
    }
    fn into_inner(self) -> (Arc<ServeInner>, bool) {
        unimplemented!();
    }

    fn from_inner(_inner: Arc<ServeInner>, _is_terminated: bool) -> Self {
        unimplemented!();
    }
}
// `Stream` impl for the placeholder request stream; polling is unimplemented.
impl Stream for TestRequestStream {
    type Item = Result<(), crate::Error>;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        unimplemented!();
    }
}
1147
// Placeholder control handle for `TestRequestStream`; every method is
// unimplemented.
struct TestControlHandle;
impl ControlHandle for TestControlHandle {
    fn shutdown(&self) {
        unimplemented!();
    }
    fn shutdown_with_epitaph(&self, _status: zx_status::Status) {
        unimplemented!();
    }
    fn is_closed(&self) -> bool {
        unimplemented!();
    }
    fn on_closed(&self) -> OnSignalsRef<'_> {
        unimplemented!();
    }
    fn signal_peer(
        &self,
        _clear_mask: zx::Signals,
        _set_mask: zx::Signals,
    ) -> Result<(), zx_status::Status> {
        unimplemented!();
    }
}
1170
1171 #[rustfmt::skip]
1172 fn expected_sent_bytes(txid_index: u8, txid_generation: u8) -> [u8; 24] {
1173 [
1174 txid_index, 0, 0, txid_generation, 2, 0, 0, MAGIC_NUMBER_INITIAL,
1177 0, 0, 0, 0, SEND_ORDINAL_HIGH_BYTE, 0, 0, 0, SEND_DATA, 0, 0, 0, 0, 0, 0, 0, ]
1182 }
1183
1184 fn expected_sent_bytes_oneway() -> [u8; 24] {
1185 expected_sent_bytes(0, 0)
1186 }
1187
1188 fn send_transaction(header: TransactionHeader, channel: &zx::Channel) {
1189 let (bytes, handles) = (&mut vec![], &mut vec![]);
1190 encode_transaction(header, bytes, handles);
1191 channel.write_etc(bytes, handles).expect("Server channel write failed");
1192 }
1193
1194 fn encode_transaction(
1195 header: TransactionHeader,
1196 bytes: &mut Vec<u8>,
1197 handles: &mut Vec<zx::HandleDisposition<'static>>,
1198 ) {
1199 let event = TransactionMessage { header, body: SEND_DATA };
1200 Encoder::<DefaultFuchsiaResourceDialect>::encode::<TransactionMessageType<u8>>(
1201 bytes, handles, event,
1202 )
1203 .expect("Encoding failure");
1204 }
1205
/// A one-way send from the synchronous client puts the expected bytes on the
/// wire.
#[test]
fn sync_client() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end);

    client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()).context("sending")?;

    let mut on_wire = MessageBufEtc::new();
    server_end.read_etc(&mut on_wire).context("reading")?;
    assert_eq!(on_wire.bytes(), expected_sent_bytes_oneway());
    Ok(())
}
1216
/// A two-way call from the synchronous client returns the body that a server
/// thread echoes back under the request's transaction id.
#[test]
fn sync_client_with_response() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end);

    // Server: wait for the request, check its ordinal, and reply.
    thread::spawn(move || {
        let server_deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
        server_end
            .wait_one(zx::Signals::CHANNEL_READABLE, server_deadline)
            .expect("failed to wait for channel readable");

        let mut request = MessageBufEtc::new();
        server_end.read_etc(&mut request).expect("failed to read on server end");
        let (request_bytes, _handles) = request.split_mut();
        let (header, _body_bytes) =
            decode_transaction_header(request_bytes).expect("server decode");
        assert_eq!(header.ordinal, SEND_ORDINAL);

        let reply = TransactionHeader::new(header.tx_id, header.ordinal, DynamicFlags::empty());
        send_transaction(reply, &server_end);
    });

    let client_deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
    let response_data = client
        .send_query::<u8, u8, TestProtocolMarker>(
            SEND_DATA,
            SEND_ORDINAL,
            DynamicFlags::empty(),
            client_deadline,
        )
        .context("sending query")?;
    assert_eq!(SEND_DATA, response_data);
    Ok(())
}
1250
/// When the server sends an event before the response, the two-way call
/// still gets its response and the event can be read afterwards.
#[test]
fn sync_client_with_event_and_response() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end);

    // Server: wait for the request, then send an event followed by the reply.
    thread::spawn(move || {
        let server_deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
        server_end
            .as_handle_ref()
            .wait_one(zx::Signals::CHANNEL_READABLE, server_deadline)
            .expect("failed to wait for channel readable");

        let mut request = MessageBufEtc::new();
        server_end.read_etc(&mut request).expect("failed to read on server end");
        let (request_bytes, _handles) = request.split_mut();
        let (header, _body_bytes) =
            decode_transaction_header(request_bytes).expect("server decode");
        assert_ne!(header.tx_id, 0);
        assert_eq!(header.ordinal, SEND_ORDINAL);

        // The event goes first...
        let event = TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty());
        send_transaction(event, &server_end);
        // ...and the actual reply second.
        let reply = TransactionHeader::new(header.tx_id, header.ordinal, DynamicFlags::empty());
        send_transaction(reply, &server_end);
    });

    let query_deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
    let response_data = client
        .send_query::<u8, u8, TestProtocolMarker>(
            SEND_DATA,
            SEND_ORDINAL,
            DynamicFlags::empty(),
            query_deadline,
        )
        .context("sending query")?;
    assert_eq!(SEND_DATA, response_data);

    let event_deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
    let event_buf = client
        .wait_for_event::<TestProtocolMarker>(event_deadline)
        .context("waiting for event")?;
    let (bytes, _handles) = event_buf.split();
    let (event_header, _body) = decode_transaction_header(&bytes).expect("event decode");
    assert_eq!(event_header.ordinal, EVENT_ORDINAL);

    Ok(())
}
1303
/// Two threads waiting for events on the same synchronous client each
/// receive one of two events sent by the server.
#[test]
fn sync_client_with_racing_events() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let shared_client = Arc::new(sync::Client::new(client_end));

    // Spawns a thread that waits for one event and asserts that it got it.
    let spawn_waiter = |client: Arc<sync::Client>| {
        thread::spawn(move || {
            let result = client.wait_for_event::<TestProtocolMarker>(
                zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
            );
            assert!(result.is_ok());
        })
    };

    let first_waiter = spawn_waiter(shared_client.clone());
    let second_waiter = spawn_waiter(shared_client);

    for _ in 0..2 {
        send_transaction(
            TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty()),
            &server_end,
        );
    }

    assert!(first_waiter.join().is_ok());
    assert!(second_waiter.join().is_ok());

    Ok(())
}
1338
/// Waiting for an event fails with `UnexpectedSyncResponse` when the next
/// message on the channel carries a nonzero transaction id.
#[test]
fn sync_client_wait_for_event_gets_method_response() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end);

    // Events in these tests use tx_id 0; this message deliberately does not.
    let stray_response = TransactionHeader::new(3902304923, SEND_ORDINAL, DynamicFlags::empty());
    send_transaction(stray_response, &server_end);

    let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
    let outcome = client.wait_for_event::<TestProtocolMarker>(deadline);
    assert_matches!(outcome, Err(crate::Error::UnexpectedSyncResponse));
    Ok(())
}
1355
/// A one-way send on the synchronous client reports success even though the
/// server end has already been dropped.
#[test]
fn sync_client_one_way_call_suceeds_after_peer_closed() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end);

    // Close the peer before the client sends anything.
    drop(server_end);

    let outcome = client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty());
    assert_matches!(outcome, Ok(()));
    Ok(())
}
1364
/// A two-way call on the synchronous client fails with `ClientChannelClosed`
/// (`PEER_CLOSED`, no epitaph) when the server end has already been dropped.
#[test]
fn sync_client_two_way_call_fails_after_peer_closed() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end);

    // Close the peer before the client sends anything.
    drop(server_end);

    let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
    let outcome = client.send_query::<u8, u8, TestProtocolMarker>(
        SEND_DATA,
        SEND_ORDINAL,
        DynamicFlags::empty(),
        deadline,
    );
    assert_matches!(
        outcome,
        Err(crate::Error::ClientChannelClosed {
            status: zx_status::Status::PEER_CLOSED,
            protocol_name: "test_protocol",
            epitaph: None,
        })
    );
    Ok(())
}
1385
/// When the server closes with an epitaph before a two-way call is made, the
/// call reports a plain `PEER_CLOSED` closure and does not surface the epitaph.
#[test]
fn sync_client_send_does_not_receive_epitaphs() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end);

    // Write an epitaph and close the server end up front.
    server_end
        .close_with_epitaph(zx_status::Status::UNAVAILABLE)
        .expect("failed to write epitaph");

    let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
    let outcome = client.send_query::<u8, u8, TestProtocolMarker>(
        SEND_DATA,
        SEND_ORDINAL,
        DynamicFlags::empty(),
        deadline,
    );
    assert_matches!(
        outcome,
        Err(crate::Error::ClientChannelClosed {
            status: zx_status::Status::PEER_CLOSED,
            protocol_name: "test_protocol",
            epitaph: None,
        })
    );
    Ok(())
}
1411
/// Waiting for an event on a channel that was closed with an epitaph reports
/// the epitaph status in the resulting `ClientChannelClosed` error.
#[test]
fn sync_client_wait_for_events_does_receive_epitaphs() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end);

    // Write an epitaph and close the server end up front.
    server_end
        .close_with_epitaph(zx_status::Status::UNAVAILABLE)
        .expect("failed to write epitaph");

    let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
    let outcome = client.wait_for_event::<TestProtocolMarker>(deadline);
    assert_matches!(
        outcome,
        Err(crate::Error::ClientChannelClosed {
            status: zx_status::Status::UNAVAILABLE,
            protocol_name: "test_protocol",
            epitaph: Some(epitaph),
        }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
    );
    Ok(())
}
1432
/// Converting a synchronous client back into a channel yields the same raw
/// handle it was constructed from.
#[test]
fn sync_client_into_channel() -> Result<(), Error> {
    let (client_end, _server_end) = zx::Channel::create();
    // Remember the raw handle before ownership moves into the client.
    let expected_raw = client_end.raw_handle();
    let client = sync::Client::new(client_end);

    let recovered = client.into_channel();
    assert_eq!(recovered.raw_handle(), expected_raw);
    Ok(())
}
1441
1442 #[fasync::run_singlethreaded(test)]
1443 async fn client() {
1444 let (client_end, server_end) = zx::Channel::create();
1445 let client_end = AsyncChannel::from_channel(client_end);
1446 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1447
1448 let server = AsyncChannel::from_channel(server_end);
1449 let receiver = async move {
1450 let mut buffer = MessageBufEtc::new();
1451 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1452 assert_eq!(buffer.bytes(), expected_sent_bytes_oneway());
1453 };
1454
1455 let receiver = receiver
1457 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1458 panic!("did not receive message in time!")
1459 });
1460
1461 client
1462 .send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty())
1463 .expect("failed to send msg");
1464
1465 receiver.await;
1466 }
1467
1468 #[fasync::run_singlethreaded(test)]
1469 async fn client_with_response() {
1470 let (client_end, server_end) = zx::Channel::create();
1471 let client_end = AsyncChannel::from_channel(client_end);
1472 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1473
1474 let server = AsyncChannel::from_channel(server_end);
1475 let mut buffer = MessageBufEtc::new();
1476 let receiver = async move {
1477 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1478 let two_way_tx_id = 1u8;
1479 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
1480
1481 let (bytes, handles) = (&mut vec![], &mut vec![]);
1482 let header =
1483 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
1484 encode_transaction(header, bytes, handles);
1485 server.write_etc(bytes, handles).expect("Server channel write failed");
1486 };
1487
1488 let receiver = receiver
1490 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1491 panic!("did not receiver message in time!")
1492 });
1493
1494 let sender = client
1495 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
1496 .map_ok(|x| assert_eq!(x, SEND_DATA))
1497 .unwrap_or_else(|e| panic!("fidl error: {e:?}"));
1498
1499 let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1501 panic!("did not receive response in time!")
1502 });
1503
1504 let ((), ()) = join!(receiver, sender);
1505 }
1506
1507 #[fasync::run_singlethreaded(test)]
1508 async fn client_with_response_receives_epitaph() {
1509 let (client_end, server_end) = zx::Channel::create();
1510 let client_end = AsyncChannel::from_channel(client_end);
1511 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1512
1513 let server = AsyncChannel::from_channel(server_end);
1514 let mut buffer = zx::MessageBufEtc::new();
1515 let receiver = async move {
1516 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1517 server
1518 .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1519 .expect("failed to write epitaph");
1520 };
1521 let receiver = receiver
1523 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1524 panic!("did not receive message in time!")
1525 });
1526
1527 let sender = async move {
1528 const ORDINAL: u64 = 42 << 32;
1529 let result = client.send_query::<u8, u8, ORDINAL>(55, DynamicFlags::empty()).await;
1530 assert_matches!(
1531 result,
1532 Err(crate::Error::ClientChannelClosed {
1533 status: zx_status::Status::UNAVAILABLE,
1534 protocol_name: "test_protocol",
1535 epitaph: Some(epitaph),
1536 }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1537 );
1538 };
1539 let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1541 panic!("did not receive response in time!")
1542 });
1543
1544 let ((), ()) = join!(receiver, sender);
1545 }
1546
1547 #[fasync::run_singlethreaded(test)]
1548 #[should_panic]
1549 async fn event_cant_be_taken_twice() {
1550 let (client_end, _) = zx::Channel::create();
1551 let client_end = AsyncChannel::from_channel(client_end);
1552 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1553 let _foo = client.take_event_receiver();
1554 client.take_event_receiver();
1555 }
1556
1557 #[fasync::run_singlethreaded(test)]
1558 async fn event_can_be_taken_after_drop() {
1559 let (client_end, _) = zx::Channel::create();
1560 let client_end = AsyncChannel::from_channel(client_end);
1561 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1562 let foo = client.take_event_receiver();
1563 drop(foo);
1564 client.take_event_receiver();
1565 }
1566
1567 #[fasync::run_singlethreaded(test)]
1568 async fn receiver_termination_test() {
1569 let (client_end, _) = zx::Channel::create();
1570 let client_end = AsyncChannel::from_channel(client_end);
1571 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1572 let mut foo = client.take_event_receiver();
1573 assert!(!foo.is_terminated(), "receiver should not report terminated before being polled");
1574 let _ = foo.next().await;
1575 assert!(
1576 foo.is_terminated(),
1577 "receiver should report terminated after seeing channel is closed"
1578 );
1579 }
1580
1581 #[fasync::run_singlethreaded(test)]
1582 #[should_panic(expected = "polled EventReceiver after `None`")]
1583 async fn receiver_cant_be_polled_more_than_once_on_closed_stream() {
1584 let (client_end, _) = zx::Channel::create();
1585 let client_end = AsyncChannel::from_channel(client_end);
1586 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1587 let foo = client.take_event_receiver();
1588 drop(foo);
1589 let mut bar = client.take_event_receiver();
1590 assert!(bar.next().await.is_none(), "read on closed channel should return none");
1591 let _ = bar.next().await;
1593 }
1594
1595 #[fasync::run_singlethreaded(test)]
1596 #[should_panic(expected = "polled EventReceiver after `None`")]
1597 async fn receiver_panics_when_polled_after_receiving_epitaph_then_none() {
1598 let (client_end, server_end) = zx::Channel::create();
1599 let client_end = AsyncChannel::from_channel(client_end);
1600 let server_end = AsyncChannel::from_channel(server_end);
1601 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1602 let mut stream = client.take_event_receiver();
1603
1604 epitaph::write_epitaph_impl(&server_end, zx_status::Status::UNAVAILABLE)
1605 .expect("wrote epitaph");
1606 drop(server_end);
1607
1608 assert_matches!(
1609 stream.next().await,
1610 Some(Err(crate::Error::ClientChannelClosed {
1611 status: zx_status::Status::UNAVAILABLE,
1612 protocol_name: "test_protocol",
1613 epitaph: Some(epitaph),
1614 })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1615 );
1616 assert_matches!(stream.next().await, None);
1617 let _ = stream.next().await;
1619 }
1620
1621 #[fasync::run_singlethreaded(test)]
1622 async fn event_can_be_taken() {
1623 let (client_end, _) = zx::Channel::create();
1624 let client_end = AsyncChannel::from_channel(client_end);
1625 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1626 client.take_event_receiver();
1627 }
1628
1629 #[fasync::run_singlethreaded(test)]
1630 async fn event_received() {
1631 let (client_end, server_end) = zx::Channel::create();
1632 let client_end = AsyncChannel::from_channel(client_end);
1633 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1634
1635 let server = AsyncChannel::from_channel(server_end);
1637 let (bytes, handles) = (&mut vec![], &mut vec![]);
1638 const ORDINAL: u64 = 5;
1639 let header = TransactionHeader::new(0, ORDINAL, DynamicFlags::empty());
1640 encode_transaction(header, bytes, handles);
1641 server.write_etc(bytes, handles).expect("Server channel write failed");
1642 drop(server);
1643
1644 let recv = client
1645 .take_event_receiver()
1646 .into_future()
1647 .then(|(x, stream)| {
1648 let x = x.expect("should contain one element");
1649 let x = x.expect("fidl error");
1650 let x: i32 =
1651 decode_transaction_body::<i32, DefaultFuchsiaResourceDialect, ORDINAL>(x)
1652 .expect("failed to decode event");
1653 assert_eq!(x, 55);
1654 stream.into_future()
1655 })
1656 .map(|(x, _stream)| assert!(x.is_none(), "should have emptied"));
1657
1658 let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1660 panic!("did not receive event in time!")
1661 });
1662
1663 recv.await;
1664 }
1665
1666 #[fasync::run_singlethreaded(test)]
1670 async fn receiver_can_be_taken_after_end_of_stream() {
1671 let (client_end, server_end) = zx::Channel::create();
1672 let client_end = AsyncChannel::from_channel(client_end);
1673 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1674
1675 let server = AsyncChannel::from_channel(server_end);
1677 let (bytes, handles) = (&mut vec![], &mut vec![]);
1678 const ORDINAL: u64 = 5;
1679 let header = TransactionHeader::new(0, ORDINAL, DynamicFlags::empty());
1680 encode_transaction(header, bytes, handles);
1681 server.write_etc(bytes, handles).expect("Server channel write failed");
1682 drop(server);
1683
1684 {
1688 let recv = client
1689 .take_event_receiver()
1690 .into_future()
1691 .then(|(x, stream)| {
1692 let x = x.expect("should contain one element");
1693 let x = x.expect("fidl error");
1694 let x: i32 =
1695 decode_transaction_body::<i32, DefaultFuchsiaResourceDialect, ORDINAL>(x)
1696 .expect("failed to decode event");
1697 assert_eq!(x, 55);
1698 stream.into_future()
1699 })
1700 .map(|(x, _stream)| assert!(x.is_none(), "should have emptied"));
1701
1702 let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1704 panic!("did not receive event in time!")
1705 });
1706
1707 recv.await;
1708 }
1709
1710 let mut c = client.take_event_receiver();
1713 assert!(
1714 c.next().await.is_none(),
1715 "receiver on closed channel should return none on first call"
1716 );
1717 }
1718
1719 #[fasync::run_singlethreaded(test)]
1720 async fn event_incompatible_format() {
1721 let (client_end, server_end) = zx::Channel::create();
1722 let client_end = AsyncChannel::from_channel(client_end);
1723 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1724
1725 let server = AsyncChannel::from_channel(server_end);
1727 let (bytes, handles) = (&mut vec![], &mut vec![]);
1728 let header = TransactionHeader::new_full(
1729 0,
1730 5,
1731 crate::encoding::Context {
1732 wire_format_version: crate::encoding::WireFormatVersion::V2,
1733 },
1734 DynamicFlags::empty(),
1735 0,
1736 );
1737 encode_transaction(header, bytes, handles);
1738 server.write_etc(bytes, handles).expect("Server channel write failed");
1739 drop(server);
1740
1741 let mut event_receiver = client.take_event_receiver();
1742 let recv = event_receiver.next().map(|event| {
1743 assert_matches!(event, Some(Err(crate::Error::IncompatibleMagicNumber(0))))
1744 });
1745
1746 let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1748 panic!("did not receive event in time!")
1749 });
1750
1751 recv.await;
1752 }
1753
/// Checks, using counting wakers, that an incoming event wakes only the
/// pending event receiver and that an incoming response wakes the pending
/// response future.
#[test]
fn client_always_wakes_pending_futures() {
    let mut executor = fasync::TestExecutor::new();

    let (client_end, server_end) = zx::Channel::create();
    let client_end = AsyncChannel::from_channel(client_end);
    let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");

    let mut event_receiver = client.take_event_receiver();

    // Send a query and poll its response future once with a counting waker.
    let (response_waker, response_waker_count) = new_count_waker();
    let response_cx = &mut Context::from_waker(&response_waker);
    // Captures the transaction id handed to the closure so the fake response
    // written later can reuse it.
    let mut response_txid = Txid(0);
    let mut response_future = client
        .send_raw_query(|tx_id, bytes, handles| {
            response_txid = tx_id;
            let header = TransactionHeader::new(
                response_txid.as_raw_id(),
                SEND_ORDINAL,
                DynamicFlags::empty(),
            );
            encode_transaction(header, bytes, handles);
            Ok(())
        })
        .expect("Couldn't send query");
    // Nothing has been written by the server yet, so the future is pending.
    assert!(response_future.poll_unpin(response_cx).is_pending());

    // Poll the event receiver once with its own counting waker.
    let (event_waker, event_waker_count) = new_count_waker();
    let event_cx = &mut Context::from_waker(&event_waker);
    assert!(event_receiver.poll_next_unpin(event_cx).is_pending());

    // Neither waker has been woken so far.
    assert_eq!(response_waker_count.get(), 0);
    assert_eq!(event_waker_count.get(), 0);

    // Write an event (tx_id 0) from the server end.
    send_transaction(TransactionHeader::new(0, 5, DynamicFlags::empty()), &server_end);

    // Drive the executor with a never-ready future until it stalls.
    // NOTE(review): presumably this lets the channel-readable notification be
    // processed -- confirm against `TestExecutor::run_until_stalled`.
    let _ = executor.run_until_stalled(&mut future::pending::<()>());

    // Only the event waker was woken; the response waker was not.
    assert_eq!(response_waker_count.get(), 0);
    assert_eq!(event_waker_count.get(), 1);

    // The event can now be read from the receiver.
    assert!(event_receiver.poll_next_unpin(event_cx).is_ready());

    // Write a response that carries the query's transaction id.
    send_transaction(
        TransactionHeader::new(response_txid.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty()),
        &server_end,
    );

    let _ = executor.run_until_stalled(&mut future::pending::<()>());

    // The response waker has now been woken exactly once.
    assert_eq!(response_waker_count.get(), 1);
}
1816
/// Checks, using counting wakers, that an epitaph wakes every pending future
/// (two response futures and the event receiver) and that both response
/// futures then resolve to the epitaph error.
#[test]
fn client_always_wakes_pending_futures_on_epitaph() {
    let mut executor = fasync::TestExecutor::new();

    let (client_end, server_end) = zx::Channel::create();
    let client_end = AsyncChannel::from_channel(client_end);
    let server_end = AsyncChannel::from_channel(server_end);
    let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");

    let mut event_receiver = client.take_event_receiver();

    // First query: send it and poll its response future once (pending).
    let (response1_waker, response1_waker_count) = new_count_waker();
    let response1_cx = &mut Context::from_waker(&response1_waker);
    let mut response1_future = client
        .send_raw_query(|tx_id, bytes, handles| {
            let header =
                TransactionHeader::new(tx_id.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty());
            encode_transaction(header, bytes, handles);
            Ok(())
        })
        .expect("Couldn't send query");
    assert!(response1_future.poll_unpin(response1_cx).is_pending());

    // Event receiver: poll it once with its own counting waker (pending).
    let (event_waker, event_waker_count) = new_count_waker();
    let event_cx = &mut Context::from_waker(&event_waker);
    assert!(event_receiver.poll_next_unpin(event_cx).is_pending());

    // Second query: send it and poll its response future once (pending).
    let (response2_waker, response2_waker_count) = new_count_waker();
    let response2_cx = &mut Context::from_waker(&response2_waker);
    let mut response2_future = client
        .send_raw_query(|tx_id, bytes, handles| {
            let header =
                TransactionHeader::new(tx_id.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty());
            encode_transaction(header, bytes, handles);
            Ok(())
        })
        .expect("Couldn't send query");
    assert!(response2_future.poll_unpin(response2_cx).is_pending());

    // Collect the three wake counters so they can be checked together.
    let wakers = vec![response1_waker_count, response2_waker_count, event_waker_count];

    // Drive the executor with a never-ready future until it stalls.
    let _ = executor.run_until_stalled(&mut future::pending::<()>());

    // Nothing has been written by the server, so no waker has been woken.
    assert_eq!(0, wakers.iter().fold(0, |acc, x| acc + x.get()));

    // Write an epitaph from the server end.
    epitaph::write_epitaph_impl(&server_end, zx_status::Status::UNAVAILABLE)
        .expect("wrote epitaph");

    let _ = executor.run_until_stalled(&mut future::pending::<()>());

    // Every pending future's waker was woken exactly once by the epitaph.
    for wake_count in &wakers {
        assert_eq!(wake_count.get(), 1);
    }

    // The first response future resolves to the epitaph error.
    assert_matches!(
        response1_future.poll_unpin(response1_cx),
        Poll::Ready(Err(crate::Error::ClientChannelClosed {
            status: zx_status::Status::UNAVAILABLE,
            protocol_name: "test_protocol",
            epitaph: Some(epitaph),
        })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
    );

    let _ = executor.run_until_stalled(&mut future::pending::<()>());

    // The second response future resolves to the same epitaph error.
    assert_matches!(
        response2_future.poll_unpin(response2_cx),
        Poll::Ready(Err(crate::Error::ClientChannelClosed {
            status: zx_status::Status::UNAVAILABLE,
            protocol_name: "test_protocol",
            epitaph: Some(epitaph),
        })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
    );

    // The event receiver also has an item ready.
    assert!(event_receiver.poll_next_unpin(event_cx).is_ready());
}
1906
1907 #[fasync::run_singlethreaded(test)]
1908 async fn client_allows_take_event_stream_even_if_event_delivered() {
1909 let (client_end, server_end) = zx::Channel::create();
1910 let client_end = AsyncChannel::from_channel(client_end);
1911 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1912
1913 send_transaction(TransactionHeader::new(0, 5, DynamicFlags::empty()), &server_end);
1915
1916 let (response_waker, _response_waker_count) = new_count_waker();
1918 let response_cx = &mut Context::from_waker(&response_waker);
1919 let mut response_future =
1920 client.send_query::<u8, u8, SEND_ORDINAL>(55, DynamicFlags::empty());
1921 assert!(response_future.poll_unpin(response_cx).is_pending());
1922
1923 let mut _event_receiver = client.take_event_receiver();
1925 }
1926
/// For every listed ordered pair of client actions, performed after the server
/// has closed with an epitaph, checks that exactly the actions marked by
/// `should_report_epitaph` fail with the epitaph error and the others succeed.
#[fasync::run_singlethreaded(test)]
async fn client_reports_epitaph_from_all_read_actions() {
    #[derive(Debug, PartialEq)]
    enum Action {
        // Send a one-way message with `client.send`.
        SendMsg,
        // Start a two-way call and only call `check()` on its future.
        SendQuery,
        // Start a two-way call and await its result.
        WaitQuery,
        // Await the next item from the event receiver.
        RecvEvent,
    }
    impl Action {
        // Whether the action is expected to fail with the epitaph error.
        fn should_report_epitaph(&self) -> bool {
            match self {
                Action::SendMsg | Action::SendQuery => false,
                Action::WaitQuery | Action::RecvEvent => true,
            }
        }
    }
    use Action::*;
    // Every ordered pair of actions except [RecvEvent, RecvEvent].
    for two_actions in &[
        [SendMsg, SendMsg],
        [SendMsg, SendQuery],
        [SendMsg, WaitQuery],
        [SendMsg, RecvEvent],
        [SendQuery, SendMsg],
        [SendQuery, SendQuery],
        [SendQuery, WaitQuery],
        [SendQuery, RecvEvent],
        [WaitQuery, SendMsg],
        [WaitQuery, SendQuery],
        [WaitQuery, WaitQuery],
        [WaitQuery, RecvEvent],
        [RecvEvent, SendMsg],
        [RecvEvent, SendQuery],
        [RecvEvent, WaitQuery],
    ] {
        // Each pair gets a fresh channel and client.
        let (client_end, server_end) = zx::Channel::create();
        let client_end = AsyncChannel::from_channel(client_end);
        let client = Client::new(client_end, "test_protocol");

        // Close the server end with an epitaph before the client does anything.
        let server_end = AsyncChannel::from_channel(server_end);
        server_end
            .close_with_epitaph(zx_status::Status::UNAVAILABLE)
            .expect("failed to write epitaph");

        let mut event_receiver = client.take_event_receiver();

        for (index, action) in two_actions.iter().enumerate() {
            // Perform the action and keep only its error, if any.
            let err = match action {
                SendMsg => {
                    client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()).err()
                }
                WaitQuery => client
                    .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
                    .await
                    .err(),
                SendQuery => client
                    .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
                    .check()
                    .err(),
                RecvEvent => event_receiver.next().await.unwrap().err(),
            };
            // Context included in every failure message below.
            let details = format!("index: {index:?}, two_actions: {two_actions:?}");
            match err {
                None => assert!(
                    !action.should_report_epitaph(),
                    "expected epitaph, but succeeded.\n{details}"
                ),
                Some(crate::Error::ClientChannelClosed {
                    status: zx_status::Status::UNAVAILABLE,
                    protocol_name: "test_protocol",
                    epitaph: Some(epitaph),
                }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32 => assert!(
                    action.should_report_epitaph(),
                    "got epitaph unexpectedly.\n{details}",
                ),
                Some(err) => panic!("unexpected error: {err:#?}.\n{details}"),
            }
        }

        // Once the receiver has yielded the epitaph error, its next item is `None`.
        if two_actions.contains(&RecvEvent) {
            assert_matches!(event_receiver.next().await, None);
        }
    }
}
2018
/// Exercises `check()` on a query future twice: once with a live server that
/// replies, and once after the server end has been dropped.
#[test]
fn client_query_result_check() {
    let mut executor = fasync::TestExecutor::new();
    let (client_end, server_end) = zx::Channel::create();
    let client_end = AsyncChannel::from_channel(client_end);
    let client = Client::new(client_end, "test_protocol");

    let server = AsyncChannel::from_channel(server_end);

    // Live server: `check()` is expected to succeed.
    let mut answered_query = client
        .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
        .check()
        .expect("failed to check future");

    // Read the request on the server side and verify its bytes.
    let mut request = MessageBufEtc::new();
    executor.run_singlethreaded(server.recv_etc_msg(&mut request)).expect("failed to recv msg");
    let two_way_tx_id = 1u8;
    assert_eq!(request.bytes(), expected_sent_bytes(two_way_tx_id, 1));

    // Reply with a transaction that reuses the request's id and ordinal.
    let (reply_bytes, reply_handles) = (&mut vec![], &mut vec![]);
    let reply_header =
        TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
    encode_transaction(reply_header, reply_bytes, reply_handles);
    server.write_etc(reply_bytes, reply_handles).expect("Server channel write failed");

    match executor.run_singlethreaded(&mut answered_query) {
        Ok(value) => assert_eq!(value, SEND_DATA),
        Err(e) => panic!("fidl error: {e:?}"),
    }

    // Closed server: `check()` is still expected to succeed here, and the
    // closure only shows up when the future itself is run.
    drop(server);

    let mut orphaned_query = client
        .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
        .check()
        .expect("failed to check future");
    let outcome = executor.run_singlethreaded(&mut orphaned_query);
    assert_matches!(
        outcome,
        Err(crate::Error::ClientChannelClosed {
            status: zx_status::Status::PEER_CLOSED,
            protocol_name: "test_protocol",
            epitaph: None,
        })
    );
}
2068
2069 #[fasync::run_singlethreaded(test)]
2070 async fn client_into_channel() {
2071 let (client_end, _server_end) = zx::Channel::create();
2074 let client_end = AsyncChannel::from_channel(client_end);
2075 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2076
2077 assert!(client.into_channel().is_ok());
2078 }
2079
2080 #[fasync::run_singlethreaded(test)]
2081 async fn client_into_channel_outstanding_messages() {
2082 let (client_end, _server_end) = zx::Channel::create();
2085 let client_end = AsyncChannel::from_channel(client_end);
2086 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2087
2088 {
2089 let _sender =
2092 client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
2093 }
2094
2095 assert!(client.into_channel().is_err());
2096 }
2097
2098 #[fasync::run_singlethreaded(test)]
2099 async fn client_into_channel_active_clone() {
2100 let (client_end, _server_end) = zx::Channel::create();
2103 let client_end = AsyncChannel::from_channel(client_end);
2104 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2105
2106 let _cloned_client = client.clone();
2107
2108 assert!(client.into_channel().is_err());
2109 }
2110
2111 #[fasync::run_singlethreaded(test)]
2112 async fn client_into_channel_outstanding_messages_get_received() {
2113 let (client_end, server_end) = zx::Channel::create();
2114 let client_end = AsyncChannel::from_channel(client_end);
2115 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2116
2117 let server = AsyncChannel::from_channel(server_end);
2118 let mut buffer = MessageBufEtc::new();
2119 let receiver = async move {
2120 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
2121 let two_way_tx_id = 1u8;
2122 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2123
2124 let (bytes, handles) = (&mut vec![], &mut vec![]);
2125 let header =
2126 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2127 encode_transaction(header, bytes, handles);
2128 server.write_etc(bytes, handles).expect("Server channel write failed");
2129 };
2130
2131 let receiver = receiver
2133 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
2134 panic!("did not receiver message in time!")
2135 });
2136
2137 let sender = client
2138 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
2139 .map_ok(|x| assert_eq!(x, SEND_DATA))
2140 .unwrap_or_else(|e| panic!("fidl error: {e:?}"));
2141
2142 let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
2144 panic!("did not receive response in time!")
2145 });
2146
2147 let ((), ()) = join!(receiver, sender);
2148
2149 assert!(client.into_channel().is_ok());
2150 }
2151
2152 #[fasync::run_singlethreaded(test)]
2153 async fn client_decode_errors_are_broadcast() {
2154 let (client_end, server_end) = zx::Channel::create();
2155 let client_end = AsyncChannel::from_channel(client_end);
2156 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2157
2158 let server = AsyncChannel::from_channel(server_end);
2159
2160 let _server = fasync::Task::spawn(async move {
2161 let mut buffer = MessageBufEtc::new();
2162 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
2163 let two_way_tx_id = 1u8;
2164 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2165
2166 let (bytes, handles) = (&mut vec![], &mut vec![]);
2167 let header =
2168 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2169 encode_transaction(header, bytes, handles);
2170 bytes[4] = 0;
2172 server.write_etc(bytes, handles).expect("Server channel write failed");
2173
2174 pending::<()>().await;
2176 });
2177
2178 let futures = FuturesUnordered::new();
2179
2180 for _ in 0..4 {
2181 futures.push(async {
2182 assert_matches!(
2183 client
2184 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
2185 .map_ok(|x| assert_eq!(x, SEND_DATA))
2186 .await,
2187 Err(crate::Error::UnsupportedWireFormatVersion)
2188 );
2189 });
2190 }
2191
2192 futures
2193 .collect::<Vec<_>>()
2194 .on_timeout(zx::MonotonicDuration::from_seconds(1).after_now(), || panic!("timed out!"))
2195 .await;
2196 }
2197
/// Checks that `into_channel` succeeds when the future calling it is polled
/// to completion from inside a waker's `wake_by_ref`.
#[fasync::run_singlethreaded(test)]
async fn into_channel_from_waker_succeeds() {
    let (client_end, server_end) = zx::Channel::create();
    let client_end = AsyncChannel::from_channel(client_end);
    let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");

    let server = AsyncChannel::from_channel(server_end);
    let mut buffer = MessageBufEtc::new();
    // Server side: read the request, check its bytes, and reply with a
    // transaction that reuses the request's transaction id and ordinal.
    let receiver = async move {
        server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
        let two_way_tx_id = 1u8;
        assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));

        let (bytes, handles) = (&mut vec![], &mut vec![]);
        let header =
            TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
        encode_transaction(header, bytes, handles);
        server.write_etc(bytes, handles).expect("Server channel write failed");
    };

    // Holds the client-side future so that the `Wake` impl below can poll it.
    struct Sender {
        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>,
    }

    // Signals the test body once the client-side future has run to completion.
    let (done_tx, done_rx) = oneshot::channel();

    let sender = Arc::new(Sender {
        future: Mutex::new(Box::pin(async move {
            // Make a two-way call and check the decoded response.
            client
                .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
                .map_ok(|x| assert_eq!(x, SEND_DATA))
                .unwrap_or_else(|e| panic!("fidl error: {e:?}"))
                .await;

            // This runs inside `wake_by_ref` below, i.e. from within the waker.
            assert!(client.into_channel().is_ok());

            let _ = done_tx.send(());
        })),
    });

    impl Wake for Sender {
        // Waking polls the stored future directly, using a no-op waker, and
        // requires it to finish during that poll.
        fn wake(self: Arc<Self>) {
            self.wake_by_ref();
        }
        fn wake_by_ref(self: &Arc<Self>) {
            assert!(
                self.future
                    .lock()
                    .poll_unpin(&mut Context::from_waker(Waker::noop()))
                    .is_ready()
            );
        }
    }

    let waker = Waker::from(sender.clone());

    // First poll with the `Sender`-backed waker; the server has not replied
    // yet, so the future is pending.
    assert!(sender.future.lock().poll_unpin(&mut Context::from_waker(&waker)).is_pending());

    // Let the server side receive the request and write the reply.
    receiver.await;

    // Wait until the client-side future reports completion.
    done_rx.await.unwrap();
}
2264}