usb_vsock/
packet.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::cmp::min;
6use std::future::Future;
7use std::iter::FusedIterator;
8use std::ops::DerefMut;
9use std::pin::Pin;
10use std::sync::{Mutex, MutexGuard};
11use std::task::{Context, Poll};
12
13use futures::task::AtomicWaker;
14use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes, Unaligned, little_endian};
15
16use crate::Address;
17
18/// The serializable enumeration of packet types that can be used over a usb vsock link. These
19/// roughly correspond to the state machine described by the `fuchsia.hardware.vsock` fidl library.
20#[repr(u8)]
21#[derive(
22    Debug,
23    TryFromBytes,
24    IntoBytes,
25    KnownLayout,
26    Immutable,
27    Unaligned,
28    PartialEq,
29    Eq,
30    PartialOrd,
31    Ord,
32    Hash,
33    Clone,
34    Copy,
35)]
36pub enum PacketType {
37    /// Synchronizes the connection between host and device. Each side must send and receive a
38    /// sync packet with the same payload before any other packet may be recognized on the usb
39    /// connection. If this packet is received mid-stream, all connections must be considered
40    /// reset to avoid data loss. It should also only ever be the last vsock packet in a given
41    /// usb packet.
42    Sync = b'S',
43    /// An outbound echo request. The other end should reply to this with the same body and all the
44    /// same fields in a [`PacketType::EchoReply`] packet, no matter the state of the connection.
45    Echo = b'E',
46    /// A reply to a [`PacketType::Echo`] request packet. The body and all header fields should be
47    /// set the same as the original echo packet's.
48    EchoReply = b'e',
49    /// Connect to a cid:port pair from a cid:port pair on the other side. The payload must be empty.
50    Connect = b'C',
51    /// Notify the other end that this connection should be closed. The other end should respond
52    /// with an [`PacketType::Reset`] when the connection has been closed on the other end. The
53    /// payload must be empty.
54    Finish = b'F',
55    /// Terminate or refuse a connection on a particular cid:port pair set. There must have been a
56    /// previous [`PacketType::Connect`] request for this, and after this that particular set of
57    /// pairs must be considered disconnected and no more [`PacketType::Data`] packets may be sent
58    /// for it unless a new connection is initiated. The payload must be empty.
59    Reset = b'R',
60    /// Accepts a connection previously requested with [`PacketType::Connect`] on the given cid:port
61    /// pair set. The payload must be empty.
62    Accept = b'A',
63    /// A data packet for a particular cid:port pair set previously established with a [`PacketType::Connect`]
64    /// and [`PacketType::Accept`] message. If all of the cid and port fields of the packet are
65    /// zero, this is for a special data stream between the host and device that does not require
66    /// an established connection.
67    Data = b'D',
68    /// Advisory flow control message. Payload indicates flow control state "on"
69    /// or "off". If "on", it is recommended that the receiver not send more data
70    /// for this connection if possible, until the state becomes "off" again.
71    /// State is assumed "off" when no packet has been received.
72    Pause = b'X',
73}
74
75/// The packet header for a vsock packet passed over the usb vsock link. Each usb packet can contain
76/// one or more packets, each of which must start with a valid header and correct payload length.
77#[repr(C, packed(1))]
78#[derive(
79    Debug,
80    TryFromBytes,
81    IntoBytes,
82    KnownLayout,
83    Immutable,
84    Unaligned,
85    PartialEq,
86    Eq,
87    PartialOrd,
88    Ord,
89    Hash,
90    Clone,
91)]
92pub struct Header {
93    magic: [u8; 3],
94    /// The type of this packet
95    pub packet_type: PacketType,
96    /// For Connect, Reset, Accept, and Data packets this represents the device side's address.
97    /// Usually this will be a special value representing either that it is simply "the device",
98    /// or zero along with the rest of the cid and port fields to indicate that it's a control stream
99    /// packet. Must be zero for any other packet type.
100    pub device_cid: little_endian::U32,
101    /// For Connect, Reset, Accept, and Data packets this represents the host side's address.
102    /// Usually this will be a special value representing either that it is simply "the host",
103    /// or zero along with the rest of the cid and port fields to indicate that it's a control stream
104    /// packet. Must be zero for any other packet type.
105    pub host_cid: little_endian::U32,
106    /// For Connect, Reset, Accept, and Data packets this represents the device side's port.
107    /// This must be a valid positive value for any of those packet types, unless all of the cid and
108    /// port fields are also zero, in which case it is a control stream packet. Must be zero for any
109    /// other packet type.
110    pub device_port: little_endian::U32,
111    /// For Connect, Reset, Accept, and Data packets this represents the host side's port.
112    /// This must be a valid positive value for any of those packet types, unless all of the cid and
113    /// port fields are also zero, in which case it is a control stream packet. Must be zero for any
114    /// other packet type.
115    pub host_port: little_endian::U32,
116    /// The length of the packet payload. This must be zero for any packet type other than Sync or
117    /// Data.
118    pub payload_len: little_endian::U32,
119}
120
121impl Header {
122    /// Helper constant for the size of a header on the wire
123    pub const SIZE: usize = size_of::<Self>();
124    const MAGIC: &'static [u8; 3] = b"ffx";
125
126    /// Builds a new packet with correct magic value and packet type and all other fields
127    /// initialized to zero.
128    pub fn new(packet_type: PacketType) -> Self {
129        let device_cid = 0.into();
130        let host_cid = 0.into();
131        let device_port = 0.into();
132        let host_port = 0.into();
133        let payload_len = 0.into();
134        Header {
135            magic: *Self::MAGIC,
136            packet_type,
137            device_cid,
138            host_cid,
139            device_port,
140            host_port,
141            payload_len,
142        }
143    }
144
145    /// Gets the size of this packet on the wire with the header and a payload of length
146    /// [`Self::payload_len`].
147    pub fn packet_size(&self) -> usize {
148        Packet::size_with_payload(self.payload_len.get() as usize)
149    }
150
151    /// Sets the address fields of this packet header based on the normalized address in `addr`.
152    pub fn set_address(&mut self, addr: &Address) {
153        self.device_cid.set(addr.device_cid);
154        self.host_cid.set(addr.host_cid);
155        self.device_port.set(addr.device_port);
156        self.host_port.set(addr.host_port);
157    }
158}
159
160/// A typed reference to the contents of a packet in a buffer.
161#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
162pub struct Packet<'a> {
163    /// The packet's header
164    pub header: &'a Header,
165    /// The packet's payload
166    pub payload: &'a [u8],
167}
168
169impl<'a> Packet<'a> {
170    /// The size of this packet according to its header (as [`Self::payload`] may have been
171    /// over-allocated for the size of the packet).
172    pub fn size(&self) -> usize {
173        self.header.packet_size()
174    }
175
176    fn size_with_payload(payload_size: usize) -> usize {
177        size_of::<Header>() + payload_size
178    }
179
180    fn parse_next(buf: &'a [u8]) -> Result<(Self, &'a [u8]), std::io::Error> {
181        // split off and validate the header
182        let Some((header, body)) = buf.split_at_checked(size_of::<Header>()) else {
183            return Err(std::io::Error::other("insufficient data for last packet"));
184        };
185        let header = Header::try_ref_from_bytes(header).map_err(|err| {
186            std::io::Error::other(format!("failed to parse usb vsock header: {err:?}"))
187        })?;
188        if header.magic != *Header::MAGIC {
189            return Err(std::io::Error::other(format!("invalid magic bytes on usb vsock header")));
190        }
191        // validate the payload length
192        let payload_len = Into::<u64>::into(header.payload_len) as usize;
193        let body_len = body.len();
194        if payload_len > body_len {
195            return Err(std::io::Error::other(format!(
196                "payload length on usb vsock header ({payload_len}) was larger than available in buffer {body_len}"
197            )));
198        }
199
200        let (payload, remain) = body.split_at(payload_len);
201        Ok((Packet { header, payload }, remain))
202    }
203
204    /// Writes the packet to a buffer when the buffer is known to be large enough to hold it. Note
205    /// that the packet header's [`Header::payload_len`] must be correct before calling this, it
206    /// does not use the size of [`Self::payload`] to decide how much of the payload buffer is
207    /// valid.
208    ///
209    /// # Panics
210    ///
211    /// Panics if the buffer is not large enough for the packet.
212    pub fn write_to_unchecked(&'a self, buf: &'a mut [u8]) -> &'a mut [u8] {
213        let (packet, remain) = buf.split_at_mut(self.size());
214        self.header.write_to_prefix(packet).unwrap();
215        self.payload.write_to_suffix(packet).unwrap();
216        remain
217    }
218}
219
220/// A typed mutable reference to the contents of a packet in a buffer.
221#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
222pub struct PacketMut<'a> {
223    /// The packet's header.
224    pub header: &'a mut Header,
225    /// The packet's payload.
226    pub payload: &'a mut [u8],
227}
228
229impl<'a> PacketMut<'a> {
230    /// Creates a new [`PacketMut`] inside the given buffer and initializes the header to the given
231    /// [`PacketType`] before returning it. All other fields in the header will be zeroed, and the
232    /// [`PacketMut::payload`] will be the remaining area of the buffer after the header.
233    ///
234    /// Use [`PacketMut::finish`] to validate and write the proper packet length and return the
235    /// total size of the packet.
236    ///
237    /// # Panics
238    ///
239    /// The buffer must be large enough to hold at least a packet header, and this will panic if
240    /// it's not.
241    pub fn new_in(packet_type: PacketType, buf: &'a mut [u8]) -> Self {
242        Header::new(packet_type)
243            .write_to_prefix(buf)
244            .expect("not enough room in buffer for packet header");
245        let (header_bytes, payload) = buf.split_at_mut(Header::SIZE);
246        let header = Header::try_mut_from_bytes(header_bytes).unwrap();
247        PacketMut { header, payload }
248    }
249
250    /// Validates the correctness of the packet and returns the size of the packet within the
251    /// original buffer.
252    pub fn finish(self, payload_len: usize) -> Result<usize, PacketTooBigError> {
253        if payload_len <= self.payload.len() {
254            self.header.payload_len.set(u32::try_from(payload_len).map_err(|_| PacketTooBigError)?);
255            Ok(Header::SIZE + payload_len)
256        } else {
257            Err(PacketTooBigError)
258        }
259    }
260}
261
262/// Reads a sequence of vsock packets from a given usb packet buffer
263pub struct VsockPacketIterator<'a> {
264    buf: Option<&'a [u8]>,
265}
266
267impl<'a> VsockPacketIterator<'a> {
268    /// Creates a new [`PacketStream`] from the contents of `buf`. The returned stream will
269    /// iterate over individual vsock packets.
270    pub fn new(buf: &'a [u8]) -> Self {
271        Self { buf: Some(buf) }
272    }
273}
274
275impl<'a> FusedIterator for VsockPacketIterator<'a> {}
276impl<'a> Iterator for VsockPacketIterator<'a> {
277    type Item = Result<Packet<'a>, std::io::Error>;
278
279    fn next(&mut self) -> Option<Self::Item> {
280        // return immediately if we've already returned `None` or `Some(Err)`
281        let data = self.buf.take()?;
282
283        // also return immediately if there's no more data in the buffer
284        if data.len() == 0 {
285            return None;
286        }
287
288        match Packet::parse_next(data) {
289            Ok((header, rest)) => {
290                // update our pointer for next time
291                self.buf = Some(rest);
292                Some(Ok(header))
293            }
294            Err(err) => Some(Err(err)),
295        }
296    }
297}
298
299/// Builds an aggregate usb packet out of vsock packets and gives readiness
300/// notifications when there is room to add another packet or data available to send.
301pub struct UsbPacketBuilder<B> {
302    buffer: B,
303    offset: usize,
304    space_waker: AtomicWaker,
305    packet_waker: AtomicWaker,
306}
307
308/// the size of the packet would have been too large even if the buffer was empty
309#[derive(Debug, Copy, Clone)]
310pub struct PacketTooBigError;
311
312impl<B> UsbPacketBuilder<B> {
313    /// Creates a new builder from `buffer`, which is a type that can be used as a mutable slice
314    /// with space available for storing vsock packets. The `readable_notify` will have a message
315    /// sent to it whenever a usb packet could be transmitted.
316    pub fn new(buffer: B) -> Self {
317        let offset = 0;
318        let space_waker = AtomicWaker::default();
319        let packet_waker = AtomicWaker::default();
320        Self { buffer, offset, space_waker, packet_waker }
321    }
322
323    /// Returns true if the packet has data in it
324    pub fn has_data(&self) -> bool {
325        self.offset > 0
326    }
327}
328
329impl<B> UsbPacketBuilder<B>
330where
331    B: std::ops::DerefMut<Target = [u8]>,
332{
333    /// Gets the space currently available for another packet in the buffer
334    pub fn available(&self) -> usize {
335        self.buffer.len() - self.offset
336    }
337
338    /// Writes the given packet into the buffer. The packet and header must be able to fit
339    /// within the buffer provided at creation time.
340    pub fn write_vsock_packet(&mut self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
341        let packet_size = packet.size();
342        if self.available() >= packet_size {
343            packet.write_to_unchecked(&mut self.buffer[self.offset..]);
344            self.offset += packet_size;
345            self.packet_waker.wake();
346            Ok(())
347        } else {
348            Err(PacketTooBigError)
349        }
350    }
351
352    /// Takes the current usb packet, if there is one. The returned mutable slice
353    /// will be only the data written to the buffer so far, and packet writing will be reset to the
354    /// beginning of the buffer.
355    pub fn take_usb_packet(&mut self) -> Option<&mut [u8]> {
356        let written = self.offset;
357        if written == 0 {
358            return None;
359        }
360        self.offset = 0;
361        self.space_waker.wake();
362        Some(&mut self.buffer[0..written])
363    }
364}
365
366pub(crate) struct UsbPacketFiller<B> {
367    current_out_packet: Mutex<Option<UsbPacketBuilder<B>>>,
368    out_packet_waker: AtomicWaker,
369    filled_packet_waker: AtomicWaker,
370}
371
372impl<B> Default for UsbPacketFiller<B> {
373    fn default() -> Self {
374        let current_out_packet = Mutex::default();
375        let out_packet_waker = AtomicWaker::default();
376        let filled_packet_waker = AtomicWaker::default();
377        Self { current_out_packet, out_packet_waker, filled_packet_waker }
378    }
379}
380
381impl<B: DerefMut<Target = [u8]> + Unpin> UsbPacketFiller<B> {
382    fn wait_for_fillable(&self, min_packet_size: usize) -> WaitForFillable<'_, B> {
383        WaitForFillable { filler: &self, min_packet_size }
384    }
385
386    pub async fn write_vsock_packet(&self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
387        let mut builder = self.wait_for_fillable(packet.size()).await;
388        builder.as_mut().unwrap().write_vsock_packet(packet)?;
389        self.filled_packet_waker.wake();
390        Ok(())
391    }
392
393    pub async fn write_vsock_data(&self, address: &Address, payload: &[u8]) -> usize {
394        let header = &mut Header::new(PacketType::Data);
395        header.set_address(&address);
396        let mut builder = self.wait_for_fillable(Header::SIZE + 1).await;
397        let builder = builder.as_mut().unwrap();
398        let writing = min(payload.len(), builder.available() - Header::SIZE);
399        header.payload_len.set(writing as u32);
400        builder.write_vsock_packet(&Packet { header, payload: &payload[..writing] }).unwrap();
401        self.filled_packet_waker.wake();
402        writing
403    }
404
405    pub async fn write_vsock_data_all(&self, address: &Address, payload: &[u8]) {
406        let mut written = 0;
407        while written < payload.len() {
408            written += self.write_vsock_data(address, &payload[written..]).await;
409        }
410    }
411
412    /// Provides a packet builder for the state machine to write packets to. Returns a future that
413    /// will be fulfilled when there is data available to send on the packet.
414    ///
415    /// # Panics
416    ///
417    /// Panics if called while another [`Self::fill_usb_packet`] future is pending.
418    pub fn fill_usb_packet(&self, builder: UsbPacketBuilder<B>) -> FillUsbPacket<'_, B> {
419        FillUsbPacket(&self, Some(builder))
420    }
421}
422
423pub(crate) struct FillUsbPacket<'a, B>(&'a UsbPacketFiller<B>, Option<UsbPacketBuilder<B>>);
424
425impl<'a, B: Unpin> Future for FillUsbPacket<'a, B> {
426    type Output = UsbPacketBuilder<B>;
427
428    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
429        // if we're holding a `PacketBuilder` we haven't been waited on yet. Otherwise we want
430        // to return ready when there's a packet and it's got data in it.
431        if let Some(builder) = self.1.take() {
432            // if the packet we were handed for some reason already has data in it, hand it back
433            if builder.has_data() {
434                return Poll::Ready(builder);
435            }
436
437            let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
438            assert!(current_out_packet.is_none(), "Can't fill more than one packet at a time");
439            current_out_packet.replace(builder);
440            self.0.out_packet_waker.wake();
441            self.0.filled_packet_waker.register(cx.waker());
442            Poll::Pending
443        } else {
444            let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
445            let Some(builder) = current_out_packet.take() else {
446                panic!("Packet builder was somehow removed from connection prematurely");
447            };
448
449            if builder.has_data() {
450                self.0.filled_packet_waker.wake();
451                Poll::Ready(builder)
452            } else {
453                // if there hasn't been any data placed in the packet, put the builder back and
454                // return Pending.
455                current_out_packet.replace(builder);
456                Poll::Pending
457            }
458        }
459    }
460}
461
462pub(crate) struct WaitForFillable<'a, B> {
463    filler: &'a UsbPacketFiller<B>,
464    min_packet_size: usize,
465}
466
467impl<'a, B: DerefMut<Target = [u8]> + Unpin> Future for WaitForFillable<'a, B> {
468    type Output = MutexGuard<'a, Option<UsbPacketBuilder<B>>>;
469
470    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
471        let current_out_packet = self.filler.current_out_packet.lock().unwrap();
472        let Some(builder) = &*current_out_packet else {
473            self.filler.out_packet_waker.register(cx.waker());
474            return Poll::Pending;
475        };
476        if builder.available() >= self.min_packet_size {
477            Poll::Ready(current_out_packet)
478        } else {
479            self.filler.out_packet_waker.register(cx.waker());
480            Poll::Pending
481        }
482    }
483}
484
485#[cfg(test)]
486mod tests {
487    use std::sync::Arc;
488
489    use super::*;
490    use fuchsia_async::Task;
491    use futures::poll;
492
493    async fn assert_pending<F: Future>(fut: F) {
494        let fut = std::pin::pin!(fut);
495        if let Poll::Ready(_) = poll!(fut) {
496            panic!("Future was ready when it shouldn't have been");
497        }
498    }
499
500    #[fuchsia::test]
501    async fn roundtrip_packet() {
502        let payload = b"hello world!";
503        let packet = Packet {
504            payload,
505            header: &Header {
506                device_cid: 1.into(),
507                host_cid: 2.into(),
508                device_port: 3.into(),
509                host_port: 4.into(),
510                payload_len: little_endian::U32::from(payload.len() as u32),
511                ..Header::new(PacketType::Data)
512            },
513        };
514        let buffer = vec![0; packet.size()];
515        let builder = UsbPacketBuilder::new(buffer);
516        let filler = UsbPacketFiller::default();
517        let mut filled_fut = filler.fill_usb_packet(builder);
518        println!("we should not be ready to pull a usb packet off yet");
519        assert_pending(&mut filled_fut).await;
520
521        println!("we should be able to write a packet though ({} bytes)", packet.size());
522        filler.write_vsock_packet(&packet).await.unwrap();
523
524        println!("we shouldn't have any space for another packet now");
525        assert_pending(filler.wait_for_fillable(1)).await;
526
527        println!("but we should have a new usb packet available");
528        let mut builder = filled_fut.await;
529        let buffer = builder.take_usb_packet().unwrap();
530
531        println!("the packet we get back out should be the same one we put in");
532        let (read_packet, remain) = Packet::parse_next(buffer).unwrap();
533        assert_eq!(packet, read_packet);
534        assert!(remain.is_empty());
535    }
536
537    #[fuchsia::test]
538    async fn many_packets() {
539        fn make_numbered_packet(num: u32) -> (Header, String) {
540            let payload = format!("packet #{num}!");
541            let header = Header {
542                device_cid: num.into(),
543                device_port: num.into(),
544                host_cid: num.into(),
545                host_port: num.into(),
546                payload_len: little_endian::U32::from(payload.len() as u32),
547                ..Header::new(PacketType::Data)
548            };
549            (header, payload)
550        }
551        const BUFFER_SIZE: usize = 256;
552        let mut builder = UsbPacketBuilder::new(vec![0; BUFFER_SIZE]);
553        let filler = Arc::new(UsbPacketFiller::default());
554
555        let send_filler = filler.clone();
556        let send_task = Task::spawn(async move {
557            for packet_num in 0..1024 {
558                let next_packet = make_numbered_packet(packet_num);
559                let next_packet =
560                    Packet { header: &next_packet.0, payload: next_packet.1.as_ref() };
561                send_filler.write_vsock_packet(&next_packet).await.unwrap();
562            }
563        });
564
565        let mut read_packet_num = 0;
566        while read_packet_num < 1024 {
567            builder = filler.fill_usb_packet(builder).await;
568            let buffer = builder.take_usb_packet().unwrap();
569            let mut num_packets = 0;
570            for packet in VsockPacketIterator::new(&buffer) {
571                let packet_compare = make_numbered_packet(read_packet_num);
572                let packet_compare =
573                    Packet { header: &packet_compare.0, payload: &packet_compare.1.as_ref() };
574                assert_eq!(packet.unwrap(), packet_compare);
575                read_packet_num += 1;
576                num_packets += 1;
577            }
578            println!(
579                "Read {num_packets} vsock packets from usb packet buffer, had {count} bytes left",
580                count = BUFFER_SIZE - buffer.len()
581            );
582        }
583        send_task.await;
584        assert_eq!(1024, read_packet_num);
585    }
586
587    #[fuchsia::test]
588    async fn packet_fillable_futures() {
589        let filler = UsbPacketFiller::default();
590
591        for _ in 0..10 {
592            println!("register an interest in filling a usb packet");
593            let mut fillable_fut = filler.wait_for_fillable(1);
594            println!("make sure we have nothing to fill");
595            assert!(poll!(&mut fillable_fut).is_pending());
596
597            println!("register a packet for filling");
598            let mut filled_fut = filler.fill_usb_packet(UsbPacketBuilder::new(vec![0; 1024]));
599            println!("make sure we've registered the buffer");
600            assert!(poll!(&mut filled_fut).is_pending());
601
602            println!("now put some things in the packet");
603            let header = &mut Header::new(PacketType::Data);
604            header.payload_len.set(99);
605            let Poll::Ready(mut builder) = poll!(fillable_fut) else {
606                panic!("should have been ready to fill a packet")
607            };
608            builder
609                .as_mut()
610                .unwrap()
611                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
612                .unwrap();
613            drop(builder);
614            let Poll::Ready(mut builder) = poll!(filler.wait_for_fillable(1)) else {
615                panic!("should have been ready to fill a packet(2)")
616            };
617            builder
618                .as_mut()
619                .unwrap()
620                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
621                .unwrap();
622            drop(builder);
623
624            println!("but if we ask for too much space we'll get pending");
625            assert!(poll!(filler.wait_for_fillable(1024 - (99 * 2) + 1)).is_pending());
626
627            println!("and now resolve the filled future and get our data back");
628            let mut filled = filled_fut.await;
629            let packets =
630                Vec::from_iter(VsockPacketIterator::new(filled.take_usb_packet().unwrap()));
631            assert_eq!(packets.len(), 2);
632        }
633    }
634}