1use fidl_message::TransactionHeader;
6use fuchsia_sync::Mutex;
7use futures::FutureExt;
8use futures::channel::oneshot::Sender as OneshotSender;
9use futures::stream::Stream as StreamTrait;
10use std::collections::{HashMap, VecDeque};
11use std::convert::Infallible;
12use std::future::Future;
13use std::num::NonZeroU32;
14use std::pin::Pin;
15use std::sync::{Arc, LazyLock};
16use std::task::{Context, Poll, Waker, ready};
17use {fidl_fuchsia_fdomain as proto, fuchsia_async as _};
18
19mod channel;
20mod event;
21mod event_pair;
22mod handle;
23mod responder;
24mod socket;
25
26#[cfg(test)]
27mod test;
28
29pub mod fidl;
30pub mod fidl_next;
31
32use responder::Responder;
33
34pub use channel::{
35 AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
36};
37pub use event::Event;
38pub use event_pair::Eventpair as EventPair;
39pub use handle::unowned::Unowned;
40pub use handle::{
41 AsHandleRef, Handle, HandleBased, HandleRef, NullableHandle, OnFDomainSignals, Peered,
42};
43pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
44pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
45
46#[rustfmt::skip]
48pub use Handle as Fifo;
49#[rustfmt::skip]
50pub use Handle as Job;
51#[rustfmt::skip]
52pub use Handle as Process;
53#[rustfmt::skip]
54pub use Handle as Resource;
55#[rustfmt::skip]
56pub use Handle as Stream;
57#[rustfmt::skip]
58pub use Handle as Thread;
59#[rustfmt::skip]
60pub use Handle as Vmar;
61#[rustfmt::skip]
62pub use Handle as Vmo;
63
64use proto::f_domain_ordinals as ordinals;
65
66fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 match error {
68 FDomainError::TargetError(e) => {
69 let e = zx_status::Status::from_raw(*e);
70 write!(f, "Target-side error {e}")
71 }
72 FDomainError::BadHandleId(proto::BadHandleId { id }) => {
73 write!(f, "Tried to use invalid handle id {id}")
74 }
75 FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
76 f,
77 "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
78 ),
79 FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
80 write!(f, "Handle is occupied delivering streaming reads")
81 }
82 FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
83 write!(f, "No streaming read was in progress")
84 }
85 FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
86 write!(
87 f,
88 "Tried to create a handle with id {id}, which is outside the valid range for client handles"
89 )
90 }
91 FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
92 if *same_call {
93 write!(f, "Tried to create two or more new handles with the same id {id}")
94 } else {
95 write!(
96 f,
97 "Tried to create a new handle with id {id}, which is already the id of an existing handle"
98 )
99 }
100 }
101 FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
102 write!(f, "Tried to write a channel into itself")
103 }
104 FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
105 write!(f, "Handle closed while being read")
106 }
107 _ => todo!(),
108 }
109}
110
/// Result type used by this crate; the error type defaults to [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
113
/// Errors reported by the FDomain client.
///
/// `Display` and `Debug` are implemented by hand elsewhere in this file.
#[derive(Clone)]
pub enum Error {
    /// A socket write failed; the payload carries the underlying FDomain error
    /// and the number of bytes written before the failure.
    SocketWrite(WriteSocketError),
    /// A channel write failed, either as a whole or for individual handles.
    ChannelWrite(WriteChannelError),
    /// An error reported through the FDomain protocol.
    FDomain(FDomainError),
    /// A FIDL-level protocol error.
    Protocol(::fidl::Error),
    /// The FDomain protocol does not recognize an object type.
    ProtocolObjectTypeIncompatible,
    /// The FDomain protocol does not recognize some rights.
    ProtocolRightsIncompatible,
    /// The FDomain protocol does not recognize some signals.
    ProtocolSignalsIncompatible,
    /// The FDomain protocol does not recognize a received streaming IO event.
    ProtocolStreamEventIncompatible,
    /// The transport failed with an I/O error (`Some`) or was closed (`None`).
    Transport(Option<Arc<std::io::Error>>),
    /// An FDomain handle was used with a connection other than its own.
    ConnectionMismatch,
    /// The channel is no longer streaming.
    StreamingAborted,
}
129
130impl std::fmt::Display for Error {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 match self {
133 Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
134 write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
135 write_fdomain_error(error, f)
136 }
137 Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
138 write!(f, "While writing channel: ")?;
139 write_fdomain_error(error, f)
140 }
141 Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
142 write!(f, "Couldn't write all handles into a channel:")?;
143 for (pos, error) in
144 errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
145 {
146 write!(f, "\n Handle in position {pos}: ")?;
147 write_fdomain_error(error, f)?;
148 }
149 Ok(())
150 }
151 Self::ProtocolObjectTypeIncompatible => {
152 write!(f, "The FDomain protocol does not recognize an object type")
153 }
154 Self::ProtocolRightsIncompatible => {
155 write!(f, "The FDomain protocol does not recognize some rights")
156 }
157 Self::ProtocolSignalsIncompatible => {
158 write!(f, "The FDomain protocol does not recognize some signals")
159 }
160 Self::ProtocolStreamEventIncompatible => {
161 write!(f, "The FDomain protocol does not recognize a received streaming IO event")
162 }
163 Self::FDomain(e) => write_fdomain_error(e, f),
164 Self::Protocol(e) => write!(f, "Protocol error: {e}"),
165 Self::Transport(Some(e)) => write!(f, "Transport error: {e:?}"),
166 Self::Transport(None) => write!(f, "Transport closed"),
167 Self::ConnectionMismatch => {
168 write!(f, "Tried to use an FDomain handle from a different connection")
169 }
170 Self::StreamingAborted => write!(f, "This channel is no longer streaming"),
171 }
172 }
173}
174
175impl std::fmt::Debug for Error {
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 match self {
178 Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
179 Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
180 Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
181 Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
182 Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
183 Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
184 Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
185 Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
186 Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
187 Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
188 Self::StreamingAborted => write!(f, "StreamingAborted"),
189 }
190 }
191}
192
// Marks `Error` as a standard error type; only the trait's default methods are used.
impl std::error::Error for Error {}
194
195impl From<FDomainError> for Error {
196 fn from(other: FDomainError) -> Self {
197 Self::FDomain(other)
198 }
199}
200
201impl From<::fidl::Error> for Error {
202 fn from(other: ::fidl::Error) -> Self {
203 Self::Protocol(other)
204 }
205}
206
207impl From<WriteSocketError> for Error {
208 fn from(other: WriteSocketError) -> Self {
209 Self::SocketWrite(other)
210 }
211}
212
213impl From<WriteChannelError> for Error {
214 fn from(other: WriteChannelError) -> Self {
215 Self::ChannelWrite(other)
216 }
217}
218
/// Errors stored in, or produced by, the transport-driving code in this file.
///
/// Each variant converts to the `Error` variant of the same name via
/// `From<InnerError> for Error`.
#[derive(Clone)]
enum InnerError {
    /// A FIDL-level protocol error.
    Protocol(::fidl::Error),
    /// A streaming event arrived whose payload variant is not recognized.
    ProtocolStreamEventIncompatible,
    /// The transport failed with an I/O error (`Some`) or was closed (`None`).
    Transport(Option<Arc<std::io::Error>>),
}
228
229impl From<InnerError> for Error {
230 fn from(other: InnerError) -> Self {
231 match other {
232 InnerError::Protocol(p) => Error::Protocol(p),
233 InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
234 InnerError::Transport(t) => Error::Transport(t),
235 }
236 }
237}
238
239impl From<::fidl::Error> for InnerError {
240 fn from(other: ::fidl::Error) -> Self {
241 InnerError::Protocol(other)
242 }
243}
244
/// A bidirectional message transport for the FDomain client.
///
/// Incoming messages are produced through the `Stream` supertrait: each item is
/// either one message as a boxed byte slice or an I/O error. Outgoing messages
/// are submitted through [`FDomainTransport::poll_send_message`].
pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
    /// Attempts to send `msg`.
    ///
    /// In this file a `Poll::Pending` result leaves the message queued for a
    /// later attempt, `Err(Some(_))` is recorded as a transport I/O error, and
    /// `Err(None)` is recorded as the transport having been closed.
    fn poll_send_message(
        self: Pin<&mut Self>,
        msg: &[u8],
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Option<std::io::Error>>>;
}
263
/// State of the client's connection.
enum Transport {
    /// A live transport, the queue of encoded messages waiting to be sent, and
    /// the wakers to wake when a new message is queued.
    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
    /// The connection has failed; the stored error is handed out on every subsequent use.
    Error(InnerError),
}
273
274impl Transport {
275 fn error(&self) -> Option<InnerError> {
277 match self {
278 Transport::Transport(_, _, _) => None,
279 Transport::Error(inner_error) => Some(inner_error.clone()),
280 }
281 }
282
283 fn push_msg(&mut self, msg: Box<[u8]>) {
285 if let Transport::Transport(_, v, w) = self {
286 v.push_back(msg);
287 w.drain(..).for_each(Waker::wake);
288 }
289 }
290
291 fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
293 match self {
294 Transport::Error(e) => Poll::Ready(e.clone()),
295 Transport::Transport(t, v, w) => {
296 while let Some(msg) = v.front() {
297 match t.as_mut().poll_send_message(msg, ctx) {
298 Poll::Ready(Ok(())) => {
299 v.pop_front();
300 }
301 Poll::Ready(Err(e)) => {
302 let e = e.map(Arc::new);
303 *self = Transport::Error(InnerError::Transport(e.clone()));
304 return Poll::Ready(InnerError::Transport(e));
305 }
306 Poll::Pending => return Poll::Pending,
307 }
308 }
309
310 if v.is_empty() {
311 w.push(ctx.waker().clone());
312 } else {
313 ctx.waker().wake_by_ref();
314 }
315 Poll::Pending
316 }
317 }
318 }
319
320 fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Box<[u8]>, InnerError>> {
322 match self {
323 Transport::Error(e) => Poll::Ready(Err(e.clone())),
324 Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
325 Some(Ok(x)) => Poll::Ready(Ok(x)),
326 Some(Err(e)) => {
327 let e = Arc::new(e);
328 *self = Transport::Error(InnerError::Transport(Some(Arc::clone(&e))));
329 Poll::Ready(Err(InnerError::Transport(Some(e))))
330 }
331 Option::None => Poll::Ready(Err(InnerError::Transport(None))),
332 },
333 }
334 }
335}
336
/// Per-handle bookkeeping for reads from a socket.
struct SocketReadState {
    /// Wakers to wake whenever a new result is queued.
    wakers: Vec<Waker>,
    /// Read results (data or errors) not yet consumed by a reader.
    queued: VecDeque<Result<proto::SocketData, Error>>,
    /// Checked before a new one-shot read request is issued; cleared when a
    /// read response is delivered.
    read_request_pending: bool,
    /// Whether a streaming read is currently active for this handle.
    is_streaming: bool,
}
344
345impl SocketReadState {
346 fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) {
349 self.queued.push_back(msg);
350 self.wakers.drain(..).for_each(Waker::wake);
351 }
352}
353
/// Per-handle bookkeeping for reads from a channel.
struct ChannelReadState {
    /// Wakers to wake whenever a new result is queued.
    wakers: Vec<Waker>,
    /// Read results (messages or errors) not yet consumed by a reader.
    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
    /// Checked before a new one-shot read request is issued; cleared when a
    /// read response is delivered.
    read_request_pending: bool,
    /// Whether a streaming read is currently active for this handle.
    is_streaming: bool,
}
361
362impl ChannelReadState {
363 fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) {
366 self.queued.push_back(msg);
367 self.wakers.drain(..).for_each(Waker::wake);
368 }
369}
370
/// Mutable state of a client connection; `Client` wraps it in a mutex.
struct ClientInner {
    /// The connection itself, or the error it failed with.
    transport: Transport,
    /// Responders for requests that have been issued, keyed by transaction id.
    transactions: HashMap<NonZeroU32, responder::Responder>,
    /// Read bookkeeping for channel handles, keyed by handle id.
    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
    /// Read bookkeeping for socket handles, keyed by handle id.
    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
    /// Transaction id to use for the next request.
    next_tx_id: u32,
    /// Handles to be closed with a single close request the next time the transport is polled.
    waiting_to_close: Vec<proto::HandleId>,
    /// Waker of the task that most recently polled the transport.
    waiting_to_close_waker: Waker,
}
381
382impl ClientInner {
383 fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
385 let tx_id = self.next_tx_id;
386
387 let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
388 let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
389 self.next_tx_id += 1;
390 assert!(
391 self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
392 "Allocated same tx id twice!"
393 );
394 self.transport.push_msg(msg.into());
395 }
396
397 fn try_poll_transport(
400 &mut self,
401 ctx: &mut Context<'_>,
402 ) -> Poll<Result<Infallible, InnerError>> {
403 if !self.waiting_to_close.is_empty() {
404 let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
405 for handle in &handles {
408 let _ = self.channel_read_states.remove(handle);
409 let _ = self.socket_read_states.remove(handle);
410 }
411 self.request(
412 ordinals::CLOSE,
413 proto::FDomainCloseRequest { handles },
414 Responder::Ignore,
415 );
416 }
417
418 self.waiting_to_close_waker = ctx.waker().clone();
419
420 loop {
421 if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
422 for state in std::mem::take(&mut self.socket_read_states).into_values() {
423 state.wakers.into_iter().for_each(Waker::wake);
424 }
425 for (_, state) in self.channel_read_states.drain() {
426 state.wakers.into_iter().for_each(Waker::wake);
427 }
428 return Poll::Ready(Err(e));
429 }
430 let Poll::Ready(result) = self.transport.poll_next(ctx) else {
431 return Poll::Pending;
432 };
433 let data = result?;
434 let (header, data) = match fidl_message::decode_transaction_header(&data) {
435 Ok(x) => x,
436 Err(e) => {
437 self.transport = Transport::Error(InnerError::Protocol(e));
438 continue;
439 }
440 };
441
442 let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
443 if let Err(e) = self.process_event(header, data) {
444 self.transport = Transport::Error(e);
445 }
446 continue;
447 };
448
449 let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
450 match tx.handle(self, Ok((header, data))) {
451 Ok(x) => x,
452 Err(e) => {
453 self.transport = Transport::Error(InnerError::Protocol(e));
454 continue;
455 }
456 }
457 }
458 }
459
460 fn process_event(&mut self, header: TransactionHeader, data: &[u8]) -> Result<(), InnerError> {
462 match header.ordinal {
463 ordinals::ON_SOCKET_STREAMING_DATA => {
464 let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
465 header, data,
466 )?;
467 let o =
468 self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
469 wakers: Vec::new(),
470 queued: VecDeque::new(),
471 is_streaming: false,
472 read_request_pending: false,
473 });
474 match msg.socket_message {
475 proto::SocketMessage::Data(data) => {
476 o.handle_incoming_message(Ok(data));
477 Ok(())
478 }
479 proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
480 if let Some(error) = error {
481 o.handle_incoming_message(Err(Error::FDomain(*error)));
482 }
483 o.is_streaming = false;
484 Ok(())
485 }
486 _ => Err(InnerError::ProtocolStreamEventIncompatible),
487 }
488 }
489 ordinals::ON_CHANNEL_STREAMING_DATA => {
490 let msg = fidl_message::decode_message::<
491 proto::ChannelOnChannelStreamingDataRequest,
492 >(header, data)?;
493 let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
494 ChannelReadState {
495 wakers: Vec::new(),
496 queued: VecDeque::new(),
497 is_streaming: false,
498 read_request_pending: false,
499 }
500 });
501 match msg.channel_sent {
502 proto::ChannelSent::Message(data) => {
503 o.handle_incoming_message(Ok(data));
504 Ok(())
505 }
506 proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
507 if let Some(error) = error {
508 o.handle_incoming_message(Err(Error::FDomain(*error)));
509 }
510 o.is_streaming = false;
511 Ok(())
512 }
513 _ => Err(InnerError::ProtocolStreamEventIncompatible),
514 }
515 }
516 _ => Err(::fidl::Error::UnknownOrdinal {
517 ordinal: header.ordinal,
518 protocol_name:
519 <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
520 }
521 .into()),
522 }
523 }
524
525 fn poll_transport(&mut self, ctx: &mut Context<'_>) -> Poll<()> {
529 if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
530 for (_, v) in std::mem::take(&mut self.transactions) {
531 let _ = v.handle(self, Err(e.clone()));
532 }
533
534 Poll::Ready(())
535 } else {
536 Poll::Pending
537 }
538 }
539
540 pub(crate) fn handle_socket_read_response(
542 &mut self,
543 msg: Result<proto::SocketData, Error>,
544 id: proto::HandleId,
545 ) {
546 let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
547 wakers: Vec::new(),
548 queued: VecDeque::new(),
549 is_streaming: false,
550 read_request_pending: false,
551 });
552 state.handle_incoming_message(msg);
553 state.read_request_pending = false;
554 }
555
556 pub(crate) fn handle_channel_read_response(
558 &mut self,
559 msg: Result<proto::ChannelMessage, Error>,
560 id: proto::HandleId,
561 ) {
562 let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
563 wakers: Vec::new(),
564 queued: VecDeque::new(),
565 is_streaming: false,
566 read_request_pending: false,
567 });
568 state.handle_incoming_message(msg);
569 state.read_request_pending = false;
570 }
571}
572
573impl Drop for ClientInner {
574 fn drop(&mut self) {
575 let responders = self.transactions.drain().map(|x| x.1).collect::<Vec<_>>();
576 for responder in responders {
577 let _ = responder.handle(self, Err(InnerError::Transport(None)));
578 }
579 for state in self.channel_read_states.values_mut() {
580 state.wakers.drain(..).for_each(Waker::wake);
581 }
582 for state in self.socket_read_states.values_mut() {
583 state.wakers.drain(..).for_each(Waker::wake);
584 }
585 }
586}
587
/// A client for an FDomain connection.
///
/// Wraps the connection state in a mutex; instances are created by
/// `Client::new`, which hands them out inside an `Arc`.
pub struct Client(pub(crate) Mutex<ClientInner>);
595
596impl std::fmt::Debug for Client {
597 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
598 f.debug_tuple("Client").field(&"...").finish()
599 }
600}
601
602pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
606 Arc::new(Client(Mutex::new(ClientInner {
607 transport: Transport::Error(InnerError::Transport(None)),
608 transactions: HashMap::new(),
609 channel_read_states: HashMap::new(),
610 socket_read_states: HashMap::new(),
611 next_tx_id: 1,
612 waiting_to_close: Vec::new(),
613 waiting_to_close_waker: std::task::Waker::noop().clone(),
614 })))
615});
616
617impl Client {
618 pub fn new(
625 transport: impl FDomainTransport + 'static,
626 ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
627 let ret = Arc::new(Client(Mutex::new(ClientInner {
628 transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
629 transactions: HashMap::new(),
630 socket_read_states: HashMap::new(),
631 channel_read_states: HashMap::new(),
632 next_tx_id: 1,
633 waiting_to_close: Vec::new(),
634 waiting_to_close_waker: std::task::Waker::noop().clone(),
635 })));
636
637 let client_weak = Arc::downgrade(&ret);
638 let fut = futures::future::poll_fn(move |ctx| {
639 let Some(client) = client_weak.upgrade() else {
640 return Poll::Ready(());
641 };
642
643 client.0.lock().poll_transport(ctx)
644 });
645
646 (ret, fut)
647 }
648
649 pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
651 let new_handle = self.new_hid();
652 self.transaction(
653 ordinals::GET_NAMESPACE,
654 proto::FDomainGetNamespaceRequest { new_handle },
655 Responder::Namespace,
656 )
657 .await?;
658 Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
659 }
660
661 pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
663 let id_a = self.new_hid();
664 let id_b = self.new_hid();
665 let fut = self.transaction(
666 ordinals::CREATE_CHANNEL,
667 proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
668 Responder::CreateChannel,
669 );
670
671 fuchsia_async::Task::spawn(async move {
672 if let Err(e) = fut.await {
673 log::debug!("FDomain channel creation failed: {e}");
674 }
675 })
676 .detach();
677
678 (
679 Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
680 Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
681 )
682 }
683
684 pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
686 self: &Arc<Self>,
687 ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
688 let (client, server) = self.create_channel();
689 let client_end = crate::fidl::ClientEnd::<F>::new(client);
690 let server_end = crate::fidl::ServerEnd::new(server);
691 (client_end, server_end)
692 }
693
694 pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
696 self: &Arc<Self>,
697 ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
698 let (client_end, server_end) = self.create_endpoints::<F>();
699 (client_end.into_proxy(), server_end)
700 }
701
702 pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
704 self: &Arc<Self>,
705 ) -> (F::Proxy, F::RequestStream) {
706 let (client_end, server_end) = self.create_endpoints::<F>();
707 (client_end.into_proxy(), server_end.into_stream())
708 }
709
710 pub fn create_request_stream<F: crate::fidl::ProtocolMarker>(
712 self: &Arc<Self>,
713 ) -> (crate::fidl::ClientEnd<F>, F::RequestStream) {
714 let (client_end, server_end) = self.create_endpoints::<F>();
715 (client_end, server_end.into_stream())
716 }
717
718 fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
720 let id_a = self.new_hid();
721 let id_b = self.new_hid();
722 let fut = self.transaction(
723 ordinals::CREATE_SOCKET,
724 proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
725 Responder::CreateSocket,
726 );
727
728 fuchsia_async::Task::spawn(async move {
729 if let Err(e) = fut.await {
730 log::debug!("FDomain socket creation failed: {e}");
731 }
732 })
733 .detach();
734
735 (
736 Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
737 Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
738 )
739 }
740
741 pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
743 self.create_socket(proto::SocketType::Stream)
744 }
745
746 pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
748 self.create_socket(proto::SocketType::Datagram)
749 }
750
751 pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
753 let id_a = self.new_hid();
754 let id_b = self.new_hid();
755 let fut = self.transaction(
756 ordinals::CREATE_EVENT_PAIR,
757 proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
758 Responder::CreateEventPair,
759 );
760
761 fuchsia_async::Task::spawn(async move {
762 if let Err(e) = fut.await {
763 log::debug!("FDomain event pair creation failed: {e}");
764 }
765 })
766 .detach();
767
768 (
769 EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
770 EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
771 )
772 }
773
774 pub fn create_event(self: &Arc<Self>) -> Event {
776 let id = self.new_hid();
777 let fut = self.transaction(
778 ordinals::CREATE_EVENT,
779 proto::EventCreateEventRequest { handle: id },
780 Responder::CreateEvent,
781 );
782
783 fuchsia_async::Task::spawn(async move {
784 if let Err(e) = fut.await {
785 log::debug!("FDomain event creation failed: {e}");
786 }
787 })
788 .detach();
789
790 Event(Handle { id: id.id, client: Arc::downgrade(self) })
791 }
792
793 pub(crate) fn new_hid(&self) -> proto::NewHandleId {
795 proto::NewHandleId { id: rand::random::<u32>() >> 1 }
800 }
801
802 pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
808 self: &Arc<Self>,
809 ordinal: u64,
810 request: S,
811 f: F,
812 ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
813 where
814 F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
815 {
816 let mut inner = self.0.lock();
817
818 let (sender, receiver) = futures::channel::oneshot::channel();
819 inner.request(ordinal, request, f(sender));
820 receiver.map(|x| x.expect("Oneshot went away without reply!"))
821 }
822
823 pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
825 let mut inner = self.0.lock();
826 if let Some(e) = inner.transport.error() {
827 return Err(e.into());
828 }
829
830 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
831 wakers: Vec::new(),
832 queued: VecDeque::new(),
833 is_streaming: false,
834 read_request_pending: false,
835 });
836
837 assert!(!state.is_streaming, "Initiated streaming twice!");
838 state.is_streaming = true;
839
840 inner.request(
841 ordinals::READ_SOCKET_STREAMING_START,
842 proto::SocketReadSocketStreamingStartRequest { handle: id },
843 Responder::Ignore,
844 );
845 Ok(())
846 }
847
848 pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
852 let mut inner = self.0.lock();
853 if let Some(state) = inner.socket_read_states.get_mut(&id) {
854 if state.is_streaming {
855 state.is_streaming = false;
856 let _ = inner.request(
858 ordinals::READ_SOCKET_STREAMING_STOP,
859 proto::ChannelReadChannelStreamingStopRequest { handle: id },
860 Responder::Ignore,
861 );
862 }
863 }
864 }
865
866 pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
868 let mut inner = self.0.lock();
869 if let Some(e) = inner.transport.error() {
870 return Err(e.into());
871 }
872 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
873 wakers: Vec::new(),
874 queued: VecDeque::new(),
875 is_streaming: false,
876 read_request_pending: false,
877 });
878
879 assert!(!state.is_streaming, "Initiated streaming twice!");
880 state.is_streaming = true;
881
882 inner.request(
883 ordinals::READ_CHANNEL_STREAMING_START,
884 proto::ChannelReadChannelStreamingStartRequest { handle: id },
885 Responder::Ignore,
886 );
887
888 Ok(())
889 }
890
891 pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
895 let mut inner = self.0.lock();
896 if let Some(state) = inner.channel_read_states.get_mut(&id) {
897 if state.is_streaming {
898 state.is_streaming = false;
899 let _ = inner.request(
901 ordinals::READ_CHANNEL_STREAMING_STOP,
902 proto::ChannelReadChannelStreamingStopRequest { handle: id },
903 Responder::Ignore,
904 );
905 }
906 }
907 }
908
909 pub(crate) fn poll_socket(
911 &self,
912 id: proto::HandleId,
913 ctx: &mut Context<'_>,
914 out: &mut [u8],
915 ) -> Poll<Result<usize, Error>> {
916 let mut inner = self.0.lock();
917 if let Some(error) = inner.transport.error() {
918 return Poll::Ready(Err(error.into()));
919 }
920
921 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
922 wakers: Vec::new(),
923 queued: VecDeque::new(),
924 is_streaming: false,
925 read_request_pending: false,
926 });
927
928 if let Some(got) = state.queued.front_mut() {
929 match got.as_mut() {
930 Ok(data) => {
931 let read_size = std::cmp::min(data.data.len(), out.len());
932 out[..read_size].copy_from_slice(&data.data[..read_size]);
933
934 if data.data.len() > read_size && !data.is_datagram {
935 let _ = data.data.drain(..read_size);
936 } else {
937 let _ = state.queued.pop_front();
938 }
939
940 return Poll::Ready(Ok(read_size));
941 }
942 Err(_) => {
943 let err = state.queued.pop_front().unwrap().unwrap_err();
944 return Poll::Ready(Err(err));
945 }
946 }
947 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
948 state.wakers.push(ctx.waker().clone());
949 }
950
951 if !state.read_request_pending && !state.is_streaming {
952 inner.request(
953 ordinals::READ_SOCKET,
954 proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
955 Responder::ReadSocket(id),
956 );
957 }
958
959 Poll::Pending
960 }
961
962 pub(crate) fn poll_channel(
964 &self,
965 id: proto::HandleId,
966 ctx: &mut Context<'_>,
967 for_stream: bool,
968 ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
969 let mut inner = self.0.lock();
970 if let Some(error) = inner.transport.error() {
971 return Poll::Ready(Some(Err(error.into())));
972 }
973
974 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
975 wakers: Vec::new(),
976 queued: VecDeque::new(),
977 is_streaming: false,
978 read_request_pending: false,
979 });
980
981 if let Some(got) = state.queued.pop_front() {
982 return Poll::Ready(Some(got));
983 } else if for_stream && !state.is_streaming {
984 return Poll::Ready(None);
985 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
986 state.wakers.push(ctx.waker().clone());
987 }
988
989 if !state.read_request_pending && !state.is_streaming {
990 inner.request(
991 ordinals::READ_CHANNEL,
992 proto::ChannelReadChannelRequest { handle: id },
993 Responder::ReadChannel(id),
994 );
995 }
996
997 Poll::Pending
998 }
999
1000 pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
1002 let inner = self.0.lock();
1003 let Some(state) = inner.channel_read_states.get(&id) else {
1004 return false;
1005 };
1006 state.is_streaming
1007 }
1008
1009 pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
1012 let inner = self.0.lock();
1013 match handles {
1014 proto::Handles::Handles(handles) => {
1015 for handle in handles {
1016 assert!(
1017 !(inner.channel_read_states.contains_key(handle)
1018 || inner.socket_read_states.contains_key(handle)),
1019 "Tried to transfer handle after reading"
1020 );
1021 }
1022 }
1023 proto::Handles::Dispositions(dispositions) => {
1024 for disposition in dispositions {
1025 match &disposition.handle {
1026 proto::HandleOp::Move_(handle) => assert!(
1027 !(inner.channel_read_states.contains_key(handle)
1028 || inner.socket_read_states.contains_key(handle)),
1029 "Tried to transfer handle after reading"
1030 ),
1031 proto::HandleOp::Duplicate(_) => (),
1033 }
1034 }
1035 }
1036 }
1037 }
1038}