heapdump_snapshot/
streamer.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_memory_heapdump_client as fheapdump_client;
6use measure_tape_for_snapshot_element::Measurable;
7use zx_types::ZX_CHANNEL_MAX_MSG_BYTES;
8
9use crate::Error;
10
// Size, in bytes, of the header that every FIDL message starts with.
// TODO(https://fxbug.dev/42181010): This should be a constant in a FIDL library.
const FIDL_HEADER_BYTES: usize = 16;

// Size, in bytes, of the header that a vector occupies inside a FIDL message.
// TODO(https://fxbug.dev/42181010): This should be a constant in a FIDL library.
const FIDL_VECTOR_HEADER_BYTES: usize = 16;

// Fixed overhead of a `SnapshotReceiver/Batch` FIDL message: the message header plus the header
// of the vector that carries the batch. The size of an actual message is this value plus the
// size of every element in the batch.
const EMPTY_BUFFER_SIZE: usize = FIDL_HEADER_BYTES + FIDL_VECTOR_HEADER_BYTES;
22
23/// Implements pagination on top of a SnapshotReceiver channel.
24pub struct Streamer {
25    dest: fheapdump_client::SnapshotReceiverProxy,
26    buffer: Vec<fheapdump_client::SnapshotElement>,
27    buffer_size: usize,
28}
29
30impl Streamer {
31    pub fn new(dest: fheapdump_client::SnapshotReceiverProxy) -> Streamer {
32        Streamer { dest, buffer: Vec::new(), buffer_size: EMPTY_BUFFER_SIZE }
33    }
34
35    /// Sends the given `elem`.
36    ///
37    /// This method internally flushes the outgoing buffer, if necessary, so that it never exceeds
38    /// the maximum allowed size.
39    pub async fn push_element(
40        mut self,
41        elem: fheapdump_client::SnapshotElement,
42    ) -> Result<Streamer, Error> {
43        let elem_size = elem.measure().num_bytes;
44
45        // Flush the current buffer if the new element would not fit in it.
46        if self.buffer_size + elem_size > ZX_CHANNEL_MAX_MSG_BYTES as usize {
47            self.flush_buffer().await?;
48        }
49
50        // Append the new element.
51        self.buffer.push(elem);
52        self.buffer_size += elem_size;
53
54        Ok(self)
55    }
56
57    /// Sends the end-of-stream marker.
58    pub async fn end_of_stream(mut self) -> Result<(), Error> {
59        // Send the last elements in the queue.
60        if !self.buffer.is_empty() {
61            self.flush_buffer().await?;
62        }
63
64        // Send an empty batch to signal the end of the stream.
65        self.flush_buffer().await?;
66
67        Ok(())
68    }
69
70    async fn flush_buffer(&mut self) -> Result<(), Error> {
71        // Read and reset the buffer.
72        let buffer = std::mem::replace(&mut self.buffer, Vec::new());
73        self.buffer_size = EMPTY_BUFFER_SIZE;
74
75        // Send it.
76        let fut = self.dest.batch(&buffer);
77        Ok(fut.await?)
78    }
79}
80
#[cfg(test)]
mod tests {
    use super::*;
    use fidl::endpoints::create_proxy_and_stream;
    use fuchsia_async as fasync;
    use maplit::hashmap;
    use std::collections::HashMap;
    use std::rc::Rc;
    use test_case::test_case;

    use crate::ThreadInfo;
    use crate::snapshot::Snapshot;

    // Generate an allocation hash map with a huge number of entries, to test that pagination splits
    // them properly.
    //
    // Each allocation starts where the previous one ends, so all the addresses are distinct.
    fn generate_one_million_allocations_hashmap() -> HashMap<u64, u64> {
        let mut result = HashMap::new();
        let mut addr = 0;
        // Sizes start at 1: a zero-sized allocation would not advance `addr`, so the following
        // insertion would overwrite it and the map would end up with fewer entries than intended.
        for size in 1..=1_000_000 {
            result.insert(addr, size);
            addr += size;
        }
        result
    }

    // Fake values used to fill the transmitted snapshot.
    const FAKE_TIMESTAMP: fidl::MonotonicInstant = fidl::MonotonicInstant::from_nanos(12345678);
    const FAKE_THREAD_KOID: u64 = 8989;
    const FAKE_THREAD_NAME: &str = "fake-thread-name";
    const FAKE_THREAD_KEY: u64 = 1212;
    const FAKE_STACK_TRACE_ADDRESSES: [u64; 3] = [11111, 22222, 33333];
    const FAKE_STACK_TRACE_KEY: u64 = 1234;
    const FAKE_REGION_ADDRESS: u64 = 8192;
    const FAKE_REGION_NAME: &str = "fake-region-name";
    const FAKE_REGION_SIZE: u64 = 28672;
    const FAKE_REGION_FILE_OFFSET: u64 = 4096;
    const FAKE_REGION_VADDR: u64 = 12288;
    const FAKE_REGION_BUILD_ID: &[u8] = &[0xee; 20];

    // Streams a snapshot containing the given `allocations` through a `Streamer` and verifies
    // that the snapshot rebuilt on the receiving side matches what was sent.
    #[test_case(hashmap! {} ; "empty")]
    #[test_case(hashmap! { 1234 => 5678 } ; "only one")]
    #[test_case(generate_one_million_allocations_hashmap() ; "one million")]
    #[fasync::run_singlethreaded(test)]
    async fn test_streamer(allocations: HashMap<u64, u64>) {
        let (receiver_proxy, receiver_stream) =
            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));

        // Transmit a snapshot with the given `allocations`, all referencing the same thread info
        // and stack trace, with a single executable region.
        let mut streamer = Streamer::new(receiver_proxy)
            .push_element(fheapdump_client::SnapshotElement::ThreadInfo(
                fheapdump_client::ThreadInfo {
                    thread_info_key: Some(FAKE_THREAD_KEY),
                    koid: Some(FAKE_THREAD_KOID),
                    name: Some(FAKE_THREAD_NAME.to_string()),
                    ..Default::default()
                },
            ))
            .await
            .unwrap()
            .push_element(fheapdump_client::SnapshotElement::StackTrace(
                fheapdump_client::StackTrace {
                    stack_trace_key: Some(FAKE_STACK_TRACE_KEY),
                    program_addresses: Some(FAKE_STACK_TRACE_ADDRESSES.to_vec()),
                    ..Default::default()
                },
            ))
            .await
            .unwrap()
            .push_element(fheapdump_client::SnapshotElement::ExecutableRegion(
                fheapdump_client::ExecutableRegion {
                    address: Some(FAKE_REGION_ADDRESS),
                    name: Some(FAKE_REGION_NAME.to_string()),
                    size: Some(FAKE_REGION_SIZE),
                    file_offset: Some(FAKE_REGION_FILE_OFFSET),
                    vaddr: Some(FAKE_REGION_VADDR),
                    build_id: Some(fheapdump_client::BuildId {
                        value: FAKE_REGION_BUILD_ID.to_vec(),
                    }),
                    ..Default::default()
                },
            ))
            .await
            .unwrap();
        // `push_element` consumes the streamer and hands it back, hence the reassignment.
        for (address, size) in &allocations {
            streamer = streamer
                .push_element(fheapdump_client::SnapshotElement::Allocation(
                    fheapdump_client::Allocation {
                        address: Some(*address),
                        size: Some(*size),
                        thread_info_key: Some(FAKE_THREAD_KEY),
                        stack_trace_key: Some(FAKE_STACK_TRACE_KEY),
                        timestamp: Some(FAKE_TIMESTAMP),
                        ..Default::default()
                    },
                ))
                .await
                .unwrap();
        }
        streamer.end_of_stream().await.unwrap();

        // Receive the snapshot we just transmitted and verify that the allocations and the
        // executable region we received match those that were sent.
        let mut received_snapshot = receive_worker.await.unwrap();
        let mut received_allocations: HashMap<u64, &crate::snapshot::Allocation> =
            received_snapshot
                .allocations
                .iter()
                .map(|alloc| (alloc.address.unwrap(), alloc))
                .collect();
        // Remove each sent allocation from the received ones, so that anything left over at the
        // end is an entry that was never sent.
        for (address, size) in &allocations {
            let allocation = received_allocations.remove(address).unwrap();

            assert_eq!(allocation.size, *size);
            assert_eq!(
                allocation.thread_info,
                Some(Rc::new(ThreadInfo {
                    koid: FAKE_THREAD_KOID,
                    name: FAKE_THREAD_NAME.to_owned()
                }))
            );
            assert_eq!(allocation.stack_trace.program_addresses, FAKE_STACK_TRACE_ADDRESSES);
            assert_eq!(allocation.timestamp, Some(FAKE_TIMESTAMP));
        }
        assert!(received_allocations.is_empty(), "all the entries have been removed");
        let region = received_snapshot.executable_regions.remove(&FAKE_REGION_ADDRESS).unwrap();
        assert_eq!(region.name, FAKE_REGION_NAME);
        assert_eq!(region.size, FAKE_REGION_SIZE);
        assert_eq!(region.file_offset, FAKE_REGION_FILE_OFFSET);
        assert_eq!(region.vaddr.unwrap(), FAKE_REGION_VADDR);
        assert_eq!(region.build_id, FAKE_REGION_BUILD_ID);
        assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
    }
}
214}