fuchsia_fs/directory/
watcher.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Stream-based Fuchsia VFS directory watcher
6
7#![deny(missing_docs)]
8
9use flex_client::{MessageBuf, ProxyHasDomain};
10use flex_fuchsia_io as fio;
11use futures::stream::{FusedStream, Stream};
12use std::ffi::OsStr;
13use std::os::unix::ffi::OsStrExt;
14use std::path::PathBuf;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use thiserror::Error;
18use zx_status::assoc_values;
19
20#[cfg(not(feature = "fdomain"))]
21use fuchsia_async as fasync;
22
/// Errors returned while setting up a [`Watcher`].
#[derive(Debug, Error, Clone)]
#[allow(missing_docs)]
pub enum WatcherCreateError {
    /// The `watch` call on the directory proxy failed at the FIDL layer, so no status was
    /// received from the directory.
    #[error("while sending watch request: {0}")]
    SendWatchRequest(#[source] fidl::Error),

    /// The directory answered the watch request, but with a non-OK status.
    #[error("watch failed with status: {0}")]
    WatchError(#[source] zx_status::Status),

    /// Converting the client end into an async channel failed.
    // NOTE(review): nothing in this file constructs this variant; it is presumably retained for
    // API compatibility — confirm before removing.
    #[error("while converting client end to fasync channel: {0}")]
    ChannelConversion(#[source] zx_status::Status),
}
35
/// Errors yielded by the [`Watcher`] stream while it is being polled.
///
/// The payload type of `ChannelRead` depends on the `fdomain` feature: a `zx_status::Status`
/// without it, a `flex_client::Error` with it. `Eq`/`PartialEq` are only derived when the
/// `fdomain` feature is disabled.
#[derive(Debug, Error)]
#[cfg_attr(not(feature = "fdomain"), derive(Eq, PartialEq))]
#[allow(missing_docs)]
pub enum WatcherStreamError {
    /// Reading the next batch of watch messages from the channel failed.
    #[cfg(not(feature = "fdomain"))]
    #[error("read from watch channel failed with status: {0}")]
    ChannelRead(#[from] zx_status::Status),
    /// Reading the next batch of watch messages from the channel failed.
    #[cfg(feature = "fdomain")]
    #[error("read from watch channel failed: {0}")]
    ChannelRead(#[from] flex_client::Error),
}
47
/// Describes the type of event that occurred in the directory being watched.
///
/// This is a thin newtype around `fio::WatchEvent`; compare against the associated constants
/// (`WatchEvent::ADD_FILE`, `WatchEvent::IDLE`, ...) declared below.
#[repr(C)]
#[derive(Copy, Clone, Eq, PartialEq)]
pub struct WatchEvent(fio::WatchEvent);

// Declares one associated constant on `WatchEvent` per listed `fio::WatchEvent` variant.
// NOTE(review): `WatchEvent` does not derive `Debug`, yet `WatchMessage` derives it with a
// `WatchEvent` field, so this macro presumably also provides the `Debug` impl — confirm against
// `zx_status::assoc_values`.
assoc_values!(WatchEvent, [
    /// The directory being watched has been deleted. The name returned for this event
    /// will be `.` (dot), as it is referring to the directory itself.
    DELETED     = fio::WatchEvent::Deleted;
    /// A file was added.
    ADD_FILE    = fio::WatchEvent::Added;
    /// A file was removed.
    REMOVE_FILE = fio::WatchEvent::Removed;
    /// A file existed at the time the Watcher was created.
    EXISTING    = fio::WatchEvent::Existing;
    /// All existing files have been enumerated.
    IDLE        = fio::WatchEvent::Idle;
]);
66
/// A message containing a `WatchEvent` and the filename (relative to the directory being watched)
/// that triggered the event.
///
/// One `WatchMessage` is produced per entry decoded from the watch channel; a single channel
/// read may therefore result in several consecutive messages.
#[derive(Debug, Eq, PartialEq)]
pub struct WatchMessage {
    /// The event that occurred.
    pub event: WatchEvent,
    /// The filename that triggered the message, built from the raw name bytes sent by the
    /// directory (no UTF-8 validation is performed).
    pub filename: PathBuf,
}
76
/// Lifecycle of a `Watcher` stream, used to fuse it after the first channel error.
#[derive(Debug, Eq, PartialEq)]
enum WatcherState {
    /// Normal operation: polling reads from the channel and yields messages.
    Watching,
    /// A channel read error has just been yielded; the next poll moves to `Terminated` and
    /// returns `None`.
    TerminateOnNextPoll,
    /// The stream has ended; every poll returns `None` and `is_terminated()` reports `true`.
    Terminated,
}
83
/// Provides a Stream of WatchMessages corresponding to filesystem events for a given directory.
/// When reading from the watch channel fails, the stream yields that error once and then
/// terminates. Once terminated, the stream is fused and keeps returning `None` when polled.
#[derive(Debug)]
#[must_use = "futures/streams must be polled"]
pub struct Watcher {
    // Client side of the watch channel; the directory writes batches of watch messages to it.
    ch: flex_client::AsyncChannel,
    // Most recently read batch of raw watch messages.
    // If idx >= buf.bytes().len(), you must call reset_buf() before get_next_msg().
    buf: MessageBuf,
    // Byte offset into `buf` of the next message that has not been decoded yet.
    idx: usize,
    // Tracks whether the stream is live, about to end, or ended (see `WatcherState`).
    state: WatcherState,
}
96
// `poll_next` obtains `&mut Watcher` straight from `Pin<&mut Self>` (`&mut *self`), which
// requires `Watcher: Unpin`; this impl guarantees that regardless of the field types.
impl Unpin for Watcher {}
98
99impl Watcher {
100    /// Creates a new `Watcher` for the directory given by `dir`.
101    pub async fn new(dir: &fio::DirectoryProxy) -> Result<Watcher, WatcherCreateError> {
102        Self::new_with_mask(dir, fio::WatchMask::all()).await
103    }
104
105    /// Creates a new `Watcher` for the directory given by `dir`, only returning events specified
106    /// by `mask`.
107    pub async fn new_with_mask(
108        dir: &fio::DirectoryProxy,
109        mask: fio::WatchMask,
110    ) -> Result<Watcher, WatcherCreateError> {
111        let (client_end, server_end) = dir.domain().create_endpoints();
112        let options = 0u32;
113        let status = dir
114            .watch(mask, options, server_end)
115            .await
116            .map_err(WatcherCreateError::SendWatchRequest)?;
117        zx_status::Status::ok(status).map_err(WatcherCreateError::WatchError)?;
118        let mut buf = MessageBuf::new();
119        buf.ensure_capacity_bytes(fio::MAX_BUF as usize);
120        Ok(Watcher {
121            #[cfg(not(feature = "fdomain"))]
122            ch: fasync::Channel::from_channel(client_end.into_channel()),
123            #[cfg(feature = "fdomain")]
124            ch: client_end.into_channel(),
125            buf,
126            idx: 0,
127            state: WatcherState::Watching,
128        })
129    }
130
131    fn reset_buf(&mut self) {
132        self.idx = 0;
133        self.buf.clear();
134    }
135
136    fn get_next_msg(&mut self) -> WatchMessage {
137        assert!(self.idx < self.buf.bytes().len());
138        let next_msg = VfsWatchMsg::from_raw(&self.buf.bytes()[self.idx..])
139            .expect("Invalid buffer received by Watcher!");
140        self.idx += next_msg.len();
141
142        let mut pathbuf = PathBuf::new();
143        pathbuf.push(OsStr::from_bytes(next_msg.name()));
144        let event = next_msg.event();
145        WatchMessage { event, filename: pathbuf }
146    }
147}
148
149impl FusedStream for Watcher {
150    fn is_terminated(&self) -> bool {
151        self.state == WatcherState::Terminated
152    }
153}
154
155impl Stream for Watcher {
156    type Item = Result<WatchMessage, WatcherStreamError>;
157
158    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
159        let this = &mut *self;
160        // Once this stream has hit an error, it's likely unrecoverable at this level and should be
161        // closed. Clients can attempt to recover by creating a new Watcher.
162        if this.state == WatcherState::TerminateOnNextPoll {
163            this.state = WatcherState::Terminated;
164        }
165        if this.state == WatcherState::Terminated {
166            return Poll::Ready(None);
167        }
168        if this.idx >= this.buf.bytes().len() {
169            this.reset_buf();
170        }
171        if this.idx == 0 {
172            match this.ch.recv_from(cx, &mut this.buf) {
173                Poll::Ready(Ok(())) => {}
174                Poll::Ready(Err(e)) => {
175                    self.state = WatcherState::TerminateOnNextPoll;
176                    return Poll::Ready(Some(Err(e.into())));
177                }
178                Poll::Pending => return Poll::Pending,
179            }
180        }
181        Poll::Ready(Some(Ok(this.get_next_msg())))
182    }
183}
184
/// Zero-sized marker for a C-style flexible array member: placed as the last field of a
/// `#[repr(C)]` struct, its address is where the trailing `T` elements begin.
#[repr(C)]
#[derive(Default)]
struct IncompleteArrayField<T>(::std::marker::PhantomData<T>);
impl<T> IncompleteArrayField<T> {
    /// Returns this marker's own address, typed as a pointer to the first trailing element.
    ///
    /// # Safety
    ///
    /// Computing the pointer is harmless; dereferencing it is only valid if the memory
    /// following this field really holds initialized, suitably aligned `T` values.
    #[inline]
    pub unsafe fn as_ptr(&self) -> *const T {
        // A plain pointer cast yields the same address as transmuting the reference, without
        // resorting to `transmute`.
        (self as *const Self).cast::<T>()
    }
    /// Views the `len` elements that follow this field as a slice.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that `len` initialized, suitably aligned `T` values live
    /// directly after this field and stay valid and unmodified for the returned lifetime.
    #[inline]
    pub unsafe fn as_slice(&self, len: usize) -> &[T] {
        // SAFETY: upheld by the caller as documented above.
        unsafe { ::std::slice::from_raw_parts(self.as_ptr(), len) }
    }
}
impl<T> ::std::fmt::Debug for IncompleteArrayField<T> {
    /// Prints only the type name; the trailing elements are not reachable safely from here.
    fn fmt(&self, fmt: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
        fmt.write_str("IncompleteArrayField")
    }
}
203
/// In-place view of a single entry in a watch buffer: a fixed header (`event`, `len`) followed
/// directly by `len` bytes of name.
// NOTE(review): the layout is presumably dictated by the fuchsia.io watcher wire format —
// confirm against the protocol definition before changing field order or types.
#[repr(C)]
#[derive(Debug)]
struct vfs_watch_msg_t {
    // Kind of event this entry reports.
    event: fio::WatchEvent,
    // Number of name bytes that follow the header.
    len: u8,
    // Zero-sized marker for the start of the name bytes.
    name: IncompleteArrayField<u8>,
}
211
/// Borrowed, length-checked view of one watch message inside a received buffer.
///
/// Instances are only created through `from_raw`, which verifies that the buffer holds the
/// whole header plus the number of name bytes the header announces.
#[derive(Debug)]
struct VfsWatchMsg<'a> {
    // Points at the message header inside the buffer passed to `from_raw`.
    inner: &'a vfs_watch_msg_t,
}
216
217impl<'a> VfsWatchMsg<'a> {
218    fn from_raw(buf: &'a [u8]) -> Option<VfsWatchMsg<'a>> {
219        if buf.len() < ::std::mem::size_of::<vfs_watch_msg_t>() {
220            return None;
221        }
222        // This is safe as long as the buffer is at least as large as a vfs_watch_msg_t, which we
223        // just verified. Further, we verify that the buffer has enough bytes to hold the
224        // "incomplete array field" member.
225        let m = unsafe { VfsWatchMsg { inner: &*(buf.as_ptr() as *const vfs_watch_msg_t) } };
226        if buf.len() < ::std::mem::size_of::<vfs_watch_msg_t>() + m.namelen() {
227            return None;
228        }
229        Some(m)
230    }
231
232    fn len(&self) -> usize {
233        ::std::mem::size_of::<vfs_watch_msg_t>() + self.namelen()
234    }
235
236    fn event(&self) -> WatchEvent {
237        WatchEvent(self.inner.event)
238    }
239
240    fn namelen(&self) -> usize {
241        self.inner.len as usize
242    }
243
244    fn name(&self) -> &'a [u8] {
245        // This is safe because we verified during construction that the inner name field has at
246        // least namelen() bytes in it.
247        unsafe { self.inner.name.as_slice(self.namelen()) }
248    }
249}
250
251#[cfg(test)]
252mod tests {
253    use super::*;
254    use assert_matches::assert_matches;
255    use fuchsia_async::{DurationExt, TimeoutExt};
256
257    use futures::prelude::*;
258    use std::fmt::Debug;
259    use std::fs::File;
260    use std::path::Path;
261    use std::sync::Arc;
262    use tempfile::tempdir;
263    use vfs::directory::dirents_sink;
264    use vfs::directory::entry::{EntryInfo, GetEntryInfo};
265    use vfs::directory::entry_container::{Directory, DirectoryWatcher};
266    use vfs::directory::immutable::connection::ImmutableConnection;
267    use vfs::directory::traversal_position::TraversalPosition;
268    use vfs::execution_scope::ExecutionScope;
269    use vfs::node::Node;
270    use vfs::ObjectRequestRef;
271
272    fn one_step<'a, S, OK, ERR>(s: &'a mut S) -> impl Future<Output = OK> + 'a
273    where
274        S: Stream<Item = Result<OK, ERR>> + Unpin,
275        ERR: Debug,
276    {
277        let f = s.next();
278        let f = f.on_timeout(zx::MonotonicDuration::from_millis(500).after_now(), || {
279            panic!("timeout waiting for watcher")
280        });
281        f.map(|next| {
282            next.expect("the stream yielded no next item")
283                .unwrap_or_else(|e| panic!("Error waiting for watcher: {:?}", e))
284        })
285    }
286
287    #[fuchsia::test]
288    async fn test_existing() {
289        let tmp_dir = tempdir().unwrap();
290        let _ = File::create(tmp_dir.path().join("file1")).unwrap();
291
292        let dir = crate::directory::open_in_namespace(
293            tmp_dir.path().to_str().unwrap(),
294            fio::PERM_READABLE,
295        )
296        .unwrap();
297        let mut w = Watcher::new(&dir).await.unwrap();
298
299        let msg = one_step(&mut w).await;
300        assert_eq!(WatchEvent::EXISTING, msg.event);
301        assert_eq!(Path::new("."), msg.filename);
302
303        let msg = one_step(&mut w).await;
304        assert_eq!(WatchEvent::EXISTING, msg.event);
305        assert_eq!(Path::new("file1"), msg.filename);
306
307        let msg = one_step(&mut w).await;
308        assert_eq!(WatchEvent::IDLE, msg.event);
309    }
310
311    #[fuchsia::test]
312    async fn test_add() {
313        let tmp_dir = tempdir().unwrap();
314
315        let dir = crate::directory::open_in_namespace(
316            tmp_dir.path().to_str().unwrap(),
317            fio::PERM_READABLE,
318        )
319        .unwrap();
320        let mut w = Watcher::new(&dir).await.unwrap();
321
322        loop {
323            let msg = one_step(&mut w).await;
324            match msg.event {
325                WatchEvent::EXISTING => continue,
326                WatchEvent::IDLE => break,
327                _ => panic!("Unexpected watch event!"),
328            }
329        }
330
331        let _ = File::create(tmp_dir.path().join("file1")).unwrap();
332        let msg = one_step(&mut w).await;
333        assert_eq!(WatchEvent::ADD_FILE, msg.event);
334        assert_eq!(Path::new("file1"), msg.filename);
335    }
336
337    #[fuchsia::test]
338    async fn test_remove() {
339        let tmp_dir = tempdir().unwrap();
340
341        let filename = "file1";
342        let filepath = tmp_dir.path().join(filename);
343        let _ = File::create(&filepath).unwrap();
344
345        let dir = crate::directory::open_in_namespace(
346            tmp_dir.path().to_str().unwrap(),
347            fio::PERM_READABLE,
348        )
349        .unwrap();
350        let mut w = Watcher::new(&dir).await.unwrap();
351
352        loop {
353            let msg = one_step(&mut w).await;
354            match msg.event {
355                WatchEvent::EXISTING => continue,
356                WatchEvent::IDLE => break,
357                _ => panic!("Unexpected watch event!"),
358            }
359        }
360
361        ::std::fs::remove_file(&filepath).unwrap();
362        let msg = one_step(&mut w).await;
363        assert_eq!(WatchEvent::REMOVE_FILE, msg.event);
364        assert_eq!(Path::new(filename), msg.filename);
365    }
366
367    struct MockDirectory;
368
369    impl MockDirectory {
370        fn new() -> Arc<Self> {
371            Arc::new(Self)
372        }
373    }
374
375    impl GetEntryInfo for MockDirectory {
376        fn entry_info(&self) -> EntryInfo {
377            EntryInfo::new(fio::INO_UNKNOWN, fio::DirentType::Directory)
378        }
379    }
380
381    impl Node for MockDirectory {
382        async fn get_attributes(
383            &self,
384            _query: fio::NodeAttributesQuery,
385        ) -> Result<fio::NodeAttributes2, zx::Status> {
386            unimplemented!();
387        }
388
389        fn close(self: Arc<Self>) {}
390    }
391
392    impl Directory for MockDirectory {
393        fn open(
394            self: Arc<Self>,
395            scope: ExecutionScope,
396            _path: vfs::path::Path,
397            flags: fio::Flags,
398            object_request: ObjectRequestRef<'_>,
399        ) -> Result<(), zx::Status> {
400            object_request.take().create_connection_sync::<ImmutableConnection<_>, _>(
401                scope,
402                self.clone(),
403                flags,
404            );
405            Ok(())
406        }
407
408        async fn read_dirents<'a>(
409            &'a self,
410            _pos: &'a TraversalPosition,
411            _sink: Box<dyn dirents_sink::Sink>,
412        ) -> Result<(TraversalPosition, Box<dyn dirents_sink::Sealed>), zx::Status> {
413            unimplemented!("Not implemented");
414        }
415
416        fn register_watcher(
417            self: Arc<Self>,
418            _scope: ExecutionScope,
419            _mask: fio::WatchMask,
420            _watcher: DirectoryWatcher,
421        ) -> Result<(), zx::Status> {
422            // Don't do anything, just throw out the watcher, which should close the channel, to
423            // generate a PEER_CLOSED error.
424            Ok(())
425        }
426
427        fn unregister_watcher(self: Arc<Self>, _key: usize) {
428            unimplemented!("Not implemented");
429        }
430    }
431
432    #[fuchsia::test]
433    async fn test_error() {
434        let test_dir = MockDirectory::new();
435        let client = vfs::directory::serve_read_only(test_dir);
436        let mut w = Watcher::new(&client).await.unwrap();
437        let msg = w.next().await.expect("the stream yielded no next item");
438        assert!(!w.is_terminated());
439        assert_matches!(msg, Err(WatcherStreamError::ChannelRead(zx::Status::PEER_CLOSED)));
440        assert!(!w.is_terminated());
441        assert_matches!(w.next().await, None);
442        assert!(w.is_terminated());
443    }
444}