heapdump_snapshot/
snapshot.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_memory_heapdump_client as fheapdump_client;
6use futures::StreamExt;
7use std::collections::HashMap;
8use std::rc::Rc;
9
10use crate::Error;
11
12/// Contains all the data received over a `SnapshotReceiver` channel.
13#[derive(Debug)]
14pub struct Snapshot {
15    /// All the live allocations in the analyzed process, indexed by memory address.
16    pub allocations: HashMap<u64, Allocation>,
17
18    /// All the executable memory regions in the analyzed process, indexed by start address.
19    pub executable_regions: HashMap<u64, ExecutableRegion>,
20}
21
22/// Information about an allocated memory block and, optionally, its contents.
23#[derive(Debug)]
24pub struct Allocation {
25    /// Block size, in bytes.
26    pub size: u64,
27
28    /// The allocating thread.
29    pub thread_info: Rc<ThreadInfo>,
30
31    /// The stack trace of the allocation site.
32    pub stack_trace: Rc<StackTrace>,
33
34    /// Allocation timestamp, in nanoseconds.
35    pub timestamp: fidl::MonotonicInstant,
36
37    /// Memory dump of this block's contents.
38    pub contents: Option<Vec<u8>>,
39}
40
41/// A stack trace.
42#[derive(Debug)]
43pub struct StackTrace {
44    /// Code addresses at each call frame. The first entry corresponds to the leaf call.
45    pub program_addresses: Vec<u64>,
46}
47
48/// A memory region containing code loaded from an ELF file.
49#[derive(Debug)]
50pub struct ExecutableRegion {
51    /// Region name for human consumption (usually either the ELF soname or the VMO name), if known.
52    pub name: String,
53
54    /// Region size, in bytes.
55    pub size: u64,
56
57    /// The corresponding offset in the ELF file.
58    pub file_offset: u64,
59
60    /// The corresponding relative address in the ELF file.
61    ///
62    /// Note: this field is not populated if the snapshot was generated by a build from before the
63    /// corresponding FIDL field was introduced.
64    pub vaddr: Option<u64>,
65
66    /// The Build ID of the ELF file.
67    pub build_id: Vec<u8>,
68}
69
70/// Information identifying a specific thread.
71#[derive(Debug)]
72pub struct ThreadInfo {
73    /// The thread's koid.
74    pub koid: zx_types::zx_koid_t,
75
76    /// The thread's name.
77    pub name: String,
78}
79
80/// Gets the value of a field in a FIDL table as a `Result<T, Error>`.
81///
82/// An `Err(Error::MissingField { .. })` is returned if the field's value is `None`.
83///
84/// Usage: `read_field!(container_expression => ContainerType, field_name)`
85///
86/// # Example
87///
88/// ```
89/// struct MyFidlTable { field: Option<u32>, .. }
90/// let table = MyFidlTable { field: Some(44), .. };
91///
92/// let val = read_field!(table => MyFidlTable, field)?;
93/// ```
94macro_rules! read_field {
95    ($e:expr => $c:ident, $f:ident) => {
96        $e.$f.ok_or(Error::MissingField {
97            container: std::stringify!($c),
98            field: std::stringify!($f),
99        })
100    };
101}
102
103impl Snapshot {
104    /// Receives data over a `SnapshotReceiver` channel and reassembles it.
105    pub async fn receive_from(
106        mut stream: fheapdump_client::SnapshotReceiverRequestStream,
107    ) -> Result<Snapshot, Error> {
108        let mut allocations: HashMap<u64, (u64, u64, u64, fidl::MonotonicInstant)> = HashMap::new();
109        let mut thread_infos: HashMap<u64, Rc<ThreadInfo>> = HashMap::new();
110        let mut stack_traces: HashMap<u64, Vec<u64>> = HashMap::new();
111        let mut executable_regions: HashMap<u64, ExecutableRegion> = HashMap::new();
112        let mut contents: HashMap<u64, Vec<u8>> = HashMap::new();
113
114        loop {
115            // Wait for the next batch of elements.
116            let batch = match stream.next().await.transpose()? {
117                Some(fheapdump_client::SnapshotReceiverRequest::Batch { batch, responder }) => {
118                    // Send acknowledgment as quickly as possible, then keep processing the received batch.
119                    responder.send()?;
120                    batch
121                }
122                Some(fheapdump_client::SnapshotReceiverRequest::ReportError {
123                    error,
124                    responder,
125                }) => {
126                    let _ = responder.send(); // Ignore the result of the acknowledgment.
127                    return Err(Error::CollectorError(error));
128                }
129                None => return Err(Error::UnexpectedEndOfStream),
130            };
131
132            // Process data. An empty batch signals the end of the stream.
133            if !batch.is_empty() {
134                for element in batch {
135                    match element {
136                        fheapdump_client::SnapshotElement::Allocation(allocation) => {
137                            let address = read_field!(allocation => Allocation, address)?;
138                            let size = read_field!(allocation => Allocation, size)?;
139                            let timestamp = read_field!(allocation => Allocation, timestamp)?;
140                            let thread_info_key =
141                                read_field!(allocation => Allocation, thread_info_key)?;
142                            let stack_trace_key =
143                                read_field!(allocation => Allocation, stack_trace_key)?;
144                            if allocations
145                                .insert(
146                                    address,
147                                    (size, thread_info_key, stack_trace_key, timestamp),
148                                )
149                                .is_some()
150                            {
151                                return Err(Error::ConflictingElement {
152                                    element_type: "Allocation",
153                                });
154                            }
155                        }
156                        fheapdump_client::SnapshotElement::StackTrace(stack_trace) => {
157                            let stack_trace_key =
158                                read_field!(stack_trace => StackTrace, stack_trace_key)?;
159                            let mut program_addresses =
160                                read_field!(stack_trace => StackTrace, program_addresses)?;
161                            stack_traces
162                                .entry(stack_trace_key)
163                                .or_default()
164                                .append(&mut program_addresses);
165                        }
166                        fheapdump_client::SnapshotElement::ThreadInfo(thread_info) => {
167                            let thread_info_key =
168                                read_field!(thread_info => ThreadInfo, thread_info_key)?;
169                            let koid = read_field!(thread_info => ThreadInfo, koid)?;
170                            let name = read_field!(thread_info => ThreadInfo, name)?;
171                            if thread_infos
172                                .insert(thread_info_key, Rc::new(ThreadInfo { koid, name }))
173                                .is_some()
174                            {
175                                return Err(Error::ConflictingElement {
176                                    element_type: "ThreadInfo",
177                                });
178                            }
179                        }
180                        fheapdump_client::SnapshotElement::ExecutableRegion(region) => {
181                            let address = read_field!(region => ExecutableRegion, address)?;
182                            #[cfg(fuchsia_api_level_at_least = "NEXT")]
183                            let name = region.name.unwrap_or_else(|| String::new());
184                            #[cfg(not(fuchsia_api_level_at_least = "NEXT"))]
185                            let name = String::new();
186                            let size = read_field!(region => ExecutableRegion, size)?;
187                            let file_offset = read_field!(region => ExecutableRegion, file_offset)?;
188                            #[cfg(fuchsia_api_level_at_least = "NEXT")]
189                            let vaddr = region.vaddr;
190                            #[cfg(not(fuchsia_api_level_at_least = "NEXT"))]
191                            let vaddr = None;
192                            let build_id = read_field!(region => ExecutableRegion, build_id)?.value;
193                            let region =
194                                ExecutableRegion { name, size, file_offset, vaddr, build_id };
195                            if executable_regions.insert(address, region).is_some() {
196                                return Err(Error::ConflictingElement {
197                                    element_type: "ExecutableRegion",
198                                });
199                            }
200                        }
201                        fheapdump_client::SnapshotElement::BlockContents(block_contents) => {
202                            let address = read_field!(block_contents => BlockContents, address)?;
203                            let mut chunk = read_field!(block_contents => BlockContents, contents)?;
204                            contents.entry(address).or_default().append(&mut chunk);
205                        }
206                        _ => return Err(Error::UnexpectedElementType),
207                    }
208                }
209            } else {
210                // We are at the end of the stream. Convert to the final types and resolve
211                // cross-references.
212                let final_stack_traces: HashMap<u64, Rc<StackTrace>> = stack_traces
213                    .into_iter()
214                    .map(|(key, program_addresses)| {
215                        (key, Rc::new(StackTrace { program_addresses }))
216                    })
217                    .collect();
218                let mut final_allocations = HashMap::new();
219                for (address, (size, thread_info_key, stack_trace_key, timestamp)) in allocations {
220                    let thread_info = thread_infos
221                        .get(&thread_info_key)
222                        .ok_or(Error::InvalidCrossReference { element_type: "ThreadInfo" })?
223                        .clone();
224                    let stack_trace = final_stack_traces
225                        .get(&stack_trace_key)
226                        .ok_or(Error::InvalidCrossReference { element_type: "StackTrace" })?
227                        .clone();
228                    let contents = match contents.remove(&address) {
229                        Some(data) if data.len() as u64 != size => {
230                            return Err(Error::ConflictingElement { element_type: "BlockContents" })
231                        }
232                        other => other,
233                    };
234                    final_allocations.insert(
235                        address,
236                        Allocation { size, thread_info, stack_trace, timestamp, contents },
237                    );
238                }
239
240                return Ok(Snapshot { allocations: final_allocations, executable_regions });
241            }
242        }
243    }
244}
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249    use assert_matches::assert_matches;
250    use fidl::endpoints::create_proxy_and_stream;
251    use fuchsia_async as fasync;
252    use test_case::test_case;
253
254    // Constants used by some of the tests below:
255    const FAKE_ALLOCATION_1_ADDRESS: u64 = 1234;
256    const FAKE_ALLOCATION_1_SIZE: u64 = 8;
257    const FAKE_ALLOCATION_1_TIMESTAMP: fidl::MonotonicInstant =
258        fidl::MonotonicInstant::from_nanos(888888888);
259    const FAKE_ALLOCATION_1_CONTENTS: [u8; FAKE_ALLOCATION_1_SIZE as usize] = *b"12345678";
260    const FAKE_ALLOCATION_2_ADDRESS: u64 = 5678;
261    const FAKE_ALLOCATION_2_SIZE: u64 = 4;
262    const FAKE_ALLOCATION_2_TIMESTAMP: fidl::MonotonicInstant =
263        fidl::MonotonicInstant::from_nanos(-777777777); // test negative value too
264    const FAKE_THREAD_1_KOID: u64 = 1212;
265    const FAKE_THREAD_1_NAME: &str = "fake-thread-1-name";
266    const FAKE_THREAD_1_KEY: u64 = 4567;
267    const FAKE_THREAD_2_KOID: u64 = 1213;
268    const FAKE_THREAD_2_NAME: &str = "fake-thread-2-name";
269    const FAKE_THREAD_2_KEY: u64 = 7654;
270    const FAKE_STACK_TRACE_1_ADDRESSES: [u64; 6] = [11111, 22222, 33333, 22222, 44444, 55555];
271    const FAKE_STACK_TRACE_1_KEY: u64 = 9876;
272    const FAKE_STACK_TRACE_2_ADDRESSES: [u64; 4] = [11111, 22222, 11111, 66666];
273    const FAKE_STACK_TRACE_2_KEY: u64 = 6789;
274    const FAKE_REGION_1_ADDRESS: u64 = 0x10000000;
275    const FAKE_REGION_1_NAME: &str = "region-1";
276    const FAKE_REGION_1_SIZE: u64 = 0x80000;
277    const FAKE_REGION_1_FILE_OFFSET: u64 = 0x1000;
278    const FAKE_REGION_1_VADDR: u64 = 0x3000;
279    const FAKE_REGION_1_BUILD_ID: &[u8] = &[0xaa; 20];
280    const FAKE_REGION_2_ADDRESS: u64 = 0x7654300000;
281    const FAKE_REGION_2_SIZE: u64 = 0x200000;
282    const FAKE_REGION_2_FILE_OFFSET: u64 = 0x2000;
283    const FAKE_REGION_2_BUILD_ID: &[u8] = &[0x55; 32];
284
285    #[fasync::run_singlethreaded(test)]
286    async fn test_empty() {
287        let (receiver_proxy, receiver_stream) =
288            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
289        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
290
291        // Send the end of stream marker.
292        let fut = receiver_proxy.batch(&[]);
293        fut.await.unwrap();
294
295        // Receive the snapshot we just transmitted and verify that it is empty.
296        let received_snapshot = receive_worker.await.unwrap();
297        assert!(received_snapshot.allocations.is_empty());
298        assert!(received_snapshot.executable_regions.is_empty());
299    }
300
301    #[fasync::run_singlethreaded(test)]
302    async fn test_one_batch() {
303        let (receiver_proxy, receiver_stream) =
304            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
305        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
306
307        // Send a batch containing two allocations - whose threads, stack traces and contents can be
308        // listed before or after the allocation(s) that reference them - and two executable
309        // regions.
310        let fut = receiver_proxy.batch(&[
311            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
312                address: Some(FAKE_ALLOCATION_1_ADDRESS),
313                contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
314                ..Default::default()
315            }),
316            fheapdump_client::SnapshotElement::ExecutableRegion(
317                fheapdump_client::ExecutableRegion {
318                    address: Some(FAKE_REGION_1_ADDRESS),
319                    name: Some(FAKE_REGION_1_NAME.to_string()),
320                    size: Some(FAKE_REGION_1_SIZE),
321                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
322                    vaddr: Some(FAKE_REGION_1_VADDR),
323                    build_id: Some(fheapdump_client::BuildId {
324                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
325                    }),
326                    ..Default::default()
327                },
328            ),
329            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
330                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
331                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
332                ..Default::default()
333            }),
334            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
335                address: Some(FAKE_ALLOCATION_1_ADDRESS),
336                size: Some(FAKE_ALLOCATION_1_SIZE),
337                thread_info_key: Some(FAKE_THREAD_1_KEY),
338                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
339                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
340                ..Default::default()
341            }),
342            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
343                thread_info_key: Some(FAKE_THREAD_1_KEY),
344                koid: Some(FAKE_THREAD_1_KOID),
345                name: Some(FAKE_THREAD_1_NAME.to_string()),
346                ..Default::default()
347            }),
348            fheapdump_client::SnapshotElement::ExecutableRegion(
349                fheapdump_client::ExecutableRegion {
350                    address: Some(FAKE_REGION_2_ADDRESS),
351                    size: Some(FAKE_REGION_2_SIZE),
352                    file_offset: Some(FAKE_REGION_2_FILE_OFFSET),
353                    build_id: Some(fheapdump_client::BuildId {
354                        value: FAKE_REGION_2_BUILD_ID.to_vec(),
355                    }),
356                    ..Default::default()
357                },
358            ),
359            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
360                thread_info_key: Some(FAKE_THREAD_2_KEY),
361                koid: Some(FAKE_THREAD_2_KOID),
362                name: Some(FAKE_THREAD_2_NAME.to_string()),
363                ..Default::default()
364            }),
365            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
366                address: Some(FAKE_ALLOCATION_2_ADDRESS),
367                size: Some(FAKE_ALLOCATION_2_SIZE),
368                thread_info_key: Some(FAKE_THREAD_2_KEY),
369                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
370                timestamp: Some(FAKE_ALLOCATION_2_TIMESTAMP),
371                ..Default::default()
372            }),
373            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
374                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
375                program_addresses: Some(FAKE_STACK_TRACE_2_ADDRESSES.to_vec()),
376                ..Default::default()
377            }),
378        ]);
379        fut.await.unwrap();
380
381        // Send the end of stream marker.
382        let fut = receiver_proxy.batch(&[]);
383        fut.await.unwrap();
384
385        // Receive the snapshot we just transmitted and verify its contents.
386        let mut received_snapshot = receive_worker.await.unwrap();
387        let allocation1 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_1_ADDRESS).unwrap();
388        assert_eq!(allocation1.size, FAKE_ALLOCATION_1_SIZE);
389        assert_eq!(allocation1.thread_info.koid, FAKE_THREAD_1_KOID);
390        assert_eq!(allocation1.thread_info.name, FAKE_THREAD_1_NAME);
391        assert_eq!(allocation1.stack_trace.program_addresses, FAKE_STACK_TRACE_2_ADDRESSES);
392        assert_eq!(allocation1.timestamp, FAKE_ALLOCATION_1_TIMESTAMP);
393        assert_eq!(allocation1.contents.expect("contents must be set"), FAKE_ALLOCATION_1_CONTENTS);
394        let allocation2 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_2_ADDRESS).unwrap();
395        assert_eq!(allocation2.size, FAKE_ALLOCATION_2_SIZE);
396        assert_eq!(allocation2.thread_info.koid, FAKE_THREAD_2_KOID);
397        assert_eq!(allocation2.thread_info.name, FAKE_THREAD_2_NAME);
398        assert_eq!(allocation2.stack_trace.program_addresses, FAKE_STACK_TRACE_1_ADDRESSES);
399        assert_eq!(allocation2.timestamp, FAKE_ALLOCATION_2_TIMESTAMP);
400        assert_matches!(allocation2.contents, None, "no contents are sent for this allocation");
401        assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
402        let region1 = received_snapshot.executable_regions.remove(&FAKE_REGION_1_ADDRESS).unwrap();
403        assert_eq!(region1.name, FAKE_REGION_1_NAME);
404        assert_eq!(region1.size, FAKE_REGION_1_SIZE);
405        assert_eq!(region1.file_offset, FAKE_REGION_1_FILE_OFFSET);
406        assert_eq!(region1.vaddr.unwrap(), FAKE_REGION_1_VADDR);
407        assert_eq!(region1.build_id, FAKE_REGION_1_BUILD_ID);
408        let region2 = received_snapshot.executable_regions.remove(&FAKE_REGION_2_ADDRESS).unwrap();
409        assert_eq!(region2.size, FAKE_REGION_2_SIZE);
410        assert_eq!(region2.file_offset, FAKE_REGION_2_FILE_OFFSET);
411        assert_eq!(region2.build_id, FAKE_REGION_2_BUILD_ID);
412        assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
413    }
414
415    #[fasync::run_singlethreaded(test)]
416    async fn test_two_batches() {
417        let (receiver_proxy, receiver_stream) =
418            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
419        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
420
421        // Send a first batch.
422        let fut = receiver_proxy.batch(&[
423            fheapdump_client::SnapshotElement::ExecutableRegion(
424                fheapdump_client::ExecutableRegion {
425                    address: Some(FAKE_REGION_2_ADDRESS),
426                    size: Some(FAKE_REGION_2_SIZE),
427                    file_offset: Some(FAKE_REGION_2_FILE_OFFSET),
428                    build_id: Some(fheapdump_client::BuildId {
429                        value: FAKE_REGION_2_BUILD_ID.to_vec(),
430                    }),
431                    ..Default::default()
432                },
433            ),
434            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
435                address: Some(FAKE_ALLOCATION_1_ADDRESS),
436                size: Some(FAKE_ALLOCATION_1_SIZE),
437                thread_info_key: Some(FAKE_THREAD_1_KEY),
438                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
439                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
440                ..Default::default()
441            }),
442            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
443                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
444                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
445                ..Default::default()
446            }),
447            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
448                thread_info_key: Some(FAKE_THREAD_2_KEY),
449                koid: Some(FAKE_THREAD_2_KOID),
450                name: Some(FAKE_THREAD_2_NAME.to_string()),
451                ..Default::default()
452            }),
453        ]);
454        fut.await.unwrap();
455
456        // Send another batch.
457        let fut = receiver_proxy.batch(&[
458            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
459                thread_info_key: Some(FAKE_THREAD_1_KEY),
460                koid: Some(FAKE_THREAD_1_KOID),
461                name: Some(FAKE_THREAD_1_NAME.to_string()),
462                ..Default::default()
463            }),
464            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
465                address: Some(FAKE_ALLOCATION_2_ADDRESS),
466                size: Some(FAKE_ALLOCATION_2_SIZE),
467                thread_info_key: Some(FAKE_THREAD_2_KEY),
468                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
469                timestamp: Some(FAKE_ALLOCATION_2_TIMESTAMP),
470                ..Default::default()
471            }),
472            fheapdump_client::SnapshotElement::ExecutableRegion(
473                fheapdump_client::ExecutableRegion {
474                    address: Some(FAKE_REGION_1_ADDRESS),
475                    name: Some(FAKE_REGION_1_NAME.to_string()),
476                    size: Some(FAKE_REGION_1_SIZE),
477                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
478                    vaddr: Some(FAKE_REGION_1_VADDR),
479                    build_id: Some(fheapdump_client::BuildId {
480                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
481                    }),
482                    ..Default::default()
483                },
484            ),
485            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
486                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
487                program_addresses: Some(FAKE_STACK_TRACE_2_ADDRESSES.to_vec()),
488                ..Default::default()
489            }),
490            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
491                address: Some(FAKE_ALLOCATION_1_ADDRESS),
492                contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
493                ..Default::default()
494            }),
495        ]);
496        fut.await.unwrap();
497
498        // Send the end of stream marker.
499        let fut = receiver_proxy.batch(&[]);
500        fut.await.unwrap();
501
502        // Receive the snapshot we just transmitted and verify its contents.
503        let mut received_snapshot = receive_worker.await.unwrap();
504        let allocation1 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_1_ADDRESS).unwrap();
505        assert_eq!(allocation1.size, FAKE_ALLOCATION_1_SIZE);
506        assert_eq!(allocation1.thread_info.koid, FAKE_THREAD_1_KOID);
507        assert_eq!(allocation1.thread_info.name, FAKE_THREAD_1_NAME);
508        assert_eq!(allocation1.stack_trace.program_addresses, FAKE_STACK_TRACE_2_ADDRESSES);
509        assert_eq!(allocation1.timestamp, FAKE_ALLOCATION_1_TIMESTAMP);
510        assert_eq!(allocation1.contents.expect("contents must be set"), FAKE_ALLOCATION_1_CONTENTS);
511        let allocation2 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_2_ADDRESS).unwrap();
512        assert_eq!(allocation2.size, FAKE_ALLOCATION_2_SIZE);
513        assert_eq!(allocation2.thread_info.koid, FAKE_THREAD_2_KOID);
514        assert_eq!(allocation2.thread_info.name, FAKE_THREAD_2_NAME);
515        assert_eq!(allocation2.stack_trace.program_addresses, FAKE_STACK_TRACE_1_ADDRESSES);
516        assert_eq!(allocation2.timestamp, FAKE_ALLOCATION_2_TIMESTAMP);
517        assert_matches!(allocation2.contents, None, "no contents are sent for this allocation");
518        assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
519        let region1 = received_snapshot.executable_regions.remove(&FAKE_REGION_1_ADDRESS).unwrap();
520        assert_eq!(region1.name, FAKE_REGION_1_NAME);
521        assert_eq!(region1.size, FAKE_REGION_1_SIZE);
522        assert_eq!(region1.file_offset, FAKE_REGION_1_FILE_OFFSET);
523        assert_eq!(region1.vaddr.unwrap(), FAKE_REGION_1_VADDR);
524        assert_eq!(region1.build_id, FAKE_REGION_1_BUILD_ID);
525        let region2 = received_snapshot.executable_regions.remove(&FAKE_REGION_2_ADDRESS).unwrap();
526        assert_eq!(region2.size, FAKE_REGION_2_SIZE);
527        assert_eq!(region2.file_offset, FAKE_REGION_2_FILE_OFFSET);
528        assert_eq!(region2.build_id, FAKE_REGION_2_BUILD_ID);
529        assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
530    }
531
532    #[test_case(|allocation| allocation.address = None => matches
533        Err(Error::MissingField { container: "Allocation", field: "address" }) ; "address")]
534    #[test_case(|allocation| allocation.size = None => matches
535        Err(Error::MissingField { container: "Allocation", field: "size" }) ; "size")]
536    #[test_case(|allocation| allocation.thread_info_key = None => matches
537        Err(Error::MissingField { container: "Allocation", field: "thread_info_key" }) ; "thread_info_key")]
538    #[test_case(|allocation| allocation.stack_trace_key = None => matches
539        Err(Error::MissingField { container: "Allocation", field: "stack_trace_key" }) ; "stack_trace_key")]
540    #[test_case(|allocation| allocation.timestamp = None => matches
541        Err(Error::MissingField { container: "Allocation", field: "timestamp" }) ; "timestamp")]
542    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
543        Ok(_) ; "success")]
544    #[fasync::run_singlethreaded(test)]
545    async fn test_allocation_required_fields(
546        set_one_field_to_none: fn(&mut fheapdump_client::Allocation),
547    ) -> Result<Snapshot, Error> {
548        let (receiver_proxy, receiver_stream) =
549            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
550        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
551
552        // Start with an Allocation with all the required fields set.
553        let mut allocation = fheapdump_client::Allocation {
554            address: Some(FAKE_ALLOCATION_1_ADDRESS),
555            size: Some(FAKE_ALLOCATION_1_SIZE),
556            thread_info_key: Some(FAKE_THREAD_1_KEY),
557            stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
558            timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
559            ..Default::default()
560        };
561
562        // Set one of the fields to None, according to the case being tested.
563        set_one_field_to_none(&mut allocation);
564
565        // Send it to the SnapshotReceiver along with the thread info and stack trace it references.
566        let fut = receiver_proxy.batch(&[
567            fheapdump_client::SnapshotElement::Allocation(allocation),
568            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
569                thread_info_key: Some(FAKE_THREAD_1_KEY),
570                koid: Some(FAKE_THREAD_1_KOID),
571                name: Some(FAKE_THREAD_1_NAME.to_string()),
572                ..Default::default()
573            }),
574            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
575                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
576                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
577                ..Default::default()
578            }),
579        ]);
580        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
581
582        // Send the end of stream marker.
583        let fut = receiver_proxy.batch(&[]);
584        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
585
586        // Return the result.
587        receive_worker.await
588    }
589
590    #[test_case(|thread_info| thread_info.thread_info_key = None => matches
591        Err(Error::MissingField { container: "ThreadInfo", field: "thread_info_key" }) ; "thread_info_key")]
592    #[test_case(|thread_info| thread_info.koid = None => matches
593        Err(Error::MissingField { container: "ThreadInfo", field: "koid" }) ; "koid")]
594    #[test_case(|thread_info| thread_info.name = None => matches
595        Err(Error::MissingField { container: "ThreadInfo", field: "name" }) ; "name")]
596    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
597        Ok(_) ; "success")]
598    #[fasync::run_singlethreaded(test)]
599    async fn test_thread_info_required_fields(
600        set_one_field_to_none: fn(&mut fheapdump_client::ThreadInfo),
601    ) -> Result<Snapshot, Error> {
602        let (receiver_proxy, receiver_stream) =
603            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
604        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
605
606        // Start with a ThreadInfo with all the required fields set.
607        let mut thread_info = fheapdump_client::ThreadInfo {
608            thread_info_key: Some(FAKE_THREAD_1_KEY),
609            koid: Some(FAKE_THREAD_1_KOID),
610            name: Some(FAKE_THREAD_1_NAME.to_string()),
611            ..Default::default()
612        };
613
614        // Set one of the fields to None, according to the case being tested.
615        set_one_field_to_none(&mut thread_info);
616
617        // Send it to the SnapshotReceiver.
618        let fut =
619            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::ThreadInfo(thread_info)]);
620        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
621
622        // Send the end of stream marker.
623        let fut = receiver_proxy.batch(&[]);
624        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
625
626        // Return the result.
627        receive_worker.await
628    }
629
630    #[test_case(|stack_trace| stack_trace.stack_trace_key = None => matches
631        Err(Error::MissingField { container: "StackTrace", field: "stack_trace_key" }) ; "stack_trace_key")]
632    #[test_case(|stack_trace| stack_trace.program_addresses = None => matches
633        Err(Error::MissingField { container: "StackTrace", field: "program_addresses" }) ; "program_addresses")]
634    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
635        Ok(_) ; "success")]
636    #[fasync::run_singlethreaded(test)]
637    async fn test_stack_trace_required_fields(
638        set_one_field_to_none: fn(&mut fheapdump_client::StackTrace),
639    ) -> Result<Snapshot, Error> {
640        let (receiver_proxy, receiver_stream) =
641            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
642        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
643
644        // Start with a StackTrace with all the required fields set.
645        let mut stack_trace = fheapdump_client::StackTrace {
646            stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
647            program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
648            ..Default::default()
649        };
650
651        // Set one of the fields to None, according to the case being tested.
652        set_one_field_to_none(&mut stack_trace);
653
654        // Send it to the SnapshotReceiver.
655        let fut =
656            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::StackTrace(stack_trace)]);
657        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
658
659        // Send the end of stream marker.
660        let fut = receiver_proxy.batch(&[]);
661        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
662
663        // Return the result.
664        receive_worker.await
665    }
666
667    #[test_case(|region| region.address = None => matches
668        Err(Error::MissingField { container: "ExecutableRegion", field: "address" }) ; "address")]
669    #[test_case(|region| region.size = None => matches
670        Err(Error::MissingField { container: "ExecutableRegion", field: "size" }) ; "size")]
671    #[test_case(|region| region.file_offset = None => matches
672        Err(Error::MissingField { container: "ExecutableRegion", field: "file_offset" }) ; "file_offset")]
673    #[test_case(|region| region.build_id = None => matches
674        Err(Error::MissingField { container: "ExecutableRegion", field: "build_id" }) ; "build_id")]
675    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
676        Ok(_) ; "success")]
677    #[fasync::run_singlethreaded(test)]
678    async fn test_executable_region_required_fields(
679        set_one_field_to_none: fn(&mut fheapdump_client::ExecutableRegion),
680    ) -> Result<Snapshot, Error> {
681        let (receiver_proxy, receiver_stream) =
682            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
683        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
684
685        // Start with an ExecutableRegion with all the required fields set.
686        let mut region = fheapdump_client::ExecutableRegion {
687            address: Some(FAKE_REGION_1_ADDRESS),
688            size: Some(FAKE_REGION_1_SIZE),
689            file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
690            build_id: Some(fheapdump_client::BuildId { value: FAKE_REGION_1_BUILD_ID.to_vec() }),
691            ..Default::default()
692        };
693
694        // Set one of the fields to None, according to the case being tested.
695        set_one_field_to_none(&mut region);
696
697        // Send it to the SnapshotReceiver.
698        let fut =
699            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::ExecutableRegion(region)]);
700        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
701
702        // Send the end of stream marker.
703        let fut = receiver_proxy.batch(&[]);
704        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
705
706        // Return the result.
707        receive_worker.await
708    }
709
710    #[test_case(|block_contents| block_contents.address = None => matches
711        Err(Error::MissingField { container: "BlockContents", field: "address" }) ; "address")]
712    #[test_case(|block_contents| block_contents.contents = None => matches
713        Err(Error::MissingField { container: "BlockContents", field: "contents" }) ; "contents")]
714    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
715        Ok(_) ; "success")]
716    #[fasync::run_singlethreaded(test)]
717    async fn test_block_contents_required_fields(
718        set_one_field_to_none: fn(&mut fheapdump_client::BlockContents),
719    ) -> Result<Snapshot, Error> {
720        let (receiver_proxy, receiver_stream) =
721            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
722        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
723
724        // Start with a BlockContents with all the required fields set.
725        let mut block_contents = fheapdump_client::BlockContents {
726            address: Some(FAKE_ALLOCATION_1_ADDRESS),
727            contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
728            ..Default::default()
729        };
730
731        // Set one of the fields to None, according to the case being tested.
732        set_one_field_to_none(&mut block_contents);
733
734        // Send it to the SnapshotReceiver along with the allocation it references.
735        let fut = receiver_proxy.batch(&[
736            fheapdump_client::SnapshotElement::BlockContents(block_contents),
737            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
738                address: Some(FAKE_ALLOCATION_1_ADDRESS),
739                size: Some(FAKE_ALLOCATION_1_SIZE),
740                thread_info_key: Some(FAKE_THREAD_1_KEY),
741                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
742                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
743                ..Default::default()
744            }),
745            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
746                thread_info_key: Some(FAKE_THREAD_1_KEY),
747                koid: Some(FAKE_THREAD_1_KOID),
748                name: Some(FAKE_THREAD_1_NAME.to_string()),
749                ..Default::default()
750            }),
751            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
752                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
753                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
754                ..Default::default()
755            }),
756        ]);
757        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
758
759        // Send the end of stream marker.
760        let fut = receiver_proxy.batch(&[]);
761        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
762
763        // Return the result.
764        receive_worker.await
765    }
766
767    #[fasync::run_singlethreaded(test)]
768    async fn test_conflicting_allocations() {
769        let (receiver_proxy, receiver_stream) =
770            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
771        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
772
773        // Send two allocations with the same address along with the stack trace they reference.
774        let fut = receiver_proxy.batch(&[
775            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
776                address: Some(FAKE_ALLOCATION_1_ADDRESS),
777                size: Some(FAKE_ALLOCATION_1_SIZE),
778                thread_info_key: Some(FAKE_THREAD_1_KEY),
779                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
780                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
781                ..Default::default()
782            }),
783            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
784                address: Some(FAKE_ALLOCATION_1_ADDRESS),
785                size: Some(FAKE_ALLOCATION_1_SIZE),
786                thread_info_key: Some(FAKE_THREAD_1_KEY),
787                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
788                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
789                ..Default::default()
790            }),
791            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
792                thread_info_key: Some(FAKE_THREAD_1_KEY),
793                koid: Some(FAKE_THREAD_1_KOID),
794                name: Some(FAKE_THREAD_1_NAME.to_string()),
795                ..Default::default()
796            }),
797            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
798                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
799                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
800                ..Default::default()
801            }),
802        ]);
803        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
804
805        // Send the end of stream marker.
806        let fut = receiver_proxy.batch(&[]);
807        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
808
809        // Verify expected error.
810        assert_matches!(
811            receive_worker.await,
812            Err(Error::ConflictingElement { element_type: "Allocation" })
813        );
814    }
815
816    #[fasync::run_singlethreaded(test)]
817    async fn test_conflicting_executable_regions() {
818        let (receiver_proxy, receiver_stream) =
819            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
820        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
821
822        // Send two executable regions with the same address.
823        let fut = receiver_proxy.batch(&[
824            fheapdump_client::SnapshotElement::ExecutableRegion(
825                fheapdump_client::ExecutableRegion {
826                    address: Some(FAKE_REGION_1_ADDRESS),
827                    size: Some(FAKE_REGION_1_SIZE),
828                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
829                    build_id: Some(fheapdump_client::BuildId {
830                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
831                    }),
832                    ..Default::default()
833                },
834            ),
835            fheapdump_client::SnapshotElement::ExecutableRegion(
836                fheapdump_client::ExecutableRegion {
837                    address: Some(FAKE_REGION_1_ADDRESS),
838                    size: Some(FAKE_REGION_1_SIZE),
839                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
840                    build_id: Some(fheapdump_client::BuildId {
841                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
842                    }),
843                    ..Default::default()
844                },
845            ),
846        ]);
847        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
848
849        // Send the end of stream marker.
850        let fut = receiver_proxy.batch(&[]);
851        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
852
853        // Verify expected error.
854        assert_matches!(
855            receive_worker.await,
856            Err(Error::ConflictingElement { element_type: "ExecutableRegion" })
857        );
858    }
859
860    #[fasync::run_singlethreaded(test)]
861    async fn test_block_contents_wrong_size() {
862        let (receiver_proxy, receiver_stream) =
863            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
864        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
865
866        // Send an allocation whose BlockContents has the wrong size.
867        let contents_with_wrong_size = vec![0; FAKE_ALLOCATION_1_SIZE as usize + 1];
868        let fut = receiver_proxy.batch(&[
869            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
870                address: Some(FAKE_ALLOCATION_1_ADDRESS),
871                size: Some(FAKE_ALLOCATION_1_SIZE),
872                thread_info_key: Some(FAKE_THREAD_1_KEY),
873                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
874                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
875                ..Default::default()
876            }),
877            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
878                address: Some(FAKE_ALLOCATION_1_ADDRESS),
879                contents: Some(contents_with_wrong_size),
880                ..Default::default()
881            }),
882            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
883                thread_info_key: Some(FAKE_THREAD_1_KEY),
884                koid: Some(FAKE_THREAD_1_KOID),
885                name: Some(FAKE_THREAD_1_NAME.to_string()),
886                ..Default::default()
887            }),
888            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
889                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
890                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
891                ..Default::default()
892            }),
893        ]);
894        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
895
896        // Send the end of stream marker.
897        let fut = receiver_proxy.batch(&[]);
898        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
899
900        // Verify expected error.
901        assert_matches!(
902            receive_worker.await,
903            Err(Error::ConflictingElement { element_type: "BlockContents" })
904        );
905    }
906
907    #[fasync::run_singlethreaded(test)]
908    async fn test_empty_stack_trace() {
909        let (receiver_proxy, receiver_stream) =
910            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
911        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
912
913        // Send an allocation that references an empty stack trace.
914        let fut = receiver_proxy.batch(&[
915            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
916                address: Some(FAKE_ALLOCATION_1_ADDRESS),
917                size: Some(FAKE_ALLOCATION_1_SIZE),
918                thread_info_key: Some(FAKE_THREAD_1_KEY),
919                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
920                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
921                ..Default::default()
922            }),
923            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
924                thread_info_key: Some(FAKE_THREAD_1_KEY),
925                koid: Some(FAKE_THREAD_1_KOID),
926                name: Some(FAKE_THREAD_1_NAME.to_string()),
927                ..Default::default()
928            }),
929            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
930                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
931                program_addresses: Some(vec![]),
932                ..Default::default()
933            }),
934        ]);
935        fut.await.unwrap();
936
937        // Send the end of stream marker.
938        let fut = receiver_proxy.batch(&[]);
939        fut.await.unwrap();
940
941        // Verify that the stack trace has been reconstructed correctly.
942        let mut received_snapshot = receive_worker.await.unwrap();
943        let allocation1 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_1_ADDRESS).unwrap();
944        assert_eq!(allocation1.stack_trace.program_addresses, []);
945    }
946
947    #[fasync::run_singlethreaded(test)]
948    async fn test_chunked_stack_trace() {
949        let (receiver_proxy, receiver_stream) =
950            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
951        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
952
953        // Send an allocation and the first chunk of its stack trace.
954        let fut = receiver_proxy.batch(&[
955            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
956                address: Some(FAKE_ALLOCATION_1_ADDRESS),
957                size: Some(FAKE_ALLOCATION_1_SIZE),
958                thread_info_key: Some(FAKE_THREAD_1_KEY),
959                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
960                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
961                ..Default::default()
962            }),
963            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
964                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
965                program_addresses: Some(vec![1111, 2222]),
966                ..Default::default()
967            }),
968            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
969                thread_info_key: Some(FAKE_THREAD_1_KEY),
970                koid: Some(FAKE_THREAD_1_KOID),
971                name: Some(FAKE_THREAD_1_NAME.to_string()),
972                ..Default::default()
973            }),
974        ]);
975        fut.await.unwrap();
976
977        // Send the second chunk.
978        let fut = receiver_proxy.batch(&[fheapdump_client::SnapshotElement::StackTrace(
979            fheapdump_client::StackTrace {
980                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
981                program_addresses: Some(vec![3333]),
982                ..Default::default()
983            },
984        )]);
985        fut.await.unwrap();
986
987        // Send the end of stream marker.
988        let fut = receiver_proxy.batch(&[]);
989        fut.await.unwrap();
990
991        // Verify that the stack trace has been reconstructed correctly.
992        let mut received_snapshot = receive_worker.await.unwrap();
993        let allocation1 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_1_ADDRESS).unwrap();
994        assert_eq!(allocation1.stack_trace.program_addresses, [1111, 2222, 3333]);
995    }
996
997    #[fasync::run_singlethreaded(test)]
998    async fn test_empty_block_contents() {
999        let (receiver_proxy, receiver_stream) =
1000            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1001        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
1002
1003        // Send a zero-sized allocation and its empty contents.
1004        let fut = receiver_proxy.batch(&[
1005            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1006                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1007                size: Some(0),
1008                thread_info_key: Some(FAKE_THREAD_1_KEY),
1009                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1010                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1011                ..Default::default()
1012            }),
1013            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1014                thread_info_key: Some(FAKE_THREAD_1_KEY),
1015                koid: Some(FAKE_THREAD_1_KOID),
1016                name: Some(FAKE_THREAD_1_NAME.to_string()),
1017                ..Default::default()
1018            }),
1019            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1020                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1021                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1022                ..Default::default()
1023            }),
1024            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
1025                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1026                contents: Some(vec![]),
1027                ..Default::default()
1028            }),
1029        ]);
1030        fut.await.unwrap();
1031
1032        // Send the end of stream marker.
1033        let fut = receiver_proxy.batch(&[]);
1034        fut.await.unwrap();
1035
1036        // Verify that the allocation has been reconstructed correctly.
1037        let mut received_snapshot = receive_worker.await.unwrap();
1038        let allocation1 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_1_ADDRESS).unwrap();
1039        assert_eq!(allocation1.contents.expect("contents must be set"), []);
1040    }
1041
1042    #[fasync::run_singlethreaded(test)]
1043    async fn test_chunked_block_contents() {
1044        let (receiver_proxy, receiver_stream) =
1045            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1046        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
1047
1048        // Split the contents in two halves.
1049        let (content_first_chunk, contents_second_chunk) =
1050            FAKE_ALLOCATION_1_CONTENTS.split_at(FAKE_ALLOCATION_1_CONTENTS.len() / 2);
1051
1052        // Send an allocation and the first chunk of its contents.
1053        let fut = receiver_proxy.batch(&[
1054            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1055                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1056                size: Some(FAKE_ALLOCATION_1_SIZE),
1057                thread_info_key: Some(FAKE_THREAD_1_KEY),
1058                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1059                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1060                ..Default::default()
1061            }),
1062            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1063                thread_info_key: Some(FAKE_THREAD_1_KEY),
1064                koid: Some(FAKE_THREAD_1_KOID),
1065                name: Some(FAKE_THREAD_1_NAME.to_string()),
1066                ..Default::default()
1067            }),
1068            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1069                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1070                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1071                ..Default::default()
1072            }),
1073            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
1074                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1075                contents: Some(content_first_chunk.to_vec()),
1076                ..Default::default()
1077            }),
1078        ]);
1079        fut.await.unwrap();
1080
1081        // Send the second chunk.
1082        let fut = receiver_proxy.batch(&[fheapdump_client::SnapshotElement::BlockContents(
1083            fheapdump_client::BlockContents {
1084                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1085                contents: Some(contents_second_chunk.to_vec()),
1086                ..Default::default()
1087            },
1088        )]);
1089        fut.await.unwrap();
1090
1091        // Send the end of stream marker.
1092        let fut = receiver_proxy.batch(&[]);
1093        fut.await.unwrap();
1094
1095        // Verify that the allocation's block contents have been reconstructed correctly.
1096        let mut received_snapshot = receive_worker.await.unwrap();
1097        let allocation1 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_1_ADDRESS).unwrap();
1098        assert_eq!(allocation1.contents.expect("contents must be set"), FAKE_ALLOCATION_1_CONTENTS);
1099    }
1100
1101    #[fasync::run_singlethreaded(test)]
1102    async fn test_missing_end_of_stream() {
1103        let (receiver_proxy, receiver_stream) =
1104            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1105        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
1106
1107        // Send an allocation and its stack trace.
1108        let fut = receiver_proxy.batch(&[
1109            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1110                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1111                size: Some(FAKE_ALLOCATION_1_SIZE),
1112                thread_info_key: Some(FAKE_THREAD_1_KEY),
1113                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1114                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1115                ..Default::default()
1116            }),
1117            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1118                thread_info_key: Some(FAKE_THREAD_1_KEY),
1119                koid: Some(FAKE_THREAD_1_KOID),
1120                name: Some(FAKE_THREAD_1_NAME.to_string()),
1121                ..Default::default()
1122            }),
1123            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1124                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1125                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1126                ..Default::default()
1127            }),
1128        ]);
1129        fut.await.unwrap();
1130
1131        // Close the channel without sending an end of stream marker.
1132        std::mem::drop(receiver_proxy);
1133
1134        // Expect an UnexpectedEndOfStream error.
1135        assert_matches!(receive_worker.await, Err(Error::UnexpectedEndOfStream));
1136    }
1137}