heapdump_snapshot/
streamer.rs1use fidl_fuchsia_memory_heapdump_client as fheapdump_client;
6use measure_tape_for_snapshot_element::Measurable;
7use zx_types::ZX_CHANNEL_MAX_MSG_BYTES;
8
9use crate::Error;
10
11const FIDL_VECTOR_HEADER_BYTES: usize = 16;
14
15const FIDL_HEADER_BYTES: usize = 16;
18
19const EMPTY_BUFFER_SIZE: usize = FIDL_HEADER_BYTES + FIDL_VECTOR_HEADER_BYTES;
22
23pub struct Streamer {
25 dest: fheapdump_client::SnapshotReceiverProxy,
26 buffer: Vec<fheapdump_client::SnapshotElement>,
27 buffer_size: usize,
28}
29
30impl Streamer {
31 pub fn new(dest: fheapdump_client::SnapshotReceiverProxy) -> Streamer {
32 Streamer { dest, buffer: Vec::new(), buffer_size: EMPTY_BUFFER_SIZE }
33 }
34
35 pub async fn push_element(
40 mut self,
41 elem: fheapdump_client::SnapshotElement,
42 ) -> Result<Streamer, Error> {
43 let elem_size = elem.measure().num_bytes;
44
45 if self.buffer_size + elem_size > ZX_CHANNEL_MAX_MSG_BYTES as usize {
47 self.flush_buffer().await?;
48 }
49
50 self.buffer.push(elem);
52 self.buffer_size += elem_size;
53
54 Ok(self)
55 }
56
57 pub async fn end_of_stream(mut self) -> Result<(), Error> {
59 if !self.buffer.is_empty() {
61 self.flush_buffer().await?;
62 }
63
64 self.flush_buffer().await?;
66
67 Ok(())
68 }
69
70 async fn flush_buffer(&mut self) -> Result<(), Error> {
71 let buffer = std::mem::replace(&mut self.buffer, Vec::new());
73 self.buffer_size = EMPTY_BUFFER_SIZE;
74
75 let fut = self.dest.batch(&buffer);
77 Ok(fut.await?)
78 }
79}
80
81#[cfg(test)]
82mod tests {
83 use super::*;
84 use fidl::endpoints::create_proxy_and_stream;
85 use fuchsia_async as fasync;
86 use maplit::hashmap;
87 use std::collections::HashMap;
88 use std::rc::Rc;
89 use test_case::test_case;
90
91 use crate::ThreadInfo;
92 use crate::snapshot::Snapshot;
93
94 fn generate_one_million_allocations_hashmap() -> HashMap<u64, u64> {
97 let mut result = HashMap::new();
98 let mut addr = 0;
99 for size in 0..1000000 {
100 result.insert(addr, size);
101 addr += size;
102 }
103 result
104 }
105
106 const FAKE_TIMESTAMP: fidl::MonotonicInstant = fidl::MonotonicInstant::from_nanos(12345678);
107 const FAKE_THREAD_KOID: u64 = 8989;
108 const FAKE_THREAD_NAME: &str = "fake-thread-name";
109 const FAKE_THREAD_KEY: u64 = 1212;
110 const FAKE_STACK_TRACE_ADDRESSES: [u64; 3] = [11111, 22222, 33333];
111 const FAKE_STACK_TRACE_KEY: u64 = 1234;
112 const FAKE_REGION_ADDRESS: u64 = 8192;
113 const FAKE_REGION_NAME: &str = "fake-region-name";
114 const FAKE_REGION_SIZE: u64 = 28672;
115 const FAKE_REGION_FILE_OFFSET: u64 = 4096;
116 const FAKE_REGION_VADDR: u64 = 12288;
117 const FAKE_REGION_BUILD_ID: &[u8] = &[0xee; 20];
118
119 #[test_case(hashmap! {} ; "empty")]
120 #[test_case(hashmap! { 1234 => 5678 } ; "only one")]
121 #[test_case(generate_one_million_allocations_hashmap() ; "one million")]
122 #[fasync::run_singlethreaded(test)]
123 async fn test_streamer(allocations: HashMap<u64, u64>) {
124 let (receiver_proxy, receiver_stream) =
125 create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
126 let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
127
128 let mut streamer = Streamer::new(receiver_proxy)
131 .push_element(fheapdump_client::SnapshotElement::ThreadInfo(
132 fheapdump_client::ThreadInfo {
133 thread_info_key: Some(FAKE_THREAD_KEY),
134 koid: Some(FAKE_THREAD_KOID),
135 name: Some(FAKE_THREAD_NAME.to_string()),
136 ..Default::default()
137 },
138 ))
139 .await
140 .unwrap()
141 .push_element(fheapdump_client::SnapshotElement::StackTrace(
142 fheapdump_client::StackTrace {
143 stack_trace_key: Some(FAKE_STACK_TRACE_KEY),
144 program_addresses: Some(FAKE_STACK_TRACE_ADDRESSES.to_vec()),
145 ..Default::default()
146 },
147 ))
148 .await
149 .unwrap()
150 .push_element(fheapdump_client::SnapshotElement::ExecutableRegion(
151 fheapdump_client::ExecutableRegion {
152 address: Some(FAKE_REGION_ADDRESS),
153 name: Some(FAKE_REGION_NAME.to_string()),
154 size: Some(FAKE_REGION_SIZE),
155 file_offset: Some(FAKE_REGION_FILE_OFFSET),
156 vaddr: Some(FAKE_REGION_VADDR),
157 build_id: Some(fheapdump_client::BuildId {
158 value: FAKE_REGION_BUILD_ID.to_vec(),
159 }),
160 ..Default::default()
161 },
162 ))
163 .await
164 .unwrap();
165 for (address, size) in &allocations {
166 streamer = streamer
167 .push_element(fheapdump_client::SnapshotElement::Allocation(
168 fheapdump_client::Allocation {
169 address: Some(*address),
170 size: Some(*size),
171 thread_info_key: Some(FAKE_THREAD_KEY),
172 stack_trace_key: Some(FAKE_STACK_TRACE_KEY),
173 timestamp: Some(FAKE_TIMESTAMP),
174 ..Default::default()
175 },
176 ))
177 .await
178 .unwrap();
179 }
180 streamer.end_of_stream().await.unwrap();
181
182 let mut received_snapshot = receive_worker.await.unwrap();
185 let mut received_allocations: HashMap<u64, &crate::snapshot::Allocation> =
186 received_snapshot
187 .allocations
188 .iter()
189 .map(|alloc| (alloc.address.unwrap(), alloc))
190 .collect();
191 for (address, size) in &allocations {
192 let allocation = received_allocations.remove(address).unwrap();
193
194 assert_eq!(allocation.size, *size);
195 assert_eq!(
196 allocation.thread_info,
197 Some(Rc::new(ThreadInfo {
198 koid: FAKE_THREAD_KOID,
199 name: FAKE_THREAD_NAME.to_owned()
200 }))
201 );
202 assert_eq!(allocation.stack_trace.program_addresses, FAKE_STACK_TRACE_ADDRESSES);
203 assert_eq!(allocation.timestamp, Some(FAKE_TIMESTAMP));
204 }
205 assert!(received_allocations.is_empty(), "all the entries have been removed");
206 let region = received_snapshot.executable_regions.remove(&FAKE_REGION_ADDRESS).unwrap();
207 assert_eq!(region.name, FAKE_REGION_NAME);
208 assert_eq!(region.size, FAKE_REGION_SIZE);
209 assert_eq!(region.file_offset, FAKE_REGION_FILE_OFFSET);
210 assert_eq!(region.vaddr.unwrap(), FAKE_REGION_VADDR);
211 assert_eq!(region.build_id, FAKE_REGION_BUILD_ID);
212 assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
213 }
214}