1use fidl_message::TransactionHeader;
6use fuchsia_sync::Mutex;
7use futures::FutureExt;
8use futures::channel::oneshot::Sender as OneshotSender;
9use futures::stream::Stream as StreamTrait;
10use std::collections::{HashMap, VecDeque};
11use std::convert::Infallible;
12use std::future::Future;
13use std::num::NonZeroU32;
14use std::pin::Pin;
15use std::sync::{Arc, LazyLock};
16use std::task::{Context, Poll, Waker, ready};
17use {fidl_fuchsia_fdomain as proto, fuchsia_async as _};
18
19mod channel;
20mod event;
21mod event_pair;
22mod handle;
23mod responder;
24mod socket;
25
26#[cfg(test)]
27mod test;
28
29pub mod fidl;
30pub mod fidl_next;
31
32use responder::Responder;
33
34pub use channel::{
35 AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
36};
37pub use event::Event;
38pub use event_pair::Eventpair as EventPair;
39pub use handle::unowned::Unowned;
40pub use handle::{
41 AsHandleRef, Handle, HandleBased, HandleRef, NullableHandle, OnFDomainSignals, Peered,
42};
43pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
44pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
45
46#[rustfmt::skip]
48pub use Handle as Clock;
49#[rustfmt::skip]
50pub use Handle as Fifo;
51#[rustfmt::skip]
52pub use Handle as Job;
53#[rustfmt::skip]
54pub use Handle as Process;
55#[rustfmt::skip]
56pub use Handle as Resource;
57#[rustfmt::skip]
58pub use Handle as Stream;
59#[rustfmt::skip]
60pub use Handle as Thread;
61#[rustfmt::skip]
62pub use Handle as Vmar;
63#[rustfmt::skip]
64pub use Handle as Vmo;
65#[rustfmt::skip]
66pub use Handle as Counter;
67
68use proto::f_domain_ordinals as ordinals;
69
70fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 match error {
72 FDomainError::TargetError(e) => {
73 let e = zx_status::Status::from_raw(*e);
74 write!(f, "Target-side error {e}")
75 }
76 FDomainError::BadHandleId(proto::BadHandleId { id }) => {
77 write!(f, "Tried to use invalid handle id {id}")
78 }
79 FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
80 f,
81 "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
82 ),
83 FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
84 write!(f, "Handle is occupied delivering streaming reads")
85 }
86 FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
87 write!(f, "No streaming read was in progress")
88 }
89 FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
90 write!(
91 f,
92 "Tried to create a handle with id {id}, which is outside the valid range for client handles"
93 )
94 }
95 FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
96 if *same_call {
97 write!(f, "Tried to create two or more new handles with the same id {id}")
98 } else {
99 write!(
100 f,
101 "Tried to create a new handle with id {id}, which is already the id of an existing handle"
102 )
103 }
104 }
105 FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
106 write!(f, "Tried to write a channel into itself")
107 }
108 FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
109 write!(f, "Handle closed while being read")
110 }
111 _ => todo!(),
112 }
113}
114
/// Result alias whose error type defaults to this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
117
/// Errors reported by the FDomain client.
///
/// `Display` and `Debug` are implemented by hand for this type.
#[derive(Clone)]
pub enum Error {
    /// A socket write failed; the payload records the error and how many bytes
    /// were written successfully before it.
    SocketWrite(WriteSocketError),
    /// A channel write failed, either as a whole or for individual handles.
    ChannelWrite(WriteChannelError),
    /// An error described by the FDomain protocol's own error type.
    FDomain(FDomainError),
    /// A FIDL-level protocol error.
    Protocol(::fidl::Error),
    /// The FDomain protocol does not recognize an object type.
    ProtocolObjectTypeIncompatible,
    /// The FDomain protocol does not recognize some rights.
    ProtocolRightsIncompatible,
    /// The FDomain protocol does not recognize some signals.
    ProtocolSignalsIncompatible,
    /// A received streaming IO event was not recognized.
    ProtocolStreamEventIncompatible,
    /// The transport failed. `None` is displayed as "Transport closed";
    /// `Some` carries the underlying I/O error.
    Transport(Option<Arc<std::io::Error>>),
    /// An FDomain handle was used with a different connection than its own.
    ConnectionMismatch,
    /// The channel is no longer streaming.
    StreamingAborted,
}
133
134impl std::fmt::Display for Error {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 match self {
137 Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
138 write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
139 write_fdomain_error(error, f)
140 }
141 Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
142 write!(f, "While writing channel: ")?;
143 write_fdomain_error(error, f)
144 }
145 Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
146 write!(f, "Couldn't write all handles into a channel:")?;
147 for (pos, error) in
148 errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
149 {
150 write!(f, "\n Handle in position {pos}: ")?;
151 write_fdomain_error(error, f)?;
152 }
153 Ok(())
154 }
155 Self::ProtocolObjectTypeIncompatible => {
156 write!(f, "The FDomain protocol does not recognize an object type")
157 }
158 Self::ProtocolRightsIncompatible => {
159 write!(f, "The FDomain protocol does not recognize some rights")
160 }
161 Self::ProtocolSignalsIncompatible => {
162 write!(f, "The FDomain protocol does not recognize some signals")
163 }
164 Self::ProtocolStreamEventIncompatible => {
165 write!(f, "The FDomain protocol does not recognize a received streaming IO event")
166 }
167 Self::FDomain(e) => write_fdomain_error(e, f),
168 Self::Protocol(e) => write!(f, "Protocol error: {e}"),
169 Self::Transport(Some(e)) => write!(f, "Transport error: {e:?}"),
170 Self::Transport(None) => write!(f, "Transport closed"),
171 Self::ConnectionMismatch => {
172 write!(f, "Tried to use an FDomain handle from a different connection")
173 }
174 Self::StreamingAborted => write!(f, "This channel is no longer streaming"),
175 }
176 }
177}
178
179impl std::fmt::Debug for Error {
180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 match self {
182 Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
183 Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
184 Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
185 Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
186 Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
187 Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
188 Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
189 Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
190 Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
191 Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
192 Self::StreamingAborted => write!(f, "StreamingAborted"),
193 }
194 }
195}
196
// No methods are overridden; the trait's default implementations are used.
impl std::error::Error for Error {}
198
199impl From<FDomainError> for Error {
200 fn from(other: FDomainError) -> Self {
201 Self::FDomain(other)
202 }
203}
204
205impl From<::fidl::Error> for Error {
206 fn from(other: ::fidl::Error) -> Self {
207 Self::Protocol(other)
208 }
209}
210
211impl From<WriteSocketError> for Error {
212 fn from(other: WriteSocketError) -> Self {
213 Self::SocketWrite(other)
214 }
215}
216
217impl From<WriteChannelError> for Error {
218 fn from(other: WriteChannelError) -> Self {
219 Self::ChannelWrite(other)
220 }
221}
222
/// Connection-level failures tracked internally.
///
/// This is the error type stored in `Transport::Error` and passed to pending
/// responders when the connection fails. Each variant converts to the
/// same-named variant of [`Error`].
#[derive(Clone)]
enum InnerError {
    /// A FIDL-level protocol error.
    Protocol(::fidl::Error),
    /// A streaming event carried a payload variant this client does not handle.
    ProtocolStreamEventIncompatible,
    /// The transport failed; `None` carries no underlying I/O error and is
    /// displayed as "Transport closed".
    Transport(Option<Arc<std::io::Error>>),
}
232
233impl From<InnerError> for Error {
234 fn from(other: InnerError) -> Self {
235 match other {
236 InnerError::Protocol(p) => Error::Protocol(p),
237 InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
238 InnerError::Transport(t) => Error::Transport(t),
239 }
240 }
241}
242
243impl From<::fidl::Error> for InnerError {
244 fn from(other: ::fidl::Error) -> Self {
245 InnerError::Protocol(other)
246 }
247}
248
/// A connection over which FDomain messages are exchanged.
///
/// Incoming messages are produced through the `Stream` supertrait, one boxed
/// byte buffer per item. Outgoing messages are submitted with
/// [`FDomainTransport::poll_send_message`].
pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
    /// Attempts to send `msg` over the transport.
    ///
    /// Returns `Poll::Pending` if the message could not be sent yet. An error
    /// of `None` is surfaced by this crate as "Transport closed"; `Some`
    /// carries the underlying I/O error.
    fn poll_send_message(
        self: Pin<&mut Self>,
        msg: &[u8],
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Option<std::io::Error>>>;

    /// Writes a debug description of the transport. The default writes nothing.
    fn debug_fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }

    /// Whether [`FDomainTransport::debug_fmt`] produces output. `Client`'s
    /// `Debug` implementation only calls `debug_fmt` when this returns `true`.
    fn has_debug_fmt(&self) -> bool {
        false
    }
}
277
/// State of the connection's transport: either live or permanently failed.
enum Transport {
    /// A live transport. The fields are, in order: the transport itself, the
    /// queue of encoded messages waiting to be sent, and the wakers to wake
    /// when a message is queued or this value is dropped.
    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
    /// The transport has failed; the stored error is cloned out to callers.
    Error(InnerError),
}
287
288impl Transport {
289 fn error(&self) -> Option<InnerError> {
291 match self {
292 Transport::Transport(_, _, _) => None,
293 Transport::Error(inner_error) => Some(inner_error.clone()),
294 }
295 }
296
297 fn push_msg(&mut self, msg: Box<[u8]>) {
299 if let Transport::Transport(_, v, w) = self {
300 v.push_back(msg);
301 w.drain(..).for_each(Waker::wake);
302 }
303 }
304
305 fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
307 match self {
308 Transport::Error(e) => Poll::Ready(e.clone()),
309 Transport::Transport(t, v, w) => {
310 while let Some(msg) = v.front() {
311 match t.as_mut().poll_send_message(msg, ctx) {
312 Poll::Ready(Ok(())) => {
313 v.pop_front();
314 }
315 Poll::Ready(Err(e)) => {
316 let e = e.map(Arc::new);
317 *self = Transport::Error(InnerError::Transport(e.clone()));
318 return Poll::Ready(InnerError::Transport(e));
319 }
320 Poll::Pending => return Poll::Pending,
321 }
322 }
323
324 if v.is_empty() {
325 w.push(ctx.waker().clone());
326 } else {
327 ctx.waker().wake_by_ref();
328 }
329 Poll::Pending
330 }
331 }
332 }
333
334 fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Box<[u8]>, InnerError>> {
336 match self {
337 Transport::Error(e) => Poll::Ready(Err(e.clone())),
338 Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
339 Some(Ok(x)) => Poll::Ready(Ok(x)),
340 Some(Err(e)) => {
341 let e = Arc::new(e);
342 *self = Transport::Error(InnerError::Transport(Some(Arc::clone(&e))));
343 Poll::Ready(Err(InnerError::Transport(Some(e))))
344 }
345 Option::None => Poll::Ready(Err(InnerError::Transport(None))),
346 },
347 }
348 }
349}
350
351impl Drop for Transport {
352 fn drop(&mut self) {
353 if let Transport::Transport(_, _, wakers) = self {
354 wakers.drain(..).for_each(Waker::wake);
355 }
356 }
357}
358
/// Client-side bookkeeping for reads on one socket handle.
struct SocketReadState {
    /// Wakers to wake when a new result is queued.
    wakers: Vec<Waker>,
    /// Read results (data or errors) received but not yet consumed.
    queued: VecDeque<Result<proto::SocketData, Error>>,
    /// Consulted before a one-shot read request is issued; cleared when a read
    /// response arrives.
    read_request_pending: bool,
    /// Whether streaming reads are currently active for this handle.
    is_streaming: bool,
}
366
367impl SocketReadState {
368 fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) {
371 self.queued.push_back(msg);
372 self.wakers.drain(..).for_each(Waker::wake);
373 }
374}
375
/// Client-side bookkeeping for reads on one channel handle.
struct ChannelReadState {
    /// Wakers to wake when a new result is queued.
    wakers: Vec<Waker>,
    /// Read results (messages or errors) received but not yet consumed.
    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
    /// Consulted before a one-shot read request is issued; cleared when a read
    /// response arrives.
    read_request_pending: bool,
    /// Whether streaming reads are currently active for this handle.
    is_streaming: bool,
}
383
384impl ChannelReadState {
385 fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) {
388 self.queued.push_back(msg);
389 self.wakers.drain(..).for_each(Waker::wake);
390 }
391}
392
/// Mutable state of a [`Client`], guarded by the client's mutex.
struct ClientInner {
    /// The transport, or the error that ended it.
    transport: Transport,
    /// Responders for requests that have been issued but not yet answered,
    /// keyed by transaction id.
    transactions: HashMap<NonZeroU32, responder::Responder>,
    /// Per-handle read state for channels.
    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
    /// Per-handle read state for sockets.
    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
    /// Transaction id that the next request will use.
    next_tx_id: u32,
    /// Handles to close; flushed as a single close request the next time the
    /// transport is polled.
    waiting_to_close: Vec<proto::HandleId>,
    /// Waker of the task that most recently polled the transport; woken when
    /// this struct is dropped.
    // NOTE(review): presumably also woken by whoever pushes to
    // `waiting_to_close` -- that code is not in this file; confirm.
    waiting_to_close_waker: Waker,
}
403
404impl ClientInner {
405 fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
407 let tx_id = self.next_tx_id;
408
409 let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
410 let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
411 self.next_tx_id += 1;
412 assert!(
413 self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
414 "Allocated same tx id twice!"
415 );
416 self.transport.push_msg(msg.into());
417 }
418
419 fn try_poll_transport(
422 &mut self,
423 ctx: &mut Context<'_>,
424 ) -> Poll<Result<Infallible, InnerError>> {
425 if !self.waiting_to_close.is_empty() {
426 let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
427 for handle in &handles {
430 let _ = self.channel_read_states.remove(handle);
431 let _ = self.socket_read_states.remove(handle);
432 }
433 self.request(
434 ordinals::CLOSE,
435 proto::FDomainCloseRequest { handles },
436 Responder::Ignore,
437 );
438 }
439
440 self.waiting_to_close_waker = ctx.waker().clone();
441
442 loop {
443 if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
444 for state in std::mem::take(&mut self.socket_read_states).into_values() {
445 state.wakers.into_iter().for_each(Waker::wake);
446 }
447 for (_, state) in self.channel_read_states.drain() {
448 state.wakers.into_iter().for_each(Waker::wake);
449 }
450 return Poll::Ready(Err(e));
451 }
452 let Poll::Ready(result) = self.transport.poll_next(ctx) else {
453 return Poll::Pending;
454 };
455 let data = result?;
456 let (header, data) = match fidl_message::decode_transaction_header(&data) {
457 Ok(x) => x,
458 Err(e) => {
459 self.transport = Transport::Error(InnerError::Protocol(e));
460 continue;
461 }
462 };
463
464 let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
465 if let Err(e) = self.process_event(header, data) {
466 self.transport = Transport::Error(e);
467 }
468 continue;
469 };
470
471 let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
472 match tx.handle(self, Ok((header, data))) {
473 Ok(x) => x,
474 Err(e) => {
475 self.transport = Transport::Error(InnerError::Protocol(e));
476 continue;
477 }
478 }
479 }
480 }
481
482 fn process_event(&mut self, header: TransactionHeader, data: &[u8]) -> Result<(), InnerError> {
484 match header.ordinal {
485 ordinals::ON_SOCKET_STREAMING_DATA => {
486 let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
487 header, data,
488 )?;
489 let o =
490 self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
491 wakers: Vec::new(),
492 queued: VecDeque::new(),
493 is_streaming: false,
494 read_request_pending: false,
495 });
496 match msg.socket_message {
497 proto::SocketMessage::Data(data) => {
498 o.handle_incoming_message(Ok(data));
499 Ok(())
500 }
501 proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
502 if let Some(error) = error {
503 o.handle_incoming_message(Err(Error::FDomain(*error)));
504 }
505 o.is_streaming = false;
506 Ok(())
507 }
508 _ => Err(InnerError::ProtocolStreamEventIncompatible),
509 }
510 }
511 ordinals::ON_CHANNEL_STREAMING_DATA => {
512 let msg = fidl_message::decode_message::<
513 proto::ChannelOnChannelStreamingDataRequest,
514 >(header, data)?;
515 let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
516 ChannelReadState {
517 wakers: Vec::new(),
518 queued: VecDeque::new(),
519 is_streaming: false,
520 read_request_pending: false,
521 }
522 });
523 match msg.channel_sent {
524 proto::ChannelSent::Message(data) => {
525 o.handle_incoming_message(Ok(data));
526 Ok(())
527 }
528 proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
529 if let Some(error) = error {
530 o.handle_incoming_message(Err(Error::FDomain(*error)));
531 }
532 o.is_streaming = false;
533 Ok(())
534 }
535 _ => Err(InnerError::ProtocolStreamEventIncompatible),
536 }
537 }
538 _ => Err(::fidl::Error::UnknownOrdinal {
539 ordinal: header.ordinal,
540 protocol_name:
541 <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
542 }
543 .into()),
544 }
545 }
546
547 fn poll_transport(&mut self, ctx: &mut Context<'_>) -> Poll<()> {
551 if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
552 for (_, v) in std::mem::take(&mut self.transactions) {
553 let _ = v.handle(self, Err(e.clone()));
554 }
555
556 Poll::Ready(())
557 } else {
558 Poll::Pending
559 }
560 }
561
562 pub(crate) fn handle_socket_read_response(
564 &mut self,
565 msg: Result<proto::SocketData, Error>,
566 id: proto::HandleId,
567 ) {
568 let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
569 wakers: Vec::new(),
570 queued: VecDeque::new(),
571 is_streaming: false,
572 read_request_pending: false,
573 });
574 state.handle_incoming_message(msg);
575 state.read_request_pending = false;
576 }
577
578 pub(crate) fn handle_channel_read_response(
580 &mut self,
581 msg: Result<proto::ChannelMessage, Error>,
582 id: proto::HandleId,
583 ) {
584 let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
585 wakers: Vec::new(),
586 queued: VecDeque::new(),
587 is_streaming: false,
588 read_request_pending: false,
589 });
590 state.handle_incoming_message(msg);
591 state.read_request_pending = false;
592 }
593}
594
595impl Drop for ClientInner {
596 fn drop(&mut self) {
597 let responders = self.transactions.drain().map(|x| x.1).collect::<Vec<_>>();
598 for responder in responders {
599 let _ = responder.handle(self, Err(InnerError::Transport(None)));
600 }
601 for state in self.channel_read_states.values_mut() {
602 state.wakers.drain(..).for_each(Waker::wake);
603 }
604 for state in self.socket_read_states.values_mut() {
605 state.wakers.drain(..).for_each(Waker::wake);
606 }
607 self.waiting_to_close_waker.wake_by_ref();
608 }
609}
610
/// A client for an FDomain connection.
///
/// All mutable state lives in the wrapped `ClientInner` behind a mutex.
pub struct Client(pub(crate) Mutex<ClientInner>);
618
619impl std::fmt::Debug for Client {
620 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
621 let inner = self.0.lock();
622 match &inner.transport {
623 Transport::Transport(transport, ..) if transport.has_debug_fmt() => {
624 write!(f, "Client(")?;
625 transport.debug_fmt(f)?;
626 write!(f, ")")
627 }
628 Transport::Error(error) => {
629 let error = Error::from(error.clone());
630 write!(f, "Client(Failed: {error})")
631 }
632 _ => f.debug_tuple("Client").field(&"<transport>").finish(),
633 }
634 }
635}
636
637pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
641 Arc::new(Client(Mutex::new(ClientInner {
642 transport: Transport::Error(InnerError::Transport(None)),
643 transactions: HashMap::new(),
644 channel_read_states: HashMap::new(),
645 socket_read_states: HashMap::new(),
646 next_tx_id: 1,
647 waiting_to_close: Vec::new(),
648 waiting_to_close_waker: std::task::Waker::noop().clone(),
649 })))
650});
651
652impl Client {
653 pub fn new(
660 transport: impl FDomainTransport + 'static,
661 ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
662 let ret = Arc::new(Client(Mutex::new(ClientInner {
663 transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
664 transactions: HashMap::new(),
665 socket_read_states: HashMap::new(),
666 channel_read_states: HashMap::new(),
667 next_tx_id: 1,
668 waiting_to_close: Vec::new(),
669 waiting_to_close_waker: std::task::Waker::noop().clone(),
670 })));
671
672 let client_weak = Arc::downgrade(&ret);
673 let fut = futures::future::poll_fn(move |ctx| {
674 let Some(client) = client_weak.upgrade() else {
675 return Poll::Ready(());
676 };
677
678 client.0.lock().poll_transport(ctx)
679 });
680
681 (ret, fut)
682 }
683
684 pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
686 let new_handle = self.new_hid();
687 self.transaction(
688 ordinals::GET_NAMESPACE,
689 proto::FDomainGetNamespaceRequest { new_handle },
690 Responder::Namespace,
691 )
692 .await?;
693 Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
694 }
695
696 pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
698 let id_a = self.new_hid();
699 let id_b = self.new_hid();
700 let fut = self.transaction(
701 ordinals::CREATE_CHANNEL,
702 proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
703 Responder::CreateChannel,
704 );
705
706 fuchsia_async::Task::spawn(async move {
707 if let Err(e) = fut.await {
708 log::debug!("FDomain channel creation failed: {e}");
709 }
710 })
711 .detach();
712
713 (
714 Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
715 Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
716 )
717 }
718
719 pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
721 self: &Arc<Self>,
722 ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
723 let (client, server) = self.create_channel();
724 let client_end = crate::fidl::ClientEnd::<F>::new(client);
725 let server_end = crate::fidl::ServerEnd::new(server);
726 (client_end, server_end)
727 }
728
729 pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
731 self: &Arc<Self>,
732 ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
733 let (client_end, server_end) = self.create_endpoints::<F>();
734 (client_end.into_proxy(), server_end)
735 }
736
737 pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
739 self: &Arc<Self>,
740 ) -> (F::Proxy, F::RequestStream) {
741 let (client_end, server_end) = self.create_endpoints::<F>();
742 (client_end.into_proxy(), server_end.into_stream())
743 }
744
745 pub fn create_request_stream<F: crate::fidl::ProtocolMarker>(
747 self: &Arc<Self>,
748 ) -> (crate::fidl::ClientEnd<F>, F::RequestStream) {
749 let (client_end, server_end) = self.create_endpoints::<F>();
750 (client_end, server_end.into_stream())
751 }
752
753 fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
755 let id_a = self.new_hid();
756 let id_b = self.new_hid();
757 let fut = self.transaction(
758 ordinals::CREATE_SOCKET,
759 proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
760 Responder::CreateSocket,
761 );
762
763 fuchsia_async::Task::spawn(async move {
764 if let Err(e) = fut.await {
765 log::debug!("FDomain socket creation failed: {e}");
766 }
767 })
768 .detach();
769
770 (
771 Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
772 Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
773 )
774 }
775
776 pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
778 self.create_socket(proto::SocketType::Stream)
779 }
780
781 pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
783 self.create_socket(proto::SocketType::Datagram)
784 }
785
786 pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
788 let id_a = self.new_hid();
789 let id_b = self.new_hid();
790 let fut = self.transaction(
791 ordinals::CREATE_EVENT_PAIR,
792 proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
793 Responder::CreateEventPair,
794 );
795
796 fuchsia_async::Task::spawn(async move {
797 if let Err(e) = fut.await {
798 log::debug!("FDomain event pair creation failed: {e}");
799 }
800 })
801 .detach();
802
803 (
804 EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
805 EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
806 )
807 }
808
809 pub fn create_event(self: &Arc<Self>) -> Event {
811 let id = self.new_hid();
812 let fut = self.transaction(
813 ordinals::CREATE_EVENT,
814 proto::EventCreateEventRequest { handle: id },
815 Responder::CreateEvent,
816 );
817
818 fuchsia_async::Task::spawn(async move {
819 if let Err(e) = fut.await {
820 log::debug!("FDomain event creation failed: {e}");
821 }
822 })
823 .detach();
824
825 Event(Handle { id: id.id, client: Arc::downgrade(self) })
826 }
827
828 pub(crate) fn new_hid(&self) -> proto::NewHandleId {
830 proto::NewHandleId { id: rand::random::<u32>() >> 1 }
835 }
836
837 pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
843 self: &Arc<Self>,
844 ordinal: u64,
845 request: S,
846 f: F,
847 ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
848 where
849 F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
850 {
851 let mut inner = self.0.lock();
852
853 let (sender, receiver) = futures::channel::oneshot::channel();
854 inner.request(ordinal, request, f(sender));
855 receiver.map(|x| x.expect("Oneshot went away without reply!"))
856 }
857
858 pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
860 let mut inner = self.0.lock();
861 if let Some(e) = inner.transport.error() {
862 return Err(e.into());
863 }
864
865 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
866 wakers: Vec::new(),
867 queued: VecDeque::new(),
868 is_streaming: false,
869 read_request_pending: false,
870 });
871
872 assert!(!state.is_streaming, "Initiated streaming twice!");
873 state.is_streaming = true;
874
875 inner.request(
876 ordinals::READ_SOCKET_STREAMING_START,
877 proto::SocketReadSocketStreamingStartRequest { handle: id },
878 Responder::Ignore,
879 );
880 Ok(())
881 }
882
883 pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
887 let mut inner = self.0.lock();
888 if let Some(state) = inner.socket_read_states.get_mut(&id) {
889 if state.is_streaming {
890 state.is_streaming = false;
891 let _ = inner.request(
893 ordinals::READ_SOCKET_STREAMING_STOP,
894 proto::ChannelReadChannelStreamingStopRequest { handle: id },
895 Responder::Ignore,
896 );
897 }
898 }
899 }
900
901 pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
903 let mut inner = self.0.lock();
904 if let Some(e) = inner.transport.error() {
905 return Err(e.into());
906 }
907 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
908 wakers: Vec::new(),
909 queued: VecDeque::new(),
910 is_streaming: false,
911 read_request_pending: false,
912 });
913
914 assert!(!state.is_streaming, "Initiated streaming twice!");
915 state.is_streaming = true;
916
917 inner.request(
918 ordinals::READ_CHANNEL_STREAMING_START,
919 proto::ChannelReadChannelStreamingStartRequest { handle: id },
920 Responder::Ignore,
921 );
922
923 Ok(())
924 }
925
926 pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
930 let mut inner = self.0.lock();
931 if let Some(state) = inner.channel_read_states.get_mut(&id) {
932 if state.is_streaming {
933 state.is_streaming = false;
934 let _ = inner.request(
936 ordinals::READ_CHANNEL_STREAMING_STOP,
937 proto::ChannelReadChannelStreamingStopRequest { handle: id },
938 Responder::Ignore,
939 );
940 }
941 }
942 }
943
944 pub(crate) fn poll_socket(
946 &self,
947 id: proto::HandleId,
948 ctx: &mut Context<'_>,
949 out: &mut [u8],
950 ) -> Poll<Result<usize, Error>> {
951 let mut inner = self.0.lock();
952 if let Some(error) = inner.transport.error() {
953 return Poll::Ready(Err(error.into()));
954 }
955
956 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
957 wakers: Vec::new(),
958 queued: VecDeque::new(),
959 is_streaming: false,
960 read_request_pending: false,
961 });
962
963 if let Some(got) = state.queued.front_mut() {
964 match got.as_mut() {
965 Ok(data) => {
966 let read_size = std::cmp::min(data.data.len(), out.len());
967 out[..read_size].copy_from_slice(&data.data[..read_size]);
968
969 if data.data.len() > read_size && !data.is_datagram {
970 let _ = data.data.drain(..read_size);
971 } else {
972 let _ = state.queued.pop_front();
973 }
974
975 return Poll::Ready(Ok(read_size));
976 }
977 Err(_) => {
978 let err = state.queued.pop_front().unwrap().unwrap_err();
979 return Poll::Ready(Err(err));
980 }
981 }
982 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
983 state.wakers.push(ctx.waker().clone());
984 }
985
986 if !state.read_request_pending && !state.is_streaming {
987 inner.request(
988 ordinals::READ_SOCKET,
989 proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
990 Responder::ReadSocket(id),
991 );
992 }
993
994 Poll::Pending
995 }
996
997 pub(crate) fn poll_channel(
999 &self,
1000 id: proto::HandleId,
1001 ctx: &mut Context<'_>,
1002 for_stream: bool,
1003 ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
1004 let mut inner = self.0.lock();
1005 if let Some(error) = inner.transport.error() {
1006 return Poll::Ready(Some(Err(error.into())));
1007 }
1008
1009 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
1010 wakers: Vec::new(),
1011 queued: VecDeque::new(),
1012 is_streaming: false,
1013 read_request_pending: false,
1014 });
1015
1016 if let Some(got) = state.queued.pop_front() {
1017 return Poll::Ready(Some(got));
1018 } else if for_stream && !state.is_streaming {
1019 return Poll::Ready(None);
1020 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
1021 state.wakers.push(ctx.waker().clone());
1022 }
1023
1024 if !state.read_request_pending && !state.is_streaming {
1025 inner.request(
1026 ordinals::READ_CHANNEL,
1027 proto::ChannelReadChannelRequest { handle: id },
1028 Responder::ReadChannel(id),
1029 );
1030 }
1031
1032 Poll::Pending
1033 }
1034
1035 pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
1037 let inner = self.0.lock();
1038 let Some(state) = inner.channel_read_states.get(&id) else {
1039 return false;
1040 };
1041 state.is_streaming
1042 }
1043
1044 pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
1047 let inner = self.0.lock();
1048 match handles {
1049 proto::Handles::Handles(handles) => {
1050 for handle in handles {
1051 assert!(
1052 !(inner.channel_read_states.contains_key(handle)
1053 || inner.socket_read_states.contains_key(handle)),
1054 "Tried to transfer handle after reading"
1055 );
1056 }
1057 }
1058 proto::Handles::Dispositions(dispositions) => {
1059 for disposition in dispositions {
1060 match &disposition.handle {
1061 proto::HandleOp::Move_(handle) => assert!(
1062 !(inner.channel_read_states.contains_key(handle)
1063 || inner.socket_read_states.contains_key(handle)),
1064 "Tried to transfer handle after reading"
1065 ),
1066 proto::HandleOp::Duplicate(_) => (),
1068 }
1069 }
1070 }
1071 }
1072 }
1073}