fidl_next_protocol/
transport.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use core::error::Error;
6use core::future::Future;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9
10use fidl_next_codec::{Decoder, Encoder};
11
12// Design philosophy:
13//
14// - Fan-out is best handled by protocols (via executors)
15// - Fan-in is best handled by transports (in whatever way is appropriate)
16//
17// Therefore:
18//
19// - A transport may only have one receiver at a time. To parallelize, spawn a future for each
20//   message.
21// - A transport may have many senders at a time. If a transport is serialized, then it must
22//   determine the best way to enqueue operations.
23
24/// A transport layer which can send and receive messages.
25///
26/// The futures provided by this trait should be cancel-safe, which constrains their behavior:
27///
28/// - Operations should not partially complete.
29/// - Operations should only complete during polling.
30///
31/// `SendFuture` should return an `Poll::Ready` with an error when polled after the transport is
32/// closed.
33pub trait Transport {
34    /// The error type for the transport.
35    type Error: Error + Send + Sync + 'static;
36
37    /// Splits the transport into a sender and receiver.
38    fn split(self) -> (Self::Sender, Self::Receiver);
39
40    /// The sender half of the transport. Dropping all of the senders for a transport should close
41    /// the transport.
42    type Sender: Send + Sync + Clone;
43    /// The buffer type for senders.
44    type SendBuffer: Encoder + Send;
45    /// The future state for send operations.
46    type SendFutureState: Send;
47
48    /// Acquires an empty send buffer for the transport.
49    fn acquire(sender: &Self::Sender) -> Self::SendBuffer;
50    /// Begins sending a `SendBuffer` over this transport.
51    ///
52    /// Returns the state for a future which can be polled with `poll_send`.
53    fn begin_send(sender: &Self::Sender, buffer: Self::SendBuffer) -> Self::SendFutureState;
54    /// Polls a `SendFutureState` for completion with a sender.
55    fn poll_send(
56        future: Pin<&mut Self::SendFutureState>,
57        cx: &mut Context<'_>,
58        sender: &Self::Sender,
59    ) -> Poll<Result<(), Self::Error>>;
60    /// Closes the transport.
61    fn close(sender: &Self::Sender);
62
63    /// The receiver half of the transport.
64    type Receiver: Send;
65    /// The future state for receive operations.
66    type RecvFutureState: Send;
67    /// The buffer type for receivers.
68    type RecvBuffer: Decoder + Send;
69
70    /// Begins receiving a `RecvBuffer` over this transport.
71    ///
72    /// Returns the state for a future which can be polled with `poll_recv`.
73    fn begin_recv(receiver: &mut Self::Receiver) -> Self::RecvFutureState;
74    /// Polls a `RecvFutureState` for completion with a receiver.
75    fn poll_recv(
76        future: Pin<&mut Self::RecvFutureState>,
77        cx: &mut Context<'_>,
78        receiver: &mut Self::Receiver,
79    ) -> Poll<Result<Option<Self::RecvBuffer>, Self::Error>>;
80}
81
82/// A transport layer which can send messages without blocking.
83///
84/// Non-blocking send operations cannot apply backpressure, which can cause memory exhaustion across
85/// the system. `NonBlockingTransport` is intended for use only while porting existing code.
86pub trait NonBlockingTransport: Transport {
87    /// Completes a `SendFutureState` using a sender without blocking.
88    fn send_immediately(
89        future_state: &mut Self::SendFutureState,
90        sender: &Self::Sender,
91    ) -> Result<(), Self::Error>;
92}
93
94/// Helper methods for `Transport`.
95pub trait TransportExt: Transport {
96    /// Sends an encoded message over the transport.
97    fn send(sender: &Self::Sender, buffer: Self::SendBuffer) -> SendFuture<'_, Self> {
98        let future_state = Self::begin_send(sender, buffer);
99        SendFuture { sender, future_state }
100    }
101
102    /// Receives an encoded message over the transport.
103    fn recv(receiver: &mut Self::Receiver) -> RecvFuture<'_, Self> {
104        let future_state = Self::begin_recv(receiver);
105        RecvFuture { receiver, future_state }
106    }
107}
108
109impl<T: Transport + ?Sized> TransportExt for T {}
110
111/// A future which sends an encoded message over the transport.
112#[must_use = "futures do nothing unless polled"]
113pub struct SendFuture<'s, T: Transport + ?Sized> {
114    sender: &'s T::Sender,
115    future_state: T::SendFutureState,
116}
117
118impl<T: NonBlockingTransport> SendFuture<'_, T> {
119    /// Completes the send operation synchronously and without blocking.
120    ///
121    /// Using this method prevents transports from applying backpressure. Prefer awaiting when
122    /// possible.
123    pub fn send_immediately(mut self) -> Result<(), T::Error> {
124        T::send_immediately(&mut self.future_state, self.sender)
125    }
126}
127
128impl<T: Transport> Future for SendFuture<'_, T> {
129    type Output = Result<(), T::Error>;
130
131    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
132        let this = unsafe { Pin::into_inner_unchecked(self) };
133        let future_state = unsafe { Pin::new_unchecked(&mut this.future_state) };
134        T::poll_send(future_state, cx, this.sender)
135    }
136}
137
138/// A future which receives an encoded message over the transport.
139#[must_use = "futures do nothing unless polled"]
140pub struct RecvFuture<'r, T: Transport + ?Sized> {
141    receiver: &'r mut T::Receiver,
142    future_state: T::RecvFutureState,
143}
144
145impl<T: Transport> Future for RecvFuture<'_, T> {
146    type Output = Result<Option<T::RecvBuffer>, T::Error>;
147
148    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
149        let this = unsafe { Pin::into_inner_unchecked(self) };
150        let future_state = unsafe { Pin::new_unchecked(&mut this.future_state) };
151        T::poll_recv(future_state, cx, this.receiver)
152    }
153}