1use crate::agent::{Context, Payload};
14use crate::base::{SettingInfo, SettingType};
15use crate::handler::base::{Error, Payload as HandlerPayload, Request};
16use crate::inspect::utils::enums::ResponseType;
17use crate::message::base::{MessageEvent, MessengerType};
18use crate::message::receptor::Receptor;
19use crate::service::TryFromWithClient;
20use crate::{clock, service, trace};
21use settings_inspect_utils::joinable_inspect_vecdeque::JoinableInspectVecDeque;
22use settings_inspect_utils::managed_inspect_map::ManagedInspectMap;
23use settings_inspect_utils::managed_inspect_queue::ManagedInspectQueue;
24
25use fuchsia_async as fasync;
26use fuchsia_inspect::{self as inspect, component, NumericProperty};
27use fuchsia_inspect_derive::{IValue, Inspect};
28use futures::stream::FuturesUnordered;
29use futures::StreamExt;
30use std::rc::Rc;
31
32const MAX_PENDING_REQUESTS: usize = 20;
36
37const MAX_REQUEST_RESPONSE_PAIRS: usize = 10;
39
40const MAX_REQUEST_RESPONSE_TIMESTAMPS: usize = 10;
42
43const REQUEST_RESPONSE_NODE_NAME: &str = "requests_and_responses";
45
46const RESPONSE_COUNTS_NODE_NAME: &str = "response_counts";
48
49#[derive(Default, Inspect)]
50struct SettingTypeResponseCountInfo {
52 #[inspect(forward)]
55 response_counts_by_type: ManagedInspectMap<ResponseTypeCount>,
56}
57
58#[derive(Default, Inspect)]
59struct ResponseTypeCount {
61 count: inspect::UintProperty,
62 inspect_node: inspect::Node,
63}
64
65#[derive(Inspect)]
67struct SettingTypeRequestResponseInfo {
68 #[inspect(rename = "requests_and_responses")]
75 requests_and_responses_by_type: ManagedInspectMap<ManagedInspectMap<RequestResponsePairInfo>>,
76
77 pending_requests: ManagedInspectQueue<PendingRequestInspectInfo>,
79
80 inspect_node: inspect::Node,
82
83 #[inspect(skip)]
88 count: u64,
89}
90
91impl SettingTypeRequestResponseInfo {
92 fn new() -> Self {
93 Self {
94 requests_and_responses_by_type: Default::default(),
95 pending_requests: ManagedInspectQueue::<PendingRequestInspectInfo>::new(
96 MAX_PENDING_REQUESTS,
97 ),
98 inspect_node: Default::default(),
99 count: 0,
100 }
101 }
102}
103
104#[derive(Debug, Default, Inspect)]
106struct PendingRequestInspectInfo {
107 request: IValue<String>,
109
110 #[inspect(skip)]
113 request_type: String,
114
115 timestamp: IValue<String>,
118
119 #[inspect(skip)]
121 count: u64,
122
123 inspect_node: inspect::Node,
125}
126
127#[derive(Default, Inspect)]
133struct RequestResponsePairInfo {
134 request: IValue<String>,
136
137 response: IValue<String>,
139
140 #[inspect(skip)]
142 request_count: u64,
143
144 request_timestamps: IValue<JoinableInspectVecDeque>,
149
150 response_timestamps: IValue<JoinableInspectVecDeque>,
155
156 inspect_node: inspect::Node,
158}
159
160impl RequestResponsePairInfo {
161 fn new(request: String, response: String, count: u64) -> Self {
162 Self {
163 request: IValue::new(request),
164 response: IValue::new(response),
165 request_count: count,
166 request_timestamps: Default::default(),
167 response_timestamps: Default::default(),
168 inspect_node: Default::default(),
169 }
170 }
171}
172
173pub(crate) struct SettingProxyInspectAgent {
176 response_counts: ManagedInspectMap<SettingTypeResponseCountInfo>,
178
179 setting_request_response_info: ManagedInspectMap<SettingTypeRequestResponseInfo>,
181}
182
183impl SettingProxyInspectAgent {
184 pub(crate) async fn create(context: Context) {
185 Self::create_with_node(
186 context,
187 component::inspector().root().create_child(REQUEST_RESPONSE_NODE_NAME),
188 component::inspector().root().create_child(RESPONSE_COUNTS_NODE_NAME),
189 )
190 .await;
191 }
192
193 async fn create_with_node(
194 context: Context,
195 request_response_inspect_node: inspect::Node,
196 response_counts_node: inspect::Node,
197 ) {
198 let (_, message_rx) = context
199 .delegate
200 .create(MessengerType::Broker(Rc::new(move |message| {
201 matches!(message.payload(), service::Payload::Setting(HandlerPayload::Request(_)))
203 })))
204 .await
205 .expect("should receive client");
206
207 let mut agent = SettingProxyInspectAgent {
208 response_counts: ManagedInspectMap::<SettingTypeResponseCountInfo>::with_node(
209 response_counts_node,
210 ),
211 setting_request_response_info:
212 ManagedInspectMap::<SettingTypeRequestResponseInfo>::with_node(
213 request_response_inspect_node,
214 ),
215 };
216
217 fasync::Task::local({
218 async move {
219 let _ = &context;
220 let id = fuchsia_trace::Id::new();
221 trace!(id, c"setting_proxy_inspect_agent");
222 let event = message_rx.fuse();
223 let agent_event = context.receptor.fuse();
224 futures::pin_mut!(agent_event, event);
225
226 let mut unordered = FuturesUnordered::new();
229 loop {
230 futures::select! {
231 message_event = event.select_next_some() => {
232 trace!(
233 id,
234 c"message_event"
235 );
236 if let Some((setting_type, count, mut reply_receptor)) =
237 agent.process_message_event(message_event) {
238 unordered.push(async move {
239 let payload = reply_receptor.next_payload().await;
240 (setting_type, count, payload)
241 });
242 };
243 },
244 reply = unordered.select_next_some() => {
245 let (setting_type, count, payload) = reply;
246 if let Ok((
247 service::Payload::Setting(
248 HandlerPayload::Response(response)),
249 _,
250 )) = payload
251 {
252 agent.record_response(setting_type, count, response);
253 }
254 },
255 agent_message = agent_event.select_next_some() => {
256 trace!(
257 id,
258 c"agent_event"
259 );
260 if let MessageEvent::Message(
261 service::Payload::Agent(Payload::Invocation(_invocation)), client)
262 = agent_message {
263 let _ = client.reply(Payload::Complete(Ok(())).into());
266 }
267 },
268 }
269 }
270 }})
271 .detach();
272 }
273
274 fn process_message_event(
277 &mut self,
278 event: service::message::MessageEvent,
279 ) -> Option<(SettingType, u64, Receptor)> {
280 if let Ok((HandlerPayload::Request(request), mut client)) =
281 HandlerPayload::try_from_with_client(event)
282 {
283 if let service::message::Audience::Address(service::Address::Handler(setting_type)) =
284 client.get_audience()
285 {
286 if request != Request::Listen {
290 let count = self.record_request(setting_type, request);
291 return Some((setting_type, count, client.spawn_observer()));
292 }
293 }
294 }
295 None
296 }
297
298 fn record_request(&mut self, setting_type: SettingType, request: Request) -> u64 {
302 let setting_type_str = format!("{setting_type:?}");
303 let timestamp = clock::inspect_format_now();
304
305 let request_response_info = self
307 .setting_request_response_info
308 .get_or_insert_with(setting_type_str, SettingTypeRequestResponseInfo::new);
309
310 let request_count = request_response_info.count;
311 request_response_info.count += 1;
312
313 let pending_request_info = PendingRequestInspectInfo {
314 request: format!("{request:?}").into(),
315 request_type: request.for_inspect().to_string(),
316 timestamp: timestamp.into(),
317 count: request_count,
318 inspect_node: inspect::Node::default(),
319 };
320
321 let count_key = format!("{request_count:020}");
322 request_response_info.pending_requests.push(&count_key, pending_request_info);
323
324 request_count
325 }
326
327 fn record_response(
330 &mut self,
331 setting_type: SettingType,
332 count: u64,
333 response: Result<Option<SettingInfo>, Error>,
334 ) {
335 let setting_type_str = format!("{setting_type:?}");
336 let timestamp = clock::inspect_format_now();
337
338 self.increment_response_count(setting_type_str.clone(), &response);
340
341 let condensed_setting_type_info = self
344 .setting_request_response_info
345 .map_mut()
346 .get_mut(&setting_type_str)
347 .expect("Missing info for request");
348
349 let pending_requests = &mut condensed_setting_type_info.pending_requests;
350
351 let position = match pending_requests.iter_mut().position(|info| info.count == count) {
355 Some(position) => position,
356 None => {
357 return;
360 }
361 };
362 let pending =
363 pending_requests.items_mut().remove(position).expect("Failed to find pending item");
364
365 let request_type_info_map = condensed_setting_type_info
367 .requests_and_responses_by_type
368 .get_or_insert_with(pending.request_type, || {
369 ManagedInspectMap::<RequestResponsePairInfo>::default()
370 });
371
372 let map_key = format!("{:?}{:?}", pending.request, response);
375
376 let removed_info = request_type_info_map.map_mut().remove(&map_key);
381
382 let response_str = format!("{response:?}");
383 let mut info = removed_info.unwrap_or_else(|| {
384 RequestResponsePairInfo::new(pending.request.into_inner(), response_str, pending.count)
385 });
386 {
387 let mut_requests = &mut info.request_timestamps.as_mut().0;
391 let mut_responses = &mut info.response_timestamps.as_mut().0;
392
393 mut_requests.push_back(pending.timestamp.into_inner());
394 mut_responses.push_back(timestamp);
395
396 if mut_requests.len() > MAX_REQUEST_RESPONSE_TIMESTAMPS {
398 let _ = mut_requests.pop_front();
399 }
400 if mut_responses.len() > MAX_REQUEST_RESPONSE_TIMESTAMPS {
401 let _ = mut_responses.pop_front();
402 }
403 }
404
405 let count_key = format!("{:020}", pending.count);
407 let _ = request_type_info_map.insert_with_property_name(map_key, count_key, info);
408
409 let num_request_response_pairs = request_type_info_map.map().len();
411 if num_request_response_pairs > MAX_REQUEST_RESPONSE_PAIRS {
412 let mut lowest_count: u64 = u64::MAX;
415 let mut lowest_key: Option<String> = None;
416 for (key, inspect_info) in request_type_info_map.map() {
417 if inspect_info.request_count < lowest_count {
418 lowest_count = inspect_info.request_count;
419 lowest_key = Some(key.clone());
420 }
421 }
422
423 if let Some(key_to_remove) = lowest_key {
424 let _ = request_type_info_map.map_mut().remove(&key_to_remove);
425 }
426 }
427 }
428
429 fn increment_response_count(
430 &mut self,
431 setting_type_str: String,
432 response: &Result<Option<SettingInfo>, Error>,
433 ) {
434 let response_count_info = self
437 .response_counts
438 .get_or_insert_with(setting_type_str, SettingTypeResponseCountInfo::default);
439
440 let response_type: ResponseType = response.clone().into();
443 let response_count = response_count_info
444 .response_counts_by_type
445 .get_or_insert_with(format!("{response_type:?}"), ResponseTypeCount::default);
446 let _ = response_count.count.add(1u64);
447 }
448}
449
450#[cfg(test)]
451mod tests {
452 use super::*;
453 use crate::display::types::SetDisplayInfo;
454 use crate::intl::types::{IntlInfo, LocaleId, TemperatureUnit};
455 use diagnostics_assertions::{assert_data_tree, TreeAssertion};
456 use std::collections::HashSet;
457 use zx::MonotonicInstant;
458
459 struct RequestProcessor {
463 delegate: service::message::Delegate,
464 }
465
466 impl RequestProcessor {
467 fn new(delegate: service::message::Delegate) -> Self {
468 RequestProcessor { delegate }
469 }
470
471 async fn send_request(
472 &self,
473 setting_type: SettingType,
474 setting_request: Request,
475 should_reply: bool,
476 ) {
477 let (messenger, _) =
478 self.delegate.create(MessengerType::Unbound).await.expect("should be created");
479
480 let (_, mut receptor) = self
481 .delegate
482 .create(MessengerType::Addressable(service::Address::Handler(setting_type)))
483 .await
484 .expect("should be created");
485
486 let mut reply_receptor = messenger.message(
487 HandlerPayload::Request(setting_request).into(),
488 service::message::Audience::Address(service::Address::Handler(setting_type)),
489 );
490
491 if let Some(message_event) = futures::StreamExt::next(&mut receptor).await {
492 if let Ok((_, reply_client)) = HandlerPayload::try_from_with_client(message_event) {
493 if should_reply {
494 let _ = reply_client.reply(HandlerPayload::Response(Ok(None)).into());
495 }
496 }
497 }
498 let _ = reply_receptor.next_payload().await;
499 }
500
501 async fn send_and_receive(&self, setting_type: SettingType, setting_request: Request) {
502 self.send_request(setting_type, setting_request, true).await;
503 }
504 }
505
506 async fn create_context() -> Context {
507 Context::new(
508 service::MessageHub::create_hub()
509 .create(MessengerType::Unbound)
510 .await
511 .expect("should be present")
512 .1,
513 service::MessageHub::create_hub(),
514 HashSet::new(),
515 )
516 .await
517 }
518
519 #[fuchsia::test(allow_stalls = false)]
522 async fn test_inspect_grouped_responses() {
523 clock::mock::set(MonotonicInstant::from_nanos(0));
525
526 let inspector = inspect::Inspector::default();
527 let condense_node = inspector.root().create_child(REQUEST_RESPONSE_NODE_NAME);
528 let response_counts_node = inspector.root().create_child(RESPONSE_COUNTS_NODE_NAME);
529 let context = create_context().await;
530
531 let request_processor = RequestProcessor::new(context.delegate.clone());
532
533 SettingProxyInspectAgent::create_with_node(context, condense_node, response_counts_node)
534 .await;
535
536 let turn_off_auto_brightness = Request::SetDisplayInfo(SetDisplayInfo {
538 auto_brightness: Some(false),
539 ..SetDisplayInfo::default()
540 });
541 request_processor
542 .send_and_receive(SettingType::Display, turn_off_auto_brightness.clone())
543 .await;
544
545 clock::mock::set(MonotonicInstant::from_nanos(100));
547 request_processor
548 .send_and_receive(
549 SettingType::Display,
550 Request::SetDisplayInfo(SetDisplayInfo {
551 auto_brightness: Some(true),
552 ..SetDisplayInfo::default()
553 }),
554 )
555 .await;
556
557 clock::mock::set(MonotonicInstant::from_nanos(200));
560 request_processor.send_and_receive(SettingType::Display, turn_off_auto_brightness).await;
561
562 assert_data_tree!(inspector, root: contains {
563 requests_and_responses: {
564 "Display": {
565 "pending_requests": {},
566 "requests_and_responses": {
567 "SetDisplayInfo": {
568 "00000000000000000001": {
569 "request": "SetDisplayInfo(SetDisplayInfo { \
570 manual_brightness_value: None, \
571 auto_brightness_value: None, \
572 auto_brightness: Some(true), \
573 screen_enabled: None, \
574 low_light_mode: None, \
575 theme: None \
576 })",
577 "request_timestamps": "0.000000100",
578 "response": "Ok(None)",
579 "response_timestamps": "0.000000100"
580 },
581 "00000000000000000002": {
582 "request": "SetDisplayInfo(SetDisplayInfo { \
583 manual_brightness_value: None, \
584 auto_brightness_value: None, \
585 auto_brightness: Some(false), \
586 screen_enabled: None, \
587 low_light_mode: None, \
588 theme: None \
589 })",
590 "request_timestamps": "0.000000000,0.000000200",
591 "response": "Ok(None)",
592 "response_timestamps": "0.000000000,0.000000200"
593 }
594 }
595 }
596 }
597 },
598 });
599 }
600
601 #[fuchsia::test(allow_stalls = false)]
604 async fn test_inspect_mixed_request_types() {
605 clock::mock::set(MonotonicInstant::from_nanos(0));
607
608 let inspector = inspect::Inspector::default();
609 let condense_node = inspector.root().create_child(REQUEST_RESPONSE_NODE_NAME);
610 let response_counts_node = inspector.root().create_child(RESPONSE_COUNTS_NODE_NAME);
611 let context = create_context().await;
612
613 let request_processor = RequestProcessor::new(context.delegate.clone());
614
615 SettingProxyInspectAgent::create_with_node(context, condense_node, response_counts_node)
616 .await;
617
618 request_processor
620 .send_and_receive(
621 SettingType::Display,
622 Request::SetDisplayInfo(SetDisplayInfo {
623 auto_brightness: Some(false),
624 ..SetDisplayInfo::default()
625 }),
626 )
627 .await;
628
629 clock::mock::set(MonotonicInstant::from_nanos(100));
631 request_processor.send_and_receive(SettingType::Display, Request::Get).await;
632
633 clock::mock::set(MonotonicInstant::from_nanos(200));
635 request_processor
636 .send_and_receive(
637 SettingType::Display,
638 Request::SetDisplayInfo(SetDisplayInfo {
639 auto_brightness: Some(true),
640 ..SetDisplayInfo::default()
641 }),
642 )
643 .await;
644
645 clock::mock::set(MonotonicInstant::from_nanos(300));
646 request_processor.send_and_receive(SettingType::Display, Request::Get).await;
647
648 assert_data_tree!(inspector, root: contains {
649 requests_and_responses: {
650 "Display": {
651 "pending_requests": {},
652 "requests_and_responses": {
653 "Get": {
654 "00000000000000000003": {
655 "request": "Get",
656 "request_timestamps": "0.000000100,0.000000300",
657 "response": "Ok(None)",
658 "response_timestamps": "0.000000100,0.000000300"
659 }
660 },
661 "SetDisplayInfo": {
662 "00000000000000000000": {
663 "request": "SetDisplayInfo(SetDisplayInfo { \
664 manual_brightness_value: None, \
665 auto_brightness_value: None, \
666 auto_brightness: Some(false), \
667 screen_enabled: None, \
668 low_light_mode: None, \
669 theme: None \
670 })",
671 "request_timestamps": "0.000000000",
672 "response": "Ok(None)",
673 "response_timestamps": "0.000000000"
674 },
675 "00000000000000000002": {
676 "request": "SetDisplayInfo(SetDisplayInfo { \
677 manual_brightness_value: None, \
678 auto_brightness_value: None, \
679 auto_brightness: Some(true), \
680 screen_enabled: None, \
681 low_light_mode: None, \
682 theme: None \
683 })",
684 "request_timestamps": "0.000000200",
685 "response": "Ok(None)",
686 "response_timestamps": "0.000000200"
687 }
688 }
689 }
690 }
691 },
692 response_counts: {
693 "Display": {
694 "OkNone": {
695 count: 4u64,
696 }
697 },
698 },
699 });
700 }
701
702 #[fuchsia::test(allow_stalls = false)]
703 async fn test_pending_request() {
704 clock::mock::set(MonotonicInstant::from_nanos(0));
706
707 let inspector = inspect::Inspector::default();
708 let condense_node = inspector.root().create_child(REQUEST_RESPONSE_NODE_NAME);
709 let request_counts_node = inspector.root().create_child(RESPONSE_COUNTS_NODE_NAME);
710 let context = create_context().await;
711
712 let request_processor = RequestProcessor::new(context.delegate.clone());
713
714 SettingProxyInspectAgent::create_with_node(context, condense_node, request_counts_node)
715 .await;
716
717 request_processor
718 .send_request(
719 SettingType::Display,
720 Request::SetDisplayInfo(SetDisplayInfo {
721 auto_brightness: Some(false),
722 ..SetDisplayInfo::default()
723 }),
724 false,
725 )
726 .await;
727
728 assert_data_tree!(inspector, root: contains {
729 requests_and_responses: {
730 "Display": {
731 "pending_requests": {
732 "00000000000000000000": {
733 "request": "SetDisplayInfo(SetDisplayInfo { \
734 manual_brightness_value: None, \
735 auto_brightness_value: None, \
736 auto_brightness: Some(false), \
737 screen_enabled: None, \
738 low_light_mode: None, \
739 theme: None \
740 })",
741 "timestamp": "0.000000000",
742 }
743 },
744 "requests_and_responses": {}
745 }
746 },
747 });
748 }
749
750 #[fuchsia::test(allow_stalls = false)]
751 async fn test_response_counts_inspect() {
752 clock::mock::set(MonotonicInstant::from_nanos(0));
754
755 let inspector = inspect::Inspector::default();
756 let condense_node = inspector.root().create_child(REQUEST_RESPONSE_NODE_NAME);
757 let request_counts_node = inspector.root().create_child(RESPONSE_COUNTS_NODE_NAME);
758 let context = create_context().await;
759
760 let request_processor = RequestProcessor::new(context.delegate.clone());
761
762 SettingProxyInspectAgent::create_with_node(context, condense_node, request_counts_node)
763 .await;
764
765 request_processor
766 .send_and_receive(
767 SettingType::Display,
768 Request::SetDisplayInfo(SetDisplayInfo {
769 auto_brightness: Some(false),
770 ..SetDisplayInfo::default()
771 }),
772 )
773 .await;
774
775 clock::mock::set(MonotonicInstant::from_nanos(100));
776 request_processor.send_and_receive(SettingType::Display, Request::Get).await;
777
778 clock::mock::set(MonotonicInstant::from_nanos(200));
779 request_processor
780 .send_and_receive(
781 SettingType::Display,
782 Request::SetDisplayInfo(SetDisplayInfo {
783 auto_brightness: None,
784 ..SetDisplayInfo::default()
785 }),
786 )
787 .await;
788
789 clock::mock::set(MonotonicInstant::from_nanos(300));
790 request_processor.send_and_receive(SettingType::Display, Request::Get).await;
791
792 assert_data_tree!(inspector, root: contains {
793 response_counts: {
794 "Display": {
795 "OkNone": {
796 count: 4u64,
797 },
798 },
799 },
800 });
801 }
802
803 #[fuchsia::test(allow_stalls = false)]
806 async fn inspect_queue_test() {
807 clock::mock::set(MonotonicInstant::from_nanos(0));
809 let inspector = inspect::Inspector::default();
810 let condense_node = inspector.root().create_child(REQUEST_RESPONSE_NODE_NAME);
811 let response_counts_node = inspector.root().create_child(RESPONSE_COUNTS_NODE_NAME);
812 let context = create_context().await;
813 let request_processor = RequestProcessor::new(context.delegate.clone());
814
815 SettingProxyInspectAgent::create_with_node(context, condense_node, response_counts_node)
816 .await;
817
818 request_processor
819 .send_and_receive(
820 SettingType::Intl,
821 Request::SetIntlInfo(IntlInfo {
822 locales: Some(vec![LocaleId { id: "en-US".to_string() }]),
823 temperature_unit: Some(TemperatureUnit::Celsius),
824 time_zone_id: Some("UTC".to_string()),
825 hour_cycle: None,
826 }),
827 )
828 .await;
829
830 for i in 0..MAX_REQUEST_RESPONSE_PAIRS + 1 {
833 request_processor
834 .send_and_receive(
835 SettingType::Display,
836 Request::SetDisplayInfo(SetDisplayInfo {
837 manual_brightness_value: Some((i as f32) / 100f32),
838 ..SetDisplayInfo::default()
839 }),
840 )
841 .await;
842 }
843
844 fn display_subtree_assertion() -> TreeAssertion {
847 let mut tree_assertion = TreeAssertion::new("Display", false);
848 let mut request_response_assertion = TreeAssertion::new("requests_and_responses", true);
849 let mut request_assertion = TreeAssertion::new("SetDisplayInfo", true);
850
851 for i in 1..MAX_REQUEST_RESPONSE_PAIRS + 1 {
852 request_assertion
854 .add_child_assertion(TreeAssertion::new(&format!("{i:020}"), false));
855 }
856 request_response_assertion.add_child_assertion(request_assertion);
857 tree_assertion.add_child_assertion(request_response_assertion);
858 tree_assertion
859 }
860
861 assert_data_tree!(inspector, root: contains {
862 requests_and_responses: {
863 display_subtree_assertion(),
864 "Intl": {
865 "pending_requests": {},
866 "requests_and_responses": {
867 "SetIntlInfo": {
868 "00000000000000000000": {
869 "request": "SetIntlInfo(IntlInfo { \
870 locales: Some([LocaleId { id: \"en-US\" }]), \
871 temperature_unit: Some(Celsius), \
872 time_zone_id: Some(\"UTC\"), \
873 hour_cycle: None \
874 })",
875 "request_timestamps": "0.000000000",
876 "response": "Ok(None)",
877 "response_timestamps": "0.000000000"
878 }
879 }
880 }
881 }
882 },
883 });
884 }
885}