1use crate::Error;
8use crate::encoding::{
9 Decode, Decoder, DefaultFuchsiaResourceDialect, DynamicFlags, Encode, Encoder, EpitaphBody,
10 MessageBufFor, ProxyChannelBox, ProxyChannelFor, ResourceDialect, TransactionHeader,
11 TransactionMessage, TransactionMessageType, TypeMarker, decode_transaction_header,
12};
13use fuchsia_sync::Mutex;
14use futures::future::{self, FusedFuture, Future, FutureExt, Map, MaybeDone};
15use futures::ready;
16use futures::stream::{FusedStream, Stream};
17use futures::task::{Context, Poll, Waker};
18use slab::Slab;
19use std::collections::VecDeque;
20use std::mem;
21use std::ops::ControlFlow;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::task::{RawWaker, RawWakerVTable};
25use zx_status;
26
27#[doc(hidden)] pub fn decode_transaction_body<T: TypeMarker, D: ResourceDialect, const EXPECTED_ORDINAL: u64>(
30 mut buf: D::MessageBufEtc,
31) -> Result<T::Owned, Error>
32where
33 T::Owned: Decode<T, D>,
34{
35 let (bytes, handles) = buf.split_mut();
36 let (header, body_bytes) = decode_transaction_header(bytes)?;
37 if header.ordinal != EXPECTED_ORDINAL {
38 return Err(Error::InvalidResponseOrdinal);
39 }
40 let mut output = Decode::<T, D>::new_empty();
41 Decoder::<D>::decode_into::<T>(&header, body_bytes, handles, &mut output)?;
42 Ok(output)
43}
44
45#[derive(Debug, Clone)]
47pub struct Client<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
48 inner: Arc<ClientInner<D>>,
49}
50
51pub type DecodedQueryResponseFut<T, D = DefaultFuchsiaResourceDialect> = Map<
53 MessageResponse<D>,
54 fn(Result<<D as ResourceDialect>::MessageBufEtc, Error>) -> Result<T, Error>,
55>;
56
57#[derive(Debug)]
60#[must_use = "futures do nothing unless you `.await` or poll them"]
61pub struct QueryResponseFut<T, D: ResourceDialect = DefaultFuchsiaResourceDialect>(
62 pub MaybeDone<DecodedQueryResponseFut<T, D>>,
63);
64
65impl<T: Unpin, D: ResourceDialect> FusedFuture for QueryResponseFut<T, D> {
66 fn is_terminated(&self) -> bool {
67 matches!(self.0, MaybeDone::Gone)
68 }
69}
70
71impl<T: Unpin, D: ResourceDialect> Future for QueryResponseFut<T, D> {
72 type Output = Result<T, Error>;
73
74 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
75 ready!(self.0.poll_unpin(cx));
76 let maybe_done = Pin::new(&mut self.0);
77 Poll::Ready(maybe_done.take_output().unwrap_or(Err(Error::PollAfterCompletion)))
78 }
79}
80
81impl<T> QueryResponseFut<T> {
82 pub fn check(self) -> Result<Self, Error> {
89 match self.0 {
90 MaybeDone::Done(Err(e)) => Err(e),
91 x => Ok(QueryResponseFut(x)),
92 }
93 }
94}
95
96const TXID_INTEREST_MASK: u32 = 0xFFFFFF;
97const TXID_GENERATION_SHIFT: usize = 24;
98const TXID_GENERATION_MASK: u8 = 0x7F;
99
100#[derive(Debug, Copy, Clone, PartialEq, Eq)]
102pub struct Txid(u32);
103#[derive(Debug, Copy, Clone, PartialEq, Eq)]
105struct InterestId(usize);
106
107impl InterestId {
108 fn from_txid(txid: Txid) -> Self {
109 InterestId((txid.0 & TXID_INTEREST_MASK) as usize - 1)
110 }
111}
112
113impl Txid {
114 fn from_interest_id(int_id: InterestId, generation: u8) -> Self {
115 let id = (int_id.0 as u32 + 1) & TXID_INTEREST_MASK;
118 let generation = (generation & TXID_GENERATION_MASK) as u32;
120
121 let txid = (generation << TXID_GENERATION_SHIFT) | id;
126
127 Txid(txid)
128 }
129
130 pub fn as_raw_id(&self) -> u32 {
132 self.0
133 }
134}
135
136impl From<u32> for Txid {
137 fn from(txid: u32) -> Self {
138 Self(txid)
139 }
140}
141
142impl<D: ResourceDialect> Client<D> {
143 pub fn new(channel: D::ProxyChannel, protocol_name: &'static str) -> Client<D> {
148 Client {
149 inner: Arc::new(ClientInner {
150 channel: channel.boxed(),
151 interests: Mutex::default(),
152 terminal_error: Mutex::default(),
153 protocol_name,
154 }),
155 }
156 }
157
158 pub fn as_channel(&self) -> &D::ProxyChannel {
160 self.inner.channel.as_channel()
161 }
162
163 pub fn into_channel(self) -> Result<D::ProxyChannel, Self> {
170 match Arc::try_unwrap(self.inner) {
180 Ok(inner) => {
181 if inner.interests.lock().messages.is_empty() || inner.channel.is_closed() {
182 Ok(inner.channel.unbox())
183 } else {
184 Err(Self { inner: Arc::new(inner) })
189 }
190 }
191 Err(inner) => Err(Self { inner }),
192 }
193 }
194
195 pub fn take_event_receiver(&self) -> EventReceiver<D> {
198 {
199 let mut lock = self.inner.interests.lock();
200
201 if let EventListener::None = lock.event_listener {
202 lock.event_listener = EventListener::WillPoll;
203 } else {
204 panic!("Event stream was already taken");
205 }
206 }
207
208 EventReceiver { inner: self.inner.clone(), state: EventReceiverState::Active }
209 }
210
211 pub fn send<T: TypeMarker>(
213 &self,
214 body: impl Encode<T, D>,
215 ordinal: u64,
216 dynamic_flags: DynamicFlags,
217 ) -> Result<(), Error> {
218 let msg =
219 TransactionMessage { header: TransactionHeader::new(0, ordinal, dynamic_flags), body };
220 crate::encoding::with_tls_encoded::<TransactionMessageType<T>, D, ()>(
221 msg,
222 |bytes, handles| self.send_raw(bytes, handles),
223 )
224 }
225
226 pub fn send_query<Request: TypeMarker, Response: TypeMarker, const ORDINAL: u64>(
228 &self,
229 body: impl Encode<Request, D>,
230 dynamic_flags: DynamicFlags,
231 ) -> QueryResponseFut<Response::Owned, D>
232 where
233 Response::Owned: Decode<Response, D>,
234 {
235 self.send_query_and_decode::<Request, Response::Owned>(
236 body,
237 ORDINAL,
238 dynamic_flags,
239 |buf| buf.and_then(decode_transaction_body::<Response, D, ORDINAL>),
240 )
241 }
242
243 pub fn send_query_and_decode<Request: TypeMarker, Output>(
246 &self,
247 body: impl Encode<Request, D>,
248 ordinal: u64,
249 dynamic_flags: DynamicFlags,
250 decode: fn(Result<D::MessageBufEtc, Error>) -> Result<Output, Error>,
251 ) -> QueryResponseFut<Output, D> {
252 let send_result = self.send_raw_query(|tx_id, bytes, handles| {
253 let msg = TransactionMessage {
254 header: TransactionHeader::new(tx_id.as_raw_id(), ordinal, dynamic_flags),
255 body,
256 };
257 Encoder::encode::<TransactionMessageType<Request>>(bytes, handles, msg)?;
258 Ok(())
259 });
260
261 QueryResponseFut(match send_result {
262 Ok(res_fut) => future::maybe_done(res_fut.map(decode)),
263 Err(e) => MaybeDone::Done(Err(e)),
264 })
265 }
266
267 pub fn send_raw(
269 &self,
270 bytes: &[u8],
271 handles: &mut [<D::ProxyChannel as ProxyChannelFor<D>>::HandleDisposition],
272 ) -> Result<(), Error> {
273 match self.inner.channel.write_etc(bytes, handles) {
274 Ok(()) | Err(None) => Ok(()),
275 Err(Some(e)) => Err(Error::ClientWrite(e.into())),
276 }
277 }
278
279 pub fn send_raw_query<F>(&self, encode_msg: F) -> Result<MessageResponse<D>, Error>
281 where
282 F: for<'a, 'b> FnOnce(
283 Txid,
284 &'a mut Vec<u8>,
285 &'b mut Vec<<D::ProxyChannel as ProxyChannelFor<D>>::HandleDisposition>,
286 ) -> Result<(), Error>,
287 {
288 let id = self.inner.interests.lock().register_msg_interest();
289 crate::encoding::with_tls_encode_buf::<_, D>(|bytes, handles| {
290 encode_msg(id, bytes, handles)?;
291 self.send_raw(bytes, handles)
292 })?;
293
294 Ok(MessageResponse { id, client: Some(self.inner.clone()) })
295 }
296}
297
298#[must_use]
299#[derive(Debug)]
301pub struct MessageResponse<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
302 id: Txid,
303 client: Option<Arc<ClientInner<D>>>,
305}
306
307impl<D: ResourceDialect> Unpin for MessageResponse<D> {}
308
309impl<D: ResourceDialect> Future for MessageResponse<D> {
310 type Output = Result<D::MessageBufEtc, Error>;
311 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
312 let this = &mut *self;
313 let res;
314 {
315 let client = this.client.as_ref().ok_or(Error::PollAfterCompletion)?;
316 res = client.poll_recv_msg_response(this.id, cx);
317 }
318
319 if let Poll::Ready(Ok(_)) = res {
321 this.client.take().expect("MessageResponse polled after completion");
322 }
323
324 res
325 }
326}
327
328impl<D: ResourceDialect> Drop for MessageResponse<D> {
329 fn drop(&mut self) {
330 if let Some(client) = &self.client {
331 client.interests.lock().deregister(self.id);
332 }
333 }
334}
335
336#[derive(Debug)]
339enum MessageInterest<D: ResourceDialect> {
340 WillPoll,
342 Waiting(Waker),
344 Received(D::MessageBufEtc),
346 Discard,
349}
350
351impl<D: ResourceDialect> MessageInterest<D> {
352 fn is_received(&self) -> bool {
354 matches!(*self, MessageInterest::Received(_))
355 }
356
357 fn unwrap_received(self) -> D::MessageBufEtc {
358 if let MessageInterest::Received(buf) = self {
359 buf
360 } else {
361 panic!("EXPECTED received message")
362 }
363 }
364}
365
366#[derive(Debug)]
367enum EventReceiverState {
368 Active,
369 Terminal,
370 Terminated,
371}
372
373#[derive(Debug)]
375pub struct EventReceiver<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
376 inner: Arc<ClientInner<D>>,
377 state: EventReceiverState,
378}
379
380impl<D: ResourceDialect> Unpin for EventReceiver<D> {}
381
382impl<D: ResourceDialect> FusedStream for EventReceiver<D> {
383 fn is_terminated(&self) -> bool {
384 matches!(self.state, EventReceiverState::Terminated)
385 }
386}
387
388impl<D: ResourceDialect> Stream for EventReceiver<D> {
393 type Item = Result<D::MessageBufEtc, Error>;
394
395 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
396 match self.state {
397 EventReceiverState::Active => {}
398 EventReceiverState::Terminated => {
399 panic!("polled EventReceiver after `None`");
400 }
401 EventReceiverState::Terminal => {
402 self.state = EventReceiverState::Terminated;
403 return Poll::Ready(None);
404 }
405 }
406
407 Poll::Ready(match ready!(self.inner.poll_recv_event(cx)) {
408 Ok(x) => Some(Ok(x)),
409 Err(Error::ClientChannelClosed { status: zx_status::Status::PEER_CLOSED, .. }) => {
410 self.state = EventReceiverState::Terminated;
413 None
414 }
415 err @ Err(_) => {
416 self.state = EventReceiverState::Terminal;
419 Some(err)
420 }
421 })
422 }
423}
424
425impl<D: ResourceDialect> Drop for EventReceiver<D> {
426 fn drop(&mut self) {
427 self.inner.interests.lock().dropped_event_listener();
428 }
429}
430
431#[derive(Debug, Default)]
432enum EventListener {
433 #[default]
435 None,
436 WillPoll,
438 Some(Waker),
440}
441
442impl EventListener {
443 fn is_some(&self) -> bool {
444 matches!(self, EventListener::Some(_))
445 }
446}
447
448#[derive(Debug)]
450struct ClientInner<D: ResourceDialect> {
451 channel: <D::ProxyChannel as ProxyChannelFor<D>>::Boxed,
453
454 interests: Mutex<Interests<D>>,
456
457 terminal_error: Mutex<Option<Error>>,
460
461 protocol_name: &'static str,
463}
464
465#[derive(Debug)]
466struct Interests<D: ResourceDialect> {
467 messages: Slab<MessageInterest<D>>,
468 events: VecDeque<D::MessageBufEtc>,
469 event_listener: EventListener,
470 waker_count: usize,
472 generation: u8,
478}
479
480impl<D: ResourceDialect> Default for Interests<D> {
481 fn default() -> Self {
482 Interests {
483 messages: Slab::new(),
484 events: Default::default(),
485 event_listener: Default::default(),
486 waker_count: 0,
487 generation: 0,
488 }
489 }
490}
491
492impl<D: ResourceDialect> Interests<D> {
493 fn push_event(&mut self, buf: D::MessageBufEtc) -> Option<Waker> {
495 self.events.push_back(buf);
496 self.take_event_waker()
497 }
498
499 fn take_event_waker(&mut self) -> Option<Waker> {
501 if self.event_listener.is_some() {
502 let EventListener::Some(waker) =
503 mem::replace(&mut self.event_listener, EventListener::WillPoll)
504 else {
505 unreachable!()
506 };
507
508 self.waker_count -= 1;
510 Some(waker)
511 } else {
512 None
513 }
514 }
515
516 fn event_waker(&self) -> Option<&Waker> {
518 match &self.event_listener {
519 EventListener::Some(waker) => Some(waker),
520 _ => None,
521 }
522 }
523
524 fn push_message(&mut self, txid: Txid, buf: D::MessageBufEtc) -> Result<Option<Waker>, Error> {
527 let InterestId(raw_id) = InterestId::from_txid(txid);
528 let Some(interest) = self.messages.get_mut(raw_id) else {
531 return Err(Error::InvalidResponseTxid);
533 };
534
535 let mut waker = None;
536 if let MessageInterest::Discard = interest {
537 self.messages.remove(raw_id);
538 } else if let MessageInterest::Waiting(w) =
539 mem::replace(interest, MessageInterest::Received(buf))
540 {
541 waker = Some(w);
542
543 self.waker_count -= 1;
545 }
546
547 Ok(waker)
548 }
549
550 fn register(&mut self, txid: Txid, cx: &Context<'_>) -> Option<D::MessageBufEtc> {
553 let InterestId(raw_id) = InterestId::from_txid(txid);
554 let interest = self.messages.get_mut(raw_id).expect("Polled unregistered interest");
555 match interest {
556 MessageInterest::Received(_) => {
557 return Some(self.messages.remove(raw_id).unwrap_received());
558 }
559 MessageInterest::Discard => panic!("Polled a discarded MessageReceiver?!"),
560 MessageInterest::WillPoll => self.waker_count += 1,
561 MessageInterest::Waiting(_) => {}
562 }
563 *interest = MessageInterest::Waiting(cx.waker().clone());
564 None
565 }
566
567 fn deregister(&mut self, txid: Txid) {
569 let InterestId(raw_id) = InterestId::from_txid(txid);
570 match self.messages[raw_id] {
571 MessageInterest::Received(_) => {
572 self.messages.remove(raw_id);
573 return;
574 }
575 MessageInterest::WillPoll => {}
576 MessageInterest::Waiting(_) => self.waker_count -= 1,
577 MessageInterest::Discard => unreachable!(),
578 }
579 self.messages[raw_id] = MessageInterest::Discard;
580 }
581
582 fn register_event_listener(&mut self, cx: &Context<'_>) -> Option<D::MessageBufEtc> {
584 self.events.pop_front().or_else(|| {
585 if !mem::replace(&mut self.event_listener, EventListener::Some(cx.waker().clone()))
586 .is_some()
587 {
588 self.waker_count += 1;
589 }
590 None
591 })
592 }
593
594 fn dropped_event_listener(&mut self) {
596 if self.event_listener.is_some() {
597 self.waker_count -= 1;
599 }
600 self.event_listener = EventListener::None;
601 }
602
603 fn register_msg_interest(&mut self) -> Txid {
608 self.generation = self.generation.wrapping_add(1);
609 Txid::from_interest_id(
612 InterestId(self.messages.insert(MessageInterest::WillPoll)),
613 self.generation,
614 )
615 }
616}
617
618impl<D: ResourceDialect> ClientInner<D> {
619 fn poll_recv_event(
620 self: &Arc<Self>,
621 cx: &Context<'_>,
622 ) -> Poll<Result<D::MessageBufEtc, Error>> {
623 if let Some(msg_buf) = self.interests.lock().register_event_listener(cx) {
625 return Poll::Ready(Ok(msg_buf));
626 }
627
628 let maybe_terminal_error = self.recv_all(Some(Txid(0)));
631
632 let mut lock = self.interests.lock();
633
634 if let Some(msg_buf) = lock.events.pop_front() {
635 Poll::Ready(Ok(msg_buf))
636 } else {
637 maybe_terminal_error?;
638 Poll::Pending
639 }
640 }
641
642 fn poll_recv_msg_response(
645 self: &Arc<Self>,
646 txid: Txid,
647 cx: &Context<'_>,
648 ) -> Poll<Result<D::MessageBufEtc, Error>> {
649 if let Some(buf) = self.interests.lock().register(txid, cx) {
651 return Poll::Ready(Ok(buf));
652 }
653
654 let maybe_terminal_error = self.recv_all(Some(txid));
657
658 let InterestId(raw_id) = InterestId::from_txid(txid);
659 let mut interests = self.interests.lock();
660 if interests.messages.get(raw_id).expect("Polled unregistered interest").is_received() {
661 let buf = interests.messages.remove(raw_id).unwrap_received();
664 Poll::Ready(Ok(buf))
665 } else {
666 maybe_terminal_error?;
667 Poll::Pending
668 }
669 }
670
671 fn recv_all(self: &Arc<Self>, want_txid: Option<Txid>) -> Result<(), Error> {
681 let mut terminal_error = self.terminal_error.lock();
685 if let Some(error) = terminal_error.as_ref() {
686 return Err(error.clone());
687 }
688
689 let recv_once = |waker| {
690 let cx = &mut Context::from_waker(&waker);
691
692 let mut buf = D::MessageBufEtc::new();
693 let result = self.channel.recv_etc_from(cx, &mut buf);
694 match result {
695 Poll::Ready(Ok(())) => {}
696 Poll::Ready(Err(None)) => {
697 return Err(Error::ClientChannelClosed {
700 status: zx_status::Status::PEER_CLOSED,
701 protocol_name: self.protocol_name,
702 epitaph: None,
703 #[cfg(not(target_os = "fuchsia"))]
704 reason: self.channel.closed_reason(),
705 });
706 }
707 Poll::Ready(Err(Some(e))) => return Err(Error::ClientRead(e.into())),
708 Poll::Pending => return Ok(ControlFlow::Break(())),
709 };
710
711 let (bytes, _) = buf.split_mut();
712 let (header, body_bytes) = decode_transaction_header(bytes)?;
713 if header.is_epitaph() {
714 let handles = &mut [];
716 let mut epitaph_body = Decode::<EpitaphBody, D>::new_empty();
717 Decoder::<D>::decode_into::<EpitaphBody>(
718 &header,
719 body_bytes,
720 handles,
721 &mut epitaph_body,
722 )?;
723 return Err(Error::ClientChannelClosed {
724 status: epitaph_body.error,
725 protocol_name: self.protocol_name,
726 epitaph: Some(epitaph_body.error.into_raw() as u32),
727 #[cfg(not(target_os = "fuchsia"))]
728 reason: self.channel.closed_reason(),
729 });
730 }
731
732 let txid = Txid(header.tx_id);
733
734 let waker = {
735 buf.shrink_bytes_to_fit();
736 let mut interests = self.interests.lock();
737 if txid == Txid(0) {
738 interests.push_event(buf)
739 } else {
740 interests.push_message(txid, buf)?
741 }
742 };
743
744 if want_txid != Some(txid)
746 && let Some(waker) = waker
747 {
748 waker.wake();
749 }
750
751 Ok(ControlFlow::Continue(()))
752 };
753
754 loop {
755 let waker = {
756 let interests = self.interests.lock();
757 if interests.waker_count == 0 {
758 return Ok(());
759 } else if interests.waker_count == 1 {
760 if let Some(waker) = interests.event_waker() {
766 waker.clone()
767 } else {
768 interests
769 .messages
770 .iter()
771 .find_map(|(_, interest)| {
772 if let MessageInterest::Waiting(waker) = interest {
773 Some(waker.clone())
774 } else {
775 None
776 }
777 })
778 .unwrap()
779 }
780 } else {
781 let weak = Arc::downgrade(self);
782 let waker = ClientWaker(Arc::new(move || {
783 if let Some(strong) = weak.upgrade() {
784 #[cfg(target_os = "fuchsia")]
787 if strong.recv_all(None).is_ok() {
788 return;
789 }
790
791 strong.wake_all();
792 }
793 }));
794 unsafe {
799 Waker::from_raw(RawWaker::new(
800 Arc::into_raw(Arc::new(waker)) as *const (),
801 &WAKER_VTABLE,
802 ))
803 }
804 }
805 };
806
807 match recv_once(waker) {
808 Ok(ControlFlow::Continue(())) => {}
809 Ok(ControlFlow::Break(())) => return Ok(()),
810 Err(error) => {
811 self.wake_all();
813 return Err(terminal_error.insert(error).clone());
814 }
815 }
816 }
817 }
818
819 fn wake_all(&self) {
821 let mut lock = self.interests.lock();
822 for (_, interest) in &mut lock.messages {
823 if let MessageInterest::Waiting(_) = interest {
824 let MessageInterest::Waiting(waker) =
825 mem::replace(interest, MessageInterest::WillPoll)
826 else {
827 unreachable!()
828 };
829 waker.wake();
830 }
831 }
832 if let Some(waker) = lock.take_event_waker() {
833 waker.wake();
834 }
835 lock.waker_count = 0;
836 }
837}
838
839#[derive(Clone)]
840struct ClientWaker(Arc<dyn Fn() + Send + Sync + 'static>);
841
842static WAKER_VTABLE: RawWakerVTable =
843 RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);
844
845unsafe fn clone_waker(data: *const ()) -> RawWaker {
846 unsafe { Arc::increment_strong_count(data as *const ClientWaker) };
847 RawWaker::new(data, &WAKER_VTABLE)
848}
849
850unsafe fn wake(data: *const ()) {
851 unsafe { Arc::from_raw(data as *const ClientWaker) }.0();
852}
853
854unsafe fn wake_by_ref(data: *const ()) {
855 mem::ManuallyDrop::new(unsafe { Arc::from_raw(data as *const ClientWaker) }).0();
856}
857
858unsafe fn drop_waker(data: *const ()) {
859 unsafe { Arc::from_raw(data as *const ClientWaker) };
860}
861
862#[cfg(target_os = "fuchsia")]
863pub mod sync {
864 use super::*;
867 use std::mem::MaybeUninit;
868 use zx::{self as zx, AsHandleRef, MessageBufEtc};
869
870 #[derive(Debug)]
872 pub struct Client {
873 channel: zx::Channel,
875
876 protocol_name: &'static str,
878 }
879
880 impl Client {
881 pub fn new(channel: zx::Channel, protocol_name: &'static str) -> Self {
883 Client { channel, protocol_name }
884 }
885
886 pub fn as_channel(&self) -> &zx::Channel {
888 &self.channel
889 }
890
891 pub fn into_channel(self) -> zx::Channel {
893 self.channel
894 }
895
896 pub fn send<T: TypeMarker>(
898 &self,
899 body: impl Encode<T, DefaultFuchsiaResourceDialect>,
900 ordinal: u64,
901 dynamic_flags: DynamicFlags,
902 ) -> Result<(), Error> {
903 let mut write_bytes = Vec::new();
904 let mut write_handles = Vec::new();
905 let msg = TransactionMessage {
906 header: TransactionHeader::new(0, ordinal, dynamic_flags),
907 body,
908 };
909 Encoder::encode::<TransactionMessageType<T>>(
910 &mut write_bytes,
911 &mut write_handles,
912 msg,
913 )?;
914 match self.channel.write_etc(&write_bytes, &mut write_handles) {
915 Ok(()) | Err(zx_status::Status::PEER_CLOSED) => Ok(()),
916 Err(e) => Err(Error::ClientWrite(e.into())),
917 }
918 }
919
920 pub fn send_query<Request: TypeMarker, Response: TypeMarker>(
922 &self,
923 body: impl Encode<Request, DefaultFuchsiaResourceDialect>,
924 ordinal: u64,
925 dynamic_flags: DynamicFlags,
926 deadline: zx::MonotonicInstant,
927 ) -> Result<Response::Owned, Error>
928 where
929 Response::Owned: Decode<Response, DefaultFuchsiaResourceDialect>,
930 {
931 let mut write_bytes = Vec::new();
932 let mut write_handles = Vec::new();
933
934 let msg = TransactionMessage {
935 header: TransactionHeader::new(0, ordinal, dynamic_flags),
936 body,
937 };
938 Encoder::encode::<TransactionMessageType<Request>>(
939 &mut write_bytes,
940 &mut write_handles,
941 msg,
942 )?;
943
944 let mut bytes_out =
947 Vec::<MaybeUninit<u8>>::with_capacity(zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize);
948 unsafe { bytes_out.set_len(zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize) };
951
952 let handles_out = &mut [const { MaybeUninit::<zx::HandleInfo>::uninit() };
956 zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize];
957
958 let (bytes_out, handles_out) = self
961 .channel
962 .call_etc_uninit(
963 deadline,
964 &write_bytes,
965 &mut write_handles,
966 bytes_out.as_mut_slice(),
967 handles_out,
968 )
969 .map_err(|e| self.wrap_error(Error::ClientCall, e))?;
970
971 let (header, body_bytes) = decode_transaction_header(bytes_out)?;
972 if header.ordinal != ordinal {
973 return Err(Error::InvalidResponseOrdinal);
974 }
975 let mut output = Decode::<Response, DefaultFuchsiaResourceDialect>::new_empty();
976 Decoder::<DefaultFuchsiaResourceDialect>::decode_into::<Response>(
977 &header,
978 body_bytes,
979 handles_out,
980 &mut output,
981 )?;
982 Ok(output)
983 }
984
985 pub fn wait_for_event(
987 &self,
988 deadline: zx::MonotonicInstant,
989 ) -> Result<MessageBufEtc, Error> {
990 let mut buf = zx::MessageBufEtc::new();
991 buf.ensure_capacity_bytes(zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize);
992 buf.ensure_capacity_handle_infos(zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize);
993
994 loop {
995 self.channel
996 .wait_handle(
997 zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
998 deadline,
999 )
1000 .map_err(|e| self.wrap_error(Error::ClientEvent, e))?;
1001 match self.channel.read_etc(&mut buf) {
1002 Ok(()) => {
1003 let (header, body_bytes) = decode_transaction_header(buf.bytes())
1006 .map_err(|_| Error::InvalidHeader)?;
1007 if header.is_epitaph() {
1008 let handles = &mut [];
1011 let mut epitaph_body =
1012 Decode::<EpitaphBody, DefaultFuchsiaResourceDialect>::new_empty();
1013 Decoder::<DefaultFuchsiaResourceDialect>::decode_into::<EpitaphBody>(
1014 &header,
1015 body_bytes,
1016 handles,
1017 &mut epitaph_body,
1018 )?;
1019 return Err(Error::ClientChannelClosed {
1020 status: epitaph_body.error,
1021 protocol_name: self.protocol_name,
1022 epitaph: Some(epitaph_body.error.into_raw() as u32),
1023 });
1024 }
1025 if header.tx_id != 0 {
1026 return Err(Error::UnexpectedSyncResponse);
1027 }
1028 return Ok(buf);
1029 }
1030 Err(zx::Status::SHOULD_WAIT) => {
1031 continue;
1033 }
1034 Err(e) => {
1035 return Err(self.wrap_error(|x| Error::ClientRead(x.into()), e));
1036 }
1037 }
1038 }
1039 }
1040
1041 fn wrap_error<T: Fn(zx_status::Status) -> Error>(
1045 &self,
1046 variant: T,
1047 err: zx_status::Status,
1048 ) -> Error {
1049 if err == zx_status::Status::PEER_CLOSED {
1050 Error::ClientChannelClosed {
1051 status: zx_status::Status::PEER_CLOSED,
1052 protocol_name: self.protocol_name,
1053 epitaph: None,
1054 }
1055 } else {
1056 variant(err)
1057 }
1058 }
1059 }
1060}
1061
1062#[cfg(all(test, target_os = "fuchsia"))]
1063mod tests {
1064 use super::*;
1065 use crate::encoding::MAGIC_NUMBER_INITIAL;
1066 use crate::epitaph::{self, ChannelEpitaphExt};
1067 use anyhow::{Context as _, Error};
1068 use assert_matches::assert_matches;
1069 use fuchsia_async as fasync;
1070 use fuchsia_async::{Channel as AsyncChannel, DurationExt, TimeoutExt};
1071 use futures::channel::oneshot;
1072 use futures::stream::FuturesUnordered;
1073 use futures::task::{ArcWake, noop_waker, waker};
1074 use futures::{StreamExt, TryFutureExt, join};
1075 use futures_test::task::new_count_waker;
1076 use std::future::pending;
1077 use std::thread;
1078 use zx::{AsHandleRef, MessageBufEtc};
1079
1080 const SEND_ORDINAL_HIGH_BYTE: u8 = 42;
1081 const SEND_ORDINAL: u64 = 42 << 32;
1082 const SEND_DATA: u8 = 55;
1083
1084 const EVENT_ORDINAL: u64 = 854 << 23;
1085
1086 #[rustfmt::skip]
1087 fn expected_sent_bytes(txid_index: u8, txid_generation: u8) -> [u8; 24] {
1088 [
1089 txid_index, 0, 0, txid_generation, 2, 0, 0, MAGIC_NUMBER_INITIAL,
1092 0, 0, 0, 0, SEND_ORDINAL_HIGH_BYTE, 0, 0, 0, SEND_DATA, 0, 0, 0, 0, 0, 0, 0, ]
1097 }
1098
1099 fn expected_sent_bytes_oneway() -> [u8; 24] {
1100 expected_sent_bytes(0, 0)
1101 }
1102
1103 fn send_transaction(header: TransactionHeader, channel: &zx::Channel) {
1104 let (bytes, handles) = (&mut vec![], &mut vec![]);
1105 encode_transaction(header, bytes, handles);
1106 channel.write_etc(bytes, handles).expect("Server channel write failed");
1107 }
1108
1109 fn encode_transaction(
1110 header: TransactionHeader,
1111 bytes: &mut Vec<u8>,
1112 handles: &mut Vec<zx::HandleDisposition<'static>>,
1113 ) {
1114 let event = TransactionMessage { header, body: SEND_DATA };
1115 Encoder::<DefaultFuchsiaResourceDialect>::encode::<TransactionMessageType<u8>>(
1116 bytes, handles, event,
1117 )
1118 .expect("Encoding failure");
1119 }
1120
1121 #[test]
1122 fn sync_client() -> Result<(), Error> {
1123 let (client_end, server_end) = zx::Channel::create();
1124 let client = sync::Client::new(client_end, "test_protocol");
1125 client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()).context("sending")?;
1126 let mut received = MessageBufEtc::new();
1127 server_end.read_etc(&mut received).context("reading")?;
1128 assert_eq!(received.bytes(), expected_sent_bytes_oneway());
1129 Ok(())
1130 }
1131
1132 #[test]
1133 fn sync_client_with_response() -> Result<(), Error> {
1134 let (client_end, server_end) = zx::Channel::create();
1135 let client = sync::Client::new(client_end, "test_protocol");
1136 thread::spawn(move || {
1137 let mut received = MessageBufEtc::new();
1139 server_end
1140 .wait_handle(
1141 zx::Signals::CHANNEL_READABLE,
1142 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1143 )
1144 .expect("failed to wait for channel readable");
1145 server_end.read_etc(&mut received).expect("failed to read on server end");
1146 let (buf, _handles) = received.split_mut();
1147 let (header, _body_bytes) = decode_transaction_header(buf).expect("server decode");
1148 assert_eq!(header.ordinal, SEND_ORDINAL);
1149 send_transaction(
1150 TransactionHeader::new(header.tx_id, header.ordinal, DynamicFlags::empty()),
1151 &server_end,
1152 );
1153 });
1154 let response_data = client
1155 .send_query::<u8, u8>(
1156 SEND_DATA,
1157 SEND_ORDINAL,
1158 DynamicFlags::empty(),
1159 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1160 )
1161 .context("sending query")?;
1162 assert_eq!(SEND_DATA, response_data);
1163 Ok(())
1164 }
1165
1166 #[test]
1167 fn sync_client_with_event_and_response() -> Result<(), Error> {
1168 let (client_end, server_end) = zx::Channel::create();
1169 let client = sync::Client::new(client_end, "test_protocol");
1170 thread::spawn(move || {
1171 let mut received = MessageBufEtc::new();
1173 server_end
1174 .wait_handle(
1175 zx::Signals::CHANNEL_READABLE,
1176 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1177 )
1178 .expect("failed to wait for channel readable");
1179 server_end.read_etc(&mut received).expect("failed to read on server end");
1180 let (buf, _handles) = received.split_mut();
1181 let (header, _body_bytes) = decode_transaction_header(buf).expect("server decode");
1182 assert_ne!(header.tx_id, 0);
1183 assert_eq!(header.ordinal, SEND_ORDINAL);
1184 send_transaction(
1186 TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty()),
1187 &server_end,
1188 );
1189 send_transaction(
1192 TransactionHeader::new(header.tx_id, header.ordinal, DynamicFlags::empty()),
1193 &server_end,
1194 );
1195 });
1196 let response_data = client
1197 .send_query::<u8, u8>(
1198 SEND_DATA,
1199 SEND_ORDINAL,
1200 DynamicFlags::empty(),
1201 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1202 )
1203 .context("sending query")?;
1204 assert_eq!(SEND_DATA, response_data);
1205
1206 let event_buf = client
1207 .wait_for_event(zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)))
1208 .context("waiting for event")?;
1209 let (bytes, _handles) = event_buf.split();
1210 let (header, _body) = decode_transaction_header(&bytes).expect("event decode");
1211 assert_eq!(header.ordinal, EVENT_ORDINAL);
1212
1213 Ok(())
1214 }
1215
1216 #[test]
1217 fn sync_client_with_racing_events() -> Result<(), Error> {
1218 let (client_end, server_end) = zx::Channel::create();
1219 let client1 = Arc::new(sync::Client::new(client_end, "test_protocol"));
1220 let client2 = client1.clone();
1221
1222 let thread1 = thread::spawn(move || {
1223 let result = client1.wait_for_event(zx::MonotonicInstant::after(
1224 zx::MonotonicDuration::from_seconds(5),
1225 ));
1226 assert!(result.is_ok());
1227 });
1228
1229 let thread2 = thread::spawn(move || {
1230 let result = client2.wait_for_event(zx::MonotonicInstant::after(
1231 zx::MonotonicDuration::from_seconds(5),
1232 ));
1233 assert!(result.is_ok());
1234 });
1235
1236 send_transaction(
1237 TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty()),
1238 &server_end,
1239 );
1240 send_transaction(
1241 TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty()),
1242 &server_end,
1243 );
1244
1245 assert!(thread1.join().is_ok());
1246 assert!(thread2.join().is_ok());
1247
1248 Ok(())
1249 }
1250
1251 #[test]
1252 fn sync_client_wait_for_event_gets_method_response() -> Result<(), Error> {
1253 let (client_end, server_end) = zx::Channel::create();
1254 let client = sync::Client::new(client_end, "test_protocol");
1255 send_transaction(
1256 TransactionHeader::new(3902304923, SEND_ORDINAL, DynamicFlags::empty()),
1257 &server_end,
1258 );
1259 assert_matches!(
1260 client.wait_for_event(zx::MonotonicInstant::after(
1261 zx::MonotonicDuration::from_seconds(5)
1262 )),
1263 Err(crate::Error::UnexpectedSyncResponse)
1264 );
1265 Ok(())
1266 }
1267
1268 #[test]
1269 fn sync_client_one_way_call_suceeds_after_peer_closed() -> Result<(), Error> {
1270 let (client_end, server_end) = zx::Channel::create();
1271 let client = sync::Client::new(client_end, "test_protocol");
1272 drop(server_end);
1273 assert_matches!(client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()), Ok(()));
1274 Ok(())
1275 }
1276
1277 #[test]
1278 fn sync_client_two_way_call_fails_after_peer_closed() -> Result<(), Error> {
1279 let (client_end, server_end) = zx::Channel::create();
1280 let client = sync::Client::new(client_end, "test_protocol");
1281 drop(server_end);
1282 assert_matches!(
1283 client.send_query::<u8, u8>(
1284 SEND_DATA,
1285 SEND_ORDINAL,
1286 DynamicFlags::empty(),
1287 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))
1288 ),
1289 Err(crate::Error::ClientChannelClosed {
1290 status: zx_status::Status::PEER_CLOSED,
1291 protocol_name: "test_protocol",
1292 epitaph: None,
1293 })
1294 );
1295 Ok(())
1296 }
1297
1298 #[test]
1301 fn sync_client_send_does_not_receive_epitaphs() -> Result<(), Error> {
1302 let (client_end, server_end) = zx::Channel::create();
1303 let client = sync::Client::new(client_end, "test_protocol");
1304 server_end
1306 .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1307 .expect("failed to write epitaph");
1308 assert_matches!(
1309 client.send_query::<u8, u8>(
1310 SEND_DATA,
1311 SEND_ORDINAL,
1312 DynamicFlags::empty(),
1313 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))
1314 ),
1315 Err(crate::Error::ClientChannelClosed {
1316 status: zx_status::Status::PEER_CLOSED,
1317 protocol_name: "test_protocol",
1318 epitaph: None,
1319 })
1320 );
1321 Ok(())
1322 }
1323
1324 #[test]
1325 fn sync_client_wait_for_events_does_receive_epitaphs() -> Result<(), Error> {
1326 let (client_end, server_end) = zx::Channel::create();
1327 let client = sync::Client::new(client_end, "test_protocol");
1328 server_end
1330 .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1331 .expect("failed to write epitaph");
1332 assert_matches!(
1333 client.wait_for_event(zx::MonotonicInstant::after(
1334 zx::MonotonicDuration::from_seconds(5)
1335 )),
1336 Err(crate::Error::ClientChannelClosed {
1337 status: zx_status::Status::UNAVAILABLE,
1338 protocol_name: "test_protocol",
1339 epitaph: Some(epitaph),
1340 }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1341 );
1342 Ok(())
1343 }
1344
1345 #[test]
1346 fn sync_client_into_channel() -> Result<(), Error> {
1347 let (client_end, _server_end) = zx::Channel::create();
1348 let client_end_raw = client_end.raw_handle();
1349 let client = sync::Client::new(client_end, "test_protocol");
1350 assert_eq!(client.into_channel().raw_handle(), client_end_raw);
1351 Ok(())
1352 }
1353
1354 #[fasync::run_singlethreaded(test)]
1355 async fn client() {
1356 let (client_end, server_end) = zx::Channel::create();
1357 let client_end = AsyncChannel::from_channel(client_end);
1358 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1359
1360 let server = AsyncChannel::from_channel(server_end);
1361 let receiver = async move {
1362 let mut buffer = MessageBufEtc::new();
1363 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1364 assert_eq!(buffer.bytes(), expected_sent_bytes_oneway());
1365 };
1366
1367 let receiver = receiver
1369 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1370 panic!("did not receive message in time!")
1371 });
1372
1373 client
1374 .send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty())
1375 .expect("failed to send msg");
1376
1377 receiver.await;
1378 }
1379
1380 #[fasync::run_singlethreaded(test)]
1381 async fn client_with_response() {
1382 let (client_end, server_end) = zx::Channel::create();
1383 let client_end = AsyncChannel::from_channel(client_end);
1384 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1385
1386 let server = AsyncChannel::from_channel(server_end);
1387 let mut buffer = MessageBufEtc::new();
1388 let receiver = async move {
1389 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1390 let two_way_tx_id = 1u8;
1391 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
1392
1393 let (bytes, handles) = (&mut vec![], &mut vec![]);
1394 let header =
1395 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
1396 encode_transaction(header, bytes, handles);
1397 server.write_etc(bytes, handles).expect("Server channel write failed");
1398 };
1399
1400 let receiver = receiver
1402 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1403 panic!("did not receiver message in time!")
1404 });
1405
1406 let sender = client
1407 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
1408 .map_ok(|x| assert_eq!(x, SEND_DATA))
1409 .unwrap_or_else(|e| panic!("fidl error: {e:?}"));
1410
1411 let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1413 panic!("did not receive response in time!")
1414 });
1415
1416 let ((), ()) = join!(receiver, sender);
1417 }
1418
1419 #[fasync::run_singlethreaded(test)]
1420 async fn client_with_response_receives_epitaph() {
1421 let (client_end, server_end) = zx::Channel::create();
1422 let client_end = AsyncChannel::from_channel(client_end);
1423 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1424
1425 let server = AsyncChannel::from_channel(server_end);
1426 let mut buffer = zx::MessageBufEtc::new();
1427 let receiver = async move {
1428 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1429 server
1430 .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1431 .expect("failed to write epitaph");
1432 };
1433 let receiver = receiver
1435 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1436 panic!("did not receive message in time!")
1437 });
1438
1439 let sender = async move {
1440 const ORDINAL: u64 = 42 << 32;
1441 let result = client.send_query::<u8, u8, ORDINAL>(55, DynamicFlags::empty()).await;
1442 assert_matches!(
1443 result,
1444 Err(crate::Error::ClientChannelClosed {
1445 status: zx_status::Status::UNAVAILABLE,
1446 protocol_name: "test_protocol",
1447 epitaph: Some(epitaph),
1448 }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1449 );
1450 };
1451 let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1453 panic!("did not receive response in time!")
1454 });
1455
1456 let ((), ()) = join!(receiver, sender);
1457 }
1458
1459 #[fasync::run_singlethreaded(test)]
1460 #[should_panic]
1461 async fn event_cant_be_taken_twice() {
1462 let (client_end, _) = zx::Channel::create();
1463 let client_end = AsyncChannel::from_channel(client_end);
1464 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1465 let _foo = client.take_event_receiver();
1466 client.take_event_receiver();
1467 }
1468
1469 #[fasync::run_singlethreaded(test)]
1470 async fn event_can_be_taken_after_drop() {
1471 let (client_end, _) = zx::Channel::create();
1472 let client_end = AsyncChannel::from_channel(client_end);
1473 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1474 let foo = client.take_event_receiver();
1475 drop(foo);
1476 client.take_event_receiver();
1477 }
1478
1479 #[fasync::run_singlethreaded(test)]
1480 async fn receiver_termination_test() {
1481 let (client_end, _) = zx::Channel::create();
1482 let client_end = AsyncChannel::from_channel(client_end);
1483 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1484 let mut foo = client.take_event_receiver();
1485 assert!(!foo.is_terminated(), "receiver should not report terminated before being polled");
1486 let _ = foo.next().await;
1487 assert!(
1488 foo.is_terminated(),
1489 "receiver should report terminated after seeing channel is closed"
1490 );
1491 }
1492
1493 #[fasync::run_singlethreaded(test)]
1494 #[should_panic(expected = "polled EventReceiver after `None`")]
1495 async fn receiver_cant_be_polled_more_than_once_on_closed_stream() {
1496 let (client_end, _) = zx::Channel::create();
1497 let client_end = AsyncChannel::from_channel(client_end);
1498 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1499 let foo = client.take_event_receiver();
1500 drop(foo);
1501 let mut bar = client.take_event_receiver();
1502 assert!(bar.next().await.is_none(), "read on closed channel should return none");
1503 let _ = bar.next().await;
1505 }
1506
1507 #[fasync::run_singlethreaded(test)]
1508 #[should_panic(expected = "polled EventReceiver after `None`")]
1509 async fn receiver_panics_when_polled_after_receiving_epitaph_then_none() {
1510 let (client_end, server_end) = zx::Channel::create();
1511 let client_end = AsyncChannel::from_channel(client_end);
1512 let server_end = AsyncChannel::from_channel(server_end);
1513 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1514 let mut stream = client.take_event_receiver();
1515
1516 epitaph::write_epitaph_impl(&server_end, zx_status::Status::UNAVAILABLE)
1517 .expect("wrote epitaph");
1518 drop(server_end);
1519
1520 assert_matches!(
1521 stream.next().await,
1522 Some(Err(crate::Error::ClientChannelClosed {
1523 status: zx_status::Status::UNAVAILABLE,
1524 protocol_name: "test_protocol",
1525 epitaph: Some(epitaph),
1526 })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1527 );
1528 assert_matches!(stream.next().await, None);
1529 let _ = stream.next().await;
1531 }
1532
1533 #[fasync::run_singlethreaded(test)]
1534 async fn event_can_be_taken() {
1535 let (client_end, _) = zx::Channel::create();
1536 let client_end = AsyncChannel::from_channel(client_end);
1537 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1538 client.take_event_receiver();
1539 }
1540
1541 #[fasync::run_singlethreaded(test)]
1542 async fn event_received() {
1543 let (client_end, server_end) = zx::Channel::create();
1544 let client_end = AsyncChannel::from_channel(client_end);
1545 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1546
1547 let server = AsyncChannel::from_channel(server_end);
1549 let (bytes, handles) = (&mut vec![], &mut vec![]);
1550 const ORDINAL: u64 = 5;
1551 let header = TransactionHeader::new(0, ORDINAL, DynamicFlags::empty());
1552 encode_transaction(header, bytes, handles);
1553 server.write_etc(bytes, handles).expect("Server channel write failed");
1554 drop(server);
1555
1556 let recv = client
1557 .take_event_receiver()
1558 .into_future()
1559 .then(|(x, stream)| {
1560 let x = x.expect("should contain one element");
1561 let x = x.expect("fidl error");
1562 let x: i32 =
1563 decode_transaction_body::<i32, DefaultFuchsiaResourceDialect, ORDINAL>(x)
1564 .expect("failed to decode event");
1565 assert_eq!(x, 55);
1566 stream.into_future()
1567 })
1568 .map(|(x, _stream)| assert!(x.is_none(), "should have emptied"));
1569
1570 let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1572 panic!("did not receive event in time!")
1573 });
1574
1575 recv.await;
1576 }
1577
1578 #[fasync::run_singlethreaded(test)]
1582 async fn receiver_can_be_taken_after_end_of_stream() {
1583 let (client_end, server_end) = zx::Channel::create();
1584 let client_end = AsyncChannel::from_channel(client_end);
1585 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1586
1587 let server = AsyncChannel::from_channel(server_end);
1589 let (bytes, handles) = (&mut vec![], &mut vec![]);
1590 const ORDINAL: u64 = 5;
1591 let header = TransactionHeader::new(0, ORDINAL, DynamicFlags::empty());
1592 encode_transaction(header, bytes, handles);
1593 server.write_etc(bytes, handles).expect("Server channel write failed");
1594 drop(server);
1595
1596 {
1600 let recv = client
1601 .take_event_receiver()
1602 .into_future()
1603 .then(|(x, stream)| {
1604 let x = x.expect("should contain one element");
1605 let x = x.expect("fidl error");
1606 let x: i32 =
1607 decode_transaction_body::<i32, DefaultFuchsiaResourceDialect, ORDINAL>(x)
1608 .expect("failed to decode event");
1609 assert_eq!(x, 55);
1610 stream.into_future()
1611 })
1612 .map(|(x, _stream)| assert!(x.is_none(), "should have emptied"));
1613
1614 let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1616 panic!("did not receive event in time!")
1617 });
1618
1619 recv.await;
1620 }
1621
1622 let mut c = client.take_event_receiver();
1625 assert!(
1626 c.next().await.is_none(),
1627 "receiver on closed channel should return none on first call"
1628 );
1629 }
1630
1631 #[fasync::run_singlethreaded(test)]
1632 async fn event_incompatible_format() {
1633 let (client_end, server_end) = zx::Channel::create();
1634 let client_end = AsyncChannel::from_channel(client_end);
1635 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1636
1637 let server = AsyncChannel::from_channel(server_end);
1639 let (bytes, handles) = (&mut vec![], &mut vec![]);
1640 let header = TransactionHeader::new_full(
1641 0,
1642 5,
1643 crate::encoding::Context {
1644 wire_format_version: crate::encoding::WireFormatVersion::V2,
1645 },
1646 DynamicFlags::empty(),
1647 0,
1648 );
1649 encode_transaction(header, bytes, handles);
1650 server.write_etc(bytes, handles).expect("Server channel write failed");
1651 drop(server);
1652
1653 let mut event_receiver = client.take_event_receiver();
1654 let recv = event_receiver.next().map(|event| {
1655 assert_matches!(event, Some(Err(crate::Error::IncompatibleMagicNumber(0))))
1656 });
1657
1658 let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1660 panic!("did not receive event in time!")
1661 });
1662
1663 recv.await;
1664 }
1665
1666 #[test]
1667 fn client_always_wakes_pending_futures() {
1668 let mut executor = fasync::TestExecutor::new();
1669
1670 let (client_end, server_end) = zx::Channel::create();
1671 let client_end = AsyncChannel::from_channel(client_end);
1672 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1673
1674 let mut event_receiver = client.take_event_receiver();
1675
1676 let (response_waker, response_waker_count) = new_count_waker();
1678 let response_cx = &mut Context::from_waker(&response_waker);
1679 let mut response_txid = Txid(0);
1680 let mut response_future = client
1681 .send_raw_query(|tx_id, bytes, handles| {
1682 response_txid = tx_id;
1683 let header = TransactionHeader::new(
1684 response_txid.as_raw_id(),
1685 SEND_ORDINAL,
1686 DynamicFlags::empty(),
1687 );
1688 encode_transaction(header, bytes, handles);
1689 Ok(())
1690 })
1691 .expect("Couldn't send query");
1692 assert!(response_future.poll_unpin(response_cx).is_pending());
1693
1694 let (event_waker, event_waker_count) = new_count_waker();
1696 let event_cx = &mut Context::from_waker(&event_waker);
1697 assert!(event_receiver.poll_next_unpin(event_cx).is_pending());
1698
1699 assert_eq!(response_waker_count.get(), 0);
1701 assert_eq!(event_waker_count.get(), 0);
1702
1703 send_transaction(TransactionHeader::new(0, 5, DynamicFlags::empty()), &server_end);
1705
1706 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1708
1709 assert_eq!(response_waker_count.get(), 0);
1711 assert_eq!(event_waker_count.get(), 1);
1712
1713 assert!(event_receiver.poll_next_unpin(event_cx).is_ready());
1715
1716 send_transaction(
1718 TransactionHeader::new(response_txid.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty()),
1719 &server_end,
1720 );
1721
1722 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1724
1725 assert_eq!(response_waker_count.get(), 1);
1727 }
1728
1729 #[test]
1730 fn client_always_wakes_pending_futures_on_epitaph() {
1731 let mut executor = fasync::TestExecutor::new();
1732
1733 let (client_end, server_end) = zx::Channel::create();
1734 let client_end = AsyncChannel::from_channel(client_end);
1735 let server_end = AsyncChannel::from_channel(server_end);
1736 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1737
1738 let mut event_receiver = client.take_event_receiver();
1739
1740 let (response1_waker, response1_waker_count) = new_count_waker();
1742 let response1_cx = &mut Context::from_waker(&response1_waker);
1743 let mut response1_future = client
1744 .send_raw_query(|tx_id, bytes, handles| {
1745 let header =
1746 TransactionHeader::new(tx_id.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty());
1747 encode_transaction(header, bytes, handles);
1748 Ok(())
1749 })
1750 .expect("Couldn't send query");
1751 assert!(response1_future.poll_unpin(response1_cx).is_pending());
1752
1753 let (event_waker, event_waker_count) = new_count_waker();
1755 let event_cx = &mut Context::from_waker(&event_waker);
1756 assert!(event_receiver.poll_next_unpin(event_cx).is_pending());
1757
1758 let (response2_waker, response2_waker_count) = new_count_waker();
1760 let response2_cx = &mut Context::from_waker(&response2_waker);
1761 let mut response2_future = client
1762 .send_raw_query(|tx_id, bytes, handles| {
1763 let header =
1764 TransactionHeader::new(tx_id.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty());
1765 encode_transaction(header, bytes, handles);
1766 Ok(())
1767 })
1768 .expect("Couldn't send query");
1769 assert!(response2_future.poll_unpin(response2_cx).is_pending());
1770
1771 let wakers = vec![response1_waker_count, response2_waker_count, event_waker_count];
1772
1773 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1775
1776 assert_eq!(0, wakers.iter().fold(0, |acc, x| acc + x.get()));
1778
1779 epitaph::write_epitaph_impl(&server_end, zx_status::Status::UNAVAILABLE)
1781 .expect("wrote epitaph");
1782
1783 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1785
1786 for wake_count in &wakers {
1789 assert_eq!(wake_count.get(), 1);
1790 }
1791
1792 assert_matches!(
1794 response1_future.poll_unpin(response1_cx),
1795 Poll::Ready(Err(crate::Error::ClientChannelClosed {
1796 status: zx_status::Status::UNAVAILABLE,
1797 protocol_name: "test_protocol",
1798 epitaph: Some(epitaph),
1799 })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1800 );
1801
1802 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1804
1805 assert_matches!(
1807 response2_future.poll_unpin(response2_cx),
1808 Poll::Ready(Err(crate::Error::ClientChannelClosed {
1809 status: zx_status::Status::UNAVAILABLE,
1810 protocol_name: "test_protocol",
1811 epitaph: Some(epitaph),
1812 })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1813 );
1814
1815 assert!(event_receiver.poll_next_unpin(event_cx).is_ready());
1817 }
1818
1819 #[fasync::run_singlethreaded(test)]
1820 async fn client_allows_take_event_stream_even_if_event_delivered() {
1821 let (client_end, server_end) = zx::Channel::create();
1822 let client_end = AsyncChannel::from_channel(client_end);
1823 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1824
1825 send_transaction(TransactionHeader::new(0, 5, DynamicFlags::empty()), &server_end);
1827
1828 let (response_waker, _response_waker_count) = new_count_waker();
1830 let response_cx = &mut Context::from_waker(&response_waker);
1831 let mut response_future =
1832 client.send_query::<u8, u8, SEND_ORDINAL>(55, DynamicFlags::empty());
1833 assert!(response_future.poll_unpin(response_cx).is_pending());
1834
1835 let mut _event_receiver = client.take_event_receiver();
1837 }
1838
1839 #[fasync::run_singlethreaded(test)]
1840 async fn client_reports_epitaph_from_all_read_actions() {
1841 #[derive(Debug, PartialEq)]
1842 enum Action {
1843 SendMsg, SendQuery, WaitQuery, RecvEvent, }
1848 impl Action {
1849 fn should_report_epitaph(&self) -> bool {
1850 match self {
1851 Action::SendMsg | Action::SendQuery => false,
1852 Action::WaitQuery | Action::RecvEvent => true,
1853 }
1854 }
1855 }
1856 use Action::*;
1857 for two_actions in &[
1860 [SendMsg, SendMsg],
1861 [SendMsg, SendQuery],
1862 [SendMsg, WaitQuery],
1863 [SendMsg, RecvEvent],
1864 [SendQuery, SendMsg],
1865 [SendQuery, SendQuery],
1866 [SendQuery, WaitQuery],
1867 [SendQuery, RecvEvent],
1868 [WaitQuery, SendMsg],
1869 [WaitQuery, SendQuery],
1870 [WaitQuery, WaitQuery],
1871 [WaitQuery, RecvEvent],
1872 [RecvEvent, SendMsg],
1873 [RecvEvent, SendQuery],
1874 [RecvEvent, WaitQuery],
1875 ] {
1878 let (client_end, server_end) = zx::Channel::create();
1879 let client_end = AsyncChannel::from_channel(client_end);
1880 let client = Client::new(client_end, "test_protocol");
1881
1882 let server_end = AsyncChannel::from_channel(server_end);
1884 server_end
1885 .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1886 .expect("failed to write epitaph");
1887
1888 let mut event_receiver = client.take_event_receiver();
1889
1890 for (index, action) in two_actions.iter().enumerate() {
1892 let err = match action {
1893 SendMsg => {
1894 client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()).err()
1895 }
1896 WaitQuery => client
1897 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
1898 .await
1899 .err(),
1900 SendQuery => client
1901 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
1902 .check()
1903 .err(),
1904 RecvEvent => event_receiver.next().await.unwrap().err(),
1905 };
1906 let details = format!("index: {index:?}, two_actions: {two_actions:?}");
1907 match err {
1908 None => assert!(
1909 !action.should_report_epitaph(),
1910 "expected epitaph, but succeeded.\n{details}"
1911 ),
1912 Some(crate::Error::ClientChannelClosed {
1913 status: zx_status::Status::UNAVAILABLE,
1914 protocol_name: "test_protocol",
1915 epitaph: Some(epitaph),
1916 }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32 => assert!(
1917 action.should_report_epitaph(),
1918 "got epitaph unexpectedly.\n{details}",
1919 ),
1920 Some(err) => panic!("unexpected error: {err:#?}.\n{details}"),
1921 }
1922 }
1923
1924 if two_actions.contains(&RecvEvent) {
1926 assert_matches!(event_receiver.next().await, None);
1927 }
1928 }
1929 }
1930
1931 #[test]
1932 fn client_query_result_check() {
1933 let mut executor = fasync::TestExecutor::new();
1934 let (client_end, server_end) = zx::Channel::create();
1935 let client_end = AsyncChannel::from_channel(client_end);
1936 let client = Client::new(client_end, "test_protocol");
1937
1938 let server = AsyncChannel::from_channel(server_end);
1939
1940 let active_fut =
1942 client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
1943
1944 let mut checked_fut = active_fut.check().expect("failed to check future");
1945
1946 let mut buffer = MessageBufEtc::new();
1948 executor.run_singlethreaded(server.recv_etc_msg(&mut buffer)).expect("failed to recv msg");
1949 let two_way_tx_id = 1u8;
1950 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
1951
1952 let (bytes, handles) = (&mut vec![], &mut vec![]);
1953 let header =
1954 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
1955 encode_transaction(header, bytes, handles);
1956 server.write_etc(bytes, handles).expect("Server channel write failed");
1957
1958 executor
1959 .run_singlethreaded(&mut checked_fut)
1960 .map(|x| assert_eq!(x, SEND_DATA))
1961 .unwrap_or_else(|e| panic!("fidl error: {e:?}"));
1962
1963 drop(server);
1965
1966 let query_fut = client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
1967
1968 let mut checked_fut = query_fut.check().expect("failed to check future");
1970 assert_matches!(
1972 executor.run_singlethreaded(&mut checked_fut),
1973 Err(crate::Error::ClientChannelClosed {
1974 status: zx_status::Status::PEER_CLOSED,
1975 protocol_name: "test_protocol",
1976 epitaph: None,
1977 })
1978 );
1979 }
1980
1981 #[fasync::run_singlethreaded(test)]
1982 async fn client_into_channel() {
1983 let (client_end, _server_end) = zx::Channel::create();
1986 let client_end = AsyncChannel::from_channel(client_end);
1987 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1988
1989 assert!(client.into_channel().is_ok());
1990 }
1991
1992 #[fasync::run_singlethreaded(test)]
1993 async fn client_into_channel_outstanding_messages() {
1994 let (client_end, _server_end) = zx::Channel::create();
1997 let client_end = AsyncChannel::from_channel(client_end);
1998 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1999
2000 {
2001 let _sender =
2004 client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
2005 }
2006
2007 assert!(client.into_channel().is_err());
2008 }
2009
2010 #[fasync::run_singlethreaded(test)]
2011 async fn client_into_channel_active_clone() {
2012 let (client_end, _server_end) = zx::Channel::create();
2015 let client_end = AsyncChannel::from_channel(client_end);
2016 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2017
2018 let _cloned_client = client.clone();
2019
2020 assert!(client.into_channel().is_err());
2021 }
2022
2023 #[fasync::run_singlethreaded(test)]
2024 async fn client_into_channel_outstanding_messages_get_received() {
2025 let (client_end, server_end) = zx::Channel::create();
2026 let client_end = AsyncChannel::from_channel(client_end);
2027 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2028
2029 let server = AsyncChannel::from_channel(server_end);
2030 let mut buffer = MessageBufEtc::new();
2031 let receiver = async move {
2032 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
2033 let two_way_tx_id = 1u8;
2034 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2035
2036 let (bytes, handles) = (&mut vec![], &mut vec![]);
2037 let header =
2038 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2039 encode_transaction(header, bytes, handles);
2040 server.write_etc(bytes, handles).expect("Server channel write failed");
2041 };
2042
2043 let receiver = receiver
2045 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
2046 panic!("did not receiver message in time!")
2047 });
2048
2049 let sender = client
2050 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
2051 .map_ok(|x| assert_eq!(x, SEND_DATA))
2052 .unwrap_or_else(|e| panic!("fidl error: {e:?}"));
2053
2054 let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
2056 panic!("did not receive response in time!")
2057 });
2058
2059 let ((), ()) = join!(receiver, sender);
2060
2061 assert!(client.into_channel().is_ok());
2062 }
2063
2064 #[fasync::run_singlethreaded(test)]
2065 async fn client_decode_errors_are_broadcast() {
2066 let (client_end, server_end) = zx::Channel::create();
2067 let client_end = AsyncChannel::from_channel(client_end);
2068 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2069
2070 let server = AsyncChannel::from_channel(server_end);
2071
2072 let _server = fasync::Task::spawn(async move {
2073 let mut buffer = MessageBufEtc::new();
2074 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
2075 let two_way_tx_id = 1u8;
2076 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2077
2078 let (bytes, handles) = (&mut vec![], &mut vec![]);
2079 let header =
2080 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2081 encode_transaction(header, bytes, handles);
2082 bytes[4] = 0;
2084 server.write_etc(bytes, handles).expect("Server channel write failed");
2085
2086 pending::<()>().await;
2088 });
2089
2090 let futures = FuturesUnordered::new();
2091
2092 for _ in 0..4 {
2093 futures.push(async {
2094 assert_matches!(
2095 client
2096 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
2097 .map_ok(|x| assert_eq!(x, SEND_DATA))
2098 .await,
2099 Err(crate::Error::UnsupportedWireFormatVersion)
2100 );
2101 });
2102 }
2103
2104 futures
2105 .collect::<Vec<_>>()
2106 .on_timeout(zx::MonotonicDuration::from_seconds(1).after_now(), || panic!("timed out!"))
2107 .await;
2108 }
2109
2110 #[fasync::run_singlethreaded(test)]
2111 async fn into_channel_from_waker_succeeds() {
2112 let (client_end, server_end) = zx::Channel::create();
2113 let client_end = AsyncChannel::from_channel(client_end);
2114 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2115
2116 let server = AsyncChannel::from_channel(server_end);
2117 let mut buffer = MessageBufEtc::new();
2118 let receiver = async move {
2119 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
2120 let two_way_tx_id = 1u8;
2121 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2122
2123 let (bytes, handles) = (&mut vec![], &mut vec![]);
2124 let header =
2125 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2126 encode_transaction(header, bytes, handles);
2127 server.write_etc(bytes, handles).expect("Server channel write failed");
2128 };
2129
2130 struct Sender {
2131 future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>,
2132 }
2133
2134 let (done_tx, done_rx) = oneshot::channel();
2135
2136 let sender = Arc::new(Sender {
2137 future: Mutex::new(Box::pin(async move {
2138 client
2139 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
2140 .map_ok(|x| assert_eq!(x, SEND_DATA))
2141 .unwrap_or_else(|e| panic!("fidl error: {e:?}"))
2142 .await;
2143
2144 assert!(client.into_channel().is_ok());
2145
2146 let _ = done_tx.send(());
2147 })),
2148 });
2149
2150 impl ArcWake for Sender {
2155 fn wake_by_ref(arc_self: &Arc<Self>) {
2156 assert!(
2157 arc_self
2158 .future
2159 .lock()
2160 .poll_unpin(&mut Context::from_waker(&noop_waker()))
2161 .is_ready()
2162 );
2163 }
2164 }
2165
2166 let waker = waker(sender.clone());
2167
2168 assert!(sender.future.lock().poll_unpin(&mut Context::from_waker(&waker)).is_pending());
2169
2170 receiver.await;
2171
2172 done_rx.await.unwrap();
2173 }
2174}