guest_cli/
vsockperf.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::platform::PlatformServices;
6use anyhow::{anyhow, Error};
7use fidl::endpoints::{create_proxy, create_request_stream};
8use fidl_fuchsia_virtualization::{
9    GuestManagerProxy, GuestMarker, GuestStatus, HostVsockAcceptorMarker, HostVsockEndpointMarker,
10};
11use futures::{select, try_join, AsyncReadExt, AsyncWriteExt, FutureExt, TryStreamExt};
12use prettytable::format::consts::FORMAT_CLEAN;
13use prettytable::{cell, row, Table};
14use std::collections::{HashMap, HashSet};
15use std::fmt;
16use std::io::Write;
17use {fuchsia_async as fasync, guest_cli_args as arguments, zx_status};
18
// Size of one packet in the warmup/corruption check and the latency test. Also used as the
// segment size when streaming data in the throughput tests.
const LATENCY_CHECK_SIZE_BYTES: usize = 4096;
// Amount of data moved per direction in one throughput iteration, in MiB and in bytes.
const THROUGHPUT_SIZE_MEBIBYTES: usize = 128;
const THROUGHPUT_SIZE_BYTES: usize = (1 << 20) * THROUGHPUT_SIZE_MEBIBYTES;

// Host port this tool listens on for incoming vsock connections.
const HOST_PORT: u32 = 8500;
// The remaining `u32` constants are the guest source ports used to tell the accepted connections
// apart. The `*_MAGIC_NUM` bytes are read from the control connection to learn that the matching
// test case is ready on the guest side.
const CONTROL_STREAM: u32 = 8501;
const LATENCY_CHECK_STREAM: u32 = 8502;

const SINGLE_STREAM_THROUGHPUT: u32 = 8503;
const SINGLE_STREAM_MAGIC_NUM: u8 = 123;

const MULTI_STREAM_THROUGHPUT1: u32 = 8504;
const MULTI_STREAM_MAGIC_NUM1: u8 = 124;
const MULTI_STREAM_THROUGHPUT2: u32 = 8505;
const MULTI_STREAM_MAGIC_NUM2: u8 = 125;
const MULTI_STREAM_THROUGHPUT3: u32 = 8506;
const MULTI_STREAM_MAGIC_NUM3: u8 = 126;
const MULTI_STREAM_THROUGHPUT4: u32 = 8507;
const MULTI_STREAM_MAGIC_NUM4: u8 = 127;
const MULTI_STREAM_THROUGHPUT5: u32 = 8508;
const MULTI_STREAM_MAGIC_NUM5: u8 = 128;

const SINGLE_STREAM_BIDIRECTIONAL: u32 = 8509;
// Only referenced by `run_single_stream_bidirectional_test`, which is compiled solely for
// `target_os = "fuchsia"`; on other targets this constant is unused, hence the allow.
#[allow(dead_code)]
const SINGLE_STREAM_BIDIRECTIONAL_MAGIC_NUM: u8 = 129;
44
/// Unit of the values stored in a `Percentiles`; selects how they are rendered for display.
#[derive(Clone, Copy, serde::Serialize, serde::Deserialize)]
enum PercentileUnit {
    /// Values are durations in nanoseconds (also shown in milliseconds when displayed).
    Nanoseconds,
    /// Values are throughputs in MiB per second.
    MebibytesPerSecond,
}
50
/// Summary statistics (min, selected percentiles, max) for one benchmark, all in `unit`.
#[derive(serde::Serialize, serde::Deserialize)]
pub struct Percentiles {
    // Value at the 0th percentile of the sample set.
    min: f64,
    p_25th: f64,
    p_50th: f64,
    p_75th: f64,
    p_99th: f64,
    // Value at the 100th percentile of the sample set.
    max: f64,
    // Unit shared by every value above.
    unit: PercentileUnit,
}
61
62impl fmt::Display for Percentiles {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        let get_units = |val: f64, unit: PercentileUnit| -> String {
65            match unit {
66                PercentileUnit::Nanoseconds => {
67                    format!("{}ns ({:.3}ms)", val as u64, val / 1_000_000.0)
68                }
69                PercentileUnit::MebibytesPerSecond => {
70                    format!("{:.2}MiB/s", val)
71                }
72            }
73        };
74
75        let mut table = Table::new();
76        table.set_format(*FORMAT_CLEAN);
77
78        table.add_row(row!["\tMin:", get_units(self.min, self.unit)]);
79        table.add_row(row!["\t25th percentile:", get_units(self.p_25th, self.unit)]);
80        table.add_row(row!["\t50th percentile:", get_units(self.p_50th, self.unit)]);
81        table.add_row(row!["\t75th percentile:", get_units(self.p_75th, self.unit)]);
82        table.add_row(row!["\t99th percentile:", get_units(self.p_99th, self.unit)]);
83        table.add_row(row!["\tMax:", get_units(self.max, self.unit)]);
84
85        write!(f, "\n{}", table)
86    }
87}
88
/// Results gathered by the micro benchmark. A field left as `None` is reported as "NOT RUN".
#[derive(Default, serde::Serialize, serde::Deserialize)]
pub struct Measurements {
    // `Some(true)` when every echoed warmup packet matched what was sent ("PASSED").
    data_corruption: Option<bool>,
    // Round trip latency of a `LATENCY_CHECK_SIZE_BYTES` packet, in nanoseconds.
    round_trip_page: Option<Percentiles>,
    // Guest -> host and host -> guest throughput; filled in by the bidirectional test, which is
    // only compiled for Fuchsia targets.
    tx_throughput: Option<Percentiles>,
    rx_throughput: Option<Percentiles>,
    // Write-then-read round trip throughput over a single connection.
    single_stream_unidirectional: Option<Percentiles>,
    // Write-then-read round trip throughput with five connections running concurrently.
    multi_stream_unidirectional: Option<Percentiles>,
}
98
99impl fmt::Display for Measurements {
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        let format_percentiles = |percentiles: &Option<Percentiles>| -> String {
102            match percentiles {
103                None => " NOT RUN".to_owned(),
104                Some(percentile) => percentile.to_string(),
105            }
106        };
107
108        writeln!(f, "\n\nMicrobenchmark Results\n------------------------")?;
109
110        writeln!(
111            f,
112            "* Data corruption check: {}",
113            match self.data_corruption {
114                None => "NOT RUN",
115                Some(result) =>
116                    if result {
117                        "PASSED"
118                    } else {
119                        "FAILED"
120                    },
121            }
122        )?;
123
124        writeln!(
125            f,
126            "* Round trip latency of {LATENCY_CHECK_SIZE_BYTES} bytes:{}",
127            format_percentiles(&self.round_trip_page)
128        )?;
129        writeln!(
130            f,
131            "* TX (guest -> host, unreliable) throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
132            format_percentiles(&self.tx_throughput)
133        )?;
134        writeln!(
135            f,
136            "* RX (host -> guest, unreliable) throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
137            format_percentiles(&self.rx_throughput)
138        )?;
139        writeln!(
140            f,
141            "* Single stream unidirectional round trip throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
142            format_percentiles(&self.single_stream_unidirectional)
143        )?;
144        writeln!(
145            f,
146            "* Multistream (5 connections) unidirectional round trip throughput of {THROUGHPUT_SIZE_MEBIBYTES} MiB:{}",
147            format_percentiles(&self.multi_stream_unidirectional)
148        )
149    }
150}
151
/// Outcome of a `vsockperf` invocation, as returned by `handle_vsockperf`.
// TODO(https://fxbug.dev/324167674): fix.
#[allow(clippy::large_enum_variant)]
#[derive(serde::Serialize, serde::Deserialize)]
pub enum VsockPerfResult {
    /// The benchmark ran to completion; carries the collected measurements.
    BenchmarkComplete(Measurements),
    /// The requested guest type is not supported (only Debian is accepted).
    UnsupportedGuest(arguments::GuestType),
    /// The benchmark failed; carries the error rendered as text.
    Internal(String),
}
160
161impl fmt::Display for VsockPerfResult {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        match self {
164            VsockPerfResult::BenchmarkComplete(result) => write!(f, "{}", result),
165            VsockPerfResult::UnsupportedGuest(guest) => {
166                write!(f, "VsockPerf is not supported for '{}'. Only 'debian' is supported", guest)
167            }
168            VsockPerfResult::Internal(context) => {
169                write!(f, "Internal error: {}", context)
170            }
171        }
172    }
173}
174
175fn get_time_delta_nanos(before: fasync::MonotonicInstant, after: fasync::MonotonicInstant) -> i64 {
176    #[cfg(target_os = "fuchsia")]
177    {
178        (after - before).into_nanos()
179    }
180
181    #[cfg(not(target_os = "fuchsia"))]
182    {
183        (after - before).as_nanos().try_into().unwrap()
184    }
185}
186
187pub async fn handle_vsockperf<P: PlatformServices>(
188    services: &P,
189    args: &arguments::vsockperf_args::VsockPerfArgs,
190) -> Result<VsockPerfResult, Error> {
191    if args.guest_type != arguments::GuestType::Debian {
192        return Ok(VsockPerfResult::UnsupportedGuest(args.guest_type));
193    }
194
195    let guest_manager = services.connect_to_manager(args.guest_type).await?;
196    #[allow(clippy::large_futures)]
197    Ok(match run_micro_benchmark(guest_manager).await {
198        Err(err) => VsockPerfResult::Internal(format!("{}", err)),
199        Ok(result) => VsockPerfResult::BenchmarkComplete(result),
200    })
201}
202
/// Returns the sample at the given percentile rank of `durations`.
///
/// The slice is indexed directly at `floor(percentile * (len - 1) / 100)`, so the caller is
/// responsible for having sorted it in the order it wants "0th" and "100th" to mean.
///
/// # Panics
///
/// Panics if `percentile` is greater than 100 or `durations` is empty.
fn percentile(durations: &[u64], percentile: u8) -> u64 {
    assert!(percentile <= 100 && !durations.is_empty());
    // Don't bother interpolating between two points if this isn't a whole number, just floor it.
    // The floor is computed with integers: in floating point, (29.0 / 100.0) * 100.0 evaluates
    // to 28.999999999999996 and would select index 28 instead of 29. Widening to u128 rules out
    // overflow of the product; the quotient never exceeds `len - 1`, so it fits back in usize.
    let last_index = (durations.len() - 1) as u128;
    let location = (u128::from(percentile) * last_index / 100) as usize;
    durations[location]
}
209
210fn latency_percentile(durations: &[u64]) -> Percentiles {
211    Percentiles {
212        min: percentile(&durations, 0) as f64,
213        p_25th: percentile(&durations, 25) as f64,
214        p_50th: percentile(&durations, 50) as f64,
215        p_75th: percentile(&durations, 75) as f64,
216        p_99th: percentile(&durations, 99) as f64,
217        max: percentile(&durations, 100) as f64,
218        unit: PercentileUnit::Nanoseconds,
219    }
220}
221
222fn throughput_percentile(durations: &[u64], bytes: usize) -> Percentiles {
223    let to_mebibytes_per_second = |nanos: u64| -> f64 {
224        let seconds = nanos as f64 / (1000.0 * 1000.0 * 1000.0);
225        let bytes_per_second = (bytes as f64) / seconds;
226        bytes_per_second / (1 << 20) as f64
227    };
228
229    Percentiles {
230        min: to_mebibytes_per_second(percentile(&durations, 0)),
231        p_25th: to_mebibytes_per_second(percentile(&durations, 25)),
232        p_50th: to_mebibytes_per_second(percentile(&durations, 50)),
233        p_75th: to_mebibytes_per_second(percentile(&durations, 75)),
234        p_99th: to_mebibytes_per_second(percentile(&durations, 99)),
235        max: to_mebibytes_per_second(percentile(&durations, 100)),
236        unit: PercentileUnit::MebibytesPerSecond,
237    }
238}
239
240async fn warmup_and_data_corruption_check(socket: &mut fasync::Socket) -> Result<bool, Error> {
241    // Send and receive 100 messages, checking for a known but changing pattern.
242    let mut buffer = vec![0u8; LATENCY_CHECK_SIZE_BYTES];
243    for i in 0..100 {
244        let pattern = format!("DAVID{:0>3}", i).repeat(512);
245        let packet = pattern.as_bytes();
246        assert_eq!(packet.len(), buffer.len());
247
248        if packet.len() != socket.as_ref().write(&packet)? {
249            return Err(anyhow!("failed to write full packet"));
250        }
251
252        let timeout =
253            fasync::MonotonicInstant::now() + std::time::Duration::from_millis(100).into();
254        select! {
255            () = fasync::Timer::new(timeout).fuse() => {
256                return Err(anyhow!("warmup timed out waiting 100ms for a packet echoed"));
257            }
258            result = socket.read_exact(&mut buffer).fuse() => {
259                result.map_err(|err| anyhow!("failed to read from socket during warmup: {}", err))?;
260            }
261        }
262
263        if buffer != packet {
264            return Ok(false);
265        }
266    }
267
268    Ok(true)
269}
270
271// Get the magic numbers for a test case from the guest to know that it's ready.
272async fn wait_for_magic_numbers(
273    mut numbers: HashSet<u8>,
274    control_socket: &mut fasync::Socket,
275) -> Result<(), Error> {
276    let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(5).into();
277    let mut magic_buf = [0u8];
278    loop {
279        select! {
280            () = fasync::Timer::new(timeout).fuse() => {
281                return Err(anyhow!("timeout waiting 5s to get the test ready"));
282            }
283            result = control_socket.read_exact(&mut magic_buf).fuse() => {
284                result.map_err(|err| anyhow!("failed to read magic value from socket: {}", err))?;
285                match numbers.contains(&magic_buf[0]) {
286                    false => Err(anyhow!("unexpected magic number from guest: {}", magic_buf[0])),
287                    true => {
288                        numbers.remove(&magic_buf[0]);
289                        Ok(())
290                    }
291                }?;
292
293                if numbers.is_empty() {
294                    break;
295                }
296            }
297        }
298    }
299
300    Ok(())
301}
302
303async fn read_single_stream(
304    total_size: usize,
305    socket: &mut fasync::Socket,
306) -> Result<fasync::MonotonicInstant, Error> {
307    let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(10).into();
308    let mut buffer = [0u8; LATENCY_CHECK_SIZE_BYTES]; // 4 KiB
309    let segments = total_size / buffer.len();
310
311    for _ in 0..segments {
312        select! {
313            () = fasync::Timer::new(timeout).fuse() => {
314                return Err(anyhow!("timeout waiting 10s for test iteration read to finish"));
315            }
316            result = socket.read_exact(&mut buffer).fuse() => {
317                result.map_err(|err| anyhow!("failed to read segment from socket: {}", err))?;
318            }
319        }
320    }
321
322    Ok(fasync::MonotonicInstant::now())
323}
324
325async fn write_single_stream(
326    total_size: usize,
327    socket: &mut fasync::Socket,
328) -> Result<fasync::MonotonicInstant, Error> {
329    let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(10).into();
330    let buffer = [0u8; LATENCY_CHECK_SIZE_BYTES]; // 4 KiB
331    let segments = total_size / buffer.len();
332
333    for _ in 0..segments {
334        select! {
335            () = fasync::Timer::new(timeout).fuse() => {
336                return Err(anyhow!("timeout waiting 10s for test iteration write to finish"));
337            }
338            result = socket.write_all(&buffer).fuse() => {
339                result.map_err(
340                    |err| anyhow!("failed to write segment to socket: {}", err))?;
341            }
342        }
343    }
344
345    Ok(fasync::MonotonicInstant::now())
346}
347
348async fn write_read_high_throughput(
349    total_size: usize,
350    socket: &mut fasync::Socket,
351) -> Result<(), Error> {
352    // This is intentionally sequential to measure roundtrip throughput from the perspective of
353    // the host.
354    write_single_stream(total_size, socket).await?;
355    read_single_stream(total_size, socket).await?;
356    Ok(())
357}
358
/// Measures throughput while writing and reading concurrently on one connection (Fuchsia only).
///
/// The socket handle is duplicated so one handle can be written while the other is read. After
/// the guest signals readiness on `control_socket`, 100 iterations each move
/// `THROUGHPUT_SIZE_BYTES` in both directions at once. The time until the host's write finished
/// feeds `rx_throughput` (host -> guest), and the time until the host's read finished feeds
/// `tx_throughput` (guest -> host).
#[cfg(target_os = "fuchsia")]
async fn run_single_stream_bidirectional_test(
    mut read_socket: fasync::Socket,
    control_socket: &mut fasync::Socket,
    measurements: &mut Measurements,
) -> Result<(), Error> {
    use fidl::HandleBased;

    println!("Starting single stream bidirectional round trip throughput test...");

    let duplicate = read_socket.as_ref().duplicate_handle(fidl::Rights::SAME_RIGHTS)?;
    let mut write_socket = fasync::Socket::from_socket(duplicate);

    let ready_signal = HashSet::from([SINGLE_STREAM_BIDIRECTIONAL_MAGIC_NUM]);
    wait_for_magic_numbers(ready_signal, control_socket).await?;

    let payload_bytes = THROUGHPUT_SIZE_BYTES;
    let mut host_write_nanos: Vec<u64> = Vec::new();
    let mut host_read_nanos: Vec<u64> = Vec::new();

    for completed in 1..=100 {
        let start = fasync::MonotonicInstant::now();

        let (write_done, read_done) = try_join!(
            write_single_stream(payload_bytes, &mut write_socket),
            read_single_stream(payload_bytes, &mut read_socket)
        )?;

        let nanos_since_start = |end: fasync::MonotonicInstant| -> u64 {
            get_time_delta_nanos(start, end)
                .try_into()
                .expect("durations measured by the same thread must be greater than zero")
        };
        host_write_nanos.push(nanos_since_start(write_done));
        host_read_nanos.push(nanos_since_start(read_done));

        print!("\rFinished {} bidirectional throughput measurements", completed);
        std::io::stdout().flush().expect("failed to flush stdout");
    }

    // Longest duration first, so the lowest throughput ends up at the 0th percentile.
    host_write_nanos.sort_unstable_by(|a, b| b.cmp(a));
    host_read_nanos.sort_unstable_by(|a, b| b.cmp(a));

    assert_eq!(host_write_nanos.len(), host_read_nanos.len());
    println!("\rFinished {} bidirectional throughput measurements", host_write_nanos.len());

    measurements.tx_throughput = Some(throughput_percentile(&host_read_nanos, payload_bytes));
    measurements.rx_throughput = Some(throughput_percentile(&host_write_nanos, payload_bytes));

    Ok(())
}
418
419async fn run_single_stream_unidirectional_round_trip_test(
420    mut data_socket: fasync::Socket,
421    control_socket: &mut fasync::Socket,
422    measurements: &mut Measurements,
423) -> Result<(), Error> {
424    println!("Starting single stream unidirectional round trip throughput test...");
425
426    wait_for_magic_numbers(HashSet::from([SINGLE_STREAM_MAGIC_NUM]), control_socket).await?;
427
428    let total_size = THROUGHPUT_SIZE_BYTES;
429    let mut durations: Vec<u64> = Vec::new();
430
431    for i in 0..100 {
432        let before = fasync::MonotonicInstant::now();
433
434        write_read_high_throughput(total_size, &mut data_socket).await?;
435
436        let after = fasync::MonotonicInstant::now();
437        durations.push(
438            get_time_delta_nanos(before, after)
439                .try_into()
440                .expect("durations measured by the same thread must be greater than zero"),
441        );
442
443        print!("\rFinished {} round trip throughput measurements", i + 1);
444        std::io::stdout().flush().expect("failed to flush stdout");
445    }
446
447    durations.sort();
448    durations.reverse();
449    println!("\rFinished {} single stream round trip throughput measurements", durations.len());
450
451    measurements.single_stream_unidirectional =
452        Some(throughput_percentile(&durations, total_size * 2));
453
454    Ok(())
455}
456
457async fn run_multi_stream_unidirectional_round_trip_test(
458    mut data_socket1: fasync::Socket,
459    mut data_socket2: fasync::Socket,
460    mut data_socket3: fasync::Socket,
461    mut data_socket4: fasync::Socket,
462    mut data_socket5: fasync::Socket,
463    control_socket: &mut fasync::Socket,
464    measurements: &mut Measurements,
465) -> Result<(), Error> {
466    println!("Starting multistream unidirectional round trip throughput test...");
467
468    wait_for_magic_numbers(
469        HashSet::from([
470            MULTI_STREAM_MAGIC_NUM1,
471            MULTI_STREAM_MAGIC_NUM2,
472            MULTI_STREAM_MAGIC_NUM3,
473            MULTI_STREAM_MAGIC_NUM4,
474            MULTI_STREAM_MAGIC_NUM5,
475        ]),
476        control_socket,
477    )
478    .await?;
479
480    let total_size = THROUGHPUT_SIZE_BYTES;
481    let mut durations: Vec<u64> = Vec::new();
482
483    for i in 0..50 {
484        let before = fasync::MonotonicInstant::now();
485
486        try_join!(
487            write_read_high_throughput(total_size, &mut data_socket1),
488            write_read_high_throughput(total_size, &mut data_socket2),
489            write_read_high_throughput(total_size, &mut data_socket3),
490            write_read_high_throughput(total_size, &mut data_socket4),
491            write_read_high_throughput(total_size, &mut data_socket5)
492        )?;
493
494        let after = fasync::MonotonicInstant::now();
495        durations.push(
496            get_time_delta_nanos(before, after)
497                .try_into()
498                .expect("durations measured by the same thread must be greater than zero"),
499        );
500
501        print!("\rFinished {} multistream round trip throughput measurements", i + 1);
502        std::io::stdout().flush().expect("failed to flush stdout");
503    }
504
505    durations.sort();
506    durations.reverse();
507    println!("\rFinished {} multistream round trip throughput measurements", durations.len());
508
509    measurements.multi_stream_unidirectional =
510        Some(throughput_percentile(&durations, total_size * 2));
511
512    Ok(())
513}
514
515async fn run_latency_test(
516    mut socket: fasync::Socket,
517    measurements: &mut Measurements,
518) -> Result<(), Error> {
519    println!("Checking for data corruption...");
520    measurements.data_corruption = Some(warmup_and_data_corruption_check(&mut socket).await?);
521    println!("Finished data corruption check");
522
523    let packet = [42u8; LATENCY_CHECK_SIZE_BYTES];
524    let mut buffer = vec![0u8; packet.len()];
525    let mut latencies: Vec<u64> = Vec::new();
526
527    println!("Starting latency test...");
528    for i in 0..10000 {
529        let before = fasync::MonotonicInstant::now();
530        let timeout = before + std::time::Duration::from_millis(100).into();
531
532        if packet.len() != socket.as_ref().write(&packet)? {
533            return Err(anyhow!("failed to write full packet"));
534        }
535
536        select! {
537            () = fasync::Timer::new(timeout).fuse() => {
538                return Err(anyhow!("latency test timed out waiting 100ms for a packet echoed"));
539            }
540            result = socket.read_exact(&mut buffer).fuse() => {
541                result.map_err(
542                    |err| anyhow!("failed to read from socket during latency test: {}", err))?;
543            }
544        }
545
546        let after = fasync::MonotonicInstant::now();
547        latencies.push(
548            get_time_delta_nanos(before, after)
549                .try_into()
550                .expect("durations measured by the same thread must be greater than zero"),
551        );
552
553        if (i + 1) % 50 == 0 {
554            print!("\rFinished measuring round trip latency for {} packets", i + 1);
555            std::io::stdout().flush().expect("failed to flush stdout");
556        }
557    }
558
559    latencies.sort();
560    println!("\rFinished measuring round trip latency for {} packets", latencies.len());
561
562    measurements.round_trip_page = Some(latency_percentile(&latencies));
563
564    Ok(())
565}
566
567async fn run_micro_benchmark(guest_manager: GuestManagerProxy) -> Result<Measurements, Error> {
568    let guest_info = guest_manager.get_info().await?;
569    if guest_info.guest_status.unwrap() != GuestStatus::Running {
570        return Err(anyhow!(zx_status::Status::NOT_CONNECTED));
571    }
572
573    let (guest_endpoint, guest_server_end) = create_proxy::<GuestMarker>();
574    guest_manager
575        .connect(guest_server_end)
576        .await
577        .map_err(|err| anyhow!("failed to get a connect response: {}", err))?
578        .map_err(|err| anyhow!("connect failed with: {:?}", err))?;
579
580    let (vsock_endpoint, vsock_server_end) = create_proxy::<HostVsockEndpointMarker>();
581    guest_endpoint
582        .get_host_vsock_endpoint(vsock_server_end)
583        .await?
584        .map_err(|err| anyhow!("failed to get HostVsockEndpoint: {:?}", err))?;
585
586    let (acceptor, mut client_stream) = create_request_stream::<HostVsockAcceptorMarker>();
587    vsock_endpoint
588        .listen(HOST_PORT, acceptor)
589        .await
590        .map_err(|err| anyhow!("failed to get a listen response: {}", err))?
591        .map_err(|err| anyhow!("listen failed with: {}", zx_status::Status::from_raw(err)))?;
592
593    let socket = guest_endpoint
594        .get_console()
595        .await
596        .map_err(|err| anyhow!("failed to get a get_console response: {}", err))?
597        .map_err(|err| anyhow!("get_console failed with: {:?}", err))?;
598
599    // Start the micro benchmark utility on the guest which will begin by opening the necessary
600    // connections.
601    let command = b"../test_utils/virtio_vsock_test_util micro_benchmark\n";
602    let bytes_written = socket
603        .write(command)
604        .map_err(|err| anyhow!("failed to write command to socket: {}", err))?;
605    if bytes_written != command.len() {
606        return Err(anyhow!(
607            "attempted to send command '{}', but only managed to write '{}'",
608            std::str::from_utf8(command).expect("failed to parse as utf-8"),
609            std::str::from_utf8(&command[0..bytes_written]).expect("failed to parse as utf-8")
610        ));
611    }
612
613    let mut expected_connections = HashSet::from([
614        CONTROL_STREAM,
615        LATENCY_CHECK_STREAM,
616        SINGLE_STREAM_THROUGHPUT,
617        MULTI_STREAM_THROUGHPUT1,
618        MULTI_STREAM_THROUGHPUT2,
619        MULTI_STREAM_THROUGHPUT3,
620        MULTI_STREAM_THROUGHPUT4,
621        MULTI_STREAM_THROUGHPUT5,
622        SINGLE_STREAM_BIDIRECTIONAL,
623    ]);
624    let mut active_connections = HashMap::new();
625
626    // Give the utility 15s to open all the expected connections.
627    let timeout = fasync::MonotonicInstant::now() + std::time::Duration::from_secs(15).into();
628    loop {
629        select! {
630            () = fasync::Timer::new(timeout).fuse() => {
631                return Err(anyhow!("vsockperf timed out waiting 15s for vsock connections"));
632            }
633            request = client_stream.try_next() => {
634                let request = request
635                    .map_err(|err| anyhow!("failed to get acceptor request: {}", err))?
636                    .ok_or_else(|| anyhow!("unexpected end of Listener stream"))?;
637                let (_src_cid, src_port, _port, responder) = request
638                    .into_accept().ok_or_else(|| anyhow!("failed to parse message as Accept"))?;
639
640                match expected_connections.contains(&src_port) {
641                    false => Err(anyhow!("unexpected connection from guest port: {}", src_port)),
642                    true => {
643                        expected_connections.remove(&src_port);
644                        Ok(())
645                    }
646                }?;
647
648                let (client_socket, device_socket) = fidl::Socket::create_stream();
649                let client_socket = fasync::Socket::from_socket(client_socket);
650
651                responder.send(Ok(device_socket))
652                    .map_err(|err| anyhow!("failed to send response to device: {}", err))?;
653
654                if let Some(_) = active_connections.insert(src_port, client_socket) {
655                    panic!("Connections must be unique");
656                }
657
658                if expected_connections.is_empty() {
659                    break;
660                }
661            }
662        }
663    }
664
665    let mut measurements = Measurements::default();
666
667    run_latency_test(
668        active_connections.remove(&LATENCY_CHECK_STREAM).expect("socket should exist"),
669        &mut measurements,
670    )
671    .await?;
672
673    // TODO(https://fxbug.dev/42068091): Re-enable when overnet supports duplicated socket handles.
674    #[cfg(target_os = "fuchsia")]
675    run_single_stream_bidirectional_test(
676        active_connections.remove(&SINGLE_STREAM_BIDIRECTIONAL).expect("socket should exist"),
677        active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
678        &mut measurements,
679    )
680    .await?;
681
682    run_single_stream_unidirectional_round_trip_test(
683        active_connections.remove(&SINGLE_STREAM_THROUGHPUT).expect("socket should exist"),
684        active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
685        &mut measurements,
686    )
687    .await?;
688
689    #[allow(clippy::large_futures)]
690    run_multi_stream_unidirectional_round_trip_test(
691        active_connections.remove(&MULTI_STREAM_THROUGHPUT1).expect("socket should exist"),
692        active_connections.remove(&MULTI_STREAM_THROUGHPUT2).expect("socket should exist"),
693        active_connections.remove(&MULTI_STREAM_THROUGHPUT3).expect("socket should exist"),
694        active_connections.remove(&MULTI_STREAM_THROUGHPUT4).expect("socket should exist"),
695        active_connections.remove(&MULTI_STREAM_THROUGHPUT5).expect("socket should exist"),
696        active_connections.get_mut(&CONTROL_STREAM).expect("socket should exist"),
697        &mut measurements,
698    )
699    .await?;
700
701    return Ok(measurements);
702}