speedtest/
socket.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::num::{NonZeroU32, TryFromIntError};
6use std::time::{Duration, Instant};
7use std::u64;
8
9use futures::{AsyncReadExt as _, AsyncWriteExt as _};
10use thiserror::Error;
11use {fidl_fuchsia_developer_ffx_speedtest as fspeedtest, fuchsia_async as fasync};
12
13pub struct Transfer {
14    pub socket: fasync::Socket,
15    pub params: TransferParams,
16}
17
18#[derive(Debug, Clone)]
19pub struct TransferParams {
20    pub data_len: NonZeroU32,
21    pub buffer_len: NonZeroU32,
22}
23
24impl TryFrom<fspeedtest::TransferParams> for TransferParams {
25    type Error = TryFromIntError;
26    fn try_from(value: fspeedtest::TransferParams) -> Result<Self, Self::Error> {
27        let fspeedtest::TransferParams { len_bytes, buffer_bytes, __source_breaking } = value;
28        Ok(Self {
29            data_len: len_bytes.unwrap_or(fspeedtest::DEFAULT_TRANSFER_SIZE).try_into()?,
30            buffer_len: buffer_bytes.unwrap_or(fspeedtest::DEFAULT_BUFFER_SIZE).try_into()?,
31        })
32    }
33}
34
35impl TryFrom<TransferParams> for fspeedtest::TransferParams {
36    type Error = TryFromIntError;
37    fn try_from(value: TransferParams) -> Result<Self, Self::Error> {
38        let TransferParams { data_len, buffer_len } = value;
39        Ok(Self {
40            len_bytes: Some(data_len.try_into()?),
41            buffer_bytes: Some(buffer_len.try_into()?),
42            __source_breaking: fidl::marker::SourceBreaking,
43        })
44    }
45}
46
47#[derive(Debug)]
48pub struct Report {
49    pub duration: Duration,
50}
51
52impl From<Report> for fspeedtest::TransferReport {
53    fn from(value: Report) -> Self {
54        let Report { duration } = value;
55        Self {
56            duration_nsec: Some(duration.as_nanos().try_into().unwrap_or(u64::MAX)),
57            __source_breaking: fidl::marker::SourceBreaking,
58        }
59    }
60}
61
62#[derive(Error, Debug)]
63#[error("missing mandatory field")]
64pub struct MissingFieldError;
65
66impl TryFrom<fspeedtest::TransferReport> for Report {
67    type Error = MissingFieldError;
68
69    fn try_from(value: fspeedtest::TransferReport) -> Result<Self, Self::Error> {
70        let fspeedtest::TransferReport { duration_nsec, __source_breaking } = value;
71        Ok(Self { duration: Duration::from_nanos(duration_nsec.ok_or(MissingFieldError)?) })
72    }
73}
74
75#[derive(Error, Debug)]
76pub enum TransferError {
77    #[error(transparent)]
78    IntConversion(#[from] TryFromIntError),
79    #[error(transparent)]
80    Io(#[from] std::io::Error),
81    #[error("remote hung up before terminating transfer")]
82    Hangup,
83}
84
85impl Transfer {
86    pub async fn send(self) -> Result<Report, TransferError> {
87        let Self { mut socket, params: TransferParams { data_len, buffer_len } } = self;
88        let buffer_len = usize::try_from(buffer_len.get())?;
89        let mut data_len = usize::try_from(data_len.get())?;
90        let buffer = vec![0xAA; buffer_len];
91        let start = Instant::now();
92        while data_len != 0 {
93            let send = buffer_len.min(data_len);
94            let written = socket.write(&buffer[..send]).await?;
95            data_len -= written;
96        }
97        let end = Instant::now();
98        Ok(Report { duration: end - start })
99    }
100
101    pub async fn receive(self) -> Result<Report, TransferError> {
102        let Self { mut socket, params: TransferParams { data_len, buffer_len } } = self;
103        let buffer_len = usize::try_from(buffer_len.get())?;
104        let mut data_len = usize::try_from(data_len.get())?;
105        let mut buffer = vec![0x00; buffer_len];
106        let start = Instant::now();
107        while data_len != 0 {
108            let recv = buffer_len.min(data_len);
109            let recv = socket.read(&mut buffer[..recv]).await?;
110            if recv == 0 {
111                return Err(TransferError::Hangup);
112            }
113            data_len -= recv;
114        }
115        let end = Instant::now();
116        Ok(Report { duration: end - start })
117    }
118}
119
120#[cfg(test)]
121mod test {
122    use super::*;
123
124    use assert_matches::assert_matches;
125
126    #[fasync::run_singlethreaded(test)]
127    async fn receive_hangup() {
128        let (socket, _) = fidl::Socket::create_stream();
129        let result = Transfer {
130            socket: fasync::Socket::from_socket(socket),
131            params: TransferParams {
132                data_len: NonZeroU32::new(10).unwrap(),
133                buffer_len: NonZeroU32::new(100).unwrap(),
134            },
135        }
136        .receive()
137        .await;
138
139        assert_matches!(result, Err(TransferError::Hangup));
140    }
141}