1use crate::base::{SettingInfo, SettingType};
8use crate::handler::base::{
9 Error as HandlerError, Payload as HandlerPayload, Payload, Request as HandlerRequest, Request,
10 SettingHandlerFactory,
11};
12use crate::handler::setting_handler;
13use crate::handler::setting_handler::{
14 Command, ControllerError, Event, ExitResult, SettingHandlerResult, State,
15};
16use crate::inspect::listener_logger::ListenerInspectLogger;
17use crate::message::action_fuse::ActionFuse;
18use crate::message::base::{Audience, MessageEvent, MessengerType, Status};
19use crate::{clock, event, service, trace, trace_guard};
20use anyhow::Error;
21use futures::channel::mpsc::UnboundedSender;
22use futures::lock::Mutex;
23use futures::{FutureExt, StreamExt};
24use std::collections::VecDeque;
25use std::rc::Rc;
26use zx::MonotonicDuration;
27use {fuchsia_async as fasync, fuchsia_trace as ftrace};
28
29pub(crate) const MAX_NODE_ERRORS: usize = 10;
32
33#[derive(Clone, Debug)]
35struct RequestInfo {
36 setting_request: Request,
37 client: service::message::MessageClient,
38 id: usize,
42}
43
44impl PartialEq for RequestInfo {
45 fn eq(&self, other: &Self) -> bool {
46 self.id == other.id
47 }
48}
49
50impl RequestInfo {
51 pub(crate) fn reply(&self, result: SettingHandlerResult) {
53 let _ =
55 self.client.reply(HandlerPayload::Response(result.map_err(HandlerError::from)).into());
56 }
57
58 async fn acknowledge(&self) {
62 self.client.acknowledge().await;
63 }
64
65 async fn bind_to_scope(&mut self, trigger_fn: Box<dyn FnOnce()>) {
69 self.client.bind_to_recipient(ActionFuse::create(trigger_fn)).await;
70 }
71}
72
73#[derive(Clone, Debug)]
74struct ActiveRequest {
75 request: Rc<RequestInfo>,
76 attempts: u64,
78 last_result: Option<SettingHandlerResult>,
79}
80
81impl ActiveRequest {
82 pub(crate) fn get_request(&self) -> Request {
83 self.request.setting_request.clone()
84 }
85
86 pub(crate) fn get_info(&mut self) -> &mut Rc<RequestInfo> {
87 &mut self.request
88 }
89}
90
91#[derive(Clone, Debug)]
92enum ProxyRequest {
93 Add(Box<RequestInfo>),
95 Execute(bool),
98 HandleResult(SettingHandlerResult),
100 RemoveActive,
102 EndListen(usize),
104 TeardownTimeout,
107 Teardown,
109 Retry,
111 Listen(ListenEvent),
113}
114
115#[derive(Clone, Debug, PartialEq)]
116enum ListenEvent {
117 Restart,
118}
119
120pub(crate) struct SettingProxy {
121 setting_type: SettingType,
122
123 client_signature: Option<service::message::Signature>,
124 active_request: Option<ActiveRequest>,
125 pending_requests: VecDeque<Box<RequestInfo>>,
126 listen_requests: Vec<Rc<RequestInfo>>,
127 next_request_id: usize,
128
129 handler_factory: Rc<Mutex<dyn SettingHandlerFactory>>,
131 delegate: service::message::Delegate,
133 messenger: service::message::Messenger,
135 signature: service::message::Signature,
137 event_publisher: event::Publisher,
139
140 proxy_request_sender: UnboundedSender<ProxyRequest>,
142 max_attempts: u64,
143 teardown_timeout: MonotonicDuration,
144 request_timeout: Option<MonotonicDuration>,
145 retry_on_timeout: bool,
146 teardown_cancellation: Option<futures::channel::oneshot::Sender<()>>,
147 _node: fuchsia_inspect::Node,
148 error_node: fuchsia_inspect::Node,
149 node_errors: VecDeque<NodeError>,
150 error_count: usize,
151
152 listener_logger: Rc<Mutex<ListenerInspectLogger>>,
154}
155
156struct NodeError {
157 _node: fuchsia_inspect::Node,
158 _timestamp: fuchsia_inspect::StringProperty,
159 _value: fuchsia_inspect::StringProperty,
160}
161
162macro_rules! publish {
164 ($self:ident, $event:expr) => {
165 $self.event_publisher.send_event(event::Event::Handler($self.setting_type, $event));
166 };
167}
168
169impl SettingProxy {
170 #[allow(clippy::too_many_arguments)]
173 pub(crate) async fn create(
174 setting_type: SettingType,
175 handler_factory: Rc<Mutex<dyn SettingHandlerFactory>>,
176 delegate: service::message::Delegate,
177 max_attempts: u64,
178 teardown_timeout: MonotonicDuration,
179 request_timeout: Option<MonotonicDuration>,
180 retry_on_timeout: bool,
181 node: fuchsia_inspect::Node,
182 listener_logger: Rc<Mutex<ListenerInspectLogger>>,
183 ) -> Result<service::message::Signature, Error> {
184 let (messenger, receptor) = delegate
185 .create(MessengerType::Addressable(service::Address::Handler(setting_type)))
186 .await
187 .map_err(Error::new)?;
188 let service_signature = receptor.get_signature();
189
190 let event_publisher = event::Publisher::create(
191 &delegate,
192 MessengerType::Addressable(service::Address::EventSource(
193 event::Address::SettingProxy(setting_type),
194 )),
195 )
196 .await;
197
198 let (proxy_request_sender, proxy_request_receiver) =
199 futures::channel::mpsc::unbounded::<ProxyRequest>();
200
201 let error_node = node.create_child("errors");
202
203 let mut proxy = Self {
206 setting_type,
207 handler_factory,
208 next_request_id: 0,
209 client_signature: None,
210 active_request: None,
211 pending_requests: VecDeque::new(),
212 listen_requests: Vec::new(),
213 delegate,
214 messenger,
215 signature: service_signature,
216 event_publisher,
217 proxy_request_sender,
218 max_attempts,
219 teardown_timeout,
220 request_timeout,
221 retry_on_timeout,
222 teardown_cancellation: None,
223 _node: node,
224 error_node,
225 node_errors: VecDeque::new(),
226 error_count: 0,
227 listener_logger,
228 };
229
230 fasync::Task::local(async move {
232 let id = ftrace::Id::new();
233 trace!(
234 id,
235 c"setting_proxy",
236 "setting_type" => format!("{setting_type:?}").as_str()
237 );
238 let receptor_fuse = receptor.fuse();
239 let proxy_fuse = proxy_request_receiver.fuse();
240
241 futures::pin_mut!(receptor_fuse, proxy_fuse);
242
243 loop {
244 futures::select! {
245 event = receptor_fuse.select_next_some() => {
248 trace!(
249 id,
250 c"service event"
251 );
252 proxy.process_service_event(id, event).await;
253 }
254
255 request = proxy_fuse.select_next_some() => {
258 trace!(
259 id,
260 c"proxy request"
261 );
262 proxy.process_proxy_request(id, request).await;
263 }
264 }
265 }
266 })
267 .detach();
268 Ok(service_signature)
269 }
270
271 async fn process_service_event(
272 &mut self,
273 id: ftrace::Id,
274 event: service::message::MessageEvent,
275 ) {
276 if let MessageEvent::Message(payload, client) = event {
277 match payload {
278 service::Payload::Setting(Payload::Request(request)) => {
279 trace!(id, c"process_service_request");
280 self.process_service_request(request, client).await;
281 }
282 service::Payload::Controller(setting_handler::Payload::Event(event)) => {
283 let guard = trace_guard!(id, c"get author");
286 if Some(client.get_author()) != self.client_signature {
287 return;
288 }
289 drop(guard);
290
291 match event {
292 Event::Changed(setting_info) => {
293 trace!(id, c"change notification");
294 self.notify(setting_info);
295 }
296 Event::Exited(result) => {
297 trace!(id, c"process exit");
298 self.process_exit(result);
299 }
300 Event::StateChanged(_) => {}
301 }
302 }
303 _ => {
304 panic!("Unexpected message");
305 }
306 }
307 }
308 }
309
310 async fn process_proxy_request(&mut self, id: ftrace::Id, request: ProxyRequest) {
311 match request {
312 ProxyRequest::Add(request) => {
313 trace!(id, c"add request");
314 self.add_request(request);
315 }
316 ProxyRequest::Execute(recreate_handler) => {
317 trace!(id, c"execute");
318 self.execute_next_request(id, recreate_handler).await;
319 }
320 ProxyRequest::RemoveActive => {
321 trace!(id, c"remove active");
322 self.remove_active_request();
323 }
324 ProxyRequest::TeardownTimeout => {
325 trace!(id, c"teardown timeout");
326 self.start_teardown_timeout().await;
327 }
328 ProxyRequest::Teardown => {
329 trace!(id, c"teardown");
330 self.teardown_if_needed().await;
331 }
332 ProxyRequest::Retry => {
333 trace!(id, c"retry");
334 self.retry();
335 }
336 ProxyRequest::HandleResult(result) => {
337 trace!(id, c"handle result");
338 self.handle_result(result);
339 }
340 ProxyRequest::Listen(event) => {
341 trace!(id, c"handle listen");
342 self.handle_listen(event).await;
343 }
344 ProxyRequest::EndListen(request_id) => {
345 trace!(id, c"handle end listen");
346 self.listener_logger.lock().await.remove_listener(self.setting_type);
347 self.handle_end_listen(request_id).await;
348 }
349 }
350 }
351
352 async fn process_service_request(
353 &mut self,
354 request: HandlerRequest,
355 client: service::message::MessageClient,
356 ) {
357 let id = self.next_request_id;
358 self.next_request_id += 1;
359 self.process_request(Box::new(RequestInfo { setting_request: request, id, client })).await;
360 }
361
362 async fn get_handler_signature(
363 &mut self,
364 force_create: bool,
365 ) -> Option<service::message::Signature> {
366 if force_create || self.client_signature.is_none() {
367 self.client_signature = match self
368 .handler_factory
369 .lock()
370 .await
371 .generate(self.setting_type, self.delegate.clone(), self.signature)
372 .await
373 {
374 Ok(signature) => Some(signature),
375 Err(e) => {
376 let node = self.error_node.create_child(format!("{:020}", self.error_count));
377 let timestamp = node.create_string("timestamp", clock::inspect_format_now());
378 let value = node.create_string("value", format!("{e:?}"));
379 self.node_errors.push_back(NodeError {
380 _node: node,
381 _timestamp: timestamp,
382 _value: value,
383 });
384 self.error_count += 1;
385 if self.node_errors.len() > MAX_NODE_ERRORS {
386 let _ = self.node_errors.pop_front();
387 }
388 None
389 }
390 };
391 }
392
393 self.client_signature
394 }
395
396 fn is_listening(&self) -> bool {
399 !self.listen_requests.is_empty()
400 }
401
402 fn notify(&self, setting_info: SettingInfo) {
404 if !self.is_listening() {
405 return;
406 }
407
408 for request in &self.listen_requests {
410 request.reply(Ok(Some(setting_info.clone())));
411 }
412 }
413
414 fn process_exit(&mut self, result: ExitResult) {
415 self.event_publisher.send_event(event::Event::Handler(
417 self.setting_type,
418 event::handler::Event::Exit(result),
419 ));
420
421 self.client_signature = None;
423
424 if self.active_request.is_some() {
426 self.proxy_request_sender
427 .unbounded_send(ProxyRequest::HandleResult(Err(ControllerError::ExitError)))
428 .expect(
429 "SettingProxy::process_exit, proxy_request_sender failed to send ExitError\
430 proxy request",
431 );
432 }
433
434 if self.is_listening() {
436 self.request(ProxyRequest::Listen(ListenEvent::Restart));
437 }
438 }
439
440 async fn process_request(&mut self, request: Box<RequestInfo>) {
444 match self.get_handler_signature(false).await {
445 None => {
446 request.reply(Err(ControllerError::UnhandledType(self.setting_type)));
447 }
448 Some(_) => {
449 self.request(ProxyRequest::Add(request));
450 }
451 }
452 }
453
454 fn add_request(&mut self, request: Box<RequestInfo>) {
460 if let Some(teardown_cancellation) = self.teardown_cancellation.take() {
461 let _ = teardown_cancellation.send(());
462 }
463 self.pending_requests.push_back(request);
464
465 if self.pending_requests.len() == 1 && self.active_request.is_none() {
470 self.request(ProxyRequest::Execute(false));
471 }
472 }
473
474 fn request(&self, request: ProxyRequest) {
478 self.proxy_request_sender
479 .unbounded_send(request)
480 .expect("SettingProxy::request, proxy_request_sender cannot send requests anymore");
481 }
482
483 async fn send_listen_update(&mut self, force_recreate_controller: bool) {
491 let optional_handler_signature =
492 self.get_handler_signature(force_recreate_controller).await;
493 if optional_handler_signature.is_none() {
494 return;
495 }
496
497 let handler_signature =
498 optional_handler_signature.expect("handler signature should be present");
499
500 let _ = self.messenger.message(
501 setting_handler::Payload::Command(Command::ChangeState(if self.is_listening() {
502 State::Listen
503 } else {
504 State::EndListen
505 }))
506 .into(),
507 Audience::Messenger(handler_signature),
508 );
509
510 self.request(ProxyRequest::TeardownTimeout);
511 }
512
513 async fn handle_listen(&mut self, event: ListenEvent) {
517 self.send_listen_update(ListenEvent::Restart == event).await;
518 }
519
520 async fn handle_end_listen(&mut self, request_id: usize) {
524 let was_listening = self.is_listening();
525
526 if let Some(pos) = self.listen_requests.iter().position(|target| target.id == request_id) {
527 let _ = self.listen_requests.remove(pos);
528 } else {
529 return;
530 }
531
532 if was_listening != self.is_listening() {
533 self.send_listen_update(false).await;
534 }
535 }
536
537 fn handle_result(&mut self, result: SettingHandlerResult) {
543 let active_request = self.active_request.as_mut().expect("request should be present");
544 let mut retry = false;
545
546 if matches!(result, Err(ControllerError::ExternalFailure(..)))
547 || matches!(result, Err(ControllerError::ExitError))
548 {
549 retry = true;
550 } else if matches!(result, Err(ControllerError::TimeoutError)) {
551 publish!(
552 self,
553 event::handler::Event::Request(
554 event::handler::Action::Timeout,
555 active_request.get_request()
556 )
557 );
558 retry = self.retry_on_timeout;
559 }
560
561 active_request.last_result = Some(result);
562
563 if retry {
564 self.request(ProxyRequest::Retry);
565 } else {
566 self.request(ProxyRequest::RemoveActive);
567 }
568 }
569
570 fn remove_active_request(&mut self) {
576 let mut removed_request = self.active_request.take().expect("request should be present");
577
578 if let Some(result) = removed_request.last_result.take() {
580 removed_request.request.reply(result)
581 }
582
583 if !self.pending_requests.is_empty() {
586 self.request(ProxyRequest::Execute(false));
587 } else {
588 self.request(ProxyRequest::TeardownTimeout);
589 }
590 }
591
592 async fn execute_next_request(&mut self, id: ftrace::Id, recreate_handler: bool) {
598 if self.active_request.is_none() {
599 let mut pending = self
600 .pending_requests
601 .pop_front()
602 .expect("execute should only be called with present requests");
603
604 if matches!(pending.setting_request, Request::Listen) {
605 let proxy_request_sender = self.proxy_request_sender.clone();
609 let request_id = pending.id;
610 pending.bind_to_scope(Box::new(move || {
611 proxy_request_sender.unbounded_send(ProxyRequest::EndListen(request_id)).expect(
612 "SettingProxy::execute_next_request, proxy_request_sender failed to send \
613 EndListen proxy request with info",
614 );
615 }))
616 .await;
617 }
618
619 self.active_request =
621 Some(ActiveRequest { request: Rc::from(pending), attempts: 0, last_result: None });
622 }
623
624 let signature = self
626 .get_handler_signature(recreate_handler)
627 .await
628 .expect("failed to generate handler signature");
629
630 let was_listening = self.is_listening();
633
634 let active_request =
635 self.active_request.as_mut().expect("active request should be present");
636
637 active_request.attempts += 1;
638
639 let request = active_request.get_request();
642
643 if matches!(request, Request::Listen) {
644 let info = active_request.get_info();
645
646 self.listener_logger.lock().await.add_listener(self.setting_type);
648
649 self.listen_requests.push(info.clone());
651
652 info.acknowledge().await;
654
655 if was_listening != self.is_listening() {
657 self.send_listen_update(false).await;
658 }
659
660 self.request(ProxyRequest::RemoveActive);
663 return;
664 }
665
666 if active_request.attempts > self.max_attempts {
669 publish!(
670 self,
671 event::handler::Event::Request(
672 event::handler::Action::AttemptsExceeded,
673 request.clone()
674 )
675 );
676
677 self.request(ProxyRequest::RemoveActive);
678 return;
679 }
680
681 publish!(
682 self,
683 event::handler::Event::Request(event::handler::Action::Execute, request.clone())
684 );
685
686 let mut receptor = self.messenger.message_with_timeout(
687 setting_handler::Payload::Command(Command::HandleRequest(request.clone())).into(),
688 Audience::Messenger(signature),
689 self.request_timeout,
690 );
691
692 let proxy_request_sender_clone = self.proxy_request_sender.clone();
693
694 fasync::Task::local(async move {
695 trace!(id, c"response");
696 while let Some(message_event) = receptor.next().await {
697 let handler_result = match message_event {
698 MessageEvent::Message(
699 service::Payload::Controller(setting_handler::Payload::Result(result)),
700 _,
701 ) => Some(result),
702 MessageEvent::Status(Status::Undeliverable) => {
703 Some(Err(ControllerError::IrrecoverableError))
704 }
705 MessageEvent::Status(Status::Timeout) => {
706 Some(Err(ControllerError::TimeoutError))
707 }
708 _ => None,
709 };
710
711 if let Some(result) = handler_result {
712 proxy_request_sender_clone
717 .unbounded_send(ProxyRequest::HandleResult(result))
718 .expect(
719 "SettingProxy::execute_next_request, proxy_request_sender failed to \
720 send proxy request",
721 );
722 return;
723 }
724 }
725 })
726 .detach();
727 }
728
729 fn retry(&mut self) {
732 publish!(
733 self,
734 event::handler::Event::Request(
735 event::handler::Action::Retry,
736 self.active_request
737 .as_ref()
738 .expect("active request should be present")
739 .get_request(),
740 )
741 );
742
743 self.request(ProxyRequest::Execute(true));
744 }
745
746 fn has_active_work(&self) -> bool {
747 self.active_request.is_some()
748 || !self.pending_requests.is_empty()
749 || self.is_listening()
750 || self.client_signature.is_none()
751 }
752
753 async fn start_teardown_timeout(&mut self) {
754 if self.has_active_work() {
755 return;
756 }
757
758 let (cancellation_tx, cancellation_rx) = futures::channel::oneshot::channel();
759 if self.teardown_cancellation.is_some() {
760 return;
763 }
764
765 self.teardown_cancellation = Some(cancellation_tx);
766 let sender = self.proxy_request_sender.clone();
767 let teardown_timeout = self.teardown_timeout;
768 fasync::Task::local(async move {
769 let timeout = fuchsia_async::Timer::new(crate::clock::now() + teardown_timeout).fuse();
770 futures::pin_mut!(cancellation_rx, timeout);
771 futures::select! {
772 _ = cancellation_rx => {
773 return;
775 }
776 _ = timeout => {}, }
778
779 sender.unbounded_send(ProxyRequest::Teardown).expect(
782 "SettingProxy::start_teardown_timeout, proxy_request_sender failed to send Teardown\
783 proxy request",
784 );
785 })
786 .detach();
787 }
788
789 async fn teardown_if_needed(&mut self) {
792 if self.has_active_work() {
793 return;
794 }
795
796 let signature = self.client_signature.take().expect("signature should be set");
797
798 let mut controller_receptor = self.messenger.message(
799 setting_handler::Payload::Command(Command::ChangeState(State::Teardown)).into(),
800 Audience::Messenger(signature),
801 );
802
803 if controller_receptor.next().await != Some(MessageEvent::Status(Status::Received)) {
805 log::error!("Failed to tear down {:?} controller", self.setting_type);
806 }
807
808 self.delegate.delete(signature);
811
812 publish!(self, event::handler::Event::Teardown);
813 }
814}