1use crate::encoding::{
8 decode_transaction_header, Decode, Decoder, DefaultFuchsiaResourceDialect, DynamicFlags,
9 Encode, Encoder, EpitaphBody, MessageBufFor, ProxyChannelBox, ProxyChannelFor, ResourceDialect,
10 TransactionHeader, TransactionMessage, TransactionMessageType, TypeMarker,
11};
12use crate::Error;
13use fuchsia_sync::Mutex;
14use futures::future::{self, FusedFuture, Future, FutureExt, Map, MaybeDone};
15use futures::ready;
16use futures::stream::{FusedStream, Stream};
17use futures::task::{Context, Poll, Waker};
18use slab::Slab;
19use std::collections::VecDeque;
20use std::mem;
21use std::ops::ControlFlow;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::task::{RawWaker, RawWakerVTable};
25use zx_status;
26
27#[doc(hidden)] pub fn decode_transaction_body<T: TypeMarker, D: ResourceDialect, const EXPECTED_ORDINAL: u64>(
30 mut buf: D::MessageBufEtc,
31) -> Result<T::Owned, Error>
32where
33 T::Owned: Decode<T, D>,
34{
35 let (bytes, handles) = buf.split_mut();
36 let (header, body_bytes) = decode_transaction_header(bytes)?;
37 if header.ordinal != EXPECTED_ORDINAL {
38 return Err(Error::InvalidResponseOrdinal);
39 }
40 let mut output = Decode::<T, D>::new_empty();
41 Decoder::<D>::decode_into::<T>(&header, body_bytes, handles, &mut output)?;
42 Ok(output)
43}
44
45#[derive(Debug, Clone)]
47pub struct Client<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
48 inner: Arc<ClientInner<D>>,
49}
50
51pub type DecodedQueryResponseFut<T, D = DefaultFuchsiaResourceDialect> = Map<
53 MessageResponse<D>,
54 fn(Result<<D as ResourceDialect>::MessageBufEtc, Error>) -> Result<T, Error>,
55>;
56
57#[derive(Debug)]
60#[must_use = "futures do nothing unless you `.await` or poll them"]
61pub struct QueryResponseFut<T, D: ResourceDialect = DefaultFuchsiaResourceDialect>(
62 pub MaybeDone<DecodedQueryResponseFut<T, D>>,
63);
64
65impl<T: Unpin, D: ResourceDialect> FusedFuture for QueryResponseFut<T, D> {
66 fn is_terminated(&self) -> bool {
67 matches!(self.0, MaybeDone::Gone)
68 }
69}
70
71impl<T: Unpin, D: ResourceDialect> Future for QueryResponseFut<T, D> {
72 type Output = Result<T, Error>;
73
74 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
75 ready!(self.0.poll_unpin(cx));
76 let maybe_done = Pin::new(&mut self.0);
77 Poll::Ready(maybe_done.take_output().unwrap_or(Err(Error::PollAfterCompletion)))
78 }
79}
80
81impl<T> QueryResponseFut<T> {
82 pub fn check(self) -> Result<Self, Error> {
89 match self.0 {
90 MaybeDone::Done(Err(e)) => Err(e),
91 x => Ok(QueryResponseFut(x)),
92 }
93 }
94}
95
96const TXID_INTEREST_MASK: u32 = 0xFFFFFF;
97const TXID_GENERATION_SHIFT: usize = 24;
98const TXID_GENERATION_MASK: u8 = 0x7F;
99
100#[derive(Debug, Copy, Clone, PartialEq, Eq)]
102pub struct Txid(u32);
103#[derive(Debug, Copy, Clone, PartialEq, Eq)]
105struct InterestId(usize);
106
107impl InterestId {
108 fn from_txid(txid: Txid) -> Self {
109 InterestId((txid.0 & TXID_INTEREST_MASK) as usize - 1)
110 }
111}
112
113impl Txid {
114 fn from_interest_id(int_id: InterestId, generation: u8) -> Self {
115 let id = (int_id.0 as u32 + 1) & TXID_INTEREST_MASK;
118 let generation = (generation & TXID_GENERATION_MASK) as u32;
120
121 let txid = (generation << TXID_GENERATION_SHIFT) | id;
126
127 Txid(txid)
128 }
129
130 pub fn as_raw_id(&self) -> u32 {
132 self.0
133 }
134}
135
136impl From<u32> for Txid {
137 fn from(txid: u32) -> Self {
138 Self(txid)
139 }
140}
141
142impl<D: ResourceDialect> Client<D> {
143 pub fn new(channel: D::ProxyChannel, protocol_name: &'static str) -> Client<D> {
148 Client {
149 inner: Arc::new(ClientInner {
150 channel: channel.boxed(),
151 interests: Mutex::default(),
152 terminal_error: Mutex::default(),
153 protocol_name,
154 }),
155 }
156 }
157
158 pub fn as_channel(&self) -> &D::ProxyChannel {
160 self.inner.channel.as_channel()
161 }
162
163 pub fn into_channel(self) -> Result<D::ProxyChannel, Self> {
170 match Arc::try_unwrap(self.inner) {
180 Ok(inner) => {
181 if inner.interests.lock().messages.is_empty() || inner.channel.is_closed() {
182 Ok(inner.channel.unbox())
183 } else {
184 Err(Self { inner: Arc::new(inner) })
189 }
190 }
191 Err(inner) => Err(Self { inner }),
192 }
193 }
194
195 pub fn take_event_receiver(&self) -> EventReceiver<D> {
198 {
199 let mut lock = self.inner.interests.lock();
200
201 if let EventListener::None = lock.event_listener {
202 lock.event_listener = EventListener::WillPoll;
203 } else {
204 panic!("Event stream was already taken");
205 }
206 }
207
208 EventReceiver { inner: self.inner.clone(), state: EventReceiverState::Active }
209 }
210
211 pub fn send<T: TypeMarker>(
213 &self,
214 body: impl Encode<T, D>,
215 ordinal: u64,
216 dynamic_flags: DynamicFlags,
217 ) -> Result<(), Error> {
218 let msg =
219 TransactionMessage { header: TransactionHeader::new(0, ordinal, dynamic_flags), body };
220 crate::encoding::with_tls_encoded::<TransactionMessageType<T>, D, ()>(
221 msg,
222 |bytes, handles| self.send_raw(bytes, handles),
223 )
224 }
225
226 pub fn send_query<Request: TypeMarker, Response: TypeMarker, const ORDINAL: u64>(
228 &self,
229 body: impl Encode<Request, D>,
230 dynamic_flags: DynamicFlags,
231 ) -> QueryResponseFut<Response::Owned, D>
232 where
233 Response::Owned: Decode<Response, D>,
234 {
235 self.send_query_and_decode::<Request, Response::Owned>(
236 body,
237 ORDINAL,
238 dynamic_flags,
239 |buf| buf.and_then(decode_transaction_body::<Response, D, ORDINAL>),
240 )
241 }
242
243 pub fn send_query_and_decode<Request: TypeMarker, Output>(
246 &self,
247 body: impl Encode<Request, D>,
248 ordinal: u64,
249 dynamic_flags: DynamicFlags,
250 decode: fn(Result<D::MessageBufEtc, Error>) -> Result<Output, Error>,
251 ) -> QueryResponseFut<Output, D> {
252 let send_result = self.send_raw_query(|tx_id, bytes, handles| {
253 let msg = TransactionMessage {
254 header: TransactionHeader::new(tx_id.as_raw_id(), ordinal, dynamic_flags),
255 body,
256 };
257 Encoder::encode::<TransactionMessageType<Request>>(bytes, handles, msg)?;
258 Ok(())
259 });
260
261 QueryResponseFut(match send_result {
262 Ok(res_fut) => future::maybe_done(res_fut.map(decode)),
263 Err(e) => MaybeDone::Done(Err(e)),
264 })
265 }
266
267 pub fn send_raw(
269 &self,
270 bytes: &[u8],
271 handles: &mut [<D::ProxyChannel as ProxyChannelFor<D>>::HandleDisposition],
272 ) -> Result<(), Error> {
273 match self.inner.channel.write_etc(bytes, handles) {
274 Ok(()) | Err(None) => Ok(()),
275 Err(Some(e)) => Err(Error::ClientWrite(e.into())),
276 }
277 }
278
279 pub fn send_raw_query<F>(&self, encode_msg: F) -> Result<MessageResponse<D>, Error>
281 where
282 F: for<'a, 'b> FnOnce(
283 Txid,
284 &'a mut Vec<u8>,
285 &'b mut Vec<<D::ProxyChannel as ProxyChannelFor<D>>::HandleDisposition>,
286 ) -> Result<(), Error>,
287 {
288 let id = self.inner.interests.lock().register_msg_interest();
289 crate::encoding::with_tls_encode_buf::<_, D>(|bytes, handles| {
290 encode_msg(id, bytes, handles)?;
291 self.send_raw(bytes, handles)
292 })?;
293
294 Ok(MessageResponse { id, client: Some(self.inner.clone()) })
295 }
296}
297
298#[must_use]
299#[derive(Debug)]
301pub struct MessageResponse<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
302 id: Txid,
303 client: Option<Arc<ClientInner<D>>>,
305}
306
307impl<D: ResourceDialect> Unpin for MessageResponse<D> {}
308
309impl<D: ResourceDialect> Future for MessageResponse<D> {
310 type Output = Result<D::MessageBufEtc, Error>;
311 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
312 let this = &mut *self;
313 let res;
314 {
315 let client = this.client.as_ref().ok_or(Error::PollAfterCompletion)?;
316 res = client.poll_recv_msg_response(this.id, cx);
317 }
318
319 if let Poll::Ready(Ok(_)) = res {
321 this.client.take().expect("MessageResponse polled after completion");
322 }
323
324 res
325 }
326}
327
328impl<D: ResourceDialect> Drop for MessageResponse<D> {
329 fn drop(&mut self) {
330 if let Some(client) = &self.client {
331 client.interests.lock().deregister(self.id);
332 }
333 }
334}
335
336#[derive(Debug)]
339enum MessageInterest<D: ResourceDialect> {
340 WillPoll,
342 Waiting(Waker),
344 Received(D::MessageBufEtc),
346 Discard,
349}
350
351impl<D: ResourceDialect> MessageInterest<D> {
352 fn is_received(&self) -> bool {
354 matches!(*self, MessageInterest::Received(_))
355 }
356
357 fn unwrap_received(self) -> D::MessageBufEtc {
358 if let MessageInterest::Received(buf) = self {
359 buf
360 } else {
361 panic!("EXPECTED received message")
362 }
363 }
364}
365
366#[derive(Debug)]
367enum EventReceiverState {
368 Active,
369 Terminal,
370 Terminated,
371}
372
373#[derive(Debug)]
375pub struct EventReceiver<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
376 inner: Arc<ClientInner<D>>,
377 state: EventReceiverState,
378}
379
380impl<D: ResourceDialect> Unpin for EventReceiver<D> {}
381
382impl<D: ResourceDialect> FusedStream for EventReceiver<D> {
383 fn is_terminated(&self) -> bool {
384 matches!(self.state, EventReceiverState::Terminated)
385 }
386}
387
388impl<D: ResourceDialect> Stream for EventReceiver<D> {
393 type Item = Result<D::MessageBufEtc, Error>;
394
395 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
396 match self.state {
397 EventReceiverState::Active => {}
398 EventReceiverState::Terminated => {
399 panic!("polled EventReceiver after `None`");
400 }
401 EventReceiverState::Terminal => {
402 self.state = EventReceiverState::Terminated;
403 return Poll::Ready(None);
404 }
405 }
406
407 Poll::Ready(match ready!(self.inner.poll_recv_event(cx)) {
408 Ok(x) => Some(Ok(x)),
409 Err(Error::ClientChannelClosed { status: zx_status::Status::PEER_CLOSED, .. }) => {
410 self.state = EventReceiverState::Terminated;
413 None
414 }
415 err @ Err(_) => {
416 self.state = EventReceiverState::Terminal;
419 Some(err)
420 }
421 })
422 }
423}
424
425impl<D: ResourceDialect> Drop for EventReceiver<D> {
426 fn drop(&mut self) {
427 self.inner.interests.lock().dropped_event_listener();
428 }
429}
430
431#[derive(Debug, Default)]
432enum EventListener {
433 #[default]
435 None,
436 WillPoll,
438 Some(Waker),
440}
441
442impl EventListener {
443 fn is_some(&self) -> bool {
444 matches!(self, EventListener::Some(_))
445 }
446}
447
448#[derive(Debug)]
450struct ClientInner<D: ResourceDialect> {
451 channel: <D::ProxyChannel as ProxyChannelFor<D>>::Boxed,
453
454 interests: Mutex<Interests<D>>,
456
457 terminal_error: Mutex<Option<Error>>,
460
461 protocol_name: &'static str,
463}
464
465#[derive(Debug)]
466struct Interests<D: ResourceDialect> {
467 messages: Slab<MessageInterest<D>>,
468 events: VecDeque<D::MessageBufEtc>,
469 event_listener: EventListener,
470 waker_count: usize,
472 generation: u8,
478}
479
480impl<D: ResourceDialect> Default for Interests<D> {
481 fn default() -> Self {
482 Interests {
483 messages: Slab::new(),
484 events: Default::default(),
485 event_listener: Default::default(),
486 waker_count: 0,
487 generation: 0,
488 }
489 }
490}
491
492impl<D: ResourceDialect> Interests<D> {
493 fn push_event(&mut self, buf: D::MessageBufEtc) -> Option<Waker> {
495 self.events.push_back(buf);
496 self.take_event_waker()
497 }
498
499 fn take_event_waker(&mut self) -> Option<Waker> {
501 if self.event_listener.is_some() {
502 let EventListener::Some(waker) =
503 mem::replace(&mut self.event_listener, EventListener::WillPoll)
504 else {
505 unreachable!()
506 };
507
508 self.waker_count -= 1;
510 Some(waker)
511 } else {
512 None
513 }
514 }
515
516 fn event_waker(&self) -> Option<&Waker> {
518 match &self.event_listener {
519 EventListener::Some(waker) => Some(waker),
520 _ => None,
521 }
522 }
523
524 fn push_message(&mut self, txid: Txid, buf: D::MessageBufEtc) -> Result<Option<Waker>, Error> {
527 let InterestId(raw_id) = InterestId::from_txid(txid);
528 let Some(interest) = self.messages.get_mut(raw_id) else {
531 return Err(Error::InvalidResponseTxid);
533 };
534
535 let mut waker = None;
536 if let MessageInterest::Discard = interest {
537 self.messages.remove(raw_id);
538 } else if let MessageInterest::Waiting(w) =
539 mem::replace(interest, MessageInterest::Received(buf))
540 {
541 waker = Some(w);
542
543 self.waker_count -= 1;
545 }
546
547 Ok(waker)
548 }
549
550 fn register(&mut self, txid: Txid, cx: &Context<'_>) -> Option<D::MessageBufEtc> {
553 let InterestId(raw_id) = InterestId::from_txid(txid);
554 let interest = self.messages.get_mut(raw_id).expect("Polled unregistered interest");
555 match interest {
556 MessageInterest::Received(_) => {
557 return Some(self.messages.remove(raw_id).unwrap_received())
558 }
559 MessageInterest::Discard => panic!("Polled a discarded MessageReceiver?!"),
560 MessageInterest::WillPoll => self.waker_count += 1,
561 MessageInterest::Waiting(_) => {}
562 }
563 *interest = MessageInterest::Waiting(cx.waker().clone());
564 None
565 }
566
567 fn deregister(&mut self, txid: Txid) {
569 let InterestId(raw_id) = InterestId::from_txid(txid);
570 match self.messages[raw_id] {
571 MessageInterest::Received(_) => {
572 self.messages.remove(raw_id);
573 return;
574 }
575 MessageInterest::WillPoll => {}
576 MessageInterest::Waiting(_) => self.waker_count -= 1,
577 MessageInterest::Discard => unreachable!(),
578 }
579 self.messages[raw_id] = MessageInterest::Discard;
580 }
581
582 fn register_event_listener(&mut self, cx: &Context<'_>) -> Option<D::MessageBufEtc> {
584 self.events.pop_front().or_else(|| {
585 if !mem::replace(&mut self.event_listener, EventListener::Some(cx.waker().clone()))
586 .is_some()
587 {
588 self.waker_count += 1;
589 }
590 None
591 })
592 }
593
594 fn dropped_event_listener(&mut self) {
596 if self.event_listener.is_some() {
597 self.waker_count -= 1;
599 }
600 self.event_listener = EventListener::None;
601 }
602
603 fn register_msg_interest(&mut self) -> Txid {
608 self.generation = self.generation.wrapping_add(1);
609 Txid::from_interest_id(
612 InterestId(self.messages.insert(MessageInterest::WillPoll)),
613 self.generation,
614 )
615 }
616}
617
618impl<D: ResourceDialect> ClientInner<D> {
619 fn poll_recv_event(
620 self: &Arc<Self>,
621 cx: &Context<'_>,
622 ) -> Poll<Result<D::MessageBufEtc, Error>> {
623 if let Some(msg_buf) = self.interests.lock().register_event_listener(cx) {
625 return Poll::Ready(Ok(msg_buf));
626 }
627
628 let maybe_terminal_error = self.recv_all(Some(Txid(0)));
631
632 let mut lock = self.interests.lock();
633
634 if let Some(msg_buf) = lock.events.pop_front() {
635 Poll::Ready(Ok(msg_buf))
636 } else {
637 maybe_terminal_error?;
638 Poll::Pending
639 }
640 }
641
642 fn poll_recv_msg_response(
645 self: &Arc<Self>,
646 txid: Txid,
647 cx: &Context<'_>,
648 ) -> Poll<Result<D::MessageBufEtc, Error>> {
649 if let Some(buf) = self.interests.lock().register(txid, cx) {
651 return Poll::Ready(Ok(buf));
652 }
653
654 let maybe_terminal_error = self.recv_all(Some(txid));
657
658 let InterestId(raw_id) = InterestId::from_txid(txid);
659 let mut interests = self.interests.lock();
660 if interests.messages.get(raw_id).expect("Polled unregistered interest").is_received() {
661 let buf = interests.messages.remove(raw_id).unwrap_received();
664 Poll::Ready(Ok(buf))
665 } else {
666 maybe_terminal_error?;
667 Poll::Pending
668 }
669 }
670
671 fn recv_all(self: &Arc<Self>, want_txid: Option<Txid>) -> Result<(), Error> {
681 let mut terminal_error = self.terminal_error.lock();
685 if let Some(error) = terminal_error.as_ref() {
686 return Err(error.clone());
687 }
688
689 let recv_once = |waker| {
690 let cx = &mut Context::from_waker(&waker);
691
692 let mut buf = D::MessageBufEtc::new();
693 let result = self.channel.recv_etc_from(cx, &mut buf);
694 match result {
695 Poll::Ready(Ok(())) => {}
696 Poll::Ready(Err(None)) => {
697 return Err(Error::ClientChannelClosed {
700 status: zx_status::Status::PEER_CLOSED,
701 protocol_name: self.protocol_name,
702 epitaph: None,
703 #[cfg(not(target_os = "fuchsia"))]
704 reason: self.channel.closed_reason(),
705 });
706 }
707 Poll::Ready(Err(Some(e))) => return Err(Error::ClientRead(e.into())),
708 Poll::Pending => return Ok(ControlFlow::Break(())),
709 };
710
711 let (bytes, _) = buf.split_mut();
712 let (header, body_bytes) = decode_transaction_header(bytes)?;
713 if header.is_epitaph() {
714 let handles = &mut [];
716 let mut epitaph_body = Decode::<EpitaphBody, D>::new_empty();
717 Decoder::<D>::decode_into::<EpitaphBody>(
718 &header,
719 body_bytes,
720 handles,
721 &mut epitaph_body,
722 )?;
723 return Err(Error::ClientChannelClosed {
724 status: epitaph_body.error,
725 protocol_name: self.protocol_name,
726 epitaph: Some(epitaph_body.error.into_raw() as u32),
727 #[cfg(not(target_os = "fuchsia"))]
728 reason: self.channel.closed_reason(),
729 });
730 }
731
732 let txid = Txid(header.tx_id);
733
734 let waker = {
735 buf.shrink_bytes_to_fit();
736 let mut interests = self.interests.lock();
737 if txid == Txid(0) {
738 interests.push_event(buf)
739 } else {
740 interests.push_message(txid, buf)?
741 }
742 };
743
744 if want_txid != Some(txid) {
746 if let Some(waker) = waker {
747 waker.wake();
748 }
749 }
750
751 Ok(ControlFlow::Continue(()))
752 };
753
754 loop {
755 let waker = {
756 let interests = self.interests.lock();
757 if interests.waker_count == 0 {
758 return Ok(());
759 } else if interests.waker_count == 1 {
760 if let Some(waker) = interests.event_waker() {
766 waker.clone()
767 } else {
768 interests
769 .messages
770 .iter()
771 .find_map(|(_, interest)| {
772 if let MessageInterest::Waiting(waker) = interest {
773 Some(waker.clone())
774 } else {
775 None
776 }
777 })
778 .unwrap()
779 }
780 } else {
781 let weak = Arc::downgrade(self);
782 let waker = ClientWaker(Arc::new(move || {
783 if let Some(strong) = weak.upgrade() {
784 #[cfg(target_os = "fuchsia")]
787 if strong.recv_all(None).is_ok() {
788 return;
789 }
790
791 strong.wake_all();
792 }
793 }));
794 unsafe {
799 Waker::from_raw(RawWaker::new(
800 Arc::into_raw(Arc::new(waker)) as *const (),
801 &WAKER_VTABLE,
802 ))
803 }
804 }
805 };
806
807 match recv_once(waker) {
808 Ok(ControlFlow::Continue(())) => {}
809 Ok(ControlFlow::Break(())) => return Ok(()),
810 Err(error) => {
811 self.wake_all();
813 return Err(terminal_error.insert(error).clone());
814 }
815 }
816 }
817 }
818
819 fn wake_all(&self) {
821 let mut lock = self.interests.lock();
822 for (_, interest) in &mut lock.messages {
823 if let MessageInterest::Waiting(_) = interest {
824 let MessageInterest::Waiting(waker) =
825 mem::replace(interest, MessageInterest::WillPoll)
826 else {
827 unreachable!()
828 };
829 waker.wake();
830 }
831 }
832 if let Some(waker) = lock.take_event_waker() {
833 waker.wake();
834 }
835 lock.waker_count = 0;
836 }
837}
838
839#[derive(Clone)]
840struct ClientWaker(Arc<dyn Fn() + Send + Sync + 'static>);
841
842static WAKER_VTABLE: RawWakerVTable =
843 RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);
844
845unsafe fn clone_waker(data: *const ()) -> RawWaker {
846 Arc::increment_strong_count(data as *const ClientWaker);
847 RawWaker::new(data, &WAKER_VTABLE)
848}
849
850unsafe fn wake(data: *const ()) {
851 Arc::from_raw(data as *const ClientWaker).0();
852}
853
854unsafe fn wake_by_ref(data: *const ()) {
855 mem::ManuallyDrop::new(Arc::from_raw(data as *const ClientWaker)).0();
856}
857
858unsafe fn drop_waker(data: *const ()) {
859 Arc::from_raw(data as *const ClientWaker);
860}
861
862#[cfg(target_os = "fuchsia")]
863pub mod sync {
864 use super::*;
867 use std::mem::MaybeUninit;
868 use zx::{self as zx, AsHandleRef, MessageBufEtc};
869
870 #[derive(Debug)]
872 pub struct Client {
873 channel: zx::Channel,
875
876 protocol_name: &'static str,
878 }
879
880 impl Client {
881 pub fn new(channel: zx::Channel, protocol_name: &'static str) -> Self {
883 Client { channel, protocol_name }
884 }
885
886 pub fn as_channel(&self) -> &zx::Channel {
888 &self.channel
889 }
890
891 pub fn into_channel(self) -> zx::Channel {
893 self.channel
894 }
895
896 pub fn send<T: TypeMarker>(
898 &self,
899 body: impl Encode<T, DefaultFuchsiaResourceDialect>,
900 ordinal: u64,
901 dynamic_flags: DynamicFlags,
902 ) -> Result<(), Error> {
903 let mut write_bytes = Vec::new();
904 let mut write_handles = Vec::new();
905 let msg = TransactionMessage {
906 header: TransactionHeader::new(0, ordinal, dynamic_flags),
907 body,
908 };
909 Encoder::encode::<TransactionMessageType<T>>(
910 &mut write_bytes,
911 &mut write_handles,
912 msg,
913 )?;
914 match self.channel.write_etc(&write_bytes, &mut write_handles) {
915 Ok(()) | Err(zx_status::Status::PEER_CLOSED) => Ok(()),
916 Err(e) => Err(Error::ClientWrite(e.into())),
917 }
918 }
919
920 pub fn send_query<Request: TypeMarker, Response: TypeMarker>(
922 &self,
923 body: impl Encode<Request, DefaultFuchsiaResourceDialect>,
924 ordinal: u64,
925 dynamic_flags: DynamicFlags,
926 deadline: zx::MonotonicInstant,
927 ) -> Result<Response::Owned, Error>
928 where
929 Response::Owned: Decode<Response, DefaultFuchsiaResourceDialect>,
930 {
931 let mut write_bytes = Vec::new();
932 let mut write_handles = Vec::new();
933
934 let msg = TransactionMessage {
935 header: TransactionHeader::new(0, ordinal, dynamic_flags),
936 body,
937 };
938 Encoder::encode::<TransactionMessageType<Request>>(
939 &mut write_bytes,
940 &mut write_handles,
941 msg,
942 )?;
943
944 let bytes_out =
948 &mut [MaybeUninit::<u8>::uninit(); zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize];
949 let handles_out = &mut [const { MaybeUninit::<zx::HandleInfo>::uninit() };
950 zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize];
951
952 let (bytes_out, handles_out) = self
955 .channel
956 .call_etc_uninit(deadline, &write_bytes, &mut write_handles, bytes_out, handles_out)
957 .map_err(|e| self.wrap_error(Error::ClientCall, e))?;
958
959 let (header, body_bytes) = decode_transaction_header(bytes_out)?;
960 if header.ordinal != ordinal {
961 return Err(Error::InvalidResponseOrdinal);
962 }
963 let mut output = Decode::<Response, DefaultFuchsiaResourceDialect>::new_empty();
964 Decoder::<DefaultFuchsiaResourceDialect>::decode_into::<Response>(
965 &header,
966 body_bytes,
967 handles_out,
968 &mut output,
969 )?;
970 Ok(output)
971 }
972
973 pub fn wait_for_event(
975 &self,
976 deadline: zx::MonotonicInstant,
977 ) -> Result<MessageBufEtc, Error> {
978 let mut buf = zx::MessageBufEtc::new();
979 buf.ensure_capacity_bytes(zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize);
980 buf.ensure_capacity_handle_infos(zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize);
981
982 loop {
983 self.channel
984 .wait_handle(
985 zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
986 deadline,
987 )
988 .map_err(|e| self.wrap_error(Error::ClientEvent, e))?;
989 match self.channel.read_etc(&mut buf) {
990 Ok(()) => {
991 let (header, body_bytes) = decode_transaction_header(buf.bytes())
994 .map_err(|_| Error::InvalidHeader)?;
995 if header.is_epitaph() {
996 let handles = &mut [];
999 let mut epitaph_body =
1000 Decode::<EpitaphBody, DefaultFuchsiaResourceDialect>::new_empty();
1001 Decoder::<DefaultFuchsiaResourceDialect>::decode_into::<EpitaphBody>(
1002 &header,
1003 body_bytes,
1004 handles,
1005 &mut epitaph_body,
1006 )?;
1007 return Err(Error::ClientChannelClosed {
1008 status: epitaph_body.error,
1009 protocol_name: self.protocol_name,
1010 epitaph: Some(epitaph_body.error.into_raw() as u32),
1011 });
1012 }
1013 if header.tx_id != 0 {
1014 return Err(Error::UnexpectedSyncResponse);
1015 }
1016 return Ok(buf);
1017 }
1018 Err(zx::Status::SHOULD_WAIT) => {
1019 continue;
1021 }
1022 Err(e) => {
1023 return Err(self.wrap_error(|x| Error::ClientRead(x.into()), e));
1024 }
1025 }
1026 }
1027 }
1028
1029 fn wrap_error<T: Fn(zx_status::Status) -> Error>(
1033 &self,
1034 variant: T,
1035 err: zx_status::Status,
1036 ) -> Error {
1037 if err == zx_status::Status::PEER_CLOSED {
1038 Error::ClientChannelClosed {
1039 status: zx_status::Status::PEER_CLOSED,
1040 protocol_name: self.protocol_name,
1041 epitaph: None,
1042 }
1043 } else {
1044 variant(err)
1045 }
1046 }
1047 }
1048}
1049
1050#[cfg(all(test, target_os = "fuchsia"))]
1051mod tests {
1052 use super::*;
1053 use crate::encoding::MAGIC_NUMBER_INITIAL;
1054 use crate::epitaph::{self, ChannelEpitaphExt};
1055 use anyhow::{Context as _, Error};
1056 use assert_matches::assert_matches;
1057 use fuchsia_async as fasync;
1058 use fuchsia_async::{Channel as AsyncChannel, DurationExt, TimeoutExt};
1059 use futures::channel::oneshot;
1060 use futures::stream::FuturesUnordered;
1061 use futures::task::{noop_waker, waker, ArcWake};
1062 use futures::{join, StreamExt, TryFutureExt};
1063 use futures_test::task::new_count_waker;
1064 use std::future::pending;
1065 use std::thread;
1066 use zx::{AsHandleRef, MessageBufEtc};
1067
1068 const SEND_ORDINAL_HIGH_BYTE: u8 = 42;
1069 const SEND_ORDINAL: u64 = 42 << 32;
1070 const SEND_DATA: u8 = 55;
1071
1072 const EVENT_ORDINAL: u64 = 854 << 23;
1073
1074 #[rustfmt::skip]
1075 fn expected_sent_bytes(txid_index: u8, txid_generation: u8) -> [u8; 24] {
1076 [
1077 txid_index, 0, 0, txid_generation, 2, 0, 0, MAGIC_NUMBER_INITIAL,
1080 0, 0, 0, 0, SEND_ORDINAL_HIGH_BYTE, 0, 0, 0, SEND_DATA, 0, 0, 0, 0, 0, 0, 0, ]
1085 }
1086
1087 fn expected_sent_bytes_oneway() -> [u8; 24] {
1088 expected_sent_bytes(0, 0)
1089 }
1090
1091 fn send_transaction(header: TransactionHeader, channel: &zx::Channel) {
1092 let (bytes, handles) = (&mut vec![], &mut vec![]);
1093 encode_transaction(header, bytes, handles);
1094 channel.write_etc(bytes, handles).expect("Server channel write failed");
1095 }
1096
1097 fn encode_transaction(
1098 header: TransactionHeader,
1099 bytes: &mut Vec<u8>,
1100 handles: &mut Vec<zx::HandleDisposition<'static>>,
1101 ) {
1102 let event = TransactionMessage { header, body: SEND_DATA };
1103 Encoder::<DefaultFuchsiaResourceDialect>::encode::<TransactionMessageType<u8>>(
1104 bytes, handles, event,
1105 )
1106 .expect("Encoding failure");
1107 }
1108
1109 #[test]
1110 fn sync_client() -> Result<(), Error> {
1111 let (client_end, server_end) = zx::Channel::create();
1112 let client = sync::Client::new(client_end, "test_protocol");
1113 client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()).context("sending")?;
1114 let mut received = MessageBufEtc::new();
1115 server_end.read_etc(&mut received).context("reading")?;
1116 assert_eq!(received.bytes(), expected_sent_bytes_oneway());
1117 Ok(())
1118 }
1119
1120 #[test]
1121 fn sync_client_with_response() -> Result<(), Error> {
1122 let (client_end, server_end) = zx::Channel::create();
1123 let client = sync::Client::new(client_end, "test_protocol");
1124 thread::spawn(move || {
1125 let mut received = MessageBufEtc::new();
1127 server_end
1128 .wait_handle(
1129 zx::Signals::CHANNEL_READABLE,
1130 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1131 )
1132 .expect("failed to wait for channel readable");
1133 server_end.read_etc(&mut received).expect("failed to read on server end");
1134 let (buf, _handles) = received.split_mut();
1135 let (header, _body_bytes) = decode_transaction_header(buf).expect("server decode");
1136 assert_eq!(header.ordinal, SEND_ORDINAL);
1137 send_transaction(
1138 TransactionHeader::new(header.tx_id, header.ordinal, DynamicFlags::empty()),
1139 &server_end,
1140 );
1141 });
1142 let response_data = client
1143 .send_query::<u8, u8>(
1144 SEND_DATA,
1145 SEND_ORDINAL,
1146 DynamicFlags::empty(),
1147 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1148 )
1149 .context("sending query")?;
1150 assert_eq!(SEND_DATA, response_data);
1151 Ok(())
1152 }
1153
1154 #[test]
1155 fn sync_client_with_event_and_response() -> Result<(), Error> {
1156 let (client_end, server_end) = zx::Channel::create();
1157 let client = sync::Client::new(client_end, "test_protocol");
1158 thread::spawn(move || {
1159 let mut received = MessageBufEtc::new();
1161 server_end
1162 .wait_handle(
1163 zx::Signals::CHANNEL_READABLE,
1164 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1165 )
1166 .expect("failed to wait for channel readable");
1167 server_end.read_etc(&mut received).expect("failed to read on server end");
1168 let (buf, _handles) = received.split_mut();
1169 let (header, _body_bytes) = decode_transaction_header(buf).expect("server decode");
1170 assert_ne!(header.tx_id, 0);
1171 assert_eq!(header.ordinal, SEND_ORDINAL);
1172 send_transaction(
1174 TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty()),
1175 &server_end,
1176 );
1177 send_transaction(
1180 TransactionHeader::new(header.tx_id, header.ordinal, DynamicFlags::empty()),
1181 &server_end,
1182 );
1183 });
1184 let response_data = client
1185 .send_query::<u8, u8>(
1186 SEND_DATA,
1187 SEND_ORDINAL,
1188 DynamicFlags::empty(),
1189 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1190 )
1191 .context("sending query")?;
1192 assert_eq!(SEND_DATA, response_data);
1193
1194 let event_buf = client
1195 .wait_for_event(zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)))
1196 .context("waiting for event")?;
1197 let (bytes, _handles) = event_buf.split();
1198 let (header, _body) = decode_transaction_header(&bytes).expect("event decode");
1199 assert_eq!(header.ordinal, EVENT_ORDINAL);
1200
1201 Ok(())
1202 }
1203
1204 #[test]
1205 fn sync_client_with_racing_events() -> Result<(), Error> {
1206 let (client_end, server_end) = zx::Channel::create();
1207 let client1 = Arc::new(sync::Client::new(client_end, "test_protocol"));
1208 let client2 = client1.clone();
1209
1210 let thread1 = thread::spawn(move || {
1211 let result = client1.wait_for_event(zx::MonotonicInstant::after(
1212 zx::MonotonicDuration::from_seconds(5),
1213 ));
1214 assert!(result.is_ok());
1215 });
1216
1217 let thread2 = thread::spawn(move || {
1218 let result = client2.wait_for_event(zx::MonotonicInstant::after(
1219 zx::MonotonicDuration::from_seconds(5),
1220 ));
1221 assert!(result.is_ok());
1222 });
1223
1224 send_transaction(
1225 TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty()),
1226 &server_end,
1227 );
1228 send_transaction(
1229 TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty()),
1230 &server_end,
1231 );
1232
1233 assert!(thread1.join().is_ok());
1234 assert!(thread2.join().is_ok());
1235
1236 Ok(())
1237 }
1238
1239 #[test]
1240 fn sync_client_wait_for_event_gets_method_response() -> Result<(), Error> {
1241 let (client_end, server_end) = zx::Channel::create();
1242 let client = sync::Client::new(client_end, "test_protocol");
1243 send_transaction(
1244 TransactionHeader::new(3902304923, SEND_ORDINAL, DynamicFlags::empty()),
1245 &server_end,
1246 );
1247 assert_matches!(
1248 client.wait_for_event(zx::MonotonicInstant::after(
1249 zx::MonotonicDuration::from_seconds(5)
1250 )),
1251 Err(crate::Error::UnexpectedSyncResponse)
1252 );
1253 Ok(())
1254 }
1255
1256 #[test]
1257 fn sync_client_one_way_call_suceeds_after_peer_closed() -> Result<(), Error> {
1258 let (client_end, server_end) = zx::Channel::create();
1259 let client = sync::Client::new(client_end, "test_protocol");
1260 drop(server_end);
1261 assert_matches!(client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()), Ok(()));
1262 Ok(())
1263 }
1264
1265 #[test]
1266 fn sync_client_two_way_call_fails_after_peer_closed() -> Result<(), Error> {
1267 let (client_end, server_end) = zx::Channel::create();
1268 let client = sync::Client::new(client_end, "test_protocol");
1269 drop(server_end);
1270 assert_matches!(
1271 client.send_query::<u8, u8>(
1272 SEND_DATA,
1273 SEND_ORDINAL,
1274 DynamicFlags::empty(),
1275 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))
1276 ),
1277 Err(crate::Error::ClientChannelClosed {
1278 status: zx_status::Status::PEER_CLOSED,
1279 protocol_name: "test_protocol",
1280 epitaph: None,
1281 })
1282 );
1283 Ok(())
1284 }
1285
1286 #[test]
1289 fn sync_client_send_does_not_receive_epitaphs() -> Result<(), Error> {
1290 let (client_end, server_end) = zx::Channel::create();
1291 let client = sync::Client::new(client_end, "test_protocol");
1292 server_end
1294 .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1295 .expect("failed to write epitaph");
1296 assert_matches!(
1297 client.send_query::<u8, u8>(
1298 SEND_DATA,
1299 SEND_ORDINAL,
1300 DynamicFlags::empty(),
1301 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))
1302 ),
1303 Err(crate::Error::ClientChannelClosed {
1304 status: zx_status::Status::PEER_CLOSED,
1305 protocol_name: "test_protocol",
1306 epitaph: None,
1307 })
1308 );
1309 Ok(())
1310 }
1311
1312 #[test]
1313 fn sync_client_wait_for_events_does_receive_epitaphs() -> Result<(), Error> {
1314 let (client_end, server_end) = zx::Channel::create();
1315 let client = sync::Client::new(client_end, "test_protocol");
1316 server_end
1318 .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1319 .expect("failed to write epitaph");
1320 assert_matches!(
1321 client.wait_for_event(zx::MonotonicInstant::after(
1322 zx::MonotonicDuration::from_seconds(5)
1323 )),
1324 Err(crate::Error::ClientChannelClosed {
1325 status: zx_status::Status::UNAVAILABLE,
1326 protocol_name: "test_protocol",
1327 epitaph: Some(epitaph),
1328 }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1329 );
1330 Ok(())
1331 }
1332
1333 #[test]
1334 fn sync_client_into_channel() -> Result<(), Error> {
1335 let (client_end, _server_end) = zx::Channel::create();
1336 let client_end_raw = client_end.raw_handle();
1337 let client = sync::Client::new(client_end, "test_protocol");
1338 assert_eq!(client.into_channel().raw_handle(), client_end_raw);
1339 Ok(())
1340 }
1341
1342 #[fasync::run_singlethreaded(test)]
1343 async fn client() {
1344 let (client_end, server_end) = zx::Channel::create();
1345 let client_end = AsyncChannel::from_channel(client_end);
1346 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1347
1348 let server = AsyncChannel::from_channel(server_end);
1349 let receiver = async move {
1350 let mut buffer = MessageBufEtc::new();
1351 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1352 assert_eq!(buffer.bytes(), expected_sent_bytes_oneway());
1353 };
1354
1355 let receiver = receiver
1357 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1358 panic!("did not receive message in time!")
1359 });
1360
1361 client
1362 .send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty())
1363 .expect("failed to send msg");
1364
1365 receiver.await;
1366 }
1367
1368 #[fasync::run_singlethreaded(test)]
1369 async fn client_with_response() {
1370 let (client_end, server_end) = zx::Channel::create();
1371 let client_end = AsyncChannel::from_channel(client_end);
1372 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1373
1374 let server = AsyncChannel::from_channel(server_end);
1375 let mut buffer = MessageBufEtc::new();
1376 let receiver = async move {
1377 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1378 let two_way_tx_id = 1u8;
1379 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
1380
1381 let (bytes, handles) = (&mut vec![], &mut vec![]);
1382 let header =
1383 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
1384 encode_transaction(header, bytes, handles);
1385 server.write_etc(bytes, handles).expect("Server channel write failed");
1386 };
1387
1388 let receiver = receiver
1390 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1391 panic!("did not receiver message in time!")
1392 });
1393
1394 let sender = client
1395 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
1396 .map_ok(|x| assert_eq!(x, SEND_DATA))
1397 .unwrap_or_else(|e| panic!("fidl error: {:?}", e));
1398
1399 let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1401 panic!("did not receive response in time!")
1402 });
1403
1404 let ((), ()) = join!(receiver, sender);
1405 }
1406
1407 #[fasync::run_singlethreaded(test)]
1408 async fn client_with_response_receives_epitaph() {
1409 let (client_end, server_end) = zx::Channel::create();
1410 let client_end = AsyncChannel::from_channel(client_end);
1411 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1412
1413 let server = AsyncChannel::from_channel(server_end);
1414 let mut buffer = zx::MessageBufEtc::new();
1415 let receiver = async move {
1416 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1417 server
1418 .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1419 .expect("failed to write epitaph");
1420 };
1421 let receiver = receiver
1423 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1424 panic!("did not receive message in time!")
1425 });
1426
1427 let sender = async move {
1428 const ORDINAL: u64 = 42 << 32;
1429 let result = client.send_query::<u8, u8, ORDINAL>(55, DynamicFlags::empty()).await;
1430 assert_matches!(
1431 result,
1432 Err(crate::Error::ClientChannelClosed {
1433 status: zx_status::Status::UNAVAILABLE,
1434 protocol_name: "test_protocol",
1435 epitaph: Some(epitaph),
1436 }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1437 );
1438 };
1439 let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1441 panic!("did not receive response in time!")
1442 });
1443
1444 let ((), ()) = join!(receiver, sender);
1445 }
1446
1447 #[fasync::run_singlethreaded(test)]
1448 #[should_panic]
1449 async fn event_cant_be_taken_twice() {
1450 let (client_end, _) = zx::Channel::create();
1451 let client_end = AsyncChannel::from_channel(client_end);
1452 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1453 let _foo = client.take_event_receiver();
1454 client.take_event_receiver();
1455 }
1456
1457 #[fasync::run_singlethreaded(test)]
1458 async fn event_can_be_taken_after_drop() {
1459 let (client_end, _) = zx::Channel::create();
1460 let client_end = AsyncChannel::from_channel(client_end);
1461 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1462 let foo = client.take_event_receiver();
1463 drop(foo);
1464 client.take_event_receiver();
1465 }
1466
1467 #[fasync::run_singlethreaded(test)]
1468 async fn receiver_termination_test() {
1469 let (client_end, _) = zx::Channel::create();
1470 let client_end = AsyncChannel::from_channel(client_end);
1471 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1472 let mut foo = client.take_event_receiver();
1473 assert!(!foo.is_terminated(), "receiver should not report terminated before being polled");
1474 let _ = foo.next().await;
1475 assert!(
1476 foo.is_terminated(),
1477 "receiver should report terminated after seeing channel is closed"
1478 );
1479 }
1480
1481 #[fasync::run_singlethreaded(test)]
1482 #[should_panic(expected = "polled EventReceiver after `None`")]
1483 async fn receiver_cant_be_polled_more_than_once_on_closed_stream() {
1484 let (client_end, _) = zx::Channel::create();
1485 let client_end = AsyncChannel::from_channel(client_end);
1486 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1487 let foo = client.take_event_receiver();
1488 drop(foo);
1489 let mut bar = client.take_event_receiver();
1490 assert!(bar.next().await.is_none(), "read on closed channel should return none");
1491 let _ = bar.next().await;
1493 }
1494
1495 #[fasync::run_singlethreaded(test)]
1496 #[should_panic(expected = "polled EventReceiver after `None`")]
1497 async fn receiver_panics_when_polled_after_receiving_epitaph_then_none() {
1498 let (client_end, server_end) = zx::Channel::create();
1499 let client_end = AsyncChannel::from_channel(client_end);
1500 let server_end = AsyncChannel::from_channel(server_end);
1501 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1502 let mut stream = client.take_event_receiver();
1503
1504 epitaph::write_epitaph_impl(&server_end, zx_status::Status::UNAVAILABLE)
1505 .expect("wrote epitaph");
1506 drop(server_end);
1507
1508 assert_matches!(
1509 stream.next().await,
1510 Some(Err(crate::Error::ClientChannelClosed {
1511 status: zx_status::Status::UNAVAILABLE,
1512 protocol_name: "test_protocol",
1513 epitaph: Some(epitaph),
1514 })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1515 );
1516 assert_matches!(stream.next().await, None);
1517 let _ = stream.next().await;
1519 }
1520
1521 #[fasync::run_singlethreaded(test)]
1522 async fn event_can_be_taken() {
1523 let (client_end, _) = zx::Channel::create();
1524 let client_end = AsyncChannel::from_channel(client_end);
1525 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1526 client.take_event_receiver();
1527 }
1528
1529 #[fasync::run_singlethreaded(test)]
1530 async fn event_received() {
1531 let (client_end, server_end) = zx::Channel::create();
1532 let client_end = AsyncChannel::from_channel(client_end);
1533 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1534
1535 let server = AsyncChannel::from_channel(server_end);
1537 let (bytes, handles) = (&mut vec![], &mut vec![]);
1538 const ORDINAL: u64 = 5;
1539 let header = TransactionHeader::new(0, ORDINAL, DynamicFlags::empty());
1540 encode_transaction(header, bytes, handles);
1541 server.write_etc(bytes, handles).expect("Server channel write failed");
1542 drop(server);
1543
1544 let recv = client
1545 .take_event_receiver()
1546 .into_future()
1547 .then(|(x, stream)| {
1548 let x = x.expect("should contain one element");
1549 let x = x.expect("fidl error");
1550 let x: i32 =
1551 decode_transaction_body::<i32, DefaultFuchsiaResourceDialect, ORDINAL>(x)
1552 .expect("failed to decode event");
1553 assert_eq!(x, 55);
1554 stream.into_future()
1555 })
1556 .map(|(x, _stream)| assert!(x.is_none(), "should have emptied"));
1557
1558 let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1560 panic!("did not receive event in time!")
1561 });
1562
1563 recv.await;
1564 }
1565
1566 #[fasync::run_singlethreaded(test)]
1570 async fn receiver_can_be_taken_after_end_of_stream() {
1571 let (client_end, server_end) = zx::Channel::create();
1572 let client_end = AsyncChannel::from_channel(client_end);
1573 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1574
1575 let server = AsyncChannel::from_channel(server_end);
1577 let (bytes, handles) = (&mut vec![], &mut vec![]);
1578 const ORDINAL: u64 = 5;
1579 let header = TransactionHeader::new(0, ORDINAL, DynamicFlags::empty());
1580 encode_transaction(header, bytes, handles);
1581 server.write_etc(bytes, handles).expect("Server channel write failed");
1582 drop(server);
1583
1584 {
1588 let recv = client
1589 .take_event_receiver()
1590 .into_future()
1591 .then(|(x, stream)| {
1592 let x = x.expect("should contain one element");
1593 let x = x.expect("fidl error");
1594 let x: i32 =
1595 decode_transaction_body::<i32, DefaultFuchsiaResourceDialect, ORDINAL>(x)
1596 .expect("failed to decode event");
1597 assert_eq!(x, 55);
1598 stream.into_future()
1599 })
1600 .map(|(x, _stream)| assert!(x.is_none(), "should have emptied"));
1601
1602 let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1604 panic!("did not receive event in time!")
1605 });
1606
1607 recv.await;
1608 }
1609
1610 let mut c = client.take_event_receiver();
1613 assert!(
1614 c.next().await.is_none(),
1615 "receiver on closed channel should return none on first call"
1616 );
1617 }
1618
1619 #[fasync::run_singlethreaded(test)]
1620 async fn event_incompatible_format() {
1621 let (client_end, server_end) = zx::Channel::create();
1622 let client_end = AsyncChannel::from_channel(client_end);
1623 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1624
1625 let server = AsyncChannel::from_channel(server_end);
1627 let (bytes, handles) = (&mut vec![], &mut vec![]);
1628 let header = TransactionHeader::new_full(
1629 0,
1630 5,
1631 crate::encoding::Context {
1632 wire_format_version: crate::encoding::WireFormatVersion::V2,
1633 },
1634 DynamicFlags::empty(),
1635 0,
1636 );
1637 encode_transaction(header, bytes, handles);
1638 server.write_etc(bytes, handles).expect("Server channel write failed");
1639 drop(server);
1640
1641 let mut event_receiver = client.take_event_receiver();
1642 let recv = event_receiver.next().map(|event| {
1643 assert_matches!(event, Some(Err(crate::Error::IncompatibleMagicNumber(0))))
1644 });
1645
1646 let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1648 panic!("did not receive event in time!")
1649 });
1650
1651 recv.await;
1652 }
1653
1654 #[test]
1655 fn client_always_wakes_pending_futures() {
1656 let mut executor = fasync::TestExecutor::new();
1657
1658 let (client_end, server_end) = zx::Channel::create();
1659 let client_end = AsyncChannel::from_channel(client_end);
1660 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1661
1662 let mut event_receiver = client.take_event_receiver();
1663
1664 let (response_waker, response_waker_count) = new_count_waker();
1666 let response_cx = &mut Context::from_waker(&response_waker);
1667 let mut response_txid = Txid(0);
1668 let mut response_future = client
1669 .send_raw_query(|tx_id, bytes, handles| {
1670 response_txid = tx_id;
1671 let header = TransactionHeader::new(
1672 response_txid.as_raw_id(),
1673 SEND_ORDINAL,
1674 DynamicFlags::empty(),
1675 );
1676 encode_transaction(header, bytes, handles);
1677 Ok(())
1678 })
1679 .expect("Couldn't send query");
1680 assert!(response_future.poll_unpin(response_cx).is_pending());
1681
1682 let (event_waker, event_waker_count) = new_count_waker();
1684 let event_cx = &mut Context::from_waker(&event_waker);
1685 assert!(event_receiver.poll_next_unpin(event_cx).is_pending());
1686
1687 assert_eq!(response_waker_count.get(), 0);
1689 assert_eq!(event_waker_count.get(), 0);
1690
1691 send_transaction(TransactionHeader::new(0, 5, DynamicFlags::empty()), &server_end);
1693
1694 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1696
1697 assert_eq!(response_waker_count.get(), 0);
1699 assert_eq!(event_waker_count.get(), 1);
1700
1701 assert!(event_receiver.poll_next_unpin(event_cx).is_ready());
1703
1704 send_transaction(
1706 TransactionHeader::new(response_txid.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty()),
1707 &server_end,
1708 );
1709
1710 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1712
1713 assert_eq!(response_waker_count.get(), 1);
1715 }
1716
1717 #[test]
1718 fn client_always_wakes_pending_futures_on_epitaph() {
1719 let mut executor = fasync::TestExecutor::new();
1720
1721 let (client_end, server_end) = zx::Channel::create();
1722 let client_end = AsyncChannel::from_channel(client_end);
1723 let server_end = AsyncChannel::from_channel(server_end);
1724 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1725
1726 let mut event_receiver = client.take_event_receiver();
1727
1728 let (response1_waker, response1_waker_count) = new_count_waker();
1730 let response1_cx = &mut Context::from_waker(&response1_waker);
1731 let mut response1_future = client
1732 .send_raw_query(|tx_id, bytes, handles| {
1733 let header =
1734 TransactionHeader::new(tx_id.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty());
1735 encode_transaction(header, bytes, handles);
1736 Ok(())
1737 })
1738 .expect("Couldn't send query");
1739 assert!(response1_future.poll_unpin(response1_cx).is_pending());
1740
1741 let (event_waker, event_waker_count) = new_count_waker();
1743 let event_cx = &mut Context::from_waker(&event_waker);
1744 assert!(event_receiver.poll_next_unpin(event_cx).is_pending());
1745
1746 let (response2_waker, response2_waker_count) = new_count_waker();
1748 let response2_cx = &mut Context::from_waker(&response2_waker);
1749 let mut response2_future = client
1750 .send_raw_query(|tx_id, bytes, handles| {
1751 let header =
1752 TransactionHeader::new(tx_id.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty());
1753 encode_transaction(header, bytes, handles);
1754 Ok(())
1755 })
1756 .expect("Couldn't send query");
1757 assert!(response2_future.poll_unpin(response2_cx).is_pending());
1758
1759 let wakers = vec![response1_waker_count, response2_waker_count, event_waker_count];
1760
1761 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1763
1764 assert_eq!(0, wakers.iter().fold(0, |acc, x| acc + x.get()));
1766
1767 epitaph::write_epitaph_impl(&server_end, zx_status::Status::UNAVAILABLE)
1769 .expect("wrote epitaph");
1770
1771 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1773
1774 for wake_count in &wakers {
1777 assert_eq!(wake_count.get(), 1);
1778 }
1779
1780 assert_matches!(
1782 response1_future.poll_unpin(response1_cx),
1783 Poll::Ready(Err(crate::Error::ClientChannelClosed {
1784 status: zx_status::Status::UNAVAILABLE,
1785 protocol_name: "test_protocol",
1786 epitaph: Some(epitaph),
1787 })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1788 );
1789
1790 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1792
1793 assert_matches!(
1795 response2_future.poll_unpin(response2_cx),
1796 Poll::Ready(Err(crate::Error::ClientChannelClosed {
1797 status: zx_status::Status::UNAVAILABLE,
1798 protocol_name: "test_protocol",
1799 epitaph: Some(epitaph),
1800 })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1801 );
1802
1803 assert!(event_receiver.poll_next_unpin(event_cx).is_ready());
1805 }
1806
1807 #[fasync::run_singlethreaded(test)]
1808 async fn client_allows_take_event_stream_even_if_event_delivered() {
1809 let (client_end, server_end) = zx::Channel::create();
1810 let client_end = AsyncChannel::from_channel(client_end);
1811 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1812
1813 send_transaction(TransactionHeader::new(0, 5, DynamicFlags::empty()), &server_end);
1815
1816 let (response_waker, _response_waker_count) = new_count_waker();
1818 let response_cx = &mut Context::from_waker(&response_waker);
1819 let mut response_future =
1820 client.send_query::<u8, u8, SEND_ORDINAL>(55, DynamicFlags::empty());
1821 assert!(response_future.poll_unpin(response_cx).is_pending());
1822
1823 let mut _event_receiver = client.take_event_receiver();
1825 }
1826
1827 #[fasync::run_singlethreaded(test)]
1828 async fn client_reports_epitaph_from_all_read_actions() {
1829 #[derive(Debug, PartialEq)]
1830 enum Action {
1831 SendMsg, SendQuery, WaitQuery, RecvEvent, }
1836 impl Action {
1837 fn should_report_epitaph(&self) -> bool {
1838 match self {
1839 Action::SendMsg | Action::SendQuery => false,
1840 Action::WaitQuery | Action::RecvEvent => true,
1841 }
1842 }
1843 }
1844 use Action::*;
1845 for two_actions in &[
1848 [SendMsg, SendMsg],
1849 [SendMsg, SendQuery],
1850 [SendMsg, WaitQuery],
1851 [SendMsg, RecvEvent],
1852 [SendQuery, SendMsg],
1853 [SendQuery, SendQuery],
1854 [SendQuery, WaitQuery],
1855 [SendQuery, RecvEvent],
1856 [WaitQuery, SendMsg],
1857 [WaitQuery, SendQuery],
1858 [WaitQuery, WaitQuery],
1859 [WaitQuery, RecvEvent],
1860 [RecvEvent, SendMsg],
1861 [RecvEvent, SendQuery],
1862 [RecvEvent, WaitQuery],
1863 ] {
1866 let (client_end, server_end) = zx::Channel::create();
1867 let client_end = AsyncChannel::from_channel(client_end);
1868 let client = Client::new(client_end, "test_protocol");
1869
1870 let server_end = AsyncChannel::from_channel(server_end);
1872 server_end
1873 .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1874 .expect("failed to write epitaph");
1875
1876 let mut event_receiver = client.take_event_receiver();
1877
1878 for (index, action) in two_actions.iter().enumerate() {
1880 let err = match action {
1881 SendMsg => {
1882 client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()).err()
1883 }
1884 WaitQuery => client
1885 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
1886 .await
1887 .err(),
1888 SendQuery => client
1889 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
1890 .check()
1891 .err(),
1892 RecvEvent => event_receiver.next().await.unwrap().err(),
1893 };
1894 let details = format!("index: {index:?}, two_actions: {two_actions:?}");
1895 match err {
1896 None => assert!(
1897 !action.should_report_epitaph(),
1898 "expected epitaph, but succeeded.\n{details}"
1899 ),
1900 Some(crate::Error::ClientChannelClosed {
1901 status: zx_status::Status::UNAVAILABLE,
1902 protocol_name: "test_protocol",
1903 epitaph: Some(epitaph),
1904 }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32 => assert!(
1905 action.should_report_epitaph(),
1906 "got epitaph unexpectedly.\n{details}",
1907 ),
1908 Some(err) => panic!("unexpected error: {err:#?}.\n{details}"),
1909 }
1910 }
1911
1912 if two_actions.contains(&RecvEvent) {
1914 assert_matches!(event_receiver.next().await, None);
1915 }
1916 }
1917 }
1918
1919 #[test]
1920 fn client_query_result_check() {
1921 let mut executor = fasync::TestExecutor::new();
1922 let (client_end, server_end) = zx::Channel::create();
1923 let client_end = AsyncChannel::from_channel(client_end);
1924 let client = Client::new(client_end, "test_protocol");
1925
1926 let server = AsyncChannel::from_channel(server_end);
1927
1928 let active_fut =
1930 client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
1931
1932 let mut checked_fut = active_fut.check().expect("failed to check future");
1933
1934 let mut buffer = MessageBufEtc::new();
1936 executor.run_singlethreaded(server.recv_etc_msg(&mut buffer)).expect("failed to recv msg");
1937 let two_way_tx_id = 1u8;
1938 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
1939
1940 let (bytes, handles) = (&mut vec![], &mut vec![]);
1941 let header =
1942 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
1943 encode_transaction(header, bytes, handles);
1944 server.write_etc(bytes, handles).expect("Server channel write failed");
1945
1946 executor
1947 .run_singlethreaded(&mut checked_fut)
1948 .map(|x| assert_eq!(x, SEND_DATA))
1949 .unwrap_or_else(|e| panic!("fidl error: {:?}", e));
1950
1951 drop(server);
1953
1954 let query_fut = client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
1955
1956 let mut checked_fut = query_fut.check().expect("failed to check future");
1958 assert_matches!(
1960 executor.run_singlethreaded(&mut checked_fut),
1961 Err(crate::Error::ClientChannelClosed {
1962 status: zx_status::Status::PEER_CLOSED,
1963 protocol_name: "test_protocol",
1964 epitaph: None,
1965 })
1966 );
1967 }
1968
1969 #[fasync::run_singlethreaded(test)]
1970 async fn client_into_channel() {
1971 let (client_end, _server_end) = zx::Channel::create();
1974 let client_end = AsyncChannel::from_channel(client_end);
1975 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1976
1977 assert!(client.into_channel().is_ok());
1978 }
1979
1980 #[fasync::run_singlethreaded(test)]
1981 async fn client_into_channel_outstanding_messages() {
1982 let (client_end, _server_end) = zx::Channel::create();
1985 let client_end = AsyncChannel::from_channel(client_end);
1986 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1987
1988 {
1989 let _sender =
1992 client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
1993 }
1994
1995 assert!(client.into_channel().is_err());
1996 }
1997
1998 #[fasync::run_singlethreaded(test)]
1999 async fn client_into_channel_active_clone() {
2000 let (client_end, _server_end) = zx::Channel::create();
2003 let client_end = AsyncChannel::from_channel(client_end);
2004 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2005
2006 let _cloned_client = client.clone();
2007
2008 assert!(client.into_channel().is_err());
2009 }
2010
2011 #[fasync::run_singlethreaded(test)]
2012 async fn client_into_channel_outstanding_messages_get_received() {
2013 let (client_end, server_end) = zx::Channel::create();
2014 let client_end = AsyncChannel::from_channel(client_end);
2015 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2016
2017 let server = AsyncChannel::from_channel(server_end);
2018 let mut buffer = MessageBufEtc::new();
2019 let receiver = async move {
2020 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
2021 let two_way_tx_id = 1u8;
2022 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2023
2024 let (bytes, handles) = (&mut vec![], &mut vec![]);
2025 let header =
2026 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2027 encode_transaction(header, bytes, handles);
2028 server.write_etc(bytes, handles).expect("Server channel write failed");
2029 };
2030
2031 let receiver = receiver
2033 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
2034 panic!("did not receiver message in time!")
2035 });
2036
2037 let sender = client
2038 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
2039 .map_ok(|x| assert_eq!(x, SEND_DATA))
2040 .unwrap_or_else(|e| panic!("fidl error: {:?}", e));
2041
2042 let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
2044 panic!("did not receive response in time!")
2045 });
2046
2047 let ((), ()) = join!(receiver, sender);
2048
2049 assert!(client.into_channel().is_ok());
2050 }
2051
2052 #[fasync::run_singlethreaded(test)]
2053 async fn client_decode_errors_are_broadcast() {
2054 let (client_end, server_end) = zx::Channel::create();
2055 let client_end = AsyncChannel::from_channel(client_end);
2056 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2057
2058 let server = AsyncChannel::from_channel(server_end);
2059
2060 let _server = fasync::Task::spawn(async move {
2061 let mut buffer = MessageBufEtc::new();
2062 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
2063 let two_way_tx_id = 1u8;
2064 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2065
2066 let (bytes, handles) = (&mut vec![], &mut vec![]);
2067 let header =
2068 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2069 encode_transaction(header, bytes, handles);
2070 bytes[4] = 0;
2072 server.write_etc(bytes, handles).expect("Server channel write failed");
2073
2074 pending::<()>().await;
2076 });
2077
2078 let futures = FuturesUnordered::new();
2079
2080 for _ in 0..4 {
2081 futures.push(async {
2082 assert_matches!(
2083 client
2084 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
2085 .map_ok(|x| assert_eq!(x, SEND_DATA))
2086 .await,
2087 Err(crate::Error::UnsupportedWireFormatVersion)
2088 );
2089 });
2090 }
2091
2092 futures
2093 .collect::<Vec<_>>()
2094 .on_timeout(zx::MonotonicDuration::from_seconds(1).after_now(), || panic!("timed out!"))
2095 .await;
2096 }
2097
2098 #[fasync::run_singlethreaded(test)]
2099 async fn into_channel_from_waker_succeeds() {
2100 let (client_end, server_end) = zx::Channel::create();
2101 let client_end = AsyncChannel::from_channel(client_end);
2102 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2103
2104 let server = AsyncChannel::from_channel(server_end);
2105 let mut buffer = MessageBufEtc::new();
2106 let receiver = async move {
2107 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
2108 let two_way_tx_id = 1u8;
2109 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2110
2111 let (bytes, handles) = (&mut vec![], &mut vec![]);
2112 let header =
2113 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2114 encode_transaction(header, bytes, handles);
2115 server.write_etc(bytes, handles).expect("Server channel write failed");
2116 };
2117
2118 struct Sender {
2119 future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>,
2120 }
2121
2122 let (done_tx, done_rx) = oneshot::channel();
2123
2124 let sender = Arc::new(Sender {
2125 future: Mutex::new(Box::pin(async move {
2126 client
2127 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
2128 .map_ok(|x| assert_eq!(x, SEND_DATA))
2129 .unwrap_or_else(|e| panic!("fidl error: {:?}", e))
2130 .await;
2131
2132 assert!(client.into_channel().is_ok());
2133
2134 let _ = done_tx.send(());
2135 })),
2136 });
2137
2138 impl ArcWake for Sender {
2143 fn wake_by_ref(arc_self: &Arc<Self>) {
2144 assert!(arc_self
2145 .future
2146 .lock()
2147 .poll_unpin(&mut Context::from_waker(&noop_waker()))
2148 .is_ready());
2149 }
2150 }
2151
2152 let waker = waker(sender.clone());
2153
2154 assert!(sender.future.lock().poll_unpin(&mut Context::from_waker(&waker)).is_pending());
2155
2156 receiver.await;
2157
2158 done_rx.await.unwrap();
2159 }
2160}