archivist_lib/logs/servers/
log_flush.rs1use crate::logs::error::LogsError;
6use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker};
7use fidl_fuchsia_diagnostics as fdiagnostics;
8use fuchsia_async::Scope;
9use futures::channel::mpsc::{unbounded, UnboundedSender};
10use futures::StreamExt;
11use log::warn;
12
13pub struct LogFlushServer {
14 scope: Scope,
15 flush_channel: UnboundedSender<UnboundedSender<()>>,
17}
18
19impl LogFlushServer {
20 pub fn new(scope: Scope, flush_channel: UnboundedSender<UnboundedSender<()>>) -> Self {
21 Self { scope, flush_channel }
22 }
23
24 pub fn spawn(&self, stream: fdiagnostics::LogFlusherRequestStream) {
26 let flush_channel = self.flush_channel.clone();
27 self.scope.spawn(async move {
28 if let Err(e) = Self::handle_requests(stream, flush_channel).await {
29 warn!("error handling Log requests: {}", e);
30 }
31 });
32 }
33
34 async fn handle_requests(
36 mut stream: fdiagnostics::LogFlusherRequestStream,
37 flush_channel: UnboundedSender<UnboundedSender<()>>,
38 ) -> Result<(), LogsError> {
39 while let Some(request) = stream.next().await {
40 let request = request.map_err(|source| LogsError::HandlingRequests {
41 protocol: fdiagnostics::LogFlusherMarker::PROTOCOL_NAME,
42 source,
43 })?;
44
45 match request {
46 fdiagnostics::LogFlusherRequest::WaitUntilFlushed { responder } => {
47 let (sender, mut receiver) = unbounded();
48 let _ = flush_channel.unbounded_send(sender);
51
52 let _ = receiver.next().await;
54
55 let _ = responder.send();
57 }
58 fdiagnostics::LogFlusherRequest::_UnknownMethod {
59 ordinal,
60 method_type,
61 control_handle,
62 ..
63 } => {
64 warn!(ordinal, method_type:?; "Unknown request. Closing connection");
65 control_handle.shutdown_with_epitaph(zx::Status::UNAVAILABLE);
66 }
67 }
68 }
69 Ok(())
70 }
71}
72
73#[cfg(test)]
74mod tests {
75 use super::*;
76 use fidl::endpoints::create_proxy;
77 use futures::FutureExt;
78 use std::pin::pin;
79
80 #[fuchsia::test]
81 async fn all_logs_get_flushed_when_flush_is_received_before_returning_from_flush() {
82 let (flush_requester, mut flush_receiver) = unbounded();
83 let server = LogFlushServer::new(Scope::new(), flush_requester);
84
85 let (proxy, stream) = create_proxy::<fdiagnostics::LogFlusherMarker>();
86 server.spawn(stream.into_stream());
87
88 let mut flush_fut = pin!(proxy.wait_until_flushed());
89
90 assert!(flush_fut.as_mut().now_or_never().is_none());
92
93 let flush_ack_sender = flush_receiver.next().await.unwrap();
95
96 assert!(flush_fut.as_mut().now_or_never().is_none());
98
99 flush_ack_sender.unbounded_send(()).unwrap();
101
102 flush_fut.await.unwrap();
104 }
105}