1use crate::diagnostics::BatchIteratorConnectionStats;
5use crate::error::AccessorError;
6use crate::logs::container::CursorItem;
7use crate::logs::servers::{extend_fxt_record, ExtendRecordOpts};
8use fidl_fuchsia_diagnostics::{
9 DataType, Format, FormattedContent, StreamMode, MAXIMUM_ENTRIES_PER_BATCH,
10};
11use fuchsia_sync::Mutex;
12
13use futures::prelude::*;
14use log::{error, warn};
15use serde::Serialize;
16use std::io::{BufWriter, Result as IoResult, Write};
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21pub type FormattedStream =
22 Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;
23
24#[pin_project::pin_project]
25pub struct FormattedContentBatcher<C> {
26 #[pin]
27 items: C,
28 stats: Arc<BatchIteratorConnectionStats>,
29}
30
31pub fn new_batcher<I, T, E>(
39 items: I,
40 stats: Arc<BatchIteratorConnectionStats>,
41 mode: StreamMode,
42) -> FormattedStream
43where
44 I: Stream<Item = Result<T, E>> + Send + 'static,
45 T: Into<FormattedContent> + Send,
46 E: Into<AccessorError> + Send,
47{
48 match mode {
49 StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
50 Box::pin(FormattedContentBatcher {
51 items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
52 stats,
53 })
54 }
55 StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
56 items: items.chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
57 stats,
58 }),
59 }
60}
61
62impl<I, T, E> Stream for FormattedContentBatcher<I>
63where
64 I: Stream<Item = Vec<Result<T, E>>>,
65 T: Into<FormattedContent>,
66 E: Into<AccessorError>,
67{
68 type Item = Vec<Result<FormattedContent, AccessorError>>;
69
70 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
71 let this = self.project();
72 match this.items.poll_next(cx) {
73 Poll::Ready(Some(chunk)) => {
74 let mut batch = vec![];
76 for item in chunk {
77 let result = match item {
78 Ok(i) => Ok(i.into()),
79 Err(e) => {
80 this.stats.add_result_error();
81 Err(e.into())
82 }
83 };
84 batch.push(result);
85 }
86 Poll::Ready(Some(batch))
87 }
88 Poll::Ready(None) => Poll::Ready(None),
89 Poll::Pending => Poll::Pending,
90 }
91 }
92}
93
94#[derive(Clone)]
95struct VmoWriter {
96 inner: Arc<Mutex<InnerVmoWriter>>,
97}
98
99enum InnerVmoWriter {
100 Active { vmo: zx::Vmo, capacity: u64, tail: u64 },
101 Done,
102}
103
104impl VmoWriter {
105 fn new(start_size: u64) -> Self {
107 let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, start_size)
108 .expect("can always create resizable vmo's");
109 let capacity = vmo.get_size().expect("can always read vmo size");
110 Self { inner: Arc::new(Mutex::new(InnerVmoWriter::Active { vmo, capacity, tail: 0 })) }
111 }
112
113 fn tail(&self) -> u64 {
114 let guard = self.inner.lock();
115 match &*guard {
116 InnerVmoWriter::Done => 0,
117 InnerVmoWriter::Active { tail, .. } => *tail,
118 }
119 }
120
121 fn capacity(&self) -> u64 {
122 let guard = self.inner.lock();
123 match &*guard {
124 InnerVmoWriter::Done => 0,
125 InnerVmoWriter::Active { capacity, .. } => *capacity,
126 }
127 }
128
129 fn finalize(self) -> Option<(zx::Vmo, u64)> {
130 let mut inner = self.inner.lock();
131 let mut swapped = InnerVmoWriter::Done;
132 std::mem::swap(&mut *inner, &mut swapped);
133 match swapped {
134 InnerVmoWriter::Done => None,
135 InnerVmoWriter::Active { vmo, tail, .. } => Some((vmo, tail)),
136 }
137 }
138
139 fn reset(&mut self, new_tail: u64, new_capacity: u64) {
140 let mut inner = self.inner.lock();
141 match &mut *inner {
142 InnerVmoWriter::Done => {}
143 InnerVmoWriter::Active { vmo, capacity, tail } => {
144 vmo.set_size(new_capacity).expect("can always resize a plain vmo");
145 *capacity = new_capacity;
146 *tail = new_tail;
147 }
148 }
149 }
150}
151
152impl Write for VmoWriter {
153 fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
154 match &mut *self.inner.lock() {
155 InnerVmoWriter::Done => Ok(0),
156 InnerVmoWriter::Active { vmo, tail, capacity } => {
157 let new_tail = *tail + buf.len() as u64;
158 if new_tail > *capacity {
159 vmo.set_size(new_tail).expect("can always resize a plain vmo");
160 *capacity = new_tail;
161 }
162 vmo.write(buf, *tail)?;
163 *tail = new_tail;
164 Ok(buf.len())
165 }
166 }
167 }
168
169 fn flush(&mut self) -> IoResult<()> {
170 Ok(())
171 }
172}
173
174pub struct SerializedVmo {
176 pub vmo: zx::Vmo,
177 pub size: u64,
178 format: Format,
179}
180
181fn fxt_to_writer<W: std::io::Write>(mut writer: W, item: &CursorItem) -> Result<(), AccessorError> {
182 let value = extend_fxt_record(
183 item.message.bytes(),
184 &item.identity,
185 item.rolled_out,
186 &ExtendRecordOpts { component_url: true, moniker: true, rolled_out: true },
187 );
188 Ok(writer.write_all(&value)?)
189}
190
191impl SerializedVmo {
192 pub fn serialize(
193 source: &impl Serialize,
194 data_type: DataType,
195 format: Format,
196 ) -> Result<Self, AccessorError> {
197 let writer = VmoWriter::new(match data_type {
198 DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES as u64,
199 DataType::Logs => 4096, });
203 let batch_writer = BufWriter::new(writer.clone());
204 match format {
205 Format::Json => {
206 serde_json::to_writer(batch_writer, source).map_err(AccessorError::Serialization)?
207 }
208 Format::Cbor => ciborium::into_writer(source, batch_writer)
209 .map_err(|err| AccessorError::CborSerialization(err.into()))?,
210 Format::Text => unreachable!("We'll never get Text"),
211 Format::Fxt => unreachable!("We'll never get FXT"),
212 }
213 let (vmo, tail) = writer.finalize().unwrap();
215 Ok(Self { vmo, size: tail, format })
216 }
217}
218
219impl From<SerializedVmo> for FormattedContent {
220 fn from(content: SerializedVmo) -> FormattedContent {
221 match content.format {
222 Format::Json => {
223 content
225 .vmo
226 .set_content_size(&content.size)
227 .expect("set_content_size always returns Ok");
228 FormattedContent::Json(fidl_fuchsia_mem::Buffer {
229 vmo: content.vmo,
230 size: content.size,
231 })
232 }
233 Format::Cbor => {
234 content
235 .vmo
236 .set_content_size(&content.size)
237 .expect("set_content_size always returns Ok");
238 FormattedContent::Cbor(content.vmo)
239 }
240 Format::Fxt => {
241 content
242 .vmo
243 .set_content_size(&content.size)
244 .expect("set_content_size always returns Ok");
245 FormattedContent::Fxt(content.vmo)
246 }
247 Format::Text => unreachable!("We'll never get Text"),
248 }
249 }
250}
251
252#[pin_project::pin_project]
255pub struct FXTPacketSerializer<I> {
256 #[pin]
257 items: I,
258 stats: Option<Arc<BatchIteratorConnectionStats>>,
259 max_packet_size: u64,
260 overflow: Option<CursorItem>,
261}
262
263impl<I> FXTPacketSerializer<I> {
264 pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
265 Self { items, stats: Some(stats), max_packet_size, overflow: None }
266 }
267}
268
269impl<I> Stream for FXTPacketSerializer<I>
270where
271 I: Stream<Item = CursorItem> + Unpin,
272{
273 type Item = Result<SerializedVmo, AccessorError>;
274
275 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
278 let mut this = self.project();
279 let mut writer = VmoWriter::new(*this.max_packet_size);
280
281 if let Some(item) = this.overflow.take() {
282 let batch_writer = BufWriter::new(writer.clone());
283 fxt_to_writer(batch_writer, &item)?;
284 if let Some(stats) = &this.stats {
285 stats.add_result();
286 }
287 }
288
289 let mut items_is_pending = false;
290 loop {
291 let item = match this.items.poll_next_unpin(cx) {
292 Poll::Ready(Some(item)) => item,
293 Poll::Ready(None) => break,
294 Poll::Pending => {
295 items_is_pending = true;
296 break;
297 }
298 };
299
300 let writer_tail = writer.tail();
301 let (last_tail, previous_size) = (writer_tail, writer.capacity());
302 let batch_writer = BufWriter::new(writer.clone());
303 fxt_to_writer(batch_writer, &item)?;
304 let writer_tail = writer.tail();
305
306 if writer_tail > *this.max_packet_size {
307 writer.reset(last_tail, previous_size);
308 *this.overflow = Some(item);
309 break;
310 }
311
312 if let Some(stats) = &this.stats {
313 stats.add_result();
314 }
315 }
316
317 let writer_tail = writer.tail();
318
319 if writer_tail > 2 {
320 let (vmo, size) = writer.finalize().unwrap();
322 Poll::Ready(Some(Ok(SerializedVmo { vmo, size, format: Format::Fxt })))
323 } else if items_is_pending {
324 Poll::Pending
325 } else {
326 Poll::Ready(None)
327 }
328 }
329}
330
331#[pin_project::pin_project]
334pub struct JsonPacketSerializer<I, S> {
335 #[pin]
336 items: I,
337 stats: Option<Arc<BatchIteratorConnectionStats>>,
338 max_packet_size: u64,
339 overflow: Option<S>,
340}
341
342impl<I, S> JsonPacketSerializer<I, S> {
343 pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
344 Self { items, stats: Some(stats), max_packet_size, overflow: None }
345 }
346
347 pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
348 Self { items, max_packet_size, overflow: None, stats: None }
349 }
350}
351
352impl<I, S> Stream for JsonPacketSerializer<I, S>
353where
354 I: Stream<Item = S> + Unpin,
355 S: Serialize,
356{
357 type Item = Result<SerializedVmo, AccessorError>;
358
359 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
362 let mut this = self.project();
363 let mut writer = VmoWriter::new(*this.max_packet_size);
364 writer.write_all(b"[")?;
365
366 if let Some(item) = this.overflow.take() {
367 let batch_writer = BufWriter::new(writer.clone());
368 serde_json::to_writer(batch_writer, &item)?;
369 if let Some(stats) = &this.stats {
370 stats.add_result();
371 }
372 }
373
374 let mut items_is_pending = false;
375 loop {
376 let item = match this.items.poll_next_unpin(cx) {
377 Poll::Ready(Some(item)) => item,
378 Poll::Ready(None) => break,
379 Poll::Pending => {
380 items_is_pending = true;
381 break;
382 }
383 };
384
385 let writer_tail = writer.tail();
386 let is_first = writer_tail == 1;
387 let (last_tail, previous_size) = (writer_tail, writer.capacity());
388 if !is_first {
389 writer.write_all(",\n".as_bytes())?;
390 }
391 let batch_writer = BufWriter::new(writer.clone());
392 serde_json::to_writer(batch_writer, &item)?;
393 let writer_tail = writer.tail();
394 let item_len = writer_tail - last_tail;
395
396 if item_len + 1 >= *this.max_packet_size {
398 warn!(
399 "serializing oversize item into packet (limit={} actual={})",
400 *this.max_packet_size,
401 writer_tail - last_tail,
402 );
403 }
404
405 if writer_tail + 1 > *this.max_packet_size {
407 writer.reset(last_tail, previous_size);
408 *this.overflow = Some(item);
409 break;
410 }
411
412 if let Some(stats) = &this.stats {
413 stats.add_result();
414 }
415 }
416
417 writer.write_all(b"]")?;
418 let writer_tail = writer.tail();
419 if writer_tail > *this.max_packet_size {
420 error!(
421 actual = writer_tail,
422 max = *this.max_packet_size;
423 "returned a string longer than maximum specified",
424 )
425 }
426
427 if writer_tail > 2 {
430 let (vmo, size) = writer.finalize().unwrap();
432 Poll::Ready(Some(Ok(SerializedVmo { vmo, size, format: Format::Json })))
433 } else if items_is_pending {
434 Poll::Pending
435 } else {
436 Poll::Ready(None)
437 }
438 }
439}
440
441#[cfg(test)]
442mod tests {
443 use super::*;
444 use crate::diagnostics::AccessorStats;
445 use futures::stream::iter;
446
447 #[fuchsia::test]
448 async fn two_items_joined_and_split() {
449 let inputs = &[&"FFFFFFFFFF", &"GGGGGGGGGG"];
450 let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
451 let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
452 let smallest_possible_joined_len = joined[0].len() as u64;
453
454 let make_packets = |max| async move {
455 let node = fuchsia_inspect::Node::default();
456 let accessor_stats = Arc::new(AccessorStats::new(node));
457 let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
458 JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
459 .collect::<Vec<_>>()
460 .await
461 .into_iter()
462 .map(|r| {
463 let result = r.unwrap();
464 let mut buf = vec![0; result.size as usize];
465 result.vmo.read(&mut buf, 0).expect("reading vmo");
466 std::str::from_utf8(&buf).unwrap().to_string()
467 })
468 .collect::<Vec<_>>()
469 };
470
471 let actual_joined = make_packets(smallest_possible_joined_len).await;
472 assert_eq!(&actual_joined[..], joined);
473
474 let actual_split = make_packets(smallest_possible_joined_len - 1).await;
475 assert_eq!(&actual_split[..], split);
476 }
477}