settings/intl/
intl_fidl_handler.rs1use super::intl_controller::{IntlController, IntlError, Request};
6use super::types::IntlInfo;
7use async_utils::hanging_get::server;
8use fidl_fuchsia_settings::{
9 Error as SettingsError, IntlRequest, IntlRequestStream, IntlSettings, IntlWatchResponder,
10};
11use fuchsia_async as fasync;
12use futures::StreamExt;
13use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
14use futures::channel::oneshot;
15use settings_common::inspect::event::{
16 RequestType, ResponseType, UsagePublisher, UsageResponsePublisher,
17};
18
19pub(super) type SubscriberObject = (UsageResponsePublisher<IntlInfo>, IntlWatchResponder);
20type HangingGetFn = fn(&IntlInfo, SubscriberObject) -> bool;
21pub(super) type HangingGet = server::HangingGet<IntlInfo, SubscriberObject, HangingGetFn>;
22pub(super) type Publisher = server::Publisher<IntlInfo, SubscriberObject, HangingGetFn>;
23pub(super) type Subscriber = server::Subscriber<IntlInfo, SubscriberObject, HangingGetFn>;
24
25pub struct IntlFidlHandler {
26 hanging_get: HangingGet,
27 controller_tx: UnboundedSender<Request>,
28 usage_publisher: UsagePublisher<IntlInfo>,
29}
30
31impl IntlFidlHandler {
32 pub(crate) fn new(
33 intl_controller: &mut IntlController,
34 usage_publisher: UsagePublisher<IntlInfo>,
35 initial_value: IntlInfo,
36 ) -> (Self, UnboundedReceiver<Request>) {
37 let hanging_get = HangingGet::new(initial_value, Self::hanging_get);
38 intl_controller.register_publisher(hanging_get.new_publisher());
39 let (controller_tx, controller_rx) = mpsc::unbounded();
40 (Self { hanging_get, controller_tx, usage_publisher }, controller_rx)
41 }
42
43 fn hanging_get(info: &IntlInfo, (usage_responder, responder): SubscriberObject) -> bool {
44 usage_responder.respond(format!("{info:?}"), ResponseType::OkSome);
45 if let Err(e) = responder.send(&IntlSettings::from(info.clone())) {
46 log::warn!("Failed to respond to watch request: {e:?}");
47 return false;
48 }
49 true
50 }
51
52 pub fn handle_stream(&mut self, mut stream: IntlRequestStream) {
53 let request_handler = RequestHandler {
54 subscriber: self.hanging_get.new_subscriber(),
55 controller_tx: self.controller_tx.clone(),
56 usage_publisher: self.usage_publisher.clone(),
57 };
58 fasync::Task::local(async move {
59 while let Some(Ok(request)) = stream.next().await {
60 request_handler.handle_request(request).await;
61 }
62 })
63 .detach();
64 }
65}
66
67#[derive(Debug)]
68enum HandlerError {
69 AlreadySubscribed,
70 ControllerStopped,
71 Controller(IntlError),
72}
73
74impl From<&HandlerError> for ResponseType {
75 fn from(error: &HandlerError) -> Self {
76 match error {
77 HandlerError::AlreadySubscribed => ResponseType::AlreadySubscribed,
78 HandlerError::ControllerStopped => ResponseType::UnexpectedError,
79 HandlerError::Controller(e) => ResponseType::from(e),
80 }
81 }
82}
83
84struct RequestHandler {
85 subscriber: Subscriber,
86 controller_tx: UnboundedSender<Request>,
87 usage_publisher: UsagePublisher<IntlInfo>,
88}
89
90impl RequestHandler {
91 async fn handle_request(&self, request: IntlRequest) {
92 match request {
93 IntlRequest::Watch { responder } => {
94 let usage_res = self.usage_publisher.request("Watch".to_string(), RequestType::Get);
95 if let Err((usage_res, responder)) =
96 self.subscriber.register2((usage_res, responder))
97 {
98 let e = HandlerError::AlreadySubscribed;
99 usage_res.respond(format!("Err({e:?})"), ResponseType::from(&e));
100 drop(responder);
101 }
102 }
103 IntlRequest::Set { settings, responder } => {
104 let usage_res = self
105 .usage_publisher
106 .request(format!("Set{{settings:{settings:?}}}"), RequestType::Set);
107 if let Err(e) = self.set(settings).await {
108 usage_res.respond(format!("Err({e:?}"), ResponseType::from(&e));
109 let _ = responder.send(Err(SettingsError::Failed));
110 } else {
111 usage_res.respond("Ok(())".to_string(), ResponseType::OkNone);
112 let _ = responder.send(Ok(()));
113 }
114 }
115 }
116 }
117
118 async fn set(&self, settings: IntlSettings) -> Result<(), HandlerError> {
119 let (set_tx, set_rx) = oneshot::channel();
120 self.controller_tx
121 .unbounded_send(Request::Set(IntlInfo::from(settings), set_tx))
122 .map_err(|_| HandlerError::ControllerStopped)?;
123 set_rx
124 .await
125 .map_err(|_| HandlerError::ControllerStopped)
126 .and_then(|res| res.map_err(HandlerError::Controller))
127 }
128}