settings/ingress/
request.rs1use crate::base::SettingType;
24use crate::handler::base::{Payload, Request, Response};
25use crate::job::work::{Independent, Load};
26use crate::job::Job;
27use crate::message::base::Audience;
28use crate::service::{message, Address};
29use crate::trace;
30use async_trait::async_trait;
31use fuchsia_trace as ftrace;
32use std::marker::PhantomData;
33
34pub(crate) trait Responder<R: From<Response>> {
36 fn respond(self, response: R);
38}
39
40pub(crate) struct Work<R, T>
43where
44 R: From<Response>,
45 T: Responder<R>,
46{
47 request: Request,
48 setting_type: SettingType,
49 responder: T,
50 _data: PhantomData<R>,
51}
52
53impl<R: From<Response>, T: Responder<R>> Work<R, T> {
54 pub(crate) fn new(setting_type: SettingType, request: Request, responder: T) -> Self {
55 Self { setting_type, request, responder, _data: PhantomData }
56 }
57}
58
59#[async_trait(?Send)]
63impl<R, T> Independent for Work<R, T>
64where
65 R: From<Response>,
66 T: Responder<R>,
67{
68 async fn execute(self: Box<Self>, messenger: message::Messenger, id: ftrace::Id) {
69 trace!(id, c"Independent Work execute");
70 let mut response_listener = messenger.message(
72 Payload::Request(self.request.clone()).into(),
73 Audience::Address(Address::Handler(self.setting_type)),
74 );
75
76 self.responder.respond(R::from(match response_listener.next_of::<Payload>().await {
78 Ok((payload, _)) => match payload {
79 Payload::Response(response) => response,
80 _ => {
81 panic!("should not have received a different payload type:{payload:?}");
86 }
87 },
88 _ => {
89 log::warn!(
90 "An error occurred while independent job was executing for request:{:?}",
91 self.request.clone()
92 );
93 Err(crate::handler::base::Error::CommunicationError)
94 }
95 }));
96 }
97}
98
99impl<R, T> From<Work<R, T>> for Job
103where
104 R: From<Response> + 'static,
105 T: Responder<R> + 'static,
106{
107 fn from(work: Work<R, T>) -> Job {
108 Job::new(Load::Independent(Box::new(work)))
109 }
110}
111
112#[cfg(test)]
113mod tests {
114 use super::*;
115 use crate::message::base::MessengerType;
116 use crate::service::MessageHub;
117 use assert_matches::assert_matches;
118 use fuchsia_async as fasync;
119 use futures::channel::oneshot::Sender;
120
121 struct TestResponder {
122 sender: Sender<Response>,
123 }
124
125 impl TestResponder {
126 pub(crate) fn new(sender: Sender<Response>) -> Self {
127 Self { sender }
128 }
129 }
130
131 impl Responder<Response> for TestResponder {
132 fn respond(self, response: Response) {
133 self.sender.send(response).expect("send of response should succeed");
134 }
135 }
136
137 #[fuchsia::test(allow_stalls = false)]
138 async fn test_request_basic_functionality() {
139 let message_hub_delegate = MessageHub::create_hub();
141
142 let mut handler_receiver = message_hub_delegate
144 .create(MessengerType::Addressable(Address::Handler(SettingType::Unknown)))
145 .await
146 .expect("handler messenger should be created")
147 .1;
148
149 let request = Request::Restore;
151 let (response_tx, response_rx) = futures::channel::oneshot::channel::<Response>();
152 let work = Box::new(Work::new(
153 SettingType::Unknown,
154 request.clone(),
155 TestResponder::new(response_tx),
156 ));
157
158 let work_messenger = message_hub_delegate
159 .create(MessengerType::Unbound)
160 .await
161 .expect("messenger should be created")
162 .0;
163
164 let work_messenger_signature = work_messenger.get_signature();
167
168 fasync::Task::local(work.execute(work_messenger, 0.into())).detach();
170
171 let (received_request, client) =
173 handler_receiver.next_of::<Payload>().await.expect("should successfully get request");
174 assert_matches!(received_request, Payload::Request(x) if x == request);
175 assert!(client.get_author() == work_messenger_signature);
176
177 let reply = Ok(None);
179 let _ = client.reply(Payload::Response(reply.clone()).into());
180 assert!(response_rx.await.expect("should receive successful response") == reply);
181 }
182
183 #[fuchsia::test(allow_stalls = false)]
184 async fn test_error_propagation() {
185 let message_hub_delegate = MessageHub::create_hub();
188
189 let (response_tx, response_rx) = futures::channel::oneshot::channel::<Response>();
190
191 let request = Request::Restore;
193 let work = Box::new(Work::new(
194 SettingType::Unknown,
195 request.clone(),
196 TestResponder::new(response_tx),
197 ));
198
199 fasync::Task::local(
201 work.execute(
202 message_hub_delegate
203 .create(MessengerType::Unbound)
204 .await
205 .expect("messenger should be created")
206 .0,
207 0.into(),
208 ),
209 )
210 .detach();
211
212 assert_matches!(response_rx.await.expect("should receive successful response"),
214 Err(x) if x == crate::handler::base::Error::CommunicationError);
215 }
216}