settings/agent/
camera_watcher.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::agent::{
6    AgentError, Context as AgentContext, Invocation, InvocationResult, Lifespan, Payload,
7};
8use crate::base::SettingType;
9use crate::event::{Event, Publisher, camera_watcher};
10use crate::handler::base::{Payload as HandlerPayload, Request};
11use crate::message::base::Audience;
12use crate::{service, trace, trace_guard};
13use fuchsia_async as fasync;
14use futures::channel::mpsc::UnboundedSender;
15use settings_camera::connect_to_camera;
16use settings_common::inspect::event::ExternalEventPublisher;
17use settings_common::service_context::ServiceContext;
18
19use std::collections::HashSet;
20use std::rc::Rc;
21
22use super::{AgentCreator, CreationFunc};
23
24/// Setting types that the camera watcher agent will send updates to, if they're
25/// available on the device.
26fn get_event_setting_types() -> HashSet<SettingType> {
27    vec![SettingType::Input].into_iter().collect()
28}
29
30pub(crate) fn create_registrar(
31    muted_txs: Vec<UnboundedSender<bool>>,
32    external_publisher: ExternalEventPublisher,
33) -> AgentCreator {
34    AgentCreator {
35        debug_id: "CameraWatcherAgent",
36        create: CreationFunc::Dynamic(Rc::new(move |context| {
37            let muted_txs = muted_txs.clone();
38            let external_publisher = external_publisher.clone();
39            Box::pin(async move {
40                CameraWatcherAgent::create(context, muted_txs, external_publisher).await;
41            })
42        })),
43    }
44}
45
// TODO(https://fxbug.dev/42149412): Extract common template from agents.
/// Agent that watches the camera device's software mute state and forwards
/// changes to the registered channels, the event publisher and the interested
/// setting handlers.
pub(crate) struct CameraWatcherAgent {
    /// Publisher cloned into each `EventHandler`, which uses it to emit camera
    /// update events.
    publisher: Publisher,
    /// Messenger cloned into each `EventHandler`, which uses it to message
    /// setting handlers.
    messenger: service::message::Messenger,

    /// Settings to send camera watcher events to.
    recipient_settings: HashSet<SettingType>,
    /// Sends an event whenever camera muted state changes. The `bool`
    /// represents whether the camera is muted or not.
    muted_txs: Vec<UnboundedSender<bool>>,
    /// Passed to `connect_to_camera` when connecting to the camera device.
    external_publisher: ExternalEventPublisher,
}
58
59impl CameraWatcherAgent {
60    pub(crate) async fn create(
61        context: AgentContext,
62        muted_txs: Vec<UnboundedSender<bool>>,
63        external_publisher: ExternalEventPublisher,
64    ) {
65        let mut agent = CameraWatcherAgent {
66            publisher: context.get_publisher(),
67            messenger: context
68                .create_messenger()
69                .await
70                .expect("messenger should be created for CameraWatchAgent"),
71            recipient_settings: context
72                .available_components
73                .intersection(&get_event_setting_types())
74                .cloned()
75                .collect::<HashSet<SettingType>>(),
76            muted_txs,
77            external_publisher,
78        };
79
80        let mut receptor = context.receptor;
81        fasync::Task::local(async move {
82            let id = fuchsia_trace::Id::new();
83            let guard = trace_guard!(id, c"camera watcher agent");
84            while let Ok((payload, client)) = receptor.next_of::<Payload>().await {
85                trace!(id, c"payload");
86                if let Payload::Invocation(invocation) = payload {
87                    let _ = client.reply(Payload::Complete(agent.handle(invocation).await).into());
88                }
89            }
90            drop(guard);
91
92            log::info!("Camera watcher agent done processing requests");
93        })
94        .detach()
95    }
96
97    async fn handle(&mut self, invocation: Invocation) -> InvocationResult {
98        match invocation.lifespan {
99            Lifespan::Initialization => Err(AgentError::UnhandledLifespan),
100            Lifespan::Service => {
101                self.handle_service_lifespan(&*invocation.service_context.common_context()).await
102            }
103        }
104    }
105
106    async fn handle_service_lifespan(
107        &mut self,
108        service_context: &ServiceContext,
109    ) -> InvocationResult {
110        match connect_to_camera(service_context, self.external_publisher.clone()).await {
111            Ok(camera_device_client) => {
112                let mut event_handler = EventHandler {
113                    muted_txs: self.muted_txs.clone(),
114                    publisher: self.publisher.clone(),
115                    messenger: self.messenger.clone(),
116                    recipient_settings: self.recipient_settings.clone(),
117                    sw_muted: false,
118                };
119                fasync::Task::local(async move {
120                    let id = fuchsia_trace::Id::new();
121                    // Here we don't care about hw_muted state because the input service would pick
122                    // up mute changes directly from the switch. We care about sw changes because
123                    // other clients of the camera3 service could change the sw mute state but not
124                    // notify the settings service.
125                    trace!(id, c"camera_watcher_agent_handler");
126                    while let Ok((sw_muted, _hw_muted)) =
127                        camera_device_client.watch_mute_state().await
128                    {
129                        trace!(id, c"event");
130                        event_handler.handle_event(sw_muted);
131                    }
132                })
133                .detach();
134
135                Ok(())
136            }
137            Err(e) => {
138                log::error!("Unable to watch camera device: {:?}", e);
139                Err(AgentError::UnexpectedError)
140            }
141        }
142    }
143}
144
/// Forwards software mute state changes to channels, the event publisher and
/// setting handlers, skipping updates that repeat the last known state.
struct EventHandler {
    /// Channels that receive the new mute state on every change.
    muted_txs: Vec<UnboundedSender<bool>>,
    /// Publishes a camera update event on every change.
    publisher: Publisher,
    /// Used to message the handlers of `recipient_settings`.
    messenger: service::message::Messenger,
    /// Setting types whose handlers are sent the camera state request.
    recipient_settings: HashSet<SettingType>,
    /// Last software mute state seen by `handle_event`.
    sw_muted: bool,
}
152
153impl EventHandler {
154    fn handle_event(&mut self, sw_muted: bool) {
155        if self.sw_muted != sw_muted {
156            self.sw_muted = sw_muted;
157            self.send_event(sw_muted);
158        }
159    }
160
161    fn send_event(&self, muted: bool) {
162        for muted_tx in &self.muted_txs {
163            let _ = muted_tx.unbounded_send(muted);
164        }
165
166        self.publisher.send_event(Event::CameraUpdate(camera_watcher::Event::OnSWMuteState(muted)));
167        let setting_request: Request = Request::OnCameraSWState(muted);
168
169        // Send the event to all the interested setting types that are also available.
170        for setting_type in self.recipient_settings.iter() {
171            // Ignore the receptor result.
172            let _ = self.messenger.message(
173                HandlerPayload::Request(setting_request.clone()).into(),
174                Audience::Address(service::Address::Handler(*setting_type)),
175            );
176        }
177    }
178}
179
180#[cfg(test)]
181mod tests {
182    use super::*;
183    use crate::event;
184    use crate::message::base::{MessageEvent, MessengerType};
185    use crate::message::receptor::Receptor;
186    use crate::tests::helpers::{
187        create_messenger_and_publisher, create_messenger_and_publisher_from_hub,
188        create_receptor_for_setting_type,
189    };
190    use assert_matches::assert_matches;
191    use futures::StreamExt;
192    use futures::channel::mpsc;
193    use settings_test_common::fakes::service::ServiceRegistry;
194
195    // Tests that the initialization lifespan is not handled.
196    #[fuchsia::test(allow_stalls = false)]
197    async fn initialization_lifespan_is_unhandled() {
198        // Setup messengers needed to construct the agent.
199        let (messenger, publisher) = create_messenger_and_publisher().await;
200        let (event_tx, _event_rx) = mpsc::unbounded();
201        let external_publisher = ExternalEventPublisher::new(event_tx);
202
203        // Construct the agent.
204        let mut agent = CameraWatcherAgent {
205            muted_txs: vec![],
206            publisher,
207            messenger,
208            recipient_settings: HashSet::new(),
209            external_publisher,
210        };
211
212        // Try to initiatate the initialization lifespan.
213        let result = agent
214            .handle(Invocation {
215                lifespan: Lifespan::Initialization,
216                service_context: Rc::new(crate::service_context::ServiceContext::new(None, None)),
217            })
218            .await;
219
220        assert!(matches!(result, Err(AgentError::UnhandledLifespan)));
221    }
222
223    // Tests that the agent cannot start without a camera3 service.
224    #[fuchsia::test(allow_stalls = false)]
225    async fn when_camera3_inaccessible_returns_err() {
226        // Setup messengers needed to construct the agent.
227        let (messenger, publisher) = create_messenger_and_publisher().await;
228        let (event_tx, _event_rx) = mpsc::unbounded();
229        let external_publisher = ExternalEventPublisher::new(event_tx);
230
231        // Construct the agent.
232        let mut agent = CameraWatcherAgent {
233            muted_txs: vec![],
234            publisher,
235            messenger,
236            recipient_settings: HashSet::new(),
237            external_publisher,
238        };
239
240        let service_context = Rc::new(crate::service_context::ServiceContext::new(
241            // Create a service registry without a camera3 service interface.
242            Some(ServiceRegistry::serve(ServiceRegistry::create())),
243            None,
244        ));
245
246        // Try to initiate the Service lifespan without providing the camera3 fidl interface.
247        let result =
248            agent.handle(Invocation { lifespan: Lifespan::Service, service_context }).await;
249        assert!(matches!(result, Err(AgentError::UnexpectedError)));
250    }
251
252    // Tests that events can be sent to the intended recipients.
253    #[fuchsia::test(allow_stalls = false)]
254    async fn event_handler_proxies_event() {
255        let service_message_hub = service::MessageHub::create_hub();
256        let (messenger, publisher) =
257            create_messenger_and_publisher_from_hub(&service_message_hub).await;
258
259        // Get the messenger's signature and the receptor for agents. We need
260        // a different messenger below because a broadcast would not send a message
261        // to itself. The signature is used to delete the original messenger for this
262        // receptor.
263        let event_receptor = service::build_event_listener(&service_message_hub).await;
264
265        // Get the messenger's signature and the receptor for agents. We need
266        // a different messenger below because a broadcast would not send a message
267        // to itself. The signature is used to delete the original messenger for this
268        // receptor.
269        let handler_receptor: Receptor =
270            create_receptor_for_setting_type(&service_message_hub, SettingType::Unknown).await;
271
272        let (tx1, mut rx1) = mpsc::unbounded();
273        let (tx2, mut rx2) = mpsc::unbounded();
274
275        let mut event_handler = EventHandler {
276            muted_txs: vec![tx1.clone(), tx2.clone()],
277            publisher,
278            messenger,
279            recipient_settings: vec![SettingType::Unknown].into_iter().collect(),
280            sw_muted: false,
281        };
282
283        // Send the events.
284        event_handler.handle_event(true);
285
286        // Delete the messengers for the receptors we're selecting below. This
287        // will allow the `select!` to eventually hit the `complete` case.
288        service_message_hub.delete(handler_receptor.get_signature());
289        service_message_hub.delete(event_receptor.get_signature());
290
291        let mut agent_received_sw_mute = false;
292        let mut handler_received_event = false;
293        let mut channel_received = 0;
294
295        let fused_event = event_receptor.fuse();
296        let fused_setting_handler = handler_receptor.fuse();
297        let mut next_rx1 = rx1.next();
298        let mut next_rx2 = rx2.next();
299        futures::pin_mut!(fused_event, fused_setting_handler);
300
301        // Loop over the select so we can handle the messages as they come in. When all messages
302        // have been handled, due to the messengers being deleted above, the complete branch should
303        // be hit to break out of the loop.
304        loop {
305            futures::select! {
306                message = fused_event.select_next_some() => {
307                    if let MessageEvent::Message(service::Payload::Event(event::Payload::Event(
308                        event::Event::CameraUpdate(event)
309                    )), _) = message
310                    {
311                        match event {
312                            event::camera_watcher::Event::OnSWMuteState(muted) => {
313                                assert!(muted);
314                                agent_received_sw_mute = true;
315                            }
316                        }
317                    }
318                },
319                message = fused_setting_handler.select_next_some() => {
320                    if let MessageEvent::Message(
321                        service::Payload::Setting(HandlerPayload::Request(
322                            Request::OnCameraSWState(_muted))),
323                        _,
324                    ) = message
325                    {
326                        handler_received_event = true;
327                    }
328                }
329                event = next_rx1 => {
330                    let Some(muted) = event else {
331                        continue;
332                    };
333                    assert!(muted);
334                    // Close channel so we can exit select loop.
335                    tx1.close_channel();
336                    channel_received += 1;
337                }
338                event = next_rx2 => {
339                    let Some(muted) = event else {
340                        continue;
341                    };
342                    assert!(muted);
343                    // Close channel so we can exit select loop.
344                    tx2.close_channel();
345                    channel_received += 1;
346                }
347                complete => break,
348            }
349        }
350
351        assert!(agent_received_sw_mute);
352        assert!(handler_received_event);
353        assert_eq!(channel_received, 2);
354    }
355
356    // Tests that events are not sent to unavailable settings.
357    #[fuchsia::test(allow_stalls = false)]
358    async fn event_handler_sends_no_events_if_no_settings_available() {
359        let service_message_hub = service::MessageHub::create_hub();
360        let (messenger, publisher) =
361            create_messenger_and_publisher_from_hub(&service_message_hub).await;
362        let handler_address = service::Address::Handler(SettingType::Unknown);
363        let verification_request = Request::Get;
364
365        // Get the messenger's signature and the receptor for agents. We need
366        // a different messenger below because a broadcast would not send a message
367        // to itself. The signature is used to delete the original messenger for this
368        // receptor.
369        let mut handler_receptor: Receptor = service_message_hub
370            .create(MessengerType::Addressable(handler_address))
371            .await
372            .expect("Unable to create handler receptor")
373            .1;
374
375        // Declare all settings as unavailable so that no events are sent.
376        let mut event_handler = EventHandler {
377            muted_txs: vec![],
378            publisher,
379            messenger,
380            recipient_settings: HashSet::new(),
381            sw_muted: false,
382        };
383
384        // Send the events
385        event_handler.handle_event(true);
386
387        // Send an arbitrary request that should be the next payload received.
388        let _ = service_message_hub
389            .create(MessengerType::Unbound)
390            .await
391            .expect("Unable to create messenger")
392            .0
393            .message(
394                HandlerPayload::Request(verification_request.clone()).into(),
395                Audience::Address(handler_address),
396            );
397
398        // Delete the messengers for the receptors we're selecting below. This will allow the while
399        // loop below to eventually finish.
400        service_message_hub.delete(handler_receptor.get_signature());
401
402        assert_matches!(
403            handler_receptor.next_of::<HandlerPayload>().await,
404            Ok((HandlerPayload::Request(request), _))
405                if request == verification_request
406        )
407    }
408}