fuchsia_inspect_auto_persist/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_diagnostics_persist::PersistResult;
6use futures::channel::mpsc;
7use futures::{Future, StreamExt};
8use injectable_time::{MonotonicInstant, TimeSource};
9use log::{error, info};
10use std::ops::{Deref, DerefMut};
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13
14pub type PersistenceReqSender = mpsc::Sender<String>;
15
16/// Wrapper around an Inspect node T so that after the node is accessed (and written to),
17/// the corresponding Data Persistence tag would be sent through a channel so that it
18/// can be forwarded to the Data Persistence Service.
19pub struct AutoPersist<T> {
20    inspect_node: T,
21    persistence_tag: String,
22    persistence_req_sender: PersistenceReqSender,
23    sender_is_blocked: Arc<AtomicBool>,
24}
25
26impl<T> AutoPersist<T> {
27    pub fn new(
28        inspect_node: T,
29        persistence_tag: &str,
30        persistence_req_sender: PersistenceReqSender,
31    ) -> Self {
32        Self {
33            inspect_node,
34            persistence_tag: persistence_tag.to_string(),
35            persistence_req_sender,
36            sender_is_blocked: Arc::new(AtomicBool::new(false)),
37        }
38    }
39
40    /// Return a guard that derefs to `inspect_node`. When the guard is dropped,
41    /// `persistence_tag` is sent via the `persistence_req_sender`.
42    pub fn get_mut(&mut self) -> AutoPersistGuard<'_, T> {
43        AutoPersistGuard {
44            inspect_node: &mut self.inspect_node,
45            persistence_tag: &self.persistence_tag,
46            persistence_req_sender: &mut self.persistence_req_sender,
47            sender_is_blocked: Arc::clone(&self.sender_is_blocked),
48        }
49    }
50}
51
52pub struct AutoPersistGuard<'a, T> {
53    inspect_node: &'a mut T,
54    persistence_tag: &'a str,
55    persistence_req_sender: &'a mut PersistenceReqSender,
56    sender_is_blocked: Arc<AtomicBool>,
57}
58
59impl<T> Deref for AutoPersistGuard<'_, T> {
60    type Target = T;
61
62    fn deref(&self) -> &Self::Target {
63        self.inspect_node
64    }
65}
66
67impl<T> DerefMut for AutoPersistGuard<'_, T> {
68    fn deref_mut(&mut self) -> &mut Self::Target {
69        self.inspect_node
70    }
71}
72
73impl<T> Drop for AutoPersistGuard<'_, T> {
74    fn drop(&mut self) {
75        if self.persistence_req_sender.try_send(self.persistence_tag.to_string()).is_err() {
76            // If sender has not been blocked before, set bool to true and log error message
77            if self
78                .sender_is_blocked
79                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
80                .is_ok()
81            {
82                error!("PersistenceReqSender dropped a persistence request: either buffer is full or no receiver is waiting");
83            }
84        } else {
85            // If sender has been blocked before, set bool to false and log message
86            if self
87                .sender_is_blocked
88                .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
89                .is_ok()
90            {
91                info!("PersistenceReqSender recovered and resumed sending");
92            }
93        }
94    }
95}
96
97fn log_at_most_once_per_min_factory(
98    time_source: impl TimeSource,
99    mut log_fn: impl FnMut(String),
100) -> impl FnMut(String) {
101    let mut last_logged = None;
102    move |message| {
103        let now = zx::MonotonicInstant::from_nanos(time_source.now());
104        let should_log = match last_logged {
105            Some(last_logged) => (now - last_logged) >= zx::MonotonicDuration::from_minutes(1),
106            None => true,
107        };
108        if should_log {
109            log_fn(message);
110            last_logged.replace(now);
111        }
112    }
113}
114
115// arbitrary value
116const DEFAULT_BUFFER_SIZE: usize = 100;
117
118/// Create a sender for sending Persistence tag, and a Future representing a sending thread
119/// that forwards that tag to the Data Persistence service.
120///
121/// If the sending thread fails to forward a tag, or the Persistence Service returns an error
122/// code, an error will be logged. However, an error is only logged at most once per minute
123/// to avoid log spam.
124pub fn create_persistence_req_sender(
125    persistence_proxy: fidl_fuchsia_diagnostics_persist::DataPersistenceProxy,
126) -> (PersistenceReqSender, impl Future<Output = ()>) {
127    let (sender, mut receiver) = mpsc::channel::<String>(DEFAULT_BUFFER_SIZE);
128    let fut = async move {
129        let persistence_proxy = persistence_proxy.clone();
130        let mut log_error =
131            log_at_most_once_per_min_factory(MonotonicInstant::new(), |e| error!("{}", e));
132        while let Some(tag_name) = receiver.next().await {
133            let resp = persistence_proxy.persist(&tag_name).await;
134            match resp {
135                Ok(PersistResult::Queued) => continue,
136                Ok(other) => log_error(format!(
137                    "Persistence Service returned an error for tag {tag_name}: {other:?}"
138                )),
139                Err(e) => log_error(format!(
140                    "Failed to send request to Persistence Service for tag {tag_name}: {e}"
141                )),
142            }
143        }
144    };
145    (sender, fut)
146}
147
148#[cfg(test)]
149mod tests {
150    use super::*;
151    use fidl::endpoints::create_proxy_and_stream;
152    use fidl_fuchsia_diagnostics_persist::DataPersistenceRequest;
153    use fuchsia_async as fasync;
154    use fuchsia_inspect::Inspector;
155    use futures::task::Poll;
156    use std::cell::RefCell;
157    use std::pin::pin;
158    use std::rc::Rc;
159
160    #[fuchsia::test]
161    fn test_auto_persist() {
162        let (sender, mut receiver) = mpsc::channel::<String>(100);
163        let inspector = Inspector::default();
164        let node = inspector.root().create_child("node");
165        let mut auto_persist_node = AutoPersist::new(node, "some-tag", sender);
166
167        // There should be no message on the receiver end yet
168        assert!(receiver.try_next().is_err());
169
170        {
171            let _guard = auto_persist_node.get_mut();
172        }
173
174        match receiver.try_next() {
175            Ok(Some(tag)) => assert_eq!(tag, "some-tag"),
176            _ => panic!("expect message in receiver"),
177        }
178    }
179
180    #[fuchsia::test]
181    fn test_create_persistence_req_sender() {
182        let mut exec = fasync::TestExecutor::new();
183        let (persistence_proxy, mut persistence_stream) =
184            create_proxy_and_stream::<fidl_fuchsia_diagnostics_persist::DataPersistenceMarker>();
185        let (mut req_sender, req_forwarder_fut) = create_persistence_req_sender(persistence_proxy);
186
187        let mut req_forwarder_fut = pin!(req_forwarder_fut);
188
189        // Nothing has happened yet, so these futures should be Pending
190        match exec.run_until_stalled(&mut req_forwarder_fut) {
191            Poll::Pending => (),
192            other => panic!("unexpected variant: {other:?}"),
193        };
194        match exec.run_until_stalled(&mut persistence_stream.next()) {
195            Poll::Pending => (),
196            other => panic!("unexpected variant: {other:?}"),
197        };
198
199        assert!(req_sender.try_send("some-tag".to_string()).is_ok());
200
201        // req_forwarder_fut still Pending because it's a loop
202        match exec.run_until_stalled(&mut req_forwarder_fut) {
203            Poll::Pending => (),
204            other => panic!("unexpected variant: {other:?}"),
205        };
206        // There should be a message in the stream now
207        match exec.run_until_stalled(&mut persistence_stream.next()) {
208            Poll::Ready(Some(Ok(DataPersistenceRequest::Persist { tag, .. }))) => {
209                assert_eq!(tag, "some-tag")
210            }
211            other => panic!("unexpected variant: {other:?}"),
212        };
213    }
214
215    #[derive(Debug)]
216    struct FakeTimeSource {
217        now: Rc<RefCell<zx::MonotonicInstant>>,
218    }
219
220    impl TimeSource for FakeTimeSource {
221        fn now(&self) -> i64 {
222            self.now.borrow().into_nanos()
223        }
224    }
225
226    #[fuchsia::test]
227    fn test_log_at_most_once_per_min_factory() {
228        let log_count = Rc::new(RefCell::new(0));
229        let now = Rc::new(RefCell::new(zx::MonotonicInstant::from_nanos(0)));
230        let fake_time_source = FakeTimeSource { now: now.clone() };
231        let mut log =
232            log_at_most_once_per_min_factory(fake_time_source, |_| *log_count.borrow_mut() += 1);
233
234        log("message 1".to_string());
235        assert_eq!(*log_count.borrow(), 1);
236
237        // No time has passed, so log_count shouldn't increase
238        log("message 2".to_string());
239        assert_eq!(*log_count.borrow(), 1);
240
241        {
242            *now.borrow_mut() += zx::MonotonicDuration::from_seconds(30);
243        }
244
245        // Not enough time has passed, so log_count shouldn't increase
246        log("message 3".to_string());
247        assert_eq!(*log_count.borrow(), 1);
248
249        {
250            *now.borrow_mut() += zx::MonotonicDuration::from_seconds(30);
251        }
252
253        // Enough time has passed, so log_count should increase
254        log("message 3".to_string());
255        assert_eq!(*log_count.borrow(), 2);
256    }
257}