speedtest/
client.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::fmt::{self, Display};
6use std::num::{NonZeroU32, TryFromIntError};
7use std::time::{Duration, Instant};
8
9use flex_client::ProxyHasDomain;
10use flex_fuchsia_developer_ffx_speedtest as fspeedtest;
11use futures::TryFutureExt as _;
12use thiserror::Error;
13
14use crate::throughput::BytesFormatter;
15use crate::{Throughput, socket};
16#[cfg(feature = "fdomain")]
17pub use socket::FDomainTransferParams;
18pub use socket::TransferParams;
19
/// Client side of the speedtest protocol.
///
/// Holds the [`fspeedtest::SpeedtestProxy`] over which ping and socket
/// transfer measurements are issued.
pub struct Client {
    /// Proxy used for every request this client sends.
    proxy: fspeedtest::SpeedtestProxy,
}
23
/// Errors returned by [`Client`] operations.
///
/// Every variant is convertible from its payload via `From`, so the
/// underlying errors can be propagated with `?`.
#[derive(Error, Debug)]
pub enum ClientError {
    /// A FIDL error; displayed exactly as the wrapped error.
    #[error(transparent)]
    Fidl(#[from] fidl::Error),
    /// An integer conversion failed.
    #[error("integer conversion error {0}")]
    Conversion(#[from] TryFromIntError),
    /// A socket transfer error; displayed exactly as the wrapped error.
    #[error(transparent)]
    Transfer(#[from] socket::TransferError),
    /// A required field was missing; displayed exactly as the wrapped error.
    // NOTE(review): presumably produced while converting to or from the wire
    // representation of parameters/reports -- confirm against `socket`.
    #[error(transparent)]
    Encoding(#[from] socket::MissingFieldError),
}
35
/// Summary of the round-trip times observed over a series of pings.
#[derive(Debug)]
pub struct PingReport {
    /// Shortest observed round trip.
    pub min: Duration,
    /// Mean round trip over all pings.
    pub avg: Duration,
    /// Longest observed round trip.
    pub max: Duration,
}

impl Display for PingReport {
    /// Renders the report on a single line as `min = .., avg = .., max = ..`,
    /// using the `Debug` representation of each duration.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (min, avg, max) = (&self.min, &self.avg, &self.max);
        write!(f, "min = {min:?}, avg = {avg:?}, max = {max:?}")
    }
}
50
/// Direction of a transfer, as seen from the local side.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum Direction {
    /// The local side is the sender.
    Tx,
    /// The local side is the receiver.
    Rx,
}

impl Direction {
    /// Returns the opposite direction.
    pub fn flip(self) -> Self {
        match self {
            Direction::Rx => Direction::Tx,
            Direction::Tx => Direction::Rx,
        }
    }

    /// Role name of the side that sees the transfer in this direction.
    fn local_label(&self) -> &'static str {
        match *self {
            Self::Rx => "receiver",
            Self::Tx => "sender",
        }
    }
}
72
/// Input to a socket transfer measurement.
#[derive(Debug)]
pub struct SocketTransferParams {
    /// Which way the data flows, from the local side's point of view.
    pub direction: Direction,
    /// Parameters describing the transfer itself.
    pub params: socket::TransferParams,
}
78
/// Result of a socket transfer measurement, with one report per side.
#[derive(Debug)]
pub struct SocketTransferReport {
    /// Direction the transfer ran in, from the local side's point of view.
    pub direction: Direction,
    /// Measurement for the client side (shown as "local" when displayed).
    pub client: SocketTransferReportInner,
    /// Measurement for the server side (shown as "remote" when displayed).
    pub server: SocketTransferReportInner,
}
85
86impl Display for SocketTransferReport {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        let Self { direction, client, server } = self;
89        let local_label = direction.local_label();
90        let remote_label = direction.flip().local_label();
91        writeln!(f, "local({local_label}): {client}")?;
92        write!(f, "remote({remote_label}): {server}")?;
93        Ok(())
94    }
95}
96
/// Measurement of one side of a socket transfer.
#[derive(Debug)]
pub struct SocketTransferReportInner {
    /// Length of the transfer; always non-zero.
    pub transfer_len: NonZeroU32,
    /// Duration reported for the transfer.
    pub duration: Duration,
    /// Throughput derived from `transfer_len` and `duration`.
    pub throughput: Throughput,
}
103
104impl Display for SocketTransferReportInner {
105    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106        let Self { transfer_len, duration, throughput } = self;
107        let transfer_len = BytesFormatter(transfer_len.get().into());
108        write!(f, "{transfer_len} in {duration:.1?} => {throughput}")
109    }
110}
111
112impl SocketTransferReportInner {
113    fn new(params: &socket::TransferParams, report: socket::Report) -> Self {
114        let socket::Report { duration } = report;
115        let transfer_len = params.data_len;
116        let throughput = Throughput::from_len_and_duration(transfer_len.get(), duration);
117        Self { transfer_len, duration, throughput }
118    }
119}
120
121impl Client {
122    pub async fn new(proxy: fspeedtest::SpeedtestProxy) -> Result<Self, ClientError> {
123        // Run a ping to ensure the service has started.
124        proxy.ping().await?;
125        Ok(Self { proxy })
126    }
127
128    pub async fn ping(&self, repeat: NonZeroU32) -> Result<PingReport, ClientError> {
129        let mut total = Duration::ZERO;
130        let mut max = Duration::ZERO;
131        let mut min = Duration::MAX;
132        for _ in 0..repeat.get() {
133            let start = Instant::now();
134            self.proxy.ping().await?;
135            let dur = Instant::now() - start;
136            total += dur;
137            max = max.max(dur);
138            min = min.min(dur);
139        }
140
141        Ok(PingReport { max, avg: total / repeat.get(), min })
142    }
143
144    pub async fn socket(
145        &self,
146        params: SocketTransferParams,
147    ) -> Result<SocketTransferReport, ClientError> {
148        let SocketTransferParams { direction, params } = params;
149        let (client, server) = self.proxy.domain().create_stream_socket();
150        let transfer = socket::Transfer { socket: client, params: params.clone() };
151        let (server_report, client_report) = match direction {
152            Direction::Tx => {
153                let server_fut = self
154                    .proxy
155                    .socket_down(server, &params.clone().try_into()?)
156                    .map_err(ClientError::from);
157                let client_fut = transfer.send().map_err(ClientError::from);
158                futures::future::try_join(server_fut, client_fut).await?
159            }
160            Direction::Rx => {
161                let server_fut = self
162                    .proxy
163                    .socket_up(server, &params.clone().try_into()?)
164                    .map_err(ClientError::from);
165                let client_fut = transfer.receive().map_err(ClientError::from);
166                futures::future::try_join(server_fut, client_fut).await?
167            }
168        };
169        Ok(SocketTransferReport {
170            direction,
171            client: SocketTransferReportInner::new(&params, client_report),
172            server: SocketTransferReportInner::new(&params, server_report.try_into()?),
173        })
174    }
175}