settings/ingress/
watch.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Watch request handling.
6//!
7//! This mod defines the components for handling hanging-get, or "watch", [Requests](Request). These
8//! requests return a value to the requester when a value different from the previously returned
9//! value is available. This pattern is common across the various setting service interfaces.
10//! Since there is context involved between watch requests, these job workloads are [Sequential].
11//!
12//! Users of these components define three implementations to create "watch"-related jobs. First,
13//! implementations of [From<SettingInfo>] and [From<Error>] are needed. Since these requests will
14//! always return a value on success, the request handling automatically converts the [SettingInfo].
15//! The built-in conversion to the user type with the [From] trait implementation helps reduce the
16//! explicit conversion in the responding code. Lastly, the user must implement [Responder], which
17//! returns a [Result] converted from the [Response](crate::handler::base::Response) returned from
18//! the setting service.
19
20use crate::base::{SettingInfo, SettingType};
21use crate::handler::base::{Error, Payload, Request};
22use crate::job::data::{self, Data, Key};
23use crate::job::work::{Error as WorkError, Load, Sequential};
24use crate::job::{Job, Signature};
25use crate::message::base::Audience;
26use crate::message::receptor::Receptor;
27use crate::service::{Address, message};
28use crate::trace;
29use async_trait::async_trait;
30use fuchsia_trace as ftrace;
31use futures::FutureExt;
32use futures::channel::oneshot;
33use std::collections::HashMap;
34use std::marker::PhantomData;
35
/// The key used to store the last value sent. This cache is scoped to the
/// [Job's Signature](Signature).
///
/// Within this file the key is wrapped as [`Key::Identifier`] and paired with a
/// [`Data::SettingInfo`] entry each time a watch response carrying a value is produced.
const LAST_VALUE_KEY: &str = "LAST_VALUE";
39
40/// A custom function used to compare an existing setting value with a new one to determine if
41/// listeners should be notified. If true is returned, listeners will be notified.
42pub(crate) struct ChangeFunction {
43    /// The function that will be used to evaluate whether or not a setting has changed.
44    #[allow(clippy::type_complexity)]
45    function: Box<dyn Fn(&SettingInfo, &SettingInfo) -> bool>,
46
47    /// An identifier for the change function that is used to group hanging gets. This identifier
48    /// should be the same for all change functions in a given sequence. For example, when a client
49    /// calls a setting API that allows a Watch method to take parameters, subsequent calls with the
50    /// same parameters should create ChangeFunctions with the same ID.
51    id: u64,
52}
53
54impl ChangeFunction {
55    #[allow(clippy::type_complexity)]
56    pub fn new(
57        id: u64,
58        function: Box<dyn Fn(&SettingInfo, &SettingInfo) -> bool>,
59    ) -> ChangeFunction {
60        ChangeFunction { function, id }
61    }
62}
63
64/// [Responder] is a trait for handing back results of a watch request. It is unique from other
65/// work responders, since [Work] consumers expect a value to be present on success. The Responder
66/// specifies the conversions for [Response](crate::handler::base::Response).
67pub trait Responder<R: From<SettingInfo>, E: From<Error>> {
68    fn respond(self, response: Result<R, E>);
69}
70
71pub struct Work<R: From<SettingInfo>, E: From<Error>, T: Responder<R, E>> {
72    setting_type: SettingType,
73    signature: Signature,
74    responder: T,
75    cancelation_rx: oneshot::Receiver<()>,
76    change_function: Option<ChangeFunction>,
77    _response_type: PhantomData<R>,
78    _error_type: PhantomData<E>,
79}
80
81impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
82    Work<R, E, T>
83{
84    #[allow(dead_code)]
85    fn new(setting_type: SettingType, responder: T, cancelation_rx: oneshot::Receiver<()>) -> Self
86    where
87        T: 'static,
88    {
89        Self {
90            setting_type,
91            signature: Signature::new::<T>(),
92            responder,
93            cancelation_rx,
94            change_function: None,
95            _response_type: PhantomData,
96            _error_type: PhantomData,
97        }
98    }
99
100    #[allow(dead_code)]
101    pub(crate) fn new_job(setting_type: SettingType, responder: T) -> Job
102    where
103        T: 'static,
104    {
105        let (cancelation_tx, cancelation_rx) = oneshot::channel();
106        let work = Self::new(setting_type, responder, cancelation_rx);
107        Job::from((work, cancelation_tx))
108    }
109
110    pub(crate) fn new_job_with_change_function(
111        setting_type: SettingType,
112        responder: T,
113        change_function: ChangeFunction,
114    ) -> Job
115    where
116        T: 'static,
117    {
118        let (cancelation_tx, cancelation_rx) = oneshot::channel();
119        let work =
120            Self::with_change_function(setting_type, responder, cancelation_rx, change_function);
121        Job::from((work, cancelation_tx))
122    }
123
124    pub(crate) fn with_change_function(
125        setting_type: SettingType,
126        responder: T,
127        cancelation_rx: oneshot::Receiver<()>,
128        change_function: ChangeFunction,
129    ) -> Self {
130        Self {
131            setting_type,
132            signature: Signature::with::<T>(change_function.id),
133            responder,
134            cancelation_rx,
135            change_function: Some(change_function),
136            _response_type: PhantomData,
137            _error_type: PhantomData,
138        }
139    }
140
141    async fn get_next(
142        &mut self,
143        receptor: &mut Receptor,
144    ) -> Result<Result<Payload, anyhow::Error>, WorkError> {
145        let receptor = receptor.next_of::<Payload>().fuse();
146        let mut cancelation_rx = &mut self.cancelation_rx;
147        futures::pin_mut!(receptor);
148        futures::select! {
149            result = receptor => Ok(result.map(|(payload, _)| payload)),
150            _ = cancelation_rx => Err(WorkError::Canceled),
151        }
152    }
153
154    /// Returns a non-empty value when the last response should be returned to the caller. The lack
155    /// of a response indicates the watched value has not changed and watching will continue.
156    fn process_response(
157        &self,
158        response: Result<Payload, anyhow::Error>,
159        store: &mut HashMap<Key, Data>,
160    ) -> Option<Result<SettingInfo, Error>> {
161        match response {
162            Ok(Payload::Response(Ok(Some(setting_info)))) => {
163                let key = Key::Identifier(LAST_VALUE_KEY);
164
165                let return_val = match (store.get(&key), self.change_function.as_ref()) {
166                    // Apply the change function to determine if we should notify listeners.
167                    (Some(Data::SettingInfo(info)), Some(change_function))
168                        if !(change_function.function)(info, &setting_info) =>
169                    {
170                        None
171                    }
172                    // No change function used, compare the new info with the old.
173                    (Some(Data::SettingInfo(info)), None) if *info == setting_info => None,
174                    _ => Some(Ok(setting_info)),
175                };
176
177                if let Some(Ok(ref info)) = return_val {
178                    let _ = store.insert(key, Data::SettingInfo(info.clone()));
179                }
180
181                return_val
182            }
183            Ok(Payload::Response(Err(error))) => Some(Err(error)),
184            Err(error) => {
185                log::warn!("An error occurred while watching {:?}:{:?}", self.setting_type, error);
186                Some(Err(match error.root_cause().downcast_ref::<Error>() {
187                    Some(error) => error.clone(),
188                    _ => crate::handler::base::Error::CommunicationError,
189                }))
190            }
191            _ => {
192                panic!("invalid variant {response:?}");
193            }
194        }
195    }
196}
197
198#[async_trait(?Send)]
199impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
200    Sequential for Work<R, E, T>
201{
202    async fn execute(
203        mut self: Box<Self>,
204        messenger: message::Messenger,
205        store_handle: data::StoreHandle,
206        id: ftrace::Id,
207    ) -> Result<(), WorkError> {
208        trace!(id, c"Sequential Work execute");
209        // Lock store for Job signature group.
210        let mut store = store_handle.lock().await;
211
212        // Begin listening for changes before fetching current value to ensure no changes are
213        // missed.
214        let mut listen_receptor = messenger.message(
215            Payload::Request(Request::Listen).into(),
216            Audience::Address(Address::Handler(self.setting_type)),
217        );
218
219        // Get current value.
220        let mut get_receptor = messenger.message(
221            Payload::Request(Request::Get).into(),
222            Audience::Address(Address::Handler(self.setting_type)),
223        );
224
225        // If a value was returned from the get call and considered updated (no existing or
226        // different), return new value immediately.
227        trace!(id, c"Get first response");
228        let next_payload = self.get_next(&mut get_receptor).await?;
229        if let Some(response) = self.process_response(next_payload, &mut store) {
230            self.responder.respond(response.map(R::from).map_err(|err| {
231                log::error!("First watch response has an error: {:?}", err);
232                E::from(err)
233            }));
234            return Ok(());
235        }
236
237        // Otherwise, loop a watch until an updated value is available
238        loop {
239            trace!(id, c"Get looped response");
240            let next_payload = self.get_next(&mut listen_receptor).await?;
241            if let Some(response) = self.process_response(next_payload, &mut store) {
242                self.responder.respond(response.map(R::from).map_err(|err| {
243                    log::error!("Updated watch response has an error: {:?}", err);
244                    E::from(err)
245                }));
246                return Ok(());
247            }
248        }
249    }
250}
251
252impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
253    From<(Work<R, E, T>, oneshot::Sender<()>)> for Job
254{
255    fn from((work, cancelation_tx): (Work<R, E, T>, oneshot::Sender<()>)) -> Job {
256        let signature = work.signature;
257        Job::new_with_cancellation(Load::Sequential(Box::new(work), signature), cancelation_tx)
258    }
259}
260
261#[cfg(test)]
262mod tests {
263    use super::*;
264    use crate::base::UnknownInfo;
265    use crate::message::base::MessengerType;
266    use crate::service::MessageHub;
267    use assert_matches::assert_matches;
268    use fuchsia_async as fasync;
269    use futures::channel::oneshot::Sender;
270    use futures::lock::Mutex;
271    use std::rc::Rc;
272
273    struct TestResponder {
274        sender: Sender<Result<SettingInfo, Error>>,
275    }
276
277    impl TestResponder {
278        pub(crate) fn new(sender: Sender<Result<SettingInfo, Error>>) -> Self {
279            Self { sender }
280        }
281    }
282
283    impl Responder<SettingInfo, Error> for TestResponder {
284        fn respond(self, response: Result<SettingInfo, Error>) {
285            self.sender.send(response).expect("send should succeed");
286        }
287    }
288
289    #[fuchsia::test(allow_stalls = false)]
290    async fn test_watch_basic_functionality() {
291        // Create store for job.
292        let store_handle = Rc::new(Mutex::new(HashMap::new()));
293
294        let get_info = SettingInfo::Unknown(UnknownInfo(true));
295        let listen_info = SettingInfo::Unknown(UnknownInfo(false));
296
297        // Make sure the first job execution returns the initial value (retrieved through get).
298        verify_watch(
299            store_handle.clone(),
300            listen_info.clone(),
301            get_info.clone(),
302            get_info.clone(),
303            None,
304        )
305        .await;
306        // Make sure the second job execution returns the value returned through watching (listen
307        // value).
308        verify_watch(
309            store_handle.clone(),
310            listen_info.clone(),
311            get_info.clone(),
312            listen_info.clone(),
313            None,
314        )
315        .await;
316    }
317
318    async fn verify_watch(
319        store_handle: data::StoreHandle,
320        listen_info: SettingInfo,
321        get_info: SettingInfo,
322        expected_info: SettingInfo,
323        change_function: Option<ChangeFunction>,
324    ) {
325        // Create MessageHub for communication between components.
326        let message_hub_delegate = MessageHub::create_hub();
327
328        // Create mock handler endpoint to receive request.
329        let mut handler_receiver = message_hub_delegate
330            .create(MessengerType::Addressable(Address::Handler(SettingType::Unknown)))
331            .await
332            .expect("handler messenger should be created")
333            .1;
334
335        let (response_tx, response_rx) =
336            futures::channel::oneshot::channel::<Result<SettingInfo, Error>>();
337        let (_cancelation_tx, cancelation_rx) = oneshot::channel();
338
339        let work = match change_function {
340            None => Box::new(Work::new(
341                SettingType::Unknown,
342                TestResponder::new(response_tx),
343                cancelation_rx,
344            )),
345            Some(change_function) => Box::new(Work::with_change_function(
346                SettingType::Unknown,
347                TestResponder::new(response_tx),
348                cancelation_rx,
349                change_function,
350            )),
351        };
352
353        // Execute work on async task.
354        let work_messenger = message_hub_delegate
355            .create(MessengerType::Unbound)
356            .await
357            .expect("messenger should be created")
358            .0;
359
360        let work_messenger_signature = work_messenger.get_signature();
361        fasync::Task::local(async move {
362            let _ = work.execute(work_messenger, store_handle, 0.into()).await;
363        })
364        .detach();
365
366        // Ensure the listen request is received from the right sender.
367        let (listen_request, listen_client) = handler_receiver
368            .next_of::<Payload>()
369            .await
370            .expect("should successfully receive a listen request");
371        assert_matches!(listen_request, Payload::Request(Request::Listen));
372        assert!(listen_client.get_author() == work_messenger_signature);
373
374        // Listen should be followed by a get request.
375        let (get_request, get_client) = handler_receiver
376            .next_of::<Payload>()
377            .await
378            .expect("should successfully receive a get request");
379        assert_matches!(get_request, Payload::Request(Request::Get));
380        assert!(get_client.get_author() == work_messenger_signature);
381
382        // Reply to the get request.
383        let _ = get_client.reply(Payload::Response(Ok(Some(get_info))).into());
384        let _ = listen_client.reply(Payload::Response(Ok(Some(listen_info))).into());
385
386        assert_matches!(response_rx.await.expect("should receive successful response"),
387                Ok(x) if x == expected_info);
388    }
389
390    // This test verifies that custom change functions work by using a custom change function that
391    // always says a new value is different, even if the actual value is unchanged.
392    #[fuchsia::test(allow_stalls = false)]
393    async fn test_custom_change_function() {
394        // Create store for job.
395        let store_handle = Rc::new(Mutex::new(HashMap::new()));
396
397        // Pre-fill the storage with the value so that the initial get will not trigger a response.
398        let unchanged_info = SettingInfo::Unknown(UnknownInfo(true));
399        let _ = store_handle
400            .lock()
401            .await
402            .insert(Key::Identifier(LAST_VALUE_KEY), Data::SettingInfo(unchanged_info.clone()));
403
404        verify_watch(
405            store_handle,
406            // Send the same value on both the get and listen requests so that the default change
407            // function would not trigger a response to the client.
408            unchanged_info.clone(),
409            unchanged_info.clone(),
410            unchanged_info,
411            // Use a custom change function that always reports a change.
412            Some(ChangeFunction::new(
413                0,
414                Box::new(move |_old: &SettingInfo, _new: &SettingInfo| true),
415            )),
416        )
417        .await;
418    }
419
420    #[fuchsia::test(allow_stalls = false)]
421    async fn test_error_propagation() {
422        // Create MessageHub for communication between components.
423        let message_hub_delegate = MessageHub::create_hub();
424
425        let (response_tx, response_rx) = oneshot::channel::<Result<SettingInfo, Error>>();
426
427        let (_cancelation_tx, cancelation_rx) = oneshot::channel::<()>();
428        // Create a listen request to a non-existent end-point.
429        let work = Box::new(Work::new(
430            SettingType::Unknown,
431            TestResponder::new(response_tx),
432            cancelation_rx,
433        ));
434
435        let work_messenger = message_hub_delegate
436            .create(MessengerType::Unbound)
437            .await
438            .expect("messenger should be created")
439            .0;
440
441        // Execute work on async task.
442        fasync::Task::local(async move {
443            let _ =
444                work.execute(work_messenger, Rc::new(Mutex::new(HashMap::new())), 0.into()).await;
445        })
446        .detach();
447
448        // Ensure an error is returned by the executed work.
449        assert_matches!(response_rx.await.expect("should receive successful response"),
450                Err(x) if x == crate::handler::base::Error::CommunicationError);
451    }
452}