archivist_lib/logs/servers/
log_flush.rs

// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use crate::logs::error::LogsError;
6use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker};
7use fidl_fuchsia_diagnostics as fdiagnostics;
8use fuchsia_async::Scope;
9use futures::StreamExt;
10use futures::channel::mpsc::UnboundedSender;
11use futures::channel::oneshot;
12use log::warn;
13
14pub struct LogFlushServer {
15    scope: Scope,
16    /// Channel used to request log flushing
17    flush_channel: UnboundedSender<oneshot::Sender<()>>,
18}
19
20impl LogFlushServer {
21    pub fn new(scope: Scope, flush_channel: UnboundedSender<oneshot::Sender<()>>) -> Self {
22        Self { scope, flush_channel }
23    }
24
25    /// Listens for flush requests on a FIDL channel.
26    pub fn spawn(&self, stream: fdiagnostics::LogFlusherRequestStream) {
27        let flush_channel = self.flush_channel.clone();
28        self.scope.spawn(async move {
29            if let Err(e) = Self::handle_requests(stream, flush_channel).await {
30                warn!("error handling Log requests: {}", e);
31            }
32        });
33    }
34
35    /// Actually handle the FIDL requests.
36    async fn handle_requests(
37        mut stream: fdiagnostics::LogFlusherRequestStream,
38        flush_channel: UnboundedSender<oneshot::Sender<()>>,
39    ) -> Result<(), LogsError> {
40        while let Some(request) = stream.next().await {
41            let request = request.map_err(|source| LogsError::HandlingRequests {
42                protocol: fdiagnostics::LogFlusherMarker::PROTOCOL_NAME,
43                source,
44            })?;
45
46            match request {
47                fdiagnostics::LogFlusherRequest::WaitUntilFlushed { responder } => {
48                    let (sender, receiver) = oneshot::channel();
49                    // This will be dropped if we're in a configuration without serial, in which case
50                    // we just ignore and reply to the request immediately.
51                    let _ = flush_channel.unbounded_send(sender);
52
53                    // Wait for flush to complete
54                    let _ = receiver.await;
55
56                    // We don't care if the other side exits
57                    let _ = responder.send();
58                }
59                fdiagnostics::LogFlusherRequest::_UnknownMethod {
60                    ordinal,
61                    method_type,
62                    control_handle,
63                    ..
64                } => {
65                    warn!(ordinal, method_type:?; "Unknown request. Closing connection");
66                    control_handle.shutdown_with_epitaph(zx::Status::UNAVAILABLE);
67                }
68            }
69        }
70        Ok(())
71    }
72}
73
#[cfg(test)]
mod tests {
    use super::*;
    use fidl::endpoints::create_proxy;
    use futures::FutureExt;
    use futures::channel::mpsc::unbounded;
    use std::pin::pin;

    /// A `WaitUntilFlushed` call must stay pending until the flush request that
    /// the server forwarded has been acknowledged.
    #[fuchsia::test]
    async fn all_logs_get_flushed_when_flush_is_received_before_returning_from_flush() {
        let (request_tx, mut request_rx) = unbounded();
        let server = LogFlushServer::new(Scope::new(), request_tx);

        let (proxy, server_end) = create_proxy::<fdiagnostics::LogFlusherMarker>();
        server.spawn(server_end.into_stream());

        let mut pending_flush = pin!(proxy.wait_until_flushed());

        // Nothing has acknowledged the flush yet, so the call cannot have completed.
        assert!(pending_flush.as_mut().now_or_never().is_none());

        // The server forwards the request as a oneshot ack sender.
        let ack = request_rx.next().await.unwrap();

        // Receiving the request is not the same as acknowledging it: still pending.
        assert!(pending_flush.as_mut().now_or_never().is_none());

        // Acknowledge the flush.
        ack.send(()).unwrap();

        // With the ack delivered, the client call resolves successfully.
        pending_flush.await.unwrap();
    }
}