archivist_lib/formatter.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::diagnostics::BatchIteratorConnectionStats;
use crate::error::AccessorError;
use crate::logs::container::CursorItem;
use crate::logs::servers::{extend_fxt_record, ExtendRecordOpts};
use fidl_fuchsia_diagnostics::{
    DataType, Format, FormattedContent, StreamMode, MAXIMUM_ENTRIES_PER_BATCH,
};
use fuchsia_sync::Mutex;

use futures::prelude::*;
use log::{error, warn};
use serde::Serialize;
use std::io::{BufWriter, Result as IoResult, Write};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

pub type FormattedStream =
    Pin<Box<dyn Stream<Item = Vec<Result<FormattedContent, AccessorError>>> + Send>>;

#[pin_project::pin_project]
pub struct FormattedContentBatcher<C> {
    #[pin]
    items: C,
    stats: Arc<BatchIteratorConnectionStats>,
}

/// Make a new `FormattedContentBatcher` with a chunking strategy depending on stream mode.
///
/// In snapshot mode, batched items will not be flushed to the client until the batch is complete
/// or the underlying stream has terminated.
///
/// In subscribe or snapshot-then-subscribe mode, batched items will be flushed whenever the
/// underlying stream is pending, ensuring clients always receive the latest results.
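///
/// A minimal usage sketch; `serialized_results` and `stats` stand in for caller-provided values
/// (a stream of `Result<SerializedVmo, AccessorError>` and the iterator's stats handle) and are
/// not defined in this module:
///
/// ```ignore
/// let batched: FormattedStream =
///     new_batcher(serialized_results, Arc::clone(&stats), StreamMode::Snapshot);
/// ```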
pub fn new_batcher<I, T, E>(
    items: I,
    stats: Arc<BatchIteratorConnectionStats>,
    mode: StreamMode,
) -> FormattedStream
where
    I: Stream<Item = Result<T, E>> + Send + 'static,
    T: Into<FormattedContent> + Send,
    E: Into<AccessorError> + Send,
{
    match mode {
        StreamMode::Subscribe | StreamMode::SnapshotThenSubscribe => {
            Box::pin(FormattedContentBatcher {
                items: items.ready_chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
                stats,
            })
        }
        StreamMode::Snapshot => Box::pin(FormattedContentBatcher {
            items: items.chunks(MAXIMUM_ENTRIES_PER_BATCH as _),
            stats,
        }),
    }
}

impl<I, T, E> Stream for FormattedContentBatcher<I>
where
    I: Stream<Item = Vec<Result<T, E>>>,
    T: Into<FormattedContent>,
    E: Into<AccessorError>,
{
    type Item = Vec<Result<FormattedContent, AccessorError>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        match this.items.poll_next(cx) {
            Poll::Ready(Some(chunk)) => {
                // loop over chunk instead of into_iter/map because we can't move `this`
                let mut batch = vec![];
                for item in chunk {
                    let result = match item {
                        Ok(i) => Ok(i.into()),
                        Err(e) => {
                            this.stats.add_result_error();
                            Err(e.into())
                        }
                    };
                    batch.push(result);
                }
                Poll::Ready(Some(batch))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

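/// Shared writer over a resizable VMO. Writes append at `tail`, growing the VMO whenever incoming
/// data would exceed the current `capacity`. `finalize` hands the VMO and its written size back to
/// the caller, after which any further writes report zero bytes written.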
#[derive(Clone)]
struct VmoWriter {
    inner: Arc<Mutex<InnerVmoWriter>>,
}

enum InnerVmoWriter {
    Active { vmo: zx::Vmo, capacity: u64, tail: u64 },
    Done,
}

impl VmoWriter {
    // TODO(https://fxbug.dev/42125551): take the name of the VMO as well.
    fn new(start_size: u64) -> Self {
        let vmo = zx::Vmo::create_with_opts(zx::VmoOptions::RESIZABLE, start_size)
            .expect("can always create resizable vmo's");
        let capacity = vmo.get_size().expect("can always read vmo size");
        Self { inner: Arc::new(Mutex::new(InnerVmoWriter::Active { vmo, capacity, tail: 0 })) }
    }

    fn tail(&self) -> u64 {
        let guard = self.inner.lock();
        match &*guard {
            InnerVmoWriter::Done => 0,
            InnerVmoWriter::Active { tail, .. } => *tail,
        }
    }

    fn capacity(&self) -> u64 {
        let guard = self.inner.lock();
        match &*guard {
            InnerVmoWriter::Done => 0,
            InnerVmoWriter::Active { capacity, .. } => *capacity,
        }
    }

    fn finalize(self) -> Option<(zx::Vmo, u64)> {
        let mut inner = self.inner.lock();
        let mut swapped = InnerVmoWriter::Done;
        std::mem::swap(&mut *inner, &mut swapped);
        match swapped {
            InnerVmoWriter::Done => None,
            InnerVmoWriter::Active { vmo, tail, .. } => Some((vmo, tail)),
        }
    }

    fn reset(&mut self, new_tail: u64, new_capacity: u64) {
        let mut inner = self.inner.lock();
        match &mut *inner {
            InnerVmoWriter::Done => {}
            InnerVmoWriter::Active { vmo, capacity, tail } => {
                vmo.set_size(new_capacity).expect("can always resize a plain vmo");
                *capacity = new_capacity;
                *tail = new_tail;
            }
        }
    }
}

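// `Write` implementation that appends at the current tail, resizing the VMO first whenever the
// incoming buffer would overflow the current capacity.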
impl Write for VmoWriter {
    fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
        match &mut *self.inner.lock() {
            InnerVmoWriter::Done => Ok(0),
            InnerVmoWriter::Active { vmo, tail, capacity } => {
                let new_tail = *tail + buf.len() as u64;
                if new_tail > *capacity {
                    vmo.set_size(new_tail).expect("can always resize a plain vmo");
                    *capacity = new_tail;
                }
                vmo.write(buf, *tail)?;
                *tail = new_tail;
                Ok(buf.len())
            }
        }
    }

    fn flush(&mut self) -> IoResult<()> {
        Ok(())
    }
}

/// Holds a VMO containing valid serialized data as well as the size of that data.
pub struct SerializedVmo {
    pub vmo: zx::Vmo,
    pub size: u64,
    format: Format,
}

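/// Extends the raw FXT record with the source identity (moniker and component URL) and the
/// rolled-out count, then writes the extended record to `writer`.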
fn fxt_to_writer<W: std::io::Write>(mut writer: W, item: &CursorItem) -> Result<(), AccessorError> {
    let value = extend_fxt_record(
        item.message.bytes(),
        &item.identity,
        item.rolled_out,
        &ExtendRecordOpts { component_url: true, moniker: true, rolled_out: true },
    );
    Ok(writer.write_all(&value)?)
}

impl SerializedVmo {
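    /// Serialize `source` into a freshly created VMO using the requested `format`.
    ///
    /// A minimal usage sketch; `payload` stands in for any caller-provided `Serialize` value and
    /// is not defined in this module:
    ///
    /// ```ignore
    /// let serialized = SerializedVmo::serialize(&payload, DataType::Inspect, Format::Json)?;
    /// let content: FormattedContent = serialized.into();
    /// ```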
    pub fn serialize(
        source: &impl Serialize,
        data_type: DataType,
        format: Format,
    ) -> Result<Self, AccessorError> {
        let writer = VmoWriter::new(match data_type {
            DataType::Inspect => inspect_format::constants::DEFAULT_VMO_SIZE_BYTES as u64,
            // Logs won't go through this codepath anyway, but in case we ever want to serialize a
            // single log instance it makes sense to start at the page size.
            DataType::Logs => 4096, // page size
        });
        let batch_writer = BufWriter::new(writer.clone());
        match format {
            Format::Json => {
                serde_json::to_writer(batch_writer, source).map_err(AccessorError::Serialization)?
            }
            Format::Cbor => ciborium::into_writer(source, batch_writer)
                .map_err(|err| AccessorError::CborSerialization(err.into()))?,
            Format::Text => unreachable!("We'll never get Text"),
            Format::Fxt => unreachable!("We'll never get FXT"),
        }
        // Safe to unwrap: we should always be able to take the VMO here.
        let (vmo, tail) = writer.finalize().unwrap();
        Ok(Self { vmo, size: tail, format })
    }
}

impl From<SerializedVmo> for FormattedContent {
    fn from(content: SerializedVmo) -> FormattedContent {
        match content.format {
            Format::Json => {
                // set_content_size() is redundant, but consumers may expect the size there.
                content
                    .vmo
                    .set_content_size(&content.size)
                    .expect("set_content_size always returns Ok");
                FormattedContent::Json(fidl_fuchsia_mem::Buffer {
                    vmo: content.vmo,
                    size: content.size,
                })
            }
            Format::Cbor => {
                content
                    .vmo
                    .set_content_size(&content.size)
                    .expect("set_content_size always returns Ok");
                FormattedContent::Cbor(content.vmo)
            }
            Format::Fxt => {
                content
                    .vmo
                    .set_content_size(&content.size)
                    .expect("set_content_size always returns Ok");
                FormattedContent::Fxt(content.vmo)
            }
            Format::Text => unreachable!("We'll never get Text"),
        }
    }
}

/// Wraps a stream of log `CursorItem`s and yields `SerializedVmo`s, packing FXT records into
/// each VMO up to the size limit provided.
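///
/// A minimal construction sketch; `stats`, `max_packet_size`, and `log_cursor` stand in for
/// caller-provided values (`log_cursor` being a `Stream<Item = CursorItem>`) and are not defined
/// in this module:
///
/// ```ignore
/// let packets = FXTPacketSerializer::new(stats, max_packet_size, log_cursor);
/// ```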
#[pin_project::pin_project]
pub struct FXTPacketSerializer<I> {
    #[pin]
    items: I,
    stats: Option<Arc<BatchIteratorConnectionStats>>,
    max_packet_size: u64,
    overflow: Option<CursorItem>,
}

impl<I> FXTPacketSerializer<I> {
    pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
        Self { items, stats: Some(stats), max_packet_size, overflow: None }
    }
}

impl<I> Stream for FXTPacketSerializer<I>
where
    I: Stream<Item = CursorItem> + Unpin,
{
    type Item = Result<SerializedVmo, AccessorError>;

    /// Serialize log messages into an FXT packet up to the maximum size provided. Returns
    /// `Poll::Ready(None)` when there are no more messages to serialize.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let mut writer = VmoWriter::new(*this.max_packet_size);

        if let Some(item) = this.overflow.take() {
            let batch_writer = BufWriter::new(writer.clone());
            fxt_to_writer(batch_writer, &item)?;
            if let Some(stats) = &this.stats {
                stats.add_result();
            }
        }

        let mut items_is_pending = false;
        loop {
            let item = match this.items.poll_next_unpin(cx) {
                Poll::Ready(Some(item)) => item,
                Poll::Ready(None) => break,
                Poll::Pending => {
                    items_is_pending = true;
                    break;
                }
            };

            let writer_tail = writer.tail();
            let (last_tail, previous_size) = (writer_tail, writer.capacity());
            let batch_writer = BufWriter::new(writer.clone());
            fxt_to_writer(batch_writer, &item)?;
            let writer_tail = writer.tail();

            if writer_tail > *this.max_packet_size {
                writer.reset(last_tail, previous_size);
                *this.overflow = Some(item);
                break;
            }

            if let Some(stats) = &this.stats {
                stats.add_result();
            }
        }

        let writer_tail = writer.tail();

        if writer_tail > 2 {
            // Safe to unwrap: the VMO is guaranteed to be present.
            let (vmo, size) = writer.finalize().unwrap();
            Poll::Ready(Some(Ok(SerializedVmo { vmo, size, format: Format::Fxt })))
        } else if items_is_pending {
            Poll::Pending
        } else {
            Poll::Ready(None)
        }
    }
}

/// Wraps a stream of serializable items and yields `SerializedVmo`s, packing items into a JSON
/// array in each VMO up to the size limit provided.
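///
/// A minimal construction sketch; `stats`, `max_packet_size`, and `items` stand in for
/// caller-provided values (`items` being any `Stream` of `Serialize` values) and are not defined
/// in this module:
///
/// ```ignore
/// let packets = JsonPacketSerializer::new(stats, max_packet_size, items);
/// ```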
#[pin_project::pin_project]
pub struct JsonPacketSerializer<I, S> {
    #[pin]
    items: I,
    stats: Option<Arc<BatchIteratorConnectionStats>>,
    max_packet_size: u64,
    overflow: Option<S>,
}

impl<I, S> JsonPacketSerializer<I, S> {
    pub fn new(stats: Arc<BatchIteratorConnectionStats>, max_packet_size: u64, items: I) -> Self {
        Self { items, stats: Some(stats), max_packet_size, overflow: None }
    }

    pub fn new_without_stats(max_packet_size: u64, items: I) -> Self {
        Self { items, max_packet_size, overflow: None, stats: None }
    }
}

impl<I, S> Stream for JsonPacketSerializer<I, S>
where
    I: Stream<Item = S> + Unpin,
    S: Serialize,
{
    type Item = Result<SerializedVmo, AccessorError>;

    /// Serialize log messages in a JSON array up to the maximum size provided. Returns
    /// `Poll::Ready(None)` when there are no more messages to serialize.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let mut writer = VmoWriter::new(*this.max_packet_size);
        writer.write_all(b"[")?;

        if let Some(item) = this.overflow.take() {
            let batch_writer = BufWriter::new(writer.clone());
            serde_json::to_writer(batch_writer, &item)?;
            if let Some(stats) = &this.stats {
                stats.add_result();
            }
        }

        let mut items_is_pending = false;
        loop {
            let item = match this.items.poll_next_unpin(cx) {
                Poll::Ready(Some(item)) => item,
                Poll::Ready(None) => break,
                Poll::Pending => {
                    items_is_pending = true;
                    break;
                }
            };

            let writer_tail = writer.tail();
            let is_first = writer_tail == 1;
            let (last_tail, previous_size) = (writer_tail, writer.capacity());
            if !is_first {
                writer.write_all(",\n".as_bytes())?;
            }
            let batch_writer = BufWriter::new(writer.clone());
            serde_json::to_writer(batch_writer, &item)?;
            let writer_tail = writer.tail();
            let item_len = writer_tail - last_tail;

            // +1 for the ending bracket
            if item_len + 1 >= *this.max_packet_size {
                warn!(
                    "serializing oversize item into packet (limit={} actual={})",
                    *this.max_packet_size,
                    writer_tail - last_tail,
                );
            }

            // existing batch + item + array end bracket
            if writer_tail + 1 > *this.max_packet_size {
                writer.reset(last_tail, previous_size);
                *this.overflow = Some(item);
                break;
            }

            if let Some(stats) = &this.stats {
                stats.add_result();
            }
        }

        writer.write_all(b"]")?;
        let writer_tail = writer.tail();
        if writer_tail > *this.max_packet_size {
            error!(
                actual = writer_tail,
                max = *this.max_packet_size;
                "returned a string longer than maximum specified",
            )
        }

        // Only return an item if we wrote more than the opening and closing brackets; the batch's
        // length is measured in bytes.
        if writer_tail > 2 {
            // Safe to unwrap: the VMO is guaranteed to be present.
            let (vmo, size) = writer.finalize().unwrap();
            Poll::Ready(Some(Ok(SerializedVmo { vmo, size, format: Format::Json })))
        } else if items_is_pending {
            Poll::Pending
        } else {
            Poll::Ready(None)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use futures::stream::iter;

    #[fuchsia::test]
    async fn two_items_joined_and_split() {
        let inputs = &[&"FFFFFFFFFF", &"GGGGGGGGGG"];
        let joined = &["[\"FFFFFFFFFF\",\n\"GGGGGGGGGG\"]"];
        let split = &[r#"["FFFFFFFFFF"]"#, r#"["GGGGGGGGGG"]"#];
        let smallest_possible_joined_len = joined[0].len() as u64;

        let make_packets = |max| async move {
            let node = fuchsia_inspect::Node::default();
            let accessor_stats = Arc::new(AccessorStats::new(node));
            let test_stats = Arc::new(accessor_stats.new_logs_batch_iterator());
            JsonPacketSerializer::new(test_stats, max, iter(inputs.iter()))
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .map(|r| {
                    let result = r.unwrap();
                    let mut buf = vec![0; result.size as usize];
                    result.vmo.read(&mut buf, 0).expect("reading vmo");
                    std::str::from_utf8(&buf).unwrap().to_string()
                })
                .collect::<Vec<_>>()
        };

        let actual_joined = make_packets(smallest_possible_joined_len).await;
        assert_eq!(&actual_joined[..], joined);

        let actual_split = make_packets(smallest_possible_joined_len - 1).await;
        assert_eq!(&actual_split[..], split);
    }
477}