archivist_lib/logs/servers/
log_flush.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::logs::error::LogsError;
6use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker};
7use fidl_fuchsia_diagnostics as fdiagnostics;
8use fuchsia_async::Scope;
9use futures::channel::mpsc::{unbounded, UnboundedSender};
10use futures::StreamExt;
11use log::warn;
12
13pub struct LogFlushServer {
14    scope: Scope,
15    /// Channel used to request log flushing
16    flush_channel: UnboundedSender<UnboundedSender<()>>,
17}
18
19impl LogFlushServer {
20    pub fn new(scope: Scope, flush_channel: UnboundedSender<UnboundedSender<()>>) -> Self {
21        Self { scope, flush_channel }
22    }
23
24    /// Listens for flush requests on a FIDL channel.
25    pub fn spawn(&self, stream: fdiagnostics::LogFlusherRequestStream) {
26        let flush_channel = self.flush_channel.clone();
27        self.scope.spawn(async move {
28            if let Err(e) = Self::handle_requests(stream, flush_channel).await {
29                warn!("error handling Log requests: {}", e);
30            }
31        });
32    }
33
34    /// Actually handle the FIDL requests.
35    async fn handle_requests(
36        mut stream: fdiagnostics::LogFlusherRequestStream,
37        flush_channel: UnboundedSender<UnboundedSender<()>>,
38    ) -> Result<(), LogsError> {
39        while let Some(request) = stream.next().await {
40            let request = request.map_err(|source| LogsError::HandlingRequests {
41                protocol: fdiagnostics::LogFlusherMarker::PROTOCOL_NAME,
42                source,
43            })?;
44
45            match request {
46                fdiagnostics::LogFlusherRequest::WaitUntilFlushed { responder } => {
47                    let (sender, mut receiver) = unbounded();
48                    // This will be dropped if we're in a configuration without serial, in which case
49                    // we just ignore and reply to the request immediately.
50                    let _ = flush_channel.unbounded_send(sender);
51
52                    // Wait for flush to complete
53                    let _ = receiver.next().await;
54
55                    // We don't care if the other side exits
56                    let _ = responder.send();
57                }
58                fdiagnostics::LogFlusherRequest::_UnknownMethod {
59                    ordinal,
60                    method_type,
61                    control_handle,
62                    ..
63                } => {
64                    warn!(ordinal, method_type:?; "Unknown request. Closing connection");
65                    control_handle.shutdown_with_epitaph(zx::Status::UNAVAILABLE);
66                }
67            }
68        }
69        Ok(())
70    }
71}
72
73#[cfg(test)]
74mod tests {
75    use super::*;
76    use fidl::endpoints::create_proxy;
77    use futures::FutureExt;
78    use std::pin::pin;
79
80    #[fuchsia::test]
81    async fn all_logs_get_flushed_when_flush_is_received_before_returning_from_flush() {
82        let (flush_requester, mut flush_receiver) = unbounded();
83        let server = LogFlushServer::new(Scope::new(), flush_requester);
84
85        let (proxy, stream) = create_proxy::<fdiagnostics::LogFlusherMarker>();
86        server.spawn(stream.into_stream());
87
88        let mut flush_fut = pin!(proxy.wait_until_flushed());
89
90        // The future shouldn't be ready.
91        assert!(flush_fut.as_mut().now_or_never().is_none());
92
93        // The server should have sent a flush request.
94        let flush_ack_sender = flush_receiver.next().await.unwrap();
95
96        // The future still shouldn't be ready.
97        assert!(flush_fut.as_mut().now_or_never().is_none());
98
99        // Ack the flush.
100        flush_ack_sender.unbounded_send(()).unwrap();
101
102        // The future should be ready now.
103        flush_fut.await.unwrap();
104    }
105}