settings/agent/
camera_watcher.rs1use crate::agent::{
6 AgentError, Context as AgentContext, Invocation, InvocationResult, Lifespan, Payload,
7};
8use crate::base::SettingType;
9use crate::event::{camera_watcher, Event, Publisher};
10use crate::handler::base::{Payload as HandlerPayload, Request};
11use crate::input::common::connect_to_camera;
12use crate::message::base::Audience;
13use crate::service_context::ServiceContext;
14use crate::{service, trace, trace_guard};
15use fuchsia_async as fasync;
16use std::collections::HashSet;
17use std::rc::Rc;
18
19fn get_event_setting_types() -> HashSet<SettingType> {
22 vec![SettingType::Input].into_iter().collect()
23}
24
25pub(crate) struct CameraWatcherAgent {
27 publisher: Publisher,
28 messenger: service::message::Messenger,
29
30 recipient_settings: HashSet<SettingType>,
32}
33
34impl CameraWatcherAgent {
35 pub(crate) async fn create(context: AgentContext) {
36 let mut agent = CameraWatcherAgent {
37 publisher: context.get_publisher(),
38 messenger: context
39 .create_messenger()
40 .await
41 .expect("messenger should be created for CameraWatchAgent"),
42 recipient_settings: context
43 .available_components
44 .intersection(&get_event_setting_types())
45 .cloned()
46 .collect::<HashSet<SettingType>>(),
47 };
48
49 let mut receptor = context.receptor;
50 fasync::Task::local(async move {
51 let id = fuchsia_trace::Id::new();
52 let guard = trace_guard!(id, c"camera watcher agent");
53 while let Ok((payload, client)) = receptor.next_of::<Payload>().await {
54 trace!(id, c"payload");
55 if let Payload::Invocation(invocation) = payload {
56 let _ = client.reply(Payload::Complete(agent.handle(invocation).await).into());
57 }
58 }
59 drop(guard);
60
61 log::info!("Camera watcher agent done processing requests");
62 })
63 .detach()
64 }
65
66 async fn handle(&mut self, invocation: Invocation) -> InvocationResult {
67 match invocation.lifespan {
68 Lifespan::Initialization => Err(AgentError::UnhandledLifespan),
69 Lifespan::Service => self.handle_service_lifespan(invocation.service_context).await,
70 }
71 }
72
73 async fn handle_service_lifespan(
74 &mut self,
75 service_context: Rc<ServiceContext>,
76 ) -> InvocationResult {
77 match connect_to_camera(service_context).await {
78 Ok(camera_device_client) => {
79 let mut event_handler = EventHandler {
80 publisher: self.publisher.clone(),
81 messenger: self.messenger.clone(),
82 recipient_settings: self.recipient_settings.clone(),
83 sw_muted: false,
84 };
85 fasync::Task::local(async move {
86 let id = fuchsia_trace::Id::new();
87 trace!(id, c"camera_watcher_agent_handler");
92 while let Ok((sw_muted, _hw_muted)) =
93 camera_device_client.watch_mute_state().await
94 {
95 trace!(id, c"event");
96 event_handler.handle_event(sw_muted);
97 }
98 })
99 .detach();
100
101 Ok(())
102 }
103 Err(e) => {
104 log::error!("Unable to watch camera device: {:?}", e);
105 Err(AgentError::UnexpectedError)
106 }
107 }
108 }
109}
110
111struct EventHandler {
112 publisher: Publisher,
113 messenger: service::message::Messenger,
114 recipient_settings: HashSet<SettingType>,
115 sw_muted: bool,
116}
117
118impl EventHandler {
119 fn handle_event(&mut self, sw_muted: bool) {
120 if self.sw_muted != sw_muted {
121 self.sw_muted = sw_muted;
122 self.send_event(sw_muted);
123 }
124 }
125
126 fn send_event(&self, muted: bool) {
127 self.publisher.send_event(Event::CameraUpdate(camera_watcher::Event::OnSWMuteState(muted)));
128 let setting_request: Request = Request::OnCameraSWState(muted);
129
130 for setting_type in self.recipient_settings.iter() {
132 let _ = self.messenger.message(
134 HandlerPayload::Request(setting_request.clone()).into(),
135 Audience::Address(service::Address::Handler(*setting_type)),
136 );
137 }
138 }
139}
140
141#[cfg(test)]
142mod tests {
143 use super::*;
144 use crate::event;
145 use crate::message::base::{MessageEvent, MessengerType};
146 use crate::message::receptor::Receptor;
147 use crate::tests::fakes::service_registry::ServiceRegistry;
148 use crate::tests::helpers::{
149 create_messenger_and_publisher, create_messenger_and_publisher_from_hub,
150 create_receptor_for_setting_type,
151 };
152 use assert_matches::assert_matches;
153 use futures::StreamExt;
154
155 #[fuchsia::test(allow_stalls = false)]
157 async fn initialization_lifespan_is_unhandled() {
158 let (messenger, publisher) = create_messenger_and_publisher().await;
160
161 let mut agent =
163 CameraWatcherAgent { publisher, messenger, recipient_settings: HashSet::new() };
164
165 let result = agent
167 .handle(Invocation {
168 lifespan: Lifespan::Initialization,
169 service_context: Rc::new(ServiceContext::new(None, None)),
170 })
171 .await;
172
173 assert!(matches!(result, Err(AgentError::UnhandledLifespan)));
174 }
175
176 #[fuchsia::test(allow_stalls = false)]
178 async fn when_camera3_inaccessible_returns_err() {
179 let (messenger, publisher) = create_messenger_and_publisher().await;
181
182 let mut agent =
184 CameraWatcherAgent { publisher, messenger, recipient_settings: HashSet::new() };
185
186 let service_context = Rc::new(ServiceContext::new(
187 Some(ServiceRegistry::serve(ServiceRegistry::create())),
189 None,
190 ));
191
192 let result =
194 agent.handle(Invocation { lifespan: Lifespan::Service, service_context }).await;
195 assert!(matches!(result, Err(AgentError::UnexpectedError)));
196 }
197
198 #[fuchsia::test(allow_stalls = false)]
200 async fn event_handler_proxies_event() {
201 let service_message_hub = service::MessageHub::create_hub();
202 let (messenger, publisher) =
203 create_messenger_and_publisher_from_hub(&service_message_hub).await;
204
205 let event_receptor = service::build_event_listener(&service_message_hub).await;
210
211 let handler_receptor: Receptor =
216 create_receptor_for_setting_type(&service_message_hub, SettingType::Unknown).await;
217
218 let mut event_handler = EventHandler {
219 publisher,
220 messenger,
221 recipient_settings: vec![SettingType::Unknown].into_iter().collect(),
222 sw_muted: false,
223 };
224
225 event_handler.handle_event(true);
227
228 service_message_hub.delete(handler_receptor.get_signature());
231 service_message_hub.delete(event_receptor.get_signature());
232
233 let mut agent_received_sw_mute = false;
234 let mut handler_received_event = false;
235
236 let fused_event = event_receptor.fuse();
237 let fused_setting_handler = handler_receptor.fuse();
238 futures::pin_mut!(fused_event, fused_setting_handler);
239
240 loop {
244 futures::select! {
245 message = fused_event.select_next_some() => {
246 if let MessageEvent::Message(service::Payload::Event(event::Payload::Event(
247 event::Event::CameraUpdate(event)
248 )), _) = message
249 {
250 match event {
251 event::camera_watcher::Event::OnSWMuteState(muted) => {
252 assert!(muted);
253 agent_received_sw_mute = true;
254 }
255 }
256 }
257 },
258 message = fused_setting_handler.select_next_some() => {
259 if let MessageEvent::Message(
260 service::Payload::Setting(HandlerPayload::Request(
261 Request::OnCameraSWState(_muted))),
262 _,
263 ) = message
264 {
265 handler_received_event = true;
266 }
267 }
268 complete => break,
269 }
270 }
271
272 assert!(agent_received_sw_mute);
273 assert!(handler_received_event);
274 }
275
276 #[fuchsia::test(allow_stalls = false)]
278 async fn event_handler_sends_no_events_if_no_settings_available() {
279 let service_message_hub = service::MessageHub::create_hub();
280 let (messenger, publisher) =
281 create_messenger_and_publisher_from_hub(&service_message_hub).await;
282 let handler_address = service::Address::Handler(SettingType::Unknown);
283 let verification_request = Request::Get;
284
285 let mut handler_receptor: Receptor = service_message_hub
290 .create(MessengerType::Addressable(handler_address))
291 .await
292 .expect("Unable to create handler receptor")
293 .1;
294
295 let mut event_handler = EventHandler {
297 publisher,
298 messenger,
299 recipient_settings: HashSet::new(),
300 sw_muted: false,
301 };
302
303 event_handler.handle_event(true);
305
306 let _ = service_message_hub
308 .create(MessengerType::Unbound)
309 .await
310 .expect("Unable to create messenger")
311 .0
312 .message(
313 HandlerPayload::Request(verification_request.clone()).into(),
314 Audience::Address(handler_address),
315 );
316
317 service_message_hub.delete(handler_receptor.get_signature());
320
321 assert_matches!(
322 handler_receptor.next_of::<HandlerPayload>().await,
323 Ok((HandlerPayload::Request(request), _))
324 if request == verification_request
325 )
326 }
327}