heapdump_snapshot/
snapshot.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_memory_heapdump_client as fheapdump_client;
6use futures::StreamExt;
7use std::collections::{HashMap, HashSet};
8use std::rc::Rc;
9
10use crate::Error;
11
12/// Contains all the data received over a `SnapshotReceiver` channel.
13#[derive(Debug)]
14pub struct Snapshot {
15    /// All the live allocations in the analyzed process, indexed by memory address.
16    pub allocations: Vec<Allocation>,
17
18    /// All the executable memory regions in the analyzed process, indexed by start address.
19    pub executable_regions: HashMap<u64, ExecutableRegion>,
20}
21
22/// Information about one or more allocated memory blocks.
23#[derive(Debug)]
24pub struct Allocation {
25    pub address: Option<u64>,
26
27    /// Number of allocations that have been aggregated into this `Allocation` instance.
28    pub count: u64,
29
30    /// Total size, in bytes.
31    pub size: u64,
32
33    /// Allocating thread.
34    pub thread_info: Option<Rc<ThreadInfo>>,
35
36    /// Stack trace of the allocation site.
37    pub stack_trace: Rc<StackTrace>,
38
39    /// Allocation timestamp, in nanoseconds.
40    pub timestamp: Option<fidl::MonotonicInstant>,
41
42    /// Memory dump of this block's contents.
43    pub contents: Option<Vec<u8>>,
44}
45
46/// A stack trace.
47#[derive(Debug)]
48pub struct StackTrace {
49    /// Code addresses at each call frame. The first entry corresponds to the leaf call.
50    pub program_addresses: Vec<u64>,
51}
52
53/// A memory region containing code loaded from an ELF file.
54#[derive(Debug)]
55pub struct ExecutableRegion {
56    /// Region name for human consumption (usually either the ELF soname or the VMO name), if known.
57    pub name: String,
58
59    /// Region size, in bytes.
60    pub size: u64,
61
62    /// The corresponding offset in the ELF file.
63    pub file_offset: u64,
64
65    /// The corresponding relative address in the ELF file.
66    ///
67    /// Note: this field is not populated if the snapshot was generated by a build from before the
68    /// corresponding FIDL field was introduced.
69    pub vaddr: Option<u64>,
70
71    /// The Build ID of the ELF file.
72    pub build_id: Vec<u8>,
73}
74
75/// Information identifying a specific thread.
76#[derive(Debug, PartialEq)]
77pub struct ThreadInfo {
78    /// The thread's koid.
79    pub koid: zx_types::zx_koid_t,
80
81    /// The thread's name.
82    pub name: String,
83}
84
85/// Gets the value of a field in a FIDL table as a `Result<T, Error>`.
86///
87/// An `Err(Error::MissingField { .. })` is returned if the field's value is `None`.
88///
89/// Usage: `read_field!(container_expression => ContainerType, field_name)`
90///
91/// # Example
92///
93/// ```
94/// struct MyFidlTable { field: Option<u32>, .. }
95/// let table = MyFidlTable { field: Some(44), .. };
96///
97/// let val = read_field!(table => MyFidlTable, field)?;
98/// ```
99macro_rules! read_field {
100    ($e:expr => $c:ident, $f:ident) => {
101        $e.$f.ok_or(Error::MissingField {
102            container: std::stringify!($c),
103            field: std::stringify!($f),
104        })
105    };
106}
107
108impl Snapshot {
109    /// Receives data over a `SnapshotReceiver` channel and reassembles it.
110    pub async fn receive_from(
111        mut stream: fheapdump_client::SnapshotReceiverRequestStream,
112    ) -> Result<Snapshot, Error> {
113        struct AllocationValue {
114            address: Option<u64>,
115            count: u64,
116            size: u64,
117            thread_info_key: Option<u64>,
118            stack_trace_key: u64,
119            timestamp: Option<fidl::MonotonicInstant>,
120        }
121        let mut allocation_addresses: HashSet<u64> = HashSet::new();
122        let mut allocations: Vec<AllocationValue> = vec![];
123        let mut thread_infos: HashMap<u64, Rc<ThreadInfo>> = HashMap::new();
124        let mut stack_traces: HashMap<u64, Vec<u64>> = HashMap::new();
125        let mut executable_regions: HashMap<u64, ExecutableRegion> = HashMap::new();
126        let mut contents: HashMap<u64, Vec<u8>> = HashMap::new();
127
128        loop {
129            // Wait for the next batch of elements.
130            let batch = match stream.next().await.transpose()? {
131                Some(fheapdump_client::SnapshotReceiverRequest::Batch { batch, responder }) => {
132                    // Send acknowledgment as quickly as possible, then keep processing the received batch.
133                    responder.send()?;
134                    batch
135                }
136                Some(fheapdump_client::SnapshotReceiverRequest::ReportError {
137                    error,
138                    responder,
139                }) => {
140                    let _ = responder.send(); // Ignore the result of the acknowledgment.
141                    return Err(Error::CollectorError(error));
142                }
143                None => return Err(Error::UnexpectedEndOfStream),
144            };
145
146            // Process data. An empty batch signals the end of the stream.
147            if !batch.is_empty() {
148                for element in batch {
149                    match element {
150                        fheapdump_client::SnapshotElement::Allocation(allocation) => {
151                            if let Some(address) = allocation.address {
152                                if !allocation_addresses.insert(address) {
153                                    return Err(Error::ConflictingElement {
154                                        element_type: "Allocation",
155                                    });
156                                }
157                            }
158
159                            #[cfg(not(fuchsia_api_level_at_least = "29"))]
160                            let count = 1;
161                            #[cfg(fuchsia_api_level_at_least = "29")]
162                            let count = allocation.count.unwrap_or(1);
163
164                            let size = read_field!(allocation => Allocation, size)?;
165                            let stack_trace_key =
166                                read_field!(allocation => Allocation, stack_trace_key)?;
167                            allocations.push(AllocationValue {
168                                address: allocation.address,
169                                count,
170                                size,
171                                thread_info_key: allocation.thread_info_key,
172                                stack_trace_key,
173                                timestamp: allocation.timestamp,
174                            });
175                        }
176                        fheapdump_client::SnapshotElement::StackTrace(stack_trace) => {
177                            let stack_trace_key =
178                                read_field!(stack_trace => StackTrace, stack_trace_key)?;
179                            let mut program_addresses =
180                                read_field!(stack_trace => StackTrace, program_addresses)?;
181                            stack_traces
182                                .entry(stack_trace_key)
183                                .or_default()
184                                .append(&mut program_addresses);
185                        }
186                        fheapdump_client::SnapshotElement::ThreadInfo(thread_info) => {
187                            let thread_info_key =
188                                read_field!(thread_info => ThreadInfo, thread_info_key)?;
189                            let koid = read_field!(thread_info => ThreadInfo, koid)?;
190                            let name = read_field!(thread_info => ThreadInfo, name)?;
191                            if thread_infos
192                                .insert(thread_info_key, Rc::new(ThreadInfo { koid, name }))
193                                .is_some()
194                            {
195                                return Err(Error::ConflictingElement {
196                                    element_type: "ThreadInfo",
197                                });
198                            }
199                        }
200                        fheapdump_client::SnapshotElement::ExecutableRegion(region) => {
201                            let address = read_field!(region => ExecutableRegion, address)?;
202                            #[cfg(fuchsia_api_level_at_least = "27")]
203                            let name = region.name.unwrap_or_else(|| String::new());
204                            #[cfg(not(fuchsia_api_level_at_least = "27"))]
205                            let name = String::new();
206                            let size = read_field!(region => ExecutableRegion, size)?;
207                            let file_offset = read_field!(region => ExecutableRegion, file_offset)?;
208                            #[cfg(fuchsia_api_level_at_least = "27")]
209                            let vaddr = region.vaddr;
210                            #[cfg(not(fuchsia_api_level_at_least = "27"))]
211                            let vaddr = None;
212                            let build_id = read_field!(region => ExecutableRegion, build_id)?.value;
213                            let region =
214                                ExecutableRegion { name, size, file_offset, vaddr, build_id };
215                            if executable_regions.insert(address, region).is_some() {
216                                return Err(Error::ConflictingElement {
217                                    element_type: "ExecutableRegion",
218                                });
219                            }
220                        }
221                        fheapdump_client::SnapshotElement::BlockContents(block_contents) => {
222                            let address = read_field!(block_contents => BlockContents, address)?;
223                            let mut chunk = read_field!(block_contents => BlockContents, contents)?;
224                            contents.entry(address).or_default().append(&mut chunk);
225                        }
226                        _ => return Err(Error::UnexpectedElementType),
227                    }
228                }
229            } else {
230                // We are at the end of the stream. Convert to the final types and resolve
231                // cross-references.
232                let final_stack_traces: HashMap<u64, Rc<StackTrace>> = stack_traces
233                    .into_iter()
234                    .map(|(key, program_addresses)| {
235                        (key, Rc::new(StackTrace { program_addresses }))
236                    })
237                    .collect();
238                let mut final_allocations = vec![];
239                for AllocationValue {
240                    address,
241                    count,
242                    size,
243                    thread_info_key,
244                    stack_trace_key,
245                    timestamp,
246                } in allocations
247                {
248                    let thread_info = match thread_info_key {
249                        Some(key) => Some(
250                            thread_infos
251                                .get(&key)
252                                .ok_or(Error::InvalidCrossReference { element_type: "ThreadInfo" })?
253                                .clone(),
254                        ),
255                        None => None,
256                    };
257                    let stack_trace = final_stack_traces
258                        .get(&stack_trace_key)
259                        .ok_or(Error::InvalidCrossReference { element_type: "StackTrace" })?
260                        .clone();
261                    let contents = address.and_then(|address| contents.remove(&address));
262                    if let Some(data) = &contents {
263                        if data.len() as u64 != size {
264                            return Err(Error::ConflictingElement {
265                                element_type: "BlockContents",
266                            });
267                        }
268                    }
269                    final_allocations.push(Allocation {
270                        address,
271                        count,
272                        size,
273                        thread_info,
274                        stack_trace,
275                        timestamp,
276                        contents,
277                    });
278                }
279
280                return Ok(Snapshot { allocations: final_allocations, executable_regions });
281            }
282        }
283    }
284}
285
286#[cfg(test)]
287mod tests {
288    use super::*;
289    use assert_matches::assert_matches;
290    use fidl::endpoints::create_proxy_and_stream;
291    use fuchsia_async as fasync;
292    use test_case::test_case;
293
294    // Constants used by some of the tests below:
295    const FAKE_ALLOCATION_1_ADDRESS: u64 = 1234;
296    const FAKE_ALLOCATION_1_SIZE: u64 = 8;
297    const FAKE_ALLOCATION_1_TIMESTAMP: fidl::MonotonicInstant =
298        fidl::MonotonicInstant::from_nanos(888888888);
299    const FAKE_ALLOCATION_1_CONTENTS: [u8; FAKE_ALLOCATION_1_SIZE as usize] = *b"12345678";
300    const FAKE_ALLOCATION_2_ADDRESS: u64 = 5678;
301    const FAKE_ALLOCATION_2_SIZE: u64 = 4;
302    const FAKE_ALLOCATION_2_TIMESTAMP: fidl::MonotonicInstant =
303        fidl::MonotonicInstant::from_nanos(-777777777); // test negative value too
304    const FAKE_THREAD_1_KOID: u64 = 1212;
305    const FAKE_THREAD_1_NAME: &str = "fake-thread-1-name";
306    const FAKE_THREAD_1_KEY: u64 = 4567;
307    const FAKE_THREAD_2_KOID: u64 = 1213;
308    const FAKE_THREAD_2_NAME: &str = "fake-thread-2-name";
309    const FAKE_THREAD_2_KEY: u64 = 7654;
310    const FAKE_STACK_TRACE_1_ADDRESSES: [u64; 6] = [11111, 22222, 33333, 22222, 44444, 55555];
311    const FAKE_STACK_TRACE_1_KEY: u64 = 9876;
312    const FAKE_STACK_TRACE_2_ADDRESSES: [u64; 4] = [11111, 22222, 11111, 66666];
313    const FAKE_STACK_TRACE_2_KEY: u64 = 6789;
314    const FAKE_REGION_1_ADDRESS: u64 = 0x10000000;
315    const FAKE_REGION_1_NAME: &str = "region-1";
316    const FAKE_REGION_1_SIZE: u64 = 0x80000;
317    const FAKE_REGION_1_FILE_OFFSET: u64 = 0x1000;
318    const FAKE_REGION_1_VADDR: u64 = 0x3000;
319    const FAKE_REGION_1_BUILD_ID: &[u8] = &[0xaa; 20];
320    const FAKE_REGION_2_ADDRESS: u64 = 0x7654300000;
321    const FAKE_REGION_2_SIZE: u64 = 0x200000;
322    const FAKE_REGION_2_FILE_OFFSET: u64 = 0x2000;
323    const FAKE_REGION_2_BUILD_ID: &[u8] = &[0x55; 32];
324
325    #[fasync::run_singlethreaded(test)]
326    async fn test_empty() {
327        let (receiver_proxy, receiver_stream) =
328            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
329        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
330
331        // Send the end of stream marker.
332        let fut = receiver_proxy.batch(&[]);
333        fut.await.unwrap();
334
335        // Receive the snapshot we just transmitted and verify that it is empty.
336        let received_snapshot = receive_worker.await.unwrap();
337        assert!(received_snapshot.allocations.is_empty());
338        assert!(received_snapshot.executable_regions.is_empty());
339    }
340
341    #[fasync::run_singlethreaded(test)]
342    async fn test_one_batch() {
343        let (receiver_proxy, receiver_stream) =
344            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
345        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
346
347        // Send a batch containing two allocations - whose threads, stack traces and contents can be
348        // listed before or after the allocation(s) that reference them - and two executable
349        // regions.
350        let fut = receiver_proxy.batch(&[
351            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
352                address: Some(FAKE_ALLOCATION_1_ADDRESS),
353                contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
354                ..Default::default()
355            }),
356            fheapdump_client::SnapshotElement::ExecutableRegion(
357                fheapdump_client::ExecutableRegion {
358                    address: Some(FAKE_REGION_1_ADDRESS),
359                    name: Some(FAKE_REGION_1_NAME.to_string()),
360                    size: Some(FAKE_REGION_1_SIZE),
361                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
362                    vaddr: Some(FAKE_REGION_1_VADDR),
363                    build_id: Some(fheapdump_client::BuildId {
364                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
365                    }),
366                    ..Default::default()
367                },
368            ),
369            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
370                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
371                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
372                ..Default::default()
373            }),
374            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
375                address: Some(FAKE_ALLOCATION_1_ADDRESS),
376                size: Some(FAKE_ALLOCATION_1_SIZE),
377                thread_info_key: Some(FAKE_THREAD_1_KEY),
378                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
379                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
380                ..Default::default()
381            }),
382            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
383                thread_info_key: Some(FAKE_THREAD_1_KEY),
384                koid: Some(FAKE_THREAD_1_KOID),
385                name: Some(FAKE_THREAD_1_NAME.to_string()),
386                ..Default::default()
387            }),
388            fheapdump_client::SnapshotElement::ExecutableRegion(
389                fheapdump_client::ExecutableRegion {
390                    address: Some(FAKE_REGION_2_ADDRESS),
391                    size: Some(FAKE_REGION_2_SIZE),
392                    file_offset: Some(FAKE_REGION_2_FILE_OFFSET),
393                    build_id: Some(fheapdump_client::BuildId {
394                        value: FAKE_REGION_2_BUILD_ID.to_vec(),
395                    }),
396                    ..Default::default()
397                },
398            ),
399            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
400                thread_info_key: Some(FAKE_THREAD_2_KEY),
401                koid: Some(FAKE_THREAD_2_KOID),
402                name: Some(FAKE_THREAD_2_NAME.to_string()),
403                ..Default::default()
404            }),
405            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
406                address: Some(FAKE_ALLOCATION_2_ADDRESS),
407                size: Some(FAKE_ALLOCATION_2_SIZE),
408                thread_info_key: Some(FAKE_THREAD_2_KEY),
409                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
410                timestamp: Some(FAKE_ALLOCATION_2_TIMESTAMP),
411                ..Default::default()
412            }),
413            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
414                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
415                program_addresses: Some(FAKE_STACK_TRACE_2_ADDRESSES.to_vec()),
416                ..Default::default()
417            }),
418        ]);
419        fut.await.unwrap();
420
421        // Send the end of stream marker.
422        let fut = receiver_proxy.batch(&[]);
423        fut.await.unwrap();
424
425        // Receive the snapshot we just transmitted and verify its contents.
426        let mut received_snapshot = receive_worker.await.unwrap();
427        let allocation1 = received_snapshot.allocations.swap_remove(
428            received_snapshot
429                .allocations
430                .iter()
431                .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
432                .unwrap(),
433        );
434        assert_eq!(allocation1.size, FAKE_ALLOCATION_1_SIZE);
435        assert_eq!(
436            allocation1.thread_info,
437            Some(Rc::new(ThreadInfo {
438                koid: FAKE_THREAD_1_KOID,
439                name: FAKE_THREAD_1_NAME.to_owned()
440            }))
441        );
442        assert_eq!(allocation1.stack_trace.program_addresses, FAKE_STACK_TRACE_2_ADDRESSES);
443        assert_eq!(allocation1.timestamp, Some(FAKE_ALLOCATION_1_TIMESTAMP));
444        assert_eq!(
445            allocation1.contents.as_ref().expect("contents must be set"),
446            &FAKE_ALLOCATION_1_CONTENTS.to_vec()
447        );
448        let allocation2 = received_snapshot.allocations.swap_remove(
449            received_snapshot
450                .allocations
451                .iter()
452                .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_2_ADDRESS))
453                .unwrap(),
454        );
455        assert_eq!(allocation2.size, FAKE_ALLOCATION_2_SIZE);
456        assert_eq!(
457            allocation2.thread_info,
458            Some(Rc::new(ThreadInfo {
459                koid: FAKE_THREAD_2_KOID,
460                name: FAKE_THREAD_2_NAME.to_owned()
461            }))
462        );
463        assert_eq!(allocation2.stack_trace.program_addresses, FAKE_STACK_TRACE_1_ADDRESSES);
464        assert_eq!(allocation2.timestamp, Some(FAKE_ALLOCATION_2_TIMESTAMP));
465        assert_matches!(allocation2.contents, None, "no contents are sent for this allocation");
466        assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
467        let region1 = received_snapshot.executable_regions.remove(&FAKE_REGION_1_ADDRESS).unwrap();
468        assert_eq!(region1.name, FAKE_REGION_1_NAME);
469        assert_eq!(region1.size, FAKE_REGION_1_SIZE);
470        assert_eq!(region1.file_offset, FAKE_REGION_1_FILE_OFFSET);
471        assert_eq!(region1.vaddr.unwrap(), FAKE_REGION_1_VADDR);
472        assert_eq!(region1.build_id, FAKE_REGION_1_BUILD_ID);
473        let region2 = received_snapshot.executable_regions.remove(&FAKE_REGION_2_ADDRESS).unwrap();
474        assert_eq!(region2.size, FAKE_REGION_2_SIZE);
475        assert_eq!(region2.file_offset, FAKE_REGION_2_FILE_OFFSET);
476        assert_eq!(region2.build_id, FAKE_REGION_2_BUILD_ID);
477        assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
478    }
479
480    #[fasync::run_singlethreaded(test)]
481    async fn test_two_batches() {
482        let (receiver_proxy, receiver_stream) =
483            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
484        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
485
486        // Send a first batch.
487        let fut = receiver_proxy.batch(&[
488            fheapdump_client::SnapshotElement::ExecutableRegion(
489                fheapdump_client::ExecutableRegion {
490                    address: Some(FAKE_REGION_2_ADDRESS),
491                    size: Some(FAKE_REGION_2_SIZE),
492                    file_offset: Some(FAKE_REGION_2_FILE_OFFSET),
493                    build_id: Some(fheapdump_client::BuildId {
494                        value: FAKE_REGION_2_BUILD_ID.to_vec(),
495                    }),
496                    ..Default::default()
497                },
498            ),
499            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
500                address: Some(FAKE_ALLOCATION_1_ADDRESS),
501                size: Some(FAKE_ALLOCATION_1_SIZE),
502                thread_info_key: Some(FAKE_THREAD_1_KEY),
503                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
504                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
505                ..Default::default()
506            }),
507            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
508                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
509                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
510                ..Default::default()
511            }),
512            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
513                thread_info_key: Some(FAKE_THREAD_2_KEY),
514                koid: Some(FAKE_THREAD_2_KOID),
515                name: Some(FAKE_THREAD_2_NAME.to_string()),
516                ..Default::default()
517            }),
518        ]);
519        fut.await.unwrap();
520
521        // Send another batch.
522        let fut = receiver_proxy.batch(&[
523            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
524                thread_info_key: Some(FAKE_THREAD_1_KEY),
525                koid: Some(FAKE_THREAD_1_KOID),
526                name: Some(FAKE_THREAD_1_NAME.to_string()),
527                ..Default::default()
528            }),
529            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
530                address: Some(FAKE_ALLOCATION_2_ADDRESS),
531                size: Some(FAKE_ALLOCATION_2_SIZE),
532                thread_info_key: Some(FAKE_THREAD_2_KEY),
533                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
534                timestamp: Some(FAKE_ALLOCATION_2_TIMESTAMP),
535                ..Default::default()
536            }),
537            fheapdump_client::SnapshotElement::ExecutableRegion(
538                fheapdump_client::ExecutableRegion {
539                    address: Some(FAKE_REGION_1_ADDRESS),
540                    name: Some(FAKE_REGION_1_NAME.to_string()),
541                    size: Some(FAKE_REGION_1_SIZE),
542                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
543                    vaddr: Some(FAKE_REGION_1_VADDR),
544                    build_id: Some(fheapdump_client::BuildId {
545                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
546                    }),
547                    ..Default::default()
548                },
549            ),
550            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
551                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
552                program_addresses: Some(FAKE_STACK_TRACE_2_ADDRESSES.to_vec()),
553                ..Default::default()
554            }),
555            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
556                address: Some(FAKE_ALLOCATION_1_ADDRESS),
557                contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
558                ..Default::default()
559            }),
560        ]);
561        fut.await.unwrap();
562
563        // Send the end of stream marker.
564        let fut = receiver_proxy.batch(&[]);
565        fut.await.unwrap();
566
567        // Receive the snapshot we just transmitted and verify its contents.
568        let mut received_snapshot = receive_worker.await.unwrap();
569        let allocation1 = received_snapshot.allocations.swap_remove(
570            received_snapshot
571                .allocations
572                .iter()
573                .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
574                .unwrap(),
575        );
576        assert_eq!(allocation1.size, FAKE_ALLOCATION_1_SIZE);
577        assert_eq!(
578            allocation1.thread_info,
579            Some(Rc::new(ThreadInfo {
580                koid: FAKE_THREAD_1_KOID,
581                name: FAKE_THREAD_1_NAME.to_owned()
582            }))
583        );
584        assert_eq!(allocation1.stack_trace.program_addresses, FAKE_STACK_TRACE_2_ADDRESSES);
585        assert_eq!(allocation1.timestamp, Some(FAKE_ALLOCATION_1_TIMESTAMP));
586        assert_eq!(
587            allocation1.contents.as_ref().expect("contents must be set"),
588            &FAKE_ALLOCATION_1_CONTENTS.to_vec()
589        );
590        let allocation2 = received_snapshot.allocations.swap_remove(
591            received_snapshot
592                .allocations
593                .iter()
594                .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_2_ADDRESS))
595                .unwrap(),
596        );
597        assert_eq!(allocation2.size, FAKE_ALLOCATION_2_SIZE);
598        assert_eq!(
599            allocation2.thread_info,
600            Some(Rc::new(ThreadInfo {
601                koid: FAKE_THREAD_2_KOID,
602                name: FAKE_THREAD_2_NAME.to_owned()
603            }))
604        );
605        assert_eq!(allocation2.stack_trace.program_addresses, FAKE_STACK_TRACE_1_ADDRESSES);
606        assert_eq!(allocation2.timestamp, Some(FAKE_ALLOCATION_2_TIMESTAMP));
607        assert_matches!(allocation2.contents, None, "no contents are sent for this allocation");
608        assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
609        let region1 = received_snapshot.executable_regions.remove(&FAKE_REGION_1_ADDRESS).unwrap();
610        assert_eq!(region1.name, FAKE_REGION_1_NAME);
611        assert_eq!(region1.size, FAKE_REGION_1_SIZE);
612        assert_eq!(region1.file_offset, FAKE_REGION_1_FILE_OFFSET);
613        assert_eq!(region1.vaddr.unwrap(), FAKE_REGION_1_VADDR);
614        assert_eq!(region1.build_id, FAKE_REGION_1_BUILD_ID);
615        let region2 = received_snapshot.executable_regions.remove(&FAKE_REGION_2_ADDRESS).unwrap();
616        assert_eq!(region2.size, FAKE_REGION_2_SIZE);
617        assert_eq!(region2.file_offset, FAKE_REGION_2_FILE_OFFSET);
618        assert_eq!(region2.build_id, FAKE_REGION_2_BUILD_ID);
619        assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
620    }
621
622    #[test_case(|allocation| allocation.size = None => matches
623        Err(Error::MissingField { container: "Allocation", field: "size" }) ; "size")]
624    #[test_case(|allocation| allocation.stack_trace_key = None => matches
625        Err(Error::MissingField { container: "Allocation", field: "stack_trace_key" }) ; "stack_trace_key")]
626    #[test_case(|allocation| allocation.address = None => matches
627        Ok(_) ; "address_is_optional")]
628    #[test_case(|allocation| allocation.thread_info_key = None => matches
629        Ok(_) ; "thread_info_is_optional")]
630    #[test_case(|allocation| allocation.timestamp = None => matches
631        Ok(_) ; "timestamp_is_optional")]
632    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
633        Ok(_) ; "success")]
634    #[fasync::run_singlethreaded(test)]
635    async fn test_allocation_required_fields(
636        set_one_field_to_none: fn(&mut fheapdump_client::Allocation),
637    ) -> Result<Snapshot, Error> {
638        let (receiver_proxy, receiver_stream) =
639            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
640        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
641
642        // Start with an Allocation with all the required fields set.
643        let mut allocation = fheapdump_client::Allocation {
644            address: Some(FAKE_ALLOCATION_1_ADDRESS),
645            size: Some(FAKE_ALLOCATION_1_SIZE),
646            thread_info_key: Some(FAKE_THREAD_1_KEY),
647            stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
648            timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
649            ..Default::default()
650        };
651
652        // Set one of the fields to None, according to the case being tested.
653        set_one_field_to_none(&mut allocation);
654
655        // Send it to the SnapshotReceiver along with the thread info and stack trace it references.
656        let fut = receiver_proxy.batch(&[
657            fheapdump_client::SnapshotElement::Allocation(allocation),
658            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
659                thread_info_key: Some(FAKE_THREAD_1_KEY),
660                koid: Some(FAKE_THREAD_1_KOID),
661                name: Some(FAKE_THREAD_1_NAME.to_string()),
662                ..Default::default()
663            }),
664            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
665                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
666                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
667                ..Default::default()
668            }),
669        ]);
670        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
671
672        // Send the end of stream marker.
673        let fut = receiver_proxy.batch(&[]);
674        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
675
676        // Return the result.
677        receive_worker.await
678    }
679
680    #[test_case(|thread_info| thread_info.thread_info_key = None => matches
681        Err(Error::MissingField { container: "ThreadInfo", field: "thread_info_key" }) ; "thread_info_key")]
682    #[test_case(|thread_info| thread_info.koid = None => matches
683        Err(Error::MissingField { container: "ThreadInfo", field: "koid" }) ; "koid")]
684    #[test_case(|thread_info| thread_info.name = None => matches
685        Err(Error::MissingField { container: "ThreadInfo", field: "name" }) ; "name")]
686    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
687        Ok(_) ; "success")]
688    #[fasync::run_singlethreaded(test)]
689    async fn test_thread_info_required_fields(
690        set_one_field_to_none: fn(&mut fheapdump_client::ThreadInfo),
691    ) -> Result<Snapshot, Error> {
692        let (receiver_proxy, receiver_stream) =
693            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
694        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
695
696        // Start with a ThreadInfo with all the required fields set.
697        let mut thread_info = fheapdump_client::ThreadInfo {
698            thread_info_key: Some(FAKE_THREAD_1_KEY),
699            koid: Some(FAKE_THREAD_1_KOID),
700            name: Some(FAKE_THREAD_1_NAME.to_string()),
701            ..Default::default()
702        };
703
704        // Set one of the fields to None, according to the case being tested.
705        set_one_field_to_none(&mut thread_info);
706
707        // Send it to the SnapshotReceiver.
708        let fut =
709            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::ThreadInfo(thread_info)]);
710        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
711
712        // Send the end of stream marker.
713        let fut = receiver_proxy.batch(&[]);
714        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
715
716        // Return the result.
717        receive_worker.await
718    }
719
720    #[test_case(|stack_trace| stack_trace.stack_trace_key = None => matches
721        Err(Error::MissingField { container: "StackTrace", field: "stack_trace_key" }) ; "stack_trace_key")]
722    #[test_case(|stack_trace| stack_trace.program_addresses = None => matches
723        Err(Error::MissingField { container: "StackTrace", field: "program_addresses" }) ; "program_addresses")]
724    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
725        Ok(_) ; "success")]
726    #[fasync::run_singlethreaded(test)]
727    async fn test_stack_trace_required_fields(
728        set_one_field_to_none: fn(&mut fheapdump_client::StackTrace),
729    ) -> Result<Snapshot, Error> {
730        let (receiver_proxy, receiver_stream) =
731            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
732        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
733
734        // Start with a StackTrace with all the required fields set.
735        let mut stack_trace = fheapdump_client::StackTrace {
736            stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
737            program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
738            ..Default::default()
739        };
740
741        // Set one of the fields to None, according to the case being tested.
742        set_one_field_to_none(&mut stack_trace);
743
744        // Send it to the SnapshotReceiver.
745        let fut =
746            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::StackTrace(stack_trace)]);
747        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
748
749        // Send the end of stream marker.
750        let fut = receiver_proxy.batch(&[]);
751        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
752
753        // Return the result.
754        receive_worker.await
755    }
756
757    #[test_case(|region| region.address = None => matches
758        Err(Error::MissingField { container: "ExecutableRegion", field: "address" }) ; "address")]
759    #[test_case(|region| region.size = None => matches
760        Err(Error::MissingField { container: "ExecutableRegion", field: "size" }) ; "size")]
761    #[test_case(|region| region.file_offset = None => matches
762        Err(Error::MissingField { container: "ExecutableRegion", field: "file_offset" }) ; "file_offset")]
763    #[test_case(|region| region.build_id = None => matches
764        Err(Error::MissingField { container: "ExecutableRegion", field: "build_id" }) ; "build_id")]
765    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
766        Ok(_) ; "success")]
767    #[fasync::run_singlethreaded(test)]
768    async fn test_executable_region_required_fields(
769        set_one_field_to_none: fn(&mut fheapdump_client::ExecutableRegion),
770    ) -> Result<Snapshot, Error> {
771        let (receiver_proxy, receiver_stream) =
772            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
773        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
774
775        // Start with an ExecutableRegion with all the required fields set.
776        let mut region = fheapdump_client::ExecutableRegion {
777            address: Some(FAKE_REGION_1_ADDRESS),
778            size: Some(FAKE_REGION_1_SIZE),
779            file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
780            build_id: Some(fheapdump_client::BuildId { value: FAKE_REGION_1_BUILD_ID.to_vec() }),
781            ..Default::default()
782        };
783
784        // Set one of the fields to None, according to the case being tested.
785        set_one_field_to_none(&mut region);
786
787        // Send it to the SnapshotReceiver.
788        let fut =
789            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::ExecutableRegion(region)]);
790        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
791
792        // Send the end of stream marker.
793        let fut = receiver_proxy.batch(&[]);
794        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
795
796        // Return the result.
797        receive_worker.await
798    }
799
800    #[test_case(|block_contents| block_contents.address = None => matches
801        Err(Error::MissingField { container: "BlockContents", field: "address" }) ; "address")]
802    #[test_case(|block_contents| block_contents.contents = None => matches
803        Err(Error::MissingField { container: "BlockContents", field: "contents" }) ; "contents")]
804    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
805        Ok(_) ; "success")]
806    #[fasync::run_singlethreaded(test)]
807    async fn test_block_contents_required_fields(
808        set_one_field_to_none: fn(&mut fheapdump_client::BlockContents),
809    ) -> Result<Snapshot, Error> {
810        let (receiver_proxy, receiver_stream) =
811            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
812        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
813
814        // Start with a BlockContents with all the required fields set.
815        let mut block_contents = fheapdump_client::BlockContents {
816            address: Some(FAKE_ALLOCATION_1_ADDRESS),
817            contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
818            ..Default::default()
819        };
820
821        // Set one of the fields to None, according to the case being tested.
822        set_one_field_to_none(&mut block_contents);
823
824        // Send it to the SnapshotReceiver along with the allocation it references.
825        let fut = receiver_proxy.batch(&[
826            fheapdump_client::SnapshotElement::BlockContents(block_contents),
827            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
828                address: Some(FAKE_ALLOCATION_1_ADDRESS),
829                size: Some(FAKE_ALLOCATION_1_SIZE),
830                thread_info_key: Some(FAKE_THREAD_1_KEY),
831                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
832                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
833                ..Default::default()
834            }),
835            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
836                thread_info_key: Some(FAKE_THREAD_1_KEY),
837                koid: Some(FAKE_THREAD_1_KOID),
838                name: Some(FAKE_THREAD_1_NAME.to_string()),
839                ..Default::default()
840            }),
841            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
842                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
843                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
844                ..Default::default()
845            }),
846        ]);
847        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
848
849        // Send the end of stream marker.
850        let fut = receiver_proxy.batch(&[]);
851        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
852
853        // Return the result.
854        receive_worker.await
855    }
856
857    #[fasync::run_singlethreaded(test)]
858    async fn test_conflicting_allocations() {
859        let (receiver_proxy, receiver_stream) =
860            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
861        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
862
863        // Send two allocations with the same address along with the stack trace they reference.
864        let fut = receiver_proxy.batch(&[
865            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
866                address: Some(FAKE_ALLOCATION_1_ADDRESS),
867                size: Some(FAKE_ALLOCATION_1_SIZE),
868                thread_info_key: Some(FAKE_THREAD_1_KEY),
869                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
870                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
871                ..Default::default()
872            }),
873            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
874                address: Some(FAKE_ALLOCATION_1_ADDRESS),
875                size: Some(FAKE_ALLOCATION_1_SIZE),
876                thread_info_key: Some(FAKE_THREAD_1_KEY),
877                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
878                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
879                ..Default::default()
880            }),
881            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
882                thread_info_key: Some(FAKE_THREAD_1_KEY),
883                koid: Some(FAKE_THREAD_1_KOID),
884                name: Some(FAKE_THREAD_1_NAME.to_string()),
885                ..Default::default()
886            }),
887            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
888                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
889                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
890                ..Default::default()
891            }),
892        ]);
893        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
894
895        // Send the end of stream marker.
896        let fut = receiver_proxy.batch(&[]);
897        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
898
899        // Verify expected error.
900        assert_matches!(
901            receive_worker.await,
902            Err(Error::ConflictingElement { element_type: "Allocation" })
903        );
904    }
905
906    #[fasync::run_singlethreaded(test)]
907    async fn test_conflicting_executable_regions() {
908        let (receiver_proxy, receiver_stream) =
909            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
910        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
911
912        // Send two executable regions with the same address.
913        let fut = receiver_proxy.batch(&[
914            fheapdump_client::SnapshotElement::ExecutableRegion(
915                fheapdump_client::ExecutableRegion {
916                    address: Some(FAKE_REGION_1_ADDRESS),
917                    size: Some(FAKE_REGION_1_SIZE),
918                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
919                    build_id: Some(fheapdump_client::BuildId {
920                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
921                    }),
922                    ..Default::default()
923                },
924            ),
925            fheapdump_client::SnapshotElement::ExecutableRegion(
926                fheapdump_client::ExecutableRegion {
927                    address: Some(FAKE_REGION_1_ADDRESS),
928                    size: Some(FAKE_REGION_1_SIZE),
929                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
930                    build_id: Some(fheapdump_client::BuildId {
931                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
932                    }),
933                    ..Default::default()
934                },
935            ),
936        ]);
937        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
938
939        // Send the end of stream marker.
940        let fut = receiver_proxy.batch(&[]);
941        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
942
943        // Verify expected error.
944        assert_matches!(
945            receive_worker.await,
946            Err(Error::ConflictingElement { element_type: "ExecutableRegion" })
947        );
948    }
949
950    #[fasync::run_singlethreaded(test)]
951    async fn test_block_contents_wrong_size() {
952        let (receiver_proxy, receiver_stream) =
953            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
954        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
955
956        // Send an allocation whose BlockContents has the wrong size.
957        let contents_with_wrong_size = vec![0; FAKE_ALLOCATION_1_SIZE as usize + 1];
958        let fut = receiver_proxy.batch(&[
959            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
960                address: Some(FAKE_ALLOCATION_1_ADDRESS),
961                size: Some(FAKE_ALLOCATION_1_SIZE),
962                thread_info_key: Some(FAKE_THREAD_1_KEY),
963                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
964                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
965                ..Default::default()
966            }),
967            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
968                address: Some(FAKE_ALLOCATION_1_ADDRESS),
969                contents: Some(contents_with_wrong_size),
970                ..Default::default()
971            }),
972            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
973                thread_info_key: Some(FAKE_THREAD_1_KEY),
974                koid: Some(FAKE_THREAD_1_KOID),
975                name: Some(FAKE_THREAD_1_NAME.to_string()),
976                ..Default::default()
977            }),
978            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
979                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
980                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
981                ..Default::default()
982            }),
983        ]);
984        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
985
986        // Send the end of stream marker.
987        let fut = receiver_proxy.batch(&[]);
988        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
989
990        // Verify expected error.
991        assert_matches!(
992            receive_worker.await,
993            Err(Error::ConflictingElement { element_type: "BlockContents" })
994        );
995    }
996
997    #[fasync::run_singlethreaded(test)]
998    async fn test_empty_stack_trace() {
999        let (receiver_proxy, receiver_stream) =
1000            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1001        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
1002
1003        // Send an allocation that references an empty stack trace.
1004        let fut = receiver_proxy.batch(&[
1005            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1006                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1007                size: Some(FAKE_ALLOCATION_1_SIZE),
1008                thread_info_key: Some(FAKE_THREAD_1_KEY),
1009                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1010                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1011                ..Default::default()
1012            }),
1013            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1014                thread_info_key: Some(FAKE_THREAD_1_KEY),
1015                koid: Some(FAKE_THREAD_1_KOID),
1016                name: Some(FAKE_THREAD_1_NAME.to_string()),
1017                ..Default::default()
1018            }),
1019            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1020                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1021                program_addresses: Some(vec![]),
1022                ..Default::default()
1023            }),
1024        ]);
1025        fut.await.unwrap();
1026
1027        // Send the end of stream marker.
1028        let fut = receiver_proxy.batch(&[]);
1029        fut.await.unwrap();
1030
1031        // Verify that the stack trace has been reconstructed correctly.
1032        let received_snapshot = receive_worker.await.unwrap();
1033        let allocation1 = received_snapshot
1034            .allocations
1035            .iter()
1036            .find(|a| a.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1037            .unwrap();
1038        assert_eq!(allocation1.stack_trace.program_addresses, []);
1039    }
1040
1041    #[fasync::run_singlethreaded(test)]
1042    async fn test_chunked_stack_trace() {
1043        let (receiver_proxy, receiver_stream) =
1044            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1045        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
1046
1047        // Send an allocation and the first chunk of its stack trace.
1048        let fut = receiver_proxy.batch(&[
1049            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1050                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1051                size: Some(FAKE_ALLOCATION_1_SIZE),
1052                thread_info_key: Some(FAKE_THREAD_1_KEY),
1053                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1054                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1055                ..Default::default()
1056            }),
1057            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1058                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1059                program_addresses: Some(vec![1111, 2222]),
1060                ..Default::default()
1061            }),
1062            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1063                thread_info_key: Some(FAKE_THREAD_1_KEY),
1064                koid: Some(FAKE_THREAD_1_KOID),
1065                name: Some(FAKE_THREAD_1_NAME.to_string()),
1066                ..Default::default()
1067            }),
1068        ]);
1069        fut.await.unwrap();
1070
1071        // Send the second chunk.
1072        let fut = receiver_proxy.batch(&[fheapdump_client::SnapshotElement::StackTrace(
1073            fheapdump_client::StackTrace {
1074                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1075                program_addresses: Some(vec![3333]),
1076                ..Default::default()
1077            },
1078        )]);
1079        fut.await.unwrap();
1080
1081        // Send the end of stream marker.
1082        let fut = receiver_proxy.batch(&[]);
1083        fut.await.unwrap();
1084
1085        // Verify that the stack trace has been reconstructed correctly.
1086        let received_snapshot = receive_worker.await.unwrap();
1087        let allocation1 = received_snapshot
1088            .allocations
1089            .iter()
1090            .find(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1091            .unwrap();
1092        assert_eq!(allocation1.stack_trace.program_addresses, [1111, 2222, 3333]);
1093    }
1094
1095    #[fasync::run_singlethreaded(test)]
1096    async fn test_empty_block_contents() {
1097        let (receiver_proxy, receiver_stream) =
1098            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1099        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
1100
1101        // Send a zero-sized allocation and its empty contents.
1102        let fut = receiver_proxy.batch(&[
1103            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1104                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1105                size: Some(0),
1106                thread_info_key: Some(FAKE_THREAD_1_KEY),
1107                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1108                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1109                ..Default::default()
1110            }),
1111            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1112                thread_info_key: Some(FAKE_THREAD_1_KEY),
1113                koid: Some(FAKE_THREAD_1_KOID),
1114                name: Some(FAKE_THREAD_1_NAME.to_string()),
1115                ..Default::default()
1116            }),
1117            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1118                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1119                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1120                ..Default::default()
1121            }),
1122            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
1123                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1124                contents: Some(vec![]),
1125                ..Default::default()
1126            }),
1127        ]);
1128        fut.await.unwrap();
1129
1130        // Send the end of stream marker.
1131        let fut = receiver_proxy.batch(&[]);
1132        fut.await.unwrap();
1133
1134        // Verify that the allocation has been reconstructed correctly.
1135        let received_snapshot = receive_worker.await.unwrap();
1136        let allocation1 = received_snapshot
1137            .allocations
1138            .iter()
1139            .find(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1140            .unwrap();
1141        assert_eq!(allocation1.contents.as_ref().expect("contents must be set"), &vec![]);
1142    }
1143
1144    #[fasync::run_singlethreaded(test)]
1145    async fn test_chunked_block_contents() {
1146        let (receiver_proxy, receiver_stream) =
1147            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1148        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
1149
1150        // Split the contents in two halves.
1151        let (content_first_chunk, contents_second_chunk) =
1152            FAKE_ALLOCATION_1_CONTENTS.split_at(FAKE_ALLOCATION_1_CONTENTS.len() / 2);
1153
1154        // Send an allocation and the first chunk of its contents.
1155        let fut = receiver_proxy.batch(&[
1156            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1157                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1158                size: Some(FAKE_ALLOCATION_1_SIZE),
1159                thread_info_key: Some(FAKE_THREAD_1_KEY),
1160                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1161                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1162                ..Default::default()
1163            }),
1164            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1165                thread_info_key: Some(FAKE_THREAD_1_KEY),
1166                koid: Some(FAKE_THREAD_1_KOID),
1167                name: Some(FAKE_THREAD_1_NAME.to_string()),
1168                ..Default::default()
1169            }),
1170            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1171                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1172                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1173                ..Default::default()
1174            }),
1175            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
1176                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1177                contents: Some(content_first_chunk.to_vec()),
1178                ..Default::default()
1179            }),
1180        ]);
1181        fut.await.unwrap();
1182
1183        // Send the second chunk.
1184        let fut = receiver_proxy.batch(&[fheapdump_client::SnapshotElement::BlockContents(
1185            fheapdump_client::BlockContents {
1186                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1187                contents: Some(contents_second_chunk.to_vec()),
1188                ..Default::default()
1189            },
1190        )]);
1191        fut.await.unwrap();
1192
1193        // Send the end of stream marker.
1194        let fut = receiver_proxy.batch(&[]);
1195        fut.await.unwrap();
1196
1197        // Verify that the allocation's block contents have been reconstructed correctly.
1198        let received_snapshot = receive_worker.await.unwrap();
1199        let allocation1 = received_snapshot
1200            .allocations
1201            .iter()
1202            .find(|a| a.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1203            .unwrap();
1204        assert_eq!(allocation1.contents, Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()));
1205    }
1206
1207    #[fasync::run_singlethreaded(test)]
1208    async fn test_missing_end_of_stream() {
1209        let (receiver_proxy, receiver_stream) =
1210            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1211        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
1212
1213        // Send an allocation and its stack trace.
1214        let fut = receiver_proxy.batch(&[
1215            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1216                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1217                size: Some(FAKE_ALLOCATION_1_SIZE),
1218                thread_info_key: Some(FAKE_THREAD_1_KEY),
1219                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1220                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1221                ..Default::default()
1222            }),
1223            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1224                thread_info_key: Some(FAKE_THREAD_1_KEY),
1225                koid: Some(FAKE_THREAD_1_KOID),
1226                name: Some(FAKE_THREAD_1_NAME.to_string()),
1227                ..Default::default()
1228            }),
1229            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1230                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1231                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1232                ..Default::default()
1233            }),
1234        ]);
1235        fut.await.unwrap();
1236
1237        // Close the channel without sending an end of stream marker.
1238        std::mem::drop(receiver_proxy);
1239
1240        // Expect an UnexpectedEndOfStream error.
1241        assert_matches!(receive_worker.await, Err(Error::UnexpectedEndOfStream));
1242    }
1243}