1use crate::handle::handle_type;
6use crate::responder::Responder;
7use crate::{ordinals, Error, Event, EventPair, Handle, OnFDomainSignals, Socket};
8use fidl_fuchsia_fdomain as proto;
9use futures::future::Either;
10use futures::stream::Stream;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Weak};
14use std::task::{ready, Context, Poll};
15
16#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
18pub struct Channel(pub(crate) Handle);
19
20handle_type!(Channel CHANNEL peered);
21
22#[derive(Debug)]
24pub struct MessageBuf {
25 pub bytes: Vec<u8>,
26 pub handles: Vec<HandleInfo>,
27}
28
29impl MessageBuf {
30 pub fn new() -> Self {
32 MessageBuf { bytes: Vec::new(), handles: Vec::new() }
33 }
34
35 pub fn ensure_capacity_bytes(&mut self, bytes: usize) {
37 self.bytes.reserve(bytes);
38 }
39
40 pub fn clear(&mut self) {
42 self.bytes.clear();
43 self.handles.clear();
44 }
45
46 pub fn bytes(&self) -> &[u8] {
48 self.bytes.as_slice()
49 }
50
51 fn from_proto(client: &Arc<crate::Client>, message: proto::ChannelMessage) -> MessageBuf {
53 let proto::ChannelMessage { data, handles } = message;
54 MessageBuf {
55 bytes: data,
56 handles: handles
57 .into_iter()
58 .map(|info| {
59 let handle = Handle { id: info.handle.id, client: Arc::downgrade(client) };
60 HandleInfo {
61 rights: info.rights,
62 handle: AnyHandle::from_handle(handle, info.type_),
63 }
64 })
65 .collect(),
66 }
67 }
68}
69
70#[derive(Debug)]
72pub struct HandleInfo {
73 pub handle: AnyHandle,
74 pub rights: fidl::Rights,
75}
76
77#[derive(Debug)]
80pub enum AnyHandle {
81 Channel(Channel),
82 Socket(Socket),
83 Event(Event),
84 EventPair(EventPair),
85 Unknown(Handle, fidl::ObjectType),
86}
87
88impl AnyHandle {
89 pub fn from_handle(handle: Handle, ty: fidl::ObjectType) -> AnyHandle {
91 match ty {
92 fidl::ObjectType::CHANNEL => AnyHandle::Channel(Channel(handle)),
93 fidl::ObjectType::SOCKET => AnyHandle::Socket(Socket(handle)),
94 fidl::ObjectType::EVENT => AnyHandle::Event(Event(handle)),
95 fidl::ObjectType::EVENTPAIR => AnyHandle::EventPair(EventPair(handle)),
96 _ => AnyHandle::Unknown(handle, ty),
97 }
98 }
99
100 pub fn invalid() -> AnyHandle {
102 AnyHandle::Unknown(Handle::invalid(), fidl::ObjectType::NONE)
103 }
104
105 pub fn object_type(&self) -> fidl::ObjectType {
107 match self {
108 AnyHandle::Channel(_) => fidl::ObjectType::CHANNEL,
109 AnyHandle::Socket(_) => fidl::ObjectType::SOCKET,
110 AnyHandle::Event(_) => fidl::ObjectType::EVENT,
111 AnyHandle::EventPair(_) => fidl::ObjectType::EVENTPAIR,
112 AnyHandle::Unknown(_, t) => *t,
113 }
114 }
115}
116
117impl From<AnyHandle> for Handle {
118 fn from(item: AnyHandle) -> Handle {
119 match item {
120 AnyHandle::Channel(h) => h.into(),
121 AnyHandle::Socket(h) => h.into(),
122 AnyHandle::Event(h) => h.into(),
123 AnyHandle::EventPair(h) => h.into(),
124 AnyHandle::Unknown(h, _) => h,
125 }
126 }
127}
128
129pub enum HandleOp<'h> {
131 Move(Handle, fidl::Rights),
132 Duplicate(&'h Handle, fidl::Rights),
133}
134
135impl Channel {
136 pub fn recv_msg(&self) -> impl Future<Output = Result<MessageBuf, Error>> {
138 let client = self.0.client();
139 let handle = self.0.proto();
140
141 futures::future::poll_fn(move |ctx| {
142 client.poll_channel(handle, ctx, false).map(|x| {
143 x.expect("Got stream termination indication from non-streaming read!")
144 .map(|x| MessageBuf::from_proto(&client, x))
145 })
146 })
147 }
148
149 pub fn recv_from(&self, cx: &mut Context<'_>, buf: &mut MessageBuf) -> Poll<Result<(), Error>> {
151 let client = self.0.client();
152 match ready!(client.poll_channel(self.0.proto(), cx, false))
153 .expect("Got stream termination indication from non-streaming read!")
154 {
155 Ok(msg) => {
156 *buf = MessageBuf::from_proto(&client, msg);
157 Poll::Ready(Ok(()))
158 }
159 Err(e) => Poll::Ready(Err(e)),
160 }
161 }
162
163 pub fn write(&self, bytes: &[u8], handles: Vec<Handle>) -> Result<(), Error> {
165 if bytes.len() > zx_types::ZX_CHANNEL_MAX_MSG_BYTES as usize
166 || handles.len() > zx_types::ZX_CHANNEL_MAX_MSG_HANDLES as usize
167 {
168 return Err(Error::FDomain(proto::Error::TargetError(
169 fidl::Status::OUT_OF_RANGE.into_raw(),
170 )));
171 }
172
173 let _ = self.write_inner(
174 bytes,
175 proto::Handles::Handles(handles.into_iter().map(|x| x.take_proto()).collect()),
176 );
177 Ok(())
178 }
179
180 pub fn fdomain_write(
184 &self,
185 bytes: &[u8],
186 handles: Vec<Handle>,
187 ) -> impl Future<Output = Result<(), Error>> + '_ {
188 if bytes.len() > zx_types::ZX_CHANNEL_MAX_MSG_BYTES as usize
189 || handles.len() > zx_types::ZX_CHANNEL_MAX_MSG_HANDLES as usize
190 {
191 Either::Left(async {
192 Err(Error::FDomain(proto::Error::TargetError(
193 fidl::Status::OUT_OF_RANGE.into_raw(),
194 )))
195 })
196 } else {
197 Either::Right(self.write_inner(
198 bytes,
199 proto::Handles::Handles(handles.into_iter().map(|x| x.take_proto()).collect()),
200 ))
201 }
202 }
203
204 pub fn on_closed(&self) -> OnFDomainSignals {
206 OnFDomainSignals::new(&self.0, fidl::Signals::OBJECT_PEER_CLOSED)
207 }
208
209 pub fn is_closed(&self) -> bool {
211 self.0.client.upgrade().is_none()
212 }
213
214 pub fn fdomain_write_etc<'b>(
218 &self,
219 bytes: &[u8],
220 handles: Vec<HandleOp<'b>>,
221 ) -> impl Future<Output = Result<(), Error>> + 'b {
222 let handles = handles
223 .into_iter()
224 .map(|handle| match handle {
225 HandleOp::Move(x, rights) => {
226 if Weak::ptr_eq(&x.client, &self.0.client) {
227 Ok(proto::HandleDisposition {
228 handle: proto::HandleOp::Move_(x.take_proto()),
229 rights,
230 })
231 } else {
232 Err(Error::ConnectionMismatch)
233 }
234 }
235 HandleOp::Duplicate(x, rights) => {
236 if Weak::ptr_eq(&x.client, &self.0.client) {
237 Ok(proto::HandleDisposition {
238 handle: proto::HandleOp::Duplicate(x.proto()),
239 rights,
240 })
241 } else {
242 Err(Error::ConnectionMismatch)
243 }
244 }
245 })
246 .collect::<Result<Vec<_>, Error>>();
247
248 let handles = if handles
249 .as_ref()
250 .map(|x| x.len() > zx_types::ZX_CHANNEL_MAX_MSG_HANDLES as usize)
251 .unwrap_or(false)
252 || bytes.len() > zx_types::ZX_CHANNEL_MAX_MSG_BYTES as usize
253 {
254 Err(Error::FDomain(proto::Error::TargetError(fidl::Status::OUT_OF_RANGE.into_raw())))
255 } else {
256 handles
257 };
258
259 match handles {
260 Ok(handles) => {
261 Either::Left(self.write_inner(bytes, proto::Handles::Dispositions(handles)))
262 }
263 Err(e) => Either::Right(async move { Err(e) }),
264 }
265 }
266
267 fn write_inner(
269 &self,
270 bytes: &[u8],
271 handles: proto::Handles,
272 ) -> impl Future<Output = Result<(), Error>> {
273 let data = bytes.to_vec();
274 let client = self.0.client();
275 let handle = self.0.proto();
276
277 client.clear_handles_for_transfer(&handles);
278 client.transaction(
279 ordinals::WRITE_CHANNEL,
280 proto::ChannelWriteChannelRequest { handle, data, handles },
281 move |x| Responder::WriteChannel(x),
282 )
283 }
284
285 pub fn stream(self) -> Result<(ChannelMessageStream, ChannelWriter), Error> {
292 self.0.client().start_channel_streaming(self.0.proto())?;
293
294 let a = Arc::new(self);
295 let b = Arc::clone(&a);
296
297 Ok((ChannelMessageStream(a), ChannelWriter(b)))
298 }
299}
300
301#[derive(Debug)]
303pub struct ChannelWriter(Arc<Channel>);
304
305impl ChannelWriter {
306 pub fn write(&self, bytes: &[u8], handles: Vec<Handle>) -> Result<(), Error> {
308 self.0.write(bytes, handles)
309 }
310
311 pub fn fdomain_write(
315 &self,
316 bytes: &[u8],
317 handles: Vec<Handle>,
318 ) -> impl Future<Output = Result<(), Error>> + '_ {
319 self.0.fdomain_write(bytes, handles)
320 }
321
322 pub fn fdomain_write_etc<'b>(
324 &self,
325 bytes: &[u8],
326 handles: Vec<HandleOp<'b>>,
327 ) -> impl Future<Output = Result<(), Error>> + 'b {
328 self.0.fdomain_write_etc(bytes, handles)
329 }
330
331 pub fn as_channel(&self) -> &Channel {
333 &*self.0
334 }
335}
336
337#[derive(Debug)]
339pub struct ChannelMessageStream(Arc<Channel>);
340
341impl ChannelMessageStream {
342 pub fn rejoin(mut self, writer: ChannelWriter) -> Channel {
349 assert!(Arc::ptr_eq(&self.0, &writer.0), "Tried to join stream with wrong writer!");
350 if let Some(client) = self.0 .0.client.upgrade() {
351 client.stop_channel_streaming(self.0 .0.proto())
352 }
353 std::mem::drop(writer);
354 let channel = std::mem::replace(&mut self.0, Arc::new(Channel(Handle::invalid())));
355 Arc::try_unwrap(channel).expect("Stream pointer no longer unique!")
356 }
357
358 pub fn is_closed(&self) -> bool {
360 let client = self.0 .0.client();
361
362 !client.channel_is_streaming(self.0 .0.proto())
363 }
364
365 pub fn as_channel(&self) -> &Channel {
367 &*self.0
368 }
369}
370
371impl Stream for ChannelMessageStream {
372 type Item = Result<MessageBuf, Error>;
373 fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
374 let client = self.0 .0.client();
375 client
376 .poll_channel(self.0 .0.proto(), ctx, true)
377 .map(|x| x.map(|x| x.map(|x| MessageBuf::from_proto(&client, x))))
378 }
379}
380
381impl Drop for ChannelMessageStream {
382 fn drop(&mut self) {
383 if let Some(client) = self.0 .0.client.upgrade() {
384 client.stop_channel_streaming(self.0 .0.proto());
385 }
386 }
387}