1use crate::handle::handle_type;
6use crate::responder::Responder;
7use crate::{Error, Event, EventPair, Handle, OnFDomainSignals, Socket, ordinals};
8use fidl_fuchsia_fdomain as proto;
9use futures::future::Either;
10use futures::stream::Stream;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Weak};
14use std::task::{Context, Poll, ready};
15
16#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
18pub struct Channel(pub(crate) Handle);
19
20handle_type!(Channel CHANNEL peered);
21
22#[derive(Debug)]
24pub struct MessageBuf {
25 pub bytes: Vec<u8>,
26 pub handles: Vec<HandleInfo>,
27}
28
29impl MessageBuf {
30 pub fn new() -> Self {
32 MessageBuf { bytes: Vec::new(), handles: Vec::new() }
33 }
34
35 pub fn split(self) -> (Vec<u8>, Vec<HandleInfo>) {
37 (self.bytes, self.handles)
38 }
39
40 pub fn ensure_capacity_bytes(&mut self, bytes: usize) {
42 self.bytes.reserve(bytes);
43 }
44
45 pub fn clear(&mut self) {
47 self.bytes.clear();
48 self.handles.clear();
49 }
50
51 pub fn bytes(&self) -> &[u8] {
53 self.bytes.as_slice()
54 }
55
56 fn from_proto(client: &Arc<crate::Client>, message: proto::ChannelMessage) -> MessageBuf {
58 let proto::ChannelMessage { data, handles } = message;
59 MessageBuf {
60 bytes: data,
61 handles: handles
62 .into_iter()
63 .map(|info| {
64 let handle = Handle { id: info.handle.id, client: Arc::downgrade(client) };
65 HandleInfo {
66 rights: info.rights,
67 handle: AnyHandle::from_handle(handle, info.type_),
68 }
69 })
70 .collect(),
71 }
72 }
73}
74
75#[derive(Debug)]
77pub struct HandleInfo {
78 pub handle: AnyHandle,
79 pub rights: fidl::Rights,
80}
81
82#[derive(Debug)]
85pub enum AnyHandle {
86 Channel(Channel),
87 Socket(Socket),
88 Event(Event),
89 EventPair(EventPair),
90 Unknown(Handle, fidl::ObjectType),
91}
92
93impl AnyHandle {
94 pub fn from_handle(handle: Handle, ty: fidl::ObjectType) -> AnyHandle {
96 match ty {
97 fidl::ObjectType::CHANNEL => AnyHandle::Channel(Channel(handle)),
98 fidl::ObjectType::SOCKET => AnyHandle::Socket(Socket(handle)),
99 fidl::ObjectType::EVENT => AnyHandle::Event(Event(handle)),
100 fidl::ObjectType::EVENTPAIR => AnyHandle::EventPair(EventPair(handle)),
101 _ => AnyHandle::Unknown(handle, ty),
102 }
103 }
104
105 pub fn invalid() -> AnyHandle {
107 AnyHandle::Unknown(Handle::invalid(), fidl::ObjectType::NONE)
108 }
109
110 pub fn object_type(&self) -> fidl::ObjectType {
112 match self {
113 AnyHandle::Channel(_) => fidl::ObjectType::CHANNEL,
114 AnyHandle::Socket(_) => fidl::ObjectType::SOCKET,
115 AnyHandle::Event(_) => fidl::ObjectType::EVENT,
116 AnyHandle::EventPair(_) => fidl::ObjectType::EVENTPAIR,
117 AnyHandle::Unknown(_, t) => *t,
118 }
119 }
120}
121
122impl From<AnyHandle> for Handle {
123 fn from(item: AnyHandle) -> Handle {
124 match item {
125 AnyHandle::Channel(h) => h.into(),
126 AnyHandle::Socket(h) => h.into(),
127 AnyHandle::Event(h) => h.into(),
128 AnyHandle::EventPair(h) => h.into(),
129 AnyHandle::Unknown(h, _) => h,
130 }
131 }
132}
133
134pub enum HandleOp<'h> {
136 Move(Handle, fidl::Rights),
137 Duplicate(&'h Handle, fidl::Rights),
138}
139
140impl Channel {
141 pub fn recv_msg(&self) -> impl Future<Output = Result<MessageBuf, Error>> + use<> {
143 let client = self.0.client();
144 let handle = self.0.proto();
145
146 futures::future::poll_fn(move |ctx| {
147 client.poll_channel(handle, ctx, false).map(|x| {
148 x.expect("Got stream termination indication from non-streaming read!")
149 .map(|x| MessageBuf::from_proto(&client, x))
150 })
151 })
152 }
153
154 pub fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<MessageBuf, Error>> {
156 let client = self.0.client();
157 let handle = self.0.proto();
158
159 client.poll_channel(handle, cx, false).map(|x| {
160 x.expect("Got stream termination indication from non-streaming read!")
161 .map(|x| MessageBuf::from_proto(&client, x))
162 })
163 }
164
165 pub fn recv_from(&self, cx: &mut Context<'_>, buf: &mut MessageBuf) -> Poll<Result<(), Error>> {
167 let client = self.0.client();
168 match ready!(client.poll_channel(self.0.proto(), cx, false))
169 .expect("Got stream termination indication from non-streaming read!")
170 {
171 Ok(msg) => {
172 *buf = MessageBuf::from_proto(&client, msg);
173 Poll::Ready(Ok(()))
174 }
175 Err(e) => Poll::Ready(Err(e)),
176 }
177 }
178
179 pub fn write(&self, bytes: &[u8], handles: Vec<Handle>) -> Result<(), Error> {
181 if bytes.len() > zx_types::ZX_CHANNEL_MAX_MSG_BYTES as usize
182 || handles.len() > zx_types::ZX_CHANNEL_MAX_MSG_HANDLES as usize
183 {
184 return Err(Error::FDomain(proto::Error::TargetError(
185 fidl::Status::OUT_OF_RANGE.into_raw(),
186 )));
187 }
188
189 let _ = self.write_inner(
190 bytes,
191 proto::Handles::Handles(handles.into_iter().map(|x| x.take_proto()).collect()),
192 );
193 Ok(())
194 }
195
196 pub fn fdomain_write(
200 &self,
201 bytes: &[u8],
202 handles: Vec<Handle>,
203 ) -> impl Future<Output = Result<(), Error>> + use<> {
204 if bytes.len() > zx_types::ZX_CHANNEL_MAX_MSG_BYTES as usize
205 || handles.len() > zx_types::ZX_CHANNEL_MAX_MSG_HANDLES as usize
206 {
207 Either::Left(async {
208 Err(Error::FDomain(proto::Error::TargetError(
209 fidl::Status::OUT_OF_RANGE.into_raw(),
210 )))
211 })
212 } else {
213 Either::Right(self.write_inner(
214 bytes,
215 proto::Handles::Handles(handles.into_iter().map(|x| x.take_proto()).collect()),
216 ))
217 }
218 }
219
220 pub fn on_closed(&self) -> OnFDomainSignals {
222 OnFDomainSignals::new(&self.0, fidl::Signals::OBJECT_PEER_CLOSED)
223 }
224
225 pub fn is_closed(&self) -> bool {
227 self.0.client.upgrade().is_none()
228 }
229
230 pub fn fdomain_write_etc<'b>(
234 &self,
235 bytes: &[u8],
236 handles: Vec<HandleOp<'b>>,
237 ) -> impl Future<Output = Result<(), Error>> + use<'b> {
238 let handles = handles
239 .into_iter()
240 .map(|handle| match handle {
241 HandleOp::Move(x, rights) => {
242 if Weak::ptr_eq(&x.client, &self.0.client) {
243 Ok(proto::HandleDisposition {
244 handle: proto::HandleOp::Move_(x.take_proto()),
245 rights,
246 })
247 } else {
248 Err(Error::ConnectionMismatch)
249 }
250 }
251 HandleOp::Duplicate(x, rights) => {
252 if Weak::ptr_eq(&x.client, &self.0.client) {
253 Ok(proto::HandleDisposition {
254 handle: proto::HandleOp::Duplicate(x.proto()),
255 rights,
256 })
257 } else {
258 Err(Error::ConnectionMismatch)
259 }
260 }
261 })
262 .collect::<Result<Vec<_>, Error>>();
263
264 let handles = if handles
265 .as_ref()
266 .map(|x| x.len() > zx_types::ZX_CHANNEL_MAX_MSG_HANDLES as usize)
267 .unwrap_or(false)
268 || bytes.len() > zx_types::ZX_CHANNEL_MAX_MSG_BYTES as usize
269 {
270 Err(Error::FDomain(proto::Error::TargetError(fidl::Status::OUT_OF_RANGE.into_raw())))
271 } else {
272 handles
273 };
274
275 match handles {
276 Ok(handles) => {
277 Either::Left(self.write_inner(bytes, proto::Handles::Dispositions(handles)))
278 }
279 Err(e) => Either::Right(async move { Err(e) }),
280 }
281 }
282
283 fn write_inner(
285 &self,
286 bytes: &[u8],
287 handles: proto::Handles,
288 ) -> impl Future<Output = Result<(), Error>> + use<> {
289 let data = bytes.to_vec();
290 let client = self.0.client();
291 let handle = self.0.proto();
292
293 client.clear_handles_for_transfer(&handles);
294 client.transaction(
295 ordinals::WRITE_CHANNEL,
296 proto::ChannelWriteChannelRequest { handle, data, handles },
297 move |x| Responder::WriteChannel(x),
298 )
299 }
300
301 pub fn stream(self) -> Result<(ChannelMessageStream, ChannelWriter), Error> {
308 self.0.client().start_channel_streaming(self.0.proto())?;
309
310 let a = Arc::new(self);
311 let b = Arc::clone(&a);
312
313 Ok((ChannelMessageStream(a), ChannelWriter(b)))
314 }
315}
316
317#[derive(Debug, Clone)]
319pub struct ChannelWriter(Arc<Channel>);
320
321impl ChannelWriter {
322 pub fn write(&self, bytes: &[u8], handles: Vec<Handle>) -> Result<(), Error> {
324 self.0.write(bytes, handles)
325 }
326
327 pub fn fdomain_write(
331 &self,
332 bytes: &[u8],
333 handles: Vec<Handle>,
334 ) -> impl Future<Output = Result<(), Error>> {
335 self.0.fdomain_write(bytes, handles)
336 }
337
338 pub fn fdomain_write_etc<'b>(
340 &self,
341 bytes: &[u8],
342 handles: Vec<HandleOp<'b>>,
343 ) -> impl Future<Output = Result<(), Error>> + 'b {
344 self.0.fdomain_write_etc(bytes, handles)
345 }
346
347 pub fn as_channel(&self) -> &Channel {
349 &*self.0
350 }
351}
352
353#[derive(Debug)]
355pub struct ChannelMessageStream(Arc<Channel>);
356
357impl ChannelMessageStream {
358 pub fn rejoin(mut self, writer: ChannelWriter) -> Channel {
365 assert!(Arc::ptr_eq(&self.0, &writer.0), "Tried to join stream with wrong writer!");
366 if let Some(client) = self.0.0.client.upgrade() {
367 client.stop_channel_streaming(self.0.0.proto())
368 }
369 std::mem::drop(writer);
370 let channel = std::mem::replace(&mut self.0, Arc::new(Channel(Handle::invalid())));
371 Arc::try_unwrap(channel).expect("Stream pointer no longer unique!")
372 }
373
374 pub fn is_closed(&self) -> bool {
376 let client = self.0.0.client();
377
378 !client.channel_is_streaming(self.0.0.proto())
379 }
380
381 pub fn as_channel(&self) -> &Channel {
383 &*self.0
384 }
385}
386
387impl Stream for ChannelMessageStream {
388 type Item = Result<MessageBuf, Error>;
389 fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
390 let client = self.0.0.client();
391 client
392 .poll_channel(self.0.0.proto(), ctx, true)
393 .map(|x| x.map(|x| x.map(|x| MessageBuf::from_proto(&client, x))))
394 }
395}
396
397impl Drop for ChannelMessageStream {
398 fn drop(&mut self) {
399 if let Some(client) = self.0.0.client.upgrade() {
400 client.stop_channel_streaming(self.0.0.proto());
401 }
402 }
403}