1use std::cmp::min;
6use std::future::Future;
7use std::iter::FusedIterator;
8use std::ops::DerefMut;
9use std::pin::Pin;
10use std::sync::{Mutex, MutexGuard};
11use std::task::{Context, Poll};
12
13use futures::task::AtomicWaker;
14use zerocopy::{little_endian, Immutable, IntoBytes, KnownLayout, TryFromBytes, Unaligned};
15
16use crate::Address;
17
18#[repr(u8)]
21#[derive(
22 Debug,
23 TryFromBytes,
24 IntoBytes,
25 KnownLayout,
26 Immutable,
27 Unaligned,
28 PartialEq,
29 Eq,
30 PartialOrd,
31 Ord,
32 Hash,
33 Clone,
34 Copy,
35)]
36pub enum PacketType {
37 Sync = b'S',
43 Connect = b'C',
45 Finish = b'F',
49 Reset = b'R',
54 Accept = b'A',
57 Data = b'D',
62}
63
64#[repr(C, packed(1))]
67#[derive(
68 Debug,
69 TryFromBytes,
70 IntoBytes,
71 KnownLayout,
72 Immutable,
73 Unaligned,
74 PartialEq,
75 Eq,
76 PartialOrd,
77 Ord,
78 Hash,
79 Clone,
80)]
81pub struct Header {
82 magic: [u8; 3],
83 pub packet_type: PacketType,
85 pub device_cid: little_endian::U32,
90 pub host_cid: little_endian::U32,
95 pub device_port: little_endian::U32,
100 pub host_port: little_endian::U32,
105 pub payload_len: little_endian::U32,
108}
109
110impl Header {
111 pub const SIZE: usize = size_of::<Self>();
113 const MAGIC: &'static [u8; 3] = b"ffx";
114
115 pub fn new(packet_type: PacketType) -> Self {
118 let device_cid = 0.into();
119 let host_cid = 0.into();
120 let device_port = 0.into();
121 let host_port = 0.into();
122 let payload_len = 0.into();
123 Header {
124 magic: *Self::MAGIC,
125 packet_type,
126 device_cid,
127 host_cid,
128 device_port,
129 host_port,
130 payload_len,
131 }
132 }
133
134 pub fn packet_size(&self) -> usize {
137 Packet::size_with_payload(self.payload_len.get() as usize)
138 }
139
140 pub fn set_address(&mut self, addr: &Address) {
142 self.device_cid.set(addr.device_cid);
143 self.host_cid.set(addr.host_cid);
144 self.device_port.set(addr.device_port);
145 self.host_port.set(addr.host_port);
146 }
147}
148
149#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
151pub struct Packet<'a> {
152 pub header: &'a Header,
154 pub payload: &'a [u8],
156}
157
158impl<'a> Packet<'a> {
159 pub fn size(&self) -> usize {
162 self.header.packet_size()
163 }
164
165 fn size_with_payload(payload_size: usize) -> usize {
166 size_of::<Header>() + payload_size
167 }
168
169 fn parse_next(buf: &'a [u8]) -> Result<(Self, &'a [u8]), std::io::Error> {
170 let Some((header, body)) = buf.split_at_checked(size_of::<Header>()) else {
172 return Err(std::io::Error::other("insufficient data for last packet"));
173 };
174 let header = Header::try_ref_from_bytes(header).map_err(|err| {
175 std::io::Error::other(format!("failed to parse usb vsock header: {err:?}"))
176 })?;
177 if header.magic != *Header::MAGIC {
178 return Err(std::io::Error::other(format!("invalid magic bytes on usb vsock header")));
179 }
180 let payload_len = Into::<u64>::into(header.payload_len) as usize;
182 let body_len = body.len();
183 if payload_len > body_len {
184 return Err(std::io::Error::other(format!("payload length on usb vsock header ({payload_len}) was larger than available in buffer {body_len}")));
185 }
186
187 let (payload, remain) = body.split_at(payload_len);
188 Ok((Packet { header, payload }, remain))
189 }
190
191 pub fn write_to_unchecked(&'a self, buf: &'a mut [u8]) -> &'a mut [u8] {
200 let (packet, remain) = buf.split_at_mut(self.size());
201 self.header.write_to_prefix(packet).unwrap();
202 self.payload.write_to_suffix(packet).unwrap();
203 remain
204 }
205}
206
207#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
209pub struct PacketMut<'a> {
210 pub header: &'a mut Header,
212 pub payload: &'a mut [u8],
214}
215
216impl<'a> PacketMut<'a> {
217 pub fn new_in(packet_type: PacketType, buf: &'a mut [u8]) -> Self {
229 Header::new(packet_type)
230 .write_to_prefix(buf)
231 .expect("not enough room in buffer for packet header");
232 let (header_bytes, payload) = buf.split_at_mut(Header::SIZE);
233 let header = Header::try_mut_from_bytes(header_bytes).unwrap();
234 PacketMut { header, payload }
235 }
236
237 pub fn finish(self, payload_len: usize) -> Result<usize, PacketTooBigError> {
240 if payload_len <= self.payload.len() {
241 self.header.payload_len.set(u32::try_from(payload_len).map_err(|_| PacketTooBigError)?);
242 Ok(Header::SIZE + payload_len)
243 } else {
244 Err(PacketTooBigError)
245 }
246 }
247}
248
249pub struct VsockPacketIterator<'a> {
251 buf: Option<&'a [u8]>,
252}
253
254impl<'a> VsockPacketIterator<'a> {
255 pub fn new(buf: &'a [u8]) -> Self {
258 Self { buf: Some(buf) }
259 }
260}
261
262impl<'a> FusedIterator for VsockPacketIterator<'a> {}
263impl<'a> Iterator for VsockPacketIterator<'a> {
264 type Item = Result<Packet<'a>, std::io::Error>;
265
266 fn next(&mut self) -> Option<Self::Item> {
267 let data = self.buf.take()?;
269
270 if data.len() == 0 {
272 return None;
273 }
274
275 match Packet::parse_next(data) {
276 Ok((header, rest)) => {
277 self.buf = Some(rest);
279 Some(Ok(header))
280 }
281 Err(err) => Some(Err(err)),
282 }
283 }
284}
285
286pub struct UsbPacketBuilder<B> {
289 buffer: B,
290 offset: usize,
291 space_waker: AtomicWaker,
292 packet_waker: AtomicWaker,
293}
294
295#[derive(Debug, Copy, Clone)]
297pub struct PacketTooBigError;
298
299impl<B> UsbPacketBuilder<B> {
300 pub fn new(buffer: B) -> Self {
304 let offset = 0;
305 let space_waker = AtomicWaker::default();
306 let packet_waker = AtomicWaker::default();
307 Self { buffer, offset, space_waker, packet_waker }
308 }
309
310 pub fn has_data(&self) -> bool {
312 self.offset > 0
313 }
314}
315
316impl<B> UsbPacketBuilder<B>
317where
318 B: std::ops::DerefMut<Target = [u8]>,
319{
320 pub fn available(&self) -> usize {
322 self.buffer.len() - self.offset
323 }
324
325 pub fn write_vsock_packet(&mut self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
328 let packet_size = packet.size();
329 if self.available() >= packet_size {
330 packet.write_to_unchecked(&mut self.buffer[self.offset..]);
331 self.offset += packet_size;
332 self.packet_waker.wake();
333 Ok(())
334 } else {
335 Err(PacketTooBigError)
336 }
337 }
338
339 pub fn take_usb_packet(&mut self) -> Option<&mut [u8]> {
343 let written = self.offset;
344 if written == 0 {
345 return None;
346 }
347 self.offset = 0;
348 self.space_waker.wake();
349 Some(&mut self.buffer[0..written])
350 }
351}
352
353pub(crate) struct UsbPacketFiller<B> {
354 current_out_packet: Mutex<Option<UsbPacketBuilder<B>>>,
355 out_packet_waker: AtomicWaker,
356 filled_packet_waker: AtomicWaker,
357}
358
359impl<B> Default for UsbPacketFiller<B> {
360 fn default() -> Self {
361 let current_out_packet = Mutex::default();
362 let out_packet_waker = AtomicWaker::default();
363 let filled_packet_waker = AtomicWaker::default();
364 Self { current_out_packet, out_packet_waker, filled_packet_waker }
365 }
366}
367
368impl<B: DerefMut<Target = [u8]> + Unpin> UsbPacketFiller<B> {
369 fn wait_for_fillable(&self, min_packet_size: usize) -> WaitForFillable<'_, B> {
370 WaitForFillable { filler: &self, min_packet_size }
371 }
372
373 pub async fn write_vsock_packet(&self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
374 let mut builder = self.wait_for_fillable(packet.size()).await;
375 builder.as_mut().unwrap().write_vsock_packet(packet)?;
376 self.filled_packet_waker.wake();
377 Ok(())
378 }
379
380 pub async fn write_vsock_data(&self, address: &Address, payload: &[u8]) -> usize {
381 let header = &mut Header::new(PacketType::Data);
382 header.set_address(&address);
383 let mut builder = self.wait_for_fillable(1).await;
384 let builder = builder.as_mut().unwrap();
385 let writing = min(payload.len(), builder.available() - Header::SIZE);
386 header.payload_len.set(writing as u32);
387 builder.write_vsock_packet(&Packet { header, payload: &payload[..writing] }).unwrap();
388 self.filled_packet_waker.wake();
389 writing
390 }
391
392 pub async fn write_vsock_data_all(&self, address: &Address, payload: &[u8]) {
393 let mut written = 0;
394 while written < payload.len() {
395 written += self.write_vsock_data(address, &payload[written..]).await;
396 }
397 }
398
399 pub fn fill_usb_packet(&self, builder: UsbPacketBuilder<B>) -> FillUsbPacket<'_, B> {
406 FillUsbPacket(&self, Some(builder))
407 }
408}
409
410pub(crate) struct FillUsbPacket<'a, B>(&'a UsbPacketFiller<B>, Option<UsbPacketBuilder<B>>);
411
412impl<'a, B: Unpin> Future for FillUsbPacket<'a, B> {
413 type Output = UsbPacketBuilder<B>;
414
415 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
416 if let Some(builder) = self.1.take() {
419 if builder.has_data() {
421 return Poll::Ready(builder);
422 }
423
424 let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
425 assert!(current_out_packet.is_none(), "Can't fill more than one packet at a time");
426 current_out_packet.replace(builder);
427 self.0.out_packet_waker.wake();
428 self.0.filled_packet_waker.register(cx.waker());
429 Poll::Pending
430 } else {
431 let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
432 let Some(builder) = current_out_packet.take() else {
433 panic!("Packet builder was somehow removed from connection prematurely");
434 };
435
436 if builder.has_data() {
437 self.0.filled_packet_waker.wake();
438 Poll::Ready(builder)
439 } else {
440 current_out_packet.replace(builder);
443 Poll::Pending
444 }
445 }
446 }
447}
448
449pub(crate) struct WaitForFillable<'a, B> {
450 filler: &'a UsbPacketFiller<B>,
451 min_packet_size: usize,
452}
453
454impl<'a, B: DerefMut<Target = [u8]> + Unpin> Future for WaitForFillable<'a, B> {
455 type Output = MutexGuard<'a, Option<UsbPacketBuilder<B>>>;
456
457 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
458 let current_out_packet = self.filler.current_out_packet.lock().unwrap();
459 let Some(builder) = &*current_out_packet else {
460 self.filler.out_packet_waker.register(cx.waker());
461 return Poll::Pending;
462 };
463 if builder.available() >= self.min_packet_size {
464 Poll::Ready(current_out_packet)
465 } else {
466 self.filler.out_packet_waker.register(cx.waker());
467 Poll::Pending
468 }
469 }
470}
471
472#[cfg(test)]
473mod tests {
474 use std::sync::Arc;
475
476 use super::*;
477 use fuchsia_async::Task;
478 use futures::poll;
479
480 async fn assert_pending<F: Future>(fut: F) {
481 let fut = std::pin::pin!(fut);
482 if let Poll::Ready(_) = poll!(fut) {
483 panic!("Future was ready when it shouldn't have been");
484 }
485 }
486
487 #[fuchsia::test]
488 async fn roundtrip_packet() {
489 let payload = b"hello world!";
490 let packet = Packet {
491 payload,
492 header: &Header {
493 device_cid: 1.into(),
494 host_cid: 2.into(),
495 device_port: 3.into(),
496 host_port: 4.into(),
497 payload_len: little_endian::U32::from(payload.len() as u32),
498 ..Header::new(PacketType::Data)
499 },
500 };
501 let buffer = vec![0; packet.size()];
502 let builder = UsbPacketBuilder::new(buffer);
503 let filler = UsbPacketFiller::default();
504 let mut filled_fut = filler.fill_usb_packet(builder);
505 println!("we should not be ready to pull a usb packet off yet");
506 assert_pending(&mut filled_fut).await;
507
508 println!("we should be able to write a packet though ({} bytes)", packet.size());
509 filler.write_vsock_packet(&packet).await.unwrap();
510
511 println!("we shouldn't have any space for another packet now");
512 assert_pending(filler.wait_for_fillable(1)).await;
513
514 println!("but we should have a new usb packet available");
515 let mut builder = filled_fut.await;
516 let buffer = builder.take_usb_packet().unwrap();
517
518 println!("the packet we get back out should be the same one we put in");
519 let (read_packet, remain) = Packet::parse_next(buffer).unwrap();
520 assert_eq!(packet, read_packet);
521 assert!(remain.is_empty());
522 }
523
524 #[fuchsia::test]
525 async fn many_packets() {
526 fn make_numbered_packet(num: u32) -> (Header, String) {
527 let payload = format!("packet #{num}!");
528 let header = Header {
529 device_cid: num.into(),
530 device_port: num.into(),
531 host_cid: num.into(),
532 host_port: num.into(),
533 payload_len: little_endian::U32::from(payload.len() as u32),
534 ..Header::new(PacketType::Data)
535 };
536 (header, payload)
537 }
538 const BUFFER_SIZE: usize = 256;
539 let mut builder = UsbPacketBuilder::new(vec![0; BUFFER_SIZE]);
540 let filler = Arc::new(UsbPacketFiller::default());
541
542 let send_filler = filler.clone();
543 let send_task = Task::spawn(async move {
544 for packet_num in 0..1024 {
545 let next_packet = make_numbered_packet(packet_num);
546 let next_packet =
547 Packet { header: &next_packet.0, payload: next_packet.1.as_ref() };
548 send_filler.write_vsock_packet(&next_packet).await.unwrap();
549 }
550 });
551
552 let mut read_packet_num = 0;
553 while read_packet_num < 1024 {
554 builder = filler.fill_usb_packet(builder).await;
555 let buffer = builder.take_usb_packet().unwrap();
556 let mut num_packets = 0;
557 for packet in VsockPacketIterator::new(&buffer) {
558 let packet_compare = make_numbered_packet(read_packet_num);
559 let packet_compare =
560 Packet { header: &packet_compare.0, payload: &packet_compare.1.as_ref() };
561 assert_eq!(packet.unwrap(), packet_compare);
562 read_packet_num += 1;
563 num_packets += 1;
564 }
565 println!(
566 "Read {num_packets} vsock packets from usb packet buffer, had {count} bytes left",
567 count = BUFFER_SIZE - buffer.len()
568 );
569 }
570 send_task.await;
571 assert_eq!(1024, read_packet_num);
572 }
573
574 #[fuchsia::test]
575 async fn packet_fillable_futures() {
576 let filler = UsbPacketFiller::default();
577
578 for _ in 0..10 {
579 println!("register an interest in filling a usb packet");
580 let mut fillable_fut = filler.wait_for_fillable(1);
581 println!("make sure we have nothing to fill");
582 assert!(poll!(&mut fillable_fut).is_pending());
583
584 println!("register a packet for filling");
585 let mut filled_fut = filler.fill_usb_packet(UsbPacketBuilder::new(vec![0; 1024]));
586 println!("make sure we've registered the buffer");
587 assert!(poll!(&mut filled_fut).is_pending());
588
589 println!("now put some things in the packet");
590 let header = &mut Header::new(PacketType::Data);
591 header.payload_len.set(99);
592 let Poll::Ready(mut builder) = poll!(fillable_fut) else {
593 panic!("should have been ready to fill a packet")
594 };
595 builder
596 .as_mut()
597 .unwrap()
598 .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
599 .unwrap();
600 drop(builder);
601 let Poll::Ready(mut builder) = poll!(filler.wait_for_fillable(1)) else {
602 panic!("should have been ready to fill a packet(2)")
603 };
604 builder
605 .as_mut()
606 .unwrap()
607 .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
608 .unwrap();
609 drop(builder);
610
611 println!("but if we ask for too much space we'll get pending");
612 assert!(poll!(filler.wait_for_fillable(1024 - (99 * 2) + 1)).is_pending());
613
614 println!("and now resolve the filled future and get our data back");
615 let mut filled = filled_fut.await;
616 let packets =
617 Vec::from_iter(VsockPacketIterator::new(filled.take_usb_packet().unwrap()));
618 assert_eq!(packets.len(), 2);
619 }
620 }
621}