fdomain_client/
channel.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::handle::handle_type;
6use crate::responder::Responder;
7use crate::{Error, Event, EventPair, Handle, OnFDomainSignals, Socket, ordinals};
8use fidl_fuchsia_fdomain as proto;
9use futures::future::Either;
10use futures::stream::Stream;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Weak};
14use std::task::{Context, Poll, ready};
15
/// A channel in a remote FDomain.
///
/// This is a newtype around the underlying [`Handle`]; the wrapped handle is
/// visible inside the crate but not to external users.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Channel(pub(crate) Handle);

// NOTE(review): `handle_type!` is imported from `crate::handle`. It presumably
// generates the boilerplate shared by all handle wrappers (e.g. conversions to
// and from `Handle`) for a peered `CHANNEL` object — confirm against the macro
// definition.
handle_type!(Channel CHANNEL peered);
21
22/// A message which has been read from a channel.
23#[derive(Debug)]
24pub struct MessageBuf {
25    pub bytes: Vec<u8>,
26    pub handles: Vec<HandleInfo>,
27}
28
29impl MessageBuf {
30    /// Create a new [`MessageBuf`]
31    pub fn new() -> Self {
32        MessageBuf { bytes: Vec::new(), handles: Vec::new() }
33    }
34
35    /// Get the components of this buffer separately.
36    pub fn split(self) -> (Vec<u8>, Vec<HandleInfo>) {
37        (self.bytes, self.handles)
38    }
39
40    /// Make sure this buffer has room for a certain number of bytes.
41    pub fn ensure_capacity_bytes(&mut self, bytes: usize) {
42        self.bytes.reserve(bytes);
43    }
44
45    /// Clear out the contents of this buffer.
46    pub fn clear(&mut self) {
47        self.bytes.clear();
48        self.handles.clear();
49    }
50
51    /// Get the byte content of this buffer.
52    pub fn bytes(&self) -> &[u8] {
53        self.bytes.as_slice()
54    }
55
56    /// Convert a proto ChannelMessage to a MessageBuf.
57    fn from_proto(client: &Arc<crate::Client>, message: proto::ChannelMessage) -> MessageBuf {
58        let proto::ChannelMessage { data, handles } = message;
59        MessageBuf {
60            bytes: data,
61            handles: handles
62                .into_iter()
63                .map(|info| {
64                    let handle = Handle { id: info.handle.id, client: Arc::downgrade(client) };
65                    HandleInfo {
66                        rights: info.rights,
67                        handle: AnyHandle::from_handle(handle, info.type_),
68                    }
69                })
70                .collect(),
71        }
72    }
73}
74
/// A handle which has been read from a channel.
#[derive(Debug)]
pub struct HandleInfo {
    /// The received handle, wrapped in the variant matching its object type.
    pub handle: AnyHandle,
    /// Rights associated with the handle. When the value is built from an
    /// incoming message these are copied from the message's handle info.
    pub rights: fidl::Rights,
}
81
82/// Sum type of all the handle types which can be read from a channel. Allows
83/// the user to learn the type of a handle after it has been read.
84#[derive(Debug)]
85pub enum AnyHandle {
86    Channel(Channel),
87    Socket(Socket),
88    Event(Event),
89    EventPair(EventPair),
90    Unknown(Handle, fidl::ObjectType),
91}
92
93impl AnyHandle {
94    /// Construct an `AnyHandle` from a `Handle` and an object type.
95    pub fn from_handle(handle: Handle, ty: fidl::ObjectType) -> AnyHandle {
96        match ty {
97            fidl::ObjectType::CHANNEL => AnyHandle::Channel(Channel(handle)),
98            fidl::ObjectType::SOCKET => AnyHandle::Socket(Socket(handle)),
99            fidl::ObjectType::EVENT => AnyHandle::Event(Event(handle)),
100            fidl::ObjectType::EVENTPAIR => AnyHandle::EventPair(EventPair(handle)),
101            _ => AnyHandle::Unknown(handle, ty),
102        }
103    }
104
105    /// Get an `AnyHandle` wrapping an invalid handle.
106    pub fn invalid() -> AnyHandle {
107        AnyHandle::Unknown(Handle::invalid(), fidl::ObjectType::NONE)
108    }
109
110    /// Get the object type for a handle.
111    pub fn object_type(&self) -> fidl::ObjectType {
112        match self {
113            AnyHandle::Channel(_) => fidl::ObjectType::CHANNEL,
114            AnyHandle::Socket(_) => fidl::ObjectType::SOCKET,
115            AnyHandle::Event(_) => fidl::ObjectType::EVENT,
116            AnyHandle::EventPair(_) => fidl::ObjectType::EVENTPAIR,
117            AnyHandle::Unknown(_, t) => *t,
118        }
119    }
120}
121
122impl From<AnyHandle> for Handle {
123    fn from(item: AnyHandle) -> Handle {
124        match item {
125            AnyHandle::Channel(h) => h.into(),
126            AnyHandle::Socket(h) => h.into(),
127            AnyHandle::Event(h) => h.into(),
128            AnyHandle::EventPair(h) => h.into(),
129            AnyHandle::Unknown(h, _) => h,
130        }
131    }
132}
133
/// Operation to perform on a handle when writing it to a channel.
///
/// Each variant also carries the rights value that is forwarded with the
/// handle in the write request.
pub enum HandleOp<'h> {
    /// Send the handle itself; the owned `Handle` is consumed by the write.
    Move(Handle, fidl::Rights),
    /// Send a duplicate of the handle; the caller keeps the borrowed original.
    Duplicate(&'h Handle, fidl::Rights),
}
139
140impl Channel {
141    /// Reads a message from the channel.
142    pub fn recv_msg(&self) -> impl Future<Output = Result<MessageBuf, Error>> + use<> {
143        let client = self.0.client();
144        let handle = self.0.proto();
145
146        futures::future::poll_fn(move |ctx| {
147            client.poll_channel(handle, ctx, false).map(|x| {
148                x.expect("Got stream termination indication from non-streaming read!")
149                    .map(|x| MessageBuf::from_proto(&client, x))
150            })
151        })
152    }
153
154    /// Poll to try and read a channel message.
155    pub fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<MessageBuf, Error>> {
156        let client = self.0.client();
157        let handle = self.0.proto();
158
159        client.poll_channel(handle, cx, false).map(|x| {
160            x.expect("Got stream termination indication from non-streaming read!")
161                .map(|x| MessageBuf::from_proto(&client, x))
162        })
163    }
164
165    /// Poll a channel for a message to read.
166    pub fn recv_from(&self, cx: &mut Context<'_>, buf: &mut MessageBuf) -> Poll<Result<(), Error>> {
167        let client = self.0.client();
168        match ready!(client.poll_channel(self.0.proto(), cx, false))
169            .expect("Got stream termination indication from non-streaming read!")
170        {
171            Ok(msg) => {
172                *buf = MessageBuf::from_proto(&client, msg);
173                Poll::Ready(Ok(()))
174            }
175            Err(e) => Poll::Ready(Err(e)),
176        }
177    }
178
179    /// Writes a message into the channel.
180    pub fn write(&self, bytes: &[u8], handles: Vec<Handle>) -> Result<(), Error> {
181        if bytes.len() > zx_types::ZX_CHANNEL_MAX_MSG_BYTES as usize
182            || handles.len() > zx_types::ZX_CHANNEL_MAX_MSG_HANDLES as usize
183        {
184            return Err(Error::FDomain(proto::Error::TargetError(
185                fidl::Status::OUT_OF_RANGE.into_raw(),
186            )));
187        }
188
189        let _ = self.write_inner(
190            bytes,
191            proto::Handles::Handles(handles.into_iter().map(|x| x.take_proto()).collect()),
192        );
193        Ok(())
194    }
195
196    /// Writes a message into the channel. Returns a future that will allow you
197    /// to wait for the write to move across the FDomain connection and return
198    /// with the result of the actual write call on the target.
199    pub fn fdomain_write(
200        &self,
201        bytes: &[u8],
202        handles: Vec<Handle>,
203    ) -> impl Future<Output = Result<(), Error>> + use<> {
204        if bytes.len() > zx_types::ZX_CHANNEL_MAX_MSG_BYTES as usize
205            || handles.len() > zx_types::ZX_CHANNEL_MAX_MSG_HANDLES as usize
206        {
207            Either::Left(async {
208                Err(Error::FDomain(proto::Error::TargetError(
209                    fidl::Status::OUT_OF_RANGE.into_raw(),
210                )))
211            })
212        } else {
213            Either::Right(self.write_inner(
214                bytes,
215                proto::Handles::Handles(handles.into_iter().map(|x| x.take_proto()).collect()),
216            ))
217        }
218    }
219
220    /// A future that returns when the channel is closed.
221    pub fn on_closed(&self) -> OnFDomainSignals {
222        OnFDomainSignals::new(&self.0, fidl::Signals::OBJECT_PEER_CLOSED)
223    }
224
225    /// Whether this handle is closed.
226    pub fn is_closed(&self) -> bool {
227        self.0.client.upgrade().is_none()
228    }
229
230    /// Writes a message into the channel. Optionally duplicates some of the
231    /// handles rather than consuming them, and can update the handle's rights
232    /// before sending.
233    pub fn fdomain_write_etc<'b>(
234        &self,
235        bytes: &[u8],
236        handles: Vec<HandleOp<'b>>,
237    ) -> impl Future<Output = Result<(), Error>> + use<'b> {
238        let handles = handles
239            .into_iter()
240            .map(|handle| match handle {
241                HandleOp::Move(x, rights) => {
242                    if Weak::ptr_eq(&x.client, &self.0.client) {
243                        Ok(proto::HandleDisposition {
244                            handle: proto::HandleOp::Move_(x.take_proto()),
245                            rights,
246                        })
247                    } else {
248                        Err(Error::ConnectionMismatch)
249                    }
250                }
251                HandleOp::Duplicate(x, rights) => {
252                    if Weak::ptr_eq(&x.client, &self.0.client) {
253                        Ok(proto::HandleDisposition {
254                            handle: proto::HandleOp::Duplicate(x.proto()),
255                            rights,
256                        })
257                    } else {
258                        Err(Error::ConnectionMismatch)
259                    }
260                }
261            })
262            .collect::<Result<Vec<_>, Error>>();
263
264        let handles = if handles
265            .as_ref()
266            .map(|x| x.len() > zx_types::ZX_CHANNEL_MAX_MSG_HANDLES as usize)
267            .unwrap_or(false)
268            || bytes.len() > zx_types::ZX_CHANNEL_MAX_MSG_BYTES as usize
269        {
270            Err(Error::FDomain(proto::Error::TargetError(fidl::Status::OUT_OF_RANGE.into_raw())))
271        } else {
272            handles
273        };
274
275        match handles {
276            Ok(handles) => {
277                Either::Left(self.write_inner(bytes, proto::Handles::Dispositions(handles)))
278            }
279            Err(e) => Either::Right(async move { Err(e) }),
280        }
281    }
282
283    /// Writes a message into the channel.
284    fn write_inner(
285        &self,
286        bytes: &[u8],
287        handles: proto::Handles,
288    ) -> impl Future<Output = Result<(), Error>> + use<> {
289        let data = bytes.to_vec();
290        let client = self.0.client();
291        let handle = self.0.proto();
292
293        client.clear_handles_for_transfer(&handles);
294        client.transaction(
295            ordinals::WRITE_CHANNEL,
296            proto::ChannelWriteChannelRequest { handle, data, handles },
297            move |x| Responder::WriteChannel(x),
298        )
299    }
300
301    /// Split this channel into a streaming reader and a writer. This is more
302    /// efficient on the read side if you intend to consume all of the messages
303    /// from the channel. However it will prevent you from transferring the
304    /// handle in the future. It also means messages will build up in the
305    /// buffer, so it may lead to memory issues if you don't intend to use the
306    /// messages from the channel as fast as they come.
307    pub fn stream(self) -> Result<(ChannelMessageStream, ChannelWriter), Error> {
308        self.0.client().start_channel_streaming(self.0.proto())?;
309
310        let a = Arc::new(self);
311        let b = Arc::clone(&a);
312
313        Ok((ChannelMessageStream(a), ChannelWriter(b)))
314    }
315}
316
317/// A write-only handle to a socket.
318#[derive(Debug, Clone)]
319pub struct ChannelWriter(Arc<Channel>);
320
321impl ChannelWriter {
322    /// Writes a message into the channel.
323    pub fn write(&self, bytes: &[u8], handles: Vec<Handle>) -> Result<(), Error> {
324        self.0.write(bytes, handles)
325    }
326
327    /// Writes a message into the channel. Returns a future that will allow you
328    /// to wait for the write to move across the FDomain connection and return
329    /// with the result of the actual write call on the target.
330    pub fn fdomain_write(
331        &self,
332        bytes: &[u8],
333        handles: Vec<Handle>,
334    ) -> impl Future<Output = Result<(), Error>> {
335        self.0.fdomain_write(bytes, handles)
336    }
337
338    /// Writes a message into the channel.
339    pub fn fdomain_write_etc<'b>(
340        &self,
341        bytes: &[u8],
342        handles: Vec<HandleOp<'b>>,
343    ) -> impl Future<Output = Result<(), Error>> + 'b {
344        self.0.fdomain_write_etc(bytes, handles)
345    }
346
347    /// Get a reference to the inner channel.
348    pub fn as_channel(&self) -> &Channel {
349        &*self.0
350    }
351}
352
353/// A stream of data issuing from a socket.
354#[derive(Debug)]
355pub struct ChannelMessageStream(Arc<Channel>);
356
357impl ChannelMessageStream {
358    /// Turn a `ChannelMessageStream` and its accompanying `ChannelWriter` back
359    /// into a `Channel`.
360    ///
361    /// # Panics
362    /// If this stream and the writer passed didn't come from the same call to
363    /// `Channel::stream`, or if there is more than one writer.
364    pub fn rejoin(mut self, writer: ChannelWriter) -> Channel {
365        assert!(Arc::ptr_eq(&self.0, &writer.0), "Tried to join stream with wrong writer!");
366        if let Some(client) = self.0.0.client.upgrade() {
367            client.stop_channel_streaming(self.0.0.proto())
368        }
369        std::mem::drop(writer);
370        let channel = std::mem::replace(&mut self.0, Arc::new(Channel(Handle::invalid())));
371        Arc::try_unwrap(channel).expect("Stream pointer no longer unique!")
372    }
373
374    /// Whether this stream is closed.
375    pub fn is_closed(&self) -> bool {
376        let client = self.0.0.client();
377
378        !client.channel_is_streaming(self.0.0.proto())
379    }
380
381    /// Get a reference to the inner channel.
382    pub fn as_channel(&self) -> &Channel {
383        &*self.0
384    }
385}
386
387impl Stream for ChannelMessageStream {
388    type Item = Result<MessageBuf, Error>;
389    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
390        let client = self.0.0.client();
391        client
392            .poll_channel(self.0.0.proto(), ctx, true)
393            .map(|x| x.map(|x| x.map(|x| MessageBuf::from_proto(&client, x))))
394    }
395}
396
397impl Drop for ChannelMessageStream {
398    fn drop(&mut self) {
399        if let Some(client) = self.0.0.client.upgrade() {
400            client.stop_channel_streaming(self.0.0.proto());
401        }
402    }
403}