1use crate::base::{SettingInfo, SettingType};
21use crate::handler::base::{Error, Payload, Request};
22use crate::job::data::{self, Data, Key};
23use crate::job::work::{Error as WorkError, Load, Sequential};
24use crate::job::{Job, Signature};
25use crate::message::base::Audience;
26use crate::message::receptor::Receptor;
27use crate::service::{message, Address};
28use crate::trace;
29use async_trait::async_trait;
30use fuchsia_trace as ftrace;
31use futures::channel::oneshot;
32use futures::FutureExt;
33use std::collections::HashMap;
34use std::marker::PhantomData;
35
36const LAST_VALUE_KEY: &str = "LAST_VALUE";
39
40pub(crate) struct ChangeFunction {
43 #[allow(clippy::type_complexity)]
45 function: Box<dyn Fn(&SettingInfo, &SettingInfo) -> bool>,
46
47 id: u64,
52}
53
54impl ChangeFunction {
55 #[allow(clippy::type_complexity)]
56 pub fn new(
57 id: u64,
58 function: Box<dyn Fn(&SettingInfo, &SettingInfo) -> bool>,
59 ) -> ChangeFunction {
60 ChangeFunction { function, id }
61 }
62}
63
64pub trait Responder<R: From<SettingInfo>, E: From<Error>> {
68 fn respond(self, response: Result<R, E>);
69}
70
71pub struct Work<R: From<SettingInfo>, E: From<Error>, T: Responder<R, E>> {
72 setting_type: SettingType,
73 signature: Signature,
74 responder: T,
75 cancelation_rx: oneshot::Receiver<()>,
76 change_function: Option<ChangeFunction>,
77 _response_type: PhantomData<R>,
78 _error_type: PhantomData<E>,
79}
80
81impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
82 Work<R, E, T>
83{
84 fn new(setting_type: SettingType, responder: T, cancelation_rx: oneshot::Receiver<()>) -> Self
85 where
86 T: 'static,
87 {
88 Self {
89 setting_type,
90 signature: Signature::new::<T>(),
91 responder,
92 cancelation_rx,
93 change_function: None,
94 _response_type: PhantomData,
95 _error_type: PhantomData,
96 }
97 }
98
99 pub(crate) fn new_job(setting_type: SettingType, responder: T) -> Job
100 where
101 T: 'static,
102 {
103 let (cancelation_tx, cancelation_rx) = oneshot::channel();
104 let work = Self::new(setting_type, responder, cancelation_rx);
105 Job::from((work, cancelation_tx))
106 }
107
108 pub(crate) fn new_job_with_change_function(
109 setting_type: SettingType,
110 responder: T,
111 change_function: ChangeFunction,
112 ) -> Job
113 where
114 T: 'static,
115 {
116 let (cancelation_tx, cancelation_rx) = oneshot::channel();
117 let work =
118 Self::with_change_function(setting_type, responder, cancelation_rx, change_function);
119 Job::from((work, cancelation_tx))
120 }
121
122 pub(crate) fn with_change_function(
123 setting_type: SettingType,
124 responder: T,
125 cancelation_rx: oneshot::Receiver<()>,
126 change_function: ChangeFunction,
127 ) -> Self {
128 Self {
129 setting_type,
130 signature: Signature::with::<T>(change_function.id),
131 responder,
132 cancelation_rx,
133 change_function: Some(change_function),
134 _response_type: PhantomData,
135 _error_type: PhantomData,
136 }
137 }
138
139 async fn get_next(
140 &mut self,
141 receptor: &mut Receptor,
142 ) -> Result<Result<Payload, anyhow::Error>, WorkError> {
143 let receptor = receptor.next_of::<Payload>().fuse();
144 let mut cancelation_rx = &mut self.cancelation_rx;
145 futures::pin_mut!(receptor);
146 futures::select! {
147 result = receptor => Ok(result.map(|(payload, _)| payload)),
148 _ = cancelation_rx => Err(WorkError::Canceled),
149 }
150 }
151
152 fn process_response(
155 &self,
156 response: Result<Payload, anyhow::Error>,
157 store: &mut HashMap<Key, Data>,
158 ) -> Option<Result<SettingInfo, Error>> {
159 match response {
160 Ok(Payload::Response(Ok(Some(setting_info)))) => {
161 let key = Key::Identifier(LAST_VALUE_KEY);
162
163 let return_val = match (store.get(&key), self.change_function.as_ref()) {
164 (Some(Data::SettingInfo(info)), Some(change_function))
166 if !(change_function.function)(info, &setting_info) =>
167 {
168 None
169 }
170 (Some(Data::SettingInfo(info)), None) if *info == setting_info => None,
172 _ => Some(Ok(setting_info)),
173 };
174
175 if let Some(Ok(ref info)) = return_val {
176 let _ = store.insert(key, Data::SettingInfo(info.clone()));
177 }
178
179 return_val
180 }
181 Ok(Payload::Response(Err(error))) => Some(Err(error)),
182 Err(error) => {
183 log::warn!("An error occurred while watching {:?}:{:?}", self.setting_type, error);
184 Some(Err(match error.root_cause().downcast_ref::<Error>() {
185 Some(error) => error.clone(),
186 _ => crate::handler::base::Error::CommunicationError,
187 }))
188 }
189 _ => {
190 panic!("invalid variant {response:?}");
191 }
192 }
193 }
194}
195
196#[async_trait(?Send)]
197impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
198 Sequential for Work<R, E, T>
199{
200 async fn execute(
201 mut self: Box<Self>,
202 messenger: message::Messenger,
203 store_handle: data::StoreHandle,
204 id: ftrace::Id,
205 ) -> Result<(), WorkError> {
206 trace!(id, c"Sequential Work execute");
207 let mut store = store_handle.lock().await;
209
210 let mut listen_receptor = messenger.message(
213 Payload::Request(Request::Listen).into(),
214 Audience::Address(Address::Handler(self.setting_type)),
215 );
216
217 let mut get_receptor = messenger.message(
219 Payload::Request(Request::Get).into(),
220 Audience::Address(Address::Handler(self.setting_type)),
221 );
222
223 trace!(id, c"Get first response");
226 let next_payload = self.get_next(&mut get_receptor).await?;
227 if let Some(response) = self.process_response(next_payload, &mut store) {
228 self.responder.respond(response.map(R::from).map_err(|err| {
229 log::error!("First watch response has an error: {:?}", err);
230 E::from(err)
231 }));
232 return Ok(());
233 }
234
235 loop {
237 trace!(id, c"Get looped response");
238 let next_payload = self.get_next(&mut listen_receptor).await?;
239 if let Some(response) = self.process_response(next_payload, &mut store) {
240 self.responder.respond(response.map(R::from).map_err(|err| {
241 log::error!("Updated watch response has an error: {:?}", err);
242 E::from(err)
243 }));
244 return Ok(());
245 }
246 }
247 }
248}
249
250impl<R: From<SettingInfo> + 'static, E: From<Error> + 'static, T: Responder<R, E> + 'static>
251 From<(Work<R, E, T>, oneshot::Sender<()>)> for Job
252{
253 fn from((work, cancelation_tx): (Work<R, E, T>, oneshot::Sender<()>)) -> Job {
254 let signature = work.signature;
255 Job::new_with_cancellation(Load::Sequential(Box::new(work), signature), cancelation_tx)
256 }
257}
258
259#[cfg(test)]
260mod tests {
261 use super::*;
262 use crate::base::UnknownInfo;
263 use crate::message::base::MessengerType;
264 use crate::service::MessageHub;
265 use assert_matches::assert_matches;
266 use fuchsia_async as fasync;
267 use futures::channel::oneshot::Sender;
268 use futures::lock::Mutex;
269 use std::rc::Rc;
270
271 struct TestResponder {
272 sender: Sender<Result<SettingInfo, Error>>,
273 }
274
275 impl TestResponder {
276 pub(crate) fn new(sender: Sender<Result<SettingInfo, Error>>) -> Self {
277 Self { sender }
278 }
279 }
280
281 impl Responder<SettingInfo, Error> for TestResponder {
282 fn respond(self, response: Result<SettingInfo, Error>) {
283 self.sender.send(response).expect("send should succeed");
284 }
285 }
286
287 #[fuchsia::test(allow_stalls = false)]
288 async fn test_watch_basic_functionality() {
289 let store_handle = Rc::new(Mutex::new(HashMap::new()));
291
292 let get_info = SettingInfo::Unknown(UnknownInfo(true));
293 let listen_info = SettingInfo::Unknown(UnknownInfo(false));
294
295 verify_watch(
297 store_handle.clone(),
298 listen_info.clone(),
299 get_info.clone(),
300 get_info.clone(),
301 None,
302 )
303 .await;
304 verify_watch(
307 store_handle.clone(),
308 listen_info.clone(),
309 get_info.clone(),
310 listen_info.clone(),
311 None,
312 )
313 .await;
314 }
315
316 async fn verify_watch(
317 store_handle: data::StoreHandle,
318 listen_info: SettingInfo,
319 get_info: SettingInfo,
320 expected_info: SettingInfo,
321 change_function: Option<ChangeFunction>,
322 ) {
323 let message_hub_delegate = MessageHub::create_hub();
325
326 let mut handler_receiver = message_hub_delegate
328 .create(MessengerType::Addressable(Address::Handler(SettingType::Unknown)))
329 .await
330 .expect("handler messenger should be created")
331 .1;
332
333 let (response_tx, response_rx) =
334 futures::channel::oneshot::channel::<Result<SettingInfo, Error>>();
335 let (_cancelation_tx, cancelation_rx) = oneshot::channel();
336
337 let work = match change_function {
338 None => Box::new(Work::new(
339 SettingType::Unknown,
340 TestResponder::new(response_tx),
341 cancelation_rx,
342 )),
343 Some(change_function) => Box::new(Work::with_change_function(
344 SettingType::Unknown,
345 TestResponder::new(response_tx),
346 cancelation_rx,
347 change_function,
348 )),
349 };
350
351 let work_messenger = message_hub_delegate
353 .create(MessengerType::Unbound)
354 .await
355 .expect("messenger should be created")
356 .0;
357
358 let work_messenger_signature = work_messenger.get_signature();
359 fasync::Task::local(async move {
360 let _ = work.execute(work_messenger, store_handle, 0.into()).await;
361 })
362 .detach();
363
364 let (listen_request, listen_client) = handler_receiver
366 .next_of::<Payload>()
367 .await
368 .expect("should successfully receive a listen request");
369 assert_matches!(listen_request, Payload::Request(Request::Listen));
370 assert!(listen_client.get_author() == work_messenger_signature);
371
372 let (get_request, get_client) = handler_receiver
374 .next_of::<Payload>()
375 .await
376 .expect("should successfully receive a get request");
377 assert_matches!(get_request, Payload::Request(Request::Get));
378 assert!(get_client.get_author() == work_messenger_signature);
379
380 let _ = get_client.reply(Payload::Response(Ok(Some(get_info))).into());
382 let _ = listen_client.reply(Payload::Response(Ok(Some(listen_info))).into());
383
384 assert_matches!(response_rx.await.expect("should receive successful response"),
385 Ok(x) if x == expected_info);
386 }
387
388 #[fuchsia::test(allow_stalls = false)]
391 async fn test_custom_change_function() {
392 let store_handle = Rc::new(Mutex::new(HashMap::new()));
394
395 let unchanged_info = SettingInfo::Unknown(UnknownInfo(true));
397 let _ = store_handle
398 .lock()
399 .await
400 .insert(Key::Identifier(LAST_VALUE_KEY), Data::SettingInfo(unchanged_info.clone()));
401
402 verify_watch(
403 store_handle,
404 unchanged_info.clone(),
407 unchanged_info.clone(),
408 unchanged_info,
409 Some(ChangeFunction::new(
411 0,
412 Box::new(move |_old: &SettingInfo, _new: &SettingInfo| true),
413 )),
414 )
415 .await;
416 }
417
418 #[fuchsia::test(allow_stalls = false)]
419 async fn test_error_propagation() {
420 let message_hub_delegate = MessageHub::create_hub();
422
423 let (response_tx, response_rx) = oneshot::channel::<Result<SettingInfo, Error>>();
424
425 let (_cancelation_tx, cancelation_rx) = oneshot::channel::<()>();
426 let work = Box::new(Work::new(
428 SettingType::Unknown,
429 TestResponder::new(response_tx),
430 cancelation_rx,
431 ));
432
433 let work_messenger = message_hub_delegate
434 .create(MessengerType::Unbound)
435 .await
436 .expect("messenger should be created")
437 .0;
438
439 fasync::Task::local(async move {
441 let _ =
442 work.execute(work_messenger, Rc::new(Mutex::new(HashMap::new())), 0.into()).await;
443 })
444 .detach();
445
446 assert_matches!(response_rx.await.expect("should receive successful response"),
448 Err(x) if x == crate::handler::base::Error::CommunicationError);
449 }
450}