1use fidl::endpoints::ClientEnd;
6use fidl::{AsHandleRef, HandleBased};
7use futures::prelude::*;
8use replace_with::replace_with;
9use std::collections::hash_map::Entry;
10use std::collections::{HashMap, VecDeque};
11use std::num::NonZeroU32;
12use std::sync::atomic::{AtomicU32, Ordering};
13use std::sync::Arc;
14use std::task::{Context, Poll, Waker};
15use {fidl_fuchsia_fdomain as proto, fidl_fuchsia_io as fio, fuchsia_async as fasync};
16
17mod handles;
18pub mod wire;
19
20#[cfg(test)]
21mod test;
22
23pub type Result<T, E = proto::Error> = std::result::Result<T, E>;
24
25use handles::{AnyHandle, HandleType as _, IsDatagramSocket};
26
27struct Queue<T>(VecDeque<T>, Option<Waker>);
30
31impl<T> Queue<T> {
32 fn new() -> Self {
34 Queue(VecDeque::new(), None)
35 }
36
37 fn is_empty(&self) -> bool {
39 self.0.is_empty()
40 }
41
42 fn destroy_front(&mut self) {
47 assert!(self.0.pop_front().is_some(), "Expected to find a value!");
48 }
49
50 fn pop_front(&mut self, ctx: &mut Context<'_>) -> Poll<T> {
52 if let Some(t) = self.0.pop_front() {
53 Poll::Ready(t)
54 } else {
55 self.1 = Some(ctx.waker().clone());
56 Poll::Pending
57 }
58 }
59
60 fn push_front_no_wake(&mut self, t: T) {
67 self.0.push_front(t)
68 }
69
70 fn push_back(&mut self, t: T) {
72 self.0.push_back(t);
73 self.1.take().map(Waker::wake);
74 }
75
76 fn front_mut(&mut self, ctx: &mut Context<'_>) -> Poll<&mut T> {
78 if let Some(t) = self.0.front_mut() {
79 Poll::Ready(t)
80 } else {
81 self.1 = Some(ctx.waker().clone());
82 Poll::Pending
83 }
84 }
85}
86
87const ASYNC_READ_BUFSIZE: u64 = 40960;
89
90#[derive(Debug)]
92pub enum FDomainEvent {
93 ChannelStreamingReadStart(NonZeroU32, Result<()>),
94 ChannelStreamingReadStop(NonZeroU32, Result<()>),
95 SocketStreamingReadStart(NonZeroU32, Result<()>),
96 SocketStreamingReadStop(NonZeroU32, Result<()>),
97 WaitForSignals(NonZeroU32, Result<proto::FDomainWaitForSignalsResponse>),
98 SocketData(NonZeroU32, Result<proto::SocketData>),
99 SocketStreamingData(proto::SocketOnSocketStreamingDataRequest),
100 SocketDispositionSet(NonZeroU32, Result<()>),
101 WroteSocket(NonZeroU32, Result<proto::SocketWriteSocketResponse, proto::WriteSocketError>),
102 ChannelData(NonZeroU32, Result<proto::ChannelMessage>),
103 ChannelStreamingData(proto::ChannelOnChannelStreamingDataRequest),
104 WroteChannel(NonZeroU32, Result<(), proto::WriteChannelError>),
105 ClosedHandle(NonZeroU32, Result<()>),
106 ReplacedHandle(NonZeroU32, Result<()>),
107}
108
109enum UnprocessedFDomainEvent {
113 Ready(FDomainEvent),
114 ChannelData(NonZeroU32, fidl::MessageBufEtc),
115 ChannelStreamingData(proto::HandleId, fidl::MessageBufEtc),
116}
117
118impl From<FDomainEvent> for UnprocessedFDomainEvent {
119 fn from(other: FDomainEvent) -> UnprocessedFDomainEvent {
120 UnprocessedFDomainEvent::Ready(other)
121 }
122}
123
124enum ReadOp {
126 StreamingChannel(NonZeroU32, bool),
128 StreamingSocket(NonZeroU32, bool),
130 Socket(NonZeroU32, u64),
131 Channel(NonZeroU32),
132}
133
134struct SocketWrite {
138 tid: NonZeroU32,
139 wrote: usize,
140 to_write: Vec<u8>,
141}
142
143enum WriteOp {
145 Socket(SocketWrite),
146 Channel(NonZeroU32, Vec<u8>, HandlesToWrite),
147 SetDisposition(NonZeroU32, proto::SocketDisposition, proto::SocketDisposition),
148}
149
150enum ShuttingDownHandle {
155 InUse(proto::HandleId, HandleState),
156 Ready(AnyHandle),
157}
158
159impl ShuttingDownHandle {
160 fn poll_ready(
161 &mut self,
162 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
163 ctx: &mut Context<'_>,
164 ) -> Poll<()> {
165 replace_with(self, |this| match this {
166 this @ ShuttingDownHandle::Ready(_) => this,
167 ShuttingDownHandle::InUse(hid, mut state) => {
168 state.poll(event_queue, ctx);
169
170 if state.write_queue.is_empty() {
171 while let Poll::Ready(op) = state.read_queue.pop_front(ctx) {
172 match op {
173 ReadOp::StreamingChannel(tid, start) => {
174 let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
175 id: hid.id,
176 }));
177 let event = if start {
178 FDomainEvent::ChannelStreamingReadStart(tid, err)
179 } else {
180 FDomainEvent::ChannelStreamingReadStop(tid, err)
181 };
182 event_queue.push_back(event.into());
183 }
184 ReadOp::StreamingSocket(tid, start) => {
185 let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
186 id: hid.id,
187 }));
188 let event = if start {
189 FDomainEvent::SocketStreamingReadStart(tid, err)
190 } else {
191 FDomainEvent::SocketStreamingReadStop(tid, err)
192 };
193 event_queue.push_back(event.into());
194 }
195 ReadOp::Channel(tid) => {
196 let err = state
197 .handle
198 .expected_type(fidl::ObjectType::CHANNEL)
199 .err()
200 .unwrap_or(proto::Error::ClosedDuringRead(
201 proto::ClosedDuringRead,
202 ));
203 event_queue
204 .push_back(FDomainEvent::ChannelData(tid, Err(err)).into());
205 }
206 ReadOp::Socket(tid, _max_bytes) => {
207 let err = state
208 .handle
209 .expected_type(fidl::ObjectType::SOCKET)
210 .err()
211 .unwrap_or(proto::Error::ClosedDuringRead(
212 proto::ClosedDuringRead,
213 ));
214 event_queue
215 .push_back(FDomainEvent::SocketData(tid, Err(err)).into());
216 }
217 }
218 }
219
220 if state.async_read_in_progress {
221 match &*state.handle {
222 AnyHandle::Channel(_) => event_queue.push_back(
223 FDomainEvent::ChannelStreamingData(
224 proto::ChannelOnChannelStreamingDataRequest {
225 handle: hid,
226 channel_sent: proto::ChannelSent::Stopped(
227 proto::AioStopped { error: None },
228 ),
229 },
230 )
231 .into(),
232 ),
233 AnyHandle::Socket(_) => event_queue.push_back(
234 FDomainEvent::SocketStreamingData(
235 proto::SocketOnSocketStreamingDataRequest {
236 handle: hid,
237 socket_message: proto::SocketMessage::Stopped(
238 proto::AioStopped { error: None },
239 ),
240 },
241 )
242 .into(),
243 ),
244 AnyHandle::EventPair(_)
245 | AnyHandle::Event(_)
246 | AnyHandle::Unknown(_) => unreachable!(),
247 }
248 }
249
250 state.signal_waiters.clear();
251 state.io_waiter = None;
252
253 ShuttingDownHandle::Ready(
254 Arc::into_inner(state.handle).expect("Unaccounted-for handle reference!"),
255 )
256 } else {
257 ShuttingDownHandle::InUse(hid, state)
258 }
259 }
260 });
261
262 if matches!(self, ShuttingDownHandle::Ready(_)) {
263 Poll::Ready(())
264 } else {
265 Poll::Pending
266 }
267 }
268}
269
270enum HandlesToWrite {
274 SomeInUse(Vec<(ShuttingDownHandle, fidl::Rights)>),
275 AllReady(Vec<fidl::HandleDisposition<'static>>),
276}
277
278impl HandlesToWrite {
279 fn poll_ready(
280 &mut self,
281 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
282 ctx: &mut Context<'_>,
283 ) -> Poll<&mut Vec<fidl::HandleDisposition<'static>>> {
284 match self {
285 HandlesToWrite::AllReady(s) => Poll::Ready(s),
286 HandlesToWrite::SomeInUse(handles) => {
287 let mut ready = true;
288 for (handle, _) in handles.iter_mut() {
289 ready = ready && handle.poll_ready(event_queue, ctx).is_ready();
290 }
291
292 if !ready {
293 return Poll::Pending;
294 }
295
296 *self = HandlesToWrite::AllReady(
297 handles
298 .drain(..)
299 .map(|(handle, rights)| {
300 let ShuttingDownHandle::Ready(handle) = handle else { unreachable!() };
301
302 fidl::HandleDisposition::new(
303 fidl::HandleOp::Move(handle.into()),
304 fidl::ObjectType::NONE,
305 rights,
306 fidl::Status::OK,
307 )
308 })
309 .collect(),
310 );
311
312 let HandlesToWrite::AllReady(s) = self else { unreachable!() };
313 Poll::Ready(s)
314 }
315 }
316 }
317}
318
319struct AnyHandleRef(Arc<AnyHandle>);
320
321impl AsHandleRef for AnyHandleRef {
322 fn as_handle_ref(&self) -> fidl::HandleRef<'_> {
323 self.0.as_handle_ref()
324 }
325}
326
327#[cfg(target_os = "fuchsia")]
328type OnSignals = fasync::OnSignals<'static, AnyHandleRef>;
329
330#[cfg(not(target_os = "fuchsia"))]
331type OnSignals = fasync::OnSignalsRef<'static>;
332
333struct SignalWaiter {
336 tid: NonZeroU32,
337 waiter: OnSignals,
338}
339
340struct HandleState {
342 handle: Arc<AnyHandle>,
344 hid: proto::HandleId,
346 is_datagram_socket: bool,
350 async_read_in_progress: bool,
355 read_queue: Queue<ReadOp>,
361 write_queue: Queue<WriteOp>,
368 signal_waiters: Vec<SignalWaiter>,
370 io_waiter: Option<OnSignals>,
373}
374
375impl HandleState {
376 fn new(handle: AnyHandle, hid: proto::HandleId) -> Result<Self, proto::Error> {
377 let is_datagram_socket = match handle.is_datagram_socket() {
378 IsDatagramSocket::Unknown => {
379 return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown {
380 type_: proto::SocketType::unknown(),
381 }))
382 }
383 other => other.is_datagram(),
384 };
385 Ok(HandleState {
386 handle: Arc::new(handle),
387 hid,
388 async_read_in_progress: false,
389 is_datagram_socket,
390 read_queue: Queue::new(),
391 write_queue: Queue::new(),
392 signal_waiters: Vec::new(),
393 io_waiter: None,
394 })
395 }
396
397 fn poll(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>, ctx: &mut Context<'_>) {
400 self.signal_waiters.retain_mut(|x| {
401 let Poll::Ready(result) = x.waiter.poll_unpin(ctx) else {
402 return true;
403 };
404
405 event_queue.push_back(
406 FDomainEvent::WaitForSignals(
407 x.tid,
408 result
409 .map(|x| proto::FDomainWaitForSignalsResponse { signals: x.bits() })
410 .map_err(|e| proto::Error::TargetError(e.into_raw())),
411 )
412 .into(),
413 );
414
415 false
416 });
417
418 let read_signals = self.handle.read_signals();
419 let write_signals = self.handle.write_signals();
420
421 loop {
422 if let Some(signal_waiter) = self.io_waiter.as_mut() {
423 if let Poll::Ready(sigs) = signal_waiter.poll_unpin(ctx) {
424 if let Ok(sigs) = sigs {
425 if sigs.intersects(read_signals) {
426 self.process_read_queue(event_queue, ctx);
427 }
428 if sigs.intersects(write_signals) {
429 self.process_write_queue(event_queue, ctx);
430 }
431 }
432 } else {
433 let need_read = matches!(
434 self.read_queue.front_mut(ctx),
435 Poll::Ready(ReadOp::StreamingChannel(_, _) | ReadOp::StreamingSocket(_, _))
436 );
437 let need_write = matches!(
438 self.write_queue.front_mut(ctx),
439 Poll::Ready(WriteOp::SetDisposition(_, _, _))
440 );
441
442 self.process_read_queue(event_queue, ctx);
443 self.process_write_queue(event_queue, ctx);
444
445 if !(need_read || need_write) {
446 break;
447 }
448 }
449 }
450
451 let subscribed_signals =
452 if self.async_read_in_progress || !self.read_queue.is_empty() {
453 read_signals
454 } else {
455 fidl::Signals::NONE
456 } | if !self.write_queue.is_empty() { write_signals } else { fidl::Signals::NONE };
457
458 if !subscribed_signals.is_empty() {
459 self.io_waiter = Some(OnSignals::new(
460 AnyHandleRef(Arc::clone(&self.handle)),
461 subscribed_signals,
462 ));
463 } else {
464 self.io_waiter = None;
465 break;
466 }
467 }
468 }
469
470 fn try_enable_async_read(&mut self) -> Result<()> {
472 if self.async_read_in_progress {
473 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress))
474 } else {
475 self.async_read_in_progress = true;
476 Ok(())
477 }
478 }
479
480 fn try_disable_async_read(&mut self) -> Result<()> {
482 if !self.async_read_in_progress {
483 Err(proto::Error::NoReadInProgress(proto::NoReadInProgress))
484 } else {
485 self.async_read_in_progress = false;
486 Ok(())
487 }
488 }
489
490 fn process_read_queue(
492 &mut self,
493 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
494 ctx: &mut Context<'_>,
495 ) {
496 while let Poll::Ready(op) = self.read_queue.front_mut(ctx) {
497 match op {
498 ReadOp::StreamingChannel(tid, true) => {
499 let tid = *tid;
500 let result = self.try_enable_async_read();
501 event_queue
502 .push_back(FDomainEvent::ChannelStreamingReadStart(tid, result).into());
503 self.read_queue.destroy_front();
504 }
505 ReadOp::StreamingChannel(tid, false) => {
506 let tid = *tid;
507 let result = self.try_disable_async_read();
508 event_queue
509 .push_back(FDomainEvent::ChannelStreamingReadStop(tid, result).into());
510 self.read_queue.destroy_front();
511 }
512 ReadOp::StreamingSocket(tid, true) => {
513 let tid = *tid;
514 let result = self.try_enable_async_read();
515 event_queue
516 .push_back(FDomainEvent::SocketStreamingReadStart(tid, result).into());
517 self.read_queue.destroy_front();
518 }
519 ReadOp::StreamingSocket(tid, false) => {
520 let tid = *tid;
521 let result = self.try_disable_async_read();
522 event_queue
523 .push_back(FDomainEvent::SocketStreamingReadStop(tid, result).into());
524 self.read_queue.destroy_front();
525 }
526 ReadOp::Socket(tid, max_bytes) => {
527 let (tid, max_bytes) = (*tid, *max_bytes);
528 if let Some(event) = self.do_read_socket(tid, max_bytes) {
529 let _ = self.read_queue.pop_front(ctx);
530 event_queue.push_back(event.into());
531 } else {
532 break;
533 }
534 }
535 ReadOp::Channel(tid) => {
536 let tid = *tid;
537 if let Some(event) = self.do_read_channel(tid) {
538 let _ = self.read_queue.pop_front(ctx);
539 event_queue.push_back(event.into());
540 } else {
541 break;
542 }
543 }
544 }
545 }
546
547 if self.async_read_in_progress {
548 assert!(self.read_queue.is_empty());
551 self.process_async_read(event_queue);
552 }
553 }
554
555 fn process_async_read(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>) {
556 assert!(self.async_read_in_progress);
557
558 match &*self.handle {
559 AnyHandle::Channel(_) => {
560 'read_loop: while let Some(result) = self.handle.read_channel().transpose() {
561 match result {
562 Ok(msg) => event_queue.push_back(
563 UnprocessedFDomainEvent::ChannelStreamingData(self.hid, msg),
564 ),
565 Err(e) => {
566 event_queue.push_back(
567 FDomainEvent::ChannelStreamingData(
568 proto::ChannelOnChannelStreamingDataRequest {
569 handle: self.hid,
570 channel_sent: proto::ChannelSent::Stopped(
571 proto::AioStopped { error: Some(Box::new(e)) },
572 ),
573 },
574 )
575 .into(),
576 );
577 self.async_read_in_progress = false;
578 break 'read_loop;
579 }
580 }
581 }
582 }
583
584 AnyHandle::Socket(_) => {
585 'read_loop: while let Some(result) =
586 self.handle.read_socket(ASYNC_READ_BUFSIZE).transpose()
587 {
588 match result {
589 Ok(data) => {
590 event_queue.push_back(
591 FDomainEvent::SocketStreamingData(
592 proto::SocketOnSocketStreamingDataRequest {
593 handle: self.hid,
594 socket_message: proto::SocketMessage::Data(
595 proto::SocketData {
596 data,
597 is_datagram: self.is_datagram_socket,
598 },
599 ),
600 },
601 )
602 .into(),
603 );
604 }
605 Err(e) => {
606 event_queue.push_back(
607 FDomainEvent::SocketStreamingData(
608 proto::SocketOnSocketStreamingDataRequest {
609 handle: self.hid,
610 socket_message: proto::SocketMessage::Stopped(
611 proto::AioStopped { error: Some(Box::new(e)) },
612 ),
613 },
614 )
615 .into(),
616 );
617 self.async_read_in_progress = false;
618 break 'read_loop;
619 }
620 }
621 }
622 }
623
624 _ => unreachable!("Processed async read for unreadable handle type!"),
625 }
626 }
627
628 fn process_write_queue(
630 &mut self,
631 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
632 ctx: &mut Context<'_>,
633 ) {
634 while let Poll::Ready(op) = self.write_queue.pop_front(ctx) {
639 match op {
640 WriteOp::Socket(mut op) => {
641 if let Some(event) = self.do_write_socket(&mut op) {
642 event_queue.push_back(event.into());
643 } else {
644 self.write_queue.push_front_no_wake(WriteOp::Socket(op));
645 break;
646 }
647 }
648 WriteOp::SetDisposition(tid, disposition, disposition_peer) => {
649 let result = { self.handle.socket_disposition(disposition, disposition_peer) };
650 event_queue.push_back(FDomainEvent::SocketDispositionSet(tid, result).into())
651 }
652 WriteOp::Channel(tid, data, mut handles) => {
653 if self
654 .do_write_channel(tid, &data, &mut handles, event_queue, ctx)
655 .is_pending()
656 {
657 self.write_queue.push_front_no_wake(WriteOp::Channel(tid, data, handles));
658 break;
659 }
660 }
661 }
662 }
663 }
664
665 fn do_read_socket(&mut self, tid: NonZeroU32, max_bytes: u64) -> Option<FDomainEvent> {
669 if self.async_read_in_progress {
670 return Some(
671 FDomainEvent::SocketData(
672 tid,
673 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
674 )
675 .into(),
676 );
677 }
678
679 let max_bytes = if self.is_datagram_socket {
680 let AnyHandle::Socket(s) = &*self.handle else {
681 unreachable!("Read socket from state that wasn't for a socket!");
682 };
683 match s.info() {
684 Ok(x) => x.rx_buf_available as u64,
685 Err(e) => {
691 return Some(FDomainEvent::SocketData(
692 tid,
693 Err(proto::Error::TargetError(e.into_raw())),
694 ))
695 }
696 }
697 } else {
698 max_bytes
699 };
700 self.handle.read_socket(max_bytes).transpose().map(|x| {
701 FDomainEvent::SocketData(
702 tid,
703 x.map(|data| proto::SocketData { data, is_datagram: self.is_datagram_socket }),
704 )
705 })
706 }
707
708 fn do_write_socket(&mut self, op: &mut SocketWrite) -> Option<FDomainEvent> {
712 match self.handle.write_socket(&op.to_write) {
713 Ok(wrote) => {
714 op.wrote += wrote;
715 op.to_write.drain(..wrote);
716
717 if op.to_write.is_empty() {
718 Some(FDomainEvent::WroteSocket(
719 op.tid,
720 Ok(proto::SocketWriteSocketResponse {
721 wrote: op.wrote.try_into().unwrap(),
722 }),
723 ))
724 } else {
725 None
726 }
727 }
728 Err(error) => Some(FDomainEvent::WroteSocket(
729 op.tid,
730 Err(proto::WriteSocketError { error, wrote: op.wrote.try_into().unwrap() }),
731 )),
732 }
733 }
734
735 fn do_write_channel(
739 &mut self,
740 tid: NonZeroU32,
741 data: &[u8],
742 handles: &mut HandlesToWrite,
743 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
744 ctx: &mut Context<'_>,
745 ) -> Poll<()> {
746 let Poll::Ready(handles) = handles.poll_ready(event_queue, ctx) else {
747 return Poll::Pending;
748 };
749
750 let ret = self.handle.write_channel(data, handles);
751 if let Some(ret) = ret {
752 event_queue.push_back(FDomainEvent::WroteChannel(tid, ret).into())
753 }
754 Poll::Ready(())
755 }
756
757 fn do_read_channel(&mut self, tid: NonZeroU32) -> Option<UnprocessedFDomainEvent> {
761 if self.async_read_in_progress {
762 return Some(
763 FDomainEvent::ChannelData(
764 tid,
765 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
766 )
767 .into(),
768 );
769 }
770 match self.handle.read_channel() {
771 Ok(x) => x.map(|x| UnprocessedFDomainEvent::ChannelData(tid, x)),
772 Err(e) => Some(FDomainEvent::ChannelData(tid, Err(e)).into()),
773 }
774 }
775}
776
777struct ClosingHandle {
780 action: Arc<CloseAction>,
781 state: Option<ShuttingDownHandle>,
782}
783
784impl ClosingHandle {
785 fn poll_ready(&mut self, fdomain: &mut FDomain, ctx: &mut Context<'_>) -> Poll<()> {
786 if let Some(state) = self.state.as_mut() {
787 if state.poll_ready(&mut fdomain.event_queue, ctx).is_ready() {
788 let state = self.state.take().unwrap();
789 let ShuttingDownHandle::Ready(handle) = state else {
790 unreachable!();
791 };
792 self.action.perform(fdomain, handle);
793 Poll::Ready(())
794 } else {
795 Poll::Pending
796 }
797 } else {
798 Poll::Ready(())
799 }
800 }
801}
802
803enum CloseAction {
808 Close { tid: NonZeroU32, count: AtomicU32, result: Result<()> },
809 Replace { tid: NonZeroU32, new_hid: proto::NewHandleId, rights: fidl::Rights },
810}
811
812impl CloseAction {
813 fn perform(&self, fdomain: &mut FDomain, handle: AnyHandle) {
814 match self {
815 CloseAction::Close { tid, count, result } => {
816 if count.fetch_sub(1, Ordering::Relaxed) == 1 {
817 fdomain
818 .event_queue
819 .push_back(FDomainEvent::ClosedHandle(*tid, result.clone()).into());
820 }
821 }
822 CloseAction::Replace { tid, new_hid, rights } => {
823 let result = handle
824 .replace(*rights)
825 .and_then(|handle| fdomain.alloc_client_handles([*new_hid], [handle]));
826 fdomain.event_queue.push_back(FDomainEvent::ReplacedHandle(*tid, result).into());
827 }
828 }
829 }
830}
831
832#[pin_project::pin_project]
837pub struct FDomain {
838 namespace: Box<dyn Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send>,
839 handles: HashMap<proto::HandleId, HandleState>,
840 closing_handles: Vec<ClosingHandle>,
841 event_queue: VecDeque<UnprocessedFDomainEvent>,
842 waker: Option<Waker>,
843}
844
845impl FDomain {
846 pub fn new_empty() -> Self {
849 Self::new(|| Err(fidl::Status::NOT_FOUND))
850 }
851
852 pub fn new(
854 namespace: impl Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send + 'static,
855 ) -> Self {
856 FDomain {
857 namespace: Box::new(namespace),
858 handles: HashMap::new(),
859 closing_handles: Vec::new(),
860 event_queue: VecDeque::new(),
861 waker: None,
862 }
863 }
864
865 fn push_event(&mut self, event: impl Into<UnprocessedFDomainEvent>) {
867 self.event_queue.push_back(event.into());
868 self.waker.take().map(Waker::wake);
869 }
870
871 fn process_message(
875 &mut self,
876 message: fidl::MessageBufEtc,
877 ) -> Result<proto::ChannelMessage, proto::Error> {
878 let (data, handles) = message.split();
879 let handles = handles
880 .into_iter()
881 .map(|info| {
882 let type_ = info.object_type;
883
884 let handle = match info.object_type {
885 fidl::ObjectType::CHANNEL => {
886 AnyHandle::Channel(fidl::Channel::from_handle(info.handle))
887 }
888 fidl::ObjectType::SOCKET => {
889 AnyHandle::Socket(fidl::Socket::from_handle(info.handle))
890 }
891 fidl::ObjectType::EVENTPAIR => {
892 AnyHandle::EventPair(fidl::EventPair::from_handle(info.handle))
893 }
894 fidl::ObjectType::EVENT => {
895 AnyHandle::Event(fidl::Event::from_handle(info.handle))
896 }
897 _ => AnyHandle::Unknown(handles::Unknown(info.handle, info.object_type)),
898 };
899
900 Ok(proto::HandleInfo {
901 rights: info.rights,
902 handle: self.alloc_fdomain_handle(handle)?,
903 type_,
904 })
905 })
906 .collect::<Result<Vec<_>, proto::Error>>()?;
907
908 Ok(proto::ChannelMessage { data, handles })
909 }
910
911 fn alloc_client_handles<const N: usize>(
918 &mut self,
919 ids: [proto::NewHandleId; N],
920 handles: [AnyHandle; N],
921 ) -> Result<(), proto::Error> {
922 for id in ids {
923 if id.id & (1 << 31) != 0 {
924 return Err(proto::Error::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange {
925 id: id.id,
926 }));
927 }
928
929 if self.handles.contains_key(&proto::HandleId { id: id.id }) {
930 return Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
931 id: id.id,
932 same_call: false,
933 }));
934 }
935 }
936
937 let mut sorted_ids = ids;
938 sorted_ids.sort();
939
940 if let Some(a) = sorted_ids.windows(2).find(|x| x[0] == x[1]) {
941 Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
942 id: a[0].id,
943 same_call: true,
944 }))
945 } else {
946 let ids = ids.into_iter().map(|id| proto::HandleId { id: id.id });
947 let handles = ids
948 .zip(handles.into_iter())
949 .map(|(id, h)| HandleState::new(h, id).map(|x| (id, x)))
950 .collect::<Result<Vec<_>, proto::Error>>()?;
951
952 self.handles.extend(handles);
953
954 Ok(())
955 }
956 }
957
958 fn alloc_fdomain_handle(&mut self, handle: AnyHandle) -> Result<proto::HandleId, proto::Error> {
961 loop {
962 let id = proto::HandleId { id: rand::random::<u32>() | (1u32 << 31) };
963 if let Entry::Vacant(v) = self.handles.entry(id) {
964 v.insert(HandleState::new(handle, id)?);
965 break Ok(id);
966 }
967 }
968 }
969
970 fn take_handle(&mut self, handle: proto::HandleId) -> Result<HandleState, proto::Error> {
972 self.handles
973 .remove(&handle)
974 .ok_or(proto::Error::BadHandleId(proto::BadHandleId { id: handle.id }))
975 }
976
977 fn using_handle<T>(
979 &mut self,
980 id: proto::HandleId,
981 f: impl FnOnce(&mut HandleState) -> Result<T, proto::Error>,
982 ) -> Result<T, proto::Error> {
983 if let Some(s) = self.handles.get_mut(&id) {
984 f(s)
985 } else {
986 Err(proto::Error::BadHandleId(proto::BadHandleId { id: id.id }))
987 }
988 }
989
990 pub fn get_namespace(&mut self, request: proto::FDomainGetNamespaceRequest) -> Result<()> {
991 match (self.namespace)() {
992 Ok(endpoint) => self.alloc_client_handles(
993 [request.new_handle],
994 [AnyHandle::Channel(endpoint.into_channel())],
995 ),
996 Err(e) => Err(proto::Error::TargetError(e.into_raw())),
997 }
998 }
999
1000 pub fn create_channel(&mut self, request: proto::ChannelCreateChannelRequest) -> Result<()> {
1001 let (a, b) = fidl::Channel::create();
1002 self.alloc_client_handles(request.handles, [AnyHandle::Channel(a), AnyHandle::Channel(b)])
1003 }
1004
1005 pub fn create_socket(&mut self, request: proto::SocketCreateSocketRequest) -> Result<()> {
1006 let (a, b) = match request.options {
1007 proto::SocketType::Stream => fidl::Socket::create_stream(),
1008 proto::SocketType::Datagram => fidl::Socket::create_datagram(),
1009 type_ => {
1010 return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown { type_ }))
1011 }
1012 };
1013
1014 self.alloc_client_handles(request.handles, [AnyHandle::Socket(a), AnyHandle::Socket(b)])
1015 }
1016
1017 pub fn create_event_pair(
1018 &mut self,
1019 request: proto::EventPairCreateEventPairRequest,
1020 ) -> Result<()> {
1021 let (a, b) = fidl::EventPair::create();
1022 self.alloc_client_handles(
1023 request.handles,
1024 [AnyHandle::EventPair(a), AnyHandle::EventPair(b)],
1025 )
1026 }
1027
1028 pub fn create_event(&mut self, request: proto::EventCreateEventRequest) -> Result<()> {
1029 let a = fidl::Event::create();
1030 self.alloc_client_handles([request.handle], [AnyHandle::Event(a)])
1031 }
1032
1033 pub fn set_socket_disposition(
1034 &mut self,
1035 tid: NonZeroU32,
1036 request: proto::SocketSetSocketDispositionRequest,
1037 ) {
1038 if let Err(err) = self.using_handle(request.handle, |h| {
1039 h.write_queue.push_back(WriteOp::SetDisposition(
1040 tid,
1041 request.disposition,
1042 request.disposition_peer,
1043 ));
1044 Ok(())
1045 }) {
1046 self.push_event(FDomainEvent::SocketDispositionSet(tid, Err(err)));
1047 }
1048 }
1049
1050 pub fn read_socket(&mut self, tid: NonZeroU32, request: proto::SocketReadSocketRequest) {
1051 if let Err(e) = self.using_handle(request.handle, |h| {
1052 h.read_queue.push_back(ReadOp::Socket(tid, request.max_bytes));
1053 Ok(())
1054 }) {
1055 self.push_event(FDomainEvent::SocketData(tid, Err(e)));
1056 }
1057 }
1058
1059 pub fn read_channel(&mut self, tid: NonZeroU32, request: proto::ChannelReadChannelRequest) {
1060 if let Err(e) = self.using_handle(request.handle, |h| {
1061 h.read_queue.push_back(ReadOp::Channel(tid));
1062 Ok(())
1063 }) {
1064 self.push_event(FDomainEvent::ChannelData(tid, Err(e)));
1065 }
1066 }
1067
1068 pub fn write_socket(&mut self, tid: NonZeroU32, request: proto::SocketWriteSocketRequest) {
1069 if let Err(error) = self.using_handle(request.handle, |h| {
1070 h.write_queue.push_back(WriteOp::Socket(SocketWrite {
1071 tid,
1072 wrote: 0,
1073 to_write: request.data,
1074 }));
1075 Ok(())
1076 }) {
1077 self.push_event(FDomainEvent::WroteSocket(
1078 tid,
1079 Err(proto::WriteSocketError { error, wrote: 0 }),
1080 ));
1081 }
1082 }
1083
1084 pub fn write_channel(&mut self, tid: NonZeroU32, request: proto::ChannelWriteChannelRequest) {
1085 let handles: Vec<Result<(ShuttingDownHandle, fidl::Rights)>> = match request.handles {
1093 proto::Handles::Handles(h) => h
1094 .into_iter()
1095 .map(|h| {
1096 if h != request.handle {
1097 self.take_handle(h).map(|handle_state| {
1098 (ShuttingDownHandle::InUse(h, handle_state), fidl::Rights::SAME_RIGHTS)
1099 })
1100 } else {
1101 Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1102 }
1103 })
1104 .collect(),
1105 proto::Handles::Dispositions(d) => d
1106 .into_iter()
1107 .map(|d| {
1108 let res = match d.handle {
1109 proto::HandleOp::Move_(h) => {
1110 if h != request.handle {
1111 self.take_handle(h).map(|x| ShuttingDownHandle::InUse(h, x))
1112 } else {
1113 Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1114 }
1115 }
1116 proto::HandleOp::Duplicate(h) => {
1117 if h != request.handle {
1118 self.using_handle(h, |h| {
1124 h.handle.duplicate(fidl::Rights::SAME_RIGHTS)
1125 })
1126 .map(ShuttingDownHandle::Ready)
1127 } else {
1128 Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1129 }
1130 }
1131 };
1132
1133 res.and_then(|x| Ok((x, d.rights)))
1134 })
1135 .collect(),
1136 };
1137
1138 if handles.iter().any(|x| x.is_err()) {
1139 let e = handles.into_iter().map(|x| x.err().map(Box::new)).collect();
1140
1141 self.push_event(FDomainEvent::WroteChannel(
1142 tid,
1143 Err(proto::WriteChannelError::OpErrors(e)),
1144 ));
1145 return;
1146 }
1147
1148 let handles = handles.into_iter().map(|x| x.unwrap()).collect::<Vec<_>>();
1149
1150 if let Err(e) = self.using_handle(request.handle, |h| {
1151 h.write_queue.push_back(WriteOp::Channel(
1152 tid,
1153 request.data,
1154 HandlesToWrite::SomeInUse(handles),
1155 ));
1156 Ok(())
1157 }) {
1158 self.push_event(FDomainEvent::WroteChannel(
1159 tid,
1160 Err(proto::WriteChannelError::Error(e)),
1161 ));
1162 }
1163 }
1164
1165 pub fn wait_for_signals(
1166 &mut self,
1167 tid: NonZeroU32,
1168 request: proto::FDomainWaitForSignalsRequest,
1169 ) {
1170 let result = self.using_handle(request.handle, |h| {
1171 let signals = fidl::Signals::from_bits_retain(request.signals);
1172 h.signal_waiters.push(SignalWaiter {
1173 tid,
1174 waiter: OnSignals::new(AnyHandleRef(Arc::clone(&h.handle)), signals),
1175 });
1176 Ok(())
1177 });
1178
1179 if let Err(e) = result {
1180 self.push_event(FDomainEvent::WaitForSignals(tid, Err(e)));
1181 } else {
1182 self.waker.take().map(Waker::wake);
1183 }
1184 }
1185
1186 pub fn close(&mut self, tid: NonZeroU32, request: proto::FDomainCloseRequest) {
1187 let mut states = Vec::with_capacity(request.handles.len());
1188 let mut result = Ok(());
1189 for hid in request.handles {
1190 match self.take_handle(hid) {
1191 Ok(state) => states.push((hid, state)),
1192
1193 Err(e) => {
1194 result = result.and(Err(e));
1195 }
1196 }
1197 }
1198
1199 let action = Arc::new(CloseAction::Close {
1200 tid,
1201 count: AtomicU32::new(states.len().try_into().unwrap()),
1202 result,
1203 });
1204
1205 for (hid, state) in states {
1206 self.closing_handles.push(ClosingHandle {
1207 action: Arc::clone(&action),
1208 state: Some(ShuttingDownHandle::InUse(hid, state)),
1209 });
1210 }
1211 }
1212
1213 pub fn duplicate(&mut self, request: proto::FDomainDuplicateRequest) -> Result<()> {
1214 let rights = request.rights;
1215 let handle = self.using_handle(request.handle, |h| h.handle.duplicate(rights));
1216 handle.and_then(|h| self.alloc_client_handles([request.new_handle], [h]))
1217 }
1218
1219 pub fn replace(
1220 &mut self,
1221 tid: NonZeroU32,
1222 request: proto::FDomainReplaceRequest,
1223 ) -> Result<()> {
1224 let rights = request.rights;
1225 let new_hid = request.new_handle;
1226 match self.take_handle(request.handle) {
1227 Ok(state) => self.closing_handles.push(ClosingHandle {
1228 action: Arc::new(CloseAction::Replace { tid, new_hid, rights }),
1229 state: Some(ShuttingDownHandle::InUse(request.handle, state)),
1230 }),
1231 Err(e) => self.event_queue.push_back(UnprocessedFDomainEvent::Ready(
1232 FDomainEvent::ReplacedHandle(tid, Err(e)),
1233 )),
1234 }
1235
1236 Ok(())
1237 }
1238
1239 pub fn signal(&mut self, request: proto::FDomainSignalRequest) -> Result<()> {
1240 let set = fidl::Signals::from_bits_retain(request.set);
1241 let clear = fidl::Signals::from_bits_retain(request.clear);
1242
1243 self.using_handle(request.handle, |h| {
1244 h.handle.signal_handle(clear, set).map_err(|e| proto::Error::TargetError(e.into_raw()))
1245 })
1246 }
1247
1248 pub fn signal_peer(&mut self, request: proto::FDomainSignalPeerRequest) -> Result<()> {
1249 let set = fidl::Signals::from_bits_retain(request.set);
1250 let clear = fidl::Signals::from_bits_retain(request.clear);
1251
1252 self.using_handle(request.handle, |h| h.handle.signal_peer(clear, set))
1253 }
1254
1255 pub fn read_channel_streaming_start(
1256 &mut self,
1257 tid: NonZeroU32,
1258 request: proto::ChannelReadChannelStreamingStartRequest,
1259 ) {
1260 if let Err(err) = self.using_handle(request.handle, |h| {
1261 h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1262 h.read_queue.push_back(ReadOp::StreamingChannel(tid, true));
1263 Ok(())
1264 }) {
1265 self.event_queue
1266 .push_back(FDomainEvent::ChannelStreamingReadStart(tid, Err(err)).into())
1267 }
1268 }
1269
1270 pub fn read_channel_streaming_stop(
1271 &mut self,
1272 tid: NonZeroU32,
1273 request: proto::ChannelReadChannelStreamingStopRequest,
1274 ) {
1275 if let Err(err) = self.using_handle(request.handle, |h| {
1276 h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1277 h.read_queue.push_back(ReadOp::StreamingChannel(tid, false));
1278 Ok(())
1279 }) {
1280 self.event_queue.push_back(FDomainEvent::ChannelStreamingReadStop(tid, Err(err)).into())
1281 }
1282 }
1283
1284 pub fn read_socket_streaming_start(
1285 &mut self,
1286 tid: NonZeroU32,
1287 request: proto::SocketReadSocketStreamingStartRequest,
1288 ) {
1289 if let Err(err) = self.using_handle(request.handle, |h| {
1290 h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1291 h.read_queue.push_back(ReadOp::StreamingSocket(tid, true));
1292 Ok(())
1293 }) {
1294 self.event_queue.push_back(FDomainEvent::SocketStreamingReadStart(tid, Err(err)).into())
1295 }
1296 }
1297
1298 pub fn read_socket_streaming_stop(
1299 &mut self,
1300 tid: NonZeroU32,
1301 request: proto::SocketReadSocketStreamingStopRequest,
1302 ) {
1303 if let Err(err) = self.using_handle(request.handle, |h| {
1304 h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1305 h.read_queue.push_back(ReadOp::StreamingSocket(tid, false));
1306 Ok(())
1307 }) {
1308 self.event_queue.push_back(FDomainEvent::SocketStreamingReadStop(tid, Err(err)).into())
1309 }
1310 }
1311}
1312
1313impl futures::Stream for FDomain {
1316 type Item = FDomainEvent;
1317
1318 fn poll_next(
1319 mut self: std::pin::Pin<&mut Self>,
1320 ctx: &mut Context<'_>,
1321 ) -> Poll<Option<Self::Item>> {
1322 let this = &mut *self;
1323
1324 let mut closing_handles = std::mem::replace(&mut this.closing_handles, Vec::new());
1325 closing_handles.retain_mut(|x| x.poll_ready(this, ctx).is_pending());
1326 this.closing_handles = closing_handles;
1327
1328 let handles = &mut this.handles;
1329 let event_queue = &mut this.event_queue;
1330 for state in handles.values_mut() {
1331 state.poll(event_queue, ctx);
1332 }
1333
1334 if let Some(event) = self.event_queue.pop_front() {
1335 match event {
1336 UnprocessedFDomainEvent::Ready(event) => Poll::Ready(Some(event)),
1337 UnprocessedFDomainEvent::ChannelData(tid, message) => {
1338 Poll::Ready(Some(FDomainEvent::ChannelData(tid, self.process_message(message))))
1339 }
1340 UnprocessedFDomainEvent::ChannelStreamingData(hid, message) => {
1341 match self.process_message(message) {
1342 Ok(message) => Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1343 proto::ChannelOnChannelStreamingDataRequest {
1344 handle: hid,
1345 channel_sent: proto::ChannelSent::Message(message),
1346 },
1347 ))),
1348 Err(e) => {
1349 self.handles.get_mut(&hid).unwrap().async_read_in_progress = false;
1350 Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1351 proto::ChannelOnChannelStreamingDataRequest {
1352 handle: hid,
1353 channel_sent: proto::ChannelSent::Stopped(proto::AioStopped {
1354 error: Some(Box::new(e)),
1355 }),
1356 },
1357 )))
1358 }
1359 }
1360 }
1361 }
1362 } else {
1363 self.waker = Some(ctx.waker().clone());
1364 Poll::Pending
1365 }
1366 }
1367}