1use crate::agent::{
6 AgentError, Context as AgentContext, Invocation, InvocationResult, Lifespan, Payload,
7};
8use crate::base::SettingType;
9use crate::event::{Event, Publisher, camera_watcher};
10use crate::handler::base::{Payload as HandlerPayload, Request};
11use crate::message::base::Audience;
12use crate::{service, trace, trace_guard};
13use fuchsia_async as fasync;
14use futures::channel::mpsc::UnboundedSender;
15use settings_camera::connect_to_camera;
16use settings_common::inspect::event::ExternalEventPublisher;
17use settings_common::service_context::ServiceContext;
18
19use std::collections::HashSet;
20use std::rc::Rc;
21
22use super::{AgentCreator, CreationFunc};
23
24fn get_event_setting_types() -> HashSet<SettingType> {
27 vec![SettingType::Input].into_iter().collect()
28}
29
30pub(crate) fn create_registrar(
31 muted_txs: Vec<UnboundedSender<bool>>,
32 external_publisher: ExternalEventPublisher,
33) -> AgentCreator {
34 AgentCreator {
35 debug_id: "CameraWatcherAgent",
36 create: CreationFunc::Dynamic(Rc::new(move |context| {
37 let muted_txs = muted_txs.clone();
38 let external_publisher = external_publisher.clone();
39 Box::pin(async move {
40 CameraWatcherAgent::create(context, muted_txs, external_publisher).await;
41 })
42 })),
43 }
44}
45
46pub(crate) struct CameraWatcherAgent {
48 publisher: Publisher,
49 messenger: service::message::Messenger,
50
51 recipient_settings: HashSet<SettingType>,
53 muted_txs: Vec<UnboundedSender<bool>>,
56 external_publisher: ExternalEventPublisher,
57}
58
59impl CameraWatcherAgent {
60 pub(crate) async fn create(
61 context: AgentContext,
62 muted_txs: Vec<UnboundedSender<bool>>,
63 external_publisher: ExternalEventPublisher,
64 ) {
65 let mut agent = CameraWatcherAgent {
66 publisher: context.get_publisher(),
67 messenger: context
68 .create_messenger()
69 .await
70 .expect("messenger should be created for CameraWatchAgent"),
71 recipient_settings: context
72 .available_components
73 .intersection(&get_event_setting_types())
74 .cloned()
75 .collect::<HashSet<SettingType>>(),
76 muted_txs,
77 external_publisher,
78 };
79
80 let mut receptor = context.receptor;
81 fasync::Task::local(async move {
82 let id = fuchsia_trace::Id::new();
83 let guard = trace_guard!(id, c"camera watcher agent");
84 while let Ok((payload, client)) = receptor.next_of::<Payload>().await {
85 trace!(id, c"payload");
86 if let Payload::Invocation(invocation) = payload {
87 let _ = client.reply(Payload::Complete(agent.handle(invocation).await).into());
88 }
89 }
90 drop(guard);
91
92 log::info!("Camera watcher agent done processing requests");
93 })
94 .detach()
95 }
96
97 async fn handle(&mut self, invocation: Invocation) -> InvocationResult {
98 match invocation.lifespan {
99 Lifespan::Initialization => Err(AgentError::UnhandledLifespan),
100 Lifespan::Service => {
101 self.handle_service_lifespan(&*invocation.service_context.common_context()).await
102 }
103 }
104 }
105
106 async fn handle_service_lifespan(
107 &mut self,
108 service_context: &ServiceContext,
109 ) -> InvocationResult {
110 match connect_to_camera(service_context, self.external_publisher.clone()).await {
111 Ok(camera_device_client) => {
112 let mut event_handler = EventHandler {
113 muted_txs: self.muted_txs.clone(),
114 publisher: self.publisher.clone(),
115 messenger: self.messenger.clone(),
116 recipient_settings: self.recipient_settings.clone(),
117 sw_muted: false,
118 };
119 fasync::Task::local(async move {
120 let id = fuchsia_trace::Id::new();
121 trace!(id, c"camera_watcher_agent_handler");
126 while let Ok((sw_muted, _hw_muted)) =
127 camera_device_client.watch_mute_state().await
128 {
129 trace!(id, c"event");
130 event_handler.handle_event(sw_muted);
131 }
132 })
133 .detach();
134
135 Ok(())
136 }
137 Err(e) => {
138 log::error!("Unable to watch camera device: {:?}", e);
139 Err(AgentError::UnexpectedError)
140 }
141 }
142 }
143}
144
145struct EventHandler {
146 muted_txs: Vec<UnboundedSender<bool>>,
147 publisher: Publisher,
148 messenger: service::message::Messenger,
149 recipient_settings: HashSet<SettingType>,
150 sw_muted: bool,
151}
152
153impl EventHandler {
154 fn handle_event(&mut self, sw_muted: bool) {
155 if self.sw_muted != sw_muted {
156 self.sw_muted = sw_muted;
157 self.send_event(sw_muted);
158 }
159 }
160
161 fn send_event(&self, muted: bool) {
162 for muted_tx in &self.muted_txs {
163 let _ = muted_tx.unbounded_send(muted);
164 }
165
166 self.publisher.send_event(Event::CameraUpdate(camera_watcher::Event::OnSWMuteState(muted)));
167 let setting_request: Request = Request::OnCameraSWState(muted);
168
169 for setting_type in self.recipient_settings.iter() {
171 let _ = self.messenger.message(
173 HandlerPayload::Request(setting_request.clone()).into(),
174 Audience::Address(service::Address::Handler(*setting_type)),
175 );
176 }
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use super::*;
183 use crate::event;
184 use crate::message::base::{MessageEvent, MessengerType};
185 use crate::message::receptor::Receptor;
186 use crate::tests::helpers::{
187 create_messenger_and_publisher, create_messenger_and_publisher_from_hub,
188 create_receptor_for_setting_type,
189 };
190 use assert_matches::assert_matches;
191 use futures::StreamExt;
192 use futures::channel::mpsc;
193 use settings_test_common::fakes::service::ServiceRegistry;
194
195 #[fuchsia::test(allow_stalls = false)]
197 async fn initialization_lifespan_is_unhandled() {
198 let (messenger, publisher) = create_messenger_and_publisher().await;
200 let (event_tx, _event_rx) = mpsc::unbounded();
201 let external_publisher = ExternalEventPublisher::new(event_tx);
202
203 let mut agent = CameraWatcherAgent {
205 muted_txs: vec![],
206 publisher,
207 messenger,
208 recipient_settings: HashSet::new(),
209 external_publisher,
210 };
211
212 let result = agent
214 .handle(Invocation {
215 lifespan: Lifespan::Initialization,
216 service_context: Rc::new(crate::service_context::ServiceContext::new(None, None)),
217 })
218 .await;
219
220 assert!(matches!(result, Err(AgentError::UnhandledLifespan)));
221 }
222
223 #[fuchsia::test(allow_stalls = false)]
225 async fn when_camera3_inaccessible_returns_err() {
226 let (messenger, publisher) = create_messenger_and_publisher().await;
228 let (event_tx, _event_rx) = mpsc::unbounded();
229 let external_publisher = ExternalEventPublisher::new(event_tx);
230
231 let mut agent = CameraWatcherAgent {
233 muted_txs: vec![],
234 publisher,
235 messenger,
236 recipient_settings: HashSet::new(),
237 external_publisher,
238 };
239
240 let service_context = Rc::new(crate::service_context::ServiceContext::new(
241 Some(ServiceRegistry::serve(ServiceRegistry::create())),
243 None,
244 ));
245
246 let result =
248 agent.handle(Invocation { lifespan: Lifespan::Service, service_context }).await;
249 assert!(matches!(result, Err(AgentError::UnexpectedError)));
250 }
251
252 #[fuchsia::test(allow_stalls = false)]
254 async fn event_handler_proxies_event() {
255 let service_message_hub = service::MessageHub::create_hub();
256 let (messenger, publisher) =
257 create_messenger_and_publisher_from_hub(&service_message_hub).await;
258
259 let event_receptor = service::build_event_listener(&service_message_hub).await;
264
265 let handler_receptor: Receptor =
270 create_receptor_for_setting_type(&service_message_hub, SettingType::Unknown).await;
271
272 let (tx1, mut rx1) = mpsc::unbounded();
273 let (tx2, mut rx2) = mpsc::unbounded();
274
275 let mut event_handler = EventHandler {
276 muted_txs: vec![tx1.clone(), tx2.clone()],
277 publisher,
278 messenger,
279 recipient_settings: vec![SettingType::Unknown].into_iter().collect(),
280 sw_muted: false,
281 };
282
283 event_handler.handle_event(true);
285
286 service_message_hub.delete(handler_receptor.get_signature());
289 service_message_hub.delete(event_receptor.get_signature());
290
291 let mut agent_received_sw_mute = false;
292 let mut handler_received_event = false;
293 let mut channel_received = 0;
294
295 let fused_event = event_receptor.fuse();
296 let fused_setting_handler = handler_receptor.fuse();
297 let mut next_rx1 = rx1.next();
298 let mut next_rx2 = rx2.next();
299 futures::pin_mut!(fused_event, fused_setting_handler);
300
301 loop {
305 futures::select! {
306 message = fused_event.select_next_some() => {
307 if let MessageEvent::Message(service::Payload::Event(event::Payload::Event(
308 event::Event::CameraUpdate(event)
309 )), _) = message
310 {
311 match event {
312 event::camera_watcher::Event::OnSWMuteState(muted) => {
313 assert!(muted);
314 agent_received_sw_mute = true;
315 }
316 }
317 }
318 },
319 message = fused_setting_handler.select_next_some() => {
320 if let MessageEvent::Message(
321 service::Payload::Setting(HandlerPayload::Request(
322 Request::OnCameraSWState(_muted))),
323 _,
324 ) = message
325 {
326 handler_received_event = true;
327 }
328 }
329 event = next_rx1 => {
330 let Some(muted) = event else {
331 continue;
332 };
333 assert!(muted);
334 tx1.close_channel();
336 channel_received += 1;
337 }
338 event = next_rx2 => {
339 let Some(muted) = event else {
340 continue;
341 };
342 assert!(muted);
343 tx2.close_channel();
345 channel_received += 1;
346 }
347 complete => break,
348 }
349 }
350
351 assert!(agent_received_sw_mute);
352 assert!(handler_received_event);
353 assert_eq!(channel_received, 2);
354 }
355
356 #[fuchsia::test(allow_stalls = false)]
358 async fn event_handler_sends_no_events_if_no_settings_available() {
359 let service_message_hub = service::MessageHub::create_hub();
360 let (messenger, publisher) =
361 create_messenger_and_publisher_from_hub(&service_message_hub).await;
362 let handler_address = service::Address::Handler(SettingType::Unknown);
363 let verification_request = Request::Get;
364
365 let mut handler_receptor: Receptor = service_message_hub
370 .create(MessengerType::Addressable(handler_address))
371 .await
372 .expect("Unable to create handler receptor")
373 .1;
374
375 let mut event_handler = EventHandler {
377 muted_txs: vec![],
378 publisher,
379 messenger,
380 recipient_settings: HashSet::new(),
381 sw_muted: false,
382 };
383
384 event_handler.handle_event(true);
386
387 let _ = service_message_hub
389 .create(MessengerType::Unbound)
390 .await
391 .expect("Unable to create messenger")
392 .0
393 .message(
394 HandlerPayload::Request(verification_request.clone()).into(),
395 Audience::Address(handler_address),
396 );
397
398 service_message_hub.delete(handler_receptor.get_signature());
401
402 assert_matches!(
403 handler_receptor.next_of::<HandlerPayload>().await,
404 Ok((HandlerPayload::Request(request), _))
405 if request == verification_request
406 )
407 }
408}