Skip to main content

zx/
stream.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Type-safe bindings for Zircon stream objects.
6
7use crate::{NullableHandle, Status, StatusExt, Vmo, ok, sys};
8use bitflags::bitflags;
9use std::io::SeekFrom;
10use std::mem::MaybeUninit;
11
12/// An object representing a Zircon [stream](https://fuchsia.dev/fuchsia-src/concepts/objects/stream.md).
13///
14/// As essentially a subtype of `NullableHandle`, it can be freely interconverted.
15#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
16#[repr(transparent)]
17pub struct Stream(NullableHandle);
18impl_handle_based!(Stream);
19
20bitflags! {
21    #[repr(transparent)]
22    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
23    pub struct StreamOptions: u32 {
24        const MODE_READ = sys::ZX_STREAM_MODE_READ;
25        const MODE_WRITE = sys::ZX_STREAM_MODE_WRITE;
26        const MODE_APPEND = sys::ZX_STREAM_MODE_APPEND;
27    }
28}
29
30bitflags! {
31    #[repr(transparent)]
32    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
33    pub struct StreamReadOptions: u32 {
34    }
35}
36
37bitflags! {
38    #[repr(transparent)]
39    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
40    pub struct StreamWriteOptions: u32 {
41        const APPEND = sys::ZX_STREAM_APPEND;
42    }
43}
44
45impl Stream {
46    /// See [zx_stream_create](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_create)
47    pub fn create(options: StreamOptions, vmo: &Vmo, offset: u64) -> Result<Self, Status> {
48        let mut handle = 0;
49        let status =
50            unsafe { sys::zx_stream_create(options.bits(), vmo.raw_handle(), offset, &mut handle) };
51        ok(status)?;
52        unsafe { Ok(Stream::from(NullableHandle::from_raw(handle))) }
53    }
54
55    /// Wraps the
56    /// [`zx_stream_readv`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv)
57    /// syscall.
58    ///
59    /// # Safety
60    ///
61    /// The caller is responsible for ensuring that the buffers in `iovecs` point to valid (albeit
62    /// not necessarily initialized) memory.
63    pub unsafe fn readv(
64        &self,
65        options: StreamReadOptions,
66        iovecs: &mut [sys::zx_iovec_t],
67    ) -> Result<usize, Status> {
68        let mut actual = 0;
69        let status = unsafe {
70            sys::zx_stream_readv(
71                self.raw_handle(),
72                options.bits(),
73                iovecs.as_mut_ptr(),
74                iovecs.len(),
75                &mut actual,
76            )
77        };
78        ok(status)?;
79        Ok(actual)
80    }
81
82    /// Attempts to read `buffer.len()` bytes from the stream starting at the stream's current seek
83    /// offset. Only the number of bytes read from the stream will be initialized in `buffer`.
84    /// Returns the number of bytes read from the stream.
85    ///
86    /// See [zx_stream_readv](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv).
87    pub fn read_uninit(
88        &self,
89        options: StreamReadOptions,
90        buffer: &mut [MaybeUninit<u8>],
91    ) -> Result<usize, Status> {
92        // TODO(https://fxbug.dev/42079723) use MaybeUninit::slice_as_mut_ptr when stable
93        let mut iovec =
94            [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
95        // SAFETY: The buffer in `iovec` comes from a mutable slice so we know it's safe to pass it
96        // to `readv`.
97        unsafe { self.readv(options, &mut iovec) }
98    }
99
100    /// Attempts to read `length` bytes from the stream starting at the stream's current seek
101    /// offset. Returns the read bytes as a `Vec`.
102    ///
103    /// See [zx_stream_readv](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv).
104    pub fn read_to_vec(
105        &self,
106        options: StreamReadOptions,
107        length: usize,
108    ) -> Result<Vec<u8>, Status> {
109        let mut data = Vec::with_capacity(length);
110        let buffer = &mut data.spare_capacity_mut()[0..length];
111        let actual = self.read_uninit(options, buffer)?;
112        // SAFETY: read_uninit returns the number of bytes that were initialized.
113        unsafe { data.set_len(actual) };
114        Ok(data)
115    }
116
117    /// Wraps the
118    /// [`zx_stream_readv_at`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv_at)
119    /// syscall.
120    ///
121    /// # Safety
122    ///
123    /// The caller is responsible for ensuring that the buffers in `iovecs` point to valid (albeit
124    /// not necessarily initialized) memory.
125    pub unsafe fn readv_at(
126        &self,
127        options: StreamReadOptions,
128        offset: u64,
129        iovecs: &mut [sys::zx_iovec_t],
130    ) -> Result<usize, Status> {
131        let mut actual = 0;
132        let status = unsafe {
133            sys::zx_stream_readv_at(
134                self.raw_handle(),
135                options.bits(),
136                offset,
137                iovecs.as_mut_ptr(),
138                iovecs.len(),
139                &mut actual,
140            )
141        };
142        ok(status)?;
143        Ok(actual)
144    }
145
146    /// Attempts to read `buffer.len()` bytes from the stream starting at `offset`. Only the number
147    /// of bytes read from the stream will be initialized in `buffer`. Returns the number of bytes
148    /// read from the stream.
149    ///
150    /// See
151    /// [zx_stream_readv_at](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv_at).
152    pub fn read_at_uninit(
153        &self,
154        options: StreamReadOptions,
155        offset: u64,
156        buffer: &mut [MaybeUninit<u8>],
157    ) -> Result<usize, Status> {
158        // TODO(https://fxbug.dev/42079723) Use MaybeUninit::slice_as_mut_ptr when stable.
159        let mut iovec =
160            [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
161        // SAFETY: The buffer in `iovec` comes from a mutable slice so we know it's safe to pass it
162        // to `readv_at`.
163        unsafe { self.readv_at(options, offset, &mut iovec) }
164    }
165
166    /// Attempts to read `length` bytes from the stream starting at `offset`. Returns the read bytes
167    /// as a `Vec`.
168    ///
169    /// See
170    /// [zx_stream_readv_at](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv_at).
171    pub fn read_at_to_vec(
172        &self,
173        options: StreamReadOptions,
174        offset: u64,
175        length: usize,
176    ) -> Result<Vec<u8>, Status> {
177        let mut data = Vec::with_capacity(length);
178        let buffer = &mut data.spare_capacity_mut()[0..length];
179        let actual = self.read_at_uninit(options, offset, buffer)?;
180        // SAFETY: read_at_uninit returns the number of bytes that were initialized.
181        unsafe { data.set_len(actual) };
182        Ok(data)
183    }
184
185    /// See [zx_stream_seek](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_seek)
186    pub fn seek(&self, pos: SeekFrom) -> Result<u64, Status> {
187        let (whence, offset) = match pos {
188            SeekFrom::Start(start) => (
189                sys::ZX_STREAM_SEEK_ORIGIN_START,
190                start.try_into().map_err(|_| Status::OUT_OF_RANGE)?,
191            ),
192            SeekFrom::End(end) => (sys::ZX_STREAM_SEEK_ORIGIN_END, end),
193            SeekFrom::Current(current) => (sys::ZX_STREAM_SEEK_ORIGIN_CURRENT, current),
194        };
195        let mut pos = 0;
196        let status = unsafe { sys::zx_stream_seek(self.raw_handle(), whence, offset, &mut pos) };
197        ok(status)?;
198        Ok(pos)
199    }
200
201    /// Wraps the
202    /// [`zx_stream_writev`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev)
203    /// syscall.
204    pub fn writev(
205        &self,
206        options: StreamWriteOptions,
207        iovecs: &[sys::zx_iovec_t],
208    ) -> Result<usize, Status> {
209        let mut actual = 0;
210        let status = unsafe {
211            sys::zx_stream_writev(
212                self.raw_handle(),
213                options.bits(),
214                iovecs.as_ptr(),
215                iovecs.len(),
216                &mut actual,
217            )
218        };
219        ok(status)?;
220        Ok(actual)
221    }
222
223    /// Writes `buffer` to the stream at the stream's current seek offset. Returns the number of
224    /// bytes written.
225    ///
226    /// See [zx_stream_writev](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev).
227    pub fn write(&self, options: StreamWriteOptions, buffer: &[u8]) -> Result<usize, Status> {
228        let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
229        self.writev(options, &iovec)
230    }
231
232    /// Wraps the
233    /// [`zx_stream_writev_at`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev_at)
234    /// syscall.
235    pub fn writev_at(
236        &self,
237        options: StreamWriteOptions,
238        offset: u64,
239        iovecs: &[sys::zx_iovec_t],
240    ) -> Result<usize, Status> {
241        let mut actual = 0;
242        let status = unsafe {
243            sys::zx_stream_writev_at(
244                self.raw_handle(),
245                options.bits(),
246                offset,
247                iovecs.as_ptr(),
248                iovecs.len(),
249                &mut actual,
250            )
251        };
252        ok(status)?;
253        Ok(actual)
254    }
255
256    /// Writes `buffer` to the stream at `offset``. Returns the number of bytes written.
257    ///
258    /// See
259    /// [zx_stream_writev_at](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev_at).
260    pub fn write_at(
261        &self,
262        options: StreamWriteOptions,
263        offset: u64,
264        buffer: &[u8],
265    ) -> Result<usize, Status> {
266        let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
267        self.writev_at(options, offset, &iovec)
268    }
269}
270
271unsafe_handle_properties!(object: Stream,
272    props: [
273        {query_ty: STREAM_MODE_APPEND, tag: StreamModeAppendTag, prop_ty: u8, get: get_mode_append, set: set_mode_append},
274    ]
275);
276
277impl std::io::Read for Stream {
278    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
279        let mut iovec = [sys::zx_iovec_t { buffer: buf.as_mut_ptr(), capacity: buf.len() }];
280        // SAFETY: The buffer in `iovec` comes from a mutable slice so we know it's safe to pass it
281        // to `readv`.
282        Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovec) }
283            .map_err(|s| s.into_io_error())?)
284    }
285
286    fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut<'_>]) -> std::io::Result<usize> {
287        // SAFETY: `zx_iovec_t` and `IoSliceMut` have the same layout.
288        let mut iovecs = unsafe {
289            std::slice::from_raw_parts_mut(bufs.as_mut_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
290        };
291        // SAFETY: `IoSliceMut` can only be constructed from a mutable slice so we know it's safe to
292        // pass to `readv`.
293        Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovecs) }
294            .map_err(|s| s.into_io_error())?)
295    }
296}
297
298impl std::io::Seek for Stream {
299    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
300        Ok(Self::seek(&self, pos).map_err(|s| s.into_io_error())? as u64)
301    }
302}
303
304impl std::io::Write for Stream {
305    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
306        Ok(Self::write(&self, StreamWriteOptions::empty(), buf).map_err(|s| s.into_io_error())?)
307    }
308
309    fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
310        // SAFETY: `zx_iovec_t` and `IoSliceMut` have the same layout.
311        let iovecs = unsafe {
312            std::slice::from_raw_parts(bufs.as_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
313        };
314        Ok(self.writev(StreamWriteOptions::empty(), &iovecs).map_err(|s| s.into_io_error())?)
315    }
316
317    fn flush(&mut self) -> std::io::Result<()> {
318        Ok(())
319    }
320}
321
322#[cfg(test)]
323mod tests {
324    use super::*;
325    use crate as zx;
326
327    #[test]
328    fn create() {
329        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
330
331        let stream =
332            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
333
334        let basic_info = stream.basic_info().unwrap();
335        assert_eq!(basic_info.object_type, zx::ObjectType::STREAM);
336        assert!(basic_info.rights.contains(zx::Rights::READ));
337        assert!(basic_info.rights.contains(zx::Rights::WRITE));
338    }
339
340    #[test]
341    fn create_readonly() {
342        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
343
344        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
345
346        let basic_info = stream.basic_info().unwrap();
347        assert!(basic_info.rights.contains(zx::Rights::READ));
348        assert!(!basic_info.rights.contains(zx::Rights::WRITE));
349    }
350
351    #[test]
352    fn create_offset() {
353        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
354        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 24).unwrap();
355        assert_eq!(stream.seek(SeekFrom::Current(0)).unwrap(), 24);
356    }
357
358    #[test]
359    fn create_with_mode_append() {
360        let size: u64 = zx::system_get_page_size().into();
361        let vmo = zx::Vmo::create(size).unwrap();
362        let stream =
363            Stream::create(StreamOptions::MODE_WRITE | StreamOptions::MODE_APPEND, &vmo, 0)
364                .unwrap();
365        assert_eq!(stream.get_mode_append().unwrap(), 1);
366    }
367
368    #[test]
369    fn get_and_set_mode_append() {
370        let size: u64 = zx::system_get_page_size().into();
371        let vmo = zx::Vmo::create(size).unwrap();
372        let stream = Stream::create(StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
373        assert_eq!(stream.get_mode_append().unwrap(), 0);
374        stream.set_mode_append(&1).unwrap();
375        assert_eq!(stream.get_mode_append().unwrap(), 1);
376        stream.set_mode_append(&0).unwrap();
377        assert_eq!(stream.get_mode_append().unwrap(), 0);
378    }
379
380    #[test]
381    fn read_uninit() {
382        const DATA: &'static [u8] = b"vmo-contents";
383        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
384        vmo.write(DATA, 0).unwrap();
385        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
386
387        // Read from the stream.
388        let mut data = Vec::with_capacity(5);
389        let bytes_read =
390            stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
391        assert_eq!(bytes_read, 5);
392        unsafe { data.set_len(5) };
393        assert_eq!(data, DATA[0..5]);
394
395        // Try to read more data than is available in the stream.
396        let mut data = Vec::with_capacity(10);
397        let bytes_read =
398            stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
399        assert_eq!(bytes_read, 7);
400        unsafe { data.set_len(7) };
401        assert_eq!(data, DATA[5..]);
402
403        // Try to read at the end of the stream.
404        let mut data = Vec::with_capacity(10);
405        let bytes_read =
406            stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
407        assert_eq!(bytes_read, 0);
408    }
409
410    #[test]
411    fn read_to_vec() {
412        const DATA: &'static [u8] = b"vmo-contents";
413        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
414        vmo.write(DATA, 0).unwrap();
415        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
416
417        let data = stream.read_to_vec(StreamReadOptions::empty(), DATA.len()).unwrap();
418        assert_eq!(data, DATA);
419    }
420
421    #[test]
422    fn read_at_uninit() {
423        const DATA: &'static [u8] = b"vmo-contents";
424        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
425        vmo.write(DATA, 0).unwrap();
426        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
427
428        // Read from the stream.
429        let mut data = Vec::with_capacity(5);
430        let bytes_read = stream
431            .read_at_uninit(StreamReadOptions::empty(), 0, data.spare_capacity_mut())
432            .unwrap();
433        assert_eq!(bytes_read, 5);
434        unsafe { data.set_len(5) };
435        assert_eq!(data, DATA[0..5]);
436
437        // Try to read beyond the end of the stream.
438        let mut data = Vec::with_capacity(10);
439        let bytes_read = stream
440            .read_at_uninit(StreamReadOptions::empty(), 5, data.spare_capacity_mut())
441            .unwrap();
442        assert_eq!(bytes_read, 7);
443        unsafe { data.set_len(7) };
444        assert_eq!(data, DATA[5..]);
445
446        // Try to read starting beyond the end of the stream.
447        let mut data = Vec::with_capacity(10);
448        let bytes_read = stream
449            .read_at_uninit(StreamReadOptions::empty(), 20, data.spare_capacity_mut())
450            .unwrap();
451        assert_eq!(bytes_read, 0);
452    }
453
454    #[test]
455    fn read_at_to_vec() {
456        const DATA: &'static [u8] = b"vmo-contents";
457        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
458        vmo.write(DATA, 0).unwrap();
459        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
460
461        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 5, DATA.len()).unwrap();
462        assert_eq!(data, DATA[5..]);
463    }
464
465    #[test]
466    fn write() {
467        const DATA: &'static [u8] = b"vmo-contents";
468        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
469        let stream =
470            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
471
472        let bytes_written = stream.write(zx::StreamWriteOptions::empty(), DATA).unwrap();
473        assert_eq!(bytes_written, DATA.len());
474
475        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
476        assert_eq!(data, DATA);
477    }
478
479    #[test]
480    fn write_at() {
481        const DATA: &'static [u8] = b"vmo-contents";
482        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
483        let stream =
484            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
485
486        let bytes_written =
487            stream.write_at(zx::StreamWriteOptions::empty(), 0, &DATA[0..3]).unwrap();
488        assert_eq!(bytes_written, 3);
489
490        let bytes_written =
491            stream.write_at(zx::StreamWriteOptions::empty(), 3, &DATA[3..]).unwrap();
492        assert_eq!(bytes_written, DATA.len() - 3);
493
494        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
495        assert_eq!(data, DATA);
496    }
497
498    #[test]
499    fn std_io_read_write_seek() {
500        const DATA: &'static str = "stream-contents";
501        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
502        let mut stream =
503            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
504
505        std::io::Write::write_all(&mut stream, DATA.as_bytes()).unwrap();
506        assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
507        std::io::Seek::rewind(&mut stream).unwrap();
508        assert_eq!(std::io::read_to_string(&mut stream).unwrap(), DATA);
509        assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
510    }
511
512    #[test]
513    fn std_io_read_vectored() {
514        const DATA: &'static [u8] = b"stream-contents";
515        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
516        let mut stream =
517            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
518        assert_eq!(stream.write(StreamWriteOptions::empty(), DATA).unwrap(), DATA.len());
519        std::io::Seek::rewind(&mut stream).unwrap();
520
521        let mut buf1 = [0; 6];
522        let mut buf2 = [0; 1];
523        let mut buf3 = [0; 8];
524        let mut bufs = [
525            std::io::IoSliceMut::new(&mut buf1),
526            std::io::IoSliceMut::new(&mut buf2),
527            std::io::IoSliceMut::new(&mut buf3),
528        ];
529        assert_eq!(std::io::Read::read_vectored(&mut stream, &mut bufs).unwrap(), DATA.len());
530        assert_eq!(buf1, DATA[0..6]);
531        assert_eq!(buf2, DATA[6..7]);
532        assert_eq!(buf3, DATA[7..]);
533    }
534
535    #[test]
536    fn std_io_write_vectored() {
537        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::UNBOUNDED, 0).unwrap();
538        let mut stream =
539            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
540
541        let bufs = [
542            std::io::IoSlice::new(b"stream"),
543            std::io::IoSlice::new(b"-"),
544            std::io::IoSlice::new(b"contents"),
545        ];
546        assert_eq!(std::io::Write::write_vectored(&mut stream, &bufs).unwrap(), 15);
547        std::io::Seek::rewind(&mut stream).unwrap();
548        assert_eq!(stream.read_to_vec(StreamReadOptions::empty(), 15).unwrap(), b"stream-contents");
549    }
550}