1use crate::{
8 object_get_property, object_set_property, ok, sys, AsHandleRef, Handle, HandleBased, HandleRef,
9 Property, PropertyQuery, Status, Vmo,
10};
11use bitflags::bitflags;
12use std::io::SeekFrom;
13use std::mem::MaybeUninit;
14
15#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
19#[repr(transparent)]
20pub struct Stream(Handle);
21impl_handle_based!(Stream);
22
23bitflags! {
24 #[repr(transparent)]
25 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
26 pub struct StreamOptions: u32 {
27 const MODE_READ = sys::ZX_STREAM_MODE_READ;
28 const MODE_WRITE = sys::ZX_STREAM_MODE_WRITE;
29 const MODE_APPEND = sys::ZX_STREAM_MODE_APPEND;
30 }
31}
32
33bitflags! {
34 #[repr(transparent)]
35 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
36 pub struct StreamReadOptions: u32 {
37 }
38}
39
40bitflags! {
41 #[repr(transparent)]
42 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
43 pub struct StreamWriteOptions: u32 {
44 const APPEND = sys::ZX_STREAM_APPEND;
45 }
46}
47
48impl Stream {
49 pub fn create(options: StreamOptions, vmo: &Vmo, offset: u64) -> Result<Self, Status> {
51 let mut handle = 0;
52 let status =
53 unsafe { sys::zx_stream_create(options.bits(), vmo.raw_handle(), offset, &mut handle) };
54 ok(status)?;
55 unsafe { Ok(Stream::from(Handle::from_raw(handle))) }
56 }
57
58 pub unsafe fn readv(
67 &self,
68 options: StreamReadOptions,
69 iovecs: &mut [sys::zx_iovec_t],
70 ) -> Result<usize, Status> {
71 let mut actual = 0;
72 let status = unsafe {
73 sys::zx_stream_readv(
74 self.raw_handle(),
75 options.bits(),
76 iovecs.as_mut_ptr(),
77 iovecs.len(),
78 &mut actual,
79 )
80 };
81 ok(status)?;
82 Ok(actual)
83 }
84
85 pub fn read_uninit(
91 &self,
92 options: StreamReadOptions,
93 buffer: &mut [MaybeUninit<u8>],
94 ) -> Result<usize, Status> {
95 let mut iovec =
97 [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
98 unsafe { self.readv(options, &mut iovec) }
101 }
102
103 pub fn read_to_vec(
108 &self,
109 options: StreamReadOptions,
110 length: usize,
111 ) -> Result<Vec<u8>, Status> {
112 let mut data = Vec::with_capacity(length);
113 let buffer = &mut data.spare_capacity_mut()[0..length];
114 let actual = self.read_uninit(options, buffer)?;
115 unsafe { data.set_len(actual) };
117 Ok(data)
118 }
119
120 pub unsafe fn readv_at(
129 &self,
130 options: StreamReadOptions,
131 offset: u64,
132 iovecs: &mut [sys::zx_iovec_t],
133 ) -> Result<usize, Status> {
134 let mut actual = 0;
135 let status = unsafe {
136 sys::zx_stream_readv_at(
137 self.raw_handle(),
138 options.bits(),
139 offset,
140 iovecs.as_mut_ptr(),
141 iovecs.len(),
142 &mut actual,
143 )
144 };
145 ok(status)?;
146 Ok(actual)
147 }
148
149 pub fn read_at_uninit(
156 &self,
157 options: StreamReadOptions,
158 offset: u64,
159 buffer: &mut [MaybeUninit<u8>],
160 ) -> Result<usize, Status> {
161 let mut iovec =
163 [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
164 unsafe { self.readv_at(options, offset, &mut iovec) }
167 }
168
169 pub fn read_at_to_vec(
175 &self,
176 options: StreamReadOptions,
177 offset: u64,
178 length: usize,
179 ) -> Result<Vec<u8>, Status> {
180 let mut data = Vec::with_capacity(length);
181 let buffer = &mut data.spare_capacity_mut()[0..length];
182 let actual = self.read_at_uninit(options, offset, buffer)?;
183 unsafe { data.set_len(actual) };
185 Ok(data)
186 }
187
188 pub fn seek(&self, pos: SeekFrom) -> Result<u64, Status> {
190 let (whence, offset) = match pos {
191 SeekFrom::Start(start) => (
192 sys::ZX_STREAM_SEEK_ORIGIN_START,
193 start.try_into().map_err(|_| Status::OUT_OF_RANGE)?,
194 ),
195 SeekFrom::End(end) => (sys::ZX_STREAM_SEEK_ORIGIN_END, end),
196 SeekFrom::Current(current) => (sys::ZX_STREAM_SEEK_ORIGIN_CURRENT, current),
197 };
198 let mut pos = 0;
199 let status = unsafe { sys::zx_stream_seek(self.raw_handle(), whence, offset, &mut pos) };
200 ok(status)?;
201 Ok(pos)
202 }
203
204 pub fn writev(
208 &self,
209 options: StreamWriteOptions,
210 iovecs: &[sys::zx_iovec_t],
211 ) -> Result<usize, Status> {
212 let mut actual = 0;
213 let status = unsafe {
214 sys::zx_stream_writev(
215 self.raw_handle(),
216 options.bits(),
217 iovecs.as_ptr(),
218 iovecs.len(),
219 &mut actual,
220 )
221 };
222 ok(status)?;
223 Ok(actual)
224 }
225
226 pub fn write(&self, options: StreamWriteOptions, buffer: &[u8]) -> Result<usize, Status> {
231 let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
232 self.writev(options, &iovec)
233 }
234
235 pub fn writev_at(
239 &self,
240 options: StreamWriteOptions,
241 offset: u64,
242 iovecs: &[sys::zx_iovec_t],
243 ) -> Result<usize, Status> {
244 let mut actual = 0;
245 let status = unsafe {
246 sys::zx_stream_writev_at(
247 self.raw_handle(),
248 options.bits(),
249 offset,
250 iovecs.as_ptr(),
251 iovecs.len(),
252 &mut actual,
253 )
254 };
255 ok(status)?;
256 Ok(actual)
257 }
258
259 pub fn write_at(
264 &self,
265 options: StreamWriteOptions,
266 offset: u64,
267 buffer: &[u8],
268 ) -> Result<usize, Status> {
269 let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
270 self.writev_at(options, offset, &iovec)
271 }
272}
273
274unsafe_handle_properties!(object: Stream,
275 props: [
276 {query_ty: STREAM_MODE_APPEND, tag: StreamModeAppendTag, prop_ty: u8, get: get_mode_append, set: set_mode_append},
277 ]
278);
279
280impl std::io::Read for Stream {
281 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
282 let mut iovec = [sys::zx_iovec_t { buffer: buf.as_mut_ptr(), capacity: buf.len() }];
283 Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovec) }?)
286 }
287
288 fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut<'_>]) -> std::io::Result<usize> {
289 let mut iovecs = unsafe {
291 std::slice::from_raw_parts_mut(bufs.as_mut_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
292 };
293 Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovecs) }?)
296 }
297}
298
299impl std::io::Seek for Stream {
300 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
301 Ok(Self::seek(&self, pos)? as u64)
302 }
303}
304
305impl std::io::Write for Stream {
306 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
307 Ok(Self::write(&self, StreamWriteOptions::empty(), buf)?)
308 }
309
310 fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
311 let iovecs = unsafe {
313 std::slice::from_raw_parts(bufs.as_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
314 };
315 Ok(self.writev(StreamWriteOptions::empty(), &iovecs)?)
316 }
317
318 fn flush(&mut self) -> std::io::Result<()> {
319 Ok(())
320 }
321}
322
323#[cfg(test)]
324mod tests {
325 use super::*;
326 use crate as zx;
327
328 #[test]
329 fn create() {
330 let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
331
332 let stream =
333 Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
334
335 let basic_info = stream.basic_info().unwrap();
336 assert_eq!(basic_info.object_type, zx::ObjectType::STREAM);
337 assert!(basic_info.rights.contains(zx::Rights::READ));
338 assert!(basic_info.rights.contains(zx::Rights::WRITE));
339 }
340
341 #[test]
342 fn create_readonly() {
343 let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
344
345 let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
346
347 let basic_info = stream.basic_info().unwrap();
348 assert!(basic_info.rights.contains(zx::Rights::READ));
349 assert!(!basic_info.rights.contains(zx::Rights::WRITE));
350 }
351
352 #[test]
353 fn create_offset() {
354 let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
355 let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 24).unwrap();
356 assert_eq!(stream.seek(SeekFrom::Current(0)).unwrap(), 24);
357 }
358
359 #[test]
360 fn create_invalid() {
361 let result =
362 Stream::create(StreamOptions::MODE_READ, &zx::Vmo::from(zx::Handle::invalid()), 0);
363 assert_eq!(result, Err(zx::Status::BAD_HANDLE));
364 }
365
366 #[test]
367 fn create_with_mode_append() {
368 let size: u64 = zx::system_get_page_size().into();
369 let vmo = zx::Vmo::create(size).unwrap();
370 let stream =
371 Stream::create(StreamOptions::MODE_WRITE | StreamOptions::MODE_APPEND, &vmo, 0)
372 .unwrap();
373 assert_eq!(stream.get_mode_append().unwrap(), 1);
374 }
375
376 #[test]
377 fn get_and_set_mode_append() {
378 let size: u64 = zx::system_get_page_size().into();
379 let vmo = zx::Vmo::create(size).unwrap();
380 let stream = Stream::create(StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
381 assert_eq!(stream.get_mode_append().unwrap(), 0);
382 stream.set_mode_append(&1).unwrap();
383 assert_eq!(stream.get_mode_append().unwrap(), 1);
384 stream.set_mode_append(&0).unwrap();
385 assert_eq!(stream.get_mode_append().unwrap(), 0);
386 }
387
388 #[test]
389 fn read_uninit() {
390 const DATA: &'static [u8] = b"vmo-contents";
391 let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
392 vmo.write(DATA, 0).unwrap();
393 let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
394
395 let mut data = Vec::with_capacity(5);
397 let bytes_read =
398 stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
399 assert_eq!(bytes_read, 5);
400 unsafe { data.set_len(5) };
401 assert_eq!(data, DATA[0..5]);
402
403 let mut data = Vec::with_capacity(10);
405 let bytes_read =
406 stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
407 assert_eq!(bytes_read, 7);
408 unsafe { data.set_len(7) };
409 assert_eq!(data, DATA[5..]);
410
411 let mut data = Vec::with_capacity(10);
413 let bytes_read =
414 stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
415 assert_eq!(bytes_read, 0);
416 }
417
418 #[test]
419 fn read_to_vec() {
420 const DATA: &'static [u8] = b"vmo-contents";
421 let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
422 vmo.write(DATA, 0).unwrap();
423 let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
424
425 let data = stream.read_to_vec(StreamReadOptions::empty(), DATA.len()).unwrap();
426 assert_eq!(data, DATA);
427 }
428
429 #[test]
430 fn read_at_uninit() {
431 const DATA: &'static [u8] = b"vmo-contents";
432 let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
433 vmo.write(DATA, 0).unwrap();
434 let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
435
436 let mut data = Vec::with_capacity(5);
438 let bytes_read = stream
439 .read_at_uninit(StreamReadOptions::empty(), 0, data.spare_capacity_mut())
440 .unwrap();
441 assert_eq!(bytes_read, 5);
442 unsafe { data.set_len(5) };
443 assert_eq!(data, DATA[0..5]);
444
445 let mut data = Vec::with_capacity(10);
447 let bytes_read = stream
448 .read_at_uninit(StreamReadOptions::empty(), 5, data.spare_capacity_mut())
449 .unwrap();
450 assert_eq!(bytes_read, 7);
451 unsafe { data.set_len(7) };
452 assert_eq!(data, DATA[5..]);
453
454 let mut data = Vec::with_capacity(10);
456 let bytes_read = stream
457 .read_at_uninit(StreamReadOptions::empty(), 20, data.spare_capacity_mut())
458 .unwrap();
459 assert_eq!(bytes_read, 0);
460 }
461
462 #[test]
463 fn read_at_to_vec() {
464 const DATA: &'static [u8] = b"vmo-contents";
465 let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
466 vmo.write(DATA, 0).unwrap();
467 let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
468
469 let data = stream.read_at_to_vec(StreamReadOptions::empty(), 5, DATA.len()).unwrap();
470 assert_eq!(data, DATA[5..]);
471 }
472
473 #[test]
474 fn write() {
475 const DATA: &'static [u8] = b"vmo-contents";
476 let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
477 let stream =
478 Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
479
480 let bytes_written = stream.write(zx::StreamWriteOptions::empty(), DATA).unwrap();
481 assert_eq!(bytes_written, DATA.len());
482
483 let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
484 assert_eq!(data, DATA);
485 }
486
487 #[test]
488 fn write_at() {
489 const DATA: &'static [u8] = b"vmo-contents";
490 let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
491 let stream =
492 Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
493
494 let bytes_written =
495 stream.write_at(zx::StreamWriteOptions::empty(), 0, &DATA[0..3]).unwrap();
496 assert_eq!(bytes_written, 3);
497
498 let bytes_written =
499 stream.write_at(zx::StreamWriteOptions::empty(), 3, &DATA[3..]).unwrap();
500 assert_eq!(bytes_written, DATA.len() - 3);
501
502 let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
503 assert_eq!(data, DATA);
504 }
505
506 #[test]
507 fn std_io_read_write_seek() {
508 const DATA: &'static str = "stream-contents";
509 let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
510 let mut stream =
511 Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
512
513 std::io::Write::write_all(&mut stream, DATA.as_bytes()).unwrap();
514 assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
515 std::io::Seek::rewind(&mut stream).unwrap();
516 assert_eq!(std::io::read_to_string(&mut stream).unwrap(), DATA);
517 assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
518 }
519
520 #[test]
521 fn std_io_read_vectored() {
522 const DATA: &'static [u8] = b"stream-contents";
523 let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
524 let mut stream =
525 Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
526 assert_eq!(stream.write(StreamWriteOptions::empty(), DATA).unwrap(), DATA.len());
527 std::io::Seek::rewind(&mut stream).unwrap();
528
529 let mut buf1 = [0; 6];
530 let mut buf2 = [0; 1];
531 let mut buf3 = [0; 8];
532 let mut bufs = [
533 std::io::IoSliceMut::new(&mut buf1),
534 std::io::IoSliceMut::new(&mut buf2),
535 std::io::IoSliceMut::new(&mut buf3),
536 ];
537 assert_eq!(std::io::Read::read_vectored(&mut stream, &mut bufs).unwrap(), DATA.len());
538 assert_eq!(buf1, DATA[0..6]);
539 assert_eq!(buf2, DATA[6..7]);
540 assert_eq!(buf3, DATA[7..]);
541 }
542
543 #[test]
544 fn std_io_write_vectored() {
545 let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
546 let mut stream =
547 Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
548
549 let bufs = [
550 std::io::IoSlice::new(b"stream"),
551 std::io::IoSlice::new(b"-"),
552 std::io::IoSlice::new(b"contents"),
553 ];
554 assert_eq!(std::io::Write::write_vectored(&mut stream, &bufs).unwrap(), 15);
555 std::io::Seek::rewind(&mut stream).unwrap();
556 assert_eq!(stream.read_to_vec(StreamReadOptions::empty(), 15).unwrap(), b"stream-contents");
557 }
558}