fidl_next_protocol/transport.rs
1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use core::error::Error;
6use core::future::Future;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9
10use fidl_next_codec::{Decoder, Encoder};
11
12// Design philosophy:
13//
14// - Fan-out is best handled by protocols (via executors)
15// - Fan-in is best handled by transports (in whatever way is appropriate)
16//
17// Therefore:
18//
19// - A transport may only have one receiver at a time. To parallelize, spawn a future for each
20// message.
21// - A transport may have many senders at a time. If a transport is serialized, then it must
22// determine the best way to enqueue operations.
23
24/// A transport layer which can send and receive messages.
25///
26/// The futures provided by this trait should be cancel-safe, which constrains their behavior:
27///
28/// - Operations should not partially complete.
29/// - Operations should only complete during polling.
30///
31/// `SendFuture` should return an `Poll::Ready` with an error when polled after the transport is
32/// closed.
33pub trait Transport {
34 /// The error type for the transport.
35 type Error: Error + Send + Sync + 'static;
36
37 /// Splits the transport into a sender and receiver.
38 fn split(self) -> (Self::Sender, Self::Receiver);
39
40 /// The sender half of the transport. Dropping all of the senders for a transport should close
41 /// the transport.
42 type Sender: Send + Sync + Clone;
43 /// The buffer type for senders.
44 type SendBuffer: Encoder + Send;
45 /// The future state for send operations.
46 type SendFutureState: Send;
47
48 /// Acquires an empty send buffer for the transport.
49 fn acquire(sender: &Self::Sender) -> Self::SendBuffer;
50 /// Begins sending a `SendBuffer` over this transport.
51 ///
52 /// Returns the state for a future which can be polled with `poll_send`.
53 fn begin_send(sender: &Self::Sender, buffer: Self::SendBuffer) -> Self::SendFutureState;
54 /// Polls a `SendFutureState` for completion with a sender.
55 fn poll_send(
56 future: Pin<&mut Self::SendFutureState>,
57 cx: &mut Context<'_>,
58 sender: &Self::Sender,
59 ) -> Poll<Result<(), Self::Error>>;
60 /// Closes the transport.
61 fn close(sender: &Self::Sender);
62
63 /// The receiver half of the transport.
64 type Receiver: Send;
65 /// The future state for receive operations.
66 type RecvFutureState: Send;
67 /// The buffer type for receivers.
68 type RecvBuffer: Decoder + Send;
69
70 /// Begins receiving a `RecvBuffer` over this transport.
71 ///
72 /// Returns the state for a future which can be polled with `poll_recv`.
73 fn begin_recv(receiver: &mut Self::Receiver) -> Self::RecvFutureState;
74 /// Polls a `RecvFutureState` for completion with a receiver.
75 fn poll_recv(
76 future: Pin<&mut Self::RecvFutureState>,
77 cx: &mut Context<'_>,
78 receiver: &mut Self::Receiver,
79 ) -> Poll<Result<Option<Self::RecvBuffer>, Self::Error>>;
80}
81
82/// A transport layer which can send messages without blocking.
83///
84/// Non-blocking send operations cannot apply backpressure, which can cause memory exhaustion across
85/// the system. `NonBlockingTransport` is intended for use only while porting existing code.
86pub trait NonBlockingTransport: Transport {
87 /// Completes a `SendFutureState` using a sender without blocking.
88 fn send_immediately(
89 future_state: &mut Self::SendFutureState,
90 sender: &Self::Sender,
91 ) -> Result<(), Self::Error>;
92}
93
94/// Helper methods for `Transport`.
95pub trait TransportExt: Transport {
96 /// Sends an encoded message over the transport.
97 fn send(sender: &Self::Sender, buffer: Self::SendBuffer) -> SendFuture<'_, Self> {
98 let future_state = Self::begin_send(sender, buffer);
99 SendFuture { sender, future_state }
100 }
101
102 /// Receives an encoded message over the transport.
103 fn recv(receiver: &mut Self::Receiver) -> RecvFuture<'_, Self> {
104 let future_state = Self::begin_recv(receiver);
105 RecvFuture { receiver, future_state }
106 }
107}
108
109impl<T: Transport + ?Sized> TransportExt for T {}
110
111/// A future which sends an encoded message over the transport.
112#[must_use = "futures do nothing unless polled"]
113pub struct SendFuture<'s, T: Transport + ?Sized> {
114 sender: &'s T::Sender,
115 future_state: T::SendFutureState,
116}
117
118impl<T: NonBlockingTransport> SendFuture<'_, T> {
119 /// Completes the send operation synchronously and without blocking.
120 ///
121 /// Using this method prevents transports from applying backpressure. Prefer awaiting when
122 /// possible.
123 pub fn send_immediately(mut self) -> Result<(), T::Error> {
124 T::send_immediately(&mut self.future_state, self.sender)
125 }
126}
127
128impl<T: Transport> Future for SendFuture<'_, T> {
129 type Output = Result<(), T::Error>;
130
131 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
132 let this = unsafe { Pin::into_inner_unchecked(self) };
133 let future_state = unsafe { Pin::new_unchecked(&mut this.future_state) };
134 T::poll_send(future_state, cx, this.sender)
135 }
136}
137
138/// A future which receives an encoded message over the transport.
139#[must_use = "futures do nothing unless polled"]
140pub struct RecvFuture<'r, T: Transport + ?Sized> {
141 receiver: &'r mut T::Receiver,
142 future_state: T::RecvFutureState,
143}
144
145impl<T: Transport> Future for RecvFuture<'_, T> {
146 type Output = Result<Option<T::RecvBuffer>, T::Error>;
147
148 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
149 let this = unsafe { Pin::into_inner_unchecked(self) };
150 let future_state = unsafe { Pin::new_unchecked(&mut this.future_state) };
151 T::poll_recv(future_state, cx, this.receiver)
152 }
153}