settings/handler/
setting_proxy.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// TODO(https://fxbug.dev/42147053): Add module documentation describing Setting Proxy's role in
6// setting handling.
7use crate::base::{SettingInfo, SettingType};
8use crate::handler::base::{
9    Error as HandlerError, Payload as HandlerPayload, Payload, Request as HandlerRequest, Request,
10    SettingHandlerFactory,
11};
12use crate::handler::setting_handler;
13use crate::handler::setting_handler::{
14    Command, ControllerError, Event, ExitResult, SettingHandlerResult, State,
15};
16use crate::inspect::listener_logger::ListenerInspectLogger;
17use crate::message::action_fuse::ActionFuse;
18use crate::message::base::{Audience, MessageEvent, MessengerType, Status};
19use crate::{clock, event, service, trace, trace_guard};
20use anyhow::Error;
21use futures::channel::mpsc::UnboundedSender;
22use futures::lock::Mutex;
23use futures::{FutureExt, StreamExt};
24use std::collections::VecDeque;
25use std::rc::Rc;
26use zx::MonotonicDuration;
27use {fuchsia_async as fasync, fuchsia_trace as ftrace};
28
29/// Maximum number of errors tracked per setting proxy before errors are rolled over.
30// The value was chosen arbitrarily. Feel free to increase if it's too small.
31pub(crate) const MAX_NODE_ERRORS: usize = 10;
32
33/// A container for associating a Handler Request with a given [`Client`].
34#[derive(Clone, Debug)]
35struct RequestInfo {
36    setting_request: Request,
37    client: service::message::MessageClient,
38    // This identifier is unique within each setting proxy to identify a
39    // request. This can be used for removing a particular RequestInfo within a
40    // set, such as the active change listeners.
41    id: usize,
42}
43
44impl PartialEq for RequestInfo {
45    fn eq(&self, other: &Self) -> bool {
46        self.id == other.id
47    }
48}
49
50impl RequestInfo {
51    /// Sends the supplied result as a reply with the associated [`Client`].
52    pub(crate) fn reply(&self, result: SettingHandlerResult) {
53        // Ignore the receptor result.
54        let _ =
55            self.client.reply(HandlerPayload::Response(result.map_err(HandlerError::from)).into());
56    }
57
58    /// Sends an acknowledge message back through the reply client. This used in
59    /// long running requests (such a listen) where acknowledge message ensures
60    /// the client the request was processed.
61    async fn acknowledge(&self) {
62        self.client.acknowledge().await;
63    }
64
65    /// Adds a closure that will be triggered when the recipient for a response
66    /// to the request goes out of scope. This allows for the message handler to
67    /// know when the recipient is no longer valid.
68    async fn bind_to_scope(&mut self, trigger_fn: Box<dyn FnOnce()>) {
69        self.client.bind_to_recipient(ActionFuse::create(trigger_fn)).await;
70    }
71}
72
73#[derive(Clone, Debug)]
74struct ActiveRequest {
75    request: Rc<RequestInfo>,
76    // The number of attempts that have been made on this request.
77    attempts: u64,
78    last_result: Option<SettingHandlerResult>,
79}
80
81impl ActiveRequest {
82    pub(crate) fn get_request(&self) -> Request {
83        self.request.setting_request.clone()
84    }
85
86    pub(crate) fn get_info(&mut self) -> &mut Rc<RequestInfo> {
87        &mut self.request
88    }
89}
90
91#[derive(Clone, Debug)]
92enum ProxyRequest {
93    /// Adds a request to the pending request queue.
94    Add(Box<RequestInfo>),
95    /// Executes the next pending request, recreating the handler if the
96    /// argument is set to true.
97    Execute(bool),
98    /// Evaluates supplied the result for the active request.
99    HandleResult(SettingHandlerResult),
100    /// Request to remove the active request.
101    RemoveActive,
102    /// Request to remove listen request.
103    EndListen(usize),
104    /// Starts a timeout for resources be torn down. Called when there are no
105    /// more requests to process.
106    TeardownTimeout,
107    /// Request for resources to be torn down.
108    Teardown,
109    /// Request to retry the active request.
110    Retry,
111    /// Requests listen
112    Listen(ListenEvent),
113}
114
115#[derive(Clone, Debug, PartialEq)]
116enum ListenEvent {
117    Restart,
118}
119
120pub(crate) struct SettingProxy {
121    setting_type: SettingType,
122
123    client_signature: Option<service::message::Signature>,
124    active_request: Option<ActiveRequest>,
125    pending_requests: VecDeque<Box<RequestInfo>>,
126    listen_requests: Vec<Rc<RequestInfo>>,
127    next_request_id: usize,
128
129    /// Factory for generating a new controller to service requests.
130    handler_factory: Rc<Mutex<dyn SettingHandlerFactory>>,
131    /// Messenger factory for communication with service components.
132    delegate: service::message::Delegate,
133    /// Messenger to send messages to controllers.
134    messenger: service::message::Messenger,
135    /// Signature for messages from controllers to be direct towards.
136    signature: service::message::Signature,
137    /// Client for communicating events.
138    event_publisher: event::Publisher,
139
140    /// Sender for passing messages about the active requests and controllers.
141    proxy_request_sender: UnboundedSender<ProxyRequest>,
142    max_attempts: u64,
143    teardown_timeout: MonotonicDuration,
144    request_timeout: Option<MonotonicDuration>,
145    retry_on_timeout: bool,
146    teardown_cancellation: Option<futures::channel::oneshot::Sender<()>>,
147    _node: fuchsia_inspect::Node,
148    error_node: fuchsia_inspect::Node,
149    node_errors: VecDeque<NodeError>,
150    error_count: usize,
151
152    /// Inspect logger for active listener counts.
153    listener_logger: Rc<Mutex<ListenerInspectLogger>>,
154}
155
156struct NodeError {
157    _node: fuchsia_inspect::Node,
158    _timestamp: fuchsia_inspect::StringProperty,
159    _value: fuchsia_inspect::StringProperty,
160}
161
162/// Publishes an event to the event_publisher.
163macro_rules! publish {
164    ($self:ident, $event:expr) => {
165        $self.event_publisher.send_event(event::Event::Handler($self.setting_type, $event));
166    };
167}
168
169impl SettingProxy {
170    /// Creates a SettingProxy that is listening to requests from the
171    /// provided receiver and will send responses/updates on the given sender.
172    #[allow(clippy::too_many_arguments)]
173    pub(crate) async fn create(
174        setting_type: SettingType,
175        handler_factory: Rc<Mutex<dyn SettingHandlerFactory>>,
176        delegate: service::message::Delegate,
177        max_attempts: u64,
178        teardown_timeout: MonotonicDuration,
179        request_timeout: Option<MonotonicDuration>,
180        retry_on_timeout: bool,
181        node: fuchsia_inspect::Node,
182        listener_logger: Rc<Mutex<ListenerInspectLogger>>,
183    ) -> Result<service::message::Signature, Error> {
184        let (messenger, receptor) = delegate
185            .create(MessengerType::Addressable(service::Address::Handler(setting_type)))
186            .await
187            .map_err(Error::new)?;
188        let service_signature = receptor.get_signature();
189
190        let event_publisher = event::Publisher::create(
191            &delegate,
192            MessengerType::Addressable(service::Address::EventSource(
193                event::Address::SettingProxy(setting_type),
194            )),
195        )
196        .await;
197
198        let (proxy_request_sender, proxy_request_receiver) =
199            futures::channel::mpsc::unbounded::<ProxyRequest>();
200
201        let error_node = node.create_child("errors");
202
203        // We must create handle here rather than return back the value as we
204        // reference the proxy in the async tasks below.
205        let mut proxy = Self {
206            setting_type,
207            handler_factory,
208            next_request_id: 0,
209            client_signature: None,
210            active_request: None,
211            pending_requests: VecDeque::new(),
212            listen_requests: Vec::new(),
213            delegate,
214            messenger,
215            signature: service_signature,
216            event_publisher,
217            proxy_request_sender,
218            max_attempts,
219            teardown_timeout,
220            request_timeout,
221            retry_on_timeout,
222            teardown_cancellation: None,
223            _node: node,
224            error_node,
225            node_errors: VecDeque::new(),
226            error_count: 0,
227            listener_logger,
228        };
229
230        // Main task loop for receiving and processing incoming messages.
231        fasync::Task::local(async move {
232            let id = ftrace::Id::new();
233            trace!(
234                id,
235                c"setting_proxy",
236                "setting_type" => format!("{setting_type:?}").as_str()
237            );
238            let receptor_fuse = receptor.fuse();
239            let proxy_fuse = proxy_request_receiver.fuse();
240
241            futures::pin_mut!(receptor_fuse, proxy_fuse);
242
243            loop {
244                futures::select! {
245                    // Handles requests from the service MessageHub and
246                    // communication from the setting controller.
247                    event = receptor_fuse.select_next_some() => {
248                        trace!(
249                            id,
250                            c"service event"
251                        );
252                        proxy.process_service_event(id, event).await;
253                    }
254
255                    // Handles messages for enqueueing requests and processing
256                    // results on the main event loop for proxy.
257                    request = proxy_fuse.select_next_some() => {
258                        trace!(
259                            id,
260                            c"proxy request"
261                        );
262                        proxy.process_proxy_request(id, request).await;
263                    }
264                }
265            }
266        })
267        .detach();
268        Ok(service_signature)
269    }
270
271    async fn process_service_event(
272        &mut self,
273        id: ftrace::Id,
274        event: service::message::MessageEvent,
275    ) {
276        if let MessageEvent::Message(payload, client) = event {
277            match payload {
278                service::Payload::Setting(Payload::Request(request)) => {
279                    trace!(id, c"process_service_request");
280                    self.process_service_request(request, client).await;
281                }
282                service::Payload::Controller(setting_handler::Payload::Event(event)) => {
283                    // Messages received after the client signature
284                    // has been changed will be ignored.
285                    let guard = trace_guard!(id, c"get author");
286                    if Some(client.get_author()) != self.client_signature {
287                        return;
288                    }
289                    drop(guard);
290
291                    match event {
292                        Event::Changed(setting_info) => {
293                            trace!(id, c"change notification");
294                            self.notify(setting_info);
295                        }
296                        Event::Exited(result) => {
297                            trace!(id, c"process exit");
298                            self.process_exit(result);
299                        }
300                        Event::StateChanged(_) => {}
301                    }
302                }
303                _ => {
304                    panic!("Unexpected message");
305                }
306            }
307        }
308    }
309
310    async fn process_proxy_request(&mut self, id: ftrace::Id, request: ProxyRequest) {
311        match request {
312            ProxyRequest::Add(request) => {
313                trace!(id, c"add request");
314                self.add_request(request);
315            }
316            ProxyRequest::Execute(recreate_handler) => {
317                trace!(id, c"execute");
318                self.execute_next_request(id, recreate_handler).await;
319            }
320            ProxyRequest::RemoveActive => {
321                trace!(id, c"remove active");
322                self.remove_active_request();
323            }
324            ProxyRequest::TeardownTimeout => {
325                trace!(id, c"teardown timeout");
326                self.start_teardown_timeout().await;
327            }
328            ProxyRequest::Teardown => {
329                trace!(id, c"teardown");
330                self.teardown_if_needed().await;
331            }
332            ProxyRequest::Retry => {
333                trace!(id, c"retry");
334                self.retry();
335            }
336            ProxyRequest::HandleResult(result) => {
337                trace!(id, c"handle result");
338                self.handle_result(result);
339            }
340            ProxyRequest::Listen(event) => {
341                trace!(id, c"handle listen");
342                self.handle_listen(event).await;
343            }
344            ProxyRequest::EndListen(request_id) => {
345                trace!(id, c"handle end listen");
346                self.listener_logger.lock().await.remove_listener(self.setting_type);
347                self.handle_end_listen(request_id).await;
348            }
349        }
350    }
351
352    async fn process_service_request(
353        &mut self,
354        request: HandlerRequest,
355        client: service::message::MessageClient,
356    ) {
357        let id = self.next_request_id;
358        self.next_request_id += 1;
359        self.process_request(Box::new(RequestInfo { setting_request: request, id, client })).await;
360    }
361
362    async fn get_handler_signature(
363        &mut self,
364        force_create: bool,
365    ) -> Option<service::message::Signature> {
366        if force_create || self.client_signature.is_none() {
367            self.client_signature = match self
368                .handler_factory
369                .lock()
370                .await
371                .generate(self.setting_type, self.delegate.clone(), self.signature)
372                .await
373            {
374                Ok(signature) => Some(signature),
375                Err(e) => {
376                    let node = self.error_node.create_child(format!("{:020}", self.error_count));
377                    let timestamp = node.create_string("timestamp", clock::inspect_format_now());
378                    let value = node.create_string("value", format!("{e:?}"));
379                    self.node_errors.push_back(NodeError {
380                        _node: node,
381                        _timestamp: timestamp,
382                        _value: value,
383                    });
384                    self.error_count += 1;
385                    if self.node_errors.len() > MAX_NODE_ERRORS {
386                        let _ = self.node_errors.pop_front();
387                    }
388                    None
389                }
390            };
391        }
392
393        self.client_signature
394    }
395
396    /// Returns whether there is an active listener across the various
397    /// listening clients.
398    fn is_listening(&self) -> bool {
399        !self.listen_requests.is_empty()
400    }
401
402    /// Informs listeners when the controller has indicated the setting has changed.
403    fn notify(&self, setting_info: SettingInfo) {
404        if !self.is_listening() {
405            return;
406        }
407
408        // Notify each listener on the service MessageHub.
409        for request in &self.listen_requests {
410            request.reply(Ok(Some(setting_info.clone())));
411        }
412    }
413
414    fn process_exit(&mut self, result: ExitResult) {
415        // Log the exit
416        self.event_publisher.send_event(event::Event::Handler(
417            self.setting_type,
418            event::handler::Event::Exit(result),
419        ));
420
421        // Clear the setting handler client signature
422        self.client_signature = None;
423
424        // If there is an active request, process the error. Panic if we couldn't process it.
425        if self.active_request.is_some() {
426            self.proxy_request_sender
427                .unbounded_send(ProxyRequest::HandleResult(Err(ControllerError::ExitError)))
428                .expect(
429                    "SettingProxy::process_exit, proxy_request_sender failed to send ExitError\
430                     proxy request",
431                );
432        }
433
434        // If there is an active listener, forefully refetch
435        if self.is_listening() {
436            self.request(ProxyRequest::Listen(ListenEvent::Restart));
437        }
438    }
439
440    /// Ensures we first have an active controller (spun up by
441    /// get_handler_signature if not already active) before adding the request
442    /// to the proxy's queue.
443    async fn process_request(&mut self, request: Box<RequestInfo>) {
444        match self.get_handler_signature(false).await {
445            None => {
446                request.reply(Err(ControllerError::UnhandledType(self.setting_type)));
447            }
448            Some(_) => {
449                self.request(ProxyRequest::Add(request));
450            }
451        }
452    }
453
454    /// Adds a request to the request queue for this setting.
455    ///
456    /// If this is the first request in the queue, processing will begin immediately.
457    ///
458    /// Should only be called on the main task spawned in [SettingProxy::create](#method.create).
459    fn add_request(&mut self, request: Box<RequestInfo>) {
460        if let Some(teardown_cancellation) = self.teardown_cancellation.take() {
461            let _ = teardown_cancellation.send(());
462        }
463        self.pending_requests.push_back(request);
464
465        // If this is the first request (no active request or pending requests),
466        // request the controller begin execution of requests. Otherwise, the
467        // controller is already executing requests and will eventually process
468        // this new request.
469        if self.pending_requests.len() == 1 && self.active_request.is_none() {
470            self.request(ProxyRequest::Execute(false));
471        }
472    }
473
474    /// Sends a request to be processed by the proxy. Requests are sent as
475    /// messages and marshalled onto a single event loop to ensure proper
476    /// ordering.
477    fn request(&self, request: ProxyRequest) {
478        self.proxy_request_sender
479            .unbounded_send(request)
480            .expect("SettingProxy::request, proxy_request_sender cannot send requests anymore");
481    }
482
483    /// Sends an update to the controller about whether or not it should be
484    /// listening.
485    ///
486    /// # Arguments
487    ///
488    /// * `force_recreate_controller` - a bool representing whether the
489    /// controller should be recreated regardless if it is currently running.
490    async fn send_listen_update(&mut self, force_recreate_controller: bool) {
491        let optional_handler_signature =
492            self.get_handler_signature(force_recreate_controller).await;
493        if optional_handler_signature.is_none() {
494            return;
495        }
496
497        let handler_signature =
498            optional_handler_signature.expect("handler signature should be present");
499
500        let _ = self.messenger.message(
501            setting_handler::Payload::Command(Command::ChangeState(if self.is_listening() {
502                State::Listen
503            } else {
504                State::EndListen
505            }))
506            .into(),
507            Audience::Messenger(handler_signature),
508        );
509
510        self.request(ProxyRequest::TeardownTimeout);
511    }
512
513    /// Notifies handler in the case the notification listener count is
514    /// non-zero and we aren't already listening for changes or there
515    /// are no more listeners and we are actively listening.
516    async fn handle_listen(&mut self, event: ListenEvent) {
517        self.send_listen_update(ListenEvent::Restart == event).await;
518    }
519
520    /// Notifies handler in the case the notification listener count is
521    /// non-zero and we aren't already listening for changes or there
522    /// are no more listeners and we are actively listening.
523    async fn handle_end_listen(&mut self, request_id: usize) {
524        let was_listening = self.is_listening();
525
526        if let Some(pos) = self.listen_requests.iter().position(|target| target.id == request_id) {
527            let _ = self.listen_requests.remove(pos);
528        } else {
529            return;
530        }
531
532        if was_listening != self.is_listening() {
533            self.send_listen_update(false).await;
534        }
535    }
536
537    /// Evaluates the supplied result for the current active request. Based
538    /// on the return result, also determines whether the request should be
539    /// retried. Based on this determination, the function will request from
540    /// the proxy whether to retry the request or remove the request (and send
541    /// response).
542    fn handle_result(&mut self, result: SettingHandlerResult) {
543        let active_request = self.active_request.as_mut().expect("request should be present");
544        let mut retry = false;
545
546        if matches!(result, Err(ControllerError::ExternalFailure(..)))
547            || matches!(result, Err(ControllerError::ExitError))
548        {
549            retry = true;
550        } else if matches!(result, Err(ControllerError::TimeoutError)) {
551            publish!(
552                self,
553                event::handler::Event::Request(
554                    event::handler::Action::Timeout,
555                    active_request.get_request()
556                )
557            );
558            retry = self.retry_on_timeout;
559        }
560
561        active_request.last_result = Some(result);
562
563        if retry {
564            self.request(ProxyRequest::Retry);
565        } else {
566            self.request(ProxyRequest::RemoveActive);
567        }
568    }
569
570    /// Removes the active request for this setting.
571    ///
572    /// Should only be called once a request is finished processing.
573    ///
574    /// Should only be called on the main task spawned in [SettingProxy::create](#method.create).
575    fn remove_active_request(&mut self) {
576        let mut removed_request = self.active_request.take().expect("request should be present");
577
578        // Send result back to original caller if present.
579        if let Some(result) = removed_request.last_result.take() {
580            removed_request.request.reply(result)
581        }
582
583        // If there are still requests to process, then request for the next to
584        // be processed. Otherwise request teardown.
585        if !self.pending_requests.is_empty() {
586            self.request(ProxyRequest::Execute(false));
587        } else {
588            self.request(ProxyRequest::TeardownTimeout);
589        }
590    }
591
592    /// Processes the next request in the queue of pending requests.
593    ///
594    /// If the queue is empty, nothing happens.
595    ///
596    /// Should only be called on the main task spawned in [SettingProxy::create](#method.create).
597    async fn execute_next_request(&mut self, id: ftrace::Id, recreate_handler: bool) {
598        if self.active_request.is_none() {
599            let mut pending = self
600                .pending_requests
601                .pop_front()
602                .expect("execute should only be called with present requests");
603
604            if matches!(pending.setting_request, Request::Listen) {
605                // Add a callback when the client side goes out of scope. Panic if the
606                // unbounded_send failed, which indicates the channel got dropped and requests
607                // cannot be processed anymore.
608                let proxy_request_sender = self.proxy_request_sender.clone();
609                let request_id = pending.id;
610                pending.bind_to_scope(Box::new(move || {
611                    proxy_request_sender.unbounded_send(ProxyRequest::EndListen(request_id)).expect(
612                        "SettingProxy::execute_next_request, proxy_request_sender failed to send \
613                        EndListen proxy request with info",
614                    );
615                }))
616                .await;
617            }
618
619            // Add the request to the queue of requests to process.
620            self.active_request =
621                Some(ActiveRequest { request: Rc::from(pending), attempts: 0, last_result: None });
622        }
623
624        // Recreating signature is always honored, even if the request is not.
625        let signature = self
626            .get_handler_signature(recreate_handler)
627            .await
628            .expect("failed to generate handler signature");
629
630        // since we borrow self as mutable for active_request, we must retrieve
631        // the listening state (which borrows immutable) before.
632        let was_listening = self.is_listening();
633
634        let active_request =
635            self.active_request.as_mut().expect("active request should be present");
636
637        active_request.attempts += 1;
638
639        // Note that we must copy these values as we are borrowing self for
640        // active_requests and self is needed to remove active below.
641        let request = active_request.get_request();
642
643        if matches!(request, Request::Listen) {
644            let info = active_request.get_info();
645
646            // Increment the active listener count in inspect.
647            self.listener_logger.lock().await.add_listener(self.setting_type);
648
649            // Add the request to tracked listen requests.
650            self.listen_requests.push(info.clone());
651
652            // Listening requests must be acknowledged as they are long-living.
653            info.acknowledge().await;
654
655            // If listening state has changed, update state.
656            if was_listening != self.is_listening() {
657                self.send_listen_update(false).await;
658            }
659
660            // Request the active request be removed as it is now tracked
661            // elsewhere.
662            self.request(ProxyRequest::RemoveActive);
663            return;
664        }
665
666        // If we have exceeded the maximum number of attempts, remove this
667        // request from the queue.
668        if active_request.attempts > self.max_attempts {
669            publish!(
670                self,
671                event::handler::Event::Request(
672                    event::handler::Action::AttemptsExceeded,
673                    request.clone()
674                )
675            );
676
677            self.request(ProxyRequest::RemoveActive);
678            return;
679        }
680
681        publish!(
682            self,
683            event::handler::Event::Request(event::handler::Action::Execute, request.clone())
684        );
685
686        let mut receptor = self.messenger.message_with_timeout(
687            setting_handler::Payload::Command(Command::HandleRequest(request.clone())).into(),
688            Audience::Messenger(signature),
689            self.request_timeout,
690        );
691
692        let proxy_request_sender_clone = self.proxy_request_sender.clone();
693
694        fasync::Task::local(async move {
695            trace!(id, c"response");
696            while let Some(message_event) = receptor.next().await {
697                let handler_result = match message_event {
698                    MessageEvent::Message(
699                        service::Payload::Controller(setting_handler::Payload::Result(result)),
700                        _,
701                    ) => Some(result),
702                    MessageEvent::Status(Status::Undeliverable) => {
703                        Some(Err(ControllerError::IrrecoverableError))
704                    }
705                    MessageEvent::Status(Status::Timeout) => {
706                        Some(Err(ControllerError::TimeoutError))
707                    }
708                    _ => None,
709                };
710
711                if let Some(result) = handler_result {
712                    // Mark the request as having been handled after retries have been
713                    // attempted and the client has been notified. Panic if the unbounded_send
714                    // failed, which indicates the channel got dropped and requests cannot be
715                    // processed anymore.
716                    proxy_request_sender_clone
717                        .unbounded_send(ProxyRequest::HandleResult(result))
718                        .expect(
719                            "SettingProxy::execute_next_request, proxy_request_sender failed to \
720                            send proxy request",
721                        );
722                    return;
723                }
724            }
725        })
726        .detach();
727    }
728
729    /// Requests the active request to be tried again, forcefully recreating the
730    /// handler.
731    fn retry(&mut self) {
732        publish!(
733            self,
734            event::handler::Event::Request(
735                event::handler::Action::Retry,
736                self.active_request
737                    .as_ref()
738                    .expect("active request should be present")
739                    .get_request(),
740            )
741        );
742
743        self.request(ProxyRequest::Execute(true));
744    }
745
746    fn has_active_work(&self) -> bool {
747        self.active_request.is_some()
748            || !self.pending_requests.is_empty()
749            || self.is_listening()
750            || self.client_signature.is_none()
751    }
752
753    async fn start_teardown_timeout(&mut self) {
754        if self.has_active_work() {
755            return;
756        }
757
758        let (cancellation_tx, cancellation_rx) = futures::channel::oneshot::channel();
759        if self.teardown_cancellation.is_some() {
760            // Do not overwrite the cancellation. We do not want to extend it if it's already
761            // counting down.
762            return;
763        }
764
765        self.teardown_cancellation = Some(cancellation_tx);
766        let sender = self.proxy_request_sender.clone();
767        let teardown_timeout = self.teardown_timeout;
768        fasync::Task::local(async move {
769            let timeout = fuchsia_async::Timer::new(crate::clock::now() + teardown_timeout).fuse();
770            futures::pin_mut!(cancellation_rx, timeout);
771            futures::select! {
772                _ = cancellation_rx => {
773                    // Exit the loop and do not send teardown message when cancellation received.
774                    return;
775                }
776                _ = timeout => {}, // no-op
777            }
778
779            // Panic if the unbounded_send failed, which indicates the channel got dropped and
780            // requests cannot be processed anymore.
781            sender.unbounded_send(ProxyRequest::Teardown).expect(
782                "SettingProxy::start_teardown_timeout, proxy_request_sender failed to send Teardown\
783                 proxy request",
784            );
785        })
786        .detach();
787    }
788
789    /// Transitions the controller for the `setting_type` to the Teardown phase
790    /// and removes it from the active_controllers.
791    async fn teardown_if_needed(&mut self) {
792        if self.has_active_work() {
793            return;
794        }
795
796        let signature = self.client_signature.take().expect("signature should be set");
797
798        let mut controller_receptor = self.messenger.message(
799            setting_handler::Payload::Command(Command::ChangeState(State::Teardown)).into(),
800            Audience::Messenger(signature),
801        );
802
803        // Wait for the teardown phase to be over before continuing.
804        if controller_receptor.next().await != Some(MessageEvent::Status(Status::Received)) {
805            log::error!("Failed to tear down {:?} controller", self.setting_type);
806        }
807
808        // This ensures that the client event loop for the corresponding controller is
809        // properly stopped. Without this, the client event loop will run forever.
810        self.delegate.delete(signature);
811
812        publish!(self, event::handler::Event::Teardown);
813    }
814}