fuchsia_fs/directory/
watcher.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Stream-based Fuchsia VFS directory watcher
6
7#![deny(missing_docs)]
8
9use futures::stream::{FusedStream, Stream};
10use std::ffi::OsStr;
11use std::os::unix::ffi::OsStrExt;
12use std::path::PathBuf;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15use thiserror::Error;
16use zx_status::assoc_values;
17use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
18
19#[cfg(target_os = "fuchsia")]
20use zx::MessageBuf;
21
22#[cfg(not(target_os = "fuchsia"))]
23use fasync::emulated_handle::MessageBuf;
24
25#[derive(Debug, Error, Clone)]
26#[allow(missing_docs)]
27pub enum WatcherCreateError {
28    #[error("while sending watch request: {0}")]
29    SendWatchRequest(#[source] fidl::Error),
30
31    #[error("watch failed with status: {0}")]
32    WatchError(#[source] zx_status::Status),
33
34    #[error("while converting client end to fasync channel: {0}")]
35    ChannelConversion(#[source] zx_status::Status),
36}
37
38#[derive(Debug, Error, Eq, PartialEq)]
39#[allow(missing_docs)]
40pub enum WatcherStreamError {
41    #[error("read from watch channel failed with status: {0}")]
42    ChannelRead(#[from] zx_status::Status),
43}
44
/// Describes the type of event that occurred in the directory being watched.
///
/// This is a newtype around [`fio::WatchEvent`]. The named values (`WatchEvent::DELETED`,
/// `WatchEvent::ADD_FILE`, ...) are declared by the `assoc_values!` invocation that follows the
/// struct definition.
#[repr(C)]
#[derive(Copy, Clone, Eq, PartialEq)]
pub struct WatchEvent(fio::WatchEvent);

// Each entry pairs a `WatchEvent` name (left) with the `fio::WatchEvent` variant it wraps (right).
assoc_values!(WatchEvent, [
    /// The directory being watched has been deleted. The name returned for this event
    /// will be `.` (dot), as it is referring to the directory itself.
    DELETED     = fio::WatchEvent::Deleted;
    /// A file was added.
    ADD_FILE    = fio::WatchEvent::Added;
    /// A file was removed.
    REMOVE_FILE = fio::WatchEvent::Removed;
    /// A file existed at the time the Watcher was created.
    EXISTING    = fio::WatchEvent::Existing;
    /// All existing files have been enumerated.
    IDLE        = fio::WatchEvent::Idle;
]);
63
/// A message containing a `WatchEvent` and the filename (relative to the directory being watched)
/// that triggered the event.
///
/// One `WatchMessage` is produced per item of the [`Watcher`] stream.
#[derive(Debug, Eq, PartialEq)]
pub struct WatchMessage {
    /// The event that occurred.
    pub event: WatchEvent,
    /// The filename that triggered the message, built from the raw name bytes carried by the
    /// watch message.
    pub filename: PathBuf,
}
73
/// Lifecycle of a [`Watcher`] stream, driven by `poll_next`.
#[derive(Debug, Eq, PartialEq)]
enum WatcherState {
    /// Normal operation: polling receives and decodes watch messages.
    Watching,
    /// An error has just been yielded to the caller; the next poll switches to `Terminated`.
    TerminateOnNextPoll,
    /// The stream has ended: polling yields `None` and `is_terminated()` reports `true`.
    Terminated,
}
80
/// Provides a Stream of WatchMessages corresponding to filesystem events for a given directory.
/// After receiving an error, the stream will return the error, and then will terminate. After it's
/// terminated, the stream is fused and will continue to return None when polled.
#[derive(Debug)]
#[must_use = "futures/streams must be polled"]
pub struct Watcher {
    // Client end of the watch channel; watch messages are received from it.
    ch: fasync::Channel,
    // Holds the most recently received channel message, which may pack several watch messages.
    // If idx >= buf.bytes().len(), you must call reset_buf() before get_next_msg().
    buf: MessageBuf,
    // Byte offset into `buf` of the next watch message that has not been decoded yet.
    idx: usize,
    // Tracks whether the stream is live, about to terminate, or terminated.
    state: WatcherState,
}
93
// `Watcher` is explicitly `Unpin`: `poll_next` obtains a plain `&mut Watcher` by dereferencing
// its `Pin<&mut Self>` receiver, which is only permitted for `Unpin` types.
impl Unpin for Watcher {}
95
96impl Watcher {
97    /// Creates a new `Watcher` for the directory given by `dir`.
98    pub async fn new(dir: &fio::DirectoryProxy) -> Result<Watcher, WatcherCreateError> {
99        let (client_end, server_end) = fidl::endpoints::create_endpoints();
100        let options = 0u32;
101        let status = dir
102            .watch(fio::WatchMask::all(), options, server_end)
103            .await
104            .map_err(WatcherCreateError::SendWatchRequest)?;
105        zx_status::Status::ok(status).map_err(WatcherCreateError::WatchError)?;
106        let mut buf = MessageBuf::new();
107        buf.ensure_capacity_bytes(fio::MAX_BUF as usize);
108        Ok(Watcher {
109            ch: fasync::Channel::from_channel(client_end.into_channel()),
110            buf,
111            idx: 0,
112            state: WatcherState::Watching,
113        })
114    }
115
116    fn reset_buf(&mut self) {
117        self.idx = 0;
118        self.buf.clear();
119    }
120
121    fn get_next_msg(&mut self) -> WatchMessage {
122        assert!(self.idx < self.buf.bytes().len());
123        let next_msg = VfsWatchMsg::from_raw(&self.buf.bytes()[self.idx..])
124            .expect("Invalid buffer received by Watcher!");
125        self.idx += next_msg.len();
126
127        let mut pathbuf = PathBuf::new();
128        pathbuf.push(OsStr::from_bytes(next_msg.name()));
129        let event = next_msg.event();
130        WatchMessage { event, filename: pathbuf }
131    }
132}
133
134impl FusedStream for Watcher {
135    fn is_terminated(&self) -> bool {
136        self.state == WatcherState::Terminated
137    }
138}
139
140impl Stream for Watcher {
141    type Item = Result<WatchMessage, WatcherStreamError>;
142
143    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
144        let this = &mut *self;
145        // Once this stream has hit an error, it's likely unrecoverable at this level and should be
146        // closed. Clients can attempt to recover by creating a new Watcher.
147        if this.state == WatcherState::TerminateOnNextPoll {
148            this.state = WatcherState::Terminated;
149        }
150        if this.state == WatcherState::Terminated {
151            return Poll::Ready(None);
152        }
153        if this.idx >= this.buf.bytes().len() {
154            this.reset_buf();
155        }
156        if this.idx == 0 {
157            match this.ch.recv_from(cx, &mut this.buf) {
158                Poll::Ready(Ok(())) => {}
159                Poll::Ready(Err(e)) => {
160                    self.state = WatcherState::TerminateOnNextPoll;
161                    return Poll::Ready(Some(Err(e.into())));
162                }
163                Poll::Pending => return Poll::Pending,
164            }
165        }
166        Poll::Ready(Some(Ok(this.get_next_msg())))
167    }
168}
169
/// Zero-sized stand-in for a C-style flexible array member: placed as the last field of a
/// `#[repr(C)]` struct, its address marks where the trailing elements of type `T` begin.
#[repr(C)]
#[derive(Default)]
struct IncompleteArrayField<T>(::std::marker::PhantomData<T>);
impl<T> IncompleteArrayField<T> {
    /// Returns the address of this field reinterpreted as a pointer to the first trailing `T`.
    ///
    /// # Safety
    ///
    /// Computing the pointer performs no memory access; the function remains `unsafe` so that
    /// existing call sites keep compiling unchanged. Dereferencing the result is only valid if
    /// initialized `T` values really do follow this field in memory.
    #[inline]
    pub unsafe fn as_ptr(&self) -> *const T {
        // A plain pointer cast expresses the reinterpretation without `transmute`.
        (self as *const Self).cast::<T>()
    }
    /// Returns the `len` elements of type `T` that start at this field's address.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that `len` initialized, properly aligned `T` values are stored
    /// contiguously starting at this field's address, and that this memory stays valid and
    /// unmodified for the lifetime of the returned slice.
    #[inline]
    pub unsafe fn as_slice(&self, len: usize) -> &[T] {
        // SAFETY: upheld by the caller, per this function's safety contract.
        unsafe { ::std::slice::from_raw_parts(self.as_ptr(), len) }
    }
}
impl<T> ::std::fmt::Debug for IncompleteArrayField<T> {
    /// Prints a fixed placeholder: the field itself stores no data and does not know how many
    /// elements follow it.
    fn fmt(&self, fmt: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
        fmt.write_str("IncompleteArrayField")
    }
}
188
/// In-memory layout of the fixed header of one watch message as it appears in the receive buffer.
/// The header is followed directly by `len` bytes holding the entry name.
// NOTE(review): reinterpreting buffer bytes as this struct presumes `fio::WatchEvent` is a
// one-byte enum, which would make the header two bytes long and byte-aligned -- confirm against
// the generated bindings.
#[repr(C)]
#[derive(Debug)]
struct vfs_watch_msg_t {
    // Kind of event being reported.
    event: fio::WatchEvent,
    // Number of name bytes that follow the header.
    len: u8,
    // Marks where the name bytes start; occupies no space itself.
    name: IncompleteArrayField<u8>,
}
196
197#[derive(Debug)]
198struct VfsWatchMsg<'a> {
199    inner: &'a vfs_watch_msg_t,
200}
201
202impl<'a> VfsWatchMsg<'a> {
203    fn from_raw(buf: &'a [u8]) -> Option<VfsWatchMsg<'a>> {
204        if buf.len() < ::std::mem::size_of::<vfs_watch_msg_t>() {
205            return None;
206        }
207        // This is safe as long as the buffer is at least as large as a vfs_watch_msg_t, which we
208        // just verified. Further, we verify that the buffer has enough bytes to hold the
209        // "incomplete array field" member.
210        let m = unsafe { VfsWatchMsg { inner: &*(buf.as_ptr() as *const vfs_watch_msg_t) } };
211        if buf.len() < ::std::mem::size_of::<vfs_watch_msg_t>() + m.namelen() {
212            return None;
213        }
214        Some(m)
215    }
216
217    fn len(&self) -> usize {
218        ::std::mem::size_of::<vfs_watch_msg_t>() + self.namelen()
219    }
220
221    fn event(&self) -> WatchEvent {
222        WatchEvent(self.inner.event)
223    }
224
225    fn namelen(&self) -> usize {
226        self.inner.len as usize
227    }
228
229    fn name(&self) -> &'a [u8] {
230        // This is safe because we verified during construction that the inner name field has at
231        // least namelen() bytes in it.
232        unsafe { self.inner.name.as_slice(self.namelen()) }
233    }
234}
235
236#[cfg(test)]
237mod tests {
238    use super::*;
239    use assert_matches::assert_matches;
240    use fuchsia_async::{DurationExt, TimeoutExt};
241
242    use futures::prelude::*;
243    use std::fmt::Debug;
244    use std::fs::File;
245    use std::path::Path;
246    use std::sync::Arc;
247    use tempfile::tempdir;
248    use vfs::directory::dirents_sink;
249    use vfs::directory::entry::{EntryInfo, GetEntryInfo};
250    use vfs::directory::entry_container::{Directory, DirectoryWatcher};
251    use vfs::directory::immutable::connection::ImmutableConnection;
252    use vfs::directory::traversal_position::TraversalPosition;
253    use vfs::execution_scope::ExecutionScope;
254    use vfs::node::Node;
255    use vfs::ObjectRequestRef;
256
257    fn one_step<'a, S, OK, ERR>(s: &'a mut S) -> impl Future<Output = OK> + 'a
258    where
259        S: Stream<Item = Result<OK, ERR>> + Unpin,
260        ERR: Debug,
261    {
262        let f = s.next();
263        let f = f.on_timeout(zx::MonotonicDuration::from_millis(500).after_now(), || {
264            panic!("timeout waiting for watcher")
265        });
266        f.map(|next| {
267            next.expect("the stream yielded no next item")
268                .unwrap_or_else(|e| panic!("Error waiting for watcher: {:?}", e))
269        })
270    }
271
272    #[fuchsia::test]
273    async fn test_existing() {
274        let tmp_dir = tempdir().unwrap();
275        let _ = File::create(tmp_dir.path().join("file1")).unwrap();
276
277        let dir = crate::directory::open_in_namespace(
278            tmp_dir.path().to_str().unwrap(),
279            fio::PERM_READABLE,
280        )
281        .unwrap();
282        let mut w = Watcher::new(&dir).await.unwrap();
283
284        let msg = one_step(&mut w).await;
285        assert_eq!(WatchEvent::EXISTING, msg.event);
286        assert_eq!(Path::new("."), msg.filename);
287
288        let msg = one_step(&mut w).await;
289        assert_eq!(WatchEvent::EXISTING, msg.event);
290        assert_eq!(Path::new("file1"), msg.filename);
291
292        let msg = one_step(&mut w).await;
293        assert_eq!(WatchEvent::IDLE, msg.event);
294    }
295
296    #[fuchsia::test]
297    async fn test_add() {
298        let tmp_dir = tempdir().unwrap();
299
300        let dir = crate::directory::open_in_namespace(
301            tmp_dir.path().to_str().unwrap(),
302            fio::PERM_READABLE,
303        )
304        .unwrap();
305        let mut w = Watcher::new(&dir).await.unwrap();
306
307        loop {
308            let msg = one_step(&mut w).await;
309            match msg.event {
310                WatchEvent::EXISTING => continue,
311                WatchEvent::IDLE => break,
312                _ => panic!("Unexpected watch event!"),
313            }
314        }
315
316        let _ = File::create(tmp_dir.path().join("file1")).unwrap();
317        let msg = one_step(&mut w).await;
318        assert_eq!(WatchEvent::ADD_FILE, msg.event);
319        assert_eq!(Path::new("file1"), msg.filename);
320    }
321
322    #[fuchsia::test]
323    async fn test_remove() {
324        let tmp_dir = tempdir().unwrap();
325
326        let filename = "file1";
327        let filepath = tmp_dir.path().join(filename);
328        let _ = File::create(&filepath).unwrap();
329
330        let dir = crate::directory::open_in_namespace(
331            tmp_dir.path().to_str().unwrap(),
332            fio::PERM_READABLE,
333        )
334        .unwrap();
335        let mut w = Watcher::new(&dir).await.unwrap();
336
337        loop {
338            let msg = one_step(&mut w).await;
339            match msg.event {
340                WatchEvent::EXISTING => continue,
341                WatchEvent::IDLE => break,
342                _ => panic!("Unexpected watch event!"),
343            }
344        }
345
346        ::std::fs::remove_file(&filepath).unwrap();
347        let msg = one_step(&mut w).await;
348        assert_eq!(WatchEvent::REMOVE_FILE, msg.event);
349        assert_eq!(Path::new(filename), msg.filename);
350    }
351
    /// Minimal directory used by `test_error`. It accepts watcher registrations and immediately
    /// drops the watcher; every operation the test does not need is left unimplemented.
    struct MockDirectory;

    impl MockDirectory {
        /// Creates the mock already wrapped in an `Arc`.
        fn new() -> Arc<Self> {
            Arc::new(Self)
        }
    }

    impl GetEntryInfo for MockDirectory {
        /// Reports a directory entry with an unknown inode number.
        fn entry_info(&self) -> EntryInfo {
            EntryInfo::new(fio::INO_UNKNOWN, fio::DirentType::Directory)
        }
    }

    impl Node for MockDirectory {
        // Not needed by the test; panics if called.
        async fn get_attributes(
            &self,
            _query: fio::NodeAttributesQuery,
        ) -> Result<fio::NodeAttributes2, zx::Status> {
            unimplemented!();
        }

        // Nothing to release.
        fn close(self: Arc<Self>) {}
    }

    impl Directory for MockDirectory {
        // Not needed by the test; panics if called.
        fn open(
            self: Arc<Self>,
            _scope: ExecutionScope,
            _flags: fio::OpenFlags,
            _path: vfs::path::Path,
            _server_end: fidl::endpoints::ServerEnd<fio::NodeMarker>,
        ) {
            unimplemented!("Not implemented!");
        }

        // Serves the request with an `ImmutableConnection` to this mock, ignoring the path.
        fn open3(
            self: Arc<Self>,
            scope: ExecutionScope,
            _path: vfs::path::Path,
            flags: fio::Flags,
            object_request: ObjectRequestRef<'_>,
        ) -> Result<(), zx::Status> {
            object_request.take().create_connection_sync::<ImmutableConnection<_>, _>(
                scope,
                self.clone(),
                flags,
            );
            Ok(())
        }

        // Not needed by the test; panics if called.
        async fn read_dirents<'a>(
            &'a self,
            _pos: &'a TraversalPosition,
            _sink: Box<dyn dirents_sink::Sink>,
        ) -> Result<(TraversalPosition, Box<dyn dirents_sink::Sealed>), zx::Status> {
            unimplemented!("Not implemented");
        }

        // Reports success but keeps no reference to the watcher.
        fn register_watcher(
            self: Arc<Self>,
            _scope: ExecutionScope,
            _mask: fio::WatchMask,
            _watcher: DirectoryWatcher,
        ) -> Result<(), zx::Status> {
            // Don't do anything, just throw out the watcher, which should close the channel, to
            // generate a PEER_CLOSED error.
            Ok(())
        }

        // Not needed by the test; panics if called.
        fn unregister_watcher(self: Arc<Self>, _key: usize) {
            unimplemented!("Not implemented");
        }
    }
426
427    #[fuchsia::test]
428    async fn test_error() {
429        let test_dir = MockDirectory::new();
430        let client = vfs::directory::serve_read_only(test_dir);
431        let mut w = Watcher::new(&client).await.unwrap();
432        let msg = w.next().await.expect("the stream yielded no next item");
433        assert!(!w.is_terminated());
434        assert_matches!(msg, Err(WatcherStreamError::ChannelRead(zx::Status::PEER_CLOSED)));
435        assert!(!w.is_terminated());
436        assert_matches!(w.next().await, None);
437        assert!(w.is_terminated());
438    }
439}