1use std::cmp::min;
6use std::future::Future;
7use std::iter::FusedIterator;
8use std::ops::DerefMut;
9use std::pin::Pin;
10use std::sync::{Mutex, MutexGuard};
11use std::task::{Context, Poll};
12
13use futures::task::AtomicWaker;
14use zerocopy::{little_endian, Immutable, IntoBytes, KnownLayout, TryFromBytes, Unaligned};
15
16use crate::Address;
17
/// One-byte tag stored in a [`Header`] that identifies what kind of packet follows.
///
/// The enum is `#[repr(u8)]` and every variant has an explicit ASCII discriminant, so the value
/// of each variant is exactly the byte that appears in the serialized header. The meaning
/// attached to each variant below is inferred from its name only.
/// NOTE(review): confirm the per-variant semantics against the protocol description.
#[repr(u8)]
#[derive(
    Debug,
    TryFromBytes,
    IntoBytes,
    KnownLayout,
    Immutable,
    Unaligned,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Clone,
    Copy,
)]
pub enum PacketType {
    /// Wire byte `b'S'`. Presumably synchronizes the two ends of the link — TODO confirm.
    Sync = b'S',
    /// Wire byte `b'E'`. Presumably an echo request — TODO confirm.
    Echo = b'E',
    /// Wire byte `b'e'`. Presumably the reply to an [`PacketType::Echo`] — TODO confirm.
    EchoReply = b'e',
    /// Wire byte `b'C'`. Presumably a request to open a connection — TODO confirm.
    Connect = b'C',
    /// Wire byte `b'F'`. Presumably an orderly close of a connection — TODO confirm.
    Finish = b'F',
    /// Wire byte `b'R'`. Presumably an abrupt termination of a connection — TODO confirm.
    Reset = b'R',
    /// Wire byte `b'A'`. Presumably the acceptance of a [`PacketType::Connect`] — TODO confirm.
    Accept = b'A',
    /// Wire byte `b'D'`. The type used by `UsbPacketFiller::write_vsock_data` for payload-carrying
    /// packets.
    Data = b'D',
    /// Wire byte `b'X'`. Presumably flow control for a connection — TODO confirm.
    Pause = b'X',
}
74
/// Fixed-size header that sits in front of every packet's payload.
///
/// The struct is `#[repr(C, packed(1))]`, so the serialized form is simply the fields in
/// declaration order with no padding: 3 magic bytes, 1 packet type byte and five little-endian
/// `u32` values.
#[repr(C, packed(1))]
#[derive(
    Debug,
    TryFromBytes,
    IntoBytes,
    KnownLayout,
    Immutable,
    Unaligned,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Clone,
)]
pub struct Header {
    /// Marker bytes; `Packet::parse_next` rejects any header where these differ from
    /// `Header::MAGIC`.
    magic: [u8; 3],
    /// Kind of packet this header describes.
    pub packet_type: PacketType,
    /// `device_cid` part of the address (copied from an [`Address`] by `set_address`).
    pub device_cid: little_endian::U32,
    /// `host_cid` part of the address (copied from an [`Address`] by `set_address`).
    pub host_cid: little_endian::U32,
    /// `device_port` part of the address (copied from an [`Address`] by `set_address`).
    pub device_port: little_endian::U32,
    /// `host_port` part of the address (copied from an [`Address`] by `set_address`).
    pub host_port: little_endian::U32,
    /// Number of payload bytes that follow this header in the buffer.
    pub payload_len: little_endian::U32,
}
120
121impl Header {
122 pub const SIZE: usize = size_of::<Self>();
124 const MAGIC: &'static [u8; 3] = b"ffx";
125
126 pub fn new(packet_type: PacketType) -> Self {
129 let device_cid = 0.into();
130 let host_cid = 0.into();
131 let device_port = 0.into();
132 let host_port = 0.into();
133 let payload_len = 0.into();
134 Header {
135 magic: *Self::MAGIC,
136 packet_type,
137 device_cid,
138 host_cid,
139 device_port,
140 host_port,
141 payload_len,
142 }
143 }
144
145 pub fn packet_size(&self) -> usize {
148 Packet::size_with_payload(self.payload_len.get() as usize)
149 }
150
151 pub fn set_address(&mut self, addr: &Address) {
153 self.device_cid.set(addr.device_cid);
154 self.host_cid.set(addr.host_cid);
155 self.device_port.set(addr.device_port);
156 self.host_port.set(addr.host_port);
157 }
158}
159
/// Read-only view of a single packet: a borrowed [`Header`] and the borrowed payload bytes that
/// belong to it.
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
pub struct Packet<'a> {
    /// Header describing the packet.
    pub header: &'a Header,
    /// Payload bytes of the packet. `parse_next` produces a slice of exactly
    /// `header.payload_len` bytes; hand-built packets are expected to keep the two in sync.
    pub payload: &'a [u8],
}
168
169impl<'a> Packet<'a> {
170 pub fn size(&self) -> usize {
173 self.header.packet_size()
174 }
175
176 fn size_with_payload(payload_size: usize) -> usize {
177 size_of::<Header>() + payload_size
178 }
179
180 fn parse_next(buf: &'a [u8]) -> Result<(Self, &'a [u8]), std::io::Error> {
181 let Some((header, body)) = buf.split_at_checked(size_of::<Header>()) else {
183 return Err(std::io::Error::other("insufficient data for last packet"));
184 };
185 let header = Header::try_ref_from_bytes(header).map_err(|err| {
186 std::io::Error::other(format!("failed to parse usb vsock header: {err:?}"))
187 })?;
188 if header.magic != *Header::MAGIC {
189 return Err(std::io::Error::other(format!("invalid magic bytes on usb vsock header")));
190 }
191 let payload_len = Into::<u64>::into(header.payload_len) as usize;
193 let body_len = body.len();
194 if payload_len > body_len {
195 return Err(std::io::Error::other(format!("payload length on usb vsock header ({payload_len}) was larger than available in buffer {body_len}")));
196 }
197
198 let (payload, remain) = body.split_at(payload_len);
199 Ok((Packet { header, payload }, remain))
200 }
201
202 pub fn write_to_unchecked(&'a self, buf: &'a mut [u8]) -> &'a mut [u8] {
211 let (packet, remain) = buf.split_at_mut(self.size());
212 self.header.write_to_prefix(packet).unwrap();
213 self.payload.write_to_suffix(packet).unwrap();
214 remain
215 }
216}
217
/// Mutable view of a packet that is being assembled in place inside a caller-provided buffer.
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
pub struct PacketMut<'a> {
    /// Header located at the start of the buffer.
    pub header: &'a mut Header,
    /// Everything in the buffer after the header; the caller writes the payload here and then
    /// calls `finish` with the number of bytes actually used.
    pub payload: &'a mut [u8],
}
226
227impl<'a> PacketMut<'a> {
228 pub fn new_in(packet_type: PacketType, buf: &'a mut [u8]) -> Self {
240 Header::new(packet_type)
241 .write_to_prefix(buf)
242 .expect("not enough room in buffer for packet header");
243 let (header_bytes, payload) = buf.split_at_mut(Header::SIZE);
244 let header = Header::try_mut_from_bytes(header_bytes).unwrap();
245 PacketMut { header, payload }
246 }
247
248 pub fn finish(self, payload_len: usize) -> Result<usize, PacketTooBigError> {
251 if payload_len <= self.payload.len() {
252 self.header.payload_len.set(u32::try_from(payload_len).map_err(|_| PacketTooBigError)?);
253 Ok(Header::SIZE + payload_len)
254 } else {
255 Err(PacketTooBigError)
256 }
257 }
258}
259
/// Iterator over the packets stored back to back in a byte buffer.
pub struct VsockPacketIterator<'a> {
    /// Bytes that have not been parsed yet. `None` once iteration has ended, either because the
    /// buffer was exhausted or because a parse error was reported.
    buf: Option<&'a [u8]>,
}
264
265impl<'a> VsockPacketIterator<'a> {
266 pub fn new(buf: &'a [u8]) -> Self {
269 Self { buf: Some(buf) }
270 }
271}
272
273impl<'a> FusedIterator for VsockPacketIterator<'a> {}
274impl<'a> Iterator for VsockPacketIterator<'a> {
275 type Item = Result<Packet<'a>, std::io::Error>;
276
277 fn next(&mut self) -> Option<Self::Item> {
278 let data = self.buf.take()?;
280
281 if data.len() == 0 {
283 return None;
284 }
285
286 match Packet::parse_next(data) {
287 Ok((header, rest)) => {
288 self.buf = Some(rest);
290 Some(Ok(header))
291 }
292 Err(err) => Some(Err(err)),
293 }
294 }
295}
296
/// Packs serialized vsock packets one after another into a single buffer of type `B`.
pub struct UsbPacketBuilder<B> {
    /// Storage the packets are written into.
    buffer: B,
    /// Number of bytes at the front of `buffer` that already hold packet data; the next packet
    /// is written starting at this position.
    offset: usize,
    /// Woken by `take_usb_packet` after the written data has been handed out.
    space_waker: AtomicWaker,
    /// Woken by `write_vsock_packet` after a packet has been appended.
    packet_waker: AtomicWaker,
}
305
/// Error returned when a packet or payload does not fit in the space available for it (see
/// `PacketMut::finish` and `UsbPacketBuilder::write_vsock_packet`).
#[derive(Debug, Copy, Clone)]
pub struct PacketTooBigError;
309
310impl<B> UsbPacketBuilder<B> {
311 pub fn new(buffer: B) -> Self {
315 let offset = 0;
316 let space_waker = AtomicWaker::default();
317 let packet_waker = AtomicWaker::default();
318 Self { buffer, offset, space_waker, packet_waker }
319 }
320
321 pub fn has_data(&self) -> bool {
323 self.offset > 0
324 }
325}
326
327impl<B> UsbPacketBuilder<B>
328where
329 B: std::ops::DerefMut<Target = [u8]>,
330{
331 pub fn available(&self) -> usize {
333 self.buffer.len() - self.offset
334 }
335
336 pub fn write_vsock_packet(&mut self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
339 let packet_size = packet.size();
340 if self.available() >= packet_size {
341 packet.write_to_unchecked(&mut self.buffer[self.offset..]);
342 self.offset += packet_size;
343 self.packet_waker.wake();
344 Ok(())
345 } else {
346 Err(PacketTooBigError)
347 }
348 }
349
350 pub fn take_usb_packet(&mut self) -> Option<&mut [u8]> {
354 let written = self.offset;
355 if written == 0 {
356 return None;
357 }
358 self.offset = 0;
359 self.space_waker.wake();
360 Some(&mut self.buffer[0..written])
361 }
362}
363
/// Rendezvous point between code that produces vsock packets and code that owns the
/// [`UsbPacketBuilder`] they are packed into.
pub(crate) struct UsbPacketFiller<B> {
    /// The builder currently open for writing, if any. It is installed by the `FillUsbPacket`
    /// future and removed again once it contains data.
    current_out_packet: Mutex<Option<UsbPacketBuilder<B>>>,
    /// Registered by writers waiting for a usable builder (`WaitForFillable`); woken when a
    /// builder is installed.
    out_packet_waker: AtomicWaker,
    /// Registered by the `FillUsbPacket` future; woken after a writer has put a packet into the
    /// builder.
    filled_packet_waker: AtomicWaker,
}
369
370impl<B> Default for UsbPacketFiller<B> {
371 fn default() -> Self {
372 let current_out_packet = Mutex::default();
373 let out_packet_waker = AtomicWaker::default();
374 let filled_packet_waker = AtomicWaker::default();
375 Self { current_out_packet, out_packet_waker, filled_packet_waker }
376 }
377}
378
379impl<B: DerefMut<Target = [u8]> + Unpin> UsbPacketFiller<B> {
380 fn wait_for_fillable(&self, min_packet_size: usize) -> WaitForFillable<'_, B> {
381 WaitForFillable { filler: &self, min_packet_size }
382 }
383
384 pub async fn write_vsock_packet(&self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
385 let mut builder = self.wait_for_fillable(packet.size()).await;
386 builder.as_mut().unwrap().write_vsock_packet(packet)?;
387 self.filled_packet_waker.wake();
388 Ok(())
389 }
390
391 pub async fn write_vsock_data(&self, address: &Address, payload: &[u8]) -> usize {
392 let header = &mut Header::new(PacketType::Data);
393 header.set_address(&address);
394 let mut builder = self.wait_for_fillable(1).await;
395 let builder = builder.as_mut().unwrap();
396 let writing = min(payload.len(), builder.available() - Header::SIZE);
397 header.payload_len.set(writing as u32);
398 builder.write_vsock_packet(&Packet { header, payload: &payload[..writing] }).unwrap();
399 self.filled_packet_waker.wake();
400 writing
401 }
402
403 pub async fn write_vsock_data_all(&self, address: &Address, payload: &[u8]) {
404 let mut written = 0;
405 while written < payload.len() {
406 written += self.write_vsock_data(address, &payload[written..]).await;
407 }
408 }
409
410 pub fn fill_usb_packet(&self, builder: UsbPacketBuilder<B>) -> FillUsbPacket<'_, B> {
417 FillUsbPacket(&self, Some(builder))
418 }
419}
420
/// Future returned by `UsbPacketFiller::fill_usb_packet`.
///
/// Field 0 is the filler the builder is offered to. Field 1 holds the builder until the first
/// poll moves it into the filler, and is `None` afterwards.
pub(crate) struct FillUsbPacket<'a, B>(&'a UsbPacketFiller<B>, Option<UsbPacketBuilder<B>>);
422
423impl<'a, B: Unpin> Future for FillUsbPacket<'a, B> {
424 type Output = UsbPacketBuilder<B>;
425
426 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
427 if let Some(builder) = self.1.take() {
430 if builder.has_data() {
432 return Poll::Ready(builder);
433 }
434
435 let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
436 assert!(current_out_packet.is_none(), "Can't fill more than one packet at a time");
437 current_out_packet.replace(builder);
438 self.0.out_packet_waker.wake();
439 self.0.filled_packet_waker.register(cx.waker());
440 Poll::Pending
441 } else {
442 let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
443 let Some(builder) = current_out_packet.take() else {
444 panic!("Packet builder was somehow removed from connection prematurely");
445 };
446
447 if builder.has_data() {
448 self.0.filled_packet_waker.wake();
449 Poll::Ready(builder)
450 } else {
451 current_out_packet.replace(builder);
454 Poll::Pending
455 }
456 }
457 }
458}
459
/// Future returned by `UsbPacketFiller::wait_for_fillable`; resolves to the locked builder slot
/// once a builder with enough free space is installed.
pub(crate) struct WaitForFillable<'a, B> {
    /// Filler whose builder slot is being waited on.
    filler: &'a UsbPacketFiller<B>,
    /// Minimum number of free bytes the builder must report through `available()`.
    min_packet_size: usize,
}
464
465impl<'a, B: DerefMut<Target = [u8]> + Unpin> Future for WaitForFillable<'a, B> {
466 type Output = MutexGuard<'a, Option<UsbPacketBuilder<B>>>;
467
468 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
469 let current_out_packet = self.filler.current_out_packet.lock().unwrap();
470 let Some(builder) = &*current_out_packet else {
471 self.filler.out_packet_waker.register(cx.waker());
472 return Poll::Pending;
473 };
474 if builder.available() >= self.min_packet_size {
475 Poll::Ready(current_out_packet)
476 } else {
477 self.filler.out_packet_waker.register(cx.waker());
478 Poll::Pending
479 }
480 }
481}
482
// Unit tests for packet parsing/serialization and for the filler/builder hand-off.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use fuchsia_async::Task;
    use futures::poll;

    /// Polls `fut` once and panics if it is already ready.
    async fn assert_pending<F: Future>(fut: F) {
        let fut = std::pin::pin!(fut);
        if let Poll::Ready(_) = poll!(fut) {
            panic!("Future was ready when it shouldn't have been");
        }
    }

    /// Writes one packet through a filler into a buffer that is exactly as large as the packet
    /// and checks that parsing the buffer yields the same packet.
    #[fuchsia::test]
    async fn roundtrip_packet() {
        let payload = b"hello world!";
        let packet = Packet {
            payload,
            header: &Header {
                device_cid: 1.into(),
                host_cid: 2.into(),
                device_port: 3.into(),
                host_port: 4.into(),
                payload_len: little_endian::U32::from(payload.len() as u32),
                ..Header::new(PacketType::Data)
            },
        };
        // The buffer has room for exactly this one packet.
        let buffer = vec![0; packet.size()];
        let builder = UsbPacketBuilder::new(buffer);
        let filler = UsbPacketFiller::default();
        let mut filled_fut = filler.fill_usb_packet(builder);
        println!("we should not be ready to pull a usb packet off yet");
        // This first poll is also what installs the builder in the filler.
        assert_pending(&mut filled_fut).await;

        println!("we should be able to write a packet though ({} bytes)", packet.size());
        filler.write_vsock_packet(&packet).await.unwrap();

        println!("we shouldn't have any space for another packet now");
        assert_pending(filler.wait_for_fillable(1)).await;

        println!("but we should have a new usb packet available");
        let mut builder = filled_fut.await;
        let buffer = builder.take_usb_packet().unwrap();

        println!("the packet we get back out should be the same one we put in");
        let (read_packet, remain) = Packet::parse_next(buffer).unwrap();
        assert_eq!(packet, read_packet);
        assert!(remain.is_empty());
    }

    /// Streams 1024 numbered packets from a spawned task through a 256-byte buffer and checks
    /// that the reader sees every packet, in order.
    #[fuchsia::test]
    async fn many_packets() {
        /// Builds the header and payload of the packet with sequence number `num`; the number is
        /// stored in every address field and in the payload text.
        fn make_numbered_packet(num: u32) -> (Header, String) {
            let payload = format!("packet #{num}!");
            let header = Header {
                device_cid: num.into(),
                device_port: num.into(),
                host_cid: num.into(),
                host_port: num.into(),
                payload_len: little_endian::U32::from(payload.len() as u32),
                ..Header::new(PacketType::Data)
            };
            (header, payload)
        }
        const BUFFER_SIZE: usize = 256;
        let mut builder = UsbPacketBuilder::new(vec![0; BUFFER_SIZE]);
        let filler = Arc::new(UsbPacketFiller::default());

        // Writer side: pushes packets as fast as the filler accepts them.
        let send_filler = filler.clone();
        let send_task = Task::spawn(async move {
            for packet_num in 0..1024 {
                let next_packet = make_numbered_packet(packet_num);
                let next_packet =
                    Packet { header: &next_packet.0, payload: next_packet.1.as_ref() };
                send_filler.write_vsock_packet(&next_packet).await.unwrap();
            }
        });

        // Reader side: repeatedly offers the same builder and drains whatever was written.
        let mut read_packet_num = 0;
        while read_packet_num < 1024 {
            builder = filler.fill_usb_packet(builder).await;
            let buffer = builder.take_usb_packet().unwrap();
            let mut num_packets = 0;
            for packet in VsockPacketIterator::new(&buffer) {
                let packet_compare = make_numbered_packet(read_packet_num);
                let packet_compare =
                    Packet { header: &packet_compare.0, payload: &packet_compare.1.as_ref() };
                assert_eq!(packet.unwrap(), packet_compare);
                read_packet_num += 1;
                num_packets += 1;
            }
            println!(
                "Read {num_packets} vsock packets from usb packet buffer, had {count} bytes left",
                count = BUFFER_SIZE - buffer.len()
            );
        }
        send_task.await;
        assert_eq!(1024, read_packet_num);
    }

    /// Drives the `WaitForFillable` and `FillUsbPacket` futures by hand, ten times over, to check
    /// when each of them is pending or ready.
    #[fuchsia::test]
    async fn packet_fillable_futures() {
        let filler = UsbPacketFiller::default();

        for _ in 0..10 {
            println!("register an interest in filling a usb packet");
            let mut fillable_fut = filler.wait_for_fillable(1);
            println!("make sure we have nothing to fill");
            assert!(poll!(&mut fillable_fut).is_pending());

            println!("register a packet for filling");
            let mut filled_fut = filler.fill_usb_packet(UsbPacketBuilder::new(vec![0; 1024]));
            println!("make sure we've registered the buffer");
            assert!(poll!(&mut filled_fut).is_pending());

            println!("now put some things in the packet");
            let header = &mut Header::new(PacketType::Data);
            header.payload_len.set(99);
            let Poll::Ready(mut builder) = poll!(fillable_fut) else {
                panic!("should have been ready to fill a packet")
            };
            builder
                .as_mut()
                .unwrap()
                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
                .unwrap();
            // Release the lock on the builder slot before asking for it again.
            drop(builder);
            let Poll::Ready(mut builder) = poll!(filler.wait_for_fillable(1)) else {
                panic!("should have been ready to fill a packet(2)")
            };
            builder
                .as_mut()
                .unwrap()
                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
                .unwrap();
            drop(builder);

            println!("but if we ask for too much space we'll get pending");
            // The two packets used their payload bytes plus their headers, so this request is
            // larger than the space that is left.
            assert!(poll!(filler.wait_for_fillable(1024 - (99 * 2) + 1)).is_pending());

            println!("and now resolve the filled future and get our data back");
            let mut filled = filled_fut.await;
            let packets =
                Vec::from_iter(VsockPacketIterator::new(filled.take_usb_packet().unwrap()));
            assert_eq!(packets.len(), 2);
        }
    }
}