zx/
stream.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Type-safe bindings for Zircon stream objects.
6
7use crate::{
8    object_get_property, object_set_property, ok, sys, AsHandleRef, Handle, HandleBased, HandleRef,
9    Property, PropertyQuery, Status, Vmo,
10};
11use bitflags::bitflags;
12use std::io::SeekFrom;
13use std::mem::MaybeUninit;
14
15/// An object representing a Zircon [stream](https://fuchsia.dev/fuchsia-src/concepts/objects/stream.md).
16///
17/// As essentially a subtype of `Handle`, it can be freely interconverted.
18#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
19#[repr(transparent)]
20pub struct Stream(Handle);
21impl_handle_based!(Stream);
22
23bitflags! {
24    #[repr(transparent)]
25    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
26    pub struct StreamOptions: u32 {
27        const MODE_READ = sys::ZX_STREAM_MODE_READ;
28        const MODE_WRITE = sys::ZX_STREAM_MODE_WRITE;
29        const MODE_APPEND = sys::ZX_STREAM_MODE_APPEND;
30    }
31}
32
33bitflags! {
34    #[repr(transparent)]
35    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
36    pub struct StreamReadOptions: u32 {
37    }
38}
39
40bitflags! {
41    #[repr(transparent)]
42    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
43    pub struct StreamWriteOptions: u32 {
44        const APPEND = sys::ZX_STREAM_APPEND;
45    }
46}
47
48impl Stream {
49    /// See [zx_stream_create](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_create)
50    pub fn create(options: StreamOptions, vmo: &Vmo, offset: u64) -> Result<Self, Status> {
51        let mut handle = 0;
52        let status =
53            unsafe { sys::zx_stream_create(options.bits(), vmo.raw_handle(), offset, &mut handle) };
54        ok(status)?;
55        unsafe { Ok(Stream::from(Handle::from_raw(handle))) }
56    }
57
58    /// Wraps the
59    /// [`zx_stream_readv`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv)
60    /// syscall.
61    ///
62    /// # Safety
63    ///
64    /// The caller is responsible for ensuring that the buffers in `iovecs` point to valid (albeit
65    /// not necessarily initialized) memory.
66    pub unsafe fn readv(
67        &self,
68        options: StreamReadOptions,
69        iovecs: &mut [sys::zx_iovec_t],
70    ) -> Result<usize, Status> {
71        let mut actual = 0;
72        let status = unsafe {
73            sys::zx_stream_readv(
74                self.raw_handle(),
75                options.bits(),
76                iovecs.as_mut_ptr(),
77                iovecs.len(),
78                &mut actual,
79            )
80        };
81        ok(status)?;
82        Ok(actual)
83    }
84
85    /// Attempts to read `buffer.len()` bytes from the stream starting at the stream's current seek
86    /// offset. Only the number of bytes read from the stream will be initialized in `buffer`.
87    /// Returns the number of bytes read from the stream.
88    ///
89    /// See [zx_stream_readv](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv).
90    pub fn read_uninit(
91        &self,
92        options: StreamReadOptions,
93        buffer: &mut [MaybeUninit<u8>],
94    ) -> Result<usize, Status> {
95        // TODO(https://fxbug.dev/42079723) use MaybeUninit::slice_as_mut_ptr when stable
96        let mut iovec =
97            [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
98        // SAFETY: The buffer in `iovec` comes from a mutable slice so we know it's safe to pass it
99        // to `readv`.
100        unsafe { self.readv(options, &mut iovec) }
101    }
102
103    /// Attempts to read `length` bytes from the stream starting at the stream's current seek
104    /// offset. Returns the read bytes as a `Vec`.
105    ///
106    /// See [zx_stream_readv](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv).
107    pub fn read_to_vec(
108        &self,
109        options: StreamReadOptions,
110        length: usize,
111    ) -> Result<Vec<u8>, Status> {
112        let mut data = Vec::with_capacity(length);
113        let buffer = &mut data.spare_capacity_mut()[0..length];
114        let actual = self.read_uninit(options, buffer)?;
115        // SAFETY: read_uninit returns the number of bytes that were initialized.
116        unsafe { data.set_len(actual) };
117        Ok(data)
118    }
119
120    /// Wraps the
121    /// [`zx_stream_readv_at`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv_at)
122    /// syscall.
123    ///
124    /// # Safety
125    ///
126    /// The caller is responsible for ensuring that the buffers in `iovecs` point to valid (albeit
127    /// not necessarily initialized) memory.
128    pub unsafe fn readv_at(
129        &self,
130        options: StreamReadOptions,
131        offset: u64,
132        iovecs: &mut [sys::zx_iovec_t],
133    ) -> Result<usize, Status> {
134        let mut actual = 0;
135        let status = unsafe {
136            sys::zx_stream_readv_at(
137                self.raw_handle(),
138                options.bits(),
139                offset,
140                iovecs.as_mut_ptr(),
141                iovecs.len(),
142                &mut actual,
143            )
144        };
145        ok(status)?;
146        Ok(actual)
147    }
148
149    /// Attempts to read `buffer.len()` bytes from the stream starting at `offset`. Only the number
150    /// of bytes read from the stream will be initialized in `buffer`. Returns the number of bytes
151    /// read from the stream.
152    ///
153    /// See
154    /// [zx_stream_readv_at](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv_at).
155    pub fn read_at_uninit(
156        &self,
157        options: StreamReadOptions,
158        offset: u64,
159        buffer: &mut [MaybeUninit<u8>],
160    ) -> Result<usize, Status> {
161        // TODO(https://fxbug.dev/42079723) Use MaybeUninit::slice_as_mut_ptr when stable.
162        let mut iovec =
163            [sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
164        // SAFETY: The buffer in `iovec` comes from a mutable slice so we know it's safe to pass it
165        // to `readv_at`.
166        unsafe { self.readv_at(options, offset, &mut iovec) }
167    }
168
169    /// Attempts to read `length` bytes from the stream starting at `offset`. Returns the read bytes
170    /// as a `Vec`.
171    ///
172    /// See
173    /// [zx_stream_readv_at](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_readv_at).
174    pub fn read_at_to_vec(
175        &self,
176        options: StreamReadOptions,
177        offset: u64,
178        length: usize,
179    ) -> Result<Vec<u8>, Status> {
180        let mut data = Vec::with_capacity(length);
181        let buffer = &mut data.spare_capacity_mut()[0..length];
182        let actual = self.read_at_uninit(options, offset, buffer)?;
183        // SAFETY: read_at_uninit returns the number of bytes that were initialized.
184        unsafe { data.set_len(actual) };
185        Ok(data)
186    }
187
188    /// See [zx_stream_seek](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_seek)
189    pub fn seek(&self, pos: SeekFrom) -> Result<u64, Status> {
190        let (whence, offset) = match pos {
191            SeekFrom::Start(start) => (
192                sys::ZX_STREAM_SEEK_ORIGIN_START,
193                start.try_into().map_err(|_| Status::OUT_OF_RANGE)?,
194            ),
195            SeekFrom::End(end) => (sys::ZX_STREAM_SEEK_ORIGIN_END, end),
196            SeekFrom::Current(current) => (sys::ZX_STREAM_SEEK_ORIGIN_CURRENT, current),
197        };
198        let mut pos = 0;
199        let status = unsafe { sys::zx_stream_seek(self.raw_handle(), whence, offset, &mut pos) };
200        ok(status)?;
201        Ok(pos)
202    }
203
204    /// Wraps the
205    /// [`zx_stream_writev`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev)
206    /// syscall.
207    pub fn writev(
208        &self,
209        options: StreamWriteOptions,
210        iovecs: &[sys::zx_iovec_t],
211    ) -> Result<usize, Status> {
212        let mut actual = 0;
213        let status = unsafe {
214            sys::zx_stream_writev(
215                self.raw_handle(),
216                options.bits(),
217                iovecs.as_ptr(),
218                iovecs.len(),
219                &mut actual,
220            )
221        };
222        ok(status)?;
223        Ok(actual)
224    }
225
226    /// Writes `buffer` to the stream at the stream's current seek offset. Returns the number of
227    /// bytes written.
228    ///
229    /// See [zx_stream_writev](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev).
230    pub fn write(&self, options: StreamWriteOptions, buffer: &[u8]) -> Result<usize, Status> {
231        let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
232        self.writev(options, &iovec)
233    }
234
235    /// Wraps the
236    /// [`zx_stream_writev_at`](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev_at)
237    /// syscall.
238    pub fn writev_at(
239        &self,
240        options: StreamWriteOptions,
241        offset: u64,
242        iovecs: &[sys::zx_iovec_t],
243    ) -> Result<usize, Status> {
244        let mut actual = 0;
245        let status = unsafe {
246            sys::zx_stream_writev_at(
247                self.raw_handle(),
248                options.bits(),
249                offset,
250                iovecs.as_ptr(),
251                iovecs.len(),
252                &mut actual,
253            )
254        };
255        ok(status)?;
256        Ok(actual)
257    }
258
259    /// Writes `buffer` to the stream at `offset``. Returns the number of bytes written.
260    ///
261    /// See
262    /// [zx_stream_writev_at](https://fuchsia.dev/fuchsia-src/reference/syscalls/stream_writev_at).
263    pub fn write_at(
264        &self,
265        options: StreamWriteOptions,
266        offset: u64,
267        buffer: &[u8],
268    ) -> Result<usize, Status> {
269        let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
270        self.writev_at(options, offset, &iovec)
271    }
272}
273
274unsafe_handle_properties!(object: Stream,
275    props: [
276        {query_ty: STREAM_MODE_APPEND, tag: StreamModeAppendTag, prop_ty: u8, get: get_mode_append, set: set_mode_append},
277    ]
278);
279
280impl std::io::Read for Stream {
281    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
282        let mut iovec = [sys::zx_iovec_t { buffer: buf.as_mut_ptr(), capacity: buf.len() }];
283        // SAFETY: The buffer in `iovec` comes from a mutable slice so we know it's safe to pass it
284        // to `readv`.
285        Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovec) }?)
286    }
287
288    fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut<'_>]) -> std::io::Result<usize> {
289        // SAFETY: `zx_iovec_t` and `IoSliceMut` have the same layout.
290        let mut iovecs = unsafe {
291            std::slice::from_raw_parts_mut(bufs.as_mut_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
292        };
293        // SAFETY: `IoSliceMut` can only be constructed from a mutable slice so we know it's safe to
294        // pass to `readv`.
295        Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovecs) }?)
296    }
297}
298
299impl std::io::Seek for Stream {
300    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
301        Ok(Self::seek(&self, pos)? as u64)
302    }
303}
304
305impl std::io::Write for Stream {
306    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
307        Ok(Self::write(&self, StreamWriteOptions::empty(), buf)?)
308    }
309
310    fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
311        // SAFETY: `zx_iovec_t` and `IoSliceMut` have the same layout.
312        let iovecs = unsafe {
313            std::slice::from_raw_parts(bufs.as_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
314        };
315        Ok(self.writev(StreamWriteOptions::empty(), &iovecs)?)
316    }
317
318    fn flush(&mut self) -> std::io::Result<()> {
319        Ok(())
320    }
321}
322
323#[cfg(test)]
324mod tests {
325    use super::*;
326    use crate as zx;
327
328    #[test]
329    fn create() {
330        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
331
332        let stream =
333            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
334
335        let basic_info = stream.basic_info().unwrap();
336        assert_eq!(basic_info.object_type, zx::ObjectType::STREAM);
337        assert!(basic_info.rights.contains(zx::Rights::READ));
338        assert!(basic_info.rights.contains(zx::Rights::WRITE));
339    }
340
341    #[test]
342    fn create_readonly() {
343        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
344
345        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
346
347        let basic_info = stream.basic_info().unwrap();
348        assert!(basic_info.rights.contains(zx::Rights::READ));
349        assert!(!basic_info.rights.contains(zx::Rights::WRITE));
350    }
351
352    #[test]
353    fn create_offset() {
354        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
355        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 24).unwrap();
356        assert_eq!(stream.seek(SeekFrom::Current(0)).unwrap(), 24);
357    }
358
359    #[test]
360    fn create_invalid() {
361        let result =
362            Stream::create(StreamOptions::MODE_READ, &zx::Vmo::from(zx::Handle::invalid()), 0);
363        assert_eq!(result, Err(zx::Status::BAD_HANDLE));
364    }
365
366    #[test]
367    fn create_with_mode_append() {
368        let size: u64 = zx::system_get_page_size().into();
369        let vmo = zx::Vmo::create(size).unwrap();
370        let stream =
371            Stream::create(StreamOptions::MODE_WRITE | StreamOptions::MODE_APPEND, &vmo, 0)
372                .unwrap();
373        assert_eq!(stream.get_mode_append().unwrap(), 1);
374    }
375
376    #[test]
377    fn get_and_set_mode_append() {
378        let size: u64 = zx::system_get_page_size().into();
379        let vmo = zx::Vmo::create(size).unwrap();
380        let stream = Stream::create(StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
381        assert_eq!(stream.get_mode_append().unwrap(), 0);
382        stream.set_mode_append(&1).unwrap();
383        assert_eq!(stream.get_mode_append().unwrap(), 1);
384        stream.set_mode_append(&0).unwrap();
385        assert_eq!(stream.get_mode_append().unwrap(), 0);
386    }
387
388    #[test]
389    fn read_uninit() {
390        const DATA: &'static [u8] = b"vmo-contents";
391        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
392        vmo.write(DATA, 0).unwrap();
393        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
394
395        // Read from the stream.
396        let mut data = Vec::with_capacity(5);
397        let bytes_read =
398            stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
399        assert_eq!(bytes_read, 5);
400        unsafe { data.set_len(5) };
401        assert_eq!(data, DATA[0..5]);
402
403        // Try to read more data than is available in the stream.
404        let mut data = Vec::with_capacity(10);
405        let bytes_read =
406            stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
407        assert_eq!(bytes_read, 7);
408        unsafe { data.set_len(7) };
409        assert_eq!(data, DATA[5..]);
410
411        // Try to read at the end of the stream.
412        let mut data = Vec::with_capacity(10);
413        let bytes_read =
414            stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
415        assert_eq!(bytes_read, 0);
416    }
417
418    #[test]
419    fn read_to_vec() {
420        const DATA: &'static [u8] = b"vmo-contents";
421        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
422        vmo.write(DATA, 0).unwrap();
423        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
424
425        let data = stream.read_to_vec(StreamReadOptions::empty(), DATA.len()).unwrap();
426        assert_eq!(data, DATA);
427    }
428
429    #[test]
430    fn read_at_uninit() {
431        const DATA: &'static [u8] = b"vmo-contents";
432        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
433        vmo.write(DATA, 0).unwrap();
434        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
435
436        // Read from the stream.
437        let mut data = Vec::with_capacity(5);
438        let bytes_read = stream
439            .read_at_uninit(StreamReadOptions::empty(), 0, data.spare_capacity_mut())
440            .unwrap();
441        assert_eq!(bytes_read, 5);
442        unsafe { data.set_len(5) };
443        assert_eq!(data, DATA[0..5]);
444
445        // Try to read beyond the end of the stream.
446        let mut data = Vec::with_capacity(10);
447        let bytes_read = stream
448            .read_at_uninit(StreamReadOptions::empty(), 5, data.spare_capacity_mut())
449            .unwrap();
450        assert_eq!(bytes_read, 7);
451        unsafe { data.set_len(7) };
452        assert_eq!(data, DATA[5..]);
453
454        // Try to read starting beyond the end of the stream.
455        let mut data = Vec::with_capacity(10);
456        let bytes_read = stream
457            .read_at_uninit(StreamReadOptions::empty(), 20, data.spare_capacity_mut())
458            .unwrap();
459        assert_eq!(bytes_read, 0);
460    }
461
462    #[test]
463    fn read_at_to_vec() {
464        const DATA: &'static [u8] = b"vmo-contents";
465        let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
466        vmo.write(DATA, 0).unwrap();
467        let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
468
469        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 5, DATA.len()).unwrap();
470        assert_eq!(data, DATA[5..]);
471    }
472
473    #[test]
474    fn write() {
475        const DATA: &'static [u8] = b"vmo-contents";
476        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
477        let stream =
478            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
479
480        let bytes_written = stream.write(zx::StreamWriteOptions::empty(), DATA).unwrap();
481        assert_eq!(bytes_written, DATA.len());
482
483        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
484        assert_eq!(data, DATA);
485    }
486
487    #[test]
488    fn write_at() {
489        const DATA: &'static [u8] = b"vmo-contents";
490        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
491        let stream =
492            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
493
494        let bytes_written =
495            stream.write_at(zx::StreamWriteOptions::empty(), 0, &DATA[0..3]).unwrap();
496        assert_eq!(bytes_written, 3);
497
498        let bytes_written =
499            stream.write_at(zx::StreamWriteOptions::empty(), 3, &DATA[3..]).unwrap();
500        assert_eq!(bytes_written, DATA.len() - 3);
501
502        let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
503        assert_eq!(data, DATA);
504    }
505
506    #[test]
507    fn std_io_read_write_seek() {
508        const DATA: &'static str = "stream-contents";
509        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
510        let mut stream =
511            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
512
513        std::io::Write::write_all(&mut stream, DATA.as_bytes()).unwrap();
514        assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
515        std::io::Seek::rewind(&mut stream).unwrap();
516        assert_eq!(std::io::read_to_string(&mut stream).unwrap(), DATA);
517        assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
518    }
519
520    #[test]
521    fn std_io_read_vectored() {
522        const DATA: &'static [u8] = b"stream-contents";
523        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
524        let mut stream =
525            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
526        assert_eq!(stream.write(StreamWriteOptions::empty(), DATA).unwrap(), DATA.len());
527        std::io::Seek::rewind(&mut stream).unwrap();
528
529        let mut buf1 = [0; 6];
530        let mut buf2 = [0; 1];
531        let mut buf3 = [0; 8];
532        let mut bufs = [
533            std::io::IoSliceMut::new(&mut buf1),
534            std::io::IoSliceMut::new(&mut buf2),
535            std::io::IoSliceMut::new(&mut buf3),
536        ];
537        assert_eq!(std::io::Read::read_vectored(&mut stream, &mut bufs).unwrap(), DATA.len());
538        assert_eq!(buf1, DATA[0..6]);
539        assert_eq!(buf2, DATA[6..7]);
540        assert_eq!(buf3, DATA[7..]);
541    }
542
543    #[test]
544    fn std_io_write_vectored() {
545        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
546        let mut stream =
547            Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
548
549        let bufs = [
550            std::io::IoSlice::new(b"stream"),
551            std::io::IoSlice::new(b"-"),
552            std::io::IoSlice::new(b"contents"),
553        ];
554        assert_eq!(std::io::Write::write_vectored(&mut stream, &bufs).unwrap(), 15);
555        std::io::Seek::rewind(&mut stream).unwrap();
556        assert_eq!(stream.read_to_vec(StreamReadOptions::empty(), 15).unwrap(), b"stream-contents");
557    }
558}