use crate::{
object_get_property, object_set_property, ok, sys, AsHandleRef, Handle, HandleBased, HandleRef,
Property, PropertyQuery, Status, Vmo,
};
use bitflags::bitflags;
use std::io::SeekFrom;
use std::mem::MaybeUninit;
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[repr(transparent)]
pub struct Stream(Handle);
impl_handle_based!(Stream);
bitflags! {
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct StreamOptions: u32 {
const MODE_READ = sys::ZX_STREAM_MODE_READ;
const MODE_WRITE = sys::ZX_STREAM_MODE_WRITE;
const MODE_APPEND = sys::ZX_STREAM_MODE_APPEND;
}
}
bitflags! {
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct StreamReadOptions: u32 {
}
}
bitflags! {
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct StreamWriteOptions: u32 {
const APPEND = sys::ZX_STREAM_APPEND;
}
}
impl Stream {
pub fn create(options: StreamOptions, vmo: &Vmo, offset: u64) -> Result<Self, Status> {
let mut handle = 0;
let status =
unsafe { sys::zx_stream_create(options.bits(), vmo.raw_handle(), offset, &mut handle) };
ok(status)?;
unsafe { Ok(Stream::from(Handle::from_raw(handle))) }
}
pub unsafe fn readv(
&self,
options: StreamReadOptions,
iovecs: &mut [sys::zx_iovec_t],
) -> Result<usize, Status> {
let mut actual = 0;
let status = unsafe {
sys::zx_stream_readv(
self.raw_handle(),
options.bits(),
iovecs.as_mut_ptr(),
iovecs.len(),
&mut actual,
)
};
ok(status)?;
Ok(actual)
}
pub fn read_uninit(
&self,
options: StreamReadOptions,
buffer: &mut [MaybeUninit<u8>],
) -> Result<usize, Status> {
let mut iovec =
[sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
unsafe { self.readv(options, &mut iovec) }
}
pub fn read_to_vec(
&self,
options: StreamReadOptions,
length: usize,
) -> Result<Vec<u8>, Status> {
let mut data = Vec::with_capacity(length);
let buffer = &mut data.spare_capacity_mut()[0..length];
let actual = self.read_uninit(options, buffer)?;
unsafe { data.set_len(actual) };
Ok(data)
}
pub unsafe fn readv_at(
&self,
options: StreamReadOptions,
offset: u64,
iovecs: &mut [sys::zx_iovec_t],
) -> Result<usize, Status> {
let mut actual = 0;
let status = unsafe {
sys::zx_stream_readv_at(
self.raw_handle(),
options.bits(),
offset,
iovecs.as_mut_ptr(),
iovecs.len(),
&mut actual,
)
};
ok(status)?;
Ok(actual)
}
pub fn read_at_uninit(
&self,
options: StreamReadOptions,
offset: u64,
buffer: &mut [MaybeUninit<u8>],
) -> Result<usize, Status> {
let mut iovec =
[sys::zx_iovec_t { buffer: buffer.as_mut_ptr().cast::<u8>(), capacity: buffer.len() }];
unsafe { self.readv_at(options, offset, &mut iovec) }
}
pub fn read_at_to_vec(
&self,
options: StreamReadOptions,
offset: u64,
length: usize,
) -> Result<Vec<u8>, Status> {
let mut data = Vec::with_capacity(length);
let buffer = &mut data.spare_capacity_mut()[0..length];
let actual = self.read_at_uninit(options, offset, buffer)?;
unsafe { data.set_len(actual) };
Ok(data)
}
pub fn seek(&self, pos: SeekFrom) -> Result<u64, Status> {
let (whence, offset) = match pos {
SeekFrom::Start(start) => (
sys::ZX_STREAM_SEEK_ORIGIN_START,
start.try_into().map_err(|_| Status::OUT_OF_RANGE)?,
),
SeekFrom::End(end) => (sys::ZX_STREAM_SEEK_ORIGIN_END, end),
SeekFrom::Current(current) => (sys::ZX_STREAM_SEEK_ORIGIN_CURRENT, current),
};
let mut pos = 0;
let status = unsafe { sys::zx_stream_seek(self.raw_handle(), whence, offset, &mut pos) };
ok(status)?;
Ok(pos)
}
pub fn writev(
&self,
options: StreamWriteOptions,
iovecs: &[sys::zx_iovec_t],
) -> Result<usize, Status> {
let mut actual = 0;
let status = unsafe {
sys::zx_stream_writev(
self.raw_handle(),
options.bits(),
iovecs.as_ptr(),
iovecs.len(),
&mut actual,
)
};
ok(status)?;
Ok(actual)
}
pub fn write(&self, options: StreamWriteOptions, buffer: &[u8]) -> Result<usize, Status> {
let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
self.writev(options, &iovec)
}
pub fn writev_at(
&self,
options: StreamWriteOptions,
offset: u64,
iovecs: &[sys::zx_iovec_t],
) -> Result<usize, Status> {
let mut actual = 0;
let status = unsafe {
sys::zx_stream_writev_at(
self.raw_handle(),
options.bits(),
offset,
iovecs.as_ptr(),
iovecs.len(),
&mut actual,
)
};
ok(status)?;
Ok(actual)
}
pub fn write_at(
&self,
options: StreamWriteOptions,
offset: u64,
buffer: &[u8],
) -> Result<usize, Status> {
let iovec = [sys::zx_iovec_t { buffer: buffer.as_ptr(), capacity: buffer.len() }];
self.writev_at(options, offset, &iovec)
}
}
unsafe_handle_properties!(object: Stream,
props: [
{query_ty: STREAM_MODE_APPEND, tag: StreamModeAppendTag, prop_ty: u8, get: get_mode_append, set: set_mode_append},
]
);
impl std::io::Read for Stream {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let mut iovec = [sys::zx_iovec_t { buffer: buf.as_mut_ptr(), capacity: buf.len() }];
Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovec) }?)
}
fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut<'_>]) -> std::io::Result<usize> {
let mut iovecs = unsafe {
std::slice::from_raw_parts_mut(bufs.as_mut_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
};
Ok(unsafe { self.readv(StreamReadOptions::empty(), &mut iovecs) }?)
}
}
impl std::io::Seek for Stream {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
Ok(Self::seek(&self, pos)? as u64)
}
}
impl std::io::Write for Stream {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
Ok(Self::write(&self, StreamWriteOptions::empty(), buf)?)
}
fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
let iovecs = unsafe {
std::slice::from_raw_parts(bufs.as_ptr().cast::<sys::zx_iovec_t>(), bufs.len())
};
Ok(self.writev(StreamWriteOptions::empty(), &iovecs)?)
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate as zx;
#[test]
fn create() {
let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
let stream =
Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
let basic_info = stream.basic_info().unwrap();
assert_eq!(basic_info.object_type, zx::ObjectType::STREAM);
assert!(basic_info.rights.contains(zx::Rights::READ));
assert!(basic_info.rights.contains(zx::Rights::WRITE));
}
#[test]
fn create_readonly() {
let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
let basic_info = stream.basic_info().unwrap();
assert!(basic_info.rights.contains(zx::Rights::READ));
assert!(!basic_info.rights.contains(zx::Rights::WRITE));
}
#[test]
fn create_offset() {
let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 24).unwrap();
assert_eq!(stream.seek(SeekFrom::Current(0)).unwrap(), 24);
}
#[test]
fn create_invalid() {
let result =
Stream::create(StreamOptions::MODE_READ, &zx::Vmo::from(zx::Handle::invalid()), 0);
assert_eq!(result, Err(zx::Status::BAD_HANDLE));
}
#[test]
fn create_with_mode_append() {
let size: u64 = zx::system_get_page_size().into();
let vmo = zx::Vmo::create(size).unwrap();
let stream =
Stream::create(StreamOptions::MODE_WRITE | StreamOptions::MODE_APPEND, &vmo, 0)
.unwrap();
assert_eq!(stream.get_mode_append().unwrap(), 1);
}
#[test]
fn get_and_set_mode_append() {
let size: u64 = zx::system_get_page_size().into();
let vmo = zx::Vmo::create(size).unwrap();
let stream = Stream::create(StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
assert_eq!(stream.get_mode_append().unwrap(), 0);
stream.set_mode_append(&1).unwrap();
assert_eq!(stream.get_mode_append().unwrap(), 1);
stream.set_mode_append(&0).unwrap();
assert_eq!(stream.get_mode_append().unwrap(), 0);
}
#[test]
fn read_uninit() {
const DATA: &'static [u8] = b"vmo-contents";
let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
vmo.write(DATA, 0).unwrap();
let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
let mut data = Vec::with_capacity(5);
let bytes_read =
stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
assert_eq!(bytes_read, 5);
unsafe { data.set_len(5) };
assert_eq!(data, DATA[0..5]);
let mut data = Vec::with_capacity(10);
let bytes_read =
stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
assert_eq!(bytes_read, 7);
unsafe { data.set_len(7) };
assert_eq!(data, DATA[5..]);
let mut data = Vec::with_capacity(10);
let bytes_read =
stream.read_uninit(StreamReadOptions::empty(), data.spare_capacity_mut()).unwrap();
assert_eq!(bytes_read, 0);
}
#[test]
fn read_to_vec() {
const DATA: &'static [u8] = b"vmo-contents";
let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
vmo.write(DATA, 0).unwrap();
let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
let data = stream.read_to_vec(StreamReadOptions::empty(), DATA.len()).unwrap();
assert_eq!(data, DATA);
}
#[test]
fn read_at_uninit() {
const DATA: &'static [u8] = b"vmo-contents";
let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
vmo.write(DATA, 0).unwrap();
let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
let mut data = Vec::with_capacity(5);
let bytes_read = stream
.read_at_uninit(StreamReadOptions::empty(), 0, data.spare_capacity_mut())
.unwrap();
assert_eq!(bytes_read, 5);
unsafe { data.set_len(5) };
assert_eq!(data, DATA[0..5]);
let mut data = Vec::with_capacity(10);
let bytes_read = stream
.read_at_uninit(StreamReadOptions::empty(), 5, data.spare_capacity_mut())
.unwrap();
assert_eq!(bytes_read, 7);
unsafe { data.set_len(7) };
assert_eq!(data, DATA[5..]);
let mut data = Vec::with_capacity(10);
let bytes_read = stream
.read_at_uninit(StreamReadOptions::empty(), 20, data.spare_capacity_mut())
.unwrap();
assert_eq!(bytes_read, 0);
}
#[test]
fn read_at_to_vec() {
const DATA: &'static [u8] = b"vmo-contents";
let vmo = zx::Vmo::create(DATA.len() as u64).unwrap();
vmo.write(DATA, 0).unwrap();
let stream = Stream::create(StreamOptions::MODE_READ, &vmo, 0).unwrap();
let data = stream.read_at_to_vec(StreamReadOptions::empty(), 5, DATA.len()).unwrap();
assert_eq!(data, DATA[5..]);
}
#[test]
fn write() {
const DATA: &'static [u8] = b"vmo-contents";
let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
let stream =
Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
let bytes_written = stream.write(zx::StreamWriteOptions::empty(), DATA).unwrap();
assert_eq!(bytes_written, DATA.len());
let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
assert_eq!(data, DATA);
}
#[test]
fn write_at() {
const DATA: &'static [u8] = b"vmo-contents";
let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
let stream =
Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
let bytes_written =
stream.write_at(zx::StreamWriteOptions::empty(), 0, &DATA[0..3]).unwrap();
assert_eq!(bytes_written, 3);
let bytes_written =
stream.write_at(zx::StreamWriteOptions::empty(), 3, &DATA[3..]).unwrap();
assert_eq!(bytes_written, DATA.len() - 3);
let data = stream.read_at_to_vec(StreamReadOptions::empty(), 0, DATA.len()).unwrap();
assert_eq!(data, DATA);
}
#[test]
fn std_io_read_write_seek() {
const DATA: &'static str = "stream-contents";
let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
let mut stream =
Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
std::io::Write::write_all(&mut stream, DATA.as_bytes()).unwrap();
assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
std::io::Seek::rewind(&mut stream).unwrap();
assert_eq!(std::io::read_to_string(&mut stream).unwrap(), DATA);
assert_eq!(std::io::Seek::stream_position(&mut stream).unwrap(), DATA.len() as u64);
}
#[test]
fn std_io_read_vectored() {
const DATA: &'static [u8] = b"stream-contents";
let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
let mut stream =
Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
assert_eq!(stream.write(StreamWriteOptions::empty(), DATA).unwrap(), DATA.len());
std::io::Seek::rewind(&mut stream).unwrap();
let mut buf1 = [0; 6];
let mut buf2 = [0; 1];
let mut buf3 = [0; 8];
let mut bufs = [
std::io::IoSliceMut::new(&mut buf1),
std::io::IoSliceMut::new(&mut buf2),
std::io::IoSliceMut::new(&mut buf3),
];
assert_eq!(std::io::Read::read_vectored(&mut stream, &mut bufs).unwrap(), DATA.len());
assert_eq!(buf1, DATA[0..6]);
assert_eq!(buf2, DATA[6..7]);
assert_eq!(buf3, DATA[7..]);
}
#[test]
fn std_io_write_vectored() {
let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, 0).unwrap();
let mut stream =
Stream::create(StreamOptions::MODE_READ | StreamOptions::MODE_WRITE, &vmo, 0).unwrap();
let bufs = [
std::io::IoSlice::new(b"stream"),
std::io::IoSlice::new(b"-"),
std::io::IoSlice::new(b"contents"),
];
assert_eq!(std::io::Write::write_vectored(&mut stream, &bufs).unwrap(), 15);
std::io::Seek::rewind(&mut stream).unwrap();
assert_eq!(stream.read_to_vec(StreamReadOptions::empty(), 15).unwrap(), b"stream-contents");
}
}