fdomain_client/
channel.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::handle::handle_type;
6use crate::responder::Responder;
7use crate::{ordinals, Error, Event, EventPair, Handle, OnFDomainSignals, Socket};
8use fidl_fuchsia_fdomain as proto;
9use futures::future::Either;
10use futures::stream::Stream;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Weak};
14use std::task::{ready, Context, Poll};
15
16/// A channel in a remote FDomain.
17#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
18pub struct Channel(pub(crate) Handle);
19
20handle_type!(Channel CHANNEL peered);
21
22/// A message which has been read from a channel.
23#[derive(Debug)]
24pub struct MessageBuf {
25    pub bytes: Vec<u8>,
26    pub handles: Vec<HandleInfo>,
27}
28
29impl MessageBuf {
30    /// Create a new [`MessageBuf`]
31    pub fn new() -> Self {
32        MessageBuf { bytes: Vec::new(), handles: Vec::new() }
33    }
34
35    /// Make sure this buffer has room for a certain number of bytes.
36    pub fn ensure_capacity_bytes(&mut self, bytes: usize) {
37        self.bytes.reserve(bytes);
38    }
39
40    /// Clear out the contents of this buffer.
41    pub fn clear(&mut self) {
42        self.bytes.clear();
43        self.handles.clear();
44    }
45
46    /// Get the byte content of this buffer.
47    pub fn bytes(&self) -> &[u8] {
48        self.bytes.as_slice()
49    }
50
51    /// Convert a proto ChannelMessage to a MessageBuf.
52    fn from_proto(client: &Arc<crate::Client>, message: proto::ChannelMessage) -> MessageBuf {
53        let proto::ChannelMessage { data, handles } = message;
54        MessageBuf {
55            bytes: data,
56            handles: handles
57                .into_iter()
58                .map(|info| {
59                    let handle = Handle { id: info.handle.id, client: Arc::downgrade(client) };
60                    HandleInfo {
61                        rights: info.rights,
62                        handle: AnyHandle::from_handle(handle, info.type_),
63                    }
64                })
65                .collect(),
66        }
67    }
68}
69
70/// A handle which has been read from a channel.
71#[derive(Debug)]
72pub struct HandleInfo {
73    pub handle: AnyHandle,
74    pub rights: fidl::Rights,
75}
76
77/// Sum type of all the handle types which can be read from a channel. Allows
78/// the user to learn the type of a handle after it has been read.
79#[derive(Debug)]
80pub enum AnyHandle {
81    Channel(Channel),
82    Socket(Socket),
83    Event(Event),
84    EventPair(EventPair),
85    Unknown(Handle, fidl::ObjectType),
86}
87
88impl AnyHandle {
89    /// Construct an `AnyHandle` from a `Handle` and an object type.
90    pub fn from_handle(handle: Handle, ty: fidl::ObjectType) -> AnyHandle {
91        match ty {
92            fidl::ObjectType::CHANNEL => AnyHandle::Channel(Channel(handle)),
93            fidl::ObjectType::SOCKET => AnyHandle::Socket(Socket(handle)),
94            fidl::ObjectType::EVENT => AnyHandle::Event(Event(handle)),
95            fidl::ObjectType::EVENTPAIR => AnyHandle::EventPair(EventPair(handle)),
96            _ => AnyHandle::Unknown(handle, ty),
97        }
98    }
99
100    /// Get an `AnyHandle` wrapping an invalid handle.
101    pub fn invalid() -> AnyHandle {
102        AnyHandle::Unknown(Handle::invalid(), fidl::ObjectType::NONE)
103    }
104
105    /// Get the object type for a handle.
106    pub fn object_type(&self) -> fidl::ObjectType {
107        match self {
108            AnyHandle::Channel(_) => fidl::ObjectType::CHANNEL,
109            AnyHandle::Socket(_) => fidl::ObjectType::SOCKET,
110            AnyHandle::Event(_) => fidl::ObjectType::EVENT,
111            AnyHandle::EventPair(_) => fidl::ObjectType::EVENTPAIR,
112            AnyHandle::Unknown(_, t) => *t,
113        }
114    }
115}
116
117impl From<AnyHandle> for Handle {
118    fn from(item: AnyHandle) -> Handle {
119        match item {
120            AnyHandle::Channel(h) => h.into(),
121            AnyHandle::Socket(h) => h.into(),
122            AnyHandle::Event(h) => h.into(),
123            AnyHandle::EventPair(h) => h.into(),
124            AnyHandle::Unknown(h, _) => h,
125        }
126    }
127}
128
129/// Operation to perform on a handle when writing it to a channel.
130pub enum HandleOp<'h> {
131    Move(Handle, fidl::Rights),
132    Duplicate(&'h Handle, fidl::Rights),
133}
134
135impl Channel {
136    /// Reads a message from the channel.
137    pub fn recv_msg(&self) -> impl Future<Output = Result<MessageBuf, Error>> {
138        let client = self.0.client();
139        let handle = self.0.proto();
140
141        futures::future::poll_fn(move |ctx| {
142            client.poll_channel(handle, ctx, false).map(|x| {
143                x.expect("Got stream termination indication from non-streaming read!")
144                    .map(|x| MessageBuf::from_proto(&client, x))
145            })
146        })
147    }
148
149    /// Poll a channel for a message to read.
150    pub fn recv_from(&self, cx: &mut Context<'_>, buf: &mut MessageBuf) -> Poll<Result<(), Error>> {
151        let client = self.0.client();
152        match ready!(client.poll_channel(self.0.proto(), cx, false))
153            .expect("Got stream termination indication from non-streaming read!")
154        {
155            Ok(msg) => {
156                *buf = MessageBuf::from_proto(&client, msg);
157                Poll::Ready(Ok(()))
158            }
159            Err(e) => Poll::Ready(Err(e)),
160        }
161    }
162
163    /// Writes a message into the channel.
164    pub fn write(&self, bytes: &[u8], handles: Vec<Handle>) -> Result<(), Error> {
165        if bytes.len() > zx_types::ZX_CHANNEL_MAX_MSG_BYTES as usize
166            || handles.len() > zx_types::ZX_CHANNEL_MAX_MSG_HANDLES as usize
167        {
168            return Err(Error::FDomain(proto::Error::TargetError(
169                fidl::Status::OUT_OF_RANGE.into_raw(),
170            )));
171        }
172
173        let _ = self.write_inner(
174            bytes,
175            proto::Handles::Handles(handles.into_iter().map(|x| x.take_proto()).collect()),
176        );
177        Ok(())
178    }
179
180    /// Writes a message into the channel. Returns a future that will allow you
181    /// to wait for the write to move across the FDomain connection and return
182    /// with the result of the actual write call on the target.
183    pub fn fdomain_write(
184        &self,
185        bytes: &[u8],
186        handles: Vec<Handle>,
187    ) -> impl Future<Output = Result<(), Error>> + '_ {
188        if bytes.len() > zx_types::ZX_CHANNEL_MAX_MSG_BYTES as usize
189            || handles.len() > zx_types::ZX_CHANNEL_MAX_MSG_HANDLES as usize
190        {
191            Either::Left(async {
192                Err(Error::FDomain(proto::Error::TargetError(
193                    fidl::Status::OUT_OF_RANGE.into_raw(),
194                )))
195            })
196        } else {
197            Either::Right(self.write_inner(
198                bytes,
199                proto::Handles::Handles(handles.into_iter().map(|x| x.take_proto()).collect()),
200            ))
201        }
202    }
203
204    /// A future that returns when the channel is closed.
205    pub fn on_closed(&self) -> OnFDomainSignals {
206        OnFDomainSignals::new(&self.0, fidl::Signals::OBJECT_PEER_CLOSED)
207    }
208
209    /// Whether this handle is closed.
210    pub fn is_closed(&self) -> bool {
211        self.0.client.upgrade().is_none()
212    }
213
214    /// Writes a message into the channel. Optionally duplicates some of the
215    /// handles rather than consuming them, and can update the handle's rights
216    /// before sending.
217    pub fn fdomain_write_etc<'b>(
218        &self,
219        bytes: &[u8],
220        handles: Vec<HandleOp<'b>>,
221    ) -> impl Future<Output = Result<(), Error>> + 'b {
222        let handles = handles
223            .into_iter()
224            .map(|handle| match handle {
225                HandleOp::Move(x, rights) => {
226                    if Weak::ptr_eq(&x.client, &self.0.client) {
227                        Ok(proto::HandleDisposition {
228                            handle: proto::HandleOp::Move_(x.take_proto()),
229                            rights,
230                        })
231                    } else {
232                        Err(Error::ConnectionMismatch)
233                    }
234                }
235                HandleOp::Duplicate(x, rights) => {
236                    if Weak::ptr_eq(&x.client, &self.0.client) {
237                        Ok(proto::HandleDisposition {
238                            handle: proto::HandleOp::Duplicate(x.proto()),
239                            rights,
240                        })
241                    } else {
242                        Err(Error::ConnectionMismatch)
243                    }
244                }
245            })
246            .collect::<Result<Vec<_>, Error>>();
247
248        let handles = if handles
249            .as_ref()
250            .map(|x| x.len() > zx_types::ZX_CHANNEL_MAX_MSG_HANDLES as usize)
251            .unwrap_or(false)
252            || bytes.len() > zx_types::ZX_CHANNEL_MAX_MSG_BYTES as usize
253        {
254            Err(Error::FDomain(proto::Error::TargetError(fidl::Status::OUT_OF_RANGE.into_raw())))
255        } else {
256            handles
257        };
258
259        match handles {
260            Ok(handles) => {
261                Either::Left(self.write_inner(bytes, proto::Handles::Dispositions(handles)))
262            }
263            Err(e) => Either::Right(async move { Err(e) }),
264        }
265    }
266
267    /// Writes a message into the channel.
268    fn write_inner(
269        &self,
270        bytes: &[u8],
271        handles: proto::Handles,
272    ) -> impl Future<Output = Result<(), Error>> {
273        let data = bytes.to_vec();
274        let client = self.0.client();
275        let handle = self.0.proto();
276
277        client.clear_handles_for_transfer(&handles);
278        client.transaction(
279            ordinals::WRITE_CHANNEL,
280            proto::ChannelWriteChannelRequest { handle, data, handles },
281            move |x| Responder::WriteChannel(x),
282        )
283    }
284
285    /// Split this channel into a streaming reader and a writer. This is more
286    /// efficient on the read side if you intend to consume all of the messages
287    /// from the channel. However it will prevent you from transferring the
288    /// handle in the future. It also means messages will build up in the
289    /// buffer, so it may lead to memory issues if you don't intend to use the
290    /// messages from the channel as fast as they come.
291    pub fn stream(self) -> Result<(ChannelMessageStream, ChannelWriter), Error> {
292        self.0.client().start_channel_streaming(self.0.proto())?;
293
294        let a = Arc::new(self);
295        let b = Arc::clone(&a);
296
297        Ok((ChannelMessageStream(a), ChannelWriter(b)))
298    }
299}
300
301/// A write-only handle to a socket.
302#[derive(Debug)]
303pub struct ChannelWriter(Arc<Channel>);
304
305impl ChannelWriter {
306    /// Writes a message into the channel.
307    pub fn write(&self, bytes: &[u8], handles: Vec<Handle>) -> Result<(), Error> {
308        self.0.write(bytes, handles)
309    }
310
311    /// Writes a message into the channel. Returns a future that will allow you
312    /// to wait for the write to move across the FDomain connection and return
313    /// with the result of the actual write call on the target.
314    pub fn fdomain_write(
315        &self,
316        bytes: &[u8],
317        handles: Vec<Handle>,
318    ) -> impl Future<Output = Result<(), Error>> + '_ {
319        self.0.fdomain_write(bytes, handles)
320    }
321
322    /// Writes a message into the channel.
323    pub fn fdomain_write_etc<'b>(
324        &self,
325        bytes: &[u8],
326        handles: Vec<HandleOp<'b>>,
327    ) -> impl Future<Output = Result<(), Error>> + 'b {
328        self.0.fdomain_write_etc(bytes, handles)
329    }
330
331    /// Get a reference to the inner channel.
332    pub fn as_channel(&self) -> &Channel {
333        &*self.0
334    }
335}
336
337/// A stream of data issuing from a socket.
338#[derive(Debug)]
339pub struct ChannelMessageStream(Arc<Channel>);
340
341impl ChannelMessageStream {
342    /// Turn a `ChannelMessageStream` and its accompanying `ChannelWriter` back
343    /// into a `Channel`.
344    ///
345    /// # Panics
346    /// If this stream and the writer passed didn't come from the same call to
347    /// `Channel::stream`.
348    pub fn rejoin(mut self, writer: ChannelWriter) -> Channel {
349        assert!(Arc::ptr_eq(&self.0, &writer.0), "Tried to join stream with wrong writer!");
350        if let Some(client) = self.0 .0.client.upgrade() {
351            client.stop_channel_streaming(self.0 .0.proto())
352        }
353        std::mem::drop(writer);
354        let channel = std::mem::replace(&mut self.0, Arc::new(Channel(Handle::invalid())));
355        Arc::try_unwrap(channel).expect("Stream pointer no longer unique!")
356    }
357
358    /// Whether this stream is closed.
359    pub fn is_closed(&self) -> bool {
360        let client = self.0 .0.client();
361
362        !client.channel_is_streaming(self.0 .0.proto())
363    }
364
365    /// Get a reference to the inner channel.
366    pub fn as_channel(&self) -> &Channel {
367        &*self.0
368    }
369}
370
371impl Stream for ChannelMessageStream {
372    type Item = Result<MessageBuf, Error>;
373    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
374        let client = self.0 .0.client();
375        client
376            .poll_channel(self.0 .0.proto(), ctx, true)
377            .map(|x| x.map(|x| x.map(|x| MessageBuf::from_proto(&client, x))))
378    }
379}
380
381impl Drop for ChannelMessageStream {
382    fn drop(&mut self) {
383        if let Some(client) = self.0 .0.client.upgrade() {
384            client.stop_channel_streaming(self.0 .0.proto());
385        }
386    }
387}