vfs/directory/watchers/watcher.rs
1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A task that is run to process communication with an individual watcher.
6
7use crate::directory::entry_container::DirectoryWatcher;
8use crate::directory::watchers::event_producers::EventProducer;
9use crate::execution_scope::ExecutionScope;
10
11use fidl_fuchsia_io as fio;
12use futures::channel::mpsc::{self, UnboundedSender};
13use futures::{FutureExt, select};
14
15#[cfg(not(target_os = "fuchsia"))]
16use fuchsia_async::emulated_handle::MessageBuf;
17#[cfg(target_os = "fuchsia")]
18use zx::MessageBuf;
19
20#[derive(Clone)]
21pub struct Controller {
22    mask: fio::WatchMask,
23    messages: UnboundedSender<Vec<u8>>,
24}
25
26impl Controller {
27    /// `done` is not guaranteed to be called if the task failed to start.  It should only happen
28    /// in case the return value is an `Err`.  Unfortunately, there is no way to return the `done`
29    /// object itself, as the [`futures::Spawn::spawn_obj`] does not return the ownership in case
30    /// of a failure.
31    pub(crate) fn new(
32        scope: ExecutionScope,
33        mask: fio::WatchMask,
34        watcher: DirectoryWatcher,
35        done: impl FnOnce() + Send + 'static,
36    ) -> Controller {
37        use futures::StreamExt as _;
38
39        let (sender, mut receiver) = mpsc::unbounded::<Vec<u8>>();
40
41        let task = async move {
42            let _done = CallOnDrop(Some(done));
43            let mut buf = MessageBuf::new();
44            let mut recv_msg = watcher.channel().recv_msg(&mut buf).fuse();
45            loop {
46                select! {
47                    command = receiver.next() => match command {
48                        Some(message) => {
49                            let result = watcher.channel().write(&*message, &mut []);
50                            if result.is_err() {
51                                break;
52                            }
53                        },
54                        None => break,
55                    },
56                    _ = recv_msg => {
57                        // We do not expect any messages to be received over the watcher connection.
58                        // Should we receive a message we will close the connection to indicate an
59                        // error.  If any error occurs, we also close the connection.  And if the
60                        // connection is closed, we just stop the command processing as well.
61                        break;
62                    },
63                }
64            }
65        };
66
67        scope.spawn(task);
68        Controller { mask, messages: sender }
69    }
70
71    /// Sends a buffer to the connected watcher.  `mask` specifies the type of the event the buffer
72    /// is for.  If the watcher mask does not include the event specified by the `mask` then the
73    /// buffer is not sent and `buffer` is not even invoked.
74    pub(crate) fn send_buffer(&self, mask: fio::WatchMask, buffer: impl FnOnce() -> Vec<u8>) {
75        if !self.mask.intersects(mask) {
76            return;
77        }
78
79        if self.messages.unbounded_send(buffer()).is_ok() {
80            return;
81        }
82
83        // An error to send indicates the execution task has been disconnected.  Controller should
84        // always be removed from the watchers list before it is destroyed.  So this is some
85        // logical bug.
86        debug_assert!(false, "Watcher controller failed to send a command to the watcher.");
87    }
88
89    /// Uses a `producer` to generate one or more buffers and send them all to the connected
90    /// watcher.  `producer.mask()` is used to determine the type of the event - in case the
91    /// watcher mask does not specify that it needs to receive this event, then the producer is not
92    /// used and `false` is returned.  If the producers mask and the watcher mask overlap, then
93    /// `true` is returned (even if the producer did not generate a single buffer).
94    pub fn send_event(&self, producer: &mut dyn EventProducer) -> bool {
95        if !self.mask.intersects(producer.mask()) {
96            return false;
97        }
98
99        while producer.prepare_for_next_buffer() {
100            let buffer = producer.buffer();
101            if self.messages.unbounded_send(buffer).is_ok() {
102                continue;
103            }
104
105            // An error to send indicates the execution task has been disconnected.  Controller
106            // should always be removed from the watchers list before it is destroyed.  So this is
107            // some logical bug.
108            debug_assert!(false, "Watcher controller failed to send a command to the watcher.");
109        }
110
111        return true;
112    }
113}
114
/// Calls the wrapped function when this object is dropped.
///
/// The function is kept in an `Option` because `Drop::drop` only gets `&mut self`, while calling
/// an `FnOnce` requires ownership; `take()` moves it out of the guard.
struct CallOnDrop<F: FnOnce()>(Option<F>);

impl<F: FnOnce()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        // An empty slot is a no-op rather than a panic: panicking inside `drop` while another
        // panic is already unwinding would abort the process.
        if let Some(callback) = self.0.take() {
            callback();
        }
    }
}