usb_vsock/
packet.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::cmp::min;
6use std::future::Future;
7use std::iter::FusedIterator;
8use std::ops::DerefMut;
9use std::pin::Pin;
10use std::sync::{Mutex, MutexGuard};
11use std::task::{Context, Poll};
12
13use futures::task::AtomicWaker;
14use zerocopy::{little_endian, Immutable, IntoBytes, KnownLayout, TryFromBytes, Unaligned};
15
16use crate::Address;
17
18/// The serializable enumeration of packet types that can be used over a usb vsock link. These
19/// roughly correspond to the state machine described by the `fuchsia.hardware.vsock` fidl library.
20#[repr(u8)]
21#[derive(
22    Debug,
23    TryFromBytes,
24    IntoBytes,
25    KnownLayout,
26    Immutable,
27    Unaligned,
28    PartialEq,
29    Eq,
30    PartialOrd,
31    Ord,
32    Hash,
33    Clone,
34    Copy,
35)]
36pub enum PacketType {
37    /// Synchronizes the connection between host and device. Each side must send and receive a
38    /// sync packet with the same payload before any other packet may be recognized on the usb
39    /// connection. If this packet is received mid-stream, all connections must be considered
40    /// reset to avoid data loss. It should also only ever be the last vsock packet in a given
41    /// usb packet.
42    Sync = b'S',
43    /// Connect to a cid:port pair from a cid:port pair on the other side. The payload must be empty.
44    Connect = b'C',
45    /// Notify the other end that this connection should be closed. The other end should respond
46    /// with an [`PacketType::Reset`] when the connection has been closed on the other end. The
47    /// payload must be empty.
48    Finish = b'F',
49    /// Terminate or refuse a connection on a particular cid:port pair set. There must have been a
50    /// previous [`PacketType::Connect`] request for this, and after this that particular set of
51    /// pairs must be considered disconnected and no more [`PacketType::Data`] packets may be sent
52    /// for it unless a new connection is initiated. The payload must be empty.
53    Reset = b'R',
54    /// Accepts a connection previously requested with [`PacketType::Connect`] on the given cid:port
55    /// pair set. The payload must be empty.
56    Accept = b'A',
57    /// A data packet for a particular cid:port pair set previously established with a [`PacketType::Connect`]
58    /// and [`PacketType::Accept`] message. If all of the cid and port fields of the packet are
59    /// zero, this is for a special data stream between the host and device that does not require
60    /// an established connection.
61    Data = b'D',
62}
63
64/// The packet header for a vsock packet passed over the usb vsock link. Each usb packet can contain
65/// one or more packets, each of which must start with a valid header and correct payload length.
66#[repr(C, packed(1))]
67#[derive(
68    Debug,
69    TryFromBytes,
70    IntoBytes,
71    KnownLayout,
72    Immutable,
73    Unaligned,
74    PartialEq,
75    Eq,
76    PartialOrd,
77    Ord,
78    Hash,
79    Clone,
80)]
81pub struct Header {
82    magic: [u8; 3],
83    /// The type of this packet
84    pub packet_type: PacketType,
85    /// For Connect, Reset, Accept, and Data packets this represents the device side's address.
86    /// Usually this will be a special value representing either that it is simply "the device",
87    /// or zero along with the rest of the cid and port fields to indicate that it's a control stream
88    /// packet. Must be zero for any other packet type.
89    pub device_cid: little_endian::U32,
90    /// For Connect, Reset, Accept, and Data packets this represents the host side's address.
91    /// Usually this will be a special value representing either that it is simply "the host",
92    /// or zero along with the rest of the cid and port fields to indicate that it's a control stream
93    /// packet. Must be zero for any other packet type.
94    pub host_cid: little_endian::U32,
95    /// For Connect, Reset, Accept, and Data packets this represents the device side's port.
96    /// This must be a valid positive value for any of those packet types, unless all of the cid and
97    /// port fields are also zero, in which case it is a control stream packet. Must be zero for any
98    /// other packet type.
99    pub device_port: little_endian::U32,
100    /// For Connect, Reset, Accept, and Data packets this represents the host side's port.
101    /// This must be a valid positive value for any of those packet types, unless all of the cid and
102    /// port fields are also zero, in which case it is a control stream packet. Must be zero for any
103    /// other packet type.
104    pub host_port: little_endian::U32,
105    /// The length of the packet payload. This must be zero for any packet type other than Sync or
106    /// Data.
107    pub payload_len: little_endian::U32,
108}
109
110impl Header {
111    /// Helper constant for the size of a header on the wire
112    pub const SIZE: usize = size_of::<Self>();
113    const MAGIC: &'static [u8; 3] = b"ffx";
114
115    /// Builds a new packet with correct magic value and packet type and all other fields
116    /// initialized to zero.
117    pub fn new(packet_type: PacketType) -> Self {
118        let device_cid = 0.into();
119        let host_cid = 0.into();
120        let device_port = 0.into();
121        let host_port = 0.into();
122        let payload_len = 0.into();
123        Header {
124            magic: *Self::MAGIC,
125            packet_type,
126            device_cid,
127            host_cid,
128            device_port,
129            host_port,
130            payload_len,
131        }
132    }
133
134    /// Gets the size of this packet on the wire with the header and a payload of length
135    /// [`Self::payload_len`].
136    pub fn packet_size(&self) -> usize {
137        Packet::size_with_payload(self.payload_len.get() as usize)
138    }
139
140    /// Sets the address fields of this packet header based on the normalized address in `addr`.
141    pub fn set_address(&mut self, addr: &Address) {
142        self.device_cid.set(addr.device_cid);
143        self.host_cid.set(addr.host_cid);
144        self.device_port.set(addr.device_port);
145        self.host_port.set(addr.host_port);
146    }
147}
148
149/// A typed reference to the contents of a packet in a buffer.
150#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
151pub struct Packet<'a> {
152    /// The packet's header
153    pub header: &'a Header,
154    /// The packet's payload
155    pub payload: &'a [u8],
156}
157
158impl<'a> Packet<'a> {
159    /// The size of this packet according to its header (as [`Self::payload`] may have been
160    /// over-allocated for the size of the packet).
161    pub fn size(&self) -> usize {
162        self.header.packet_size()
163    }
164
165    fn size_with_payload(payload_size: usize) -> usize {
166        size_of::<Header>() + payload_size
167    }
168
169    fn parse_next(buf: &'a [u8]) -> Result<(Self, &'a [u8]), std::io::Error> {
170        // split off and validate the header
171        let Some((header, body)) = buf.split_at_checked(size_of::<Header>()) else {
172            return Err(std::io::Error::other("insufficient data for last packet"));
173        };
174        let header = Header::try_ref_from_bytes(header).map_err(|err| {
175            std::io::Error::other(format!("failed to parse usb vsock header: {err:?}"))
176        })?;
177        if header.magic != *Header::MAGIC {
178            return Err(std::io::Error::other(format!("invalid magic bytes on usb vsock header")));
179        }
180        // validate the payload length
181        let payload_len = Into::<u64>::into(header.payload_len) as usize;
182        let body_len = body.len();
183        if payload_len > body_len {
184            return Err(std::io::Error::other(format!("payload length on usb vsock header ({payload_len}) was larger than available in buffer {body_len}")));
185        }
186
187        let (payload, remain) = body.split_at(payload_len);
188        Ok((Packet { header, payload }, remain))
189    }
190
191    /// Writes the packet to a buffer when the buffer is known to be large enough to hold it. Note
192    /// that the packet header's [`Header::payload_len`] must be correct before calling this, it
193    /// does not use the size of [`Self::payload`] to decide how much of the payload buffer is
194    /// valid.
195    ///
196    /// # Panics
197    ///
198    /// Panics if the buffer is not large enough for the packet.
199    pub fn write_to_unchecked(&'a self, buf: &'a mut [u8]) -> &'a mut [u8] {
200        let (packet, remain) = buf.split_at_mut(self.size());
201        self.header.write_to_prefix(packet).unwrap();
202        self.payload.write_to_suffix(packet).unwrap();
203        remain
204    }
205}
206
207/// A typed mutable reference to the contents of a packet in a buffer.
208#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
209pub struct PacketMut<'a> {
210    /// The packet's header.
211    pub header: &'a mut Header,
212    /// The packet's payload.
213    pub payload: &'a mut [u8],
214}
215
216impl<'a> PacketMut<'a> {
217    /// Creates a new [`PacketMut`] inside the given buffer and initializes the header to the given
218    /// [`PacketType`] before returning it. All other fields in the header will be zeroed, and the
219    /// [`PacketMut::payload`] will be the remaining area of the buffer after the header.
220    ///
221    /// Use [`PacketMut::finish`] to validate and write the proper packet length and return the
222    /// total size of the packet.
223    ///
224    /// # Panics
225    ///
226    /// The buffer must be large enough to hold at least a packet header, and this will panic if
227    /// it's not.
228    pub fn new_in(packet_type: PacketType, buf: &'a mut [u8]) -> Self {
229        Header::new(packet_type)
230            .write_to_prefix(buf)
231            .expect("not enough room in buffer for packet header");
232        let (header_bytes, payload) = buf.split_at_mut(Header::SIZE);
233        let header = Header::try_mut_from_bytes(header_bytes).unwrap();
234        PacketMut { header, payload }
235    }
236
237    /// Validates the correctness of the packet and returns the size of the packet within the
238    /// original buffer.
239    pub fn finish(self, payload_len: usize) -> Result<usize, PacketTooBigError> {
240        if payload_len <= self.payload.len() {
241            self.header.payload_len.set(u32::try_from(payload_len).map_err(|_| PacketTooBigError)?);
242            Ok(Header::SIZE + payload_len)
243        } else {
244            Err(PacketTooBigError)
245        }
246    }
247}
248
249/// Reads a sequence of vsock packets from a given usb packet buffer
250pub struct VsockPacketIterator<'a> {
251    buf: Option<&'a [u8]>,
252}
253
254impl<'a> VsockPacketIterator<'a> {
255    /// Creates a new [`PacketStream`] from the contents of `buf`. The returned stream will
256    /// iterate over individual vsock packets.
257    pub fn new(buf: &'a [u8]) -> Self {
258        Self { buf: Some(buf) }
259    }
260}
261
262impl<'a> FusedIterator for VsockPacketIterator<'a> {}
263impl<'a> Iterator for VsockPacketIterator<'a> {
264    type Item = Result<Packet<'a>, std::io::Error>;
265
266    fn next(&mut self) -> Option<Self::Item> {
267        // return immediately if we've already returned `None` or `Some(Err)`
268        let data = self.buf.take()?;
269
270        // also return immediately if there's no more data in the buffer
271        if data.len() == 0 {
272            return None;
273        }
274
275        match Packet::parse_next(data) {
276            Ok((header, rest)) => {
277                // update our pointer for next time
278                self.buf = Some(rest);
279                Some(Ok(header))
280            }
281            Err(err) => Some(Err(err)),
282        }
283    }
284}
285
286/// Builds an aggregate usb packet out of vsock packets and gives readiness
287/// notifications when there is room to add another packet or data available to send.
288pub struct UsbPacketBuilder<B> {
289    buffer: B,
290    offset: usize,
291    space_waker: AtomicWaker,
292    packet_waker: AtomicWaker,
293}
294
295/// the size of the packet would have been too large even if the buffer was empty
296#[derive(Debug, Copy, Clone)]
297pub struct PacketTooBigError;
298
299impl<B> UsbPacketBuilder<B> {
300    /// Creates a new builder from `buffer`, which is a type that can be used as a mutable slice
301    /// with space available for storing vsock packets. The `readable_notify` will have a message
302    /// sent to it whenever a usb packet could be transmitted.
303    pub fn new(buffer: B) -> Self {
304        let offset = 0;
305        let space_waker = AtomicWaker::default();
306        let packet_waker = AtomicWaker::default();
307        Self { buffer, offset, space_waker, packet_waker }
308    }
309
310    /// Returns true if the packet has data in it
311    pub fn has_data(&self) -> bool {
312        self.offset > 0
313    }
314}
315
316impl<B> UsbPacketBuilder<B>
317where
318    B: std::ops::DerefMut<Target = [u8]>,
319{
320    /// Gets the space currently available for another packet in the buffer
321    pub fn available(&self) -> usize {
322        self.buffer.len() - self.offset
323    }
324
325    /// Writes the given packet into the buffer. The packet and header must be able to fit
326    /// within the buffer provided at creation time.
327    pub fn write_vsock_packet(&mut self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
328        let packet_size = packet.size();
329        if self.available() >= packet_size {
330            packet.write_to_unchecked(&mut self.buffer[self.offset..]);
331            self.offset += packet_size;
332            self.packet_waker.wake();
333            Ok(())
334        } else {
335            Err(PacketTooBigError)
336        }
337    }
338
339    /// Takes the current usb packet, if there is one. The returned mutable slice
340    /// will be only the data written to the buffer so far, and packet writing will be reset to the
341    /// beginning of the buffer.
342    pub fn take_usb_packet(&mut self) -> Option<&mut [u8]> {
343        let written = self.offset;
344        if written == 0 {
345            return None;
346        }
347        self.offset = 0;
348        self.space_waker.wake();
349        Some(&mut self.buffer[0..written])
350    }
351}
352
353pub(crate) struct UsbPacketFiller<B> {
354    current_out_packet: Mutex<Option<UsbPacketBuilder<B>>>,
355    out_packet_waker: AtomicWaker,
356    filled_packet_waker: AtomicWaker,
357}
358
359impl<B> Default for UsbPacketFiller<B> {
360    fn default() -> Self {
361        let current_out_packet = Mutex::default();
362        let out_packet_waker = AtomicWaker::default();
363        let filled_packet_waker = AtomicWaker::default();
364        Self { current_out_packet, out_packet_waker, filled_packet_waker }
365    }
366}
367
368impl<B: DerefMut<Target = [u8]> + Unpin> UsbPacketFiller<B> {
369    fn wait_for_fillable(&self, min_packet_size: usize) -> WaitForFillable<'_, B> {
370        WaitForFillable { filler: &self, min_packet_size }
371    }
372
373    pub async fn write_vsock_packet(&self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
374        let mut builder = self.wait_for_fillable(packet.size()).await;
375        builder.as_mut().unwrap().write_vsock_packet(packet)?;
376        self.filled_packet_waker.wake();
377        Ok(())
378    }
379
380    pub async fn write_vsock_data(&self, address: &Address, payload: &[u8]) -> usize {
381        let header = &mut Header::new(PacketType::Data);
382        header.set_address(&address);
383        let mut builder = self.wait_for_fillable(1).await;
384        let builder = builder.as_mut().unwrap();
385        let writing = min(payload.len(), builder.available() - Header::SIZE);
386        header.payload_len.set(writing as u32);
387        builder.write_vsock_packet(&Packet { header, payload: &payload[..writing] }).unwrap();
388        self.filled_packet_waker.wake();
389        writing
390    }
391
392    pub async fn write_vsock_data_all(&self, address: &Address, payload: &[u8]) {
393        let mut written = 0;
394        while written < payload.len() {
395            written += self.write_vsock_data(address, &payload[written..]).await;
396        }
397    }
398
399    /// Provides a packet builder for the state machine to write packets to. Returns a future that
400    /// will be fulfilled when there is data available to send on the packet.
401    ///
402    /// # Panics
403    ///
404    /// Panics if called while another [`Self::fill_usb_packet`] future is pending.
405    pub fn fill_usb_packet(&self, builder: UsbPacketBuilder<B>) -> FillUsbPacket<'_, B> {
406        FillUsbPacket(&self, Some(builder))
407    }
408}
409
410pub(crate) struct FillUsbPacket<'a, B>(&'a UsbPacketFiller<B>, Option<UsbPacketBuilder<B>>);
411
412impl<'a, B: Unpin> Future for FillUsbPacket<'a, B> {
413    type Output = UsbPacketBuilder<B>;
414
415    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
416        // if we're holding a `PacketBuilder` we haven't been waited on yet. Otherwise we want
417        // to return ready when there's a packet and it's got data in it.
418        if let Some(builder) = self.1.take() {
419            // if the packet we were handed for some reason already has data in it, hand it back
420            if builder.has_data() {
421                return Poll::Ready(builder);
422            }
423
424            let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
425            assert!(current_out_packet.is_none(), "Can't fill more than one packet at a time");
426            current_out_packet.replace(builder);
427            self.0.out_packet_waker.wake();
428            self.0.filled_packet_waker.register(cx.waker());
429            Poll::Pending
430        } else {
431            let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
432            let Some(builder) = current_out_packet.take() else {
433                panic!("Packet builder was somehow removed from connection prematurely");
434            };
435
436            if builder.has_data() {
437                self.0.filled_packet_waker.wake();
438                Poll::Ready(builder)
439            } else {
440                // if there hasn't been any data placed in the packet, put the builder back and
441                // return Pending.
442                current_out_packet.replace(builder);
443                Poll::Pending
444            }
445        }
446    }
447}
448
449pub(crate) struct WaitForFillable<'a, B> {
450    filler: &'a UsbPacketFiller<B>,
451    min_packet_size: usize,
452}
453
454impl<'a, B: DerefMut<Target = [u8]> + Unpin> Future for WaitForFillable<'a, B> {
455    type Output = MutexGuard<'a, Option<UsbPacketBuilder<B>>>;
456
457    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
458        let current_out_packet = self.filler.current_out_packet.lock().unwrap();
459        let Some(builder) = &*current_out_packet else {
460            self.filler.out_packet_waker.register(cx.waker());
461            return Poll::Pending;
462        };
463        if builder.available() >= self.min_packet_size {
464            Poll::Ready(current_out_packet)
465        } else {
466            self.filler.out_packet_waker.register(cx.waker());
467            Poll::Pending
468        }
469    }
470}
471
472#[cfg(test)]
473mod tests {
474    use std::sync::Arc;
475
476    use super::*;
477    use fuchsia_async::Task;
478    use futures::poll;
479
480    async fn assert_pending<F: Future>(fut: F) {
481        let fut = std::pin::pin!(fut);
482        if let Poll::Ready(_) = poll!(fut) {
483            panic!("Future was ready when it shouldn't have been");
484        }
485    }
486
487    #[fuchsia::test]
488    async fn roundtrip_packet() {
489        let payload = b"hello world!";
490        let packet = Packet {
491            payload,
492            header: &Header {
493                device_cid: 1.into(),
494                host_cid: 2.into(),
495                device_port: 3.into(),
496                host_port: 4.into(),
497                payload_len: little_endian::U32::from(payload.len() as u32),
498                ..Header::new(PacketType::Data)
499            },
500        };
501        let buffer = vec![0; packet.size()];
502        let builder = UsbPacketBuilder::new(buffer);
503        let filler = UsbPacketFiller::default();
504        let mut filled_fut = filler.fill_usb_packet(builder);
505        println!("we should not be ready to pull a usb packet off yet");
506        assert_pending(&mut filled_fut).await;
507
508        println!("we should be able to write a packet though ({} bytes)", packet.size());
509        filler.write_vsock_packet(&packet).await.unwrap();
510
511        println!("we shouldn't have any space for another packet now");
512        assert_pending(filler.wait_for_fillable(1)).await;
513
514        println!("but we should have a new usb packet available");
515        let mut builder = filled_fut.await;
516        let buffer = builder.take_usb_packet().unwrap();
517
518        println!("the packet we get back out should be the same one we put in");
519        let (read_packet, remain) = Packet::parse_next(buffer).unwrap();
520        assert_eq!(packet, read_packet);
521        assert!(remain.is_empty());
522    }
523
524    #[fuchsia::test]
525    async fn many_packets() {
526        fn make_numbered_packet(num: u32) -> (Header, String) {
527            let payload = format!("packet #{num}!");
528            let header = Header {
529                device_cid: num.into(),
530                device_port: num.into(),
531                host_cid: num.into(),
532                host_port: num.into(),
533                payload_len: little_endian::U32::from(payload.len() as u32),
534                ..Header::new(PacketType::Data)
535            };
536            (header, payload)
537        }
538        const BUFFER_SIZE: usize = 256;
539        let mut builder = UsbPacketBuilder::new(vec![0; BUFFER_SIZE]);
540        let filler = Arc::new(UsbPacketFiller::default());
541
542        let send_filler = filler.clone();
543        let send_task = Task::spawn(async move {
544            for packet_num in 0..1024 {
545                let next_packet = make_numbered_packet(packet_num);
546                let next_packet =
547                    Packet { header: &next_packet.0, payload: next_packet.1.as_ref() };
548                send_filler.write_vsock_packet(&next_packet).await.unwrap();
549            }
550        });
551
552        let mut read_packet_num = 0;
553        while read_packet_num < 1024 {
554            builder = filler.fill_usb_packet(builder).await;
555            let buffer = builder.take_usb_packet().unwrap();
556            let mut num_packets = 0;
557            for packet in VsockPacketIterator::new(&buffer) {
558                let packet_compare = make_numbered_packet(read_packet_num);
559                let packet_compare =
560                    Packet { header: &packet_compare.0, payload: &packet_compare.1.as_ref() };
561                assert_eq!(packet.unwrap(), packet_compare);
562                read_packet_num += 1;
563                num_packets += 1;
564            }
565            println!(
566                "Read {num_packets} vsock packets from usb packet buffer, had {count} bytes left",
567                count = BUFFER_SIZE - buffer.len()
568            );
569        }
570        send_task.await;
571        assert_eq!(1024, read_packet_num);
572    }
573
574    #[fuchsia::test]
575    async fn packet_fillable_futures() {
576        let filler = UsbPacketFiller::default();
577
578        for _ in 0..10 {
579            println!("register an interest in filling a usb packet");
580            let mut fillable_fut = filler.wait_for_fillable(1);
581            println!("make sure we have nothing to fill");
582            assert!(poll!(&mut fillable_fut).is_pending());
583
584            println!("register a packet for filling");
585            let mut filled_fut = filler.fill_usb_packet(UsbPacketBuilder::new(vec![0; 1024]));
586            println!("make sure we've registered the buffer");
587            assert!(poll!(&mut filled_fut).is_pending());
588
589            println!("now put some things in the packet");
590            let header = &mut Header::new(PacketType::Data);
591            header.payload_len.set(99);
592            let Poll::Ready(mut builder) = poll!(fillable_fut) else {
593                panic!("should have been ready to fill a packet")
594            };
595            builder
596                .as_mut()
597                .unwrap()
598                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
599                .unwrap();
600            drop(builder);
601            let Poll::Ready(mut builder) = poll!(filler.wait_for_fillable(1)) else {
602                panic!("should have been ready to fill a packet(2)")
603            };
604            builder
605                .as_mut()
606                .unwrap()
607                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
608                .unwrap();
609            drop(builder);
610
611            println!("but if we ask for too much space we'll get pending");
612            assert!(poll!(filler.wait_for_fillable(1024 - (99 * 2) + 1)).is_pending());
613
614            println!("and now resolve the filled future and get our data back");
615            let mut filled = filled_fut.await;
616            let packets =
617                Vec::from_iter(VsockPacketIterator::new(filled.take_usb_packet().unwrap()));
618            assert_eq!(packets.len(), 2);
619        }
620    }
621}