1#![deny(missing_docs)]
8
9use futures::stream::{FusedStream, Stream};
10use std::ffi::OsStr;
11use std::os::unix::ffi::OsStrExt;
12use std::path::PathBuf;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15use thiserror::Error;
16use zx_status::assoc_values;
17use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
18
19#[cfg(target_os = "fuchsia")]
20use zx::MessageBuf;
21
22#[cfg(not(target_os = "fuchsia"))]
23use fasync::emulated_handle::MessageBuf;
24
25#[derive(Debug, Error, Clone)]
26#[allow(missing_docs)]
27pub enum WatcherCreateError {
28 #[error("while sending watch request: {0}")]
29 SendWatchRequest(#[source] fidl::Error),
30
31 #[error("watch failed with status: {0}")]
32 WatchError(#[source] zx_status::Status),
33
34 #[error("while converting client end to fasync channel: {0}")]
35 ChannelConversion(#[source] zx_status::Status),
36}
37
38#[derive(Debug, Error, Eq, PartialEq)]
39#[allow(missing_docs)]
40pub enum WatcherStreamError {
41 #[error("read from watch channel failed with status: {0}")]
42 ChannelRead(#[from] zx_status::Status),
43}
44
/// The kind of event reported for a watched directory.
///
/// This is a thin wrapper around `fio::WatchEvent`. The named values (`WatchEvent::EXISTING`,
/// `WatchEvent::IDLE`, ...) are provided by the `assoc_values!` invocation for this type.
#[repr(C)]
#[derive(Copy, Clone, Eq, PartialEq)]
pub struct WatchEvent(fio::WatchEvent);
49
assoc_values!(WatchEvent, [
    /// Wraps `fio::WatchEvent::Deleted`.
    ///
    /// NOTE(review): presumably reported when the watched directory itself is deleted; no
    /// test in this file exercises it, so confirm against the `fuchsia.io` documentation.
    DELETED = fio::WatchEvent::Deleted;
    /// Wraps `fio::WatchEvent::Added`; reported when an entry is created in the watched
    /// directory after the watcher has gone idle.
    ADD_FILE = fio::WatchEvent::Added;
    /// Wraps `fio::WatchEvent::Removed`; reported when an entry is removed from the watched
    /// directory.
    REMOVE_FILE = fio::WatchEvent::Removed;
    /// Wraps `fio::WatchEvent::Existing`; reported once for each entry (including `.`) that
    /// was already present when the watcher was created.
    EXISTING = fio::WatchEvent::Existing;
    /// Wraps `fio::WatchEvent::Idle`; reported after all `EXISTING` events have been sent.
    IDLE = fio::WatchEvent::Idle;
]);
63
/// A single event decoded from the watch channel: what happened, and to which entry.
#[derive(Debug, Eq, PartialEq)]
pub struct WatchMessage {
    /// The kind of event that occurred.
    pub event: WatchEvent,
    /// The name carried by the event, built from the raw name bytes of the message.
    /// For the watched directory itself this is `.`.
    pub filename: PathBuf,
}
73
/// Lifecycle of a `Watcher` stream, used to implement `FusedStream` termination.
#[derive(Debug, Eq, PartialEq)]
enum WatcherState {
    /// Initial state: the stream reads from the channel and yields messages.
    Watching,
    /// A channel read failed and the error has been yielded; the next poll ends the stream.
    TerminateOnNextPoll,
    /// The stream has yielded `None`; `is_terminated` reports `true` from here on.
    Terminated,
}
80
/// A stream of `WatchMessage`s for a directory, created with `Watcher::new`.
///
/// Each item is `Ok(WatchMessage)` or, when reading the watch channel fails,
/// `Err(WatcherStreamError)`. After an error has been yielded the stream ends.
#[derive(Debug)]
#[must_use = "futures/streams must be polled"]
pub struct Watcher {
    // Async channel the watch messages are received on.
    ch: fasync::Channel,
    // Most recently received channel message; it may hold several watch messages
    // back to back.
    buf: MessageBuf,
    // Byte offset into `buf` of the next watch message that has not been yielded yet.
    idx: usize,
    // Termination bookkeeping for the `Stream` / `FusedStream` implementations.
    state: WatcherState,
}

// `poll_next` moves nothing out of the pinned value, so the type opts in to `Unpin`
// explicitly rather than relying on the auto trait of its fields.
impl Unpin for Watcher {}
95
96impl Watcher {
97 pub async fn new(dir: &fio::DirectoryProxy) -> Result<Watcher, WatcherCreateError> {
99 let (client_end, server_end) = fidl::endpoints::create_endpoints();
100 let options = 0u32;
101 let status = dir
102 .watch(fio::WatchMask::all(), options, server_end)
103 .await
104 .map_err(WatcherCreateError::SendWatchRequest)?;
105 zx_status::Status::ok(status).map_err(WatcherCreateError::WatchError)?;
106 let mut buf = MessageBuf::new();
107 buf.ensure_capacity_bytes(fio::MAX_BUF as usize);
108 Ok(Watcher {
109 ch: fasync::Channel::from_channel(client_end.into_channel()),
110 buf,
111 idx: 0,
112 state: WatcherState::Watching,
113 })
114 }
115
116 fn reset_buf(&mut self) {
117 self.idx = 0;
118 self.buf.clear();
119 }
120
121 fn get_next_msg(&mut self) -> WatchMessage {
122 assert!(self.idx < self.buf.bytes().len());
123 let next_msg = VfsWatchMsg::from_raw(&self.buf.bytes()[self.idx..])
124 .expect("Invalid buffer received by Watcher!");
125 self.idx += next_msg.len();
126
127 let mut pathbuf = PathBuf::new();
128 pathbuf.push(OsStr::from_bytes(next_msg.name()));
129 let event = next_msg.event();
130 WatchMessage { event, filename: pathbuf }
131 }
132}
133
134impl FusedStream for Watcher {
135 fn is_terminated(&self) -> bool {
136 self.state == WatcherState::Terminated
137 }
138}
139
140impl Stream for Watcher {
141 type Item = Result<WatchMessage, WatcherStreamError>;
142
143 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
144 let this = &mut *self;
145 if this.state == WatcherState::TerminateOnNextPoll {
148 this.state = WatcherState::Terminated;
149 }
150 if this.state == WatcherState::Terminated {
151 return Poll::Ready(None);
152 }
153 if this.idx >= this.buf.bytes().len() {
154 this.reset_buf();
155 }
156 if this.idx == 0 {
157 match this.ch.recv_from(cx, &mut this.buf) {
158 Poll::Ready(Ok(())) => {}
159 Poll::Ready(Err(e)) => {
160 self.state = WatcherState::TerminateOnNextPoll;
161 return Poll::Ready(Some(Err(e.into())));
162 }
163 Poll::Pending => return Poll::Pending,
164 }
165 }
166 Poll::Ready(Some(Ok(this.get_next_msg())))
167 }
168}
169
/// Zero-sized marker for a C-style flexible array member: a run of `T` values that begins
/// at this field's own address and whose length is recorded elsewhere.
#[repr(C)]
#[derive(Default)]
struct IncompleteArrayField<T>(::std::marker::PhantomData<T>);

impl<T> IncompleteArrayField<T> {
    /// Returns this field's address as a pointer to the first trailing element.
    ///
    /// # Safety
    ///
    /// Producing the pointer is always sound. The function remains `unsafe` so that its
    /// signature is unchanged for existing callers; dereferencing the result is only valid
    /// if initialized `T` values really follow this field in memory.
    #[inline]
    pub unsafe fn as_ptr(&self) -> *const T {
        // A reference-to-pointer cast says exactly what is meant and, unlike `transmute`,
        // cannot silently reinterpret anything else if the types ever change.
        self as *const Self as *const T
    }

    /// Returns the `len` trailing elements that start at this field's address.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that `len` initialized, properly aligned values of `T`
    /// are stored contiguously starting at this field's address, and that this memory is
    /// neither mutated nor freed for the lifetime of the returned slice.
    #[inline]
    pub unsafe fn as_slice(&self, len: usize) -> &[T] {
        // SAFETY: the caller upholds the contract documented above; the pointer is derived
        // from a reference and is therefore non-null.
        unsafe { ::std::slice::from_raw_parts(self.as_ptr(), len) }
    }
}

impl<T> ::std::fmt::Debug for IncompleteArrayField<T> {
    /// Writes only the type name: the trailing elements cannot be printed because their
    /// count is not known to this field.
    fn fmt(&self, fmt: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
        fmt.write_str("IncompleteArrayField")
    }
}
188
/// `#[repr(C)]` view of one message inside a watch channel buffer: the event, a one-byte
/// name length, and then `len` bytes of name that start where the `name` marker sits.
///
/// NOTE(review): `VfsWatchMsg::from_raw` casts raw channel bytes to this type, which
/// assumes that `fio::WatchEvent` has alignment 1 and that every incoming event byte is a
/// valid `fio::WatchEvent` value. Neither is visible in this file; confirm against the
/// generated FIDL bindings.
#[repr(C)]
#[derive(Debug)]
struct vfs_watch_msg_t {
    // Kind of event described by this message.
    event: fio::WatchEvent,
    // Number of name bytes that follow the header.
    len: u8,
    // Zero-sized marker for the start of the name bytes.
    name: IncompleteArrayField<u8>,
}
196
/// Borrowed view of a single watch message that lives inside a byte buffer.
///
/// Instances are created by `from_raw`, which checks that the buffer is long enough for
/// the header and for the name length the header announces.
#[derive(Debug)]
struct VfsWatchMsg<'a> {
    // Header of the message, pointing into the buffer given to `from_raw`.
    inner: &'a vfs_watch_msg_t,
}
201
202impl<'a> VfsWatchMsg<'a> {
203 fn from_raw(buf: &'a [u8]) -> Option<VfsWatchMsg<'a>> {
204 if buf.len() < ::std::mem::size_of::<vfs_watch_msg_t>() {
205 return None;
206 }
207 let m = unsafe { VfsWatchMsg { inner: &*(buf.as_ptr() as *const vfs_watch_msg_t) } };
211 if buf.len() < ::std::mem::size_of::<vfs_watch_msg_t>() + m.namelen() {
212 return None;
213 }
214 Some(m)
215 }
216
217 fn len(&self) -> usize {
218 ::std::mem::size_of::<vfs_watch_msg_t>() + self.namelen()
219 }
220
221 fn event(&self) -> WatchEvent {
222 WatchEvent(self.inner.event)
223 }
224
225 fn namelen(&self) -> usize {
226 self.inner.len as usize
227 }
228
229 fn name(&self) -> &'a [u8] {
230 unsafe { self.inner.name.as_slice(self.namelen()) }
233 }
234}
235
// Tests for `Watcher`: three run against a real temporary directory, and one runs against
// a mock directory whose watch channel yields no messages.
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fuchsia_async::{DurationExt, TimeoutExt};

    use futures::prelude::*;
    use std::fmt::Debug;
    use std::fs::File;
    use std::path::Path;
    use std::sync::Arc;
    use tempfile::tempdir;
    use vfs::directory::dirents_sink;
    use vfs::directory::entry::{EntryInfo, GetEntryInfo};
    use vfs::directory::entry_container::{Directory, DirectoryWatcher};
    use vfs::directory::immutable::connection::ImmutableConnection;
    use vfs::directory::traversal_position::TraversalPosition;
    use vfs::execution_scope::ExecutionScope;
    use vfs::node::Node;
    use vfs::ObjectRequestRef;

    /// Returns a future that resolves to the next `Ok` item of `s`.
    ///
    /// Panics if no item arrives within 500 ms, if the stream has ended, or if the item is
    /// an `Err`.
    fn one_step<'a, S, OK, ERR>(s: &'a mut S) -> impl Future<Output = OK> + 'a
    where
        S: Stream<Item = Result<OK, ERR>> + Unpin,
        ERR: Debug,
    {
        let f = s.next();
        // Fail the test instead of hanging forever when the watcher never produces an item.
        let f = f.on_timeout(zx::MonotonicDuration::from_millis(500).after_now(), || {
            panic!("timeout waiting for watcher")
        });
        f.map(|next| {
            next.expect("the stream yielded no next item")
                .unwrap_or_else(|e| panic!("Error waiting for watcher: {:?}", e))
        })
    }

    /// Entries already present are reported as `EXISTING` (first `.`, then the file),
    /// followed by `IDLE`.
    #[fuchsia::test]
    async fn test_existing() {
        let tmp_dir = tempdir().unwrap();
        let _ = File::create(tmp_dir.path().join("file1")).unwrap();

        let dir = crate::directory::open_in_namespace(
            tmp_dir.path().to_str().unwrap(),
            fio::PERM_READABLE,
        )
        .unwrap();
        let mut w = Watcher::new(&dir).await.unwrap();

        let msg = one_step(&mut w).await;
        assert_eq!(WatchEvent::EXISTING, msg.event);
        assert_eq!(Path::new("."), msg.filename);

        let msg = one_step(&mut w).await;
        assert_eq!(WatchEvent::EXISTING, msg.event);
        assert_eq!(Path::new("file1"), msg.filename);

        let msg = one_step(&mut w).await;
        assert_eq!(WatchEvent::IDLE, msg.event);
    }

    /// Creating a file after the watcher went idle is reported as `ADD_FILE`.
    #[fuchsia::test]
    async fn test_add() {
        let tmp_dir = tempdir().unwrap();

        let dir = crate::directory::open_in_namespace(
            tmp_dir.path().to_str().unwrap(),
            fio::PERM_READABLE,
        )
        .unwrap();
        let mut w = Watcher::new(&dir).await.unwrap();

        // Drain the initial EXISTING events until the watcher reports IDLE.
        loop {
            let msg = one_step(&mut w).await;
            match msg.event {
                WatchEvent::EXISTING => continue,
                WatchEvent::IDLE => break,
                _ => panic!("Unexpected watch event!"),
            }
        }

        let _ = File::create(tmp_dir.path().join("file1")).unwrap();
        let msg = one_step(&mut w).await;
        assert_eq!(WatchEvent::ADD_FILE, msg.event);
        assert_eq!(Path::new("file1"), msg.filename);
    }

    /// Removing a file after the watcher went idle is reported as `REMOVE_FILE`.
    #[fuchsia::test]
    async fn test_remove() {
        let tmp_dir = tempdir().unwrap();

        let filename = "file1";
        let filepath = tmp_dir.path().join(filename);
        let _ = File::create(&filepath).unwrap();

        let dir = crate::directory::open_in_namespace(
            tmp_dir.path().to_str().unwrap(),
            fio::PERM_READABLE,
        )
        .unwrap();
        let mut w = Watcher::new(&dir).await.unwrap();

        // Drain the initial EXISTING events until the watcher reports IDLE.
        loop {
            let msg = one_step(&mut w).await;
            match msg.event {
                WatchEvent::EXISTING => continue,
                WatchEvent::IDLE => break,
                _ => panic!("Unexpected watch event!"),
            }
        }

        ::std::fs::remove_file(&filepath).unwrap();
        let msg = one_step(&mut w).await;
        assert_eq!(WatchEvent::REMOVE_FILE, msg.event);
        assert_eq!(Path::new(filename), msg.filename);
    }

    // Minimal directory used by `test_error`: it accepts a watch registration but does not
    // keep the watcher it is handed.
    struct MockDirectory;

    impl MockDirectory {
        fn new() -> Arc<Self> {
            Arc::new(Self)
        }
    }

    impl GetEntryInfo for MockDirectory {
        fn entry_info(&self) -> EntryInfo {
            EntryInfo::new(fio::INO_UNKNOWN, fio::DirentType::Directory)
        }
    }

    impl Node for MockDirectory {
        async fn get_attributes(
            &self,
            _query: fio::NodeAttributesQuery,
        ) -> Result<fio::NodeAttributes2, zx::Status> {
            unimplemented!();
        }

        fn close(self: Arc<Self>) {}
    }

    impl Directory for MockDirectory {
        fn open(
            self: Arc<Self>,
            _scope: ExecutionScope,
            _flags: fio::OpenFlags,
            _path: vfs::path::Path,
            _server_end: fidl::endpoints::ServerEnd<fio::NodeMarker>,
        ) {
            unimplemented!("Not implemented!");
        }

        // Serves the connection over which `test_error` issues its watch request.
        fn open3(
            self: Arc<Self>,
            scope: ExecutionScope,
            _path: vfs::path::Path,
            flags: fio::Flags,
            object_request: ObjectRequestRef<'_>,
        ) -> Result<(), zx::Status> {
            object_request.take().create_connection_sync::<ImmutableConnection<_>, _>(
                scope,
                self.clone(),
                flags,
            );
            Ok(())
        }

        async fn read_dirents<'a>(
            &'a self,
            _pos: &'a TraversalPosition,
            _sink: Box<dyn dirents_sink::Sink>,
        ) -> Result<(TraversalPosition, Box<dyn dirents_sink::Sealed>), zx::Status> {
            unimplemented!("Not implemented");
        }

        fn register_watcher(
            self: Arc<Self>,
            _scope: ExecutionScope,
            _mask: fio::WatchMask,
            _watcher: DirectoryWatcher,
        ) -> Result<(), zx::Status> {
            // Report success without storing `_watcher`, so it is dropped when this
            // function returns. Presumably that closes the watch channel, which is what
            // `test_error` observes as `PEER_CLOSED`.
            Ok(())
        }

        fn unregister_watcher(self: Arc<Self>, _key: usize) {
            unimplemented!("Not implemented");
        }
    }

    /// A failed channel read is yielded once as an error; only the poll after that ends
    /// the stream and flips `is_terminated` to `true`.
    #[fuchsia::test]
    async fn test_error() {
        let test_dir = MockDirectory::new();
        let client = vfs::directory::serve_read_only(test_dir);
        let mut w = Watcher::new(&client).await.unwrap();
        let msg = w.next().await.expect("the stream yielded no next item");
        // Yielding the error must not terminate the stream yet.
        assert!(!w.is_terminated());
        assert_matches!(msg, Err(WatcherStreamError::ChannelRead(zx::Status::PEER_CLOSED)));
        assert!(!w.is_terminated());
        assert_matches!(w.next().await, None);
        assert!(w.is_terminated());
    }
}