1use fidl_fuchsia_fdomain as proto;
6use std::collections::VecDeque;
7use std::num::NonZeroU32;
8use std::task::{Context, Poll, Waker};
9
10use proto::f_domain_ordinals as ordinals;
11
12#[pin_project::pin_project]
19pub struct FDomainCodec {
20 #[pin]
21 fdomain: crate::FDomain,
22 outgoing: VecDeque<Box<[u8]>>,
23 wakers: Vec<Waker>,
24}
25
26impl FDomainCodec {
27 pub fn new(fdomain: crate::FDomain) -> FDomainCodec {
29 FDomainCodec { fdomain, outgoing: VecDeque::new(), wakers: Vec::new() }
30 }
31
32 pub fn message(&mut self, data: &[u8]) -> fidl::Result<()> {
34 let (header, rest) = fidl_message::decode_transaction_header(data)?;
35 let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
36 return Err(fidl::Error::UnknownOrdinal {
37 ordinal: header.ordinal,
38 protocol_name:
39 <proto::FDomainMarker as fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
40 });
41 };
42
43 match header.ordinal {
44 ordinals::GET_NAMESPACE => {
45 let request = fidl_message::decode_message::<proto::FDomainGetNamespaceRequest>(
46 header, rest,
47 )?;
48 let result = self.fdomain.get_namespace(request);
49 self.send_response(tx_id, header.ordinal, result)?;
50 }
51 ordinals::CREATE_CHANNEL => {
52 let request = fidl_message::decode_message::<proto::ChannelCreateChannelRequest>(
53 header, rest,
54 )?;
55 let result = self.fdomain.create_channel(request);
56 self.send_response(tx_id, header.ordinal, result)?;
57 }
58 ordinals::CREATE_SOCKET => {
59 let request =
60 fidl_message::decode_message::<proto::SocketCreateSocketRequest>(header, rest)?;
61 let result = self.fdomain.create_socket(request);
62 self.send_response(tx_id, header.ordinal, result)?;
63 }
64 ordinals::CREATE_EVENT_PAIR => {
65 let request = fidl_message::decode_message::<proto::EventPairCreateEventPairRequest>(
66 header, rest,
67 )?;
68 let result = self.fdomain.create_event_pair(request);
69 self.send_response(tx_id, header.ordinal, result)?;
70 }
71 ordinals::CREATE_EVENT => {
72 let request =
73 fidl_message::decode_message::<proto::EventCreateEventRequest>(header, rest)?;
74 let result = self.fdomain.create_event(request);
75 self.send_response(tx_id, header.ordinal, result)?;
76 }
77 ordinals::SET_SOCKET_DISPOSITION => {
78 let request = fidl_message::decode_message::<
79 proto::SocketSetSocketDispositionRequest,
80 >(header, rest)?;
81 self.fdomain.set_socket_disposition(tx_id, request);
82 }
83 ordinals::READ_SOCKET => {
84 let request =
85 fidl_message::decode_message::<proto::SocketReadSocketRequest>(header, rest)?;
86 self.fdomain.read_socket(tx_id, request);
87 }
88 ordinals::READ_CHANNEL => {
89 let request =
90 fidl_message::decode_message::<proto::ChannelReadChannelRequest>(header, rest)?;
91 self.fdomain.read_channel(tx_id, request);
92 }
93 ordinals::WRITE_SOCKET => {
94 let request =
95 fidl_message::decode_message::<proto::SocketWriteSocketRequest>(header, rest)?;
96 self.fdomain.write_socket(tx_id, request);
97 }
98 ordinals::WRITE_CHANNEL => {
99 let request = fidl_message::decode_message::<proto::ChannelWriteChannelRequest>(
100 header, rest,
101 )?;
102 self.fdomain.write_channel(tx_id, request);
103 }
104 ordinals::WAIT_FOR_SIGNALS => {
105 let request = fidl_message::decode_message::<proto::FDomainWaitForSignalsRequest>(
106 header, rest,
107 )?;
108 self.fdomain.wait_for_signals(tx_id, request);
109 }
110 ordinals::CLOSE => {
111 let request =
112 fidl_message::decode_message::<proto::FDomainCloseRequest>(header, rest)?;
113 self.fdomain.close(tx_id, request);
114 }
115 ordinals::DUPLICATE => {
116 let request =
117 fidl_message::decode_message::<proto::FDomainDuplicateRequest>(header, rest)?;
118 let result = self.fdomain.duplicate(request);
119 self.send_response(tx_id, header.ordinal, result)?;
120 }
121 ordinals::REPLACE => {
122 let request =
123 fidl_message::decode_message::<proto::FDomainReplaceRequest>(header, rest)?;
124 let result = self.fdomain.replace(tx_id, request);
125 self.send_response(tx_id, header.ordinal, result)?;
126 }
127 ordinals::SIGNAL => {
128 let request =
129 fidl_message::decode_message::<proto::FDomainSignalRequest>(header, rest)?;
130 let result = self.fdomain.signal(request);
131 self.send_response(tx_id, header.ordinal, result)?;
132 }
133 ordinals::SIGNAL_PEER => {
134 let request =
135 fidl_message::decode_message::<proto::FDomainSignalPeerRequest>(header, rest)?;
136 let result = self.fdomain.signal_peer(request);
137 self.send_response(tx_id, header.ordinal, result)?;
138 }
139 ordinals::READ_CHANNEL_STREAMING_START => {
140 let request = fidl_message::decode_message::<
141 proto::ChannelReadChannelStreamingStartRequest,
142 >(header, rest)?;
143 self.fdomain.read_channel_streaming_start(tx_id, request);
144 }
145 ordinals::READ_CHANNEL_STREAMING_STOP => {
146 let request = fidl_message::decode_message::<
147 proto::ChannelReadChannelStreamingStopRequest,
148 >(header, rest)?;
149 self.fdomain.read_channel_streaming_stop(tx_id, request);
150 }
151 ordinals::READ_SOCKET_STREAMING_START => {
152 let request = fidl_message::decode_message::<
153 proto::SocketReadSocketStreamingStartRequest,
154 >(header, rest)?;
155 self.fdomain.read_socket_streaming_start(tx_id, request);
156 }
157 ordinals::READ_SOCKET_STREAMING_STOP => {
158 let request = fidl_message::decode_message::<
159 proto::SocketReadSocketStreamingStopRequest,
160 >(header, rest)?;
161 self.fdomain.read_socket_streaming_stop(tx_id, request);
162 }
163 unknown if header.dynamic_flags().contains(fidl_message::DynamicFlags::FLEXIBLE) => {
164 if header.tx_id != 0 {
165 let header = fidl_message::TransactionHeader::new(
166 header.tx_id,
167 unknown,
168 fidl_message::DynamicFlags::FLEXIBLE,
169 );
170 self.enqueue_outgoing::<Vec<u8>>(
171 fidl_message::encode_response_flexible_unknown(header)?.into(),
172 );
173 }
174 }
175 _ => {
176 return Err(fidl::Error::UnknownOrdinal {
177 ordinal: header.ordinal,
178 protocol_name:
179 <proto::FDomainMarker as fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
180 })
181 }
182 }
183
184 Ok(())
185 }
186
187 fn enqueue_outgoing<T: Into<Box<[u8]>>>(&mut self, msg: T) {
190 self.outgoing.push_back(msg.into());
191 self.wakers.drain(..).for_each(Waker::wake);
192 }
193
194 fn send_response<T: fidl_message::Body, E: fidl_message::ErrorType>(
198 &mut self,
199 tx_id: NonZeroU32,
200 ordinal: u64,
201 body: Result<T, E>,
202 ) -> fidl::Result<()>
203where
204 for<'a> <<T as fidl_message::Body>::MarkerInResultUnion as fidl::encoding::ValueTypeMarker>::Borrowed<'a>:
205 fidl::encoding::Encode<T::MarkerInResultUnion, fidl::encoding::NoHandleResourceDialect>,
206 for<'a> <<E as fidl_message::ErrorType>::Marker as fidl::encoding::ValueTypeMarker>::Borrowed<'a>:
207 fidl::encoding::Encode<E::Marker, fidl::encoding::NoHandleResourceDialect>,
208 {
209 let header = fidl_message::TransactionHeader::new(
210 tx_id.into(),
211 ordinal,
212 fidl_message::DynamicFlags::FLEXIBLE,
213 );
214 self.enqueue_outgoing(fidl_message::encode_response_result::<T, E>(header, body)?);
215
216 Ok(())
217 }
218
219 fn send_event<T: fidl_message::Body>(&mut self, ordinal: u64, body: T) -> fidl::Result<()>
223 where
224 for<'a> <<T as fidl_message::Body>::MarkerAtTopLevel as fidl::encoding::ValueTypeMarker>::Borrowed<
225 'a,
226 >: fidl::encoding::Encode<T::MarkerAtTopLevel, fidl::encoding::NoHandleResourceDialect>,
227 {
228 let header =
229 fidl_message::TransactionHeader::new(0, ordinal, fidl_message::DynamicFlags::empty());
230 self.enqueue_outgoing(fidl_message::encode_message(header, body)?);
231
232 Ok(())
233 }
234}
235
236impl futures::Stream for FDomainCodec {
237 type Item = fidl::Result<Box<[u8]>>;
238
239 fn poll_next(
240 mut self: std::pin::Pin<&mut Self>,
241 ctx: &mut Context<'_>,
242 ) -> Poll<Option<Self::Item>> {
243 while let Poll::Ready(Some(event)) = self.as_mut().project().fdomain.poll_next(ctx) {
244 let result = match event {
245 crate::FDomainEvent::ChannelStreamingReadStart(tx_id, msg) => {
246 self.send_response(tx_id, ordinals::READ_CHANNEL_STREAMING_START, msg)
247 }
248 crate::FDomainEvent::ChannelStreamingReadStop(tx_id, msg) => {
249 self.send_response(tx_id, ordinals::READ_CHANNEL_STREAMING_STOP, msg)
250 }
251 crate::FDomainEvent::SocketStreamingReadStart(tx_id, msg) => {
252 self.send_response(tx_id, ordinals::READ_SOCKET_STREAMING_START, msg)
253 }
254 crate::FDomainEvent::SocketStreamingReadStop(tx_id, msg) => {
255 self.send_response(tx_id, ordinals::READ_SOCKET_STREAMING_STOP, msg)
256 }
257 crate::FDomainEvent::WaitForSignals(tx_id, msg) => {
258 self.send_response(tx_id, ordinals::WAIT_FOR_SIGNALS, msg)
259 }
260 crate::FDomainEvent::SocketData(tx_id, msg) => {
261 self.send_response(tx_id, ordinals::READ_SOCKET, msg)
262 }
263 crate::FDomainEvent::SocketStreamingData(msg) => {
264 self.send_event(ordinals::ON_SOCKET_STREAMING_DATA, msg)
265 }
266 crate::FDomainEvent::SocketDispositionSet(tx_id, msg) => {
267 self.send_response(tx_id, ordinals::SET_SOCKET_DISPOSITION, msg)
268 }
269 crate::FDomainEvent::WroteSocket(tx_id, msg) => {
270 self.send_response(tx_id, ordinals::WRITE_SOCKET, msg)
271 }
272 crate::FDomainEvent::ChannelData(tx_id, msg) => {
273 self.send_response(tx_id, ordinals::READ_CHANNEL, msg)
274 }
275 crate::FDomainEvent::ChannelStreamingData(msg) => {
276 self.send_event(ordinals::ON_CHANNEL_STREAMING_DATA, msg)
277 }
278 crate::FDomainEvent::WroteChannel(tx_id, msg) => {
279 self.send_response(tx_id, ordinals::WRITE_CHANNEL, msg)
280 }
281 crate::FDomainEvent::ClosedHandle(tx_id, msg) => {
282 self.send_response(tx_id, ordinals::CLOSE, msg)
283 }
284 crate::FDomainEvent::ReplacedHandle(tx_id, msg) => {
285 self.send_response(tx_id, ordinals::REPLACE, msg)
286 }
287 };
288
289 if let Err(e) = result {
290 return Poll::Ready(Some(Err(e)));
291 }
292 }
293
294 if let Some(got) = self.outgoing.pop_front() {
295 Poll::Ready(Some(Ok(got)))
296 } else {
297 self.wakers.push(ctx.waker().clone());
298 Poll::Pending
299 }
300 }
301}