1use crate::platform::PlatformServices;
6use anyhow::{anyhow, Error};
7use fidl::endpoints::{create_proxy, create_request_stream};
8use fidl_fuchsia_virtualization::{
9 GuestManagerProxy, GuestMarker, GuestStatus, HostVsockAcceptorMarker, HostVsockEndpointMarker,
10};
11use futures::{select, try_join, AsyncReadExt, AsyncWriteExt, FutureExt, TryStreamExt};
12use prettytable::format::consts::FORMAT_CLEAN;
13use prettytable::{cell, row, Table};
14use std::collections::{HashMap, HashSet};
15use std::fmt;
16use std::io::Write;
17use {fuchsia_async as fasync, guest_cli_args as arguments, zx_status};
18
19const LATENCY_CHECK_SIZE_BYTES: usize = 4096;
20const THROUGHPUT_SIZE_MEBIBYTES: usize = 128;
21const THROUGHPUT_SIZE_BYTES: usize = (1 << 20) * THROUGHPUT_SIZE_MEBIBYTES;
22
23const HOST_PORT: u32 = 8500;
24const CONTROL_STREAM: u32 = 8501;
25const LATENCY_CHECK_STREAM: u32 = 8502;
26
27const SINGLE_STREAM_THROUGHPUT: u32 = 8503;
28const SINGLE_STREAM_MAGIC_NUM: u8 = 123;
29
30const MULTI_STREAM_THROUGHPUT1: u32 = 8504;
31const MULTI_STREAM_MAGIC_NUM1: u8 = 124;
32const MULTI_STREAM_THROUGHPUT2: u32 = 8505;
33const MULTI_STREAM_MAGIC_NUM2: u8 = 125;
34const MULTI_STREAM_THROUGHPUT3: u32 = 8506;
35const MULTI_STREAM_MAGIC_NUM3: u8 = 126;
36const MULTI_STREAM_THROUGHPUT4: u32 = 8507;
37const MULTI_STREAM_MAGIC_NUM4: u8 = 127;
38const MULTI_STREAM_THROUGHPUT5: u32 = 8508;
39const MULTI_STREAM_MAGIC_NUM5: u8 = 128;
40
41const SINGLE_STREAM_BIDIRECTIONAL: u32 = 8509;
42#[allow(dead_code)]
43const SINGLE_STREAM_BIDIRECTIONAL_MAGIC_NUM: u8 = 129;
44
45#[derive(Clone, Copy, serde::Serialize, serde::Deserialize)]
46enum PercentileUnit {
47 Nanoseconds,
48 MebibytesPerSecond,
49}
50
51#[derive(serde::Serialize, serde::Deserialize)]
52pub struct Percentiles {
53 min: f64,
54 p_25th: f64,
55 p_50th: f64,
56 p_75th: f64,
57 p_99th: f64,
58 max: f64,
59 unit: PercentileUnit,
60}
61
62impl fmt::Display for Percentiles {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 let get_units = |val: f64, unit: PercentileUnit| -> String {
65 match unit {
66 PercentileUnit::Nanoseconds => {
67 format!("{}ns ({:.3}ms)", val as u64, val / 1_000_000.0)
68 }
69 PercentileUnit::MebibytesPerSecond => {
70 format!("{:.2}MiB/s", val)
71 }
72 }
73 };
74
75 let mut table = Table::new();
76 table.set_format(*FORMAT_CLEAN);
77
78 table.add_row(row!["\tMin:", get_units(self.min, self.unit)]);
79 table.add_row(row!["\t25th percentile:", get_units(self.p_25th, self.unit)]);
80 table.add_row(row!["\t50th percentile:", get_units(self.p_50th, self.unit)]);
81 table.add_row(row!["\t75th percentile:", get_units(self.p_75th, self.unit)]);
82 table.add_row(row!["\t99th percentile:", get_units(self.p_99th, self.unit)]);
83 table.add_row(row!["\tMax:", get_units(self.max, self.unit)]);
84
85 write!(f, "\n{}", table)
86 }
87}
88
89#[derive(Default, serde::Serialize, serde::Deserialize)]
90pub struct Measurements {
91 data_corruption: Option<bool>,
92 round_trip_page: Option<Percentiles>,
93 tx_throughput: Option<Percentiles>,
94 rx_throughput: Option<Percentiles>,
95 single_stream_unidirectional: Option<Percentiles>,
96 multi_stream_unidirectional: Option<Percentiles>,
97}
98
99impl fmt::Display for Measurements {
100 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101 let format_percentiles = |percentiles: &Option<Percentiles>| -> String {
102 match percentiles {
103 None => " NOT RUN".to_owned(),
104 Some(percentile) => percentile.to_string(),
105 }
106 };
107
108 writeln!(f, "\n\nMicrobenchmark Results\n------------------------")?;
109
110 writeln!(
111 f,
112 "* Data corruption check: {}",
113 match self.data_corruption {
114 None => "NOT RUN",
115 Some(result) =>
116 if result {
117 "PASSED"
118 } else {
119 "FAILED"
120 },
121 }
122 )?;
123
124 writeln!(
125 f,
126 "* Round trip latency of {LATENCY_CHECK_SIZE_BYTES} bytes:{}",
127 format_percentiles(&self.round_trip_page)
128 )?;
129 writeln!(
130 f,
131 "* TX (guest -> host, unreliable) throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
132 format_percentiles(&self.tx_throughput)
133 )?;
134 writeln!(
135 f,
136 "* RX (host -> guest, unreliable) throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
137 format_percentiles(&self.rx_throughput)
138 )?;
139 writeln!(
140 f,
141 "* Single stream unidirectional round trip throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
142 format_percentiles(&self.single_stream_unidirectional)
143 )?;
144 writeln!(
145 f,
146 "* Multistream (5 connections) unidirectional round trip throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
147 format_percentiles(&self.multi_stream_unidirectional)
148 )
149 }
150}
151
152#[allow(clippy::large_enum_variant)]
154#[derive(serde::Serialize, serde::Deserialize)]
155pub enum VsockPerfResult {
156 BenchmarkComplete(Measurements),
157 UnsupportedGuest(arguments::GuestType),
158 Internal(String),
159}
160
161impl fmt::Display for VsockPerfResult {
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 match self {
164 VsockPerfResult::BenchmarkComplete(result) => write!(f, "{}", result),
165 VsockPerfResult::UnsupportedGuest(guest) => {
166 write!(f, "VsockPerf is not supported for '{}'. Only 'debian' is supported", guest)
167 }
168 VsockPerfResult::Internal(context) => {
169 write!(f, "Internal error: {}", context)
170 }
171 }
172 }
173}
174
175fn get_time_delta_nanos(before: fasync::MonotonicInstant, after: fasync::MonotonicInstant) -> i64 {
176 #[cfg(target_os = "fuchsia")]
177 {
178 (after - before).into_nanos()
179 }
180
181 #[cfg(not(target_os = "fuchsia"))]
182 {
183 (after - before).as_nanos().try_into().unwrap()
184 }
185}
186
187pub async fn handle_vsockperf<P: PlatformServices>(
188 services: &P,
189 args: &arguments::vsockperf_args::VsockPerfArgs,
190) -> Result<VsockPerfResult, Error> {
191 if args.guest_type != arguments::GuestType::Debian {
192 return Ok(VsockPerfResult::UnsupportedGuest(args.guest_type));
193 }
194
195 let guest_manager = services.connect_to_manager(args.guest_type).await?;
196 #[allow(clippy::large_futures)]
197 Ok(match run_micro_benchmark(guest_manager).await {
198 Err(err) => VsockPerfResult::Internal(format!("{}", err)),
199 Ok(result) => VsockPerfResult::BenchmarkComplete(result),
200 })
201}
202
203fn percentile(durations: &[u64], percentile: u8) -> u64 {
204 assert!(percentile <= 100 && !durations.is_empty());
205 let location = (((percentile as f64) / 100.0) * ((durations.len() - 1) as f64)) as usize;
207 durations[location]
208}
209
210fn latency_percentile(durations: &[u64]) -> Percentiles {
211 Percentiles {
212 min: percentile(&durations, 0) as f64,
213 p_25th: percentile(&durations, 25) as f64,
214 p_50th: percentile(&durations, 50) as f64,
215 p_75th: percentile(&durations, 75) as f64,
216 p_99th: percentile(&durations, 99) as f64,
217 max: percentile(&durations, 100) as f64,
218 unit: PercentileUnit::Nanoseconds,
219 }
220}
221
222fn throughput_percentile(durations: &[u64], bytes: usize) -> Percentiles {
223 let to_mebibytes_per_second = |nanos: u64| -> f64 {
224 let seconds = nanos as f64 / (1000.0 * 1000.0 * 1000.0);
225 let bytes_per_second = (bytes as f64) / seconds;
226 bytes_per_second / (1 << 20) as f64
227 };
228
229 Percentiles {
230 min: to_mebibytes_per_second(percentile(&durations, 0)),
231 p_25th: to_mebibytes_per_second(percentile(&durations, 25)),
232 p_50th: to_mebibytes_per_second(percentile(&durations, 50)),
233 p_75th: to_mebibytes_per_second(percentile(&durations, 75)),
234 p_99th: to_mebibytes_per_second(percentile(&durations, 99)),
235 max: to_mebibytes_per_second(percentile(&durations, 100)),
236 unit: PercentileUnit::MebibytesPerSecond,
237 }
238}
239
240async fn warmup_and_data_corruption_check(socket: &mut fasync::Socket) -> Result<bool, Error> {
241 let mut buffer = vec![0u8; LATENCY_CHECK_SIZE_BYTES];
243 for i in 0..100 {
244 let pattern = format!("DAVID{:0>3}", i).repeat(512);
245 let packet = pattern.as_bytes();
246 assert_eq!(packet.len(), buffer.len());
247
248 if packet.len() != socket.as_ref().write(&packet)? {
249 return Err(anyhow!("failed to write full packet"));
250 }
251
252 let timeout =
253 fasync::MonotonicInstant::now() + std::time::Duration::from_millis(100).into();
254 select! {
255 () = fasync::Timer::new(timeout).fuse() => {
256 return Err(anyhow!("warmup timed out waiting 100ms for a packet echoed"));
257 }
258 result = socket.read_exact(&mut buffer).fuse() => {
259 result.map_err(|err| anyhow!("failed to read from socket during warmup: {}", err))?;
260 }
261 }
262
263 if buffer != packet {
264 return Ok(false);
265 }
266 }
267
268 Ok(true)
269}
270
271async fn wait_for_magic_numbers(
273 mut numbers: HashSet<u8>,
274 control_socket: &mut fasync::Socket,
275) -> Result<(), Error> {
276 let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(5).into();
277 let mut magic_buf = [0u8];
278 loop {
279 select! {
280 () = fasync::Timer::new(timeout).fuse() => {
281 return Err(anyhow!("timeout waiting 5s to get the test ready"));
282 }
283 result = control_socket.read_exact(&mut magic_buf).fuse() => {
284 result.map_err(|err| anyhow!("failed to read magic value from socket: {}", err))?;
285 match numbers.contains(&magic_buf[0]) {
286 false => Err(anyhow!("unexpected magic number from guest: {}", magic_buf[0])),
287 true => {
288 numbers.remove(&magic_buf[0]);
289 Ok(())
290 }
291 }?;
292
293 if numbers.is_empty() {
294 break;
295 }
296 }
297 }
298 }
299
300 Ok(())
301}
302
303async fn read_single_stream(
304 total_size: usize,
305 socket: &mut fasync::Socket,
306) -> Result<fasync::MonotonicInstant, Error> {
307 let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(10).into();
308 let mut buffer = [0u8; LATENCY_CHECK_SIZE_BYTES]; let segments = total_size / buffer.len();
310
311 for _ in 0..segments {
312 select! {
313 () = fasync::Timer::new(timeout).fuse() => {
314 return Err(anyhow!("timeout waiting 10s for test iteration read to finish"));
315 }
316 result = socket.read_exact(&mut buffer).fuse() => {
317 result.map_err(|err| anyhow!("failed to read segment from socket: {}", err))?;
318 }
319 }
320 }
321
322 Ok(fasync::MonotonicInstant::now())
323}
324
325async fn write_single_stream(
326 total_size: usize,
327 socket: &mut fasync::Socket,
328) -> Result<fasync::MonotonicInstant, Error> {
329 let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(10).into();
330 let buffer = [0u8; LATENCY_CHECK_SIZE_BYTES]; let segments = total_size / buffer.len();
332
333 for _ in 0..segments {
334 select! {
335 () = fasync::Timer::new(timeout).fuse() => {
336 return Err(anyhow!("timeout waiting 10s for test iteration write to finish"));
337 }
338 result = socket.write_all(&buffer).fuse() => {
339 result.map_err(
340 |err| anyhow!("failed to write segment to socket: {}", err))?;
341 }
342 }
343 }
344
345 Ok(fasync::MonotonicInstant::now())
346}
347
348async fn write_read_high_throughput(
349 total_size: usize,
350 socket: &mut fasync::Socket,
351) -> Result<(), Error> {
352 write_single_stream(total_size, socket).await?;
355 read_single_stream(total_size, socket).await?;
356 Ok(())
357}
358
359#[cfg(target_os = "fuchsia")]
360async fn run_single_stream_bidirectional_test(
361 mut read_socket: fasync::Socket,
362 control_socket: &mut fasync::Socket,
363 measurements: &mut Measurements,
364) -> Result<(), Error> {
365 use fidl::HandleBased;
366
367 println!("Starting single stream bidirectional round trip throughput test...");
368
369 let mut write_socket = fasync::Socket::from_socket(
370 read_socket.as_ref().duplicate_handle(fidl::Rights::SAME_RIGHTS)?,
371 );
372
373 wait_for_magic_numbers(HashSet::from([SINGLE_STREAM_BIDIRECTIONAL_MAGIC_NUM]), control_socket)
374 .await?;
375
376 let total_size = THROUGHPUT_SIZE_BYTES;
377 let mut rx_durations: Vec<u64> = Vec::new();
378 let mut tx_durations: Vec<u64> = Vec::new();
379
380 for i in 0..100 {
381 let before = fasync::MonotonicInstant::now();
382
383 let (write_finish, read_finish) = try_join!(
384 write_single_stream(total_size, &mut write_socket),
385 read_single_stream(total_size, &mut read_socket)
386 )?;
387
388 rx_durations.push(
389 get_time_delta_nanos(before, write_finish)
390 .try_into()
391 .expect("durations measured by the same thread must be greater than zero"),
392 );
393
394 tx_durations.push(
395 get_time_delta_nanos(before, read_finish)
396 .try_into()
397 .expect("durations measured by the same thread must be greater than zero"),
398 );
399
400 print!("\rFinished {} bidirectional throughput measurements", i + 1);
401 std::io::stdout().flush().expect("failed to flush stdout");
402 }
403
404 rx_durations.sort();
405 rx_durations.reverse();
406
407 tx_durations.sort();
408 tx_durations.reverse();
409
410 assert_eq!(rx_durations.len(), tx_durations.len());
411 println!("\rFinished {} bidirectional throughput measurements", rx_durations.len());
412
413 measurements.tx_throughput = Some(throughput_percentile(&tx_durations, total_size));
414 measurements.rx_throughput = Some(throughput_percentile(&rx_durations, total_size));
415
416 Ok(())
417}
418
419async fn run_single_stream_unidirectional_round_trip_test(
420 mut data_socket: fasync::Socket,
421 control_socket: &mut fasync::Socket,
422 measurements: &mut Measurements,
423) -> Result<(), Error> {
424 println!("Starting single stream unidirectional round trip throughput test...");
425
426 wait_for_magic_numbers(HashSet::from([SINGLE_STREAM_MAGIC_NUM]), control_socket).await?;
427
428 let total_size = THROUGHPUT_SIZE_BYTES;
429 let mut durations: Vec<u64> = Vec::new();
430
431 for i in 0..100 {
432 let before = fasync::MonotonicInstant::now();
433
434 write_read_high_throughput(total_size, &mut data_socket).await?;
435
436 let after = fasync::MonotonicInstant::now();
437 durations.push(
438 get_time_delta_nanos(before, after)
439 .try_into()
440 .expect("durations measured by the same thread must be greater than zero"),
441 );
442
443 print!("\rFinished {} round trip throughput measurements", i + 1);
444 std::io::stdout().flush().expect("failed to flush stdout");
445 }
446
447 durations.sort();
448 durations.reverse();
449 println!("\rFinished {} single stream round trip throughput measurements", durations.len());
450
451 measurements.single_stream_unidirectional =
452 Some(throughput_percentile(&durations, total_size * 2));
453
454 Ok(())
455}
456
457async fn run_multi_stream_unidirectional_round_trip_test(
458 mut data_socket1: fasync::Socket,
459 mut data_socket2: fasync::Socket,
460 mut data_socket3: fasync::Socket,
461 mut data_socket4: fasync::Socket,
462 mut data_socket5: fasync::Socket,
463 control_socket: &mut fasync::Socket,
464 measurements: &mut Measurements,
465) -> Result<(), Error> {
466 println!("Starting multistream unidirectional round trip throughput test...");
467
468 wait_for_magic_numbers(
469 HashSet::from([
470 MULTI_STREAM_MAGIC_NUM1,
471 MULTI_STREAM_MAGIC_NUM2,
472 MULTI_STREAM_MAGIC_NUM3,
473 MULTI_STREAM_MAGIC_NUM4,
474 MULTI_STREAM_MAGIC_NUM5,
475 ]),
476 control_socket,
477 )
478 .await?;
479
480 let total_size = THROUGHPUT_SIZE_BYTES;
481 let mut durations: Vec<u64> = Vec::new();
482
483 for i in 0..50 {
484 let before = fasync::MonotonicInstant::now();
485
486 try_join!(
487 write_read_high_throughput(total_size, &mut data_socket1),
488 write_read_high_throughput(total_size, &mut data_socket2),
489 write_read_high_throughput(total_size, &mut data_socket3),
490 write_read_high_throughput(total_size, &mut data_socket4),
491 write_read_high_throughput(total_size, &mut data_socket5)
492 )?;
493
494 let after = fasync::MonotonicInstant::now();
495 durations.push(
496 get_time_delta_nanos(before, after)
497 .try_into()
498 .expect("durations measured by the same thread must be greater than zero"),
499 );
500
501 print!("\rFinished {} multistream round trip throughput measurements", i + 1);
502 std::io::stdout().flush().expect("failed to flush stdout");
503 }
504
505 durations.sort();
506 durations.reverse();
507 println!("\rFinished {} multistream round trip throughput measurements", durations.len());
508
509 measurements.multi_stream_unidirectional =
510 Some(throughput_percentile(&durations, total_size * 2));
511
512 Ok(())
513}
514
515async fn run_latency_test(
516 mut socket: fasync::Socket,
517 measurements: &mut Measurements,
518) -> Result<(), Error> {
519 println!("Checking for data corruption...");
520 measurements.data_corruption = Some(warmup_and_data_corruption_check(&mut socket).await?);
521 println!("Finished data corruption check");
522
523 let packet = [42u8; LATENCY_CHECK_SIZE_BYTES];
524 let mut buffer = vec![0u8; packet.len()];
525 let mut latencies: Vec<u64> = Vec::new();
526
527 println!("Starting latency test...");
528 for i in 0..10000 {
529 let before = fasync::MonotonicInstant::now();
530 let timeout = before + std::time::Duration::from_millis(100).into();
531
532 if packet.len() != socket.as_ref().write(&packet)? {
533 return Err(anyhow!("failed to write full packet"));
534 }
535
536 select! {
537 () = fasync::Timer::new(timeout).fuse() => {
538 return Err(anyhow!("latency test timed out waiting 100ms for a packet echoed"));
539 }
540 result = socket.read_exact(&mut buffer).fuse() => {
541 result.map_err(
542 |err| anyhow!("failed to read from socket during latency test: {}", err))?;
543 }
544 }
545
546 let after = fasync::MonotonicInstant::now();
547 latencies.push(
548 get_time_delta_nanos(before, after)
549 .try_into()
550 .expect("durations measured by the same thread must be greater than zero"),
551 );
552
553 if (i + 1) % 50 == 0 {
554 print!("\rFinished measuring round trip latency for {} packets", i + 1);
555 std::io::stdout().flush().expect("failed to flush stdout");
556 }
557 }
558
559 latencies.sort();
560 println!("\rFinished measuring round trip latency for {} packets", latencies.len());
561
562 measurements.round_trip_page = Some(latency_percentile(&latencies));
563
564 Ok(())
565}
566
567async fn run_micro_benchmark(guest_manager: GuestManagerProxy) -> Result<Measurements, Error> {
568 let guest_info = guest_manager.get_info().await?;
569 if guest_info.guest_status.unwrap() != GuestStatus::Running {
570 return Err(anyhow!(zx_status::Status::NOT_CONNECTED));
571 }
572
573 let (guest_endpoint, guest_server_end) = create_proxy::<GuestMarker>();
574 guest_manager
575 .connect(guest_server_end)
576 .await
577 .map_err(|err| anyhow!("failed to get a connect response: {}", err))?
578 .map_err(|err| anyhow!("connect failed with: {:?}", err))?;
579
580 let (vsock_endpoint, vsock_server_end) = create_proxy::<HostVsockEndpointMarker>();
581 guest_endpoint
582 .get_host_vsock_endpoint(vsock_server_end)
583 .await?
584 .map_err(|err| anyhow!("failed to get HostVsockEndpoint: {:?}", err))?;
585
586 let (acceptor, mut client_stream) = create_request_stream::<HostVsockAcceptorMarker>();
587 vsock_endpoint
588 .listen(HOST_PORT, acceptor)
589 .await
590 .map_err(|err| anyhow!("failed to get a listen response: {}", err))?
591 .map_err(|err| anyhow!("listen failed with: {}", zx_status::Status::from_raw(err)))?;
592
593 let socket = guest_endpoint
594 .get_console()
595 .await
596 .map_err(|err| anyhow!("failed to get a get_console response: {}", err))?
597 .map_err(|err| anyhow!("get_console failed with: {:?}", err))?;
598
599 let command = b"../test_utils/virtio_vsock_test_util micro_benchmark\n";
602 let bytes_written = socket
603 .write(command)
604 .map_err(|err| anyhow!("failed to write command to socket: {}", err))?;
605 if bytes_written != command.len() {
606 return Err(anyhow!(
607 "attempted to send command '{}', but only managed to write '{}'",
608 std::str::from_utf8(command).expect("failed to parse as utf-8"),
609 std::str::from_utf8(&command[0..bytes_written]).expect("failed to parse as utf-8")
610 ));
611 }
612
613 let mut expected_connections = HashSet::from([
614 CONTROL_STREAM,
615 LATENCY_CHECK_STREAM,
616 SINGLE_STREAM_THROUGHPUT,
617 MULTI_STREAM_THROUGHPUT1,
618 MULTI_STREAM_THROUGHPUT2,
619 MULTI_STREAM_THROUGHPUT3,
620 MULTI_STREAM_THROUGHPUT4,
621 MULTI_STREAM_THROUGHPUT5,
622 SINGLE_STREAM_BIDIRECTIONAL,
623 ]);
624 let mut active_connections = HashMap::new();
625
626 let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(15).into();
628 loop {
629 select! {
630 () = fasync::Timer::new(timeout).fuse() => {
631 return Err(anyhow!("vsockperf timed out waiting 15s for vsock connections"));
632 }
633 request = client_stream.try_next() => {
634 let request = request
635 .map_err(|err| anyhow!("failed to get acceptor request: {}", err))?
636 .ok_or_else(|| anyhow!("unexpected end of Listener stream"))?;
637 let (_src_cid, src_port, _port, responder) = request
638 .into_accept().ok_or_else(|| anyhow!("failed to parse message as Accept"))?;
639
640 match expected_connections.contains(&src_port) {
641 false => Err(anyhow!("unexpected connection from guest port: {}", src_port)),
642 true => {
643 expected_connections.remove(&src_port);
644 Ok(())
645 }
646 }?;
647
648 let (client_socket, device_socket) = fidl::Socket::create_stream();
649 let client_socket = fasync::Socket::from_socket(client_socket);
650
651 responder.send(Ok(device_socket))
652 .map_err(|err| anyhow!("failed to send response to device: {}", err))?;
653
654 if let Some(_) = active_connections.insert(src_port, client_socket) {
655 panic!("Connections must be unique");
656 }
657
658 if expected_connections.is_empty() {
659 break;
660 }
661 }
662 }
663 }
664
665 let mut measurements = Measurements::default();
666
667 run_latency_test(
668 active_connections.remove(&LATENCY_CHECK_STREAM).expect("socket should exist"),
669 &mut measurements,
670 )
671 .await?;
672
673 #[cfg(target_os = "fuchsia")]
675 run_single_stream_bidirectional_test(
676 active_connections.remove(&SINGLE_STREAM_BIDIRECTIONAL).expect("socket should exist"),
677 active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
678 &mut measurements,
679 )
680 .await?;
681
682 run_single_stream_unidirectional_round_trip_test(
683 active_connections.remove(&SINGLE_STREAM_THROUGHPUT).expect("socket should exist"),
684 active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
685 &mut measurements,
686 )
687 .await?;
688
689 #[allow(clippy::large_futures)]
690 run_multi_stream_unidirectional_round_trip_test(
691 active_connections.remove(&MULTI_STREAM_THROUGHPUT1).expect("socket should exist"),
692 active_connections.remove(&MULTI_STREAM_THROUGHPUT2).expect("socket should exist"),
693 active_connections.remove(&MULTI_STREAM_THROUGHPUT3).expect("socket should exist"),
694 active_connections.remove(&MULTI_STREAM_THROUGHPUT4).expect("socket should exist"),
695 active_connections.remove(&MULTI_STREAM_THROUGHPUT5).expect("socket should exist"),
696 active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
697 &mut measurements,
698 )
699 .await?;
700
701 return Ok(measurements);
702}