settings/ingress/
watch.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Watch request handling.
6//!
7//! This mod defines the components for handling hanging-get, or "watch", [Requests](Request). These
8//! requests return a value to the requester when a value different from the previously returned /
9//! value is available. This pattern is common across the various setting service interfaces.
10//! Since there is context involved between watch requests, these job workloads are [Sequential].
11//!
12//! Users of these components define three implementations to create "watch"-related jobs. First,
13//! implementations of [From<SettingInfo>] and [From<Error>] are needed. Since these requests will
14//! always return a value on success, the request handling automatically converts the [SettingInfo].
15//! The built-in conversion to the user type with the [From] trait implementation helps reduce the
16//! explicit conversion in the responding code. Lastly, the user must implement [Responder], which
17//! returns a [Result] converted from the [Response](crate::handler::base::Response) returned from
18//! the setting service.
19
20use crate::base::{SettingInfo, SettingType};
21use crate::handler::base::{Error, Payload, Request};
22use crate::job::data::{self, Data, Key};
23use crate::job::work::{Error as WorkError, Load, Sequential};
24use crate::job::{Job, Signature};
25use crate::message::base::Audience;
26use crate::message::receptor::Receptor;
27use crate::service::{message, Address};
28use crate::trace;
29use async_trait::async_trait;
30use fuchsia_trace as ftrace;
31use futures::channel::oneshot;
32use futures::FutureExt;
33use std::collections::HashMap;
34use std::marker::PhantomData;
35
36/// The key used to store the last value sent. This cache is scoped to the
37/// [Job's Signature](Signature).
38const LAST_VALUE_KEY: &str = "LAST_VALUE";
39
40/// A custom function used to compare an existing setting value with a new one to determine if
41/// listeners should be notified. If true is returned, listeners will be notified.
42pub(crate) struct ChangeFunction {
43    /// The function that will be used to evaluate whether or not a setting has changed.
44    #[allow(clippy::type_complexity)]
45    function: Box<dyn Fn(&SettingInfo, &SettingInfo) -> bool>,
46
47    /// An identifier for the change function that is used to group hanging gets. This identifier
48    /// should be the same for all change functions in a given sequence. For example, when a client
49    /// calls a setting API that allows a Watch method to take parameters, subsequent calls with the
50    /// same parameters should create ChangeFunctions with the same ID.
51    id: u64,
52}
53
54impl ChangeFunction {
55    #[allow(clippy::type_complexity)]
56    pub fn new(
57        id: u64,
58        function: Box<dyn Fn(&SettingInfo, &SettingInfo) -> bool>,
59    ) -> ChangeFunction {
60        ChangeFunction { function, id }
61    }
62}
63
64/// [Responder] is a trait for handing back results of a watch request. It is unique from other
65/// work responders, since [Work] consumers expect a value to be present on success. The Responder
66/// specifies the conversions for [Response](crate::handler::base::Response).
67pub trait Responder<R: From<SettingInfo>, E: From<Error>> {
68    fn respond(self, response: Result<R, E>);
69}
70
71pub struct Work<R: From<SettingInfo>, E: From<Error>, T: Responder<R, E>> {
72    setting_type: SettingType,
73    signature: Signature,
74    responder: T,
75    cancelation_rx: oneshot::Receiver<()>,
76    change_function: Option<ChangeFunction>,
77    _response_type: PhantomData<R>,
78    _error_type: PhantomData<E>,
79}
80
81impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
82    Work<R, E, T>
83{
84    fn new(setting_type: SettingType, responder: T, cancelation_rx: oneshot::Receiver<()>) -> Self
85    where
86        T: 'static,
87    {
88        Self {
89            setting_type,
90            signature: Signature::new::<T>(),
91            responder,
92            cancelation_rx,
93            change_function: None,
94            _response_type: PhantomData,
95            _error_type: PhantomData,
96        }
97    }
98
99    pub(crate) fn new_job(setting_type: SettingType, responder: T) -> Job
100    where
101        T: 'static,
102    {
103        let (cancelation_tx, cancelation_rx) = oneshot::channel();
104        let work = Self::new(setting_type, responder, cancelation_rx);
105        Job::from((work, cancelation_tx))
106    }
107
108    pub(crate) fn new_job_with_change_function(
109        setting_type: SettingType,
110        responder: T,
111        change_function: ChangeFunction,
112    ) -> Job
113    where
114        T: 'static,
115    {
116        let (cancelation_tx, cancelation_rx) = oneshot::channel();
117        let work =
118            Self::with_change_function(setting_type, responder, cancelation_rx, change_function);
119        Job::from((work, cancelation_tx))
120    }
121
122    pub(crate) fn with_change_function(
123        setting_type: SettingType,
124        responder: T,
125        cancelation_rx: oneshot::Receiver<()>,
126        change_function: ChangeFunction,
127    ) -> Self {
128        Self {
129            setting_type,
130            signature: Signature::with::<T>(change_function.id),
131            responder,
132            cancelation_rx,
133            change_function: Some(change_function),
134            _response_type: PhantomData,
135            _error_type: PhantomData,
136        }
137    }
138
139    async fn get_next(
140        &mut self,
141        receptor: &mut Receptor,
142    ) -> Result<Result<Payload, anyhow::Error>, WorkError> {
143        let receptor = receptor.next_of::<Payload>().fuse();
144        let mut cancelation_rx = &mut self.cancelation_rx;
145        futures::pin_mut!(receptor);
146        futures::select! {
147            result = receptor => Ok(result.map(|(payload, _)| payload)),
148            _ = cancelation_rx => Err(WorkError::Canceled),
149        }
150    }
151
152    /// Returns a non-empty value when the last response should be returned to the caller. The lack
153    /// of a response indicates the watched value has not changed and watching will continue.
154    fn process_response(
155        &self,
156        response: Result<Payload, anyhow::Error>,
157        store: &mut HashMap<Key, Data>,
158    ) -> Option<Result<SettingInfo, Error>> {
159        match response {
160            Ok(Payload::Response(Ok(Some(setting_info)))) => {
161                let key = Key::Identifier(LAST_VALUE_KEY);
162
163                let return_val = match (store.get(&key), self.change_function.as_ref()) {
164                    // Apply the change function to determine if we should notify listeners.
165                    (Some(Data::SettingInfo(info)), Some(change_function))
166                        if !(change_function.function)(info, &setting_info) =>
167                    {
168                        None
169                    }
170                    // No change function used, compare the new info with the old.
171                    (Some(Data::SettingInfo(info)), None) if *info == setting_info => None,
172                    _ => Some(Ok(setting_info)),
173                };
174
175                if let Some(Ok(ref info)) = return_val {
176                    let _ = store.insert(key, Data::SettingInfo(info.clone()));
177                }
178
179                return_val
180            }
181            Ok(Payload::Response(Err(error))) => Some(Err(error)),
182            Err(error) => {
183                log::warn!("An error occurred while watching {:?}:{:?}", self.setting_type, error);
184                Some(Err(match error.root_cause().downcast_ref::<Error>() {
185                    Some(error) => error.clone(),
186                    _ => crate::handler::base::Error::CommunicationError,
187                }))
188            }
189            _ => {
190                panic!("invalid variant {response:?}");
191            }
192        }
193    }
194}
195
196#[async_trait(?Send)]
197impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
198    Sequential for Work<R, E, T>
199{
200    async fn execute(
201        mut self: Box<Self>,
202        messenger: message::Messenger,
203        store_handle: data::StoreHandle,
204        id: ftrace::Id,
205    ) -> Result<(), WorkError> {
206        trace!(id, c"Sequential Work execute");
207        // Lock store for Job signature group.
208        let mut store = store_handle.lock().await;
209
210        // Begin listening for changes before fetching current value to ensure no changes are
211        // missed.
212        let mut listen_receptor = messenger.message(
213            Payload::Request(Request::Listen).into(),
214            Audience::Address(Address::Handler(self.setting_type)),
215        );
216
217        // Get current value.
218        let mut get_receptor = messenger.message(
219            Payload::Request(Request::Get).into(),
220            Audience::Address(Address::Handler(self.setting_type)),
221        );
222
223        // If a value was returned from the get call and considered updated (no existing or
224        // different), return new value immediately.
225        trace!(id, c"Get first response");
226        let next_payload = self.get_next(&mut get_receptor).await?;
227        if let Some(response) = self.process_response(next_payload, &mut store) {
228            self.responder.respond(response.map(R::from).map_err(|err| {
229                log::error!("First watch response has an error: {:?}", err);
230                E::from(err)
231            }));
232            return Ok(());
233        }
234
235        // Otherwise, loop a watch until an updated value is available
236        loop {
237            trace!(id, c"Get looped response");
238            let next_payload = self.get_next(&mut listen_receptor).await?;
239            if let Some(response) = self.process_response(next_payload, &mut store) {
240                self.responder.respond(response.map(R::from).map_err(|err| {
241                    log::error!("Updated watch response has an error: {:?}", err);
242                    E::from(err)
243                }));
244                return Ok(());
245            }
246        }
247    }
248}
249
250impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
251    From<(Work<R, E, T>, oneshot::Sender<()>)> for Job
252{
253    fn from((work, cancelation_tx): (Work<R, E, T>, oneshot::Sender<()>)) -> Job {
254        let signature = work.signature;
255        Job::new_with_cancellation(Load::Sequential(Box::new(work), signature), cancelation_tx)
256    }
257}
258
259#[cfg(test)]
260mod tests {
261    use super::*;
262    use crate::base::UnknownInfo;
263    use crate::message::base::MessengerType;
264    use crate::service::MessageHub;
265    use assert_matches::assert_matches;
266    use fuchsia_async as fasync;
267    use futures::channel::oneshot::Sender;
268    use futures::lock::Mutex;
269    use std::rc::Rc;
270
271    struct TestResponder {
272        sender: Sender<Result<SettingInfo, Error>>,
273    }
274
275    impl TestResponder {
276        pub(crate) fn new(sender: Sender<Result<SettingInfo, Error>>) -> Self {
277            Self { sender }
278        }
279    }
280
281    impl Responder<SettingInfo, Error> for TestResponder {
282        fn respond(self, response: Result<SettingInfo, Error>) {
283            self.sender.send(response).expect("send should succeed");
284        }
285    }
286
287    #[fuchsia::test(allow_stalls = false)]
288    async fn test_watch_basic_functionality() {
289        // Create store for job.
290        let store_handle = Rc::new(Mutex::new(HashMap::new()));
291
292        let get_info = SettingInfo::Unknown(UnknownInfo(true));
293        let listen_info = SettingInfo::Unknown(UnknownInfo(false));
294
295        // Make sure the first job execution returns the initial value (retrieved through get).
296        verify_watch(
297            store_handle.clone(),
298            listen_info.clone(),
299            get_info.clone(),
300            get_info.clone(),
301            None,
302        )
303        .await;
304        // Make sure the second job execution returns the value returned through watching (listen
305        // value).
306        verify_watch(
307            store_handle.clone(),
308            listen_info.clone(),
309            get_info.clone(),
310            listen_info.clone(),
311            None,
312        )
313        .await;
314    }
315
316    async fn verify_watch(
317        store_handle: data::StoreHandle,
318        listen_info: SettingInfo,
319        get_info: SettingInfo,
320        expected_info: SettingInfo,
321        change_function: Option<ChangeFunction>,
322    ) {
323        // Create MessageHub for communication between components.
324        let message_hub_delegate = MessageHub::create_hub();
325
326        // Create mock handler endpoint to receive request.
327        let mut handler_receiver = message_hub_delegate
328            .create(MessengerType::Addressable(Address::Handler(SettingType::Unknown)))
329            .await
330            .expect("handler messenger should be created")
331            .1;
332
333        let (response_tx, response_rx) =
334            futures::channel::oneshot::channel::<Result<SettingInfo, Error>>();
335        let (_cancelation_tx, cancelation_rx) = oneshot::channel();
336
337        let work = match change_function {
338            None => Box::new(Work::new(
339                SettingType::Unknown,
340                TestResponder::new(response_tx),
341                cancelation_rx,
342            )),
343            Some(change_function) => Box::new(Work::with_change_function(
344                SettingType::Unknown,
345                TestResponder::new(response_tx),
346                cancelation_rx,
347                change_function,
348            )),
349        };
350
351        // Execute work on async task.
352        let work_messenger = message_hub_delegate
353            .create(MessengerType::Unbound)
354            .await
355            .expect("messenger should be created")
356            .0;
357
358        let work_messenger_signature = work_messenger.get_signature();
359        fasync::Task::local(async move {
360            let _ = work.execute(work_messenger, store_handle, 0.into()).await;
361        })
362        .detach();
363
364        // Ensure the listen request is received from the right sender.
365        let (listen_request, listen_client) = handler_receiver
366            .next_of::<Payload>()
367            .await
368            .expect("should successfully receive a listen request");
369        assert_matches!(listen_request, Payload::Request(Request::Listen));
370        assert!(listen_client.get_author() == work_messenger_signature);
371
372        // Listen should be followed by a get request.
373        let (get_request, get_client) = handler_receiver
374            .next_of::<Payload>()
375            .await
376            .expect("should successfully receive a get request");
377        assert_matches!(get_request, Payload::Request(Request::Get));
378        assert!(get_client.get_author() == work_messenger_signature);
379
380        // Reply to the get request.
381        let _ = get_client.reply(Payload::Response(Ok(Some(get_info))).into());
382        let _ = listen_client.reply(Payload::Response(Ok(Some(listen_info))).into());
383
384        assert_matches!(response_rx.await.expect("should receive successful response"),
385                Ok(x) if x == expected_info);
386    }
387
388    // This test verifies that custom change functions work by using a custom change function that
389    // always says a new value is different, even if the actual value is unchanged.
390    #[fuchsia::test(allow_stalls = false)]
391    async fn test_custom_change_function() {
392        // Create store for job.
393        let store_handle = Rc::new(Mutex::new(HashMap::new()));
394
395        // Pre-fill the storage with the value so that the initial get will not trigger a response.
396        let unchanged_info = SettingInfo::Unknown(UnknownInfo(true));
397        let _ = store_handle
398            .lock()
399            .await
400            .insert(Key::Identifier(LAST_VALUE_KEY), Data::SettingInfo(unchanged_info.clone()));
401
402        verify_watch(
403            store_handle,
404            // Send the same value on both the get and listen requests so that the default change
405            // function would not trigger a response to the client.
406            unchanged_info.clone(),
407            unchanged_info.clone(),
408            unchanged_info,
409            // Use a custom change function that always reports a change.
410            Some(ChangeFunction::new(
411                0,
412                Box::new(move |_old: &SettingInfo, _new: &SettingInfo| true),
413            )),
414        )
415        .await;
416    }
417
418    #[fuchsia::test(allow_stalls = false)]
419    async fn test_error_propagation() {
420        // Create MessageHub for communication between components.
421        let message_hub_delegate = MessageHub::create_hub();
422
423        let (response_tx, response_rx) = oneshot::channel::<Result<SettingInfo, Error>>();
424
425        let (_cancelation_tx, cancelation_rx) = oneshot::channel::<()>();
426        // Create a listen request to a non-existent end-point.
427        let work = Box::new(Work::new(
428            SettingType::Unknown,
429            TestResponder::new(response_tx),
430            cancelation_rx,
431        ));
432
433        let work_messenger = message_hub_delegate
434            .create(MessengerType::Unbound)
435            .await
436            .expect("messenger should be created")
437            .0;
438
439        // Execute work on async task.
440        fasync::Task::local(async move {
441            let _ =
442                work.execute(work_messenger, Rc::new(Mutex::new(HashMap::new())), 0.into()).await;
443        })
444        .detach();
445
446        // Ensure an error is returned by the executed work.
447        assert_matches!(response_rx.await.expect("should receive successful response"),
448                Err(x) if x == crate::handler::base::Error::CommunicationError);
449    }
450}