1use crate::{NullableHandle, Status, StatusExt, Vmo, ok, sys};
8use bitflags::bitflags;
9use std::io::SeekFrom;
10use std::mem::MaybeUninit;
11
/// An owned handle to a Zircon stream object.
///
/// The wrapper is `repr(transparent)` over [`NullableHandle`], so it has the same layout as the
/// handle it owns. All operations below go through the `zx_stream_*` syscalls.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[repr(transparent)]
pub struct Stream(NullableHandle);
// NOTE(review): this macro is defined elsewhere in the crate; this file relies on it for
// `raw_handle()` and `From<NullableHandle>` on `Stream` — confirm against the macro definition.
impl_handle_based!(Stream);
19
bitflags! {
    /// Options passed to [`Stream::create`]; the raw bits are forwarded to `zx_stream_create`.
    #[repr(transparent)]
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    pub struct StreamOptions: u32 {
        /// Request a stream that can be read from (the tests below check that the resulting
        /// handle carries the `READ` right).
        const MODE_READ = sys::ZX_STREAM_MODE_READ;
        /// Request a stream that can be written to (the tests below check that the resulting
        /// handle carries the `WRITE` right).
        const MODE_WRITE = sys::ZX_STREAM_MODE_WRITE;
        /// Create the stream with its append-mode property already set (the tests below
        /// observe `get_mode_append() == 1` after creation with this flag).
        const MODE_APPEND = sys::ZX_STREAM_MODE_APPEND;
    }
}
29
bitflags! {
    /// Options for the read operations on [`Stream`].
    ///
    /// No flags are defined here, so `StreamReadOptions::empty()` is the only value built from
    /// named flags; the type exists so the read signatures mirror the write signatures.
    #[repr(transparent)]
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    pub struct StreamReadOptions: u32 {
    }
}
36
bitflags! {
    /// Options for the write operations on [`Stream`]; the raw bits are forwarded to
    /// `zx_stream_writev` / `zx_stream_writev_at`.
    #[repr(transparent)]
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    pub struct StreamWriteOptions: u32 {
        /// Maps to `ZX_STREAM_APPEND`.
        // NOTE(review): presumably makes this one write append to the end of the stream
        // regardless of the seek position — confirm against the Zircon syscall reference.
        const APPEND = sys::ZX_STREAM_APPEND;
    }
}
44
45impl Stream {
46 pub fn create(options: StreamOptions, vmo: &Vmo, offset: u64) -> Result<Self, Status> {
48 let mut handle = 0;
49 let status =
50 unsafe { sys::zx_stream_create(options.bits(), vmo.raw_handle(), offset, &mut handle) };
51 ok(status)?;
52 unsafe { Ok(Stream::from(NullableHandle::from_raw(handle))) }
53 }
54
55 pub unsafe fn readv(
64 &self,
65 options: StreamReadOptions,
66 iovecs: &mut [sys::zx_iovec_t],
67 ) -> Result<usize, Status> {
68 let mut actual = 0;
69 let status = unsafe {
70 sys::zx_stream_readv(
71 self.raw_handle(),
72 options.bits(),
73 iovecs.as_mut_ptr(),
74 iovecs.len(),
75 &mut actual,
76 )
77 };
78 ok(status)?;
79 Ok(actual)
80 }
81
82 pub fn read_uninit(
88 &self,
89 options: StreamReadOptions,
90 buffer: &mut [MaybeUninit<u8>],
91 ) -> Result<usize, Status> {
92 let mut iovec =
94 [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
95 unsafe { self.readv(options, &mut iovec) }
98 }
99
100 pub fn read_to_vec(
105 &self,
106 options: StreamReadOptions,
107 length: usize,
108 ) -> Result<Vec<u8>, Status> {
109 let mut data = Vec::with_capacity(length);
110 let buffer = &mut data.spare_capacity_mut()[0..length];
111 let actual = self.read_uninit(options, buffer)?;
112 unsafe { data.set_len(actual) };
114 Ok(data)
115 }
116
117 pub unsafe fn readv_at(
126 &self,
127 options: StreamReadOptions,
128 offset: u64,
129 iovecs: &mut [sys::zx_iovec_t],
130 ) -> Result<usize, Status> {
131 let mut actual = 0;
132 let status = unsafe {
133 sys::zx_stream_readv_at(
134 self.raw_handle(),
135 options.bits(),
136 offset,
137 iovecs.as_mut_ptr(),
138 iovecs.len(),
139 &mut actual,
140 )
141 };
142 ok(status)?;
143 Ok(actual)
144 }
145
146 pub fn read_at_uninit(
153 &self,
154 options: StreamReadOptions,
155 offset: u64,
156 buffer: &mut [MaybeUninit<u8>],
157 ) -> Result<usize, Status> {
158 let mut iovec =
160 [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
161 unsafe { self.readv_at(options, offset, &mut iovec) }
164 }
165
166 pub fn read_at_to_vec(
172 &self,
173 options: StreamReadOptions,
174 offset: u64,
175 length: usize,
176 ) -> Result<Vec<u8>, Status> {
177 let mut data = Vec::with_capacity(length);
178 let buffer = &mut data.spare_capacity_mut()[0..length];
179 let actual = self.read_at_uninit(options, offset, buffer)?;
180 unsafe { data.set_len(actual) };
182 Ok(data)
183 }
184
185 pub fn seek(&self, pos: SeekFrom) -> Result<u64, Status> {
187 let (whence, offset) = match pos {
188 SeekFrom::Start(start) => (
189 sys::ZX_STREAM_SEEK_ORIGIN_START,
190 start.try_into().map_err(|_| Status::OUT_OF_RANGE)?,
191 ),
192 SeekFrom::End(end) => (sys::ZX_STREAM_SEEK_ORIGIN_END, end),
193 SeekFrom::Current(current) => (sys::ZX_STREAM_SEEK_ORIGIN_CURRENT, current),
194 };
195 let mut pos = 0;
196 let status = unsafe { sys::zx_stream_seek(self.raw_handle(), whence, offset, &mut pos) };
197 ok(status)?;
198 Ok(pos)
199 }
200
201 pub fn writev(
205 &self,
206 options: StreamWriteOptions,
207 iovecs: &[sys::zx_iovec_t],
208 ) -> Result<usize, Status> {
209 let mut actual = 0;
210 let status = unsafe {
211 sys::zx_stream_writev(
212 self.raw_handle(),
213 options.bits(),
214 iovecs.as_ptr(),
215 iovecs.len(),
216 &mut actual,
217 )
218 };
219 ok(status)?;
220 Ok(actual)
221 }
222
223 pub fn write(&self, options: StreamWriteOptions, buffer: &[u8]) -> Result<usize, Status> {
228 let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
229 self.writev(options, &iovec)
230 }
231
232 pub fn writev_at(
236 &self,
237 options: StreamWriteOptions,
238 offset: u64,
239 iovecs: &[sys::zx_iovec_t],
240 ) -> Result<usize, Status> {
241 let mut actual = 0;
242 let status = unsafe {
243 sys::zx_stream_writev_at(
244 self.raw_handle(),
245 options.bits(),
246 offset,
247 iovecs.as_ptr(),
248 iovecs.len(),
249 &mut actual,
250 )
251 };
252 ok(status)?;
253 Ok(actual)
254 }
255
256 pub fn write_at(
261 &self,
262 options: StreamWriteOptions,
263 offset: u64,
264 buffer: &[u8],
265 ) -> Result<usize, Status> {
266 let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
267 self.writev_at(options, offset, &iovec)
268 }
269}
270
// Declares the `STREAM_MODE_APPEND` property on `Stream` as a `u8`, exposed through
// `get_mode_append` / `set_mode_append` (the tests below read and write the values 0 and 1).
// NOTE(review): the macro is defined elsewhere in the crate; its `unsafe_` prefix suggests the
// caller vouches for the property/type pairing — confirm against the macro definition.
unsafe_handle_properties!(object: Stream,
    props: [
        {query_ty: STREAM_MODE_APPEND, tag: StreamModeAppendTag, prop_ty: u8, get: get_mode_append, set: set_mode_append},
    ]
);
276
277impl std::io::Read for Stream {
278 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
279 let mut iovec = [sys::zx_iovec_t { buffer: buf.as_mut_ptr(), capacity: buf.len() }];
280 Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovec) }
283 .map_err(|s| s.into_io_error())?)
284 }
285
286 fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut<'_>]) -> std::io::Result<usize> {
287 let mut iovecs = unsafe {
289 std::slice::from_raw_parts_mut(bufs.as_mut_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
290 };
291 Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovecs) }
294 .map_err(|s| s.into_io_error())?)
295 }
296}
297
298impl std::io::Seek for Stream {
299 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
300 Ok(Self::seek(&self, pos).map_err(|s| s.into_io_error())? as u64)
301 }
302}
303
304impl std::io::Write for Stream {
305 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
306 Ok(Self::write(&self, StreamWriteOptions::empty(), buf).map_err(|s| s.into_io_error())?)
307 }
308
309 fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
310 let iovecs = unsafe {
312 std::slice::from_raw_parts(bufs.as_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
313 };
314 Ok(self.writev(StreamWriteOptions::empty(), &iovecs).map_err(|s| s.into_io_error())?)
315 }
316
317 fn flush(&mut self) -> std::io::Result<()> {
318 Ok(())
319 }
320}
321
#[cfg(test)]
mod tests {
    //! Tests for `Stream`, run against real VMOs.
    //!
    //! The `*_uninit` tests slice `spare_capacity_mut()` to the intended read length, because
    //! `Vec::with_capacity(n)` only guarantees *at least* `n` slots. Passing the whole spare
    //! capacity would make the asserted byte counts depend on allocator behaviour.

    use super::*;
    use crate as zx;

    /// A read/write stream reports the STREAM object type and both READ and WRITE rights.
    #[test]
    fn create() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();

        let stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let basic_info = stream.basic_info().unwrap();
        assert_eq!(basic_info.object_type, zx::ObjectType::STREAM);
        assert!(basic_info.rights.contains(zx::Rights::READ));
        assert!(basic_info.rights.contains(zx::Rights::WRITE));
    }

    /// A read-only stream has the READ right but not the WRITE right.
    #[test]
    fn create_readonly() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();

        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        let basic_info = stream.basic_info().unwrap();
        assert!(basic_info.rights.contains(zx::Rights::READ));
        assert!(!basic_info.rights.contains(zx::Rights::WRITE));
    }

    /// The offset given to `create` becomes the initial seek position.
    #[test]
    fn create_offset() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 24).unwrap();
        assert_eq!(stream.seek(SeekFrom::Current(0)).unwrap(), 24);
    }

    /// Creating with `MODE_APPEND` is reflected by the append-mode property.
    #[test]
    fn create_with_mode_append() {
        let size: u64 = zx::system_get_page_size().into();
        let vmo = zx::Vmo::create(size).unwrap();
        let stream =
            Stream::create(StreamOptions::MODE_WRITE | StreamOptions::MODE_APPEND, &vmo, 0)
                .unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 1);
    }

    /// The append-mode property starts cleared and can be toggled on and off.
    #[test]
    fn get_and_set_mode_append() {
        let size: u64 = zx::system_get_page_size().into();
        let vmo = zx::Vmo::create(size).unwrap();
        let stream = Stream::create(StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 0);
        stream.set_mode_append(&1).unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 1);
        stream.set_mode_append(&0).unwrap();
        assert_eq!(stream.get_mode_append().unwrap(), 0);
    }

    /// Sequential `read_uninit` calls advance through the stream and then report end-of-data.
    #[test]
    fn read_uninit() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        // Ask for exactly 5 bytes, even if the allocation turned out larger.
        let mut data = Vec::<u8>::with_capacity(5);
        let bytes_read = stream
            .read_uninit(StreamReadOptions::empty(), &mut data.spare_capacity_mut()[..5])
            .unwrap();
        assert_eq!(bytes_read, 5);
        // SAFETY: the read reported 5 bytes written into the front of the spare capacity.
        unsafe { data.set_len(5) };
        assert_eq!(data, DATA[0..5]);

        // Ask for more than remains; only the 7 remaining bytes come back.
        let mut data = Vec::<u8>::with_capacity(10);
        let bytes_read = stream
            .read_uninit(StreamReadOptions::empty(), &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 7);
        // SAFETY: the read reported 7 bytes written into the front of the spare capacity.
        unsafe { data.set_len(7) };
        assert_eq!(data, DATA[5..]);

        // The stream is exhausted, so a further read yields nothing.
        let mut data = Vec::<u8>::with_capacity(10);
        let bytes_read = stream
            .read_uninit(StreamReadOptions::empty(), &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 0);
    }

    /// `read_to_vec` returns the whole contents when asked for exactly that many bytes.
    #[test]
    fn read_to_vec() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        let data = stream.read_to_vec(StreamReadOptions::empty(), DATA.len()).unwrap();
        assert_eq!(data, DATA);
    }

    /// `read_at_uninit` reads at explicit offsets, including one past the end of the data.
    #[test]
    fn read_at_uninit() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        // Ask for exactly 5 bytes, even if the allocation turned out larger.
        let mut data = Vec::<u8>::with_capacity(5);
        let bytes_read = stream
            .read_at_uninit(StreamReadOptions::empty(), 0, &mut data.spare_capacity_mut()[..5])
            .unwrap();
        assert_eq!(bytes_read, 5);
        // SAFETY: the read reported 5 bytes written into the front of the spare capacity.
        unsafe { data.set_len(5) };
        assert_eq!(data, DATA[0..5]);

        // Ask for more than remains after offset 5; only the 7 remaining bytes come back.
        let mut data = Vec::<u8>::with_capacity(10);
        let bytes_read = stream
            .read_at_uninit(StreamReadOptions::empty(), 5, &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 7);
        // SAFETY: the read reported 7 bytes written into the front of the spare capacity.
        unsafe { data.set_len(7) };
        assert_eq!(data, DATA[5..]);

        // Offset 20 is beyond the 12 bytes of content, so nothing is read.
        let mut data = Vec::<u8>::with_capacity(10);
        let bytes_read = stream
            .read_at_uninit(StreamReadOptions::empty(), 20, &mut data.spare_capacity_mut()[..10])
            .unwrap();
        assert_eq!(bytes_read, 0);
    }

    /// `read_at_to_vec` returns a short vector when fewer bytes remain than were requested.
    #[test]
    fn read_at_to_vec() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
        vmo.write(DATA, 0).unwrap();
        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();

        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 5, DATA.len()).unwrap();
        assert_eq!(data, DATA[5..]);
    }

    /// Data written through `write` can be read back from offset 0.
    #[test]
    fn write() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let bytes_written = stream.write(zx::StreamWriteOptions::empty(), DATA).unwrap();
        assert_eq!(bytes_written, DATA.len());

        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
        assert_eq!(data, DATA);
    }

    /// Two `write_at` calls at adjacent offsets assemble the full contents.
    #[test]
    fn write_at() {
        const DATA: &[u8] = b"vmo-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let bytes_written =
            stream.write_at(zx::StreamWriteOptions::empty(), 0, &DATA[0..3]).unwrap();
        assert_eq!(bytes_written, 3);

        let bytes_written =
            stream.write_at(zx::StreamWriteOptions::empty(), 3, &DATA[3..]).unwrap();
        assert_eq!(bytes_written, DATA.len() - 3);

        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
        assert_eq!(data, DATA);
    }

    /// The `std::io` Write, Seek and Read adapters share one seek position.
    #[test]
    fn std_io_read_write_seek() {
        const DATA: &str = "stream-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let mut stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        std::io::Write::write_all(&mut stream, DATA.as_bytes()).unwrap();
        assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
        std::io::Seek::rewind(&mut stream).unwrap();
        assert_eq!(std::io::read_to_string(&mut stream).unwrap(), DATA);
        assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
    }

    /// `read_vectored` fills several buffers in order from a single read.
    #[test]
    fn std_io_read_vectored() {
        const DATA: &[u8] = b"stream-contents";
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let mut stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
        assert_eq!(stream.write(StreamWriteOptions::empty(), DATA).unwrap(), DATA.len());
        std::io::Seek::rewind(&mut stream).unwrap();

        let mut buf1 = [0; 6];
        let mut buf2 = [0; 1];
        let mut buf3 = [0; 8];
        let mut bufs = [
            std::io::IoSliceMut::new(&mut buf1),
            std::io::IoSliceMut::new(&mut buf2),
            std::io::IoSliceMut::new(&mut buf3),
        ];
        assert_eq!(std::io::Read::read_vectored(&mut stream, &mut bufs).unwrap(), DATA.len());
        assert_eq!(buf1, DATA[0..6]);
        assert_eq!(buf2, DATA[6..7]);
        assert_eq!(buf3, DATA[7..]);
    }

    /// `write_vectored` writes several buffers in order as one contiguous run.
    #[test]
    fn std_io_write_vectored() {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
        let mut stream =
            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();

        let bufs = [
            std::io::IoSlice::new(b"stream"),
            std::io::IoSlice::new(b"-"),
            std::io::IoSlice::new(b"contents"),
        ];
        assert_eq!(std::io::Write::write_vectored(&mut stream, &bufs).unwrap(), 15);
        std::io::Seek::rewind(&mut stream).unwrap();
        assert_eq!(stream.read_to_vec(StreamReadOptions::empty(), 15).unwrap(), b"stream-contents");
    }
}