archivist_lib/logs/servers/
log_flush.rs1use crate::logs::error::LogsError;
6use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker};
7use fidl_fuchsia_diagnostics as fdiagnostics;
8use fuchsia_async::Scope;
9use futures::StreamExt;
10use futures::channel::mpsc::UnboundedSender;
11use futures::channel::oneshot;
12use log::warn;
13
14pub struct LogFlushServer {
15 scope: Scope,
16 flush_channel: UnboundedSender<oneshot::Sender<()>>,
18}
19
20impl LogFlushServer {
21 pub fn new(scope: Scope, flush_channel: UnboundedSender<oneshot::Sender<()>>) -> Self {
22 Self { scope, flush_channel }
23 }
24
25 pub fn spawn(&self, stream: fdiagnostics::LogFlusherRequestStream) {
27 let flush_channel = self.flush_channel.clone();
28 self.scope.spawn(async move {
29 if let Err(e) = Self::handle_requests(stream, flush_channel).await {
30 warn!("error handling Log requests: {}", e);
31 }
32 });
33 }
34
35 async fn handle_requests(
37 mut stream: fdiagnostics::LogFlusherRequestStream,
38 flush_channel: UnboundedSender<oneshot::Sender<()>>,
39 ) -> Result<(), LogsError> {
40 while let Some(request) = stream.next().await {
41 let request = request.map_err(|source| LogsError::HandlingRequests {
42 protocol: fdiagnostics::LogFlusherMarker::PROTOCOL_NAME,
43 source,
44 })?;
45
46 match request {
47 fdiagnostics::LogFlusherRequest::WaitUntilFlushed { responder } => {
48 let (sender, receiver) = oneshot::channel();
49 let _ = flush_channel.unbounded_send(sender);
52
53 let _ = receiver.await;
55
56 let _ = responder.send();
58 }
59 fdiagnostics::LogFlusherRequest::_UnknownMethod {
60 ordinal,
61 method_type,
62 control_handle,
63 ..
64 } => {
65 warn!(ordinal, method_type:?; "Unknown request. Closing connection");
66 control_handle.shutdown_with_epitaph(zx::Status::UNAVAILABLE);
67 }
68 }
69 }
70 Ok(())
71 }
72}
73
#[cfg(test)]
mod tests {
    use super::*;
    use fidl::endpoints::create_proxy;
    use futures::FutureExt;
    use futures::channel::mpsc::unbounded;
    use std::pin::pin;

    /// A `WaitUntilFlushed` call must stay pending until the flush has been acknowledged
    /// through the oneshot sender that the server pushed onto the flush channel.
    #[fuchsia::test]
    async fn all_logs_get_flushed_when_flush_is_received_before_returning_from_flush() {
        // Client proxy plus the server end that the server under test will serve.
        let (client, server_end) = create_proxy::<fdiagnostics::LogFlusherMarker>();

        // The test plays the role of whoever consumes flush requests.
        let (flush_tx, mut flush_rx) = unbounded();
        // `server` owns the scope, so it has to stay alive until the end of the test.
        let server = LogFlushServer::new(Scope::new(), flush_tx);
        server.spawn(server_end.into_stream());

        let mut pending_flush = pin!(client.wait_until_flushed());

        // Nothing has acknowledged the flush yet, so the call cannot have completed.
        assert!(pending_flush.as_mut().now_or_never().is_none());

        // The server forwards the request together with a sender used to acknowledge it.
        let ack = flush_rx.next().await.unwrap();

        // Receiving the request is not enough: the call is still waiting for the ack.
        assert!(pending_flush.as_mut().now_or_never().is_none());

        // Acknowledge the flush.
        ack.send(()).unwrap();

        // Now the call completes successfully.
        pending_flush.await.unwrap();
    }
}