1use std::cmp::min;
6use std::future::Future;
7use std::iter::FusedIterator;
8use std::ops::DerefMut;
9use std::pin::Pin;
10use std::sync::{Mutex, MutexGuard};
11use std::task::{Context, Poll};
12
13use futures::task::AtomicWaker;
14use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes, Unaligned, little_endian};
15
16use crate::Address;
17
/// The kind of packet a [`Header`] describes.
///
/// The enum is `repr(u8)`, so it occupies exactly one byte in the header, and each variant's
/// discriminant is the ASCII character given next to it.
///
/// NOTE(review): this file only shows the wire values; the protocol-level meaning of each
/// variant is implied by its name and should be confirmed against the protocol description.
#[repr(u8)]
#[derive(
    Debug,
    TryFromBytes,
    IntoBytes,
    KnownLayout,
    Immutable,
    Unaligned,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Clone,
    Copy,
)]
pub enum PacketType {
    /// Wire value `b'S'`.
    Sync = b'S',
    /// Wire value `b'E'`.
    Echo = b'E',
    /// Wire value `b'e'` (lower case, unlike [`PacketType::Echo`]).
    EchoReply = b'e',
    /// Wire value `b'C'`.
    Connect = b'C',
    /// Wire value `b'F'`.
    Finish = b'F',
    /// Wire value `b'R'`.
    Reset = b'R',
    /// Wire value `b'A'`.
    Accept = b'A',
    /// Wire value `b'D'`. This is the type used by `UsbPacketFiller::write_vsock_data`.
    Data = b'D',
    /// Wire value `b'X'`.
    Pause = b'X',
}
74
/// The fixed-size header that precedes the payload of every packet.
///
/// The struct is `repr(C, packed(1))`, so the fields appear on the wire in declaration order
/// with no padding: 3 magic bytes, 1 packet-type byte and five little-endian `u32` values
/// (24 bytes in total). The field order also determines the derived ordering, so it must not
/// be rearranged.
#[repr(C, packed(1))]
#[derive(
    Debug,
    TryFromBytes,
    IntoBytes,
    KnownLayout,
    Immutable,
    Unaligned,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Clone,
)]
pub struct Header {
    /// Marker bytes; always `Header::MAGIC` for a header produced by [`Header::new`], and
    /// checked by `Packet::parse_next` when reading.
    magic: [u8; 3],
    /// What kind of packet this header introduces.
    pub packet_type: PacketType,
    /// The `device_cid` part of the packet's address, stored little-endian.
    pub device_cid: little_endian::U32,
    /// The `host_cid` part of the packet's address, stored little-endian.
    pub host_cid: little_endian::U32,
    /// The `device_port` part of the packet's address, stored little-endian.
    pub device_port: little_endian::U32,
    /// The `host_port` part of the packet's address, stored little-endian.
    pub host_port: little_endian::U32,
    /// Number of payload bytes that follow this header, stored little-endian.
    pub payload_len: little_endian::U32,
}
120
121impl Header {
122 pub const SIZE: usize = size_of::<Self>();
124 const MAGIC: &'static [u8; 3] = b"ffx";
125
126 pub fn new(packet_type: PacketType) -> Self {
129 let device_cid = 0.into();
130 let host_cid = 0.into();
131 let device_port = 0.into();
132 let host_port = 0.into();
133 let payload_len = 0.into();
134 Header {
135 magic: *Self::MAGIC,
136 packet_type,
137 device_cid,
138 host_cid,
139 device_port,
140 host_port,
141 payload_len,
142 }
143 }
144
145 pub fn packet_size(&self) -> usize {
148 Packet::size_with_payload(self.payload_len.get() as usize)
149 }
150
151 pub fn set_address(&mut self, addr: &Address) {
153 self.device_cid.set(addr.device_cid);
154 self.host_cid.set(addr.host_cid);
155 self.device_port.set(addr.device_port);
156 self.host_port.set(addr.host_port);
157 }
158}
159
/// A read-only view of a single packet: a borrowed [`Header`] and the payload bytes that go
/// with it. Both borrows share the lifetime `'a`.
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
pub struct Packet<'a> {
    /// The packet's header.
    pub header: &'a Header,
    /// The packet's payload. `Packet::parse_next` always produces a slice of exactly
    /// `header.payload_len` bytes; packets built by hand are not checked.
    pub payload: &'a [u8],
}
168
169impl<'a> Packet<'a> {
170 pub fn size(&self) -> usize {
173 self.header.packet_size()
174 }
175
176 fn size_with_payload(payload_size: usize) -> usize {
177 size_of::<Header>() + payload_size
178 }
179
180 fn parse_next(buf: &'a [u8]) -> Result<(Self, &'a [u8]), std::io::Error> {
181 let Some((header, body)) = buf.split_at_checked(size_of::<Header>()) else {
183 return Err(std::io::Error::other("insufficient data for last packet"));
184 };
185 let header = Header::try_ref_from_bytes(header).map_err(|err| {
186 std::io::Error::other(format!("failed to parse usb vsock header: {err:?}"))
187 })?;
188 if header.magic != *Header::MAGIC {
189 return Err(std::io::Error::other(format!("invalid magic bytes on usb vsock header")));
190 }
191 let payload_len = Into::<u64>::into(header.payload_len) as usize;
193 let body_len = body.len();
194 if payload_len > body_len {
195 return Err(std::io::Error::other(format!(
196 "payload length on usb vsock header ({payload_len}) was larger than available in buffer {body_len}"
197 )));
198 }
199
200 let (payload, remain) = body.split_at(payload_len);
201 Ok((Packet { header, payload }, remain))
202 }
203
204 pub fn write_to_unchecked(&'a self, buf: &'a mut [u8]) -> &'a mut [u8] {
213 let (packet, remain) = buf.split_at_mut(self.size());
214 self.header.write_to_prefix(packet).unwrap();
215 self.payload.write_to_suffix(packet).unwrap();
216 remain
217 }
218}
219
/// A mutable view of a packet that is being assembled in place inside a caller-provided
/// buffer. Created with `PacketMut::new_in` and completed with `PacketMut::finish`.
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
pub struct PacketMut<'a> {
    /// The header stored at the start of the buffer.
    pub header: &'a mut Header,
    /// All bytes of the buffer after the header; the caller fills in as much as it needs.
    pub payload: &'a mut [u8],
}
228
229impl<'a> PacketMut<'a> {
230 pub fn new_in(packet_type: PacketType, buf: &'a mut [u8]) -> Self {
242 Header::new(packet_type)
243 .write_to_prefix(buf)
244 .expect("not enough room in buffer for packet header");
245 let (header_bytes, payload) = buf.split_at_mut(Header::SIZE);
246 let header = Header::try_mut_from_bytes(header_bytes).unwrap();
247 PacketMut { header, payload }
248 }
249
250 pub fn finish(self, payload_len: usize) -> Result<usize, PacketTooBigError> {
253 if payload_len <= self.payload.len() {
254 self.header.payload_len.set(u32::try_from(payload_len).map_err(|_| PacketTooBigError)?);
255 Ok(Header::SIZE + payload_len)
256 } else {
257 Err(PacketTooBigError)
258 }
259 }
260}
261
/// Iterator over the packets stored back to back in a byte buffer.
pub struct VsockPacketIterator<'a> {
    /// The bytes not yet parsed. Becomes `None` once the buffer is exhausted or a packet
    /// failed to parse, after which the iterator yields nothing more.
    buf: Option<&'a [u8]>,
}
266
267impl<'a> VsockPacketIterator<'a> {
268 pub fn new(buf: &'a [u8]) -> Self {
271 Self { buf: Some(buf) }
272 }
273}
274
275impl<'a> FusedIterator for VsockPacketIterator<'a> {}
276impl<'a> Iterator for VsockPacketIterator<'a> {
277 type Item = Result<Packet<'a>, std::io::Error>;
278
279 fn next(&mut self) -> Option<Self::Item> {
280 let data = self.buf.take()?;
282
283 if data.len() == 0 {
285 return None;
286 }
287
288 match Packet::parse_next(data) {
289 Ok((header, rest)) => {
290 self.buf = Some(rest);
292 Some(Ok(header))
293 }
294 Err(err) => Some(Err(err)),
295 }
296 }
297}
298
/// Accumulates serialized packets one after another in a caller-supplied buffer `B`, forming
/// the contents of one larger outgoing buffer.
pub struct UsbPacketBuilder<B> {
    /// Backing storage the packets are written into.
    buffer: B,
    /// Number of bytes of `buffer` already filled; the next packet is written here.
    offset: usize,
    /// Woken by `take_usb_packet` after the buffer has been emptied. Nothing in this file
    /// registers a waker on it.
    space_waker: AtomicWaker,
    /// Woken by `write_vsock_packet` after a packet has been added. Nothing in this file
    /// registers a waker on it.
    packet_waker: AtomicWaker,
}
307
/// Error returned when a packet does not fit: either into the space left in a
/// [`UsbPacketBuilder`], or into the payload area / `u32` length field in `PacketMut::finish`.
#[derive(Debug, Copy, Clone)]
pub struct PacketTooBigError;
311
312impl<B> UsbPacketBuilder<B> {
313 pub fn new(buffer: B) -> Self {
317 let offset = 0;
318 let space_waker = AtomicWaker::default();
319 let packet_waker = AtomicWaker::default();
320 Self { buffer, offset, space_waker, packet_waker }
321 }
322
323 pub fn has_data(&self) -> bool {
325 self.offset > 0
326 }
327}
328
329impl<B> UsbPacketBuilder<B>
330where
331 B: std::ops::DerefMut<Target = [u8]>,
332{
333 pub fn available(&self) -> usize {
335 self.buffer.len() - self.offset
336 }
337
338 pub fn write_vsock_packet(&mut self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
341 let packet_size = packet.size();
342 if self.available() >= packet_size {
343 packet.write_to_unchecked(&mut self.buffer[self.offset..]);
344 self.offset += packet_size;
345 self.packet_waker.wake();
346 Ok(())
347 } else {
348 Err(PacketTooBigError)
349 }
350 }
351
352 pub fn take_usb_packet(&mut self) -> Option<&mut [u8]> {
356 let written = self.offset;
357 if written == 0 {
358 return None;
359 }
360 self.offset = 0;
361 self.space_waker.wake();
362 Some(&mut self.buffer[0..written])
363 }
364}
365
/// Meeting point between code that produces packets and code that owns outgoing buffers: one
/// side lends a [`UsbPacketBuilder`] via `fill_usb_packet`, the other writes packets into it.
pub(crate) struct UsbPacketFiller<B> {
    /// The builder currently on loan for filling, if any.
    current_out_packet: Mutex<Option<UsbPacketBuilder<B>>>,
    /// Registered by writers waiting for a builder with enough room (`WaitForFillable`);
    /// woken when `FillUsbPacket` installs a builder.
    out_packet_waker: AtomicWaker,
    /// Registered by `FillUsbPacket` while it waits for data; woken after a packet has been
    /// written into the current builder.
    filled_packet_waker: AtomicWaker,
}
371
372impl<B> Default for UsbPacketFiller<B> {
373 fn default() -> Self {
374 let current_out_packet = Mutex::default();
375 let out_packet_waker = AtomicWaker::default();
376 let filled_packet_waker = AtomicWaker::default();
377 Self { current_out_packet, out_packet_waker, filled_packet_waker }
378 }
379}
380
381impl<B: DerefMut<Target = [u8]> + Unpin> UsbPacketFiller<B> {
382 fn wait_for_fillable(&self, min_packet_size: usize) -> WaitForFillable<'_, B> {
383 WaitForFillable { filler: &self, min_packet_size }
384 }
385
386 pub async fn write_vsock_packet(&self, packet: &Packet<'_>) -> Result<(), PacketTooBigError> {
387 let mut builder = self.wait_for_fillable(packet.size()).await;
388 builder.as_mut().unwrap().write_vsock_packet(packet)?;
389 self.filled_packet_waker.wake();
390 Ok(())
391 }
392
393 pub async fn write_vsock_data(&self, address: &Address, payload: &[u8]) -> usize {
394 let header = &mut Header::new(PacketType::Data);
395 header.set_address(&address);
396 let mut builder = self.wait_for_fillable(Header::SIZE + 1).await;
397 let builder = builder.as_mut().unwrap();
398 let writing = min(payload.len(), builder.available() - Header::SIZE);
399 header.payload_len.set(writing as u32);
400 builder.write_vsock_packet(&Packet { header, payload: &payload[..writing] }).unwrap();
401 self.filled_packet_waker.wake();
402 writing
403 }
404
405 pub async fn write_vsock_data_all(&self, address: &Address, payload: &[u8]) {
406 let mut written = 0;
407 while written < payload.len() {
408 written += self.write_vsock_data(address, &payload[written..]).await;
409 }
410 }
411
412 pub fn fill_usb_packet(&self, builder: UsbPacketBuilder<B>) -> FillUsbPacket<'_, B> {
419 FillUsbPacket(&self, Some(builder))
420 }
421}
422
/// Future returned by `UsbPacketFiller::fill_usb_packet`.
///
/// Field `0` is the filler the builder is lent to. Field `1` holds the builder until the first
/// poll hands it over to the filler, and is `None` from then on.
pub(crate) struct FillUsbPacket<'a, B>(&'a UsbPacketFiller<B>, Option<UsbPacketBuilder<B>>);
424
425impl<'a, B: Unpin> Future for FillUsbPacket<'a, B> {
426 type Output = UsbPacketBuilder<B>;
427
428 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
429 if let Some(builder) = self.1.take() {
432 if builder.has_data() {
434 return Poll::Ready(builder);
435 }
436
437 let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
438 assert!(current_out_packet.is_none(), "Can't fill more than one packet at a time");
439 current_out_packet.replace(builder);
440 self.0.out_packet_waker.wake();
441 self.0.filled_packet_waker.register(cx.waker());
442 Poll::Pending
443 } else {
444 let mut current_out_packet = self.0.current_out_packet.lock().unwrap();
445 let Some(builder) = current_out_packet.take() else {
446 panic!("Packet builder was somehow removed from connection prematurely");
447 };
448
449 if builder.has_data() {
450 self.0.filled_packet_waker.wake();
451 Poll::Ready(builder)
452 } else {
453 current_out_packet.replace(builder);
456 Poll::Pending
457 }
458 }
459 }
460}
461
/// Future returned by `UsbPacketFiller::wait_for_fillable`; resolves to the locked builder
/// slot once a builder with enough free space is on loan.
pub(crate) struct WaitForFillable<'a, B> {
    /// The filler whose current builder is being waited on.
    filler: &'a UsbPacketFiller<B>,
    /// Minimum number of free bytes the builder must have before this future resolves.
    min_packet_size: usize,
}
466
467impl<'a, B: DerefMut<Target = [u8]> + Unpin> Future for WaitForFillable<'a, B> {
468 type Output = MutexGuard<'a, Option<UsbPacketBuilder<B>>>;
469
470 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
471 let current_out_packet = self.filler.current_out_packet.lock().unwrap();
472 let Some(builder) = &*current_out_packet else {
473 self.filler.out_packet_waker.register(cx.waker());
474 return Poll::Pending;
475 };
476 if builder.available() >= self.min_packet_size {
477 Poll::Ready(current_out_packet)
478 } else {
479 self.filler.out_packet_waker.register(cx.waker());
480 Poll::Pending
481 }
482 }
483}
484
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use fuchsia_async::Task;
    use futures::poll;

    /// Polls `fut` exactly once and panics if it is already ready.
    async fn assert_pending<F: Future>(fut: F) {
        let fut = std::pin::pin!(fut);
        if let Poll::Ready(_) = poll!(fut) {
            panic!("Future was ready when it shouldn't have been");
        }
    }

    /// Writes one packet through a filler into a buffer sized exactly for it, then parses it
    /// back out and checks it is unchanged.
    #[fuchsia::test]
    async fn roundtrip_packet() {
        let payload = b"hello world!";
        let packet = Packet {
            payload,
            header: &Header {
                device_cid: 1.into(),
                host_cid: 2.into(),
                device_port: 3.into(),
                host_port: 4.into(),
                payload_len: little_endian::U32::from(payload.len() as u32),
                ..Header::new(PacketType::Data)
            },
        };
        // The buffer has room for exactly this one packet.
        let buffer = vec![0; packet.size()];
        let builder = UsbPacketBuilder::new(buffer);
        let filler = UsbPacketFiller::default();
        let mut filled_fut = filler.fill_usb_packet(builder);
        println!("we should not be ready to pull a usb packet off yet");
        // This first poll is also what hands the builder over to the filler.
        assert_pending(&mut filled_fut).await;

        println!("we should be able to write a packet though ({} bytes)", packet.size());
        filler.write_vsock_packet(&packet).await.unwrap();

        println!("we shouldn't have any space for another packet now");
        assert_pending(filler.wait_for_fillable(1)).await;

        println!("but we should have a new usb packet available");
        let mut builder = filled_fut.await;
        let buffer = builder.take_usb_packet().unwrap();

        println!("the packet we get back out should be the same one we put in");
        let (read_packet, remain) = Packet::parse_next(buffer).unwrap();
        assert_eq!(packet, read_packet);
        assert!(remain.is_empty());
    }

    /// Streams 1024 numbered packets from a spawned task through a 256-byte buffer and checks
    /// they are read back in order and unmodified.
    #[fuchsia::test]
    async fn many_packets() {
        /// Builds the header and payload for packet number `num`; every address field is set
        /// to `num` so each packet is distinguishable.
        fn make_numbered_packet(num: u32) -> (Header, String) {
            let payload = format!("packet #{num}!");
            let header = Header {
                device_cid: num.into(),
                device_port: num.into(),
                host_cid: num.into(),
                host_port: num.into(),
                payload_len: little_endian::U32::from(payload.len() as u32),
                ..Header::new(PacketType::Data)
            };
            (header, payload)
        }
        const BUFFER_SIZE: usize = 256;
        let mut builder = UsbPacketBuilder::new(vec![0; BUFFER_SIZE]);
        let filler = Arc::new(UsbPacketFiller::default());

        // Producer: writes the packets in order, waiting whenever the buffer has no room.
        let send_filler = filler.clone();
        let send_task = Task::spawn(async move {
            for packet_num in 0..1024 {
                let next_packet = make_numbered_packet(packet_num);
                let next_packet =
                    Packet { header: &next_packet.0, payload: next_packet.1.as_ref() };
                send_filler.write_vsock_packet(&next_packet).await.unwrap();
            }
        });

        // Consumer: repeatedly lends the same builder, then drains and verifies whatever
        // packets were written into it.
        let mut read_packet_num = 0;
        while read_packet_num < 1024 {
            builder = filler.fill_usb_packet(builder).await;
            let buffer = builder.take_usb_packet().unwrap();
            let mut num_packets = 0;
            for packet in VsockPacketIterator::new(&buffer) {
                let packet_compare = make_numbered_packet(read_packet_num);
                let packet_compare =
                    Packet { header: &packet_compare.0, payload: &packet_compare.1.as_ref() };
                assert_eq!(packet.unwrap(), packet_compare);
                read_packet_num += 1;
                num_packets += 1;
            }
            println!(
                "Read {num_packets} vsock packets from usb packet buffer, had {count} bytes left",
                count = BUFFER_SIZE - buffer.len()
            );
        }
        send_task.await;
        assert_eq!(1024, read_packet_num);
    }

    /// Exercises the two futures by polling them by hand: a writer waits until a builder is
    /// registered, writes two packets, and the buffer owner then gets both back. Repeated ten
    /// times to check the filler can be reused.
    #[fuchsia::test]
    async fn packet_fillable_futures() {
        let filler = UsbPacketFiller::default();

        for _ in 0..10 {
            println!("register an interest in filling a usb packet");
            let mut fillable_fut = filler.wait_for_fillable(1);
            println!("make sure we have nothing to fill");
            assert!(poll!(&mut fillable_fut).is_pending());

            println!("register a packet for filling");
            let mut filled_fut = filler.fill_usb_packet(UsbPacketBuilder::new(vec![0; 1024]));
            println!("make sure we've registered the buffer");
            assert!(poll!(&mut filled_fut).is_pending());

            println!("now put some things in the packet");
            let header = &mut Header::new(PacketType::Data);
            header.payload_len.set(99);
            let Poll::Ready(mut builder) = poll!(fillable_fut) else {
                panic!("should have been ready to fill a packet")
            };
            builder
                .as_mut()
                .unwrap()
                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
                .unwrap();
            // Release the lock so the next wait_for_fillable can acquire it.
            drop(builder);
            let Poll::Ready(mut builder) = poll!(filler.wait_for_fillable(1)) else {
                panic!("should have been ready to fill a packet(2)")
            };
            builder
                .as_mut()
                .unwrap()
                .write_vsock_packet(&Packet { header, payload: &[b'a'; 99] })
                .unwrap();
            drop(builder);

            println!("but if we ask for too much space we'll get pending");
            // This is one byte more than the buffer minus the two payloads; the two headers
            // reduce the free space further, so the request cannot be satisfied.
            assert!(poll!(filler.wait_for_fillable(1024 - (99 * 2) + 1)).is_pending());

            println!("and now resolve the filled future and get our data back");
            let mut filled = filled_fut.await;
            let packets =
                Vec::from_iter(VsockPacketIterator::new(filled.take_usb_packet().unwrap()));
            assert_eq!(packets.len(), 2);
        }
    }
}