1use fidl_message::TransactionHeader;
6use futures::FutureExt;
7use futures::channel::oneshot::Sender as OneshotSender;
8use futures::stream::Stream as StreamTrait;
9use std::collections::{HashMap, VecDeque};
10use std::convert::Infallible;
11use std::future::Future;
12use std::num::NonZeroU32;
13use std::pin::Pin;
14use std::sync::{Arc, LazyLock, Mutex};
15use std::task::{Context, Poll, Waker, ready};
16use {fidl_fuchsia_fdomain as proto, fuchsia_async as _};
17
18mod channel;
19mod event;
20mod event_pair;
21mod handle;
22mod responder;
23mod socket;
24
25#[cfg(test)]
26mod test;
27
28pub mod fidl;
29pub mod fidl_next;
30
31use responder::Responder;
32
33pub use channel::{
34 AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
35};
36pub use event::Event;
37pub use event_pair::Eventpair as EventPair;
38pub use handle::{AsHandleRef, Handle, HandleBased, HandleRef, OnFDomainSignals, Peered};
39pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
40pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
41
42#[rustfmt::skip]
44pub use Handle as Fifo;
45#[rustfmt::skip]
46pub use Handle as Job;
47#[rustfmt::skip]
48pub use Handle as Process;
49#[rustfmt::skip]
50pub use Handle as Resource;
51#[rustfmt::skip]
52pub use Handle as Stream;
53#[rustfmt::skip]
54pub use Handle as Thread;
55#[rustfmt::skip]
56pub use Handle as Vmar;
57#[rustfmt::skip]
58pub use Handle as Vmo;
59
60use proto::f_domain_ordinals as ordinals;
61
62fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 match error {
64 FDomainError::TargetError(e) => {
65 let e = zx_status::Status::from_raw(*e);
66 write!(f, "Target-side error {e}")
67 }
68 FDomainError::BadHandleId(proto::BadHandleId { id }) => {
69 write!(f, "Tried to use invalid handle id {id}")
70 }
71 FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
72 f,
73 "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
74 ),
75 FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
76 write!(f, "Handle is occupied delivering streaming reads")
77 }
78 FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
79 write!(f, "No streaming read was in progress")
80 }
81 FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
82 write!(
83 f,
84 "Tried to create a handle with id {id}, which is outside the valid range for client handles"
85 )
86 }
87 FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
88 if *same_call {
89 write!(f, "Tried to create two or more new handles with the same id {id}")
90 } else {
91 write!(
92 f,
93 "Tried to create a new handle with id {id}, which is already the id of an existing handle"
94 )
95 }
96 }
97 FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
98 write!(f, "Tried to write a channel into itself")
99 }
100 FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
101 write!(f, "Handle closed while being read")
102 }
103 _ => todo!(),
104 }
105}
106
/// Result type alias whose error type defaults to this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
109
/// Errors surfaced to users of the FDomain client.
#[derive(Clone)]
pub enum Error {
    /// Writing to a socket failed; the payload also records how many bytes were written.
    SocketWrite(WriteSocketError),
    /// Writing to a channel failed, either as a whole or for individual handles.
    ChannelWrite(WriteChannelError),
    /// An error reported through the FDomain protocol itself.
    FDomain(FDomainError),
    /// A FIDL-level error while encoding, decoding or dispatching protocol messages.
    Protocol(::fidl::Error),
    /// The FDomain protocol does not recognize an object type.
    ProtocolObjectTypeIncompatible,
    /// The FDomain protocol does not recognize some rights.
    ProtocolRightsIncompatible,
    /// The FDomain protocol does not recognize some signals.
    ProtocolSignalsIncompatible,
    /// The FDomain protocol does not recognize a received streaming IO event.
    ProtocolStreamEventIncompatible,
    /// The transport failed. `None` is displayed as the transport having been closed.
    Transport(Option<Arc<std::io::Error>>),
    /// An FDomain handle was used with a different connection than the one it belongs to.
    ConnectionMismatch,
    /// The channel is no longer streaming.
    StreamingAborted,
}
125
126impl std::fmt::Display for Error {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 match self {
129 Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
130 write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
131 write_fdomain_error(error, f)
132 }
133 Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
134 write!(f, "While writing channel: ")?;
135 write_fdomain_error(error, f)
136 }
137 Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
138 write!(f, "Couldn't write all handles into a channel:")?;
139 for (pos, error) in
140 errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
141 {
142 write!(f, "\n Handle in position {pos}: ")?;
143 write_fdomain_error(error, f)?;
144 }
145 Ok(())
146 }
147 Self::ProtocolObjectTypeIncompatible => {
148 write!(f, "The FDomain protocol does not recognize an object type")
149 }
150 Self::ProtocolRightsIncompatible => {
151 write!(f, "The FDomain protocol does not recognize some rights")
152 }
153 Self::ProtocolSignalsIncompatible => {
154 write!(f, "The FDomain protocol does not recognize some signals")
155 }
156 Self::ProtocolStreamEventIncompatible => {
157 write!(f, "The FDomain protocol does not recognize a received streaming IO event")
158 }
159 Self::FDomain(e) => write_fdomain_error(e, f),
160 Self::Protocol(e) => write!(f, "Protocol error: {e}"),
161 Self::Transport(Some(e)) => write!(f, "Transport error: {e:?}"),
162 Self::Transport(None) => write!(f, "Transport closed"),
163 Self::ConnectionMismatch => {
164 write!(f, "Tried to use an FDomain handle from a different connection")
165 }
166 Self::StreamingAborted => write!(f, "This channel is no longer streaming"),
167 }
168 }
169}
170
171impl std::fmt::Debug for Error {
172 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173 match self {
174 Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
175 Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
176 Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
177 Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
178 Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
179 Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
180 Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
181 Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
182 Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
183 Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
184 Self::StreamingAborted => write!(f, "StreamingAborted"),
185 }
186 }
187}
188
189impl std::error::Error for Error {}
190
191impl From<FDomainError> for Error {
192 fn from(other: FDomainError) -> Self {
193 Self::FDomain(other)
194 }
195}
196
197impl From<::fidl::Error> for Error {
198 fn from(other: ::fidl::Error) -> Self {
199 Self::Protocol(other)
200 }
201}
202
203impl From<WriteSocketError> for Error {
204 fn from(other: WriteSocketError) -> Self {
205 Self::SocketWrite(other)
206 }
207}
208
209impl From<WriteChannelError> for Error {
210 fn from(other: WriteChannelError) -> Self {
211 Self::ChannelWrite(other)
212 }
213}
214
/// Errors that can be stored as the failed state of the transport and handed to
/// outstanding transactions. Each variant converts to the `Error` variant of the same name.
#[derive(Clone)]
enum InnerError {
    /// A FIDL-level error while handling protocol messages.
    Protocol(::fidl::Error),
    /// A streaming event arrived that this client does not recognize.
    ProtocolStreamEventIncompatible,
    /// The transport failed; `None` means it was closed without a specific I/O error.
    Transport(Option<Arc<std::io::Error>>),
}
224
225impl From<InnerError> for Error {
226 fn from(other: InnerError) -> Self {
227 match other {
228 InnerError::Protocol(p) => Error::Protocol(p),
229 InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
230 InnerError::Transport(t) => Error::Transport(t),
231 }
232 }
233}
234
235impl From<::fidl::Error> for InnerError {
236 fn from(other: ::fidl::Error) -> Self {
237 InnerError::Protocol(other)
238 }
239}
240
/// A connection the client can exchange FDomain messages over.
///
/// Incoming messages are produced through the `Stream` supertrait, one boxed byte
/// buffer per item; outgoing messages are handed to [`FDomainTransport::poll_send_message`].
pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
    /// Attempts to send `msg`.
    ///
    /// The client keeps the message at the front of its queue and offers it again on a
    /// later poll while this returns `Poll::Pending`; it removes the message once this
    /// returns `Poll::Ready(Ok(()))`. An `Err` puts the client's transport into its
    /// failed state; `Err(None)` is reported to users as the transport being closed.
    fn poll_send_message(
        self: Pin<&mut Self>,
        msg: &[u8],
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Option<std::io::Error>>>;
}
259
/// State of the client's connection.
enum Transport {
    /// A live transport, the queue of messages waiting to be sent over it, and the
    /// wakers to notify when a new message is queued.
    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
    /// The transport has failed; the stored error is returned by all later operations.
    Error(InnerError),
}
269
270impl Transport {
271 fn error(&self) -> Option<InnerError> {
273 match self {
274 Transport::Transport(_, _, _) => None,
275 Transport::Error(inner_error) => Some(inner_error.clone()),
276 }
277 }
278
279 fn push_msg(&mut self, msg: Box<[u8]>) {
281 if let Transport::Transport(_, v, w) = self {
282 v.push_back(msg);
283 w.drain(..).for_each(Waker::wake);
284 }
285 }
286
287 fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
289 match self {
290 Transport::Error(e) => Poll::Ready(e.clone()),
291 Transport::Transport(t, v, w) => {
292 while let Some(msg) = v.front() {
293 match t.as_mut().poll_send_message(msg, ctx) {
294 Poll::Ready(Ok(())) => {
295 v.pop_front();
296 }
297 Poll::Ready(Err(e)) => {
298 let e = e.map(Arc::new);
299 *self = Transport::Error(InnerError::Transport(e.clone()));
300 return Poll::Ready(InnerError::Transport(e));
301 }
302 Poll::Pending => return Poll::Pending,
303 }
304 }
305
306 if v.is_empty() {
307 w.push(ctx.waker().clone());
308 } else {
309 ctx.waker().wake_by_ref();
310 }
311 Poll::Pending
312 }
313 }
314 }
315
316 fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Box<[u8]>, InnerError>> {
318 match self {
319 Transport::Error(e) => Poll::Ready(Err(e.clone())),
320 Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
321 Some(Ok(x)) => Poll::Ready(Ok(x)),
322 Some(Err(e)) => {
323 let e = Arc::new(e);
324 *self = Transport::Error(InnerError::Transport(Some(Arc::clone(&e))));
325 Poll::Ready(Err(InnerError::Transport(Some(e))))
326 }
327 Option::None => Poll::Ready(Err(InnerError::Transport(None))),
328 },
329 }
330 }
331}
332
333struct SocketReadState {
335 wakers: Vec<Waker>,
336 queued: VecDeque<Result<proto::SocketData, Error>>,
337 read_request_pending: bool,
338 is_streaming: bool,
339}
340
341impl SocketReadState {
342 fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) {
345 self.queued.push_back(msg);
346 self.wakers.drain(..).for_each(Waker::wake);
347 }
348}
349
350struct ChannelReadState {
352 wakers: Vec<Waker>,
353 queued: VecDeque<Result<proto::ChannelMessage, Error>>,
354 read_request_pending: bool,
355 is_streaming: bool,
356}
357
358impl ChannelReadState {
359 fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) {
362 self.queued.push_back(msg);
363 self.wakers.drain(..).for_each(Waker::wake);
364 }
365}
366
/// Mutable state of a client connection.
struct ClientInner {
    /// The connection itself, or the error it failed with.
    transport: Transport,
    /// Responders for requests that have not been answered yet, keyed by transaction id.
    transactions: HashMap<NonZeroU32, responder::Responder>,
    /// Read bookkeeping per channel handle.
    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
    /// Read bookkeeping per socket handle.
    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
    /// Transaction id the next request will use.
    next_tx_id: u32,
    /// Handles queued for closing; they are sent in one close request the next time the
    /// transport is polled.
    waiting_to_close: Vec<proto::HandleId>,
    /// Waker of the task that most recently polled the transport.
    waiting_to_close_waker: Waker,
}
377
378impl ClientInner {
379 fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
381 let tx_id = self.next_tx_id;
382
383 let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
384 let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
385 self.next_tx_id += 1;
386 assert!(
387 self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
388 "Allocated same tx id twice!"
389 );
390 self.transport.push_msg(msg.into());
391 }
392
393 fn try_poll_transport(
396 &mut self,
397 ctx: &mut Context<'_>,
398 ) -> Poll<Result<Infallible, InnerError>> {
399 if !self.waiting_to_close.is_empty() {
400 let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
401 for handle in &handles {
404 let _ = self.channel_read_states.remove(handle);
405 let _ = self.socket_read_states.remove(handle);
406 }
407 self.request(
408 ordinals::CLOSE,
409 proto::FDomainCloseRequest { handles },
410 Responder::Ignore,
411 );
412 }
413
414 self.waiting_to_close_waker = ctx.waker().clone();
415
416 loop {
417 if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
418 for state in std::mem::take(&mut self.socket_read_states).into_values() {
419 state.wakers.into_iter().for_each(Waker::wake);
420 }
421 for (_, state) in self.channel_read_states.drain() {
422 state.wakers.into_iter().for_each(Waker::wake);
423 }
424 return Poll::Ready(Err(e));
425 }
426 let Poll::Ready(result) = self.transport.poll_next(ctx) else {
427 return Poll::Pending;
428 };
429 let data = result?;
430 let (header, data) = match fidl_message::decode_transaction_header(&data) {
431 Ok(x) => x,
432 Err(e) => {
433 self.transport = Transport::Error(InnerError::Protocol(e));
434 continue;
435 }
436 };
437
438 let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
439 if let Err(e) = self.process_event(header, data) {
440 self.transport = Transport::Error(e);
441 }
442 continue;
443 };
444
445 let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
446 match tx.handle(self, Ok((header, data))) {
447 Ok(x) => x,
448 Err(e) => {
449 self.transport = Transport::Error(InnerError::Protocol(e));
450 continue;
451 }
452 }
453 }
454 }
455
456 fn process_event(&mut self, header: TransactionHeader, data: &[u8]) -> Result<(), InnerError> {
458 match header.ordinal {
459 ordinals::ON_SOCKET_STREAMING_DATA => {
460 let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
461 header, data,
462 )?;
463 let o =
464 self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
465 wakers: Vec::new(),
466 queued: VecDeque::new(),
467 is_streaming: false,
468 read_request_pending: false,
469 });
470 match msg.socket_message {
471 proto::SocketMessage::Data(data) => {
472 o.handle_incoming_message(Ok(data));
473 Ok(())
474 }
475 proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
476 if let Some(error) = error {
477 o.handle_incoming_message(Err(Error::FDomain(*error)));
478 }
479 o.is_streaming = false;
480 Ok(())
481 }
482 _ => Err(InnerError::ProtocolStreamEventIncompatible),
483 }
484 }
485 ordinals::ON_CHANNEL_STREAMING_DATA => {
486 let msg = fidl_message::decode_message::<
487 proto::ChannelOnChannelStreamingDataRequest,
488 >(header, data)?;
489 let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
490 ChannelReadState {
491 wakers: Vec::new(),
492 queued: VecDeque::new(),
493 is_streaming: false,
494 read_request_pending: false,
495 }
496 });
497 match msg.channel_sent {
498 proto::ChannelSent::Message(data) => {
499 o.handle_incoming_message(Ok(data));
500 Ok(())
501 }
502 proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
503 if let Some(error) = error {
504 o.handle_incoming_message(Err(Error::FDomain(*error)));
505 }
506 o.is_streaming = false;
507 Ok(())
508 }
509 _ => Err(InnerError::ProtocolStreamEventIncompatible),
510 }
511 }
512 _ => Err(::fidl::Error::UnknownOrdinal {
513 ordinal: header.ordinal,
514 protocol_name:
515 <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
516 }
517 .into()),
518 }
519 }
520
521 fn poll_transport(&mut self, ctx: &mut Context<'_>) {
525 if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
526 for (_, v) in std::mem::take(&mut self.transactions) {
527 let _ = v.handle(self, Err(e.clone()));
528 }
529 }
530 }
531
532 pub(crate) fn handle_socket_read_response(
534 &mut self,
535 msg: Result<proto::SocketData, Error>,
536 id: proto::HandleId,
537 ) {
538 let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
539 wakers: Vec::new(),
540 queued: VecDeque::new(),
541 is_streaming: false,
542 read_request_pending: false,
543 });
544 state.handle_incoming_message(msg);
545 state.read_request_pending = false;
546 }
547
548 pub(crate) fn handle_channel_read_response(
550 &mut self,
551 msg: Result<proto::ChannelMessage, Error>,
552 id: proto::HandleId,
553 ) {
554 let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
555 wakers: Vec::new(),
556 queued: VecDeque::new(),
557 is_streaming: false,
558 read_request_pending: false,
559 });
560 state.handle_incoming_message(msg);
561 state.read_request_pending = false;
562 }
563}
564
565impl Drop for ClientInner {
566 fn drop(&mut self) {
567 let responders = self.transactions.drain().map(|x| x.1).collect::<Vec<_>>();
568 for responder in responders {
569 let _ = responder.handle(self, Err(InnerError::Transport(None)));
570 }
571 for state in self.channel_read_states.values_mut() {
572 state.wakers.drain(..).for_each(Waker::wake);
573 }
574 for state in self.socket_read_states.values_mut() {
575 state.wakers.drain(..).for_each(Waker::wake);
576 }
577 }
578}
579
580pub struct Client(pub(crate) Mutex<ClientInner>);
587
588impl std::fmt::Debug for Client {
589 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
590 f.debug_tuple("Client").field(&"...").finish()
591 }
592}
593
/// A shared client whose transport starts out in the "transport closed" error state.
/// It is created lazily on first use and has no transport to drive.
pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
    Arc::new(Client(Mutex::new(ClientInner {
        transport: Transport::Error(InnerError::Transport(None)),
        transactions: HashMap::new(),
        channel_read_states: HashMap::new(),
        socket_read_states: HashMap::new(),
        next_tx_id: 1,
        waiting_to_close: Vec::new(),
        waiting_to_close_waker: futures::task::noop_waker(),
    })))
});
608
609impl Client {
610 pub fn new(
617 transport: impl FDomainTransport + 'static,
618 ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
619 let ret = Arc::new(Client(Mutex::new(ClientInner {
620 transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
621 transactions: HashMap::new(),
622 socket_read_states: HashMap::new(),
623 channel_read_states: HashMap::new(),
624 next_tx_id: 1,
625 waiting_to_close: Vec::new(),
626 waiting_to_close_waker: futures::task::noop_waker(),
627 })));
628
629 let client_weak = Arc::downgrade(&ret);
630 let fut = futures::future::poll_fn(move |ctx| {
631 let Some(client) = client_weak.upgrade() else {
632 return Poll::Ready(());
633 };
634
635 client.0.lock().unwrap().poll_transport(ctx);
636 Poll::Pending
637 });
638
639 (ret, fut)
640 }
641
642 pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
644 let new_handle = self.new_hid();
645 self.transaction(
646 ordinals::GET_NAMESPACE,
647 proto::FDomainGetNamespaceRequest { new_handle },
648 Responder::Namespace,
649 )
650 .await?;
651 Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
652 }
653
654 pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
656 let id_a = self.new_hid();
657 let id_b = self.new_hid();
658 let fut = self.transaction(
659 ordinals::CREATE_CHANNEL,
660 proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
661 Responder::CreateChannel,
662 );
663
664 fuchsia_async::Task::spawn(async move {
665 if let Err(e) = fut.await {
666 log::debug!("FDomain channel creation failed: {e}");
667 }
668 })
669 .detach();
670
671 (
672 Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
673 Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
674 )
675 }
676
677 pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
679 self: &Arc<Self>,
680 ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
681 let (client, server) = self.create_channel();
682 let client_end = crate::fidl::ClientEnd::<F>::new(client);
683 let server_end = crate::fidl::ServerEnd::new(server);
684 (client_end, server_end)
685 }
686
687 pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
689 self: &Arc<Self>,
690 ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
691 let (client_end, server_end) = self.create_endpoints::<F>();
692 (client_end.into_proxy(), server_end)
693 }
694
695 pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
697 self: &Arc<Self>,
698 ) -> (F::Proxy, F::RequestStream) {
699 let (client_end, server_end) = self.create_endpoints::<F>();
700 (client_end.into_proxy(), server_end.into_stream())
701 }
702
703 fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
705 let id_a = self.new_hid();
706 let id_b = self.new_hid();
707 let fut = self.transaction(
708 ordinals::CREATE_SOCKET,
709 proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
710 Responder::CreateSocket,
711 );
712
713 fuchsia_async::Task::spawn(async move {
714 if let Err(e) = fut.await {
715 log::debug!("FDomain socket creation failed: {e}");
716 }
717 })
718 .detach();
719
720 (
721 Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
722 Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
723 )
724 }
725
/// Creates a socket pair of type `Stream`.
pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
    self.create_socket(proto::SocketType::Stream)
}

/// Creates a socket pair of type `Datagram`.
pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
    self.create_socket(proto::SocketType::Datagram)
}
735
736 pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
738 let id_a = self.new_hid();
739 let id_b = self.new_hid();
740 let fut = self.transaction(
741 ordinals::CREATE_EVENT_PAIR,
742 proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
743 Responder::CreateEventPair,
744 );
745
746 fuchsia_async::Task::spawn(async move {
747 if let Err(e) = fut.await {
748 log::debug!("FDomain event pair creation failed: {e}");
749 }
750 })
751 .detach();
752
753 (
754 EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
755 EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
756 )
757 }
758
759 pub fn create_event(self: &Arc<Self>) -> Event {
761 let id = self.new_hid();
762 let fut = self.transaction(
763 ordinals::CREATE_EVENT,
764 proto::EventCreateEventRequest { handle: id },
765 Responder::CreateEvent,
766 );
767
768 fuchsia_async::Task::spawn(async move {
769 if let Err(e) = fut.await {
770 log::debug!("FDomain event creation failed: {e}");
771 }
772 })
773 .detach();
774
775 Event(Handle { id: id.id, client: Arc::downgrade(self) })
776 }
777
/// Picks an id for a new handle: a random `u32` shifted right by one bit, so the top
/// bit is always clear.
// NOTE(review): nothing here checks the id against handles already in use — presumably
// a collision is reported by the other side (see `NewHandleIdReused`); confirm.
pub(crate) fn new_hid(&self) -> proto::NewHandleId {
    proto::NewHandleId { id: rand::random::<u32>() >> 1 }
}
786
787 pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
793 self: &Arc<Self>,
794 ordinal: u64,
795 request: S,
796 f: F,
797 ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
798 where
799 F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
800 {
801 let mut inner = self.0.lock().unwrap();
802
803 let (sender, receiver) = futures::channel::oneshot::channel();
804 inner.request(ordinal, request, f(sender));
805 receiver.map(|x| x.expect("Oneshot went away without reply!"))
806 }
807
808 pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
810 let mut inner = self.0.lock().unwrap();
811 if let Some(e) = inner.transport.error() {
812 return Err(e.into());
813 }
814
815 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
816 wakers: Vec::new(),
817 queued: VecDeque::new(),
818 is_streaming: false,
819 read_request_pending: false,
820 });
821
822 assert!(!state.is_streaming, "Initiated streaming twice!");
823 state.is_streaming = true;
824
825 inner.request(
826 ordinals::READ_SOCKET_STREAMING_START,
827 proto::SocketReadSocketStreamingStartRequest { handle: id },
828 Responder::Ignore,
829 );
830 Ok(())
831 }
832
833 pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
837 let mut inner = self.0.lock().unwrap();
838 if let Some(state) = inner.socket_read_states.get_mut(&id) {
839 if state.is_streaming {
840 state.is_streaming = false;
841 let _ = inner.request(
843 ordinals::READ_SOCKET_STREAMING_STOP,
844 proto::ChannelReadChannelStreamingStopRequest { handle: id },
845 Responder::Ignore,
846 );
847 }
848 }
849 }
850
851 pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
853 let mut inner = self.0.lock().unwrap();
854 if let Some(e) = inner.transport.error() {
855 return Err(e.into());
856 }
857 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
858 wakers: Vec::new(),
859 queued: VecDeque::new(),
860 is_streaming: false,
861 read_request_pending: false,
862 });
863
864 assert!(!state.is_streaming, "Initiated streaming twice!");
865 state.is_streaming = true;
866
867 inner.request(
868 ordinals::READ_CHANNEL_STREAMING_START,
869 proto::ChannelReadChannelStreamingStartRequest { handle: id },
870 Responder::Ignore,
871 );
872
873 Ok(())
874 }
875
876 pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
880 let mut inner = self.0.lock().unwrap();
881 if let Some(state) = inner.channel_read_states.get_mut(&id) {
882 if state.is_streaming {
883 state.is_streaming = false;
884 let _ = inner.request(
886 ordinals::READ_CHANNEL_STREAMING_STOP,
887 proto::ChannelReadChannelStreamingStopRequest { handle: id },
888 Responder::Ignore,
889 );
890 }
891 }
892 }
893
894 pub(crate) fn poll_socket(
896 &self,
897 id: proto::HandleId,
898 ctx: &mut Context<'_>,
899 out: &mut [u8],
900 ) -> Poll<Result<usize, Error>> {
901 let mut inner = self.0.lock().unwrap();
902 if let Some(error) = inner.transport.error() {
903 return Poll::Ready(Err(error.into()));
904 }
905
906 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
907 wakers: Vec::new(),
908 queued: VecDeque::new(),
909 is_streaming: false,
910 read_request_pending: false,
911 });
912
913 if let Some(got) = state.queued.front_mut() {
914 match got.as_mut() {
915 Ok(data) => {
916 let read_size = std::cmp::min(data.data.len(), out.len());
917 out[..read_size].copy_from_slice(&data.data[..read_size]);
918
919 if data.data.len() > read_size && !data.is_datagram {
920 let _ = data.data.drain(..read_size);
921 } else {
922 let _ = state.queued.pop_front();
923 }
924
925 return Poll::Ready(Ok(read_size));
926 }
927 Err(_) => {
928 let err = state.queued.pop_front().unwrap().unwrap_err();
929 return Poll::Ready(Err(err));
930 }
931 }
932 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
933 state.wakers.push(ctx.waker().clone());
934 }
935
936 if !state.read_request_pending && !state.is_streaming {
937 inner.request(
938 ordinals::READ_SOCKET,
939 proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
940 Responder::ReadSocket(id),
941 );
942 }
943
944 Poll::Pending
945 }
946
947 pub(crate) fn poll_channel(
949 &self,
950 id: proto::HandleId,
951 ctx: &mut Context<'_>,
952 for_stream: bool,
953 ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
954 let mut inner = self.0.lock().unwrap();
955 if let Some(error) = inner.transport.error() {
956 return Poll::Ready(Some(Err(error.into())));
957 }
958
959 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
960 wakers: Vec::new(),
961 queued: VecDeque::new(),
962 is_streaming: false,
963 read_request_pending: false,
964 });
965
966 if let Some(got) = state.queued.pop_front() {
967 return Poll::Ready(Some(got));
968 } else if for_stream && !state.is_streaming {
969 return Poll::Ready(None);
970 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
971 state.wakers.push(ctx.waker().clone());
972 }
973
974 if !state.read_request_pending && !state.is_streaming {
975 inner.request(
976 ordinals::READ_CHANNEL,
977 proto::ChannelReadChannelRequest { handle: id },
978 Responder::ReadChannel(id),
979 );
980 }
981
982 Poll::Pending
983 }
984
985 pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
987 let inner = self.0.lock().unwrap();
988 let Some(state) = inner.channel_read_states.get(&id) else {
989 return false;
990 };
991 state.is_streaming
992 }
993
994 pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
997 let inner = self.0.lock().unwrap();
998 match handles {
999 proto::Handles::Handles(handles) => {
1000 for handle in handles {
1001 assert!(
1002 !(inner.channel_read_states.contains_key(handle)
1003 || inner.socket_read_states.contains_key(handle)),
1004 "Tried to transfer handle after reading"
1005 );
1006 }
1007 }
1008 proto::Handles::Dispositions(dispositions) => {
1009 for disposition in dispositions {
1010 match &disposition.handle {
1011 proto::HandleOp::Move_(handle) => assert!(
1012 !(inner.channel_read_states.contains_key(handle)
1013 || inner.socket_read_states.contains_key(handle)),
1014 "Tried to transfer handle after reading"
1015 ),
1016 proto::HandleOp::Duplicate(_) => (),
1018 }
1019 }
1020 }
1021 }
1022 }
1023}