usb_vsock/
packet.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::cmp::min;
6use std::future::Future;
7use std::iter::FusedIterator;
8use std::ops::DerefMut;
9use std::pin::Pin;
10use std::sync::{Mutex, MutexGuard};
11use std::task::{Context, Poll};
12
13use futures::task::AtomicWaker;
14use zerocopy::{little_endian, Immutable, IntoBytes, KnownLayout, TryFromBytes, Unaligned};
15
16use crate::Address;
17
18/// The serializable enumeration of packet types that can be used over a usb vsock link. These
19/// roughly correspond to the state machine described by the `fuchsia.hardware.vsock` fidl library.
20#[repr(u8)]
21#[derive(
22    Debug,
23    TryFromBytes,
24    IntoBytes,
25    KnownLayout,
26    Immutable,
27    Unaligned,
28    PartialEq,
29    Eq,
30    PartialOrd,
31    Ord,
32    Hash,
33    Clone,
34    Copy,
35)]
36pub enum PacketType {
37    /// Synchronizes the connection between host and device. Each side must send and receive a
38    /// sync packet with the same payload before any other packet may be recognized on the usb
39    /// connection. If this packet is received mid-stream, all connections must be considered
40    /// reset to avoid data loss. It should also only ever be the last vsock packet in a given
41    /// usb packet.
42    Sync = b'S',
43    /// An outbound echo request. The other end should reply to this with the same body and all the
44    /// same fields in a [`PacketType::EchoReply`] packet, no matter the state of the connection.
45    Echo = b'E',
46    /// A reply to a [`PacketType::Echo`] request packet. The body and all header fields should be
47    /// set the same as the original echo packet's.
48    EchoReply = b'e',
49    /// Connect to a cid:port pair from a cid:port pair on the other side. The payload must be empty.
50    Connect = b'C',
51    /// Notify the other end that this connection should be closed. The other end should respond
52    /// with an [`PacketType::Reset`] when the connection has been closed on the other end. The
53    /// payload must be empty.
54    Finish = b'F',
55    /// Terminate or refuse a connection on a particular cid:port pair set. There must have been a
56    /// previous [`PacketType::Connect`] request for this, and after this that particular set of
57    /// pairs must be considered disconnected and no more [`PacketType::Data`] packets may be sent
58    /// for it unless a new connection is initiated. The payload must be empty.
59    Reset = b'R',
60    /// Accepts a connection previously requested with [`PacketType::Connect`] on the given cid:port
61    /// pair set. The payload must be empty.
62    Accept = b'A',
63    /// A data packet for a particular cid:port pair set previously established with a [`PacketType::Connect`]
64    /// and [`PacketType::Accept`] message. If all of the cid and port fields of the packet are
65    /// zero, this is for a special data stream between the host and device that does not require
66    /// an established connection.
67    Data = b'D',
68    /// Advisory flow control message. Payload indicates flow control state "on"
69    /// or "off". If "on", it is recommended that the receiver not send more data
70    /// for this connection if possible, until the state becomes "off" again.
71    /// State is assumed "off" when no packet has been received.
72    Pause = b'X',
73}
74
75/// The packet header for a vsock packet passed over the usb vsock link. Each usb packet can contain
76/// one or more packets, each of which must start with a valid header and correct payload length.
77#[repr(C, packed(1))]
78#[derive(
79    Debug,
80    TryFromBytes,
81    IntoBytes,
82    KnownLayout,
83    Immutable,
84    Unaligned,
85    PartialEq,
86    Eq,
87    PartialOrd,
88    Ord,
89    Hash,
90    Clone,
91)]
92pub struct Header {
93    magic: [u8; 3],
94    /// The type of this packet
95    pub packet_type: PacketType,
96    /// For Connect, Reset, Accept, and Data packets this represents the device side's address.
97    /// Usually this will be a special value representing either that it is simply "the device",
98    /// or zero along with the rest of the cid and port fields to indicate that it's a control stream
99    /// packet. Must be zero for any other packet type.
100    pub device_cid: little_endian::U32,
101    /// For Connect, Reset, Accept, and Data packets this represents the host side's address.
102    /// Usually this will be a special value representing either that it is simply "the host",
103    /// or zero along with the rest of the cid and port fields to indicate that it's a control stream
104    /// packet. Must be zero for any other packet type.
105    pub host_cid: little_endian::U32,
106    /// For Connect, Reset, Accept, and Data packets this represents the device side's port.
107    /// This must be a valid positive value for any of those packet types, unless all of the cid and
108    /// port fields are also zero, in which case it is a control stream packet. Must be zero for any
109    /// other packet type.
110    pub device_port: little_endian::U32,
111    /// For Connect, Reset, Accept, and Data packets this represents the host side's port.
112    /// This must be a valid positive value for any of those packet types, unless all of the cid and
113    /// port fields are also zero, in which case it is a control stream packet. Must be zero for any
114    /// other packet type.
115    pub host_port: little_endian::U32,
116    /// The length of the packet payload. This must be zero for any packet type other than Sync or
117    /// Data.
118    pub payload_len: little_endian::U32,
119}
120
121impl Header {
122    /// Helper constant for the size of a header on the wire
123    pub const SIZE: usize = size_of::<Self>();
124    const MAGIC: &'static [u8; 3] = b"ffx";
125
126    /// Builds a new packet with correct magic value and packet type and all other fields
127    /// initialized to zero.
128    pub fn new(packet_type: PacketType) -> Self {
129        let device_cid = 0.into();
130        let host_cid = 0.into();
131        let device_port = 0.into();
132        let host_port = 0.into();
133        let payload_len = 0.into();
134        Header {
135            magic: *Self::MAGIC,
136            packet_type,
137            device_cid,
138            host_cid,
139            device_port,
140            host_port,
141            payload_len,
142        }
143    }
144
145    /// Gets the size of this packet on the wire with the header and a payload of length
146    /// [`Self::payload_len`].
147    pub fn packet_size(&self) -> usize {
148        Packet::size_with_payload(self.payload_len.get() as usize)
149    }
150
151    /// Sets the address fields of this packet header based on the normalized address in `addr`.
152    pub fn set_address(&mut self, addr: &Address) {
153        self.device_cid.set(addr.device_cid);
154        self.host_cid.set(addr.host_cid);
155        self.device_port.set(addr.device_port);
156        self.host_port.set(addr.host_port);
157    }
158}
159
160/// A typed reference to the contents of a packet in a buffer.
161#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
162pub struct Packet<'a> {
163    /// The packet's header
164    pub header: &'a Header,
165    /// The packet's payload
166    pub payload: &'a [u8],
167}
168
169impl<'a> Packet<'a> {
170    /// The size of this packet according to its header (as [`Self::payload`] may have been
171    /// over-allocated for the size of the packet).
172    pub fn size(&self) -> usize {
173        self.header.packet_size()
174    }
175
176    fn size_with_payload(payload_size: usize) -> usize {
177        size_of::<Header>() + payload_size
178    }
179
180    fn parse_next(buf: &'a [u8]) -> Result<(Self, &'a [u8]), std::io::Error> {
181        // split off and validate the header
182        let Some((header, body)) = buf.split_at_checked(size_of::<Header>()) else {
183            return Err(std::io::Error::other("insufficient data for last packet"));
184        };
185        let header = Header::try_ref_from_bytes(header).map_err(|err| {
186            std::io::Error::other(format!("failed to parse usb vsock header: {err:?}"))
187        })?;
188        if header.magic != *Header::MAGIC {
189            return Err(std::io::Error::other(format!("invalid magic bytes on usb vsock header")));
190        }
191        // validate the payload length
192        let payload_len = Into::<u64>::into(header.payload_len) as usize;
193        let body_len = body.len();
194        if payload_len > body_len {
195            return Err(std::io::Error::other(format!("payload length on usb vsock header ({payload_len}) was larger than available in buffer {body_len}")));
196        }
197
198        let (payload, remain) = body.split_at(payload_len);
199        Ok((Packet { header, payload }, remain))
200    }
201
202    /// Writes the packet to a buffer when the buffer is known to be large enough to hold it. Note
203    /// that the packet header's [`Header::payload_len`] must be correct before calling this, it
204    /// does not use the size of [`Self::payload`] to decide how much of the payload buffer is
205    /// valid.
206    ///
207    /// # Panics
208    ///
209    /// Panics if the buffer is not large enough for the packet.
210    pub fn write_to_unchecked(&'a self, buf: &'a mut [u8]) -> &'a mut [u8] {
211        let (packet, remain) = buf.split_at_mut(self.size());
212        self.header.write_to_prefix(packet).unwrap();
213        self.payload.write_to_suffix(packet).unwrap();
214        remain
215    }
216}
217
218/// A typed mutable reference to the contents of a packet in a buffer.
219#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
220pub struct PacketMut<'a> {
221    /// The packet's header.
222    pub header: &'a mut Header,
223    /// The packet's payload.
224    pub payload: &'a mut [u8],
225}
226
227impl<'a> PacketMut<'a> {
228    /// Creates a new [`PacketMut`] inside the given buffer and initializes the header to the given
229    /// [`PacketType`] before returning it. All other fields in the header will be zeroed, and the
230    /// [`PacketMut::payload`] will be the remaining area of the buffer after the header.
231    ///
232    /// Use [`PacketMut::finish`] to validate and write the proper packet length and return the
233    /// total size of the packet.
234    ///
235    /// # Panics
236    ///
237    /// The buffer must be large enough to hold at least a packet header, and this will panic if
238    /// it's not.
239    pub fn new_in(packet_type: PacketType, buf: &'a mut [u8]) -> Self {
240        Header::new(packet_type)
241            .write_to_prefix(buf)
242            .expect("not enough room in buffer for packet header");
243        let (header_bytes, payload) = buf.split_at_mut(Header::SIZE);
244        let header = Header::try_mut_from_bytes(header_bytes).unwrap();
245        PacketMut { header, payload }
246    }
247
248    /// Validates the correctness of the packet and returns the size of the packet within the
249    /// original buffer.
250    pub fn finish(self, payload_len: usize) -> Result<usize, PacketTooBigError> {
251        if payload_len <= self.payload.len() {
252            self.header.payload_len.set(u32::try_from(payload_len).map_err(|_| PacketTooBigError)?);
253            Ok(Header::SIZE + payload_len)
254        } else {
255            Err(PacketTooBigError)
256        }
257    }
258}
259
260/// Reads a sequence of vsock packets from a given usb packet buffer
261pub struct VsockPacketIterator<'a> {
262    buf: Option<&'a [u8]>,
263}
264
265impl<'a> VsockPacketIterator<'a> {
266    /// Creates a new [`PacketStream`] from the contents of `buf`. The returned stream will
267    /// iterate over individual vsock packets.
268    pub fn new(buf: &'a [u8]) -> Self {
269        Self { buf: Some(buf) }
270    }
271}
272
273impl<'a> FusedIterator for VsockPacketIterator<'a> {}
274impl<'a> Iterator for VsockPacketIterator<'a> {
275    type Item = Result<Packet<'a>, std::io::Error>;
276
277    fn next(&mut self) -> Option<Self::Item> {
278        // return immediately if we've already returned `None` or `Some(Err)`
279        let data = self.buf.take()?;
280
281        // also return immediately if there's no more data in the buffer
282        if data.len() == 0 {
283            return None;
284        }
285
286        match Packet::parse_next(data) {
287            Ok((header, rest)) => {
288                // update our pointer for next time
289                self.buf = Some(rest);
290                Some(Ok(header))
291            }
292            Err(err) => Some(Err(err)),
293        }
294    }
295}
296
297/// Builds an aggregate usb packet out of vsock packets and gives readiness
298/// notifications when there is room to add another packet or data available to send.
299pub struct UsbPacketBuilder<B> {
300    buffer: B,
301    offset: usize,
302    space_waker: AtomicWaker,
303    packet_waker: AtomicWaker,
304}
305
306/// the size of the packet would have been too large even if the buffer was empty
307#[derive(Debug, Copy, Clone)]
308pub struct PacketTooBigError;
309
310impl<B> UsbPacketBuilder<B> {
311    /// Creates a new builder from `buffer`, which is a type that can be used as a mutable slice
312    /// with space available for storing vsock packets. The `readable_notify` will have a message
313    /// sent to it whenever a usb packet could be transmitted.
314    pub fn new(buffer: B) -> Self {
315        let offset = 0;
316        let space_waker = AtomicWaker::default();
317        let packet_waker = AtomicWaker::default();
318        Self { buffer, offset, space_waker, packet_waker }
319    }
320
321    /// Returns true if the packet has data in it
322    pub fn has_data(&self) -> bool {
323        self.offset > 0
324    }
325}
326
327impl<B> UsbPacketBuilder<B>
328where
329    B: std::ops::DerefMut<Target = [u8]>,
330{
331    /// Gets the space currently available for another packet in the buffer
332    pub fn available(&self) -> usize {
333        self.buffer.len() - self.offset
334    }
335
336    /// Writes the given packet into the buffer. The packet and header must be able to fit
337    /// within the buffer provided at creation time.
338    pub fn write_vsock_packet(&mut self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
339        let packet_size = packet.size();
340        if self.available() >= packet_size {
341            packet.write_to_unchecked(&mut self.buffer[self.offset..]);
342            self.offset += packet_size;
343            self.packet_waker.wake();
344            Ok(())
345        } else {
346            Err(PacketTooBigError)
347        }
348    }
349
350    /// Takes the current usb packet, if there is one. The returned mutable slice
351    /// will be only the data written to the buffer so far, and packet writing will be reset to the
352    /// beginning of the buffer.
353    pub fn take_usb_packet(&mut self) -> Option<&mut [u8]> {
354        let written = self.offset;
355        if written == 0 {
356            return None;
357        }
358        self.offset = 0;
359        self.space_waker.wake();
360        Some(&mut self.buffer[0..written])
361    }
362}
363
364pub(crate) struct UsbPacketFiller<B> {
365    current_out_packet: Mutex<Option<UsbPacketBuilder<B>>>,
366    out_packet_waker: AtomicWaker,
367    filled_packet_waker: AtomicWaker,
368}
369
370impl<B> Default for UsbPacketFiller<B> {
371    fn default() -> Self {
372        let current_out_packet = Mutex::default();
373        let out_packet_waker = AtomicWaker::default();
374        let filled_packet_waker = AtomicWaker::default();
375        Self { current_out_packet, out_packet_waker, filled_packet_waker }
376    }
377}
378
379impl<B: DerefMut<Target = [u8]> + Unpin> UsbPacketFiller<B> {
380    fn wait_for_fillable(&self, min_packet_size: usize) -> WaitForFillable<'_, B> {
381        WaitForFillable { filler: &self, min_packet_size }
382    }
383
384    pub async fn write_vsock_packet(&self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
385        let mut builder = self.wait_for_fillable(packet.size()).await;
386        builder.as_mut().unwrap().write_vsock_packet(packet)?;
387        self.filled_packet_waker.wake();
388        Ok(())
389    }
390
391    pub async fn write_vsock_data(&self, address: &Address, payload: &[u8]) -> usize {
392        let header = &mut Header::new(PacketType::Data);
393        header.set_address(&address);
394        let mut builder = self.wait_for_fillable(1).await;
395        let builder = builder.as_mut().unwrap();
396        let writing = min(payload.len(), builder.available() - Header::SIZE);
397        header.payload_len.set(writing as u32);
398        builder.write_vsock_packet(&Packet { header, payload: &payload[..writing] }).unwrap();
399        self.filled_packet_waker.wake();
400        writing
401    }
402
403    pub async fn write_vsock_data_all(&self, address: &Address, payload: &[u8]) {
404        let mut written = 0;
405        while written < payload.len() {
406            written += self.write_vsock_data(address, &payload[written..]).await;
407        }
408    }
409
410    /// Provides a packet builder for the state machine to write packets to. Returns a future that
411    /// will be fulfilled when there is data available to send on the packet.
412    ///
413    /// # Panics
414    ///
415    /// Panics if called while another [`Self::fill_usb_packet`] future is pending.
416    pub fn fill_usb_packet(&self, builder: UsbPacketBuilder<B>) -> FillUsbPacket<'_, B> {
417        FillUsbPacket(&self, Some(builder))
418    }
419}
420
421pub(crate) struct FillUsbPacket<'a, B>(&'a UsbPacketFiller<B>, Option<UsbPacketBuilder<B>>);
422
423impl<'a, B: Unpin> Future for FillUsbPacket<'a, B> {
424    type Output = UsbPacketBuilder<B>;
425
426    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
427        // if we're holding a `PacketBuilder` we haven't been waited on yet. Otherwise we want
428        // to return ready when there's a packet and it's got data in it.
429        if let Some(builder) = self.1.take() {
430            // if the packet we were handed for some reason already has data in it, hand it back
431            if builder.has_data() {
432                return Poll::Ready(builder);
433            }
434
435            let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
436            assert!(current_out_packet.is_none(), "Can't fill more than one packet at a time");
437            current_out_packet.replace(builder);
438            self.0.out_packet_waker.wake();
439            self.0.filled_packet_waker.register(cx.waker());
440            Poll::Pending
441        } else {
442            let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
443            let Some(builder) = current_out_packet.take() else {
444                panic!("Packet builder was somehow removed from connection prematurely");
445            };
446
447            if builder.has_data() {
448                self.0.filled_packet_waker.wake();
449                Poll::Ready(builder)
450            } else {
451                // if there hasn't been any data placed in the packet, put the builder back and
452                // return Pending.
453                current_out_packet.replace(builder);
454                Poll::Pending
455            }
456        }
457    }
458}
459
460pub(crate) struct WaitForFillable<'a, B> {
461    filler: &'a UsbPacketFiller<B>,
462    min_packet_size: usize,
463}
464
465impl<'a, B: DerefMut<Target = [u8]> + Unpin> Future for WaitForFillable<'a, B> {
466    type Output = MutexGuard<'a, Option<UsbPacketBuilder<B>>>;
467
468    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
469        let current_out_packet = self.filler.current_out_packet.lock().unwrap();
470        let Some(builder) = &*current_out_packet else {
471            self.filler.out_packet_waker.register(cx.waker());
472            return Poll::Pending;
473        };
474        if builder.available() >= self.min_packet_size {
475            Poll::Ready(current_out_packet)
476        } else {
477            self.filler.out_packet_waker.register(cx.waker());
478            Poll::Pending
479        }
480    }
481}
482
483#[cfg(test)]
484mod tests {
485    use std::sync::Arc;
486
487    use super::*;
488    use fuchsia_async::Task;
489    use futures::poll;
490
491    async fn assert_pending<F: Future>(fut: F) {
492        let fut = std::pin::pin!(fut);
493        if let Poll::Ready(_) = poll!(fut) {
494            panic!("Future was ready when it shouldn't have been");
495        }
496    }
497
498    #[fuchsia::test]
499    async fn roundtrip_packet() {
500        let payload = b"hello world!";
501        let packet = Packet {
502            payload,
503            header: &Header {
504                device_cid: 1.into(),
505                host_cid: 2.into(),
506                device_port: 3.into(),
507                host_port: 4.into(),
508                payload_len: little_endian::U32::from(payload.len() as u32),
509                ..Header::new(PacketType::Data)
510            },
511        };
512        let buffer = vec![0; packet.size()];
513        let builder = UsbPacketBuilder::new(buffer);
514        let filler = UsbPacketFiller::default();
515        let mut filled_fut = filler.fill_usb_packet(builder);
516        println!("we should not be ready to pull a usb packet off yet");
517        assert_pending(&mut filled_fut).await;
518
519        println!("we should be able to write a packet though ({} bytes)", packet.size());
520        filler.write_vsock_packet(&packet).await.unwrap();
521
522        println!("we shouldn't have any space for another packet now");
523        assert_pending(filler.wait_for_fillable(1)).await;
524
525        println!("but we should have a new usb packet available");
526        let mut builder = filled_fut.await;
527        let buffer = builder.take_usb_packet().unwrap();
528
529        println!("the packet we get back out should be the same one we put in");
530        let (read_packet, remain) = Packet::parse_next(buffer).unwrap();
531        assert_eq!(packet, read_packet);
532        assert!(remain.is_empty());
533    }
534
535    #[fuchsia::test]
536    async fn many_packets() {
537        fn make_numbered_packet(num: u32) -> (Header, String) {
538            let payload = format!("packet #{num}!");
539            let header = Header {
540                device_cid: num.into(),
541                device_port: num.into(),
542                host_cid: num.into(),
543                host_port: num.into(),
544                payload_len: little_endian::U32::from(payload.len() as u32),
545                ..Header::new(PacketType::Data)
546            };
547            (header, payload)
548        }
549        const BUFFER_SIZE: usize = 256;
550        let mut builder = UsbPacketBuilder::new(vec![0; BUFFER_SIZE]);
551        let filler = Arc::new(UsbPacketFiller::default());
552
553        let send_filler = filler.clone();
554        let send_task = Task::spawn(async move {
555            for packet_num in 0..1024 {
556                let next_packet = make_numbered_packet(packet_num);
557                let next_packet =
558                    Packet { header: &next_packet.0, payload: next_packet.1.as_ref() };
559                send_filler.write_vsock_packet(&next_packet).await.unwrap();
560            }
561        });
562
563        let mut read_packet_num = 0;
564        while read_packet_num < 1024 {
565            builder = filler.fill_usb_packet(builder).await;
566            let buffer = builder.take_usb_packet().unwrap();
567            let mut num_packets = 0;
568            for packet in VsockPacketIterator::new(&buffer) {
569                let packet_compare = make_numbered_packet(read_packet_num);
570                let packet_compare =
571                    Packet { header: &packet_compare.0, payload: &packet_compare.1.as_ref() };
572                assert_eq!(packet.unwrap(), packet_compare);
573                read_packet_num += 1;
574                num_packets += 1;
575            }
576            println!(
577                "Read {num_packets} vsock packets from usb packet buffer, had {count} bytes left",
578                count = BUFFER_SIZE - buffer.len()
579            );
580        }
581        send_task.await;
582        assert_eq!(1024, read_packet_num);
583    }
584
585    #[fuchsia::test]
586    async fn packet_fillable_futures() {
587        let filler = UsbPacketFiller::default();
588
589        for _ in 0..10 {
590            println!("register an interest in filling a usb packet");
591            let mut fillable_fut = filler.wait_for_fillable(1);
592            println!("make sure we have nothing to fill");
593            assert!(poll!(&mut fillable_fut).is_pending());
594
595            println!("register a packet for filling");
596            let mut filled_fut = filler.fill_usb_packet(UsbPacketBuilder::new(vec![0; 1024]));
597            println!("make sure we've registered the buffer");
598            assert!(poll!(&mut filled_fut).is_pending());
599
600            println!("now put some things in the packet");
601            let header = &mut Header::new(PacketType::Data);
602            header.payload_len.set(99);
603            let Poll::Ready(mut builder) = poll!(fillable_fut) else {
604                panic!("should have been ready to fill a packet")
605            };
606            builder
607                .as_mut()
608                .unwrap()
609                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
610                .unwrap();
611            drop(builder);
612            let Poll::Ready(mut builder) = poll!(filler.wait_for_fillable(1)) else {
613                panic!("should have been ready to fill a packet(2)")
614            };
615            builder
616                .as_mut()
617                .unwrap()
618                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
619                .unwrap();
620            drop(builder);
621
622            println!("but if we ask for too much space we'll get pending");
623            assert!(poll!(filler.wait_for_fillable(1024 - (99 * 2) + 1)).is_pending());
624
625            println!("and now resolve the filled future and get our data back");
626            let mut filled = filled_fut.await;
627            let packets =
628                Vec::from_iter(VsockPacketIterator::new(filled.take_usb_packet().unwrap()));
629            assert_eq!(packets.len(), 2);
630        }
631    }
632}