1use fidl_message::TransactionHeader;
6use futures::FutureExt;
7use futures::channel::oneshot::Sender as OneshotSender;
8use futures::stream::Stream as StreamTrait;
9use std::collections::{HashMap, VecDeque};
10use std::convert::Infallible;
11use std::future::Future;
12use std::num::NonZeroU32;
13use std::pin::Pin;
14use std::sync::{Arc, LazyLock, Mutex};
15use std::task::{Context, Poll, Waker, ready};
16use {fidl_fuchsia_fdomain as proto, fuchsia_async as _};
17
18mod channel;
19mod event;
20mod event_pair;
21mod handle;
22mod responder;
23mod socket;
24
25#[cfg(test)]
26mod test;
27
28pub mod fidl;
29pub mod fidl_next;
30
31use responder::Responder;
32
33pub use channel::{
34 AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
35};
36pub use event::Event;
37pub use event_pair::Eventpair as EventPair;
38pub use handle::{AsHandleRef, Handle, HandleBased, HandleRef, OnFDomainSignals, Peered};
39pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
40pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
41
42#[rustfmt::skip]
44pub use Handle as Fifo;
45#[rustfmt::skip]
46pub use Handle as Job;
47#[rustfmt::skip]
48pub use Handle as Process;
49#[rustfmt::skip]
50pub use Handle as Resource;
51#[rustfmt::skip]
52pub use Handle as Stream;
53#[rustfmt::skip]
54pub use Handle as Thread;
55#[rustfmt::skip]
56pub use Handle as Vmar;
57#[rustfmt::skip]
58pub use Handle as Vmo;
59
60use proto::f_domain_ordinals as ordinals;
61
62fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 match error {
64 FDomainError::TargetError(e) => {
65 let e = zx_status::Status::from_raw(*e);
66 write!(f, "Target-side error {e}")
67 }
68 FDomainError::BadHandleId(proto::BadHandleId { id }) => {
69 write!(f, "Tried to use invalid handle id {id}")
70 }
71 FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
72 f,
73 "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
74 ),
75 FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
76 write!(f, "Handle is occupied delivering streaming reads")
77 }
78 FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
79 write!(f, "No streaming read was in progress")
80 }
81 FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
82 write!(
83 f,
84 "Tried to create a handle with id {id}, which is outside the valid range for client handles"
85 )
86 }
87 FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
88 if *same_call {
89 write!(f, "Tried to create two or more new handles with the same id {id}")
90 } else {
91 write!(
92 f,
93 "Tried to create a new handle with id {id}, which is already the id of an existing handle"
94 )
95 }
96 }
97 FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
98 write!(f, "Tried to write a channel into itself")
99 }
100 FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
101 write!(f, "Handle closed while being read")
102 }
103 _ => todo!(),
104 }
105}
106
107pub type Result<T, E = Error> = std::result::Result<T, E>;
109
110#[derive(Clone)]
112pub enum Error {
113 SocketWrite(WriteSocketError),
114 ChannelWrite(WriteChannelError),
115 FDomain(FDomainError),
116 Protocol(::fidl::Error),
117 ProtocolObjectTypeIncompatible,
118 ProtocolRightsIncompatible,
119 ProtocolSignalsIncompatible,
120 ProtocolStreamEventIncompatible,
121 Transport(Arc<std::io::Error>),
122 ConnectionMismatch,
123 StreamingAborted,
124}
125
126impl std::fmt::Display for Error {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 match self {
129 Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
130 write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
131 write_fdomain_error(error, f)
132 }
133 Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
134 write!(f, "While writing channel: ")?;
135 write_fdomain_error(error, f)
136 }
137 Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
138 write!(f, "Couldn't write all handles into a channel:")?;
139 for (pos, error) in
140 errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
141 {
142 write!(f, "\n Handle in position {pos}: ")?;
143 write_fdomain_error(error, f)?;
144 }
145 Ok(())
146 }
147 Self::ProtocolObjectTypeIncompatible => {
148 write!(f, "The FDomain protocol does not recognize an object type")
149 }
150 Self::ProtocolRightsIncompatible => {
151 write!(f, "The FDomain protocol does not recognize some rights")
152 }
153 Self::ProtocolSignalsIncompatible => {
154 write!(f, "The FDomain protocol does not recognize some signals")
155 }
156 Self::ProtocolStreamEventIncompatible => {
157 write!(f, "The FDomain protocol does not recognize a received streaming IO event")
158 }
159 Self::FDomain(e) => write_fdomain_error(e, f),
160 Self::Protocol(e) => write!(f, "Protocol error: {e}"),
161 Self::Transport(e) => write!(f, "Transport error: {e:?}"),
162 Self::ConnectionMismatch => {
163 write!(f, "Tried to use an FDomain handle from a different connection")
164 }
165 Self::StreamingAborted => write!(f, "This channel is no longer streaming"),
166 }
167 }
168}
169
170impl std::fmt::Debug for Error {
171 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 match self {
173 Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
174 Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
175 Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
176 Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
177 Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
178 Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
179 Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
180 Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
181 Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
182 Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
183 Self::StreamingAborted => write!(f, "StreamingAborted"),
184 }
185 }
186}
187
188impl std::error::Error for Error {}
189
190impl From<FDomainError> for Error {
191 fn from(other: FDomainError) -> Self {
192 Self::FDomain(other)
193 }
194}
195
196impl From<::fidl::Error> for Error {
197 fn from(other: ::fidl::Error) -> Self {
198 Self::Protocol(other)
199 }
200}
201
202impl From<WriteSocketError> for Error {
203 fn from(other: WriteSocketError) -> Self {
204 Self::SocketWrite(other)
205 }
206}
207
208impl From<WriteChannelError> for Error {
209 fn from(other: WriteChannelError) -> Self {
210 Self::ChannelWrite(other)
211 }
212}
213
214enum InnerError {
218 Protocol(::fidl::Error),
219 ProtocolStreamEventIncompatible,
220 Transport(Arc<std::io::Error>),
221}
222
223impl Clone for InnerError {
224 fn clone(&self) -> Self {
225 match self {
226 InnerError::Protocol(a) => InnerError::Protocol(a.clone()),
227 InnerError::ProtocolStreamEventIncompatible => {
228 InnerError::ProtocolStreamEventIncompatible
229 }
230 InnerError::Transport(a) => InnerError::Transport(Arc::clone(a)),
231 }
232 }
233}
234
235impl From<InnerError> for Error {
236 fn from(other: InnerError) -> Self {
237 match other {
238 InnerError::Protocol(p) => Error::Protocol(p),
239 InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
240 InnerError::Transport(t) => Error::Transport(t),
241 }
242 }
243}
244
245impl From<::fidl::Error> for InnerError {
246 fn from(other: ::fidl::Error) -> Self {
247 InnerError::Protocol(other)
248 }
249}
250
251pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
261 fn poll_send_message(
264 self: Pin<&mut Self>,
265 msg: &[u8],
266 ctx: &mut Context<'_>,
267 ) -> Poll<Result<(), std::io::Error>>;
268}
269
270enum Transport {
276 Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
277 Error(InnerError),
278}
279
280impl Transport {
281 fn error(&self) -> Option<InnerError> {
283 match self {
284 Transport::Transport(_, _, _) => None,
285 Transport::Error(inner_error) => Some(inner_error.clone()),
286 }
287 }
288
289 fn push_msg(&mut self, msg: Box<[u8]>) {
291 if let Transport::Transport(_, v, w) = self {
292 v.push_back(msg);
293 w.drain(..).for_each(Waker::wake);
294 }
295 }
296
297 fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
299 match self {
300 Transport::Error(e) => Poll::Ready(e.clone()),
301 Transport::Transport(t, v, w) => {
302 while let Some(msg) = v.front() {
303 match t.as_mut().poll_send_message(msg, ctx) {
304 Poll::Ready(Ok(())) => {
305 v.pop_front();
306 }
307 Poll::Ready(Err(e)) => {
308 let e = Arc::new(e);
309 *self = Transport::Error(InnerError::Transport(Arc::clone(&e)));
310 return Poll::Ready(InnerError::Transport(e));
311 }
312 Poll::Pending => return Poll::Pending,
313 }
314 }
315
316 if v.is_empty() {
317 w.push(ctx.waker().clone());
318 } else {
319 ctx.waker().wake_by_ref();
320 }
321 Poll::Pending
322 }
323 }
324 }
325
326 fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Option<Result<Box<[u8]>, InnerError>>> {
328 match self {
329 Transport::Error(e) => Poll::Ready(Some(Err(e.clone()))),
330 Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
331 Some(Ok(x)) => Poll::Ready(Some(Ok(x))),
332 Some(Err(e)) => {
333 let e = Arc::new(e);
334 *self = Transport::Error(InnerError::Transport(Arc::clone(&e)));
335 Poll::Ready(Some(Err(InnerError::Transport(e))))
336 }
337 Option::None => Poll::Ready(None),
338 },
339 }
340 }
341}
342
343struct SocketReadState {
345 wakers: Vec<Waker>,
346 queued: VecDeque<Result<proto::SocketData, Error>>,
347 read_request_pending: bool,
348 is_streaming: bool,
349}
350
351impl SocketReadState {
352 fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) {
355 self.queued.push_back(msg);
356 self.wakers.drain(..).for_each(Waker::wake);
357 }
358}
359
360struct ChannelReadState {
362 wakers: Vec<Waker>,
363 queued: VecDeque<Result<proto::ChannelMessage, Error>>,
364 read_request_pending: bool,
365 is_streaming: bool,
366}
367
368impl ChannelReadState {
369 fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) {
372 self.queued.push_back(msg);
373 self.wakers.drain(..).for_each(Waker::wake);
374 }
375}
376
377struct ClientInner {
379 transport: Transport,
380 transactions: HashMap<NonZeroU32, responder::Responder>,
381 channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
382 socket_read_states: HashMap<proto::HandleId, SocketReadState>,
383 next_tx_id: u32,
384 waiting_to_close: Vec<proto::HandleId>,
385 waiting_to_close_waker: Waker,
386}
387
388impl ClientInner {
389 fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
391 let tx_id = self.next_tx_id;
392
393 let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
394 let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
395 self.next_tx_id += 1;
396 assert!(
397 self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
398 "Allocated same tx id twice!"
399 );
400 self.transport.push_msg(msg.into());
401 }
402
403 fn try_poll_transport(
406 &mut self,
407 ctx: &mut Context<'_>,
408 ) -> Poll<Result<Infallible, InnerError>> {
409 if !self.waiting_to_close.is_empty() {
410 let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
411 for handle in &handles {
414 let _ = self.channel_read_states.remove(handle);
415 let _ = self.socket_read_states.remove(handle);
416 }
417 self.request(
418 ordinals::CLOSE,
419 proto::FDomainCloseRequest { handles },
420 Responder::Ignore,
421 );
422 }
423
424 self.waiting_to_close_waker = ctx.waker().clone();
425
426 loop {
427 if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
428 for state in std::mem::take(&mut self.socket_read_states).into_values() {
429 state.wakers.into_iter().for_each(Waker::wake);
430 }
431 for (_, state) in self.channel_read_states.drain() {
432 state.wakers.into_iter().for_each(Waker::wake);
433 }
434 return Poll::Ready(Err(e));
435 }
436 let Poll::Ready(Some(result)) = self.transport.poll_next(ctx) else {
437 return Poll::Pending;
438 };
439 let data = result?;
440 let (header, data) = match fidl_message::decode_transaction_header(&data) {
441 Ok(x) => x,
442 Err(e) => {
443 self.transport = Transport::Error(InnerError::Protocol(e));
444 continue;
445 }
446 };
447
448 let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
449 if let Err(e) = self.process_event(header, data) {
450 self.transport = Transport::Error(e);
451 }
452 continue;
453 };
454
455 let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
456 match tx.handle(self, Ok((header, data))) {
457 Ok(x) => x,
458 Err(e) => {
459 self.transport = Transport::Error(InnerError::Protocol(e));
460 continue;
461 }
462 }
463 }
464 }
465
466 fn process_event(&mut self, header: TransactionHeader, data: &[u8]) -> Result<(), InnerError> {
468 match header.ordinal {
469 ordinals::ON_SOCKET_STREAMING_DATA => {
470 let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
471 header, data,
472 )?;
473 let o =
474 self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
475 wakers: Vec::new(),
476 queued: VecDeque::new(),
477 is_streaming: false,
478 read_request_pending: false,
479 });
480 match msg.socket_message {
481 proto::SocketMessage::Data(data) => {
482 o.handle_incoming_message(Ok(data));
483 Ok(())
484 }
485 proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
486 if let Some(error) = error {
487 o.handle_incoming_message(Err(Error::FDomain(*error)));
488 }
489 o.is_streaming = false;
490 Ok(())
491 }
492 _ => Err(InnerError::ProtocolStreamEventIncompatible),
493 }
494 }
495 ordinals::ON_CHANNEL_STREAMING_DATA => {
496 let msg = fidl_message::decode_message::<
497 proto::ChannelOnChannelStreamingDataRequest,
498 >(header, data)?;
499 let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
500 ChannelReadState {
501 wakers: Vec::new(),
502 queued: VecDeque::new(),
503 is_streaming: false,
504 read_request_pending: false,
505 }
506 });
507 match msg.channel_sent {
508 proto::ChannelSent::Message(data) => {
509 o.handle_incoming_message(Ok(data));
510 Ok(())
511 }
512 proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
513 if let Some(error) = error {
514 o.handle_incoming_message(Err(Error::FDomain(*error)));
515 }
516 o.is_streaming = false;
517 Ok(())
518 }
519 _ => Err(InnerError::ProtocolStreamEventIncompatible),
520 }
521 }
522 _ => Err(::fidl::Error::UnknownOrdinal {
523 ordinal: header.ordinal,
524 protocol_name:
525 <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
526 }
527 .into()),
528 }
529 }
530
531 fn poll_transport(&mut self, ctx: &mut Context<'_>) {
535 if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
536 for (_, v) in std::mem::take(&mut self.transactions) {
537 let _ = v.handle(self, Err(e.clone()));
538 }
539 }
540 }
541
542 pub(crate) fn handle_socket_read_response(
544 &mut self,
545 msg: Result<proto::SocketData, Error>,
546 id: proto::HandleId,
547 ) {
548 let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
549 wakers: Vec::new(),
550 queued: VecDeque::new(),
551 is_streaming: false,
552 read_request_pending: false,
553 });
554 state.handle_incoming_message(msg);
555 state.read_request_pending = false;
556 }
557
558 pub(crate) fn handle_channel_read_response(
560 &mut self,
561 msg: Result<proto::ChannelMessage, Error>,
562 id: proto::HandleId,
563 ) {
564 let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
565 wakers: Vec::new(),
566 queued: VecDeque::new(),
567 is_streaming: false,
568 read_request_pending: false,
569 });
570 state.handle_incoming_message(msg);
571 state.read_request_pending = false;
572 }
573}
574
575pub struct Client(pub(crate) Mutex<ClientInner>);
582
583impl std::fmt::Debug for Client {
584 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
585 f.debug_tuple("Client").field(&"...").finish()
586 }
587}
588
589pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
593 Arc::new(Client(Mutex::new(ClientInner {
594 transport: Transport::Error(InnerError::Transport(Arc::new(std::io::Error::other(
595 "Client Lost",
596 )))),
597 transactions: HashMap::new(),
598 channel_read_states: HashMap::new(),
599 socket_read_states: HashMap::new(),
600 next_tx_id: 1,
601 waiting_to_close: Vec::new(),
602 waiting_to_close_waker: futures::task::noop_waker(),
603 })))
604});
605
606impl Client {
607 pub fn new(
614 transport: impl FDomainTransport + 'static,
615 ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
616 let ret = Arc::new(Client(Mutex::new(ClientInner {
617 transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
618 transactions: HashMap::new(),
619 socket_read_states: HashMap::new(),
620 channel_read_states: HashMap::new(),
621 next_tx_id: 1,
622 waiting_to_close: Vec::new(),
623 waiting_to_close_waker: futures::task::noop_waker(),
624 })));
625
626 let client_weak = Arc::downgrade(&ret);
627 let fut = futures::future::poll_fn(move |ctx| {
628 let Some(client) = client_weak.upgrade() else {
629 return Poll::Ready(());
630 };
631
632 client.0.lock().unwrap().poll_transport(ctx);
633 Poll::Pending
634 });
635
636 (ret, fut)
637 }
638
639 pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
641 let new_handle = self.new_hid();
642 self.transaction(
643 ordinals::GET_NAMESPACE,
644 proto::FDomainGetNamespaceRequest { new_handle },
645 Responder::Namespace,
646 )
647 .await?;
648 Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
649 }
650
651 pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
653 let id_a = self.new_hid();
654 let id_b = self.new_hid();
655 let fut = self.transaction(
656 ordinals::CREATE_CHANNEL,
657 proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
658 Responder::CreateChannel,
659 );
660
661 fuchsia_async::Task::spawn(async move {
662 if let Err(e) = fut.await {
663 log::debug!("FDomain channel creation failed: {e}");
664 }
665 })
666 .detach();
667
668 (
669 Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
670 Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
671 )
672 }
673
674 pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
676 self: &Arc<Self>,
677 ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
678 let (client, server) = self.create_channel();
679 let client_end = crate::fidl::ClientEnd::<F>::new(client);
680 let server_end = crate::fidl::ServerEnd::new(server);
681 (client_end, server_end)
682 }
683
684 pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
686 self: &Arc<Self>,
687 ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
688 let (client_end, server_end) = self.create_endpoints::<F>();
689 (client_end.into_proxy(), server_end)
690 }
691
692 pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
694 self: &Arc<Self>,
695 ) -> (F::Proxy, F::RequestStream) {
696 let (client_end, server_end) = self.create_endpoints::<F>();
697 (client_end.into_proxy(), server_end.into_stream())
698 }
699
700 fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
702 let id_a = self.new_hid();
703 let id_b = self.new_hid();
704 let fut = self.transaction(
705 ordinals::CREATE_SOCKET,
706 proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
707 Responder::CreateSocket,
708 );
709
710 fuchsia_async::Task::spawn(async move {
711 if let Err(e) = fut.await {
712 log::debug!("FDomain socket creation failed: {e}");
713 }
714 })
715 .detach();
716
717 (
718 Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
719 Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
720 )
721 }
722
723 pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
725 self.create_socket(proto::SocketType::Stream)
726 }
727
728 pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
730 self.create_socket(proto::SocketType::Datagram)
731 }
732
733 pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
735 let id_a = self.new_hid();
736 let id_b = self.new_hid();
737 let fut = self.transaction(
738 ordinals::CREATE_EVENT_PAIR,
739 proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
740 Responder::CreateEventPair,
741 );
742
743 fuchsia_async::Task::spawn(async move {
744 if let Err(e) = fut.await {
745 log::debug!("FDomain event pair creation failed: {e}");
746 }
747 })
748 .detach();
749
750 (
751 EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
752 EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
753 )
754 }
755
756 pub fn create_event(self: &Arc<Self>) -> Event {
758 let id = self.new_hid();
759 let fut = self.transaction(
760 ordinals::CREATE_EVENT,
761 proto::EventCreateEventRequest { handle: id },
762 Responder::CreateEvent,
763 );
764
765 fuchsia_async::Task::spawn(async move {
766 if let Err(e) = fut.await {
767 log::debug!("FDomain event creation failed: {e}");
768 }
769 })
770 .detach();
771
772 Event(Handle { id: id.id, client: Arc::downgrade(self) })
773 }
774
775 pub(crate) fn new_hid(&self) -> proto::NewHandleId {
777 proto::NewHandleId { id: rand::random::<u32>() >> 1 }
782 }
783
784 pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
790 self: &Arc<Self>,
791 ordinal: u64,
792 request: S,
793 f: F,
794 ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
795 where
796 F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
797 {
798 let mut inner = self.0.lock().unwrap();
799
800 let (sender, receiver) = futures::channel::oneshot::channel();
801 inner.request(ordinal, request, f(sender));
802 receiver.map(|x| x.expect("Oneshot went away without reply!"))
803 }
804
805 pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
807 let mut inner = self.0.lock().unwrap();
808 if let Some(e) = inner.transport.error() {
809 return Err(e.into());
810 }
811
812 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
813 wakers: Vec::new(),
814 queued: VecDeque::new(),
815 is_streaming: false,
816 read_request_pending: false,
817 });
818
819 assert!(!state.is_streaming, "Initiated streaming twice!");
820 state.is_streaming = true;
821
822 inner.request(
823 ordinals::READ_SOCKET_STREAMING_START,
824 proto::SocketReadSocketStreamingStartRequest { handle: id },
825 Responder::Ignore,
826 );
827 Ok(())
828 }
829
830 pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
834 let mut inner = self.0.lock().unwrap();
835 if let Some(state) = inner.socket_read_states.get_mut(&id) {
836 if state.is_streaming {
837 state.is_streaming = false;
838 let _ = inner.request(
840 ordinals::READ_SOCKET_STREAMING_STOP,
841 proto::ChannelReadChannelStreamingStopRequest { handle: id },
842 Responder::Ignore,
843 );
844 }
845 }
846 }
847
848 pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
850 let mut inner = self.0.lock().unwrap();
851 if let Some(e) = inner.transport.error() {
852 return Err(e.into());
853 }
854 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
855 wakers: Vec::new(),
856 queued: VecDeque::new(),
857 is_streaming: false,
858 read_request_pending: false,
859 });
860
861 assert!(!state.is_streaming, "Initiated streaming twice!");
862 state.is_streaming = true;
863
864 inner.request(
865 ordinals::READ_CHANNEL_STREAMING_START,
866 proto::ChannelReadChannelStreamingStartRequest { handle: id },
867 Responder::Ignore,
868 );
869
870 Ok(())
871 }
872
873 pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
877 let mut inner = self.0.lock().unwrap();
878 if let Some(state) = inner.channel_read_states.get_mut(&id) {
879 if state.is_streaming {
880 state.is_streaming = false;
881 let _ = inner.request(
883 ordinals::READ_CHANNEL_STREAMING_STOP,
884 proto::ChannelReadChannelStreamingStopRequest { handle: id },
885 Responder::Ignore,
886 );
887 }
888 }
889 }
890
891 pub(crate) fn poll_socket(
893 &self,
894 id: proto::HandleId,
895 ctx: &mut Context<'_>,
896 out: &mut [u8],
897 ) -> Poll<Result<usize, Error>> {
898 let mut inner = self.0.lock().unwrap();
899 if let Some(error) = inner.transport.error() {
900 return Poll::Ready(Err(error.into()));
901 }
902
903 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
904 wakers: Vec::new(),
905 queued: VecDeque::new(),
906 is_streaming: false,
907 read_request_pending: false,
908 });
909
910 if let Some(got) = state.queued.front_mut() {
911 match got.as_mut() {
912 Ok(data) => {
913 let read_size = std::cmp::min(data.data.len(), out.len());
914 out[..read_size].copy_from_slice(&data.data[..read_size]);
915
916 if data.data.len() > read_size && !data.is_datagram {
917 let _ = data.data.drain(..read_size);
918 } else {
919 let _ = state.queued.pop_front();
920 }
921
922 return Poll::Ready(Ok(read_size));
923 }
924 Err(_) => {
925 let err = state.queued.pop_front().unwrap().unwrap_err();
926 return Poll::Ready(Err(err));
927 }
928 }
929 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
930 state.wakers.push(ctx.waker().clone());
931 }
932
933 if !state.read_request_pending && !state.is_streaming {
934 inner.request(
935 ordinals::READ_SOCKET,
936 proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
937 Responder::ReadSocket(id),
938 );
939 }
940
941 Poll::Pending
942 }
943
944 pub(crate) fn poll_channel(
946 &self,
947 id: proto::HandleId,
948 ctx: &mut Context<'_>,
949 for_stream: bool,
950 ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
951 let mut inner = self.0.lock().unwrap();
952 if let Some(error) = inner.transport.error() {
953 return Poll::Ready(Some(Err(error.into())));
954 }
955
956 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
957 wakers: Vec::new(),
958 queued: VecDeque::new(),
959 is_streaming: false,
960 read_request_pending: false,
961 });
962
963 if let Some(got) = state.queued.pop_front() {
964 return Poll::Ready(Some(got));
965 } else if for_stream && !state.is_streaming {
966 return Poll::Ready(None);
967 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
968 state.wakers.push(ctx.waker().clone());
969 }
970
971 if !state.read_request_pending && !state.is_streaming {
972 inner.request(
973 ordinals::READ_CHANNEL,
974 proto::ChannelReadChannelRequest { handle: id },
975 Responder::ReadChannel(id),
976 );
977 }
978
979 Poll::Pending
980 }
981
982 pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
984 let inner = self.0.lock().unwrap();
985 let Some(state) = inner.channel_read_states.get(&id) else {
986 return false;
987 };
988 state.is_streaming
989 }
990
991 pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
994 let inner = self.0.lock().unwrap();
995 match handles {
996 proto::Handles::Handles(handles) => {
997 for handle in handles {
998 assert!(
999 !(inner.channel_read_states.contains_key(handle)
1000 || inner.socket_read_states.contains_key(handle)),
1001 "Tried to transfer handle after reading"
1002 );
1003 }
1004 }
1005 proto::Handles::Dispositions(dispositions) => {
1006 for disposition in dispositions {
1007 match &disposition.handle {
1008 proto::HandleOp::Move_(handle) => assert!(
1009 !(inner.channel_read_states.contains_key(handle)
1010 || inner.socket_read_states.contains_key(handle)),
1011 "Tried to transfer handle after reading"
1012 ),
1013 proto::HandleOp::Duplicate(_) => (),
1015 }
1016 }
1017 }
1018 }
1019 }
1020}