diff --git a/CHANGELOG.md b/CHANGELOG.md index c02f10ac..5d3b6df1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,8 +4,24 @@ All notable changes to this project will be documented in this file. ## Unreleased +### Breaking changes + +* **`RawMrtRecord` field renamed**: `raw_bytes` is now `message_bytes` to clarify it contains only the message body + - Added new `header_bytes` field containing the raw header bytes as read from the wire + - The `raw_bytes()` method now returns complete MRT record bytes (header + body) without re-encoding + ### New features +* Add MRT record debugging and raw bytes export capabilities + - `RawMrtRecord` now stores both `header_bytes` and `message_bytes` to enable exact byte-for-byte export without re-encoding + - New methods on `RawMrtRecord`: `raw_bytes()`, `write_raw_bytes()`, `append_raw_bytes()`, `total_bytes_len()` + - Added `Display` implementations for `MrtRecord`, `CommonHeader`, and `MrtMessage` for human-readable debug output + - New examples: `mrt_debug.rs` and `extract_problematic_records.rs` demonstrating debug features +* CLI now supports record-level output and multiple output formats + - New `--level` (`-L`) option: `elems` (default) or `records` to control output granularity + - New `--format` (`-F`) option: `default`, `json`, `json-pretty`, or `psv` + - Record-level output with `--level records` outputs MRT records instead of per-prefix elements + - JSON output for records provides full structured data for debugging * Add `UpdateIterator` and `FallibleUpdateIterator` for iterating over BGP announcements ([#250](https://github.com/bgpkit/bgpkit-parser/issues/250)) - New `MrtUpdate` enum supporting both BGP4MP UPDATE messages and TableDumpV2 RIB entries - `Bgp4MpUpdate` struct for BGP4MP UPDATE messages with metadata (timestamp, peer_ip, peer_asn) diff --git a/examples/README.md b/examples/README.md index 9d404130..d421fe26 100644 --- a/examples/README.md +++ b/examples/README.md @@ -38,5 +38,9 @@ This directory contains runnable examples for bgpkit_parser. They demonstrate ba ## Error Handling and Robustness - [fallible_parsing.rs](fallible_parsing.rs) — Demonstrate fallible record/element iterators that let you handle parse errors explicitly while continuing to process. +## Debugging and Analysis +- [mrt_debug.rs](mrt_debug.rs) — Demonstrate MRT debugging features: debug display for MRT records, raw byte export, and the new `Display` implementation. +- [extract_problematic_records.rs](extract_problematic_records.rs) — Find and export MRT records that fail to parse for further analysis with other tools. + ## Local-only and Misc - [local_only/src/main.rs](local_only/src/main.rs) — Minimal example that reads a local updates.bz2 file; intended for local experimentation (not network fetching). diff --git a/examples/extract_problematic_records.rs b/examples/extract_problematic_records.rs new file mode 100644 index 00000000..37d69935 --- /dev/null +++ b/examples/extract_problematic_records.rs @@ -0,0 +1,116 @@ +//! Example demonstrating how to find and extract problematic MRT records. +//! +//! This example shows how to: +//! 1. Iterate over raw MRT records +//! 2. Attempt to parse each record +//! 3. Export records that fail to parse for debugging +//! +//! This is useful for identifying malformed or unusual MRT records that +//! cause parsing issues, allowing you to analyze them with other tools +//! or report them for investigation. +//! +//! Run with: cargo run --example extract_problematic_records -- [output_file] + +use bgpkit_parser::BgpkitParser; +use std::env; + +fn main() { + let args: Vec = env::args().collect(); + + if args.len() < 2 { + eprintln!("Usage: {} [output_file]", args[0]); + eprintln!(); + eprintln!("Arguments:"); + eprintln!(" mrt_file - Path or URL to the MRT file to analyze"); + eprintln!(" output_file - Optional path to export problematic records (default: problematic_records.mrt)"); + eprintln!(); + eprintln!("Example:"); + eprintln!( + " {} https://data.ris.ripe.net/rrc00/latest-update.gz", + args[0] + ); + std::process::exit(1); + } + + let input_file = &args[1]; + let output_file = args + .get(2) + .map(|s| s.as_str()) + .unwrap_or("problematic_records.mrt"); + + println!("Analyzing MRT file: {}", input_file); + println!("Problematic records will be saved to: {}", output_file); + println!(); + + let parser = match BgpkitParser::new(input_file) { + Ok(p) => p, + Err(e) => { + eprintln!("Failed to open MRT file: {}", e); + std::process::exit(1); + } + }; + + let mut total_records = 0; + let mut parsed_ok = 0; + let mut parse_errors = 0; + let mut export_errors = 0; + + for raw_record in parser.into_raw_record_iter() { + total_records += 1; + + // Try to parse the record + match raw_record.clone().parse() { + Ok(_parsed) => { + parsed_ok += 1; + } + Err(e) => { + parse_errors += 1; + println!( + "Record #{}: Parse error at timestamp {}", + total_records, raw_record.common_header.timestamp + ); + println!(" Error: {}", e); + println!(" Header: {}", raw_record.common_header); + println!(" Size: {} bytes", raw_record.total_bytes_len()); + + // Export the problematic record + if let Err(write_err) = raw_record.append_raw_bytes(output_file) { + eprintln!(" Failed to export record: {}", write_err); + export_errors += 1; + } else { + println!(" -> Exported to {}", output_file); + } + println!(); + } + } + + // Progress indicator every 100,000 records + if total_records % 100_000 == 0 { + eprintln!( + "Progress: {} records processed ({} errors so far)", + total_records, parse_errors + ); + } + } + + println!("=== Summary ==="); + println!("Total records processed: {}", total_records); + println!("Successfully parsed: {}", parsed_ok); + println!("Parse errors: {}", parse_errors); + if export_errors > 0 { + println!("Export errors: {}", export_errors); + } + + if parse_errors > 0 { + println!(); + println!("Problematic records exported to: {}", output_file); + println!(); + println!("You can analyze the exported records with:"); + println!(" - bgpdump -m {}", output_file); + println!(" - This parser with verbose debugging"); + println!(" - Hex editor for raw byte analysis"); + } else { + println!(); + println!("No problematic records found!"); + } +} diff --git a/examples/mrt_debug.rs b/examples/mrt_debug.rs new file mode 100644 index 00000000..45d77e80 --- /dev/null +++ b/examples/mrt_debug.rs @@ -0,0 +1,114 @@ +//! Example demonstrating MRT debug features. +//! +//! This example shows how to: +//! 1. Display MRT records and BGP elements in JSON format for debugging +//! 2. Display MRT records in a debug-friendly format +//! 3. Export raw MRT record bytes to files for debugging +//! +//! Run with: cargo run --example mrt_debug --features serde + +use bgpkit_parser::BgpkitParser; + +fn main() { + let url = "https://spaces.bgpkit.org/parser/update-example.gz"; + println!("Parsing: {}\n", url); + + println!("=== MRT Record JSON Format Examples ===\n"); + + // Show first 3 MRT records in JSON format + for (idx, record) in BgpkitParser::new(url) + .unwrap() + .into_record_iter() + .take(3) + .enumerate() + { + println!("[Record {}]", idx + 1); + #[cfg(feature = "serde")] + println!("{}", serde_json::to_string_pretty(&record).unwrap()); + #[cfg(not(feature = "serde"))] + println!("{:?}", record); + println!(); + } + + println!("=== BGP Element JSON Format Examples ===\n"); + + // Show first 5 elements in JSON format + for (idx, elem) in BgpkitParser::new(url) + .unwrap() + .into_iter() + .take(5) + .enumerate() + { + println!("[Element {}]", idx + 1); + #[cfg(feature = "serde")] + println!("{}", serde_json::to_string(&elem).unwrap()); + #[cfg(not(feature = "serde"))] + println!("{:?}", elem); + } + + println!("\n=== MRT Record Debug Display ===\n"); + + // Show first 5 MRT records with debug display + let parser = BgpkitParser::new(url).unwrap(); + for (idx, record) in parser.into_record_iter().take(5).enumerate() { + println!("[{}] {}", idx + 1, record); + } + + println!("\n=== Raw MRT Record Iteration ===\n"); + + // Demonstrate raw record iteration and byte export + let parser = BgpkitParser::new(url).unwrap(); + for (idx, raw_record) in parser.into_raw_record_iter().take(3).enumerate() { + println!("[{}] Raw Record:", idx + 1); + println!(" Header: {}", raw_record.common_header); + println!(" Total bytes: {} bytes", raw_record.total_bytes_len()); + println!(" Header size: {} bytes", raw_record.header_bytes.len()); + println!( + " Message body size: {} bytes", + raw_record.message_bytes.len() + ); + + // Demonstrate parsing the raw record + match raw_record.clone().parse() { + Ok(parsed) => { + println!(" Parsed: {}", parsed); + } + Err(e) => { + println!(" Parse error: {}", e); + // In case of error, you could export the problematic record: + // raw_record.write_raw_bytes(format!("problematic_{}.mrt", idx)).unwrap(); + } + } + println!(); + } + + println!("=== Exporting Raw Bytes Example ===\n"); + + // Export a few records to demonstrate the functionality + let parser = BgpkitParser::new(url).unwrap(); + let output_file = "/tmp/debug_records.mrt"; + + let mut count = 0; + for raw_record in parser.into_raw_record_iter().take(10) { + // Append each record to the same file + if let Err(e) = raw_record.append_raw_bytes(output_file) { + eprintln!("Failed to write record: {}", e); + } + count += 1; + } + + println!("Exported {} records to {}", count, output_file); + + // Verify by reading back + let verify_parser = BgpkitParser::new(output_file).unwrap(); + let verify_count = verify_parser.into_record_iter().count(); + println!( + "Verification: read back {} records from exported file", + verify_count + ); + + // Clean up + let _ = std::fs::remove_file(output_file); + + println!("\nDone!"); +} diff --git a/src/bin/main.rs b/src/bin/main.rs index c2db69b4..193b5f4d 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -5,9 +5,33 @@ use std::net::IpAddr; use std::path::PathBuf; use bgpkit_parser::{BgpElem, BgpkitParser, Elementor}; -use clap::Parser; +use clap::{Parser, ValueEnum}; use ipnet::IpNet; +/// Output format for the parser +#[derive(Debug, Clone, Copy, Default, ValueEnum)] +enum OutputFormat { + /// Default pipe-separated format + #[default] + Default, + /// JSON format (one object per line) + Json, + /// Pretty-printed JSON format + JsonPretty, + /// PSV format with header + Psv, +} + +/// Output level granularity +#[derive(Debug, Clone, Copy, Default, ValueEnum)] +enum OutputLevel { + /// Output BGP elements (per-prefix) + #[default] + Elems, + /// Output MRT records + Records, +} + /// bgpkit-parser-cli is a simple cli tool that allow parsing of individual MRT files. #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] @@ -20,17 +44,25 @@ struct Opts { #[clap(short, long)] cache_dir: Option, - /// Output as JSON objects + /// Output format + #[clap(short = 'F', long, value_enum, default_value_t = OutputFormat::Default)] + format: OutputFormat, + + /// Output level: elems (per-prefix) or records (MRT records) + #[clap(short = 'L', long, value_enum, default_value_t = OutputLevel::Elems)] + level: OutputLevel, + + /// Output as JSON objects (shorthand for --format json) #[clap(long)] json: bool, - /// Output as full PSV entries with header + /// Pretty-print JSON output (shorthand for --format json-pretty) #[clap(long)] - psv: bool, + pretty: bool, - /// Pretty-print JSON output + /// Output as full PSV entries with header (shorthand for --format psv) #[clap(long)] - pretty: bool, + psv: bool, /// Count BGP elems #[clap(short, long)] @@ -178,6 +210,17 @@ fn main() { } } + // Determine final output format (shorthand flags override --format) + let output_format = if opts.pretty { + OutputFormat::JsonPretty + } else if opts.json { + OutputFormat::Json + } else if opts.psv { + OutputFormat::Psv + } else { + opts.format + }; + match (opts.elems_count, opts.records_count) { (true, true) => { let mut elementor = Elementor::new(); @@ -193,34 +236,73 @@ fn main() { println!("total records: {}", parser.into_record_iter().count()); } (true, false) => { - println!("total records: {}", parser.into_elem_iter().count()); + println!("total elems: {}", parser.into_elem_iter().count()); } (false, false) => { let mut stdout = std::io::stdout(); - for (index, elem) in parser.into_elem_iter().enumerate() { - let output_str = if opts.json { - let val = json!(elem); - if opts.pretty { - serde_json::to_string_pretty(&val).unwrap() - } else { - val.to_string() - } - } else if opts.psv { - if index == 0 { - format!("{}\n{}", BgpElem::get_psv_header(), elem.to_psv()) - } else { - elem.to_psv() + + match opts.level { + OutputLevel::Elems => { + for (index, elem) in parser.into_elem_iter().enumerate() { + let output_str = format_elem(&elem, output_format, index); + if let Err(e) = writeln!(stdout, "{}", &output_str) { + if e.kind() != std::io::ErrorKind::BrokenPipe { + eprintln!("{e}"); + } + std::process::exit(1); + } } - } else { - elem.to_string() - }; - if let Err(e) = writeln!(stdout, "{}", &output_str) { - if e.kind() != std::io::ErrorKind::BrokenPipe { - eprintln!("{e}"); + } + OutputLevel::Records => { + for record in parser.into_record_iter() { + let output_str = format_record(&record, output_format); + if let Err(e) = writeln!(stdout, "{}", &output_str) { + if e.kind() != std::io::ErrorKind::BrokenPipe { + eprintln!("{e}"); + } + std::process::exit(1); + } } - std::process::exit(1); } } } } } + +fn format_elem(elem: &BgpElem, format: OutputFormat, index: usize) -> String { + match format { + OutputFormat::Json => { + let val = json!(elem); + val.to_string() + } + OutputFormat::JsonPretty => { + let val = json!(elem); + serde_json::to_string_pretty(&val).unwrap() + } + OutputFormat::Psv => { + if index == 0 { + format!("{}\n{}", BgpElem::get_psv_header(), elem.to_psv()) + } else { + elem.to_psv() + } + } + OutputFormat::Default => elem.to_string(), + } +} + +fn format_record(record: &bgpkit_parser::MrtRecord, format: OutputFormat) -> String { + match format { + OutputFormat::Json => { + let val = json!(record); + val.to_string() + } + OutputFormat::JsonPretty => { + let val = json!(record); + serde_json::to_string_pretty(&val).unwrap() + } + OutputFormat::Psv | OutputFormat::Default => { + // Use the Display implementation for MrtRecord + format!("{}", record) + } + } +} diff --git a/src/models/mrt/mod.rs b/src/models/mrt/mod.rs index cd08df58..2be15efc 100644 --- a/src/models/mrt/mod.rs +++ b/src/models/mrt/mod.rs @@ -6,6 +6,7 @@ pub mod table_dump_v2; pub use bgp4mp::*; use num_enum::{IntoPrimitive, TryFromPrimitive}; +use std::fmt::{Display, Formatter}; pub use table_dump::*; pub use table_dump_v2::*; @@ -92,6 +93,91 @@ impl PartialEq for CommonHeader { } } +impl Display for CommonHeader { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let ts = match self.microsecond_timestamp { + Some(us) => format!("{}.{:06}", self.timestamp, us), + None => self.timestamp.to_string(), + }; + write!( + f, + "MRT|{}|{:?}|{}|{}", + ts, self.entry_type, self.entry_subtype, self.length + ) + } +} + +impl Display for MrtRecord { + /// Formats the MRT record in a debug-friendly format. + /// + /// The format is: `MRT||||` + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let ts = match self.common_header.microsecond_timestamp { + Some(us) => format!("{}.{:06}", self.common_header.timestamp, us), + None => self.common_header.timestamp.to_string(), + }; + write!( + f, + "MRT|{}|{:?}|{}|{}", + ts, self.common_header.entry_type, self.common_header.entry_subtype, self.message + ) + } +} + +impl Display for MrtMessage { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + MrtMessage::TableDumpMessage(msg) => { + write!(f, "TABLE_DUMP|{}|{}", msg.prefix, msg.peer_ip) + } + MrtMessage::TableDumpV2Message(msg) => match msg { + TableDumpV2Message::PeerIndexTable(pit) => { + write!(f, "PEER_INDEX_TABLE|{}", pit.id_peer_map.len()) + } + TableDumpV2Message::RibAfi(rib) => { + write!( + f, + "RIB|{:?}|{}|{} entries", + rib.rib_type, + rib.prefix, + rib.rib_entries.len() + ) + } + TableDumpV2Message::RibGeneric(rib) => { + write!( + f, + "RIB_GENERIC|AFI {:?}|SAFI {:?}|{} entries", + rib.afi, + rib.safi, + rib.rib_entries.len() + ) + } + TableDumpV2Message::GeoPeerTable(gpt) => { + write!(f, "GEO_PEER_TABLE|{} peers", gpt.geo_peers.len()) + } + }, + MrtMessage::Bgp4Mp(bgp4mp) => match bgp4mp { + Bgp4MpEnum::StateChange(sc) => { + write!( + f, + "STATE_CHANGE|{}|{}|{:?}->{:?}", + sc.peer_ip, sc.peer_asn, sc.old_state, sc.new_state + ) + } + Bgp4MpEnum::Message(msg) => { + let msg_type = match &msg.bgp_message { + crate::models::BgpMessage::Open(_) => "OPEN", + crate::models::BgpMessage::Update(_) => "UPDATE", + crate::models::BgpMessage::Notification(_) => "NOTIFICATION", + crate::models::BgpMessage::KeepAlive => "KEEPALIVE", + }; + write!(f, "BGP4MP|{}|{}|{}", msg.peer_ip, msg.peer_asn, msg_type) + } + }, + } + } +} + #[derive(Debug, PartialEq, Clone, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum MrtMessage { @@ -154,6 +240,59 @@ pub enum EntryType { #[cfg(test)] mod tests { + #[test] + fn test_mrt_record_display() { + use super::*; + use crate::models::Asn; + use std::net::IpAddr; + use std::str::FromStr; + + let mrt_record = MrtRecord { + common_header: CommonHeader { + timestamp: 1609459200, + microsecond_timestamp: None, + entry_type: EntryType::BGP4MP, + entry_subtype: 0, + length: 0, + }, + message: MrtMessage::Bgp4Mp(Bgp4MpEnum::StateChange(Bgp4MpStateChange { + msg_type: Bgp4MpType::StateChange, + peer_asn: Asn::new_32bit(65000), + local_asn: Asn::new_32bit(65001), + interface_index: 1, + peer_ip: IpAddr::from_str("10.0.0.1").unwrap(), + local_addr: IpAddr::from_str("10.0.0.2").unwrap(), + old_state: BgpState::Idle, + new_state: BgpState::Connect, + })), + }; + + let display = format!("{}", mrt_record); + assert!(display.contains("1609459200")); + assert!(display.contains("BGP4MP")); + assert!(display.contains("STATE_CHANGE")); + assert!(display.contains("10.0.0.1")); + assert!(display.contains("65000")); + } + + #[test] + fn test_common_header_display() { + use super::*; + + let header = CommonHeader { + timestamp: 1609459200, + microsecond_timestamp: Some(500000), + entry_type: EntryType::BGP4MP_ET, + entry_subtype: 4, + length: 128, + }; + + let display = format!("{}", header); + assert!(display.contains("1609459200.500000")); + assert!(display.contains("BGP4MP_ET")); + assert!(display.contains("128")); + } + #[test] #[cfg(feature = "serde")] fn test_entry_type_serialize_and_deserialize() { diff --git a/src/parser/mrt/mrt_header.rs b/src/parser/mrt/mrt_header.rs index 58d240b6..d6277d91 100644 --- a/src/parser/mrt/mrt_header.rs +++ b/src/parser/mrt/mrt_header.rs @@ -3,6 +3,12 @@ use crate::ParserError; use bytes::{Buf, BufMut, Bytes, BytesMut}; use std::io::Read; +/// Result of parsing a common header, including the raw bytes. +pub struct ParsedHeader { + pub header: CommonHeader, + pub raw_bytes: Bytes, +} + /// MRT common header [RFC6396][header]. /// /// [header]: https://tools.ietf.org/html/rfc6396#section-4.1 @@ -40,9 +46,17 @@ use std::io::Read; /// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ /// ``` pub fn parse_common_header(input: &mut T) -> Result { - let mut raw_bytes = [0u8; 12]; - input.read_exact(&mut raw_bytes)?; - let mut data = &raw_bytes[..]; + Ok(parse_common_header_with_bytes(input)?.header) +} + +/// Parse the MRT common header and return both the parsed header and raw bytes. +/// +/// This is useful when you need to preserve the original bytes for debugging +/// or exporting problematic records without re-encoding. +pub fn parse_common_header_with_bytes(input: &mut T) -> Result { + let mut base_bytes = [0u8; 12]; + input.read_exact(&mut base_bytes)?; + let mut data = &base_bytes[..]; let timestamp = data.get_u32(); let entry_type_raw = data.get_u16(); @@ -51,7 +65,7 @@ pub fn parse_common_header(input: &mut T) -> Result { // For ET records, the on-wire length includes the extra 4-byte microsecond timestamp // that lives in the header. Internally we store `length` as the message length only, @@ -62,19 +76,28 @@ pub fn parse_common_header(input: &mut T) -> Result None, + _ => (None, Bytes::copy_from_slice(&base_bytes)), }; - Ok(CommonHeader { - timestamp, - microsecond_timestamp, - entry_type, - entry_subtype, - length, + Ok(ParsedHeader { + header: CommonHeader { + timestamp, + microsecond_timestamp, + entry_type, + entry_subtype, + length, + }, + raw_bytes, }) } @@ -104,6 +127,46 @@ mod tests { use crate::models::EntryType; use bytes::Buf; + #[test] + fn test_parse_common_header_with_bytes() { + let input = Bytes::from_static(&[ + 0, 0, 0, 1, // timestamp + 0, 16, // entry type + 0, 4, // entry subtype + 0, 0, 0, 5, // length + ]); + + let mut reader = input.clone().reader(); + let result = parse_common_header_with_bytes(&mut reader).unwrap(); + + assert_eq!(result.header.timestamp, 1); + assert_eq!(result.header.entry_type, EntryType::BGP4MP); + assert_eq!(result.header.entry_subtype, 4); + assert_eq!(result.header.length, 5); + assert_eq!(result.raw_bytes, input); + } + + #[test] + fn test_parse_common_header_with_bytes_et() { + let input = Bytes::from_static(&[ + 0, 0, 0, 1, // timestamp + 0, 17, // entry type = BGP4MP_ET + 0, 4, // entry subtype + 0, 0, 0, 9, // length (includes 4 bytes for microsecond) + 0, 3, 130, 112, // microsecond timestamp + ]); + + let mut reader = input.clone().reader(); + let result = parse_common_header_with_bytes(&mut reader).unwrap(); + + assert_eq!(result.header.timestamp, 1); + assert_eq!(result.header.entry_type, EntryType::BGP4MP_ET); + assert_eq!(result.header.entry_subtype, 4); + assert_eq!(result.header.length, 5); // adjusted length + assert_eq!(result.header.microsecond_timestamp, Some(230_000)); + assert_eq!(result.raw_bytes, input); + } + /// Test that the length is not adjusted when the microsecond timestamp is not present. #[test] fn test_encode_common_header() { diff --git a/src/parser/mrt/mrt_record.rs b/src/parser/mrt/mrt_record.rs index a3b1a62b..45153d35 100644 --- a/src/parser/mrt/mrt_record.rs +++ b/src/parser/mrt/mrt_record.rs @@ -1,4 +1,4 @@ -use super::mrt_header::parse_common_header; +use super::mrt_header::parse_common_header_with_bytes; use crate::bmp::messages::{BmpMessage, BmpMessageBody}; use crate::error::ParserError; use crate::models::*; @@ -9,16 +9,22 @@ use crate::utils::convert_timestamp; use bytes::{BufMut, Bytes, BytesMut}; use log::warn; use std::convert::TryFrom; -use std::io::Read; +use std::fs::File; +use std::io::{Read, Write}; use std::net::IpAddr; +use std::path::Path; use std::str::FromStr; /// Raw MRT record containing the common header and unparsed message bytes. -/// This allows for lazy parsing of the MRT message body. +/// This allows for lazy parsing of the MRT message body, and provides +/// utilities for debugging and exporting problematic records. #[derive(Debug, Clone)] pub struct RawMrtRecord { pub common_header: CommonHeader, - pub raw_bytes: Bytes, + /// The raw bytes of the MRT common header (as read from the wire). + pub header_bytes: Bytes, + /// The raw bytes of the MRT message body (excluding the common header). + pub message_bytes: Bytes, } impl RawMrtRecord { @@ -28,7 +34,7 @@ impl RawMrtRecord { let message = parse_mrt_body( self.common_header.entry_type as u16, self.common_header.entry_subtype, - self.raw_bytes, + self.message_bytes, )?; Ok(MrtRecord { @@ -36,11 +42,80 @@ impl RawMrtRecord { message, }) } + + /// Returns the complete MRT record as raw bytes (header + message body). + /// + /// This returns the exact bytes as they were read from the wire, + /// without any re-encoding. This is useful for debugging problematic + /// MRT records by exporting them as-is to a file for further analysis. + /// + /// # Example + /// ```ignore + /// let raw_record = parser.into_raw_record_iter().next().unwrap(); + /// let bytes = raw_record.raw_bytes(); + /// std::fs::write("record.mrt", &bytes).unwrap(); + /// ``` + pub fn raw_bytes(&self) -> Bytes { + let mut bytes = BytesMut::with_capacity(self.header_bytes.len() + self.message_bytes.len()); + bytes.put_slice(&self.header_bytes); + bytes.put_slice(&self.message_bytes); + bytes.freeze() + } + + /// Writes the raw MRT record (header + message body) to a file. + /// + /// This is useful for extracting problematic MRT records for debugging + /// or further analysis with other tools. + /// + /// # Arguments + /// * `path` - The path to write the raw bytes to. + /// + /// # Example + /// ```ignore + /// let raw_record = parser.into_raw_record_iter().next().unwrap(); + /// raw_record.write_raw_bytes("problematic_record.mrt").unwrap(); + /// ``` + pub fn write_raw_bytes>(&self, path: P) -> std::io::Result<()> { + let mut file = File::create(path)?; + file.write_all(&self.header_bytes)?; + file.write_all(&self.message_bytes)?; + Ok(()) + } + + /// Appends the raw MRT record (header + message body) to a file. + /// + /// This is useful for collecting multiple problematic records into a single file. + /// + /// # Arguments + /// * `path` - The path to append the raw bytes to. + /// + /// # Example + /// ```ignore + /// for raw_record in parser.into_raw_record_iter() { + /// if is_problematic(&raw_record) { + /// raw_record.append_raw_bytes("problematic_records.mrt").unwrap(); + /// } + /// } + /// ``` + pub fn append_raw_bytes>(&self, path: P) -> std::io::Result<()> { + let mut file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(path)?; + file.write_all(&self.header_bytes)?; + file.write_all(&self.message_bytes)?; + Ok(()) + } + + /// Returns the total length of the complete MRT record in bytes (header + body). + pub fn total_bytes_len(&self) -> usize { + self.header_bytes.len() + self.message_bytes.len() + } } pub fn chunk_mrt_record(input: &mut impl Read) -> Result { - // parse common header - let common_header = match parse_common_header(input) { + // parse common header and capture raw bytes + let parsed_header = match parse_common_header_with_bytes(input) { Ok(v) => v, Err(e) => { if let ParserError::EofError(e) = &e { @@ -55,6 +130,9 @@ pub fn chunk_mrt_record(input: &mut impl Read) -> Result MAX_MRT_MESSAGE_LEN { @@ -80,7 +158,8 @@ pub fn chunk_mrt_record(input: &mut impl Read) -> Result