1use crate::error::ParseWarning;
6use crate::init::{InitRecord, Ticks};
7use crate::metadata::{
8 MetadataRecord, Provider, ProviderEventMetadataRecord, ProviderInfoMetadataRecord,
9 ProviderSectionMetadataRecord, TraceInfoMetadataRecord,
10};
11use crate::string::{StringRecord, StringRef};
12use crate::thread::{ProcessKoid, ProcessRef, ThreadKoid, ThreadRecord, ThreadRef};
13use crate::{ParseError, ParsedWithOriginalBytes, RawTraceRecord, TraceRecord};
14use flyweights::FlyStr;
15use futures::{AsyncRead, AsyncReadExt, SinkExt, Stream};
16use std::collections::BTreeMap;
17use std::num::{NonZeroU16, NonZeroU8};
18
19pub fn parse_full_session<'a>(
20 buf: &'a [u8],
21) -> Result<(Vec<TraceRecord>, Vec<ParseWarning>), ParseError> {
22 let mut parser = SessionParser::new(std::io::Cursor::new(buf));
23 let mut records = vec![];
24 while let Some(record) = parser.next() {
25 records.push(record?);
26 }
27 Ok((records, parser.warnings().to_owned()))
28}
29
/// Incremental trace session parser that pulls its input from a reader of type `R`.
#[derive(Debug, PartialEq)]
pub struct SessionParser<R> {
    /// Bytes read from `reader` that have not yet been consumed by a parsed record.
    buffer: Vec<u8>,
    /// Source of the raw trace bytes.
    reader: R,
    /// State used to resolve raw records into `TraceRecord`s; also collects warnings.
    resolver: ResolveCtx,
    /// Set once a read from `reader` has returned zero bytes.
    reader_is_eof: bool,
    /// Set once a magic number record has been parsed; other records are rejected until then.
    have_seen_magic_number: bool,
    /// Original bytes of the raw records parsed during the most recent `next`/`next_async` call.
    parsed_bytes: Vec<u8>,
}
39
40impl<R: std::io::Read> SessionParser<R> {
41 pub fn new(reader: R) -> Self {
42 Self {
43 buffer: vec![],
44 reader,
45 resolver: ResolveCtx::new(),
46 reader_is_eof: false,
47 have_seen_magic_number: false,
48 parsed_bytes: vec![],
49 }
50 }
51}
52
53impl<R> SessionParser<R> {
54 pub fn warnings(&self) -> &[ParseWarning] {
55 self.resolver.warnings()
56 }
57
58 pub fn parsed_bytes(&self) -> &[u8] {
59 return &self.parsed_bytes;
60 }
61
62 fn parse_next(&mut self) -> ParseOutcome {
63 match RawTraceRecord::parse(&self.buffer) {
64 Ok((rem, ParsedWithOriginalBytes { parsed: raw_record, bytes })) => {
65 self.parsed_bytes.extend(bytes);
66 if raw_record.is_magic_number() {
68 self.have_seen_magic_number = true;
69 } else {
70 if !self.have_seen_magic_number {
71 return ParseOutcome::Error(ParseError::MissingMagicNumber);
72 }
73 }
74
75 let resolve_res = TraceRecord::resolve(&mut self.resolver, raw_record);
77
78 let unused_len = rem.len();
80 let parsed_len = self.buffer.len() - unused_len;
81 self.buffer.copy_within(parsed_len.., 0);
82 self.buffer.truncate(unused_len);
83
84 match resolve_res {
85 Ok(None) => ParseOutcome::Continue,
88 Ok(Some(resolved)) => ParseOutcome::GotRecord(resolved),
89 Err(e) => ParseOutcome::Error(e),
90 }
91 }
92 Err(nom::Err::Error(e) | nom::Err::Failure(e)) => {
93 self.buffer = vec![];
94 ParseOutcome::Error(e)
95 }
96 Err(nom::Err::Incomplete(needed)) => {
97 ParseOutcome::NeedMoreBytes(match needed {
98 nom::Needed::Unknown => 32768,
101 nom::Needed::Size(n) => n.into(),
102 })
103 }
104 }
105 }
106}
107
/// Result of a single attempt to parse a record out of the parser's buffer.
enum ParseOutcome {
    /// A record was parsed and resolved to a value to hand to the caller.
    GotRecord(TraceRecord),
    /// A record was parsed but resolved to nothing to report; the caller should parse again.
    Continue,
    /// Parsing or resolving failed.
    Error(ParseError),
    /// The buffer ends before a full record; carries how many more bytes to request.
    NeedMoreBytes(usize),
}
114
// Buffer-refill logic shared by the sync (`Iterator::next`) and async (`next_async`) paths.
//
// Arguments:
// - `$self`: the parser whose `buffer` is refilled.
// - `$original_len`: identifier the macro binds to the buffer length before growing, so that
//   the caller's `$bytes_read` expression can slice the newly added tail.
// - `$needed`: number of bytes to grow the buffer by.
// - `$bytes_read`: expression that performs the read and evaluates to the number of bytes read.
//
// NOTE: the `return None` below returns from the function that invokes the macro.
macro_rules! fill_buffer {
    ($self:ident, $original_len:ident, $needed:ident, $bytes_read:expr) => {{
        if $self.reader_is_eof {
            // A previous read already returned zero bytes, so there is nothing more to parse.
            return None;
        } else {
            let $original_len = $self.buffer.len();
            // Grow the buffer with zero padding so the read has somewhere to write.
            $self.buffer.resize($original_len + $needed, 0);
            let bytes_read = $bytes_read;
            if bytes_read == 0 {
                $self.reader_is_eof = true;
            }
            // Trim the padding that the read did not overwrite.
            $self.buffer.truncate($original_len + bytes_read);
        }
    }};
}
133
134impl<R: std::io::Read> Iterator for SessionParser<R> {
135 type Item = Result<TraceRecord, ParseError>;
136 fn next(&mut self) -> Option<Self::Item> {
137 self.parsed_bytes.clear();
139 loop {
140 match self.parse_next() {
141 ParseOutcome::GotRecord(r) => return Some(Ok(r)),
142 ParseOutcome::Error(e) => return Some(Err(e)),
143 ParseOutcome::Continue => continue,
144 ParseOutcome::NeedMoreBytes(needed) => {
145 fill_buffer!(
146 self,
147 original_len,
148 needed,
149 match self.reader.read(&mut self.buffer[original_len..]) {
150 Ok(b) => b,
151 Err(e) => return Some(Err(ParseError::Io(e))),
152 }
153 );
154 }
155 }
156 }
157 }
158}
159
160impl<R: AsyncRead + Send + Unpin + 'static> SessionParser<R> {
161 pub fn new_async(
162 reader: R,
163 ) -> (impl Stream<Item = Result<TraceRecord, ParseError>>, fuchsia_async::Task<Vec<ParseWarning>>)
164 {
165 let (mut send, recv) = futures::channel::mpsc::channel(1);
167 let pump_task = fuchsia_async::Task::spawn(async move {
168 let mut parser = Self {
169 buffer: vec![],
170 reader,
171 resolver: ResolveCtx::new(),
172 reader_is_eof: false,
173 have_seen_magic_number: false,
174 parsed_bytes: vec![],
175 };
176
177 while let Some(next) = parser.next_async().await {
178 if send.send(next).await.is_err() {
179 break;
181 }
182 }
183
184 parser.warnings().to_owned()
185 });
186
187 (recv, pump_task)
188 }
189
190 pub async fn next_async(&mut self) -> Option<Result<TraceRecord, ParseError>> {
191 self.parsed_bytes.clear();
193 loop {
194 match self.parse_next() {
195 ParseOutcome::GotRecord(r) => return Some(Ok(r)),
196 ParseOutcome::Error(e) => return Some(Err(e)),
197 ParseOutcome::Continue => continue,
198 ParseOutcome::NeedMoreBytes(needed) => {
199 fill_buffer!(
200 self,
201 original_len,
202 needed,
203 match self.reader.read(&mut self.buffer[original_len..]).await {
204 Ok(b) => b,
205 Err(e) => return Some(Err(ParseError::Io(e))),
206 }
207 );
208 }
209 }
210 }
211 }
212}
213
/// State carried across records while resolving raw records into `TraceRecord`s.
#[derive(Debug, PartialEq)]
pub(crate) struct ResolveCtx {
    /// Tick rate taken from the most recent init record; starts at 1.
    ticks_per_second: u64,
    /// Provider selected by the most recent provider info or provider section record.
    current_provider: Option<Provider>,
    /// Provider names registered by provider info records, keyed by provider id.
    providers: BTreeMap<u32, FlyStr>,
    /// String table populated by string records, keyed by non-zero string index.
    strings: BTreeMap<NonZeroU16, FlyStr>,
    /// Thread table populated by thread records: index -> (process koid, thread koid).
    threads: BTreeMap<NonZeroU8, (ProcessKoid, ThreadKoid)>,
    /// Non-fatal problems encountered so far.
    warnings: Vec<ParseWarning>,
}
223
224impl ResolveCtx {
225 pub fn new() -> Self {
226 Self {
227 ticks_per_second: 1,
228 current_provider: None,
229 providers: Default::default(),
230 strings: Default::default(),
231 threads: Default::default(),
232 warnings: Default::default(),
233 }
234 }
235
236 pub fn add_warning(&mut self, warning: ParseWarning) {
237 self.warnings.push(warning);
238 }
239
240 pub fn warnings(&self) -> &[ParseWarning] {
241 &self.warnings
242 }
243
244 pub fn current_provider(&self) -> Option<Provider> {
245 self.current_provider.clone()
246 }
247
248 pub fn get_provider(&mut self, id: u32) -> Result<Provider, ParseError> {
249 let name = if let Some(name) = self.providers.get(&id).cloned() {
250 name
251 } else {
252 self.add_warning(ParseWarning::UnknownProviderId(id));
253 "<unknown>".into()
254 };
255
256 Ok(Provider { id, name })
257 }
258
259 pub fn on_metadata_record(
260 &mut self,
261 m: MetadataRecord,
262 ) -> Result<Option<TraceRecord>, ParseError> {
263 Ok(match m {
264 MetadataRecord::TraceInfo(TraceInfoMetadataRecord::MagicNumber) => None,
266
267 MetadataRecord::ProviderInfo(ProviderInfoMetadataRecord { provider_id, name }) => {
268 self.providers.insert(provider_id, name.clone());
269 self.current_provider = Some(Provider { id: provider_id, name: name });
270 None
271 }
272 MetadataRecord::ProviderSection(ProviderSectionMetadataRecord { provider_id }) => {
273 let new_provider = self.get_provider(provider_id)?;
274 self.current_provider = Some(new_provider);
275 None
276 }
277 MetadataRecord::ProviderEvent(ProviderEventMetadataRecord { provider_id, event }) => {
278 Some(TraceRecord::ProviderEvent {
279 provider: self.get_provider(provider_id)?,
280 event,
281 })
282 }
283 MetadataRecord::Unknown { raw_type } => {
284 self.add_warning(ParseWarning::UnknownMetadataRecordType(raw_type));
285 None
286 }
287 })
288 }
289
290 pub fn on_init_record(&mut self, InitRecord { ticks_per_second }: InitRecord) {
291 self.ticks_per_second = ticks_per_second;
292 }
293
294 pub fn on_string_record(&mut self, s: StringRecord<'_>) {
295 if let Some(idx) = NonZeroU16::new(s.index) {
296 self.strings.insert(idx, s.value.into());
297 } else {
298 self.add_warning(ParseWarning::RecordForZeroStringId);
299 }
300 }
301
302 pub fn on_thread_record(&mut self, t: ThreadRecord) {
303 self.threads.insert(t.index, (t.process_koid, t.thread_koid));
304 }
305
306 pub fn resolve_str(&mut self, s: StringRef<'_>) -> FlyStr {
307 match s {
308 StringRef::Empty => FlyStr::default(),
309 StringRef::Inline(inline) => FlyStr::from(inline),
310 StringRef::Index(id) => {
311 if let Some(s) = self.strings.get(&id).cloned() {
312 s
313 } else {
314 self.add_warning(ParseWarning::UnknownStringId(id));
315 "<unknown>".into()
316 }
317 }
318 }
319 }
320
321 pub fn resolve_process(&mut self, p: ProcessRef) -> ProcessKoid {
322 match p {
323 ProcessRef::Index(id) => {
324 if let Some(process) = self.threads.get(&id).map(|(process, _thread)| *process) {
325 process
326 } else {
327 self.add_warning(ParseWarning::UnknownProcessRef(p));
328 ProcessKoid(u64::MAX)
329 }
330 }
331 ProcessRef::Inline(inline) => inline,
332 }
333 }
334
335 pub fn resolve_thread(&mut self, t: ThreadRef) -> ThreadKoid {
336 match t {
337 ThreadRef::Index(id) => {
338 if let Some(thread) = self.threads.get(&id).map(|(_process, thread)| *thread) {
339 thread
340 } else {
341 self.warnings.push(ParseWarning::UnknownThreadRef(t));
342 ThreadKoid(u64::MAX)
343 }
344 }
345 ThreadRef::Inline(inline) => inline,
346 }
347 }
348
349 pub fn resolve_ticks(&self, t: Ticks) -> i64 {
350 t.scale(self.ticks_per_second)
351 }
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{EventPayload, EventRecord};
    use crate::fxt_builder::FxtBuilder;
    use crate::scheduling::{LegacyContextSwitchEvent, SchedulingRecord, ThreadState};
    use futures::{StreamExt, TryStreamExt};

    /// Reference trace used by every test in this module.
    static SIMPLE_TRACE_FXT: &[u8] =
        include_bytes!("../../../../trace2json/test_data/simple_trace.fxt");

    /// The whole reference trace parses into the expected records with no warnings.
    #[test]
    fn test_parse_full_session() {
        let session = parse_full_session(SIMPLE_TRACE_FXT).unwrap();
        assert_eq!(session, expected_simple_trace_records());
    }

    /// The async parser produces the same result when the input arrives one byte at a time.
    #[fuchsia::test]
    async fn test_async_parse() {
        let (mut send_chunks, recv_chunks) = futures::channel::mpsc::unbounded();

        let parse_trace_session = fuchsia_async::Task::spawn(async move {
            let (records, parse_task) = SessionParser::new_async(recv_chunks.into_async_read());
            let records = records.map(|res| res.unwrap()).collect::<Vec<_>>().await;
            (records, parse_task.await)
        });

        // Single-byte chunks force the parser to ask for more input in the middle of records.
        for chunk in SIMPLE_TRACE_FXT.chunks(1) {
            send_chunks.send(Ok(chunk)).await.unwrap();
        }
        drop(send_chunks);

        assert_eq!(parse_trace_session.await, expected_simple_trace_records());
    }

    /// A record of an unknown type is skipped with a warning and does not disturb its neighbours.
    #[fuchsia::test]
    fn session_with_unknown_record_in_middle() {
        let mut session = vec![];

        // Leading 8 bytes of the reference trace (presumably its magic number record).
        session.extend(&SIMPLE_TRACE_FXT[..8]);

        // A record whose type (14) the parser is expected to report as unknown.
        let mut header = crate::BaseTraceHeader::empty();
        header.set_raw_type(14);
        session.extend(FxtBuilder::new(header).atom(&(0u8..27u8).collect::<Vec<u8>>()).build());

        // The remainder of the reference trace.
        session.extend(&SIMPLE_TRACE_FXT[8..]);

        let (observed_parsed, observed_warnings) = parse_full_session(&session).unwrap();
        let (expected_parsed, expected_warnings) =
            (expected_simple_trace_records().0, vec![ParseWarning::UnknownTraceRecordType(14)]);
        assert_eq!(observed_parsed, expected_parsed);
        assert_eq!(observed_warnings, expected_warnings);
    }

    /// A record that fails to parse surfaces as an error item while the records around it are
    /// still produced.
    #[fuchsia::test]
    fn session_with_invalid_record_in_middle() {
        let mut session = vec![];
        session.extend(&SIMPLE_TRACE_FXT[..8]);

        // Bytes of a record that is expected to be rejected by the parser.
        let invalid_record = vec![
            103, 0, 2, 15, 128, 1, 0, 0, 229, 253, 9, 0, 0, 0, 0, 0, 98, 105, 110, 100, 101, 114,
            58, 57, 51, 54, 95, 68, 255, 255, 255, 0, 40, 0, 166, 0, 0, 0, 0, 0, 125, 125, 4, 0, 0,
            0, 0, 0,
        ];
        session.extend(invalid_record);
        session.extend(&SIMPLE_TRACE_FXT[8..]);

        let parser = SessionParser::new(std::io::Cursor::new(session));
        let mut records = vec![];
        let mut had_error_record = false;
        for record in parser {
            match record {
                Ok(record) => records.push(record),
                Err(_) => had_error_record = true,
            }
        }

        assert_eq!(records, expected_simple_trace_records().0);
        assert!(had_error_record);
    }

    /// A truncated final record is ignored until its last byte arrives, at which point it is
    /// parsed like any other record.
    #[fuchsia::test]
    fn session_with_incomplete_trailing_record() {
        use crate::string::STRING_REF_INLINE_BIT;

        let mut session = SIMPLE_TRACE_FXT.to_vec();

        // Build an instant event with inline category and name strings.
        let category = "test_category";
        let name = "test_instant";
        let mut header = crate::event::EventHeader::empty();
        header.set_category_ref(category.len() as u16 | STRING_REF_INLINE_BIT);
        header.set_name_ref(name.len() as u16 | STRING_REF_INLINE_BIT);
        header.set_event_type(crate::event::INSTANT_EVENT_TYPE);

        let mut final_record = FxtBuilder::new(header)
            .atom(2048u64.to_le_bytes()) // timestamp in raw ticks (resolves to 85333 below)
            .atom(512u64.to_le_bytes()) // process koid
            .atom(513u64.to_le_bytes()) // thread koid
            .atom(category)
            .atom(name)
            .build();
        let byte_to_make_valid = final_record.pop().unwrap();

        // With every strict prefix of the final record appended, the result must be unchanged.
        for byte in final_record {
            session.push(byte);
            assert_eq!(
                parse_full_session(&session).expect("should parse without final incomplete record"),
                expected_simple_trace_records(),
            );
        }

        let (mut expected_with_final_record, expected_warnings) = expected_simple_trace_records();
        expected_with_final_record.push(TraceRecord::Event(EventRecord {
            provider: Some(Provider { id: 1, name: "test_provider".into() }),
            timestamp: 85333,
            process: ProcessKoid(512),
            thread: ThreadKoid(513),
            category: category.into(),
            name: name.into(),
            args: vec![],
            payload: EventPayload::Instant,
        }));

        // Once the last byte is present the final record shows up as well.
        session.push(byte_to_make_valid);
        assert_eq!(
            parse_full_session(&session).unwrap(),
            (expected_with_final_record, expected_warnings)
        );
    }

    /// Records and warnings expected from parsing `SIMPLE_TRACE_FXT` on its own.
    fn expected_simple_trace_records() -> (Vec<TraceRecord>, Vec<ParseWarning>) {
        (
            vec![
                TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
                    LegacyContextSwitchEvent {
                        provider: Some(Provider { id: 1, name: "test_provider".into() }),
                        timestamp: 41,
                        cpu_id: 0,
                        outgoing_thread_state: ThreadState::Suspended,
                        outgoing_process: ProcessKoid(4660),
                        outgoing_thread: ThreadKoid(17185),
                        outgoing_thread_priority: 0,
                        incoming_process: ProcessKoid(1000),
                        incoming_thread: ThreadKoid(1001),
                        incoming_thread_priority: 20,
                    },
                )),
                TraceRecord::Event(EventRecord {
                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
                    timestamp: 0,
                    process: ProcessKoid(1000),
                    thread: ThreadKoid(1001),
                    category: "test".into(),
                    name: "begin_end_ref".into(),
                    args: vec![],
                    payload: EventPayload::DurationBegin,
                }),
                TraceRecord::Event(EventRecord {
                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
                    timestamp: 110000000,
                    process: ProcessKoid(1000),
                    thread: ThreadKoid(1001),
                    category: "test".into(),
                    name: "complete_inline".into(),
                    args: vec![],
                    payload: EventPayload::DurationComplete { end_timestamp: 150000000 },
                }),
                TraceRecord::Event(EventRecord {
                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
                    timestamp: 200000000,
                    process: ProcessKoid(1000),
                    thread: ThreadKoid(1001),
                    category: "test".into(),
                    name: "begin_end_inline".into(),
                    args: vec![],
                    payload: EventPayload::DurationBegin,
                }),
                TraceRecord::Event(EventRecord {
                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
                    timestamp: 450000000,
                    process: ProcessKoid(1000),
                    thread: ThreadKoid(1001),
                    category: "test".into(),
                    name: "begin_end_inline".into(),
                    args: vec![],
                    payload: EventPayload::DurationEnd,
                }),
                TraceRecord::Event(EventRecord {
                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
                    timestamp: 100000000,
                    process: ProcessKoid(1000),
                    thread: ThreadKoid(1001),
                    category: "test".into(),
                    name: "complete_ref".into(),
                    args: vec![],
                    payload: EventPayload::DurationComplete { end_timestamp: 500000000 },
                }),
                TraceRecord::Event(EventRecord {
                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
                    timestamp: 500000208,
                    process: ProcessKoid(1000),
                    thread: ThreadKoid(1001),
                    category: "test".into(),
                    name: "async".into(),
                    args: vec![],
                    payload: EventPayload::AsyncBegin { id: 1 },
                }),
                TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
                    LegacyContextSwitchEvent {
                        provider: Some(Provider { id: 1, name: "test_provider".into() }),
                        timestamp: 500000416,
                        cpu_id: 0,
                        outgoing_thread_state: ThreadState::Suspended,
                        outgoing_process: ProcessKoid(1000),
                        outgoing_thread: ThreadKoid(1001),
                        outgoing_thread_priority: 20,
                        incoming_process: ProcessKoid(1000),
                        incoming_thread: ThreadKoid(1002),
                        incoming_thread_priority: 20,
                    },
                )),
                TraceRecord::Event(EventRecord {
                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
                    timestamp: 500000458,
                    process: ProcessKoid(1000),
                    thread: ThreadKoid(1002),
                    category: "test".into(),
                    name: "complete_ref".into(),
                    args: vec![],
                    payload: EventPayload::DurationComplete { end_timestamp: 600000000 },
                }),
                TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
                    LegacyContextSwitchEvent {
                        provider: Some(Provider { id: 1, name: "test_provider".into() }),
                        timestamp: 600010666,
                        cpu_id: 0,
                        outgoing_thread_state: ThreadState::Suspended,
                        outgoing_process: ProcessKoid(1000),
                        outgoing_thread: ThreadKoid(1002),
                        outgoing_thread_priority: 20,
                        incoming_process: ProcessKoid(1000),
                        incoming_thread: ThreadKoid(1001),
                        incoming_thread_priority: 20,
                    },
                )),
                TraceRecord::Event(EventRecord {
                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
                    timestamp: 600016000,
                    process: ProcessKoid(1000),
                    thread: ThreadKoid(1001),
                    category: "test".into(),
                    name: "async".into(),
                    args: vec![],
                    payload: EventPayload::AsyncEnd { id: 1 },
                }),
                TraceRecord::Event(EventRecord {
                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
                    timestamp: 630000000,
                    process: ProcessKoid(1000),
                    thread: ThreadKoid(1001),
                    category: "test".into(),
                    name: "begin_end_ref".into(),
                    args: vec![],
                    payload: EventPayload::DurationBegin,
                }),
                TraceRecord::Event(EventRecord {
                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
                    timestamp: 950000000,
                    process: ProcessKoid(1000),
                    thread: ThreadKoid(1001),
                    category: "test".into(),
                    name: "begin_end_ref".into(),
                    args: vec![],
                    payload: EventPayload::DurationEnd,
                }),
                TraceRecord::Event(EventRecord {
                    provider: Some(Provider { id: 1, name: "test_provider".into() }),
                    timestamp: 1000000000,
                    process: ProcessKoid(1000),
                    thread: ThreadKoid(1001),
                    category: "test".into(),
                    name: "begin_end_ref".into(),
                    args: vec![],
                    payload: EventPayload::DurationEnd,
                }),
                TraceRecord::Scheduling(SchedulingRecord::LegacyContextSwitch(
                    LegacyContextSwitchEvent {
                        provider: Some(Provider { id: 1, name: "test_provider".into() }),
                        timestamp: 1000000666,
                        cpu_id: 0,
                        outgoing_thread_state: ThreadState::Suspended,
                        outgoing_process: ProcessKoid(1000),
                        outgoing_thread: ThreadKoid(1001),
                        outgoing_thread_priority: 20,
                        incoming_process: ProcessKoid(4660),
                        incoming_thread: ThreadKoid(17185),
                        incoming_thread_priority: 0,
                    },
                )),
            ],
            // The reference trace is expected to parse without warnings.
            vec![],
        )
    }
}