1use fidl_fuchsia_fdomain as proto;
6use fidl_message::TransactionHeader;
7use fuchsia_sync::Mutex;
8use futures::FutureExt;
9use futures::channel::oneshot::Sender as OneshotSender;
10use futures::stream::Stream as StreamTrait;
11use std::collections::{HashMap, VecDeque};
12use std::convert::Infallible;
13use std::future::Future;
14use std::num::NonZeroU32;
15use std::pin::Pin;
16use std::sync::{Arc, LazyLock, Weak};
17use std::task::{Context, Poll, Waker, ready};
18
19mod channel;
20mod event;
21mod event_pair;
22mod handle;
23mod responder;
24mod socket;
25
26#[cfg(test)]
27mod test;
28
29pub mod fidl;
30pub mod fidl_next;
31
32use responder::Responder;
33
34pub use channel::{
35 AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
36};
37pub use event::Event;
38pub use event_pair::Eventpair as EventPair;
39pub use handle::unowned::Unowned;
40pub use handle::{
41 AsHandleRef, Handle, HandleBased, HandleRef, NullableHandle, OnFDomainSignals, Peered,
42};
43pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
44pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
45
46#[rustfmt::skip]
48pub use Handle as Clock;
49#[rustfmt::skip]
50pub use Handle as Exception;
51#[rustfmt::skip]
52pub use Handle as Fifo;
53#[rustfmt::skip]
54pub use Handle as Iob;
55#[rustfmt::skip]
56pub use Handle as Job;
57#[rustfmt::skip]
58pub use Handle as Process;
59#[rustfmt::skip]
60pub use Handle as Resource;
61#[rustfmt::skip]
62pub use Handle as Stream;
63#[rustfmt::skip]
64pub use Handle as Thread;
65#[rustfmt::skip]
66pub use Handle as Vmar;
67#[rustfmt::skip]
68pub use Handle as Vmo;
69#[rustfmt::skip]
70pub use Handle as Counter;
71
72use proto::f_domain_ordinals as ordinals;
73
74fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 match error {
76 FDomainError::TargetError(e) => {
77 let e = zx_status::Status::from_raw(*e);
78 write!(f, "Target-side error {e}")
79 }
80 FDomainError::BadHandleId(proto::BadHandleId { id }) => {
81 write!(f, "Tried to use invalid handle id {id}")
82 }
83 FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
84 f,
85 "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
86 ),
87 FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
88 write!(f, "Handle is occupied delivering streaming reads")
89 }
90 FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
91 write!(f, "No streaming read was in progress")
92 }
93 FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
94 write!(
95 f,
96 "Tried to create a handle with id {id}, which is outside the valid range for client handles"
97 )
98 }
99 FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
100 if *same_call {
101 write!(f, "Tried to create two or more new handles with the same id {id}")
102 } else {
103 write!(
104 f,
105 "Tried to create a new handle with id {id}, which is already the id of an existing handle"
106 )
107 }
108 }
109 FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
110 write!(f, "Tried to write a channel into itself")
111 }
112 FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
113 write!(f, "Handle closed while being read")
114 }
115 _ => todo!(),
116 }
117}
118
119pub type Result<T, E = Error> = std::result::Result<T, E>;
121
122#[derive(Clone)]
124pub enum Error {
125 SocketWrite(WriteSocketError),
126 ChannelWrite(WriteChannelError),
127 FDomain(FDomainError),
128 Protocol(::fidl::Error),
129 ProtocolObjectTypeIncompatible,
130 ProtocolRightsIncompatible,
131 ProtocolSignalsIncompatible,
132 ProtocolStreamEventIncompatible,
133 Transport(Option<Arc<std::io::Error>>),
134 ConnectionMismatch,
135 StreamingAborted,
136}
137
138impl std::fmt::Display for Error {
139 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 match self {
141 Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
142 write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
143 write_fdomain_error(error, f)
144 }
145 Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
146 write!(f, "While writing channel: ")?;
147 write_fdomain_error(error, f)
148 }
149 Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
150 write!(f, "Couldn't write all handles into a channel:")?;
151 for (pos, error) in
152 errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
153 {
154 write!(f, "\n Handle in position {pos}: ")?;
155 write_fdomain_error(error, f)?;
156 }
157 Ok(())
158 }
159 Self::ProtocolObjectTypeIncompatible => {
160 write!(
161 f,
162 "The FDomain protocol received an unrecognized or incompatible object type"
163 )
164 }
165 Self::ProtocolRightsIncompatible => {
166 write!(
167 f,
168 "The FDomain protocol received unrecognized or incompatible handle rights"
169 )
170 }
171 Self::ProtocolSignalsIncompatible => {
172 write!(f, "The FDomain protocol received unrecognized or incompatible signals")
173 }
174 Self::ProtocolStreamEventIncompatible => {
175 write!(
176 f,
177 "The FDomain protocol received an unrecognized or incompatible streaming IO event"
178 )
179 }
180 Self::FDomain(e) => write_fdomain_error(e, f),
181 Self::Protocol(e) => write!(f, "Protocol error: {e}"),
182 Self::Transport(Some(e)) => write!(f, "Transport error: {e}"),
183 Self::Transport(None) => {
184 write!(f, "Transport error: Connection to the device has been lost")
185 }
186 Self::ConnectionMismatch => {
187 write!(
188 f,
189 "Tried to use an FDomain handle with a different connection than the one it was created on"
190 )
191 }
192 Self::StreamingAborted => write!(f, "Streaming on this channel has been aborted"),
193 }
194 }
195}
196
197impl std::fmt::Debug for Error {
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 match self {
200 Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
201 Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
202 Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
203 Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
204 Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
205 Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
206 Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
207 Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
208 Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
209 Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
210 Self::StreamingAborted => write!(f, "StreamingAborted"),
211 }
212 }
213}
214
215impl std::error::Error for Error {}
216
217impl From<FDomainError> for Error {
218 fn from(other: FDomainError) -> Self {
219 Self::FDomain(other)
220 }
221}
222
223impl From<::fidl::Error> for Error {
224 fn from(other: ::fidl::Error) -> Self {
225 Self::Protocol(other)
226 }
227}
228
229impl From<WriteSocketError> for Error {
230 fn from(other: WriteSocketError) -> Self {
231 Self::SocketWrite(other)
232 }
233}
234
235impl From<WriteChannelError> for Error {
236 fn from(other: WriteChannelError) -> Self {
237 Self::ChannelWrite(other)
238 }
239}
240
241#[derive(Clone)]
245enum InnerError {
246 Protocol(::fidl::Error),
247 ProtocolStreamEventIncompatible,
248 Transport(Option<Arc<std::io::Error>>),
249}
250
251impl From<InnerError> for Error {
252 fn from(other: InnerError) -> Self {
253 match other {
254 InnerError::Protocol(p) => Error::Protocol(p),
255 InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
256 InnerError::Transport(t) => Error::Transport(t),
257 }
258 }
259}
260
261impl From<::fidl::Error> for InnerError {
262 fn from(other: ::fidl::Error) -> Self {
263 InnerError::Protocol(other)
264 }
265}
266
267pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
277 fn poll_send_message(
280 self: Pin<&mut Self>,
281 msg: &[u8],
282 ctx: &mut Context<'_>,
283 ) -> Poll<Result<(), Option<std::io::Error>>>;
284
285 fn debug_fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
287 Ok(())
288 }
289
290 fn has_debug_fmt(&self) -> bool {
292 false
293 }
294}
295
296enum Transport {
302 Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
303 Error(InnerError),
304}
305
306impl Transport {
307 fn error(&self) -> Option<InnerError> {
309 match self {
310 Transport::Transport(_, _, _) => None,
311 Transport::Error(inner_error) => Some(inner_error.clone()),
312 }
313 }
314
315 fn push_msg(&mut self, msg: Box<[u8]>) -> Result<(), InnerError> {
317 match self {
318 Transport::Transport(_, v, w) => {
319 v.push_back(msg);
320 w.drain(..).for_each(Waker::wake);
321 Ok(())
322 }
323 Transport::Error(e) => Err(e.clone()),
324 }
325 }
326
327 fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
329 match self {
330 Transport::Error(e) => Poll::Ready(e.clone()),
331 Transport::Transport(t, v, w) => {
332 while let Some(msg) = v.front() {
333 match t.as_mut().poll_send_message(msg, ctx) {
334 Poll::Ready(Ok(())) => {
335 v.pop_front();
336 }
337 Poll::Ready(Err(e)) => {
338 let e = e.map(Arc::new);
339 return Poll::Ready(InnerError::Transport(e));
340 }
341 Poll::Pending => return Poll::Pending,
342 }
343 }
344
345 if v.is_empty() {
346 w.push(ctx.waker().clone());
347 } else {
348 ctx.waker().wake_by_ref();
349 }
350 Poll::Pending
351 }
352 }
353 }
354
355 fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Box<[u8]>, InnerError>> {
357 match self {
358 Transport::Error(e) => Poll::Ready(Err(e.clone())),
359 Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
360 Some(Ok(x)) => Poll::Ready(Ok(x)),
361 Some(Err(e)) => Poll::Ready(Err(InnerError::Transport(Some(Arc::new(e))))),
362 Option::None => Poll::Ready(Err(InnerError::Transport(None))),
363 },
364 }
365 }
366}
367
368impl Drop for Transport {
369 fn drop(&mut self) {
370 if let Transport::Transport(_, _, wakers) = self {
371 wakers.drain(..).for_each(Waker::wake);
372 }
373 }
374}
375
376struct SocketReadState {
378 wakers: Vec<Waker>,
379 queued: VecDeque<Result<proto::SocketData, Error>>,
380 read_request_pending: bool,
381 is_streaming: bool,
382}
383
384impl SocketReadState {
385 fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) -> Vec<Waker> {
388 self.queued.push_back(msg);
389 std::mem::replace(&mut self.wakers, Vec::new())
390 }
391}
392
393struct ChannelReadState {
395 wakers: Vec<Waker>,
396 queued: VecDeque<Result<proto::ChannelMessage, Error>>,
397 read_request_pending: bool,
398 is_streaming: bool,
399}
400
401impl ChannelReadState {
402 fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) -> Vec<Waker> {
405 self.queued.push_back(msg);
406 std::mem::replace(&mut self.wakers, Vec::new())
407 }
408}
409
410struct ClientInner {
412 transport: Transport,
413 transactions: HashMap<NonZeroU32, responder::Responder>,
414 channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
415 socket_read_states: HashMap<proto::HandleId, SocketReadState>,
416 next_tx_id: u32,
417 waiting_to_close: Vec<proto::HandleId>,
418 waiting_to_close_waker: Waker,
419
420 wakers_to_wake: Vec<Waker>,
426}
427
428impl ClientInner {
429 fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
431 if ordinal != ordinals::CLOSE {
432 self.process_waiting_to_close();
433 }
434 let tx_id = self.next_tx_id;
435
436 let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
437 let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
438 self.next_tx_id += 1;
439 if let Err(e) = self.transport.push_msg(msg.into()) {
440 let _ = responder.handle(self, Err(e.into()));
441 } else {
442 assert!(
443 self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
444 "Allocated same tx id twice!"
445 );
446 }
447 }
448
449 fn process_waiting_to_close(&mut self) {
450 if !self.waiting_to_close.is_empty() {
451 let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
452 for handle in &handles {
455 let _ = self.channel_read_states.remove(handle);
456 let _ = self.socket_read_states.remove(handle);
457 }
458 self.request(
459 ordinals::CLOSE,
460 proto::FDomainCloseRequest { handles },
461 Responder::Ignore,
462 );
463 }
464 }
465
466 fn try_poll_transport(
469 &mut self,
470 ctx: &mut Context<'_>,
471 ) -> Poll<Result<Infallible, InnerError>> {
472 self.process_waiting_to_close();
473
474 self.waiting_to_close_waker = ctx.waker().clone();
475
476 loop {
477 if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
478 return Poll::Ready(Err(e));
479 }
480 let Poll::Ready(result) = self.transport.poll_next(ctx) else {
481 return Poll::Pending;
482 };
483 let data = result?;
484 let (header, data) = fidl_message::decode_transaction_header(&data)?;
485
486 let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
487 let wakers = self.process_event(header, data)?;
488 self.wakers_to_wake.extend(wakers);
489 continue;
490 };
491
492 let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
493 tx.handle(self, Ok((header, data)))?;
494 }
495 }
496
497 fn process_event(
499 &mut self,
500 header: TransactionHeader,
501 data: &[u8],
502 ) -> Result<Vec<Waker>, InnerError> {
503 match header.ordinal {
504 ordinals::ON_SOCKET_STREAMING_DATA => {
505 let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
506 header, data,
507 )?;
508 let o =
509 self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
510 wakers: Vec::new(),
511 queued: VecDeque::new(),
512 is_streaming: false,
513 read_request_pending: false,
514 });
515 match msg.socket_message {
516 proto::SocketMessage::Data(data) => Ok(o.handle_incoming_message(Ok(data))),
517 proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
518 let ret = if let Some(error) = error {
519 o.handle_incoming_message(Err(Error::FDomain(*error)))
520 } else {
521 Vec::new()
522 };
523 o.is_streaming = false;
524 Ok(ret)
525 }
526 _ => Err(InnerError::ProtocolStreamEventIncompatible),
527 }
528 }
529 ordinals::ON_CHANNEL_STREAMING_DATA => {
530 let msg = fidl_message::decode_message::<
531 proto::ChannelOnChannelStreamingDataRequest,
532 >(header, data)?;
533 let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
534 ChannelReadState {
535 wakers: Vec::new(),
536 queued: VecDeque::new(),
537 is_streaming: false,
538 read_request_pending: false,
539 }
540 });
541 match msg.channel_sent {
542 proto::ChannelSent::Message(data) => Ok(o.handle_incoming_message(Ok(data))),
543 proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
544 let ret = if let Some(error) = error {
545 o.handle_incoming_message(Err(Error::FDomain(*error)))
546 } else {
547 Vec::new()
548 };
549 o.is_streaming = false;
550 Ok(ret)
551 }
552 _ => Err(InnerError::ProtocolStreamEventIncompatible),
553 }
554 }
555 _ => Err(::fidl::Error::UnknownOrdinal {
556 ordinal: header.ordinal,
557 protocol_name:
558 <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
559 }
560 .into()),
561 }
562 }
563
564 fn poll_transport(&mut self, ctx: &mut Context<'_>) -> Poll<()> {
568 if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
569 for (_, v) in std::mem::take(&mut self.transactions) {
570 let _ = v.handle(self, Err(e.clone()));
571 }
572 for mut state in std::mem::take(&mut self.socket_read_states).into_values() {
573 state.queued.push_back(Err(Error::from(e.clone())));
574 self.wakers_to_wake.extend(state.wakers);
575 }
576 for (_, mut state) in self.channel_read_states.drain() {
577 state.queued.push_back(Err(Error::from(e.clone())));
578 self.wakers_to_wake.extend(state.wakers);
579 }
580 if matches!(self.transport, Transport::Transport(_, _, _)) {
581 self.transport = Transport::Error(e);
582 }
583
584 Poll::Ready(())
585 } else {
586 Poll::Pending
587 }
588 }
589
590 pub(crate) fn handle_socket_read_response(
592 &mut self,
593 msg: Result<proto::SocketData, Error>,
594 id: proto::HandleId,
595 ) {
596 let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
597 wakers: Vec::new(),
598 queued: VecDeque::new(),
599 is_streaming: false,
600 read_request_pending: false,
601 });
602 let wakers = state.handle_incoming_message(msg);
603 self.wakers_to_wake.extend(wakers);
604 state.read_request_pending = false;
605 }
606
607 pub(crate) fn handle_channel_read_response(
609 &mut self,
610 msg: Result<proto::ChannelMessage, Error>,
611 id: proto::HandleId,
612 ) {
613 let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
614 wakers: Vec::new(),
615 queued: VecDeque::new(),
616 is_streaming: false,
617 read_request_pending: false,
618 });
619 let wakers = state.handle_incoming_message(msg);
620 self.wakers_to_wake.extend(wakers);
621 state.read_request_pending = false;
622 }
623}
624
625impl Drop for ClientInner {
626 fn drop(&mut self) {
627 let responders = self.transactions.drain().map(|x| x.1).collect::<Vec<_>>();
628 for responder in responders {
629 let _ = responder.handle(self, Err(InnerError::Transport(None)));
630 }
631 for state in self.channel_read_states.values_mut() {
632 state.wakers.drain(..).for_each(Waker::wake);
633 }
634 for state in self.socket_read_states.values_mut() {
635 state.wakers.drain(..).for_each(Waker::wake);
636 }
637 self.waiting_to_close_waker.wake_by_ref();
638 self.wakers_to_wake.drain(..).for_each(Waker::wake);
639 }
640}
641
642pub struct Client(pub(crate) Mutex<ClientInner>);
649
650impl std::fmt::Debug for Client {
651 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
652 let inner = self.0.lock();
653 match &inner.transport {
654 Transport::Transport(transport, ..) if transport.has_debug_fmt() => {
655 write!(f, "Client(")?;
656 transport.debug_fmt(f)?;
657 write!(f, ")")
658 }
659 Transport::Error(error) => {
660 let error = Error::from(error.clone());
661 write!(f, "Client(Failed: {error})")
662 }
663 _ => f.debug_tuple("Client").field(&"<transport>").finish(),
664 }
665 }
666}
667
668pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
672 Arc::new(Client(Mutex::new(ClientInner {
673 transport: Transport::Error(InnerError::Transport(None)),
674 transactions: HashMap::new(),
675 channel_read_states: HashMap::new(),
676 socket_read_states: HashMap::new(),
677 next_tx_id: 1,
678 waiting_to_close: Vec::new(),
679 waiting_to_close_waker: std::task::Waker::noop().clone(),
680 wakers_to_wake: Vec::new(),
681 })))
682});
683
684pub struct ClientLoop {
690 client: Weak<Client>,
691 fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
692}
693
694impl Future for ClientLoop {
695 type Output = ();
696 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
697 self.fut.as_mut().poll(cx)
698 }
699}
700
701impl Drop for ClientLoop {
702 fn drop(&mut self) {
703 let Some(client) = self.client.upgrade() else {
704 return;
705 };
706
707 let (channel_read_states, socket_read_states, deferred_wakers) = {
708 let mut inner = client.0.lock();
709 let transactions = std::mem::take(&mut inner.transactions);
710 log::debug!("ClientLoop dropped, failing {} transactions", transactions.len());
711 for (_, v) in transactions {
712 let _ = v.handle(&mut *inner, Err(InnerError::Transport(None)));
713 }
714
715 let channel_read_states = std::mem::take(&mut inner.channel_read_states);
716 let socket_read_states = std::mem::take(&mut inner.socket_read_states);
717
718 let deferred_wakers = std::mem::replace(&mut inner.wakers_to_wake, Vec::new());
719
720 (channel_read_states, socket_read_states, deferred_wakers)
721 };
722
723 log::debug!("Failing reads on {} channels", channel_read_states.len());
724 for (_, mut state) in channel_read_states {
725 state.queued.push_back(Err(Error::Transport(None)));
726 state.wakers.into_iter().for_each(Waker::wake);
727 }
728
729 log::debug!("Failing reads on {} sockets", socket_read_states.len());
730 for (_, mut state) in socket_read_states {
731 state.queued.push_back(Err(Error::Transport(None)));
732 state.wakers.into_iter().for_each(Waker::wake);
733 }
734
735 deferred_wakers.into_iter().for_each(Waker::wake);
736 }
737}
738
739impl Client {
740 pub fn transport_status(&self) -> Result<()> {
741 match &self.0.lock().transport {
742 Transport::Error(e) => Err(e.clone().into()),
743 Transport::Transport(_, _, _) => Ok(()),
744 }
745 }
746 pub fn new(
753 transport: impl FDomainTransport + 'static,
754 ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
755 let ret = Arc::new(Client(Mutex::new(ClientInner {
756 transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
757 transactions: HashMap::new(),
758 socket_read_states: HashMap::new(),
759 channel_read_states: HashMap::new(),
760 next_tx_id: 1,
761 waiting_to_close: Vec::new(),
762 waiting_to_close_waker: std::task::Waker::noop().clone(),
763 wakers_to_wake: Vec::new(),
764 })));
765
766 let client_weak = Arc::downgrade(&ret);
767 let fut = futures::future::poll_fn(move |ctx| {
768 let Some(client) = client_weak.upgrade() else {
769 return Poll::Ready(());
770 };
771
772 let (ret, deferred_wakers) = {
773 let mut inner = client.0.lock();
774 let ret = inner.poll_transport(ctx);
775 let deferred_wakers = std::mem::replace(&mut inner.wakers_to_wake, Vec::new());
776 (ret, deferred_wakers)
777 };
778 deferred_wakers.into_iter().for_each(Waker::wake);
779 ret
780 });
781
782 let client_loop = ClientLoop { client: Arc::downgrade(&ret), fut: Box::pin(fut) };
783
784 (ret, client_loop)
785 }
786
787 pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
789 let new_handle = self.new_hid();
790 self.transaction(
791 ordinals::GET_NAMESPACE,
792 proto::FDomainGetNamespaceRequest { new_handle },
793 Responder::Namespace,
794 )
795 .await?;
796 Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
797 }
798
799 pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
801 let id_a = self.new_hid();
802 let id_b = self.new_hid();
803 let fut = self.transaction(
804 ordinals::CREATE_CHANNEL,
805 proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
806 Responder::CreateChannel,
807 );
808
809 fuchsia_async::Task::spawn(async move {
810 if let Err(e) = fut.await {
811 log::debug!("FDomain channel creation failed: {e}");
812 }
813 })
814 .detach();
815
816 (
817 Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
818 Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
819 )
820 }
821
822 pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
824 self: &Arc<Self>,
825 ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
826 let (client, server) = self.create_channel();
827 let client_end = crate::fidl::ClientEnd::<F>::new(client);
828 let server_end = crate::fidl::ServerEnd::new(server);
829 (client_end, server_end)
830 }
831
832 pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
834 self: &Arc<Self>,
835 ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
836 let (client_end, server_end) = self.create_endpoints::<F>();
837 (client_end.into_proxy(), server_end)
838 }
839
840 pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
842 self: &Arc<Self>,
843 ) -> (F::Proxy, F::RequestStream) {
844 let (client_end, server_end) = self.create_endpoints::<F>();
845 (client_end.into_proxy(), server_end.into_stream())
846 }
847
848 pub fn create_request_stream<F: crate::fidl::ProtocolMarker>(
850 self: &Arc<Self>,
851 ) -> (crate::fidl::ClientEnd<F>, F::RequestStream) {
852 let (client_end, server_end) = self.create_endpoints::<F>();
853 (client_end, server_end.into_stream())
854 }
855
856 fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
858 let id_a = self.new_hid();
859 let id_b = self.new_hid();
860 let fut = self.transaction(
861 ordinals::CREATE_SOCKET,
862 proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
863 Responder::CreateSocket,
864 );
865
866 fuchsia_async::Task::spawn(async move {
867 if let Err(e) = fut.await {
868 log::debug!("FDomain socket creation failed: {e}");
869 }
870 })
871 .detach();
872
873 (
874 Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
875 Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
876 )
877 }
878
879 pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
881 self.create_socket(proto::SocketType::Stream)
882 }
883
884 pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
886 self.create_socket(proto::SocketType::Datagram)
887 }
888
889 pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
891 let id_a = self.new_hid();
892 let id_b = self.new_hid();
893 let fut = self.transaction(
894 ordinals::CREATE_EVENT_PAIR,
895 proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
896 Responder::CreateEventPair,
897 );
898
899 fuchsia_async::Task::spawn(async move {
900 if let Err(e) = fut.await {
901 log::debug!("FDomain event pair creation failed: {e}");
902 }
903 })
904 .detach();
905
906 (
907 EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
908 EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
909 )
910 }
911
912 pub fn create_event(self: &Arc<Self>) -> Event {
914 let id = self.new_hid();
915 let fut = self.transaction(
916 ordinals::CREATE_EVENT,
917 proto::EventCreateEventRequest { handle: id },
918 Responder::CreateEvent,
919 );
920
921 fuchsia_async::Task::spawn(async move {
922 if let Err(e) = fut.await {
923 log::debug!("FDomain event creation failed: {e}");
924 }
925 })
926 .detach();
927
928 Event(Handle { id: id.id, client: Arc::downgrade(self) })
929 }
930
931 pub(crate) fn new_hid(&self) -> proto::NewHandleId {
933 proto::NewHandleId { id: rand::random::<u32>() >> 1 }
938 }
939
940 pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
946 self: &Arc<Self>,
947 ordinal: u64,
948 request: S,
949 f: F,
950 ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
951 where
952 F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
953 {
954 let mut inner = self.0.lock();
955
956 let (sender, receiver) = futures::channel::oneshot::channel();
957 inner.request(ordinal, request, f(sender));
958 receiver.map(|x| x.expect("Oneshot went away without reply!"))
959 }
960
961 pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
963 let mut inner = self.0.lock();
964 if let Some(e) = inner.transport.error() {
965 return Err(e.into());
966 }
967
968 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
969 wakers: Vec::new(),
970 queued: VecDeque::new(),
971 is_streaming: false,
972 read_request_pending: false,
973 });
974
975 assert!(!state.is_streaming, "Initiated streaming twice!");
976 state.is_streaming = true;
977
978 inner.request(
979 ordinals::READ_SOCKET_STREAMING_START,
980 proto::SocketReadSocketStreamingStartRequest { handle: id },
981 Responder::Ignore,
982 );
983 Ok(())
984 }
985
986 pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
990 let mut inner = self.0.lock();
991 if let Some(state) = inner.socket_read_states.get_mut(&id) {
992 if state.is_streaming {
993 state.is_streaming = false;
994 let _ = inner.request(
996 ordinals::READ_SOCKET_STREAMING_STOP,
997 proto::ChannelReadChannelStreamingStopRequest { handle: id },
998 Responder::Ignore,
999 );
1000 }
1001 }
1002 }
1003
1004 pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
1006 let mut inner = self.0.lock();
1007 if let Some(e) = inner.transport.error() {
1008 return Err(e.into());
1009 }
1010 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
1011 wakers: Vec::new(),
1012 queued: VecDeque::new(),
1013 is_streaming: false,
1014 read_request_pending: false,
1015 });
1016
1017 assert!(!state.is_streaming, "Initiated streaming twice!");
1018 state.is_streaming = true;
1019
1020 inner.request(
1021 ordinals::READ_CHANNEL_STREAMING_START,
1022 proto::ChannelReadChannelStreamingStartRequest { handle: id },
1023 Responder::Ignore,
1024 );
1025
1026 Ok(())
1027 }
1028
1029 pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
1033 let mut inner = self.0.lock();
1034 if let Some(state) = inner.channel_read_states.get_mut(&id) {
1035 if state.is_streaming {
1036 state.is_streaming = false;
1037 let _ = inner.request(
1039 ordinals::READ_CHANNEL_STREAMING_STOP,
1040 proto::ChannelReadChannelStreamingStopRequest { handle: id },
1041 Responder::Ignore,
1042 );
1043 }
1044 }
1045 }
1046
1047 pub(crate) fn poll_socket(
1049 &self,
1050 id: proto::HandleId,
1051 ctx: &mut Context<'_>,
1052 out: &mut [u8],
1053 ) -> Poll<Result<usize, Error>> {
1054 let mut inner = self.0.lock();
1055 if let Some(error) = inner.transport.error() {
1056 return Poll::Ready(Err(error.into()));
1057 }
1058
1059 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
1060 wakers: Vec::new(),
1061 queued: VecDeque::new(),
1062 is_streaming: false,
1063 read_request_pending: false,
1064 });
1065
1066 if let Some(got) = state.queued.front_mut() {
1067 match got.as_mut() {
1068 Ok(data) => {
1069 let read_size = std::cmp::min(data.data.len(), out.len());
1070 out[..read_size].copy_from_slice(&data.data[..read_size]);
1071
1072 if data.data.len() > read_size && !data.is_datagram {
1073 let _ = data.data.drain(..read_size);
1074 } else {
1075 let _ = state.queued.pop_front();
1076 }
1077
1078 return Poll::Ready(Ok(read_size));
1079 }
1080 Err(_) => {
1081 let err = state.queued.pop_front().unwrap().unwrap_err();
1082 return Poll::Ready(Err(err));
1083 }
1084 }
1085 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
1086 state.wakers.push(ctx.waker().clone());
1087 }
1088
1089 if !state.read_request_pending && !state.is_streaming {
1090 inner.request(
1091 ordinals::READ_SOCKET,
1092 proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
1093 Responder::ReadSocket(id),
1094 );
1095 }
1096
1097 Poll::Pending
1098 }
1099
1100 pub(crate) fn poll_channel(
1102 &self,
1103 id: proto::HandleId,
1104 ctx: &mut Context<'_>,
1105 for_stream: bool,
1106 ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
1107 let mut inner = self.0.lock();
1108 if let Some(error) = inner.transport.error() {
1109 return Poll::Ready(Some(Err(error.into())));
1110 }
1111
1112 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
1113 wakers: Vec::new(),
1114 queued: VecDeque::new(),
1115 is_streaming: false,
1116 read_request_pending: false,
1117 });
1118
1119 if let Some(got) = state.queued.pop_front() {
1120 return Poll::Ready(Some(got));
1121 } else if for_stream && !state.is_streaming {
1122 return Poll::Ready(None);
1123 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
1124 state.wakers.push(ctx.waker().clone());
1125 }
1126
1127 if !state.read_request_pending && !state.is_streaming {
1128 inner.request(
1129 ordinals::READ_CHANNEL,
1130 proto::ChannelReadChannelRequest { handle: id },
1131 Responder::ReadChannel(id),
1132 );
1133 }
1134
1135 Poll::Pending
1136 }
1137
1138 pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
1140 let inner = self.0.lock();
1141 let Some(state) = inner.channel_read_states.get(&id) else {
1142 return false;
1143 };
1144 state.is_streaming
1145 }
1146
1147 pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
1150 let inner = self.0.lock();
1151 match handles {
1152 proto::Handles::Handles(handles) => {
1153 for handle in handles {
1154 assert!(
1155 !(inner.channel_read_states.contains_key(handle)
1156 || inner.socket_read_states.contains_key(handle)),
1157 "Tried to transfer handle after reading"
1158 );
1159 }
1160 }
1161 proto::Handles::Dispositions(dispositions) => {
1162 for disposition in dispositions {
1163 match &disposition.handle {
1164 proto::HandleOp::Move_(handle) => assert!(
1165 !(inner.channel_read_states.contains_key(handle)
1166 || inner.socket_read_states.contains_key(handle)),
1167 "Tried to transfer handle after reading"
1168 ),
1169 proto::HandleOp::Duplicate(_) => (),
1171 }
1172 }
1173 }
1174 }
1175 }
1176}