settings/ingress/
request.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Single request handling.
6//!
7//! The request mod defines the components necessary for executing a single, isolated command within
8//! the [Job] ecosystem. The most common work type in the setting service that fall in this category
9//! are set commands where the requested action can be succinctly captured in a single request in
10//! the service.
11//!
12//! One must define two concrete trait implementations in order to use the components of this mod.
13//! The first trait is the [From] trait for [Response]. While [Response] could be broken
14//! down into its contained [SettingInfo](crate::base::SettingInfo) and
15//! [Error](crate::handler::base::Error) types, callers often only care about the success of a call.
16//! For example, set calls typically return an empty value upon success and therefore do not
17//! have a value to convert. The second trait is [Responder], which takes the first trait
18//! implementation as a parameter. Ths trait allows callers to customize how the response is handled
19//! with their own type as defined in the [From<Response>] trait. One should note that the
20//! responder itself is passed in on the callback. This allows for the consumption of any resources
21//! in the one-time use callback.
22
23use crate::base::SettingType;
24use crate::handler::base::{Payload, Request, Response};
25use crate::job::work::{Independent, Load};
26use crate::job::Job;
27use crate::message::base::Audience;
28use crate::service::{message, Address};
29use crate::trace;
30use async_trait::async_trait;
31use fuchsia_trace as ftrace;
32use std::marker::PhantomData;
33
34/// A [Responder] is passed into [Work] as a handler for responses generated by the work.
35pub(crate) trait Responder<R: From<Response>> {
36    /// Invoked when a response to the request is ready.
37    fn respond(self, response: R);
38}
39
40/// [Work] executes a single request and passes the results back to a specified responder. Consumers
41/// of [Work] specify a [SettingType] along with [Request] for proper routing.
42pub(crate) struct Work<R, T>
43where
44    R: From<Response>,
45    T: Responder<R>,
46{
47    request: Request,
48    setting_type: SettingType,
49    responder: T,
50    _data: PhantomData<R>,
51}
52
53impl<R: From<Response>, T: Responder<R>> Work<R, T> {
54    pub(crate) fn new(setting_type: SettingType, request: Request, responder: T) -> Self {
55        Self { setting_type, request, responder, _data: PhantomData }
56    }
57}
58
59/// [Work] implements the [Independent] trait as each request execution should be done in isolation
60/// and executes in the order it was received, not waiting on any existing [Job] of the same group
61/// to be executed (as is the case for [crate::job::work::Sequential]).
62#[async_trait(?Send)]
63impl<R, T> Independent for Work<R, T>
64where
65    R: From<Response>,
66    T: Responder<R>,
67{
68    async fn execute(self: Box<Self>, messenger: message::Messenger, id: ftrace::Id) {
69        trace!(id, c"Independent Work execute");
70        // Send request through MessageHub.
71        let mut response_listener = messenger.message(
72            Payload::Request(self.request.clone()).into(),
73            Audience::Address(Address::Handler(self.setting_type)),
74        );
75
76        // On success, invoke the responder with the converted response.
77        self.responder.respond(R::from(match response_listener.next_of::<Payload>().await {
78            Ok((payload, _)) => match payload {
79                Payload::Response(response) => response,
80                _ => {
81                    // While it's possible for the request to fail, this will be communicated
82                    // through the response for logic related errors or the return value of
83                    // receptor::next_of. Work should never encounter a different type of payload
84                    // and therefore treat this scenario as fatal.
85                    panic!("should not have received a different payload type:{payload:?}");
86                }
87            },
88            _ => {
89                log::warn!(
90                    "An error occurred while independent job was executing for request:{:?}",
91                    self.request.clone()
92                );
93                Err(crate::handler::base::Error::CommunicationError)
94            }
95        }));
96    }
97}
98
99/// The [From] implementation here is for conveniently converting a [Work] definition into a [Job].
100/// Since [Work] is a singleshot request, it is automatically converted into a [Load::Independent]
101/// workload.
102impl<R, T> From<Work<R, T>> for Job
103where
104    R: From<Response> + 'static,
105    T: Responder<R> + 'static,
106{
107    fn from(work: Work<R, T>) -> Job {
108        Job::new(Load::Independent(Box::new(work)))
109    }
110}
111
112#[cfg(test)]
113mod tests {
114    use super::*;
115    use crate::message::base::MessengerType;
116    use crate::service::MessageHub;
117    use assert_matches::assert_matches;
118    use fuchsia_async as fasync;
119    use futures::channel::oneshot::Sender;
120
121    struct TestResponder {
122        sender: Sender<Response>,
123    }
124
125    impl TestResponder {
126        pub(crate) fn new(sender: Sender<Response>) -> Self {
127            Self { sender }
128        }
129    }
130
131    impl Responder<Response> for TestResponder {
132        fn respond(self, response: Response) {
133            self.sender.send(response).expect("send of response should succeed");
134        }
135    }
136
137    #[fuchsia::test(allow_stalls = false)]
138    async fn test_request_basic_functionality() {
139        // Create MessageHub for communication between components.
140        let message_hub_delegate = MessageHub::create_hub();
141
142        // Create mock handler endpoint to receive request.
143        let mut handler_receiver = message_hub_delegate
144            .create(MessengerType::Addressable(Address::Handler(SettingType::Unknown)))
145            .await
146            .expect("handler messenger should be created")
147            .1;
148
149        // Create job to send request.
150        let request = Request::Restore;
151        let (response_tx, response_rx) = futures::channel::oneshot::channel::<Response>();
152        let work = Box::new(Work::new(
153            SettingType::Unknown,
154            request.clone(),
155            TestResponder::new(response_tx),
156        ));
157
158        let work_messenger = message_hub_delegate
159            .create(MessengerType::Unbound)
160            .await
161            .expect("messenger should be created")
162            .0;
163
164        // Retrieve signature before passing in messenger to work for verifying the sender of any
165        // requests.
166        let work_messenger_signature = work_messenger.get_signature();
167
168        // Execute work asynchronously.
169        fasync::Task::local(work.execute(work_messenger, 0.into())).detach();
170
171        // Ensure the request is sent from the right sender.
172        let (received_request, client) =
173            handler_receiver.next_of::<Payload>().await.expect("should successfully get request");
174        assert_matches!(received_request, Payload::Request(x) if x == request);
175        assert!(client.get_author() == work_messenger_signature);
176
177        // Ensure the response is received and forwarded by the work.
178        let reply = Ok(None);
179        let _ = client.reply(Payload::Response(reply.clone()).into());
180        assert!(response_rx.await.expect("should receive successful response") == reply);
181    }
182
183    #[fuchsia::test(allow_stalls = false)]
184    async fn test_error_propagation() {
185        // Create MessageHub for communication between components. Do not create any handler for the
186        // test SettingType address.
187        let message_hub_delegate = MessageHub::create_hub();
188
189        let (response_tx, response_rx) = futures::channel::oneshot::channel::<Response>();
190
191        // Create job to send request.
192        let request = Request::Restore;
193        let work = Box::new(Work::new(
194            SettingType::Unknown,
195            request.clone(),
196            TestResponder::new(response_tx),
197        ));
198
199        // Execute work on async task.
200        fasync::Task::local(
201            work.execute(
202                message_hub_delegate
203                    .create(MessengerType::Unbound)
204                    .await
205                    .expect("messenger should be created")
206                    .0,
207                0.into(),
208            ),
209        )
210        .detach();
211
212        // Ensure an error was returned, which should match that generated by the request work load.
213        assert_matches!(response_rx.await.expect("should receive successful response"),
214                Err(x) if x == crate::handler::base::Error::CommunicationError);
215    }
216}