1#![deny(missing_docs)]
8
9use flex_client::{MessageBuf, ProxyHasDomain};
10use flex_fuchsia_io as fio;
11use futures::stream::{FusedStream, Stream};
12use std::ffi::OsStr;
13use std::os::unix::ffi::OsStrExt;
14use std::path::PathBuf;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use thiserror::Error;
18use zx_status::assoc_values;
19
20#[cfg(not(feature = "fdomain"))]
21use fuchsia_async as fasync;
22
23#[derive(Debug, Error, Clone)]
24#[allow(missing_docs)]
25pub enum WatcherCreateError {
26 #[error("while sending watch request: {0}")]
27 SendWatchRequest(#[source] fidl::Error),
28
29 #[error("watch failed with status: {0}")]
30 WatchError(#[source] zx_status::Status),
31
32 #[error("while converting client end to fasync channel: {0}")]
33 ChannelConversion(#[source] zx_status::Status),
34}
35
36#[derive(Debug, Error)]
37#[cfg_attr(not(feature = "fdomain"), derive(Eq, PartialEq))]
38#[allow(missing_docs)]
39pub enum WatcherStreamError {
40 #[cfg(not(feature = "fdomain"))]
41 #[error("read from watch channel failed with status: {0}")]
42 ChannelRead(#[from] zx_status::Status),
43 #[cfg(feature = "fdomain")]
44 #[error("read from watch channel failed: {0}")]
45 ChannelRead(#[from] flex_client::Error),
46}
47
48#[repr(C)]
50#[derive(Copy, Clone, Eq, PartialEq)]
51pub struct WatchEvent(fio::WatchEvent);
52
53assoc_values!(WatchEvent, [
54 DELETED = fio::WatchEvent::Deleted;
57 ADD_FILE = fio::WatchEvent::Added;
59 REMOVE_FILE = fio::WatchEvent::Removed;
61 EXISTING = fio::WatchEvent::Existing;
63 IDLE = fio::WatchEvent::Idle;
65]);
66
67#[derive(Debug, Eq, PartialEq)]
70pub struct WatchMessage {
71 pub event: WatchEvent,
73 pub filename: PathBuf,
75}
76
77#[derive(Debug, Eq, PartialEq)]
78enum WatcherState {
79 Watching,
80 TerminateOnNextPoll,
81 Terminated,
82}
83
84#[derive(Debug)]
88#[must_use = "futures/streams must be polled"]
89pub struct Watcher {
90 ch: flex_client::AsyncChannel,
91 buf: MessageBuf,
93 idx: usize,
94 state: WatcherState,
95}
96
97impl Unpin for Watcher {}
98
99impl Watcher {
100 pub async fn new(dir: &fio::DirectoryProxy) -> Result<Watcher, WatcherCreateError> {
102 Self::new_with_mask(dir, fio::WatchMask::all()).await
103 }
104
105 pub async fn new_with_mask(
108 dir: &fio::DirectoryProxy,
109 mask: fio::WatchMask,
110 ) -> Result<Watcher, WatcherCreateError> {
111 let (client_end, server_end) = dir.domain().create_endpoints();
112 let options = 0u32;
113 let status = dir
114 .watch(mask, options, server_end)
115 .await
116 .map_err(WatcherCreateError::SendWatchRequest)?;
117 zx_status::Status::ok(status).map_err(WatcherCreateError::WatchError)?;
118 let mut buf = MessageBuf::new();
119 buf.ensure_capacity_bytes(fio::MAX_BUF as usize);
120 Ok(Watcher {
121 #[cfg(not(feature = "fdomain"))]
122 ch: fasync::Channel::from_channel(client_end.into_channel()),
123 #[cfg(feature = "fdomain")]
124 ch: client_end.into_channel(),
125 buf,
126 idx: 0,
127 state: WatcherState::Watching,
128 })
129 }
130
131 fn reset_buf(&mut self) {
132 self.idx = 0;
133 self.buf.clear();
134 }
135
136 fn get_next_msg(&mut self) -> WatchMessage {
137 assert!(self.idx < self.buf.bytes().len());
138 let next_msg = VfsWatchMsg::from_raw(&self.buf.bytes()[self.idx..])
139 .expect("Invalid buffer received by Watcher!");
140 self.idx += next_msg.len();
141
142 let mut pathbuf = PathBuf::new();
143 pathbuf.push(OsStr::from_bytes(next_msg.name()));
144 let event = next_msg.event();
145 WatchMessage { event, filename: pathbuf }
146 }
147}
148
149impl FusedStream for Watcher {
150 fn is_terminated(&self) -> bool {
151 self.state == WatcherState::Terminated
152 }
153}
154
155impl Stream for Watcher {
156 type Item = Result<WatchMessage, WatcherStreamError>;
157
158 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
159 let this = &mut *self;
160 if this.state == WatcherState::TerminateOnNextPoll {
163 this.state = WatcherState::Terminated;
164 }
165 if this.state == WatcherState::Terminated {
166 return Poll::Ready(None);
167 }
168 if this.idx >= this.buf.bytes().len() {
169 this.reset_buf();
170 }
171 if this.idx == 0 {
172 match this.ch.recv_from(cx, &mut this.buf) {
173 Poll::Ready(Ok(())) => {}
174 Poll::Ready(Err(e)) => {
175 self.state = WatcherState::TerminateOnNextPoll;
176 return Poll::Ready(Some(Err(e.into())));
177 }
178 Poll::Pending => return Poll::Pending,
179 }
180 }
181 Poll::Ready(Some(Ok(this.get_next_msg())))
182 }
183}
184
185#[repr(C)]
186#[derive(Default)]
187struct IncompleteArrayField<T>(::std::marker::PhantomData<T>);
188impl<T> IncompleteArrayField<T> {
189 #[inline]
190 pub unsafe fn as_ptr(&self) -> *const T {
191 ::std::mem::transmute(self)
192 }
193 #[inline]
194 pub unsafe fn as_slice(&self, len: usize) -> &[T] {
195 ::std::slice::from_raw_parts(self.as_ptr(), len)
196 }
197}
198impl<T> ::std::fmt::Debug for IncompleteArrayField<T> {
199 fn fmt(&self, fmt: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
200 fmt.write_str("IncompleteArrayField")
201 }
202}
203
204#[repr(C)]
205#[derive(Debug)]
206struct vfs_watch_msg_t {
207 event: fio::WatchEvent,
208 len: u8,
209 name: IncompleteArrayField<u8>,
210}
211
212#[derive(Debug)]
213struct VfsWatchMsg<'a> {
214 inner: &'a vfs_watch_msg_t,
215}
216
217impl<'a> VfsWatchMsg<'a> {
218 fn from_raw(buf: &'a [u8]) -> Option<VfsWatchMsg<'a>> {
219 if buf.len() < ::std::mem::size_of::<vfs_watch_msg_t>() {
220 return None;
221 }
222 let m = unsafe { VfsWatchMsg { inner: &*(buf.as_ptr() as *const vfs_watch_msg_t) } };
226 if buf.len() < ::std::mem::size_of::<vfs_watch_msg_t>() + m.namelen() {
227 return None;
228 }
229 Some(m)
230 }
231
232 fn len(&self) -> usize {
233 ::std::mem::size_of::<vfs_watch_msg_t>() + self.namelen()
234 }
235
236 fn event(&self) -> WatchEvent {
237 WatchEvent(self.inner.event)
238 }
239
240 fn namelen(&self) -> usize {
241 self.inner.len as usize
242 }
243
244 fn name(&self) -> &'a [u8] {
245 unsafe { self.inner.name.as_slice(self.namelen()) }
248 }
249}
250
251#[cfg(test)]
252mod tests {
253 use super::*;
254 use assert_matches::assert_matches;
255 use fuchsia_async::{DurationExt, TimeoutExt};
256
257 use futures::prelude::*;
258 use std::fmt::Debug;
259 use std::fs::File;
260 use std::path::Path;
261 use std::sync::Arc;
262 use tempfile::tempdir;
263 use vfs::directory::dirents_sink;
264 use vfs::directory::entry::{EntryInfo, GetEntryInfo};
265 use vfs::directory::entry_container::{Directory, DirectoryWatcher};
266 use vfs::directory::immutable::connection::ImmutableConnection;
267 use vfs::directory::traversal_position::TraversalPosition;
268 use vfs::execution_scope::ExecutionScope;
269 use vfs::node::Node;
270 use vfs::ObjectRequestRef;
271
272 fn one_step<'a, S, OK, ERR>(s: &'a mut S) -> impl Future<Output = OK> + 'a
273 where
274 S: Stream<Item = Result<OK, ERR>> + Unpin,
275 ERR: Debug,
276 {
277 let f = s.next();
278 let f = f.on_timeout(zx::MonotonicDuration::from_millis(500).after_now(), || {
279 panic!("timeout waiting for watcher")
280 });
281 f.map(|next| {
282 next.expect("the stream yielded no next item")
283 .unwrap_or_else(|e| panic!("Error waiting for watcher: {:?}", e))
284 })
285 }
286
287 #[fuchsia::test]
288 async fn test_existing() {
289 let tmp_dir = tempdir().unwrap();
290 let _ = File::create(tmp_dir.path().join("file1")).unwrap();
291
292 let dir = crate::directory::open_in_namespace(
293 tmp_dir.path().to_str().unwrap(),
294 fio::PERM_READABLE,
295 )
296 .unwrap();
297 let mut w = Watcher::new(&dir).await.unwrap();
298
299 let msg = one_step(&mut w).await;
300 assert_eq!(WatchEvent::EXISTING, msg.event);
301 assert_eq!(Path::new("."), msg.filename);
302
303 let msg = one_step(&mut w).await;
304 assert_eq!(WatchEvent::EXISTING, msg.event);
305 assert_eq!(Path::new("file1"), msg.filename);
306
307 let msg = one_step(&mut w).await;
308 assert_eq!(WatchEvent::IDLE, msg.event);
309 }
310
311 #[fuchsia::test]
312 async fn test_add() {
313 let tmp_dir = tempdir().unwrap();
314
315 let dir = crate::directory::open_in_namespace(
316 tmp_dir.path().to_str().unwrap(),
317 fio::PERM_READABLE,
318 )
319 .unwrap();
320 let mut w = Watcher::new(&dir).await.unwrap();
321
322 loop {
323 let msg = one_step(&mut w).await;
324 match msg.event {
325 WatchEvent::EXISTING => continue,
326 WatchEvent::IDLE => break,
327 _ => panic!("Unexpected watch event!"),
328 }
329 }
330
331 let _ = File::create(tmp_dir.path().join("file1")).unwrap();
332 let msg = one_step(&mut w).await;
333 assert_eq!(WatchEvent::ADD_FILE, msg.event);
334 assert_eq!(Path::new("file1"), msg.filename);
335 }
336
337 #[fuchsia::test]
338 async fn test_remove() {
339 let tmp_dir = tempdir().unwrap();
340
341 let filename = "file1";
342 let filepath = tmp_dir.path().join(filename);
343 let _ = File::create(&filepath).unwrap();
344
345 let dir = crate::directory::open_in_namespace(
346 tmp_dir.path().to_str().unwrap(),
347 fio::PERM_READABLE,
348 )
349 .unwrap();
350 let mut w = Watcher::new(&dir).await.unwrap();
351
352 loop {
353 let msg = one_step(&mut w).await;
354 match msg.event {
355 WatchEvent::EXISTING => continue,
356 WatchEvent::IDLE => break,
357 _ => panic!("Unexpected watch event!"),
358 }
359 }
360
361 ::std::fs::remove_file(&filepath).unwrap();
362 let msg = one_step(&mut w).await;
363 assert_eq!(WatchEvent::REMOVE_FILE, msg.event);
364 assert_eq!(Path::new(filename), msg.filename);
365 }
366
367 struct MockDirectory;
368
369 impl MockDirectory {
370 fn new() -> Arc<Self> {
371 Arc::new(Self)
372 }
373 }
374
375 impl GetEntryInfo for MockDirectory {
376 fn entry_info(&self) -> EntryInfo {
377 EntryInfo::new(fio::INO_UNKNOWN, fio::DirentType::Directory)
378 }
379 }
380
381 impl Node for MockDirectory {
382 async fn get_attributes(
383 &self,
384 _query: fio::NodeAttributesQuery,
385 ) -> Result<fio::NodeAttributes2, zx::Status> {
386 unimplemented!();
387 }
388
389 fn close(self: Arc<Self>) {}
390 }
391
392 impl Directory for MockDirectory {
393 fn open(
394 self: Arc<Self>,
395 scope: ExecutionScope,
396 _path: vfs::path::Path,
397 flags: fio::Flags,
398 object_request: ObjectRequestRef<'_>,
399 ) -> Result<(), zx::Status> {
400 object_request.take().create_connection_sync::<ImmutableConnection<_>, _>(
401 scope,
402 self.clone(),
403 flags,
404 );
405 Ok(())
406 }
407
408 async fn read_dirents<'a>(
409 &'a self,
410 _pos: &'a TraversalPosition,
411 _sink: Box<dyn dirents_sink::Sink>,
412 ) -> Result<(TraversalPosition, Box<dyn dirents_sink::Sealed>), zx::Status> {
413 unimplemented!("Not implemented");
414 }
415
416 fn register_watcher(
417 self: Arc<Self>,
418 _scope: ExecutionScope,
419 _mask: fio::WatchMask,
420 _watcher: DirectoryWatcher,
421 ) -> Result<(), zx::Status> {
422 Ok(())
425 }
426
427 fn unregister_watcher(self: Arc<Self>, _key: usize) {
428 unimplemented!("Not implemented");
429 }
430 }
431
432 #[fuchsia::test]
433 async fn test_error() {
434 let test_dir = MockDirectory::new();
435 let client = vfs::directory::serve_read_only(test_dir);
436 let mut w = Watcher::new(&client).await.unwrap();
437 let msg = w.next().await.expect("the stream yielded no next item");
438 assert!(!w.is_terminated());
439 assert_matches!(msg, Err(WatcherStreamError::ChannelRead(zx::Status::PEER_CLOSED)));
440 assert!(!w.is_terminated());
441 assert_matches!(w.next().await, None);
442 assert!(w.is_terminated());
443 }
444}