1use fidl_fuchsia_fdomain as proto;
6use fidl_message::TransactionHeader;
7use fuchsia_async as _;
8use fuchsia_sync::Mutex;
9use futures::FutureExt;
10use futures::channel::oneshot::Sender as OneshotSender;
11use futures::stream::Stream as StreamTrait;
12use std::collections::{HashMap, VecDeque};
13use std::convert::Infallible;
14use std::future::Future;
15use std::num::NonZeroU32;
16use std::pin::Pin;
17use std::sync::{Arc, LazyLock, Weak};
18use std::task::{Context, Poll, Waker, ready};
19
20mod channel;
21mod event;
22mod event_pair;
23mod handle;
24mod responder;
25mod socket;
26
27#[cfg(test)]
28mod test;
29
30pub mod fidl;
31pub mod fidl_next;
32
33use responder::Responder;
34
35pub use channel::{
36 AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
37};
38pub use event::Event;
39pub use event_pair::Eventpair as EventPair;
40pub use handle::unowned::Unowned;
41pub use handle::{
42 AsHandleRef, Handle, HandleBased, HandleRef, NullableHandle, OnFDomainSignals, Peered,
43};
44pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
45pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
46
47#[rustfmt::skip]
49pub use Handle as Clock;
50#[rustfmt::skip]
51pub use Handle as Exception;
52#[rustfmt::skip]
53pub use Handle as Fifo;
54#[rustfmt::skip]
55pub use Handle as Iob;
56#[rustfmt::skip]
57pub use Handle as Job;
58#[rustfmt::skip]
59pub use Handle as Process;
60#[rustfmt::skip]
61pub use Handle as Resource;
62#[rustfmt::skip]
63pub use Handle as Stream;
64#[rustfmt::skip]
65pub use Handle as Thread;
66#[rustfmt::skip]
67pub use Handle as Vmar;
68#[rustfmt::skip]
69pub use Handle as Vmo;
70#[rustfmt::skip]
71pub use Handle as Counter;
72
73use proto::f_domain_ordinals as ordinals;
74
75fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 match error {
77 FDomainError::TargetError(e) => {
78 let e = zx_status::Status::from_raw(*e);
79 write!(f, "Target-side error {e}")
80 }
81 FDomainError::BadHandleId(proto::BadHandleId { id }) => {
82 write!(f, "Tried to use invalid handle id {id}")
83 }
84 FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
85 f,
86 "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
87 ),
88 FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
89 write!(f, "Handle is occupied delivering streaming reads")
90 }
91 FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
92 write!(f, "No streaming read was in progress")
93 }
94 FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
95 write!(
96 f,
97 "Tried to create a handle with id {id}, which is outside the valid range for client handles"
98 )
99 }
100 FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
101 if *same_call {
102 write!(f, "Tried to create two or more new handles with the same id {id}")
103 } else {
104 write!(
105 f,
106 "Tried to create a new handle with id {id}, which is already the id of an existing handle"
107 )
108 }
109 }
110 FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
111 write!(f, "Tried to write a channel into itself")
112 }
113 FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
114 write!(f, "Handle closed while being read")
115 }
116 _ => todo!(),
117 }
118}
119
/// Result type alias whose error type defaults to this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
122
/// Errors surfaced by the FDomain client.
#[derive(Clone)]
pub enum Error {
    /// A socket write failed; the payload records the underlying error and how many bytes
    /// were written before it occurred.
    SocketWrite(WriteSocketError),
    /// A channel write failed, either as a whole or for individual handles in the message.
    ChannelWrite(WriteChannelError),
    /// An error reported through the FDomain protocol itself.
    FDomain(FDomainError),
    /// A FIDL-level error (encoding, decoding, unexpected transaction id or ordinal).
    Protocol(::fidl::Error),
    /// The protocol delivered an unrecognized or incompatible object type.
    ProtocolObjectTypeIncompatible,
    /// The protocol delivered unrecognized or incompatible handle rights.
    ProtocolRightsIncompatible,
    /// The protocol delivered unrecognized or incompatible signals.
    ProtocolSignalsIncompatible,
    /// The protocol delivered an unrecognized or incompatible streaming IO event.
    ProtocolStreamEventIncompatible,
    /// The transport failed. `None` means the connection was lost without a specific IO
    /// error being available.
    Transport(Option<Arc<std::io::Error>>),
    /// A handle was used with a different connection than the one it was created on.
    ConnectionMismatch,
    /// Streaming on the channel has been aborted.
    StreamingAborted,
}
138
139impl std::fmt::Display for Error {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 match self {
142 Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
143 write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
144 write_fdomain_error(error, f)
145 }
146 Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
147 write!(f, "While writing channel: ")?;
148 write_fdomain_error(error, f)
149 }
150 Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
151 write!(f, "Couldn't write all handles into a channel:")?;
152 for (pos, error) in
153 errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
154 {
155 write!(f, "\n Handle in position {pos}: ")?;
156 write_fdomain_error(error, f)?;
157 }
158 Ok(())
159 }
160 Self::ProtocolObjectTypeIncompatible => {
161 write!(
162 f,
163 "The FDomain protocol received an unrecognized or incompatible object type"
164 )
165 }
166 Self::ProtocolRightsIncompatible => {
167 write!(
168 f,
169 "The FDomain protocol received unrecognized or incompatible handle rights"
170 )
171 }
172 Self::ProtocolSignalsIncompatible => {
173 write!(f, "The FDomain protocol received unrecognized or incompatible signals")
174 }
175 Self::ProtocolStreamEventIncompatible => {
176 write!(
177 f,
178 "The FDomain protocol received an unrecognized or incompatible streaming IO event"
179 )
180 }
181 Self::FDomain(e) => write_fdomain_error(e, f),
182 Self::Protocol(e) => write!(f, "Protocol error: {e}"),
183 Self::Transport(Some(e)) => write!(f, "Transport error: {e}"),
184 Self::Transport(None) => {
185 write!(f, "Transport error: Connection to the device has been lost")
186 }
187 Self::ConnectionMismatch => {
188 write!(
189 f,
190 "Tried to use an FDomain handle with a different connection than the one it was created on"
191 )
192 }
193 Self::StreamingAborted => write!(f, "Streaming on this channel has been aborted"),
194 }
195 }
196}
197
198impl std::fmt::Debug for Error {
199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 match self {
201 Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
202 Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
203 Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
204 Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
205 Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
206 Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
207 Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
208 Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
209 Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
210 Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
211 Self::StreamingAborted => write!(f, "StreamingAborted"),
212 }
213 }
214}
215
// `Display` and `Debug` are implemented by hand above; no source error is exposed.
impl std::error::Error for Error {}
217
218impl From<FDomainError> for Error {
219 fn from(other: FDomainError) -> Self {
220 Self::FDomain(other)
221 }
222}
223
224impl From<::fidl::Error> for Error {
225 fn from(other: ::fidl::Error) -> Self {
226 Self::Protocol(other)
227 }
228}
229
230impl From<WriteSocketError> for Error {
231 fn from(other: WriteSocketError) -> Self {
232 Self::SocketWrite(other)
233 }
234}
235
236impl From<WriteChannelError> for Error {
237 fn from(other: WriteChannelError) -> Self {
238 Self::ChannelWrite(other)
239 }
240}
241
/// Errors that can arise while driving the transport itself. Every variant maps onto the
/// [`Error`] variant of the same name.
#[derive(Clone)]
enum InnerError {
    /// A FIDL-level failure, e.g. while decoding an incoming message.
    Protocol(::fidl::Error),
    /// A streaming event arrived carrying a payload variant this client does not handle.
    ProtocolStreamEventIncompatible,
    /// The transport failed; `None` means the connection ended without an IO error.
    Transport(Option<Arc<std::io::Error>>),
}
251
252impl From<InnerError> for Error {
253 fn from(other: InnerError) -> Self {
254 match other {
255 InnerError::Protocol(p) => Error::Protocol(p),
256 InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
257 InnerError::Transport(t) => Error::Transport(t),
258 }
259 }
260}
261
262impl From<::fidl::Error> for InnerError {
263 fn from(other: ::fidl::Error) -> Self {
264 InnerError::Protocol(other)
265 }
266}
267
/// A message transport over which a [`Client`] talks to an FDomain.
///
/// Incoming messages are produced through the `Stream` supertrait, one boxed byte buffer per
/// item. When the stream ends, the client reports the connection as lost.
pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
    /// Attempts to send `msg` over the transport.
    ///
    /// The client keeps a message at the front of its outgoing queue until this returns
    /// `Poll::Ready(Ok(()))`, so after `Poll::Pending` the same message is offered again on
    /// a later poll. Any `Poll::Ready(Err(_))` is treated as a fatal transport error; an
    /// `Err(None)` is reported to users as a lost connection.
    fn poll_send_message(
        self: Pin<&mut Self>,
        msg: &[u8],
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Option<std::io::Error>>>;

    /// Writes a debug description of this transport. Only consulted by `Client`'s `Debug`
    /// implementation when [`has_debug_fmt`](Self::has_debug_fmt) returns `true`. The
    /// default writes nothing.
    fn debug_fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }

    /// Whether [`debug_fmt`](Self::debug_fmt) produces meaningful output. Defaults to
    /// `false`.
    fn has_debug_fmt(&self) -> bool {
        false
    }
}
296
/// State of the connection to the FDomain.
enum Transport {
    /// A live transport: the transport object itself, the queue of encoded messages still
    /// waiting to be sent, and the wakers to wake when a new message is queued.
    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
    /// The transport has failed; every further operation reports a clone of this error.
    Error(InnerError),
}
306
307impl Transport {
308 fn error(&self) -> Option<InnerError> {
310 match self {
311 Transport::Transport(_, _, _) => None,
312 Transport::Error(inner_error) => Some(inner_error.clone()),
313 }
314 }
315
316 fn push_msg(&mut self, msg: Box<[u8]>) -> Result<(), InnerError> {
318 match self {
319 Transport::Transport(_, v, w) => {
320 v.push_back(msg);
321 w.drain(..).for_each(Waker::wake);
322 Ok(())
323 }
324 Transport::Error(e) => Err(e.clone()),
325 }
326 }
327
328 fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
330 match self {
331 Transport::Error(e) => Poll::Ready(e.clone()),
332 Transport::Transport(t, v, w) => {
333 while let Some(msg) = v.front() {
334 match t.as_mut().poll_send_message(msg, ctx) {
335 Poll::Ready(Ok(())) => {
336 v.pop_front();
337 }
338 Poll::Ready(Err(e)) => {
339 let e = e.map(Arc::new);
340 return Poll::Ready(InnerError::Transport(e));
341 }
342 Poll::Pending => return Poll::Pending,
343 }
344 }
345
346 if v.is_empty() {
347 w.push(ctx.waker().clone());
348 } else {
349 ctx.waker().wake_by_ref();
350 }
351 Poll::Pending
352 }
353 }
354 }
355
356 fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Box<[u8]>, InnerError>> {
358 match self {
359 Transport::Error(e) => Poll::Ready(Err(e.clone())),
360 Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
361 Some(Ok(x)) => Poll::Ready(Ok(x)),
362 Some(Err(e)) => Poll::Ready(Err(InnerError::Transport(Some(Arc::new(e))))),
363 Option::None => Poll::Ready(Err(InnerError::Transport(None))),
364 },
365 }
366 }
367}
368
369impl Drop for Transport {
370 fn drop(&mut self) {
371 if let Transport::Transport(_, _, wakers) = self {
372 wakers.drain(..).for_each(Waker::wake);
373 }
374 }
375}
376
/// Client-side bookkeeping for reads on a single socket handle.
struct SocketReadState {
    /// Tasks waiting for data to arrive on this socket.
    wakers: Vec<Waker>,
    /// Data and errors received but not yet consumed by a reader, in arrival order.
    queued: VecDeque<Result<proto::SocketData, Error>>,
    /// Checked before a one-shot read request is issued; cleared when a read response is
    /// handled.
    read_request_pending: bool,
    /// Whether a streaming read is active, in which case no one-shot reads are issued.
    is_streaming: bool,
}
384
385impl SocketReadState {
386 fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) -> Vec<Waker> {
389 self.queued.push_back(msg);
390 std::mem::replace(&mut self.wakers, Vec::new())
391 }
392}
393
/// Client-side bookkeeping for reads on a single channel handle.
struct ChannelReadState {
    /// Tasks waiting for a message to arrive on this channel.
    wakers: Vec<Waker>,
    /// Messages and errors received but not yet consumed by a reader, in arrival order.
    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
    /// Checked before a one-shot read request is issued; cleared when a read response is
    /// handled.
    read_request_pending: bool,
    /// Whether a streaming read is active, in which case no one-shot reads are issued.
    is_streaming: bool,
}
401
402impl ChannelReadState {
403 fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) -> Vec<Waker> {
406 self.queued.push_back(msg);
407 std::mem::replace(&mut self.wakers, Vec::new())
408 }
409}
410
/// Mutable state of a [`Client`], kept behind the client's mutex.
struct ClientInner {
    /// The connection, or the error that ended it.
    transport: Transport,
    /// Responders for requests that have been sent and not yet answered, keyed by
    /// transaction id.
    transactions: HashMap<NonZeroU32, responder::Responder>,
    /// Per-handle read state for channels.
    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
    /// Per-handle read state for sockets.
    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
    /// Transaction id to assign to the next request.
    next_tx_id: u32,
    /// Handles waiting to be closed; they are sent as one batched close request before
    /// the next non-close request or transport poll.
    waiting_to_close: Vec<proto::HandleId>,
    /// Waker of the task that most recently polled the transport.
    /// NOTE(review): presumably woken by code outside this file when handles are queued
    /// in `waiting_to_close` — confirm against the handle drop path.
    waiting_to_close_waker: Waker,

    /// Wakers collected while the lock is held; the client loop takes them and wakes them
    /// after releasing the lock.
    wakers_to_wake: Vec<Waker>,
}
428
429impl ClientInner {
430 fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
432 if ordinal != ordinals::CLOSE {
433 self.process_waiting_to_close();
434 }
435 let tx_id = self.next_tx_id;
436
437 let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
438 let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
439 self.next_tx_id += 1;
440 if let Err(e) = self.transport.push_msg(msg.into()) {
441 let _ = responder.handle(self, Err(e.into()));
442 } else {
443 assert!(
444 self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
445 "Allocated same tx id twice!"
446 );
447 }
448 }
449
450 fn process_waiting_to_close(&mut self) {
451 if !self.waiting_to_close.is_empty() {
452 let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
453 for handle in &handles {
456 let _ = self.channel_read_states.remove(handle);
457 let _ = self.socket_read_states.remove(handle);
458 }
459 self.request(
460 ordinals::CLOSE,
461 proto::FDomainCloseRequest { handles },
462 Responder::Ignore,
463 );
464 }
465 }
466
    /// Drives the transport: flushes outgoing messages and dispatches incoming ones.
    ///
    /// Never completes successfully (the `Ok` type is `Infallible`); it returns
    /// `Poll::Pending` when no more incoming data is available right now, and
    /// `Poll::Ready(Err(_))` on the first transport or protocol failure.
    fn try_poll_transport(
        &mut self,
        ctx: &mut Context<'_>,
    ) -> Poll<Result<Infallible, InnerError>> {
        // Flush queued handle closures before doing anything else.
        self.process_waiting_to_close();

        // Remember who is driving the transport.
        self.waiting_to_close_waker = ctx.waker().clone();

        loop {
            // Sending only ever becomes ready with an error.
            if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
                return Poll::Ready(Err(e));
            }
            let Poll::Ready(result) = self.transport.poll_next(ctx) else {
                return Poll::Pending;
            };
            let data = result?;
            let (header, data) = fidl_message::decode_transaction_header(&data)?;

            // A transaction id of zero marks an event rather than a reply.
            let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
                let wakers = self.process_event(header, data)?;
                self.wakers_to_wake.extend(wakers);
                continue;
            };

            // A reply must match an outstanding transaction; anything else is a protocol
            // error.
            let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
            tx.handle(self, Ok((header, data)))?;
        }
    }
497
498 fn process_event(
500 &mut self,
501 header: TransactionHeader,
502 data: &[u8],
503 ) -> Result<Vec<Waker>, InnerError> {
504 match header.ordinal {
505 ordinals::ON_SOCKET_STREAMING_DATA => {
506 let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
507 header, data,
508 )?;
509 let o =
510 self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
511 wakers: Vec::new(),
512 queued: VecDeque::new(),
513 is_streaming: false,
514 read_request_pending: false,
515 });
516 match msg.socket_message {
517 proto::SocketMessage::Data(data) => Ok(o.handle_incoming_message(Ok(data))),
518 proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
519 let ret = if let Some(error) = error {
520 o.handle_incoming_message(Err(Error::FDomain(*error)))
521 } else {
522 Vec::new()
523 };
524 o.is_streaming = false;
525 Ok(ret)
526 }
527 _ => Err(InnerError::ProtocolStreamEventIncompatible),
528 }
529 }
530 ordinals::ON_CHANNEL_STREAMING_DATA => {
531 let msg = fidl_message::decode_message::<
532 proto::ChannelOnChannelStreamingDataRequest,
533 >(header, data)?;
534 let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
535 ChannelReadState {
536 wakers: Vec::new(),
537 queued: VecDeque::new(),
538 is_streaming: false,
539 read_request_pending: false,
540 }
541 });
542 match msg.channel_sent {
543 proto::ChannelSent::Message(data) => Ok(o.handle_incoming_message(Ok(data))),
544 proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
545 let ret = if let Some(error) = error {
546 o.handle_incoming_message(Err(Error::FDomain(*error)))
547 } else {
548 Vec::new()
549 };
550 o.is_streaming = false;
551 Ok(ret)
552 }
553 _ => Err(InnerError::ProtocolStreamEventIncompatible),
554 }
555 }
556 _ => Err(::fidl::Error::UnknownOrdinal {
557 ordinal: header.ordinal,
558 protocol_name:
559 <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
560 }
561 .into()),
562 }
563 }
564
565 fn poll_transport(&mut self, ctx: &mut Context<'_>) -> Poll<()> {
569 if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
570 for (_, v) in std::mem::take(&mut self.transactions) {
571 let _ = v.handle(self, Err(e.clone()));
572 }
573 for mut state in std::mem::take(&mut self.socket_read_states).into_values() {
574 state.queued.push_back(Err(Error::from(e.clone())));
575 self.wakers_to_wake.extend(state.wakers);
576 }
577 for (_, mut state) in self.channel_read_states.drain() {
578 state.queued.push_back(Err(Error::from(e.clone())));
579 self.wakers_to_wake.extend(state.wakers);
580 }
581 if matches!(self.transport, Transport::Transport(_, _, _)) {
582 self.transport = Transport::Error(e);
583 }
584
585 Poll::Ready(())
586 } else {
587 Poll::Pending
588 }
589 }
590
591 pub(crate) fn handle_socket_read_response(
593 &mut self,
594 msg: Result<proto::SocketData, Error>,
595 id: proto::HandleId,
596 ) {
597 let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
598 wakers: Vec::new(),
599 queued: VecDeque::new(),
600 is_streaming: false,
601 read_request_pending: false,
602 });
603 let wakers = state.handle_incoming_message(msg);
604 self.wakers_to_wake.extend(wakers);
605 state.read_request_pending = false;
606 }
607
608 pub(crate) fn handle_channel_read_response(
610 &mut self,
611 msg: Result<proto::ChannelMessage, Error>,
612 id: proto::HandleId,
613 ) {
614 let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
615 wakers: Vec::new(),
616 queued: VecDeque::new(),
617 is_streaming: false,
618 read_request_pending: false,
619 });
620 let wakers = state.handle_incoming_message(msg);
621 self.wakers_to_wake.extend(wakers);
622 state.read_request_pending = false;
623 }
624}
625
626impl Drop for ClientInner {
627 fn drop(&mut self) {
628 let responders = self.transactions.drain().map(|x| x.1).collect::<Vec<_>>();
629 for responder in responders {
630 let _ = responder.handle(self, Err(InnerError::Transport(None)));
631 }
632 for state in self.channel_read_states.values_mut() {
633 state.wakers.drain(..).for_each(Waker::wake);
634 }
635 for state in self.socket_read_states.values_mut() {
636 state.wakers.drain(..).for_each(Waker::wake);
637 }
638 self.waiting_to_close_waker.wake_by_ref();
639 self.wakers_to_wake.drain(..).for_each(Waker::wake);
640 }
641}
642
/// A client for an FDomain connection.
///
/// All state lives in a mutex-protected [`ClientInner`]. Handles created through the
/// client hold a weak reference back to it.
pub struct Client(pub(crate) Mutex<ClientInner>);
650
651impl std::fmt::Debug for Client {
652 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
653 let inner = self.0.lock();
654 match &inner.transport {
655 Transport::Transport(transport, ..) if transport.has_debug_fmt() => {
656 write!(f, "Client(")?;
657 transport.debug_fmt(f)?;
658 write!(f, ")")
659 }
660 Transport::Error(error) => {
661 let error = Error::from(error.clone());
662 write!(f, "Client(Failed: {error})")
663 }
664 _ => f.debug_tuple("Client").field(&"<transport>").finish(),
665 }
666 }
667}
668
/// A shared client whose transport is permanently in the lost-connection error state.
/// NOTE(review): presumably used as a stand-in when a handle's real client has gone away —
/// confirm against the users in the handle modules.
pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
    Arc::new(Client(Mutex::new(ClientInner {
        // Every operation on this client fails immediately with this error.
        transport: Transport::Error(InnerError::Transport(None)),
        transactions: HashMap::new(),
        channel_read_states: HashMap::new(),
        socket_read_states: HashMap::new(),
        next_tx_id: 1,
        waiting_to_close: Vec::new(),
        waiting_to_close_waker: std::task::Waker::noop().clone(),
        wakers_to_wake: Vec::new(),
    })))
});
684
/// The future that drives a [`Client`]'s transport.
///
/// Dropping it fails all of the client's outstanding transactions and pending reads with
/// a lost-connection error.
pub struct ClientLoop {
    /// Weak reference to the client, used on drop to fail its outstanding work.
    client: Weak<Client>,
    /// The future that actually polls the transport.
    fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
}
694
695impl Future for ClientLoop {
696 type Output = ();
697 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
698 self.fut.as_mut().poll(cx)
699 }
700}
701
702impl Drop for ClientLoop {
703 fn drop(&mut self) {
704 let Some(client) = self.client.upgrade() else {
705 return;
706 };
707
708 let (channel_read_states, socket_read_states, deferred_wakers) = {
709 let mut inner = client.0.lock();
710 let transactions = std::mem::take(&mut inner.transactions);
711 log::debug!("ClientLoop dropped, failing {} transactions", transactions.len());
712 for (_, v) in transactions {
713 let _ = v.handle(&mut *inner, Err(InnerError::Transport(None)));
714 }
715
716 let channel_read_states = std::mem::take(&mut inner.channel_read_states);
717 let socket_read_states = std::mem::take(&mut inner.socket_read_states);
718
719 let deferred_wakers = std::mem::replace(&mut inner.wakers_to_wake, Vec::new());
720
721 (channel_read_states, socket_read_states, deferred_wakers)
722 };
723
724 log::debug!("Failing reads on {} channels", channel_read_states.len());
725 for (_, mut state) in channel_read_states {
726 state.queued.push_back(Err(Error::Transport(None)));
727 state.wakers.into_iter().for_each(Waker::wake);
728 }
729
730 log::debug!("Failing reads on {} sockets", socket_read_states.len());
731 for (_, mut state) in socket_read_states {
732 state.queued.push_back(Err(Error::Transport(None)));
733 state.wakers.into_iter().for_each(Waker::wake);
734 }
735
736 deferred_wakers.into_iter().for_each(Waker::wake);
737 }
738}
739
740impl Client {
741 pub fn transport_status(&self) -> Result<()> {
742 match &self.0.lock().transport {
743 Transport::Error(e) => Err(e.clone().into()),
744 Transport::Transport(_, _, _) => Ok(()),
745 }
746 }
747 pub fn new(
754 transport: impl FDomainTransport + 'static,
755 ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
756 let ret = Arc::new(Client(Mutex::new(ClientInner {
757 transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
758 transactions: HashMap::new(),
759 socket_read_states: HashMap::new(),
760 channel_read_states: HashMap::new(),
761 next_tx_id: 1,
762 waiting_to_close: Vec::new(),
763 waiting_to_close_waker: std::task::Waker::noop().clone(),
764 wakers_to_wake: Vec::new(),
765 })));
766
767 let client_weak = Arc::downgrade(&ret);
768 let fut = futures::future::poll_fn(move |ctx| {
769 let Some(client) = client_weak.upgrade() else {
770 return Poll::Ready(());
771 };
772
773 let (ret, deferred_wakers) = {
774 let mut inner = client.0.lock();
775 let ret = inner.poll_transport(ctx);
776 let deferred_wakers = std::mem::replace(&mut inner.wakers_to_wake, Vec::new());
777 (ret, deferred_wakers)
778 };
779 deferred_wakers.into_iter().for_each(Waker::wake);
780 ret
781 });
782
783 let client_loop = ClientLoop { client: Arc::downgrade(&ret), fut: Box::pin(fut) };
784
785 (ret, client_loop)
786 }
787
788 pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
790 let new_handle = self.new_hid();
791 self.transaction(
792 ordinals::GET_NAMESPACE,
793 proto::FDomainGetNamespaceRequest { new_handle },
794 Responder::Namespace,
795 )
796 .await?;
797 Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
798 }
799
800 pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
802 let id_a = self.new_hid();
803 let id_b = self.new_hid();
804 let fut = self.transaction(
805 ordinals::CREATE_CHANNEL,
806 proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
807 Responder::CreateChannel,
808 );
809
810 fuchsia_async::Task::spawn(async move {
811 if let Err(e) = fut.await {
812 log::debug!("FDomain channel creation failed: {e}");
813 }
814 })
815 .detach();
816
817 (
818 Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
819 Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
820 )
821 }
822
823 pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
825 self: &Arc<Self>,
826 ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
827 let (client, server) = self.create_channel();
828 let client_end = crate::fidl::ClientEnd::<F>::new(client);
829 let server_end = crate::fidl::ServerEnd::new(server);
830 (client_end, server_end)
831 }
832
833 pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
835 self: &Arc<Self>,
836 ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
837 let (client_end, server_end) = self.create_endpoints::<F>();
838 (client_end.into_proxy(), server_end)
839 }
840
841 pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
843 self: &Arc<Self>,
844 ) -> (F::Proxy, F::RequestStream) {
845 let (client_end, server_end) = self.create_endpoints::<F>();
846 (client_end.into_proxy(), server_end.into_stream())
847 }
848
849 pub fn create_request_stream<F: crate::fidl::ProtocolMarker>(
851 self: &Arc<Self>,
852 ) -> (crate::fidl::ClientEnd<F>, F::RequestStream) {
853 let (client_end, server_end) = self.create_endpoints::<F>();
854 (client_end, server_end.into_stream())
855 }
856
857 fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
859 let id_a = self.new_hid();
860 let id_b = self.new_hid();
861 let fut = self.transaction(
862 ordinals::CREATE_SOCKET,
863 proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
864 Responder::CreateSocket,
865 );
866
867 fuchsia_async::Task::spawn(async move {
868 if let Err(e) = fut.await {
869 log::debug!("FDomain socket creation failed: {e}");
870 }
871 })
872 .detach();
873
874 (
875 Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
876 Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
877 )
878 }
879
880 pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
882 self.create_socket(proto::SocketType::Stream)
883 }
884
885 pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
887 self.create_socket(proto::SocketType::Datagram)
888 }
889
890 pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
892 let id_a = self.new_hid();
893 let id_b = self.new_hid();
894 let fut = self.transaction(
895 ordinals::CREATE_EVENT_PAIR,
896 proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
897 Responder::CreateEventPair,
898 );
899
900 fuchsia_async::Task::spawn(async move {
901 if let Err(e) = fut.await {
902 log::debug!("FDomain event pair creation failed: {e}");
903 }
904 })
905 .detach();
906
907 (
908 EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
909 EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
910 )
911 }
912
913 pub fn create_event(self: &Arc<Self>) -> Event {
915 let id = self.new_hid();
916 let fut = self.transaction(
917 ordinals::CREATE_EVENT,
918 proto::EventCreateEventRequest { handle: id },
919 Responder::CreateEvent,
920 );
921
922 fuchsia_async::Task::spawn(async move {
923 if let Err(e) = fut.await {
924 log::debug!("FDomain event creation failed: {e}");
925 }
926 })
927 .detach();
928
929 Event(Handle { id: id.id, client: Arc::downgrade(self) })
930 }
931
932 pub(crate) fn new_hid(&self) -> proto::NewHandleId {
934 proto::NewHandleId { id: rand::random::<u32>() >> 1 }
939 }
940
941 pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
947 self: &Arc<Self>,
948 ordinal: u64,
949 request: S,
950 f: F,
951 ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
952 where
953 F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
954 {
955 let mut inner = self.0.lock();
956
957 let (sender, receiver) = futures::channel::oneshot::channel();
958 inner.request(ordinal, request, f(sender));
959 receiver.map(|x| x.expect("Oneshot went away without reply!"))
960 }
961
962 pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
964 let mut inner = self.0.lock();
965 if let Some(e) = inner.transport.error() {
966 return Err(e.into());
967 }
968
969 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
970 wakers: Vec::new(),
971 queued: VecDeque::new(),
972 is_streaming: false,
973 read_request_pending: false,
974 });
975
976 assert!(!state.is_streaming, "Initiated streaming twice!");
977 state.is_streaming = true;
978
979 inner.request(
980 ordinals::READ_SOCKET_STREAMING_START,
981 proto::SocketReadSocketStreamingStartRequest { handle: id },
982 Responder::Ignore,
983 );
984 Ok(())
985 }
986
987 pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
991 let mut inner = self.0.lock();
992 if let Some(state) = inner.socket_read_states.get_mut(&id) {
993 if state.is_streaming {
994 state.is_streaming = false;
995 let _ = inner.request(
997 ordinals::READ_SOCKET_STREAMING_STOP,
998 proto::ChannelReadChannelStreamingStopRequest { handle: id },
999 Responder::Ignore,
1000 );
1001 }
1002 }
1003 }
1004
1005 pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
1007 let mut inner = self.0.lock();
1008 if let Some(e) = inner.transport.error() {
1009 return Err(e.into());
1010 }
1011 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
1012 wakers: Vec::new(),
1013 queued: VecDeque::new(),
1014 is_streaming: false,
1015 read_request_pending: false,
1016 });
1017
1018 assert!(!state.is_streaming, "Initiated streaming twice!");
1019 state.is_streaming = true;
1020
1021 inner.request(
1022 ordinals::READ_CHANNEL_STREAMING_START,
1023 proto::ChannelReadChannelStreamingStartRequest { handle: id },
1024 Responder::Ignore,
1025 );
1026
1027 Ok(())
1028 }
1029
1030 pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
1034 let mut inner = self.0.lock();
1035 if let Some(state) = inner.channel_read_states.get_mut(&id) {
1036 if state.is_streaming {
1037 state.is_streaming = false;
1038 let _ = inner.request(
1040 ordinals::READ_CHANNEL_STREAMING_STOP,
1041 proto::ChannelReadChannelStreamingStopRequest { handle: id },
1042 Responder::Ignore,
1043 );
1044 }
1045 }
1046 }
1047
1048 pub(crate) fn poll_socket(
1050 &self,
1051 id: proto::HandleId,
1052 ctx: &mut Context<'_>,
1053 out: &mut [u8],
1054 ) -> Poll<Result<usize, Error>> {
1055 let mut inner = self.0.lock();
1056 if let Some(error) = inner.transport.error() {
1057 return Poll::Ready(Err(error.into()));
1058 }
1059
1060 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
1061 wakers: Vec::new(),
1062 queued: VecDeque::new(),
1063 is_streaming: false,
1064 read_request_pending: false,
1065 });
1066
1067 if let Some(got) = state.queued.front_mut() {
1068 match got.as_mut() {
1069 Ok(data) => {
1070 let read_size = std::cmp::min(data.data.len(), out.len());
1071 out[..read_size].copy_from_slice(&data.data[..read_size]);
1072
1073 if data.data.len() > read_size && !data.is_datagram {
1074 let _ = data.data.drain(..read_size);
1075 } else {
1076 let _ = state.queued.pop_front();
1077 }
1078
1079 return Poll::Ready(Ok(read_size));
1080 }
1081 Err(_) => {
1082 let err = state.queued.pop_front().unwrap().unwrap_err();
1083 return Poll::Ready(Err(err));
1084 }
1085 }
1086 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
1087 state.wakers.push(ctx.waker().clone());
1088 }
1089
1090 if !state.read_request_pending && !state.is_streaming {
1091 inner.request(
1092 ordinals::READ_SOCKET,
1093 proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
1094 Responder::ReadSocket(id),
1095 );
1096 }
1097
1098 Poll::Pending
1099 }
1100
1101 pub(crate) fn poll_channel(
1103 &self,
1104 id: proto::HandleId,
1105 ctx: &mut Context<'_>,
1106 for_stream: bool,
1107 ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
1108 let mut inner = self.0.lock();
1109 if let Some(error) = inner.transport.error() {
1110 return Poll::Ready(Some(Err(error.into())));
1111 }
1112
1113 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
1114 wakers: Vec::new(),
1115 queued: VecDeque::new(),
1116 is_streaming: false,
1117 read_request_pending: false,
1118 });
1119
1120 if let Some(got) = state.queued.pop_front() {
1121 return Poll::Ready(Some(got));
1122 } else if for_stream && !state.is_streaming {
1123 return Poll::Ready(None);
1124 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
1125 state.wakers.push(ctx.waker().clone());
1126 }
1127
1128 if !state.read_request_pending && !state.is_streaming {
1129 inner.request(
1130 ordinals::READ_CHANNEL,
1131 proto::ChannelReadChannelRequest { handle: id },
1132 Responder::ReadChannel(id),
1133 );
1134 }
1135
1136 Poll::Pending
1137 }
1138
1139 pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
1141 let inner = self.0.lock();
1142 let Some(state) = inner.channel_read_states.get(&id) else {
1143 return false;
1144 };
1145 state.is_streaming
1146 }
1147
1148 pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
1151 let inner = self.0.lock();
1152 match handles {
1153 proto::Handles::Handles(handles) => {
1154 for handle in handles {
1155 assert!(
1156 !(inner.channel_read_states.contains_key(handle)
1157 || inner.socket_read_states.contains_key(handle)),
1158 "Tried to transfer handle after reading"
1159 );
1160 }
1161 }
1162 proto::Handles::Dispositions(dispositions) => {
1163 for disposition in dispositions {
1164 match &disposition.handle {
1165 proto::HandleOp::Move_(handle) => assert!(
1166 !(inner.channel_read_states.contains_key(handle)
1167 || inner.socket_read_states.contains_key(handle)),
1168 "Tried to transfer handle after reading"
1169 ),
1170 proto::HandleOp::Duplicate(_) => (),
1172 }
1173 }
1174 }
1175 }
1176 }
1177}