settings/intl/
intl_fidl_handler.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::intl_controller::{IntlController, IntlError, Request};
6use super::types::IntlInfo;
7use async_utils::hanging_get::server;
8use fidl_fuchsia_settings::{
9    Error as SettingsError, IntlRequest, IntlRequestStream, IntlSettings, IntlWatchResponder,
10};
11use fuchsia_async as fasync;
12use futures::StreamExt;
13use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
14use futures::channel::oneshot;
15use settings_common::inspect::event::{
16    RequestType, ResponseType, UsagePublisher, UsageResponsePublisher,
17};
18
19pub(super) type SubscriberObject = (UsageResponsePublisher<IntlInfo>, IntlWatchResponder);
20type HangingGetFn = fn(&IntlInfo, SubscriberObject) -> bool;
21pub(super) type HangingGet = server::HangingGet<IntlInfo, SubscriberObject, HangingGetFn>;
22pub(super) type Publisher = server::Publisher<IntlInfo, SubscriberObject, HangingGetFn>;
23pub(super) type Subscriber = server::Subscriber<IntlInfo, SubscriberObject, HangingGetFn>;
24
25pub struct IntlFidlHandler {
26    hanging_get: HangingGet,
27    controller_tx: UnboundedSender<Request>,
28    usage_publisher: UsagePublisher<IntlInfo>,
29}
30
31impl IntlFidlHandler {
32    pub(crate) fn new(
33        intl_controller: &mut IntlController,
34        usage_publisher: UsagePublisher<IntlInfo>,
35        initial_value: IntlInfo,
36    ) -> (Self, UnboundedReceiver<Request>) {
37        let hanging_get = HangingGet::new(initial_value, Self::hanging_get);
38        intl_controller.register_publisher(hanging_get.new_publisher());
39        let (controller_tx, controller_rx) = mpsc::unbounded();
40        (Self { hanging_get, controller_tx, usage_publisher }, controller_rx)
41    }
42
43    fn hanging_get(info: &IntlInfo, (usage_responder, responder): SubscriberObject) -> bool {
44        usage_responder.respond(format!("{info:?}"), ResponseType::OkSome);
45        if let Err(e) = responder.send(&IntlSettings::from(info.clone())) {
46            log::warn!("Failed to respond to watch request: {e:?}");
47            return false;
48        }
49        true
50    }
51
52    pub fn handle_stream(&mut self, mut stream: IntlRequestStream) {
53        let request_handler = RequestHandler {
54            subscriber: self.hanging_get.new_subscriber(),
55            controller_tx: self.controller_tx.clone(),
56            usage_publisher: self.usage_publisher.clone(),
57        };
58        fasync::Task::local(async move {
59            while let Some(Ok(request)) = stream.next().await {
60                request_handler.handle_request(request).await;
61            }
62        })
63        .detach();
64    }
65}
66
67#[derive(Debug)]
68enum HandlerError {
69    AlreadySubscribed,
70    ControllerStopped,
71    Controller(IntlError),
72}
73
74impl From<&HandlerError> for ResponseType {
75    fn from(error: &HandlerError) -> Self {
76        match error {
77            HandlerError::AlreadySubscribed => ResponseType::AlreadySubscribed,
78            HandlerError::ControllerStopped => ResponseType::UnexpectedError,
79            HandlerError::Controller(e) => ResponseType::from(e),
80        }
81    }
82}
83
84struct RequestHandler {
85    subscriber: Subscriber,
86    controller_tx: UnboundedSender<Request>,
87    usage_publisher: UsagePublisher<IntlInfo>,
88}
89
90impl RequestHandler {
91    async fn handle_request(&self, request: IntlRequest) {
92        match request {
93            IntlRequest::Watch { responder } => {
94                let usage_res = self.usage_publisher.request("Watch".to_string(), RequestType::Get);
95                if let Err((usage_res, responder)) =
96                    self.subscriber.register2((usage_res, responder))
97                {
98                    let e = HandlerError::AlreadySubscribed;
99                    usage_res.respond(format!("Err({e:?})"), ResponseType::from(&e));
100                    drop(responder);
101                }
102            }
103            IntlRequest::Set { settings, responder } => {
104                let usage_res = self
105                    .usage_publisher
106                    .request(format!("Set{{settings:{settings:?}}}"), RequestType::Set);
107                if let Err(e) = self.set(settings).await {
108                    usage_res.respond(format!("Err({e:?}"), ResponseType::from(&e));
109                    let _ = responder.send(Err(SettingsError::Failed));
110                } else {
111                    usage_res.respond("Ok(())".to_string(), ResponseType::OkNone);
112                    let _ = responder.send(Ok(()));
113                }
114            }
115        }
116    }
117
118    async fn set(&self, settings: IntlSettings) -> Result<(), HandlerError> {
119        let (set_tx, set_rx) = oneshot::channel();
120        self.controller_tx
121            .unbounded_send(Request::Set(IntlInfo::from(settings), set_tx))
122            .map_err(|_| HandlerError::ControllerStopped)?;
123        set_rx
124            .await
125            .map_err(|_| HandlerError::ControllerStopped)
126            .and_then(|res| res.map_err(HandlerError::Controller))
127    }
128}