fdomain_container/
wire.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_fdomain as proto;
6use std::collections::VecDeque;
7use std::num::NonZeroU32;
8use std::task::{Context, Poll, Waker};
9
10use proto::f_domain_ordinals as ordinals;
11
12/// Wraps an [`FDomain`] and provides an interface that operates on
13/// binary-encoded FIDL messages, as opposed to the FIDL request/response
14/// structs [`FDomain`] itself deals with.
15///
16/// Request messages are passed in with the [`message`] method, and polling the
17/// `FDomainCodec` as a stream will yield the responses.
18#[pin_project::pin_project]
19pub struct FDomainCodec {
20    #[pin]
21    fdomain: crate::FDomain,
22    outgoing: VecDeque<Box<[u8]>>,
23    wakers: Vec<Waker>,
24}
25
26impl FDomainCodec {
27    /// Construct a new [`FDomainCodec`] around the given [`FDomain`]
28    pub fn new(fdomain: crate::FDomain) -> FDomainCodec {
29        FDomainCodec { fdomain, outgoing: VecDeque::new(), wakers: Vec::new() }
30    }
31
32    /// Process an incoming message.
33    pub fn message(&mut self, data: &[u8]) -> fidl::Result<()> {
34        let (header, rest) = fidl_message::decode_transaction_header(data)?;
35        let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
36            return Err(fidl::Error::UnknownOrdinal {
37                ordinal: header.ordinal,
38                protocol_name:
39                    <proto::FDomainMarker as fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
40            });
41        };
42
43        match header.ordinal {
44            ordinals::GET_NAMESPACE => {
45                let request = fidl_message::decode_message::<proto::FDomainGetNamespaceRequest>(
46                    header, rest,
47                )?;
48                let result = self.fdomain.get_namespace(request);
49                self.send_response(tx_id, header.ordinal, result)?;
50            }
51            ordinals::CREATE_CHANNEL => {
52                let request = fidl_message::decode_message::<proto::ChannelCreateChannelRequest>(
53                    header, rest,
54                )?;
55                let result = self.fdomain.create_channel(request);
56                self.send_response(tx_id, header.ordinal, result)?;
57            }
58            ordinals::CREATE_SOCKET => {
59                let request =
60                    fidl_message::decode_message::<proto::SocketCreateSocketRequest>(header, rest)?;
61                let result = self.fdomain.create_socket(request);
62                self.send_response(tx_id, header.ordinal, result)?;
63            }
64            ordinals::CREATE_EVENT_PAIR => {
65                let request = fidl_message::decode_message::<proto::EventPairCreateEventPairRequest>(
66                    header, rest,
67                )?;
68                let result = self.fdomain.create_event_pair(request);
69                self.send_response(tx_id, header.ordinal, result)?;
70            }
71            ordinals::CREATE_EVENT => {
72                let request =
73                    fidl_message::decode_message::<proto::EventCreateEventRequest>(header, rest)?;
74                let result = self.fdomain.create_event(request);
75                self.send_response(tx_id, header.ordinal, result)?;
76            }
77            ordinals::SET_SOCKET_DISPOSITION => {
78                let request = fidl_message::decode_message::<
79                    proto::SocketSetSocketDispositionRequest,
80                >(header, rest)?;
81                self.fdomain.set_socket_disposition(tx_id, request);
82            }
83            ordinals::READ_SOCKET => {
84                let request =
85                    fidl_message::decode_message::<proto::SocketReadSocketRequest>(header, rest)?;
86                self.fdomain.read_socket(tx_id, request);
87            }
88            ordinals::READ_CHANNEL => {
89                let request =
90                    fidl_message::decode_message::<proto::ChannelReadChannelRequest>(header, rest)?;
91                self.fdomain.read_channel(tx_id, request);
92            }
93            ordinals::WRITE_SOCKET => {
94                let request =
95                    fidl_message::decode_message::<proto::SocketWriteSocketRequest>(header, rest)?;
96                self.fdomain.write_socket(tx_id, request);
97            }
98            ordinals::WRITE_CHANNEL => {
99                let request = fidl_message::decode_message::<proto::ChannelWriteChannelRequest>(
100                    header, rest,
101                )?;
102                self.fdomain.write_channel(tx_id, request);
103            }
104            ordinals::WAIT_FOR_SIGNALS => {
105                let request = fidl_message::decode_message::<proto::FDomainWaitForSignalsRequest>(
106                    header, rest,
107                )?;
108                self.fdomain.wait_for_signals(tx_id, request);
109            }
110            ordinals::CLOSE => {
111                let request =
112                    fidl_message::decode_message::<proto::FDomainCloseRequest>(header, rest)?;
113                self.fdomain.close(tx_id, request);
114            }
115            ordinals::DUPLICATE => {
116                let request =
117                    fidl_message::decode_message::<proto::FDomainDuplicateRequest>(header, rest)?;
118                let result = self.fdomain.duplicate(request);
119                self.send_response(tx_id, header.ordinal, result)?;
120            }
121            ordinals::REPLACE => {
122                let request =
123                    fidl_message::decode_message::<proto::FDomainReplaceRequest>(header, rest)?;
124                let result = self.fdomain.replace(tx_id, request);
125                self.send_response(tx_id, header.ordinal, result)?;
126            }
127            ordinals::SIGNAL => {
128                let request =
129                    fidl_message::decode_message::<proto::FDomainSignalRequest>(header, rest)?;
130                let result = self.fdomain.signal(request);
131                self.send_response(tx_id, header.ordinal, result)?;
132            }
133            ordinals::SIGNAL_PEER => {
134                let request =
135                    fidl_message::decode_message::<proto::FDomainSignalPeerRequest>(header, rest)?;
136                let result = self.fdomain.signal_peer(request);
137                self.send_response(tx_id, header.ordinal, result)?;
138            }
139            ordinals::READ_CHANNEL_STREAMING_START => {
140                let request = fidl_message::decode_message::<
141                    proto::ChannelReadChannelStreamingStartRequest,
142                >(header, rest)?;
143                self.fdomain.read_channel_streaming_start(tx_id, request);
144            }
145            ordinals::READ_CHANNEL_STREAMING_STOP => {
146                let request = fidl_message::decode_message::<
147                    proto::ChannelReadChannelStreamingStopRequest,
148                >(header, rest)?;
149                self.fdomain.read_channel_streaming_stop(tx_id, request);
150            }
151            ordinals::READ_SOCKET_STREAMING_START => {
152                let request = fidl_message::decode_message::<
153                    proto::SocketReadSocketStreamingStartRequest,
154                >(header, rest)?;
155                self.fdomain.read_socket_streaming_start(tx_id, request);
156            }
157            ordinals::READ_SOCKET_STREAMING_STOP => {
158                let request = fidl_message::decode_message::<
159                    proto::SocketReadSocketStreamingStopRequest,
160                >(header, rest)?;
161                self.fdomain.read_socket_streaming_stop(tx_id, request);
162            }
163            unknown if header.dynamic_flags().contains(fidl_message::DynamicFlags::FLEXIBLE) => {
164                if header.tx_id != 0 {
165                    let header = fidl_message::TransactionHeader::new(
166                        header.tx_id,
167                        unknown,
168                        fidl_message::DynamicFlags::FLEXIBLE,
169                    );
170                    self.enqueue_outgoing::<Vec<u8>>(
171                        fidl_message::encode_response_flexible_unknown(header)?.into(),
172                    );
173                }
174            }
175            _ => {
176                return Err(fidl::Error::UnknownOrdinal {
177                    ordinal: header.ordinal,
178                    protocol_name:
179                        <proto::FDomainMarker as fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
180                })
181            }
182        }
183
184        Ok(())
185    }
186
187    /// Add an outgoing message to our outgoing queue and wake any wakers that
188    /// were waiting for that.
189    fn enqueue_outgoing<T: Into<Box<[u8]>>>(&mut self, msg: T) {
190        self.outgoing.push_back(msg.into());
191        self.wakers.drain(..).for_each(Waker::wake);
192    }
193
194    /// Encode and enqueue a fallible response message to the client. The
195    /// `ordinal` field should correspond correctly to the type of message given
196    /// by the type argument.
197    fn send_response<T: fidl_message::Body, E: fidl_message::ErrorType>(
198        &mut self,
199        tx_id: NonZeroU32,
200        ordinal: u64,
201        body: Result<T, E>,
202    ) -> fidl::Result<()>
203where
204    for<'a> <<T as fidl_message::Body>::MarkerInResultUnion as fidl::encoding::ValueTypeMarker>::Borrowed<'a>:
205        fidl::encoding::Encode<T::MarkerInResultUnion, fidl::encoding::NoHandleResourceDialect>,
206    for<'a> <<E as fidl_message::ErrorType>::Marker as fidl::encoding::ValueTypeMarker>::Borrowed<'a>:
207        fidl::encoding::Encode<E::Marker, fidl::encoding::NoHandleResourceDialect>,
208    {
209        let header = fidl_message::TransactionHeader::new(
210            tx_id.into(),
211            ordinal,
212            fidl_message::DynamicFlags::FLEXIBLE,
213        );
214        self.enqueue_outgoing(fidl_message::encode_response_result::<T, E>(header, body)?);
215
216        Ok(())
217    }
218
219    /// Encode and enqueue an event message to the client. The `ordinal` field
220    /// should correspond correctly to the type of message given by the type
221    /// argument.
222    fn send_event<T: fidl_message::Body>(&mut self, ordinal: u64, body: T) -> fidl::Result<()>
223    where
224        for<'a> <<T as fidl_message::Body>::MarkerAtTopLevel as fidl::encoding::ValueTypeMarker>::Borrowed<
225            'a,
226        >: fidl::encoding::Encode<T::MarkerAtTopLevel, fidl::encoding::NoHandleResourceDialect>,
227    {
228        let header =
229            fidl_message::TransactionHeader::new(0, ordinal, fidl_message::DynamicFlags::empty());
230        self.enqueue_outgoing(fidl_message::encode_message(header, body)?);
231
232        Ok(())
233    }
234}
235
236impl futures::Stream for FDomainCodec {
237    type Item = fidl::Result<Box<[u8]>>;
238
239    fn poll_next(
240        mut self: std::pin::Pin<&mut Self>,
241        ctx: &mut Context<'_>,
242    ) -> Poll<Option<Self::Item>> {
243        while let Poll::Ready(Some(event)) = self.as_mut().project().fdomain.poll_next(ctx) {
244            let result = match event {
245                crate::FDomainEvent::ChannelStreamingReadStart(tx_id, msg) => {
246                    self.send_response(tx_id, ordinals::READ_CHANNEL_STREAMING_START, msg)
247                }
248                crate::FDomainEvent::ChannelStreamingReadStop(tx_id, msg) => {
249                    self.send_response(tx_id, ordinals::READ_CHANNEL_STREAMING_STOP, msg)
250                }
251                crate::FDomainEvent::SocketStreamingReadStart(tx_id, msg) => {
252                    self.send_response(tx_id, ordinals::READ_SOCKET_STREAMING_START, msg)
253                }
254                crate::FDomainEvent::SocketStreamingReadStop(tx_id, msg) => {
255                    self.send_response(tx_id, ordinals::READ_SOCKET_STREAMING_STOP, msg)
256                }
257                crate::FDomainEvent::WaitForSignals(tx_id, msg) => {
258                    self.send_response(tx_id, ordinals::WAIT_FOR_SIGNALS, msg)
259                }
260                crate::FDomainEvent::SocketData(tx_id, msg) => {
261                    self.send_response(tx_id, ordinals::READ_SOCKET, msg)
262                }
263                crate::FDomainEvent::SocketStreamingData(msg) => {
264                    self.send_event(ordinals::ON_SOCKET_STREAMING_DATA, msg)
265                }
266                crate::FDomainEvent::SocketDispositionSet(tx_id, msg) => {
267                    self.send_response(tx_id, ordinals::SET_SOCKET_DISPOSITION, msg)
268                }
269                crate::FDomainEvent::WroteSocket(tx_id, msg) => {
270                    self.send_response(tx_id, ordinals::WRITE_SOCKET, msg)
271                }
272                crate::FDomainEvent::ChannelData(tx_id, msg) => {
273                    self.send_response(tx_id, ordinals::READ_CHANNEL, msg)
274                }
275                crate::FDomainEvent::ChannelStreamingData(msg) => {
276                    self.send_event(ordinals::ON_CHANNEL_STREAMING_DATA, msg)
277                }
278                crate::FDomainEvent::WroteChannel(tx_id, msg) => {
279                    self.send_response(tx_id, ordinals::WRITE_CHANNEL, msg)
280                }
281                crate::FDomainEvent::ClosedHandle(tx_id, msg) => {
282                    self.send_response(tx_id, ordinals::CLOSE, msg)
283                }
284                crate::FDomainEvent::ReplacedHandle(tx_id, msg) => {
285                    self.send_response(tx_id, ordinals::REPLACE, msg)
286                }
287            };
288
289            if let Err(e) = result {
290                return Poll::Ready(Some(Err(e)));
291            }
292        }
293
294        if let Some(got) = self.outgoing.pop_front() {
295            Poll::Ready(Some(Ok(got)))
296        } else {
297            self.wakers.push(ctx.waker().clone());
298            Poll::Pending
299        }
300    }
301}