1use std::fmt::{self, Display};
6use std::num::{NonZeroU32, TryFromIntError};
7use std::time::{Duration, Instant};
8
9use futures::TryFutureExt as _;
10use thiserror::Error;
11use {fidl_fuchsia_developer_ffx_speedtest as fspeedtest, fuchsia_async as fasync};
12
13use crate::throughput::BytesFormatter;
14use crate::{socket, Throughput};
15pub use socket::TransferParams;
16
17pub struct Client {
18 proxy: fspeedtest::SpeedtestProxy,
19}
20
21#[derive(Error, Debug)]
22pub enum ClientError {
23 #[error(transparent)]
24 Fidl(#[from] fidl::Error),
25 #[error("integer conversion error {0}")]
26 Conversion(#[from] TryFromIntError),
27 #[error(transparent)]
28 Transfer(#[from] socket::TransferError),
29 #[error(transparent)]
30 Encoding(#[from] socket::MissingFieldError),
31}
32
33#[derive(Debug)]
34pub struct PingReport {
35 pub min: Duration,
36 pub avg: Duration,
37 pub max: Duration,
38}
39
40impl Display for PingReport {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 let Self { max, avg, min } = self;
43 write!(f, "min = {min:?}, avg = {avg:?}, max = {max:?}")?;
44 Ok(())
45 }
46}
47
48#[derive(Debug, Copy, Clone, Eq, PartialEq)]
49pub enum Direction {
50 Tx,
51 Rx,
52}
53
54impl Direction {
55 pub fn flip(self) -> Self {
56 match self {
57 Self::Tx => Self::Rx,
58 Self::Rx => Self::Tx,
59 }
60 }
61
62 fn local_label(&self) -> &'static str {
63 match self {
64 Direction::Tx => "sender",
65 Direction::Rx => "receiver",
66 }
67 }
68}
69
70#[derive(Debug)]
71pub struct SocketTransferParams {
72 pub direction: Direction,
73 pub params: socket::TransferParams,
74}
75
76#[derive(Debug)]
77pub struct SocketTransferReport {
78 pub direction: Direction,
79 pub client: SocketTransferReportInner,
80 pub server: SocketTransferReportInner,
81}
82
83impl Display for SocketTransferReport {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 let Self { direction, client, server } = self;
86 let local_label = direction.local_label();
87 let remote_label = direction.flip().local_label();
88 writeln!(f, "local({local_label}): {client}")?;
89 write!(f, "remote({remote_label}): {server}")?;
90 Ok(())
91 }
92}
93
94#[derive(Debug)]
95pub struct SocketTransferReportInner {
96 pub transfer_len: NonZeroU32,
97 pub duration: Duration,
98 pub throughput: Throughput,
99}
100
101impl Display for SocketTransferReportInner {
102 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103 let Self { transfer_len, duration, throughput } = self;
104 let transfer_len = BytesFormatter(transfer_len.get().into());
105 write!(f, "{transfer_len} in {duration:.1?} => {throughput}")
106 }
107}
108
109impl SocketTransferReportInner {
110 fn new(params: &socket::TransferParams, report: socket::Report) -> Self {
111 let socket::Report { duration } = report;
112 let transfer_len = params.data_len;
113 let throughput = Throughput::from_len_and_duration(transfer_len.get(), duration);
114 Self { transfer_len, duration, throughput }
115 }
116}
117
118impl Client {
119 pub async fn new(proxy: fspeedtest::SpeedtestProxy) -> Result<Self, ClientError> {
120 proxy.ping().await?;
122 Ok(Self { proxy })
123 }
124
125 pub async fn ping(&self, repeat: NonZeroU32) -> Result<PingReport, ClientError> {
126 let mut total = Duration::ZERO;
127 let mut max = Duration::ZERO;
128 let mut min = Duration::MAX;
129 for _ in 0..repeat.get() {
130 let start = Instant::now();
131 self.proxy.ping().await?;
132 let dur = Instant::now() - start;
133 total += dur;
134 max = max.max(dur);
135 min = min.min(dur);
136 }
137
138 Ok(PingReport { max, avg: total / repeat.get(), min })
139 }
140
141 pub async fn socket(
142 &self,
143 params: SocketTransferParams,
144 ) -> Result<SocketTransferReport, ClientError> {
145 let SocketTransferParams { direction, params } = params;
146 let (client, server) = fidl::Socket::create_stream();
147 let client = fasync::Socket::from_socket(client);
148 let transfer = socket::Transfer { socket: client, params: params.clone() };
149 let (server_report, client_report) = match direction {
150 Direction::Tx => {
151 let server_fut = self
152 .proxy
153 .socket_down(server, ¶ms.clone().try_into()?)
154 .map_err(ClientError::from);
155 let client_fut = transfer.send().map_err(ClientError::from);
156 futures::future::try_join(server_fut, client_fut).await?
157 }
158 Direction::Rx => {
159 let server_fut = self
160 .proxy
161 .socket_up(server, ¶ms.clone().try_into()?)
162 .map_err(ClientError::from);
163 let client_fut = transfer.receive().map_err(ClientError::from);
164 futures::future::try_join(server_fut, client_fut).await?
165 }
166 };
167 Ok(SocketTransferReport {
168 direction,
169 client: SocketTransferReportInner::new(¶ms, client_report),
170 server: SocketTransferReportInner::new(¶ms, server_report.try_into()?),
171 })
172 }
173}