1use std::num::{NonZeroU32, TryFromIntError};
6use std::time::{Duration, Instant};
7use std::u64;
8
9use futures::{AsyncReadExt as _, AsyncWriteExt as _};
10use thiserror::Error;
11use {fidl_fuchsia_developer_ffx_speedtest as fspeedtest, fuchsia_async as fasync};
12
13pub struct Transfer {
14 pub socket: fasync::Socket,
15 pub params: TransferParams,
16}
17
/// Sizing parameters for a [`Transfer`]; both values are guaranteed non-zero by type.
#[derive(Clone, Debug)]
pub struct TransferParams {
    /// Total number of bytes to move over the socket.
    pub data_len: NonZeroU32,
    /// Size in bytes of the buffer used for each individual read or write.
    pub buffer_len: NonZeroU32,
}
23
24impl TryFrom<fspeedtest::TransferParams> for TransferParams {
25 type Error = TryFromIntError;
26 fn try_from(value: fspeedtest::TransferParams) -> Result<Self, Self::Error> {
27 let fspeedtest::TransferParams { len_bytes, buffer_bytes, __source_breaking } = value;
28 Ok(Self {
29 data_len: len_bytes.unwrap_or(fspeedtest::DEFAULT_TRANSFER_SIZE).try_into()?,
30 buffer_len: buffer_bytes.unwrap_or(fspeedtest::DEFAULT_BUFFER_SIZE).try_into()?,
31 })
32 }
33}
34
35impl TryFrom<TransferParams> for fspeedtest::TransferParams {
36 type Error = TryFromIntError;
37 fn try_from(value: TransferParams) -> Result<Self, Self::Error> {
38 let TransferParams { data_len, buffer_len } = value;
39 Ok(Self {
40 len_bytes: Some(data_len.try_into()?),
41 buffer_bytes: Some(buffer_len.try_into()?),
42 __source_breaking: fidl::marker::SourceBreaking,
43 })
44 }
45}
46
/// Outcome of a completed transfer.
#[derive(Debug)]
pub struct Report {
    /// Wall-clock time measured around the transfer loop.
    pub duration: Duration,
}
51
52impl From<Report> for fspeedtest::TransferReport {
53 fn from(value: Report) -> Self {
54 let Report { duration } = value;
55 Self {
56 duration_nsec: Some(duration.as_nanos().try_into().unwrap_or(u64::MAX)),
57 __source_breaking: fidl::marker::SourceBreaking,
58 }
59 }
60}
61
62#[derive(Error, Debug)]
63#[error("missing mandatory field")]
64pub struct MissingFieldError;
65
66impl TryFrom<fspeedtest::TransferReport> for Report {
67 type Error = MissingFieldError;
68
69 fn try_from(value: fspeedtest::TransferReport) -> Result<Self, Self::Error> {
70 let fspeedtest::TransferReport { duration_nsec, __source_breaking } = value;
71 Ok(Self { duration: Duration::from_nanos(duration_nsec.ok_or(MissingFieldError)?) })
72 }
73}
74
75#[derive(Error, Debug)]
76pub enum TransferError {
77 #[error(transparent)]
78 IntConversion(#[from] TryFromIntError),
79 #[error(transparent)]
80 Io(#[from] std::io::Error),
81 #[error("remote hung up before terminating transfer")]
82 Hangup,
83}
84
85impl Transfer {
86 pub async fn send(self) -> Result<Report, TransferError> {
87 let Self { mut socket, params: TransferParams { data_len, buffer_len } } = self;
88 let buffer_len = usize::try_from(buffer_len.get())?;
89 let mut data_len = usize::try_from(data_len.get())?;
90 let buffer = vec![0xAA; buffer_len];
91 let start = Instant::now();
92 while data_len != 0 {
93 let send = buffer_len.min(data_len);
94 let written = socket.write(&buffer[..send]).await?;
95 data_len -= written;
96 }
97 let end = Instant::now();
98 Ok(Report { duration: end - start })
99 }
100
101 pub async fn receive(self) -> Result<Report, TransferError> {
102 let Self { mut socket, params: TransferParams { data_len, buffer_len } } = self;
103 let buffer_len = usize::try_from(buffer_len.get())?;
104 let mut data_len = usize::try_from(data_len.get())?;
105 let mut buffer = vec![0x00; buffer_len];
106 let start = Instant::now();
107 while data_len != 0 {
108 let recv = buffer_len.min(data_len);
109 let recv = socket.read(&mut buffer[..recv]).await?;
110 if recv == 0 {
111 return Err(TransferError::Hangup);
112 }
113 data_len -= recv;
114 }
115 let end = Instant::now();
116 Ok(Report { duration: end - start })
117 }
118}
119
#[cfg(test)]
mod test {
    use super::*;

    use assert_matches::assert_matches;

    /// Receiving from a socket whose peer is already gone must report a hangup.
    #[fasync::run_singlethreaded(test)]
    async fn receive_hangup() {
        // Binding the peer to `_` drops it right away, so the read side observes a closed
        // stream instead of waiting for data.
        let (local, _) = fidl::Socket::create_stream();
        let params = TransferParams {
            data_len: NonZeroU32::new(10).unwrap(),
            buffer_len: NonZeroU32::new(100).unwrap(),
        };
        let transfer = Transfer { socket: fasync::Socket::from_socket(local), params };

        let outcome = transfer.receive().await;

        assert_matches!(outcome, Err(TransferError::Hangup));
    }
}
141}