1use crate::base::{SettingInfo, SettingType};
21use crate::handler::base::{Error, Payload, Request};
22use crate::job::data::{self, Data, Key};
23use crate::job::work::{Error as WorkError, Load, Sequential};
24use crate::job::{Job, Signature};
25use crate::message::base::Audience;
26use crate::message::receptor::Receptor;
27use crate::service::{Address, message};
28use crate::trace;
29use async_trait::async_trait;
30use fuchsia_trace as ftrace;
31use futures::FutureExt;
32use futures::channel::oneshot;
33use std::collections::HashMap;
34use std::marker::PhantomData;
35
/// Identifier of the store entry that caches the most recent [`SettingInfo`] handed back to a
/// watcher. Incoming values are compared against this cached entry to decide whether a response
/// should be sent.
const LAST_VALUE_KEY: &str = "LAST_VALUE";
39
40pub(crate) struct ChangeFunction {
43 #[allow(clippy::type_complexity)]
45 function: Box<dyn Fn(&SettingInfo, &SettingInfo) -> bool>,
46
47 id: u64,
52}
53
54impl ChangeFunction {
55 #[allow(clippy::type_complexity)]
56 pub fn new(
57 id: u64,
58 function: Box<dyn Fn(&SettingInfo, &SettingInfo) -> bool>,
59 ) -> ChangeFunction {
60 ChangeFunction { function, id }
61 }
62}
63
64pub trait Responder<R: From<SettingInfo>, E: From<Error>> {
68 fn respond(self, response: Result<R, E>);
69}
70
71pub struct Work<R: From<SettingInfo>, E: From<Error>, T: Responder<R, E>> {
72 setting_type: SettingType,
73 signature: Signature,
74 responder: T,
75 cancelation_rx: oneshot::Receiver<()>,
76 change_function: Option<ChangeFunction>,
77 _response_type: PhantomData<R>,
78 _error_type: PhantomData<E>,
79}
80
81impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
82 Work<R, E, T>
83{
84 #[allow(dead_code)]
85 fn new(setting_type: SettingType, responder: T, cancelation_rx: oneshot::Receiver<()>) -> Self
86 where
87 T: 'static,
88 {
89 Self {
90 setting_type,
91 signature: Signature::new::<T>(),
92 responder,
93 cancelation_rx,
94 change_function: None,
95 _response_type: PhantomData,
96 _error_type: PhantomData,
97 }
98 }
99
100 #[allow(dead_code)]
101 pub(crate) fn new_job(setting_type: SettingType, responder: T) -> Job
102 where
103 T: 'static,
104 {
105 let (cancelation_tx, cancelation_rx) = oneshot::channel();
106 let work = Self::new(setting_type, responder, cancelation_rx);
107 Job::from((work, cancelation_tx))
108 }
109
110 pub(crate) fn new_job_with_change_function(
111 setting_type: SettingType,
112 responder: T,
113 change_function: ChangeFunction,
114 ) -> Job
115 where
116 T: 'static,
117 {
118 let (cancelation_tx, cancelation_rx) = oneshot::channel();
119 let work =
120 Self::with_change_function(setting_type, responder, cancelation_rx, change_function);
121 Job::from((work, cancelation_tx))
122 }
123
124 pub(crate) fn with_change_function(
125 setting_type: SettingType,
126 responder: T,
127 cancelation_rx: oneshot::Receiver<()>,
128 change_function: ChangeFunction,
129 ) -> Self {
130 Self {
131 setting_type,
132 signature: Signature::with::<T>(change_function.id),
133 responder,
134 cancelation_rx,
135 change_function: Some(change_function),
136 _response_type: PhantomData,
137 _error_type: PhantomData,
138 }
139 }
140
141 async fn get_next(
142 &mut self,
143 receptor: &mut Receptor,
144 ) -> Result<Result<Payload, anyhow::Error>, WorkError> {
145 let receptor = receptor.next_of::<Payload>().fuse();
146 let mut cancelation_rx = &mut self.cancelation_rx;
147 futures::pin_mut!(receptor);
148 futures::select! {
149 result = receptor => Ok(result.map(|(payload, _)| payload)),
150 _ = cancelation_rx => Err(WorkError::Canceled),
151 }
152 }
153
154 fn process_response(
157 &self,
158 response: Result<Payload, anyhow::Error>,
159 store: &mut HashMap<Key, Data>,
160 ) -> Option<Result<SettingInfo, Error>> {
161 match response {
162 Ok(Payload::Response(Ok(Some(setting_info)))) => {
163 let key = Key::Identifier(LAST_VALUE_KEY);
164
165 let return_val = match (store.get(&key), self.change_function.as_ref()) {
166 (Some(Data::SettingInfo(info)), Some(change_function))
168 if !(change_function.function)(info, &setting_info) =>
169 {
170 None
171 }
172 (Some(Data::SettingInfo(info)), None) if *info == setting_info => None,
174 _ => Some(Ok(setting_info)),
175 };
176
177 if let Some(Ok(ref info)) = return_val {
178 let _ = store.insert(key, Data::SettingInfo(info.clone()));
179 }
180
181 return_val
182 }
183 Ok(Payload::Response(Err(error))) => Some(Err(error)),
184 Err(error) => {
185 log::warn!("An error occurred while watching {:?}:{:?}", self.setting_type, error);
186 Some(Err(match error.root_cause().downcast_ref::<Error>() {
187 Some(error) => error.clone(),
188 _ => crate::handler::base::Error::CommunicationError,
189 }))
190 }
191 _ => {
192 panic!("invalid variant {response:?}");
193 }
194 }
195 }
196}
197
198#[async_trait(?Send)]
199impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
200 Sequential for Work<R, E, T>
201{
202 async fn execute(
203 mut self: Box<Self>,
204 messenger: message::Messenger,
205 store_handle: data::StoreHandle,
206 id: ftrace::Id,
207 ) -> Result<(), WorkError> {
208 trace!(id, c"Sequential Work execute");
209 let mut store = store_handle.lock().await;
211
212 let mut listen_receptor = messenger.message(
215 Payload::Request(Request::Listen).into(),
216 Audience::Address(Address::Handler(self.setting_type)),
217 );
218
219 let mut get_receptor = messenger.message(
221 Payload::Request(Request::Get).into(),
222 Audience::Address(Address::Handler(self.setting_type)),
223 );
224
225 trace!(id, c"Get first response");
228 let next_payload = self.get_next(&mut get_receptor).await?;
229 if let Some(response) = self.process_response(next_payload, &mut store) {
230 self.responder.respond(response.map(R::from).map_err(|err| {
231 log::error!("First watch response has an error: {:?}", err);
232 E::from(err)
233 }));
234 return Ok(());
235 }
236
237 loop {
239 trace!(id, c"Get looped response");
240 let next_payload = self.get_next(&mut listen_receptor).await?;
241 if let Some(response) = self.process_response(next_payload, &mut store) {
242 self.responder.respond(response.map(R::from).map_err(|err| {
243 log::error!("Updated watch response has an error: {:?}", err);
244 E::from(err)
245 }));
246 return Ok(());
247 }
248 }
249 }
250}
251
252impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
253 From<(Work<R, E, T>, oneshot::Sender<()>)> for Job
254{
255 fn from((work, cancelation_tx): (Work<R, E, T>, oneshot::Sender<()>)) -> Job {
256 let signature = work.signature;
257 Job::new_with_cancellation(Load::Sequential(Box::new(work), signature), cancelation_tx)
258 }
259}
260
261#[cfg(test)]
262mod tests {
263 use super::*;
264 use crate::base::UnknownInfo;
265 use crate::message::base::MessengerType;
266 use crate::service::MessageHub;
267 use assert_matches::assert_matches;
268 use fuchsia_async as fasync;
269 use futures::channel::oneshot::Sender;
270 use futures::lock::Mutex;
271 use std::rc::Rc;
272
273 struct TestResponder {
274 sender: Sender<Result<SettingInfo, Error>>,
275 }
276
277 impl TestResponder {
278 pub(crate) fn new(sender: Sender<Result<SettingInfo, Error>>) -> Self {
279 Self { sender }
280 }
281 }
282
283 impl Responder<SettingInfo, Error> for TestResponder {
284 fn respond(self, response: Result<SettingInfo, Error>) {
285 self.sender.send(response).expect("send should succeed");
286 }
287 }
288
289 #[fuchsia::test(allow_stalls = false)]
290 async fn test_watch_basic_functionality() {
291 let store_handle = Rc::new(Mutex::new(HashMap::new()));
293
294 let get_info = SettingInfo::Unknown(UnknownInfo(true));
295 let listen_info = SettingInfo::Unknown(UnknownInfo(false));
296
297 verify_watch(
299 store_handle.clone(),
300 listen_info.clone(),
301 get_info.clone(),
302 get_info.clone(),
303 None,
304 )
305 .await;
306 verify_watch(
309 store_handle.clone(),
310 listen_info.clone(),
311 get_info.clone(),
312 listen_info.clone(),
313 None,
314 )
315 .await;
316 }
317
318 async fn verify_watch(
319 store_handle: data::StoreHandle,
320 listen_info: SettingInfo,
321 get_info: SettingInfo,
322 expected_info: SettingInfo,
323 change_function: Option<ChangeFunction>,
324 ) {
325 let message_hub_delegate = MessageHub::create_hub();
327
328 let mut handler_receiver = message_hub_delegate
330 .create(MessengerType::Addressable(Address::Handler(SettingType::Unknown)))
331 .await
332 .expect("handler messenger should be created")
333 .1;
334
335 let (response_tx, response_rx) =
336 futures::channel::oneshot::channel::<Result<SettingInfo, Error>>();
337 let (_cancelation_tx, cancelation_rx) = oneshot::channel();
338
339 let work = match change_function {
340 None => Box::new(Work::new(
341 SettingType::Unknown,
342 TestResponder::new(response_tx),
343 cancelation_rx,
344 )),
345 Some(change_function) => Box::new(Work::with_change_function(
346 SettingType::Unknown,
347 TestResponder::new(response_tx),
348 cancelation_rx,
349 change_function,
350 )),
351 };
352
353 let work_messenger = message_hub_delegate
355 .create(MessengerType::Unbound)
356 .await
357 .expect("messenger should be created")
358 .0;
359
360 let work_messenger_signature = work_messenger.get_signature();
361 fasync::Task::local(async move {
362 let _ = work.execute(work_messenger, store_handle, 0.into()).await;
363 })
364 .detach();
365
366 let (listen_request, listen_client) = handler_receiver
368 .next_of::<Payload>()
369 .await
370 .expect("should successfully receive a listen request");
371 assert_matches!(listen_request, Payload::Request(Request::Listen));
372 assert!(listen_client.get_author() == work_messenger_signature);
373
374 let (get_request, get_client) = handler_receiver
376 .next_of::<Payload>()
377 .await
378 .expect("should successfully receive a get request");
379 assert_matches!(get_request, Payload::Request(Request::Get));
380 assert!(get_client.get_author() == work_messenger_signature);
381
382 let _ = get_client.reply(Payload::Response(Ok(Some(get_info))).into());
384 let _ = listen_client.reply(Payload::Response(Ok(Some(listen_info))).into());
385
386 assert_matches!(response_rx.await.expect("should receive successful response"),
387 Ok(x) if x == expected_info);
388 }
389
390 #[fuchsia::test(allow_stalls = false)]
393 async fn test_custom_change_function() {
394 let store_handle = Rc::new(Mutex::new(HashMap::new()));
396
397 let unchanged_info = SettingInfo::Unknown(UnknownInfo(true));
399 let _ = store_handle
400 .lock()
401 .await
402 .insert(Key::Identifier(LAST_VALUE_KEY), Data::SettingInfo(unchanged_info.clone()));
403
404 verify_watch(
405 store_handle,
406 unchanged_info.clone(),
409 unchanged_info.clone(),
410 unchanged_info,
411 Some(ChangeFunction::new(
413 0,
414 Box::new(move |_old: &SettingInfo, _new: &SettingInfo| true),
415 )),
416 )
417 .await;
418 }
419
420 #[fuchsia::test(allow_stalls = false)]
421 async fn test_error_propagation() {
422 let message_hub_delegate = MessageHub::create_hub();
424
425 let (response_tx, response_rx) = oneshot::channel::<Result<SettingInfo, Error>>();
426
427 let (_cancelation_tx, cancelation_rx) = oneshot::channel::<()>();
428 let work = Box::new(Work::new(
430 SettingType::Unknown,
431 TestResponder::new(response_tx),
432 cancelation_rx,
433 ));
434
435 let work_messenger = message_hub_delegate
436 .create(MessengerType::Unbound)
437 .await
438 .expect("messenger should be created")
439 .0;
440
441 fasync::Task::local(async move {
443 let _ =
444 work.execute(work_messenger, Rc::new(Mutex::new(HashMap::new())), 0.into()).await;
445 })
446 .detach();
447
448 assert_matches!(response_rx.await.expect("should receive successful response"),
450 Err(x) if x == crate::handler::base::Error::CommunicationError);
451 }
452}